当前位置: 首页>后端>正文

设计一个阻塞队列

阻塞队列核心思想

  • 有且仅有一个线程读、写
  • 当队列满,阻塞生产者;当队列空时,阻塞消费者

阻塞队列要解决的问题

  • 如何阻塞
  • 当队列满,消费者消费后如何唤醒生产者;当队列空,生产者生产后如何唤醒消费者

实现一个阻塞队列,阻塞过程使用ReentrantLock锁和Condition来控制。

队列结构

static class BlockQueue<T> {
    private Object queue[];
    private int front;
    private int rear;
    private int maxSize;

    final private Lock lock = new ReentrantLock();
    Condition full = lock.newCondition();
    Condition empty = lock.newCondition();

    public BlockQueue(int maxSize) {
        this.front = 0;
        this.rear = 0;
        this.maxSize = maxSize;
        this.queue = new Object[maxSize];
    }
     /**
     * 阻塞 入队方法在这
     * @param element
     */
     
    /**
     * 阻塞出队方法在这
     */
}

入队

    /**
     * 阻塞 入队
     * @param element
     */
    public void put(T element) throws InterruptedException {
        lock.lock();
        try{
            while ( (rear + 1) % maxSize == front ) {
                System.out.println("Queue is full");
                full.await();
            }
            queue[rear] = element;
            rear = (rear + 1) % maxSize;
            empty.signal();
        } finally {
            lock.unlock();
        }
    }

出队

   /**
     * 阻塞出队
     */
    public T take() throws InterruptedException{
        lock.lock();
        try{
            while( rear == front ){
                System.out.println("Queue is empty");
                empty.await();
            }
            Object element = queue[front];
            queue[front] = null;
            front = (front+1)%maxSize;
            full.signal();
            return (T) element;
        }finally {
            lock.unlock();
        }
    }

测试用例
public static void main(String[] args) throws InterruptedException {
BlockQueue<Integer> queue = new BlockQueue<Integer>(4);
queue.put(5);

    new Thread(() -> {
        try {
            System.out.println("添加");
            queue.put(11);
            queue.put(12);
            queue.put(13);
            queue.put(14);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            System.out.println("取出");
            queue.take();
            Thread.sleep(1);
            queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

}

https://www.xamrdz.com/backend/39n1940959.html

相关文章: