概述
- ArrayBlockingQueue:数组结构,有界
- LinkedBlockingQueue:链表结构,有界
- PriorityBlockingQueue:带优先级,无界
- DelayQueue:基于优先级队列(PriorityQueue)的延迟队列
- SynchronousQueue:不存储元素,只传递
- LinkedTransferQueue:链表结构,无界
- LinkedBlockingDeque:链表结构,双向
ArrayBlockingQueue
构造函数
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) // 为0没意义
throw new IllegalArgumentException();
this.items = new Object[capacity]; // 队列元素数组
lock = new ReentrantLock(fair); // 锁
notEmpty = lock.newCondition(); // 遇空条件队列
notFull = lock.newCondition(); // 遇满条件队列
}
无阻塞添加
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0; // 环形数组,有效元素个数用count来判断
count++;
notEmpty.signal(); // 唤醒一个消费者
}
阻塞添加
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
无阻塞消费
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
阻塞消费
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
移除
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex); // putIndex表示下一个添加位置,故而在其前应该终止
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
Itr
Itrs
- Shared data between iterators and their queue, allowing queue modifications to update iterators when elements are removed.
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
queueIsEmpty(); // 清空itrs
else if (takeIndex == 0)
takeIndexWrapped(); //
}
LinkedBlockingQueue
- 基于链表的有界阻塞队列,默认最大长度为Integer.MAX_VALUE,FIFO
构造函数
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
锁和Condition
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
- 双锁(ArrayBlockingQueue只有单锁)
- 只有非公平,未提供公平锁
阻塞添加
public void put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty(); // c的值还是Increment之前的值,为0表示可能有消费线程阻塞
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
阻塞消费
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); // // c的值还是Increment之前的值,为capacity表示可能有生产线程阻塞
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next; // 为什么这里要用dummy head? 感觉是可以简化初始化吧,构造函数里做掉,后面就不用去判null了
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
为什么ArrayBlockingQueue不用双锁
- Q:为什么ArrayBlockingQueue的put和take不能并行?
- A:因为是单锁
- Q:那改成双锁就可以并行了吗?
- A:不行,因为count是int
- Q:那改成AtomicInteger就可以了吗?
- A:还是不行,因为通知方式也要该,不能直接condition.signal(),而要先获得另外一把锁
- Q:改好了,现在可以了吗?
- A:可以了
- Q:效率有提高吗?
- A:待测试
- Q:假设没有提高,原因可能是什么?
- A:LinkedBlockingQueue需要构造节点,导致较长等待,同时存取有优化效果
- ArrayBlockingQueue无需构造节点,相对来说,加锁和解锁时间占比可能反而较大
- 转成双锁之后,对比原来操作,需要多竞争两次
- Atomic变量的CAS操作
- 获得另一把锁的condition进行通知操作
- 可能这两部分的损耗,已经超过并发存取带来的收益
PriorityBlockingQueue
构造函数
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
public PriorityBlockingQueue(int initialCapacity, Comparator<super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity]; // initialCapacity只是初始容量,并不用来做限制,所以是无界
}
- 只支持非公平锁
- 数组初始容量11(为什么是11?)
添加(因为可以扩容,所以是无界)
// add和put都是调用offer
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array); // comparator为空时需要元素自己实现Comparable接口;看内部逻辑是最小堆
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
// 扩容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow,MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
无阻塞消费
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0]; // 返回头部
E x = (E) array[n]; // 将尾部元素挪到头部进行下沉
array[n] = null;
Comparator<super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
阻塞消费
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ((result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
DelayQueue
构造函数
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
public interface Delayed extends Comparable<Delayed>
添加
public boolean offer(E e) {
final ReentrantLock lock = this.lock; // 非公平
lock.lock();
try {
q.offer(e); // private final PriorityQueue<E> q = new PriorityQueue<E>();
if (q.peek() == e) {
leader = null; // private Thread leader = null;
available.signal(); // private final Condition available = lock.newCondition();
}
return true;
} finally {
lock.unlock();
}
}
// PriorityQueue
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1);
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e);
return true;
}
取值
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
- 如果first为null,说明队列已空,直接await
- 如果delay <= 0,那么直接拿走顶端值
- 如果leader不为空,说明自己不是第一个线程,进入await
- 如果leader是自己,那么等到delay结束就可以拿值了
SynchronousQueue
- 不存储元素的阻塞队列
- 负责把生产者线程处理的数据直接传递给消费者线程
- put操作必须等待take操作
- 吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue
添加
public void put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
LinkedTransferQueue
transfer
- 如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻transfer给消费者
- 如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回
- transfer方法关键代码
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
- 第一行代码:试图把存放当前元素的s节点作为tail节点
- 第二行代码:让CPU自旋等待消费者消费元素
tryTransfer
- 试探生产者传入的元素是否能直接传给消费者
- 如果没有消费者等待接收元素,返回false
- 和transfer的区别是无论消费者是否接收,方法立即返回
LinkedBlockingDeque
概述
- 双端放值,双端取值,可以分担并发压力
- 可以运用在“工作窃取模式”???
添加
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
取值
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}