当前位置: 首页>后端>正文

JUC-队列源码解析

PriorityQueue

我们先看下PriorityQueue的继承结构


JUC-队列源码解析,第1张

先看些接口Queue

public interface Queue<E> extends Collection<E>{
//返回true,或是抛出异常(如果队列的空间满了)
boolean add(E e);
//true 添加成功  false 队列满了
 boolean offer(E e);
//从队列头删除一个element ,返回true,如果队列为空,抛出异常
 E remove();
//从队列头删除一个element ,返回true,如果队列为空,返回false
 E poll();
//从队列头得到一个element,但是不删除这个element,返回true,如果队列为空,抛出异常
E element();
//从队列头得到一个element,但是不删除这个element,返回true,如果队列为空,返回false
E peek();

}

AbstractQueue

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
//可以看到add是基于offer实现的,如果调用offer返回false,那么抛出异常
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

同理  E remove() 基于  E poll()
          E element() 基于 E peek()
}

优先队列的实现看一个内部变量就可以了,如下

   //优先队列是个平衡的二叉堆,关于comparator比较器,可以在构造函数指定,也可也
  //使用element的自然顺序,默认是个小堆,也就是在堆顶存储着最小的元素,
  //关于堆排序的重建和算法不做介绍,一般来说获取前n个最大或最小值,可以使用n次堆排序来获取
    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    transient Object[] queue;

PriorityBlockingQueue

PriorityQueue是非线程安全的,juc提供了线程安全的优先队列PriorityBlockingQueue,相比于PriorityQueue,其主要实现了接口BlockingQueue的方法


JUC-队列源码解析,第2张

在BlockingQueue里面有两个方法要注意下

//如果队列满了,那么一直等待到队列有空间,塞进去位置,除非caller线程被中断`
void put(E e) throws InterruptedException;
//如果队列为空,那么一直等待到队列有数据为止,直到拿到element,除非caller线程被中断`
E take() throws InterruptedException;
//提供超时时间的offer方法,如果在这个时间之后,队列还是满的,返回false,如果caller被中断,抛出中断异常
boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
//提供超时时间的poll方法,如果在这个时间之后,队列还是空的,返回false,如果caller被中断,抛出中断异常
E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

可以看到BlockingQueue主要声明了一些堵塞同步方法,
在PriorityBlockingQueue里面是声明了两个变量如下

//所有的public 操作都要使用的排它锁
private final ReentrantLock lock;
//队列为空的时候,将caller线程block在此condition上面
 private final Condition notEmpty;

public void put(E e)

    public void put(E e) {
        offer(e); // never need to block
    }

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        //先lock住
        lock.lock();
        int n, cap;
        Object[] array;
       //如果size的大小大于了队列的长度,那么需要grow 队列的length
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            //唤醒在notEmpty上面等待的caller线程(应该是调用take()方法的线程)
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

public E take()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
           //如果拿不到element,将自己堵塞到notEmpty里面去
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

DelayQueue

延迟队列,只有当队列element里面的delay耗尽,才能消费到这个元素,延迟队列依赖PriorityQueue来实现这个功能,我们看下DelayQueue的继承结构,可以看到DelayQueue已经是个BlockingQueue


JUC-队列源码解析,第3张

public void put(E e)

public void put(E e) {
        offer(e);
    }

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
           //添加数据的时候先lock
            q.offer(e);
           //如果此时只有caller线程添加的element e 那么将堵塞在available上面的线程唤醒
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

public E take()

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
               //如果没有element 将自己block到available
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                       //走到这里,说明是有数据,判断delay的时间是否到了,到了就返回
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        //如果leader线程不为空,将自己block到available上面去
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                             //不然的话将自己设置为leader,并将自己block delay的时间(注意只有leader线程才block delay的时间,其他的线程在available上block不设时间,这样设计的好处是保证有一个leader线程可以去唤醒其他的线程)
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

ArrayBlockingQueue

ArrayBlockingQueue是一个有界队列,内部封装了一个数组,使用takeIndex,putIndex循环使用。大小通过构造函数的capacity指定,里面的三个变量实现了经典的生产者 消费者模式
如下

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果队列已满,将自己block到notFull condition上面
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

   //入队列,并唤醒block在notEmpty上的线程
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

LinkedBlockingQueue

如果没有指定LinkedBlockingQueue的capacity,默认是Integer.MAX_VALUE,内部封装了一个链表。注意LinkedBlockingQueue使用了两把ReentrantLock来分别控制put 和take操作,如下

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

ConcurrentLinkedQueue

ConcurrentLinkedQueue是无锁化的线程安全队列,其实所谓的无锁化就是使用自旋锁+cas来操作元素,我们值看下add方法,如果保证在多线程调用add的时候,元素可以正常的添加进去,且数据不出错,在javaDoc里面对ConcurrentLinkedQueue的一些特征做了特别的说明,如下
Iterators返回的只是在某个点或是Iterators方法调用的时候的快照数据,如果有其他的Thread并发的操作,Iterators不会抛出

  • <p>Iterators are <i>weakly consistent</i>, returning elements
  • reflecting the state of the queue at some point at or since the
  • creation of the iterator. They do <em>not</em> throw {@link
  • java.util.ConcurrentModificationException}, and may proceed concurrently
  • with other operations. Elements contained in the queue since the creation
  • of the iterator will be returned exactly once.

和其他的集合框架不一样,size方法不是o(1)的,而是o(n)的,如果在遍历队列的过程中修改了队列,返回的size也是不精确的

  • <p>Beware that, unlike in most collections, the {@code size} method
  • is <em>NOT</em> a constant-time operation. Because of the
  • asynchronous nature of these queues, determining the current number
  • of elements requires a traversal of the elements, and so may report
  • inaccurate results if this collection is modified during traversal.
    //对于一些批量的操作如addAll removeAll retainAll等操作的时候,调用iterator
    //可能会漏掉一些数据
  • Additionally, the bulk operations {@code addAll},
  • {@code removeAll}, {@code retainAll}, {@code containsAll},
  • {@code equals}, and {@code toArray} are <em>not</em> guaranteed
  • to be performed atomically. For example, an iterator operating
  • concurrently with an {@code addAll} operation might view only some
  • of the added elements.
    个人理解ConcurrentLinkedQueue保证的是出队入队的安全性,但是view类型的操作可能返回的数据不一致。
    贴一片写的很好的博客
    https://www.jianshu.com/p/231caf90f30b

SynchronousQueue

这个队列比较特殊
当执行put的时候,只有这个put的item被取走了,当前put的线程才返回
当执行take的时候,只有put数据进来了,当前take的线程才返回
在SynchronousQueue实现了公平和非公平的策略,公平策略使用有个队列来实现(队尾匹配,对头返回),而非公平策略使用一个栈来实现(永远在栈顶匹配和返回)
我们可以做个简单的测试代码如下

    @Test
    public void testSynchronousQueue() throws InterruptedException {

        SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
        new Thread(
                ()-> {

                    try {
                        queue.put(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("success put 3");

        }).start();

        new Thread(
                ()-> {

                    try {
                        queue.put(4);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("success put 4");


            }
        ).start();
        new Thread(
                ()-> {

                    try {
                        queue.put(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("success put 5");


                }
        ).start();

        new Thread(()->{

            try {
                System.out.println("success take " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(()->{

            try {
                System.out.println("success take " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();




        Thread.sleep(1000000);

        
    }

返回结果如下

success put 5
success take 5
success take 4
success put 4

可以看到 queue.put(3);一直被堵塞着,因为没有消费线程来消费这个put值。

ConcurrentLinkedDeque

@link https://www.cnblogs.com/txmfz/p/10975750.html

CopyOnWriteArrayList

在所有的add remove等update操作处使用lock锁住并使用新的array 数组替换原来的数组,代价很高

CopyOnWriteArraySet

内部封装了一个CopyOnWriteArrayList,使用其来实现相同的功能


https://www.xamrdz.com/backend/3d21936846.html

相关文章: