当前位置: 首页>后端>正文

java多线程-9-BlockingQueue

概述

  • ArrayBlockingQueue:数组结构,有界
  • LinkedBlockingQueue:链表结构,有界
  • PriorityBlockingQueue:带优先级,无界
  • DelayQueue:基于优先级队列(PriorityQueue)的延迟队列
  • SynchronousQueue:不存储元素,只传递
  • LinkedTransferQueue:链表结构,无界
  • LinkedBlockingDeque:链表结构,双向

ArrayBlockingQueue

  • 基于数组的不可变容量有界队列,FIFO

构造函数

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)                          // 为0没意义
        throw new IllegalArgumentException();
    this.items = new Object[capacity];          // 队列元素数组
    lock = new ReentrantLock(fair);             // 锁
    notEmpty = lock.newCondition();             // 遇空条件队列
    notFull =  lock.newCondition();             // 遇满条件队列
}

无阻塞添加

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;  // 环形数组,有效元素个数用count来判断
    count++;
    notEmpty.signal(); // 唤醒一个消费者
}

阻塞添加

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

无阻塞消费

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

阻塞消费

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

移除

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex); // putIndex表示下一个添加位置,故而在其前应该终止
        }
        return false;
    } finally {
        lock.unlock();
    }
}

void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove
        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

Itr

  • 迭代器
Itrs
  • Shared data between iterators and their queue, allowing queue modifications to update iterators when elements are removed.
void elementDequeued() {
    // assert lock.getHoldCount() == 1;
    if (count == 0)
        queueIsEmpty();        // 清空itrs
    else if (takeIndex == 0)
        takeIndexWrapped();    // 
}

LinkedBlockingQueue

  • 基于链表的有界阻塞队列,默认最大长度为Integer.MAX_VALUE,FIFO

构造函数

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
    
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

锁和Condition

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
  • 双锁(ArrayBlockingQueue只有单锁)
  • 只有非公平,未提供公平锁

阻塞添加

public void put(E e) throws InterruptedException {
    if (e == null)
        throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty(); // c的值还是Increment之前的值,为0表示可能有消费线程阻塞
}

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

阻塞消费

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull(); // // c的值还是Increment之前的值,为capacity表示可能有生产线程阻塞
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;  // 为什么这里要用dummy head? 感觉是可以简化初始化吧,构造函数里做掉,后面就不用去判null了
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

为什么ArrayBlockingQueue不用双锁

  • Q:为什么ArrayBlockingQueue的put和take不能并行?
  • A:因为是单锁
  • Q:那改成双锁就可以并行了吗?
  • A:不行,因为count是int
  • Q:那改成AtomicInteger就可以了吗?
  • A:还是不行,因为通知方式也要该,不能直接condition.signal(),而要先获得另外一把锁
  • Q:改好了,现在可以了吗?
  • A:可以了
  • Q:效率有提高吗?
  • A:待测试
  • Q:假设没有提高,原因可能是什么?
  • A:LinkedBlockingQueue需要构造节点,导致较长等待,同时存取有优化效果
    • ArrayBlockingQueue无需构造节点,相对来说,加锁和解锁时间占比可能反而较大
    • 转成双锁之后,对比原来操作,需要多竞争两次
      • Atomic变量的CAS操作
      • 获得另一把锁的condition进行通知操作
      • 可能这两部分的损耗,已经超过并发存取带来的收益

PriorityBlockingQueue

  • 基于数组的带优先级的无界阻塞队列

构造函数

public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

public PriorityBlockingQueue(int initialCapacity, Comparator<super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity]; // initialCapacity只是初始容量,并不用来做限制,所以是无界
}
  • 只支持非公平锁
  • 数组初始容量11(为什么是11?)

添加(因为可以扩容,所以是无界)

// add和put都是调用offer

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array); // comparator为空时需要元素自己实现Comparable接口;看内部逻辑是最小堆
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

// 扩容
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow,MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

无阻塞消费

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0]; // 返回头部
        E x = (E) array[n];      // 将尾部元素挪到头部进行下沉
        array[n] = null;
        Comparator<super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

阻塞消费

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ((result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

DelayQueue

构造函数

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>

public interface Delayed extends Comparable<Delayed>
  • 元素必需实现Delayed接口

添加

public boolean offer(E e) {
    final ReentrantLock lock = this.lock; // 非公平
    lock.lock();
    try {
        q.offer(e);                       // private final PriorityQueue<E> q = new PriorityQueue<E>();
        if (q.peek() == e) {
            leader = null;                // private Thread leader = null;
            available.signal();           // private final Condition available = lock.newCondition();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

// PriorityQueue
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}
  • 这个grow有bug,不知道为什么没有修复

取值

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
  • 如果first为null,说明队列已空,直接await
  • 如果delay <= 0,那么直接拿走顶端值
  • 如果leader不为空,说明自己不是第一个线程,进入await
  • 如果leader是自己,那么等到delay结束就可以拿值了

SynchronousQueue

  • 不存储元素的阻塞队列
  • 负责把生产者线程处理的数据直接传递给消费者线程
  • put操作必须等待take操作
  • 吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue

添加

public void put(E e) throws InterruptedException {
    if (e == null)
        throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

LinkedTransferQueue

transfer

  • 如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻transfer给消费者
  • 如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回
  • transfer方法关键代码
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
  • 第一行代码:试图把存放当前元素的s节点作为tail节点
  • 第二行代码:让CPU自旋等待消费者消费元素

tryTransfer

  • 试探生产者传入的元素是否能直接传给消费者
  • 如果没有消费者等待接收元素,返回false
  • 和transfer的区别是无论消费者是否接收,方法立即返回

LinkedBlockingDeque

概述

  • 双端放值,双端取值,可以分担并发压力
  • 可以运用在“工作窃取模式”???

添加

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

取值

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

https://www.xamrdz.com/backend/3rd1933998.html

相关文章: