阻塞队列核心思想
- 有且仅有一个线程读、写
- 当队列满,阻塞生产者;当队列空时,阻塞消费者
阻塞队列要解决的问题
- 如何阻塞
- 当队列满,消费者消费后如何唤醒生产者;当队列空,生产者生产后如何唤醒消费者
实现一个阻塞队列,阻塞过程使用ReentrantLock锁和Condition来控制。
队列结构
static class BlockQueue<T> {
private Object queue[];
private int front;
private int rear;
private int maxSize;
final private Lock lock = new ReentrantLock();
Condition full = lock.newCondition();
Condition empty = lock.newCondition();
public BlockQueue(int maxSize) {
this.front = 0;
this.rear = 0;
this.maxSize = maxSize;
this.queue = new Object[maxSize];
}
/**
* 阻塞 入队方法在这
* @param element
*/
/**
* 阻塞出队方法在这
*/
}
入队
/**
* 阻塞 入队
* @param element
*/
public void put(T element) throws InterruptedException {
lock.lock();
try{
while ( (rear + 1) % maxSize == front ) {
System.out.println("Queue is full");
full.await();
}
queue[rear] = element;
rear = (rear + 1) % maxSize;
empty.signal();
} finally {
lock.unlock();
}
}
出队
/**
* 阻塞出队
*/
public T take() throws InterruptedException{
lock.lock();
try{
while( rear == front ){
System.out.println("Queue is empty");
empty.await();
}
Object element = queue[front];
queue[front] = null;
front = (front+1)%maxSize;
full.signal();
return (T) element;
}finally {
lock.unlock();
}
}
测试用例
public static void main(String[] args) throws InterruptedException {
BlockQueue<Integer> queue = new BlockQueue<Integer>(4);
queue.put(5);
new Thread(() -> {
try {
System.out.println("添加");
queue.put(11);
queue.put(12);
queue.put(13);
queue.put(14);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("取出");
queue.take();
Thread.sleep(1);
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}