PriorityQueue
我们先看下PriorityQueue的继承结构
先看些接口Queue
public interface Queue<E> extends Collection<E>{
//返回true,或是抛出异常(如果队列的空间满了)
boolean add(E e);
//true 添加成功 false 队列满了
boolean offer(E e);
//从队列头删除一个element ,返回true,如果队列为空,抛出异常
E remove();
//从队列头删除一个element ,返回true,如果队列为空,返回false
E poll();
//从队列头得到一个element,但是不删除这个element,返回true,如果队列为空,抛出异常
E element();
//从队列头得到一个element,但是不删除这个element,返回true,如果队列为空,返回false
E peek();
}
AbstractQueue
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {
//可以看到add是基于offer实现的,如果调用offer返回false,那么抛出异常
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
同理 E remove() 基于 E poll()
E element() 基于 E peek()
}
优先队列的实现看一个内部变量就可以了,如下
//优先队列是个平衡的二叉堆,关于comparator比较器,可以在构造函数指定,也可也
//使用element的自然顺序,默认是个小堆,也就是在堆顶存储着最小的元素,
//关于堆排序的重建和算法不做介绍,一般来说获取前n个最大或最小值,可以使用n次堆排序来获取
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
transient Object[] queue;
PriorityBlockingQueue
PriorityQueue是非线程安全的,juc提供了线程安全的优先队列PriorityBlockingQueue,相比于PriorityQueue,其主要实现了接口BlockingQueue的方法
在BlockingQueue里面有两个方法要注意下
//如果队列满了,那么一直等待到队列有空间,塞进去位置,除非caller线程被中断`
void put(E e) throws InterruptedException;
//如果队列为空,那么一直等待到队列有数据为止,直到拿到element,除非caller线程被中断`
E take() throws InterruptedException;
//提供超时时间的offer方法,如果在这个时间之后,队列还是满的,返回false,如果caller被中断,抛出中断异常
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//提供超时时间的poll方法,如果在这个时间之后,队列还是空的,返回false,如果caller被中断,抛出中断异常
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
可以看到BlockingQueue主要声明了一些堵塞同步方法,
在PriorityBlockingQueue里面是声明了两个变量如下
//所有的public 操作都要使用的排它锁
private final ReentrantLock lock;
//队列为空的时候,将caller线程block在此condition上面
private final Condition notEmpty;
public void put(E e)
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
//先lock住
lock.lock();
int n, cap;
Object[] array;
//如果size的大小大于了队列的长度,那么需要grow 队列的length
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//唤醒在notEmpty上面等待的caller线程(应该是调用take()方法的线程)
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//如果拿不到element,将自己堵塞到notEmpty里面去
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
DelayQueue
延迟队列,只有当队列element里面的delay耗尽,才能消费到这个元素,延迟队列依赖PriorityQueue来实现这个功能,我们看下DelayQueue的继承结构,可以看到DelayQueue已经是个BlockingQueue
public void put(E e)
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//添加数据的时候先lock
q.offer(e);
//如果此时只有caller线程添加的element e 那么将堵塞在available上面的线程唤醒
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
//如果没有element 将自己block到available
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//走到这里,说明是有数据,判断delay的时间是否到了,到了就返回
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
//如果leader线程不为空,将自己block到available上面去
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//不然的话将自己设置为leader,并将自己block delay的时间(注意只有leader线程才block delay的时间,其他的线程在available上block不设时间,这样设计的好处是保证有一个leader线程可以去唤醒其他的线程)
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
ArrayBlockingQueue
ArrayBlockingQueue是一个有界队列,内部封装了一个数组,使用takeIndex,putIndex循环使用。大小通过构造函数的capacity指定,里面的三个变量实现了经典的生产者 消费者模式
如下
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列已满,将自己block到notFull condition上面
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
//入队列,并唤醒block在notEmpty上的线程
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
LinkedBlockingQueue
如果没有指定LinkedBlockingQueue的capacity,默认是Integer.MAX_VALUE,内部封装了一个链表。注意LinkedBlockingQueue使用了两把ReentrantLock来分别控制put 和take操作,如下
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
ConcurrentLinkedQueue
ConcurrentLinkedQueue是无锁化的线程安全队列,其实所谓的无锁化就是使用自旋锁+cas来操作元素,我们值看下add方法,如果保证在多线程调用add的时候,元素可以正常的添加进去,且数据不出错,在javaDoc里面对ConcurrentLinkedQueue的一些特征做了特别的说明,如下
Iterators返回的只是在某个点或是Iterators方法调用的时候的快照数据,如果有其他的Thread并发的操作,Iterators不会抛出
- <p>Iterators are <i>weakly consistent</i>, returning elements
- reflecting the state of the queue at some point at or since the
- creation of the iterator. They do <em>not</em> throw {@link
- java.util.ConcurrentModificationException}, and may proceed concurrently
- with other operations. Elements contained in the queue since the creation
- of the iterator will be returned exactly once.
和其他的集合框架不一样,size方法不是o(1)的,而是o(n)的,如果在遍历队列的过程中修改了队列,返回的size也是不精确的
- <p>Beware that, unlike in most collections, the {@code size} method
- is <em>NOT</em> a constant-time operation. Because of the
- asynchronous nature of these queues, determining the current number
- of elements requires a traversal of the elements, and so may report
- inaccurate results if this collection is modified during traversal.
//对于一些批量的操作如addAll removeAll retainAll等操作的时候,调用iterator
//可能会漏掉一些数据 - Additionally, the bulk operations {@code addAll},
- {@code removeAll}, {@code retainAll}, {@code containsAll},
- {@code equals}, and {@code toArray} are <em>not</em> guaranteed
- to be performed atomically. For example, an iterator operating
- concurrently with an {@code addAll} operation might view only some
- of the added elements.
个人理解ConcurrentLinkedQueue保证的是出队入队的安全性,但是view类型的操作可能返回的数据不一致。
贴一片写的很好的博客
https://www.jianshu.com/p/231caf90f30b
SynchronousQueue
这个队列比较特殊
当执行put的时候,只有这个put的item被取走了,当前put的线程才返回
当执行take的时候,只有put数据进来了,当前take的线程才返回
在SynchronousQueue实现了公平和非公平的策略,公平策略使用有个队列来实现(队尾匹配,对头返回),而非公平策略使用一个栈来实现(永远在栈顶匹配和返回)
我们可以做个简单的测试代码如下
@Test
public void testSynchronousQueue() throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
new Thread(
()-> {
try {
queue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("success put 3");
}).start();
new Thread(
()-> {
try {
queue.put(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("success put 4");
}
).start();
new Thread(
()-> {
try {
queue.put(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("success put 5");
}
).start();
new Thread(()->{
try {
System.out.println("success take " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
System.out.println("success take " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000000);
}
返回结果如下
success put 5
success take 5
success take 4
success put 4
可以看到 queue.put(3);一直被堵塞着,因为没有消费线程来消费这个put值。
ConcurrentLinkedDeque
@link https://www.cnblogs.com/txmfz/p/10975750.html
CopyOnWriteArrayList
在所有的add remove等update操作处使用lock锁住并使用新的array 数组替换原来的数组,代价很高
CopyOnWriteArraySet
内部封装了一个CopyOnWriteArrayList,使用其来实现相同的功能