LinkedBlockingQueue介绍
LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按 FIFO排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。
此外,LinkedBlockingQueue可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
LinkedBlockingQueue原理和数据结构
LinkedBlockingQueue继承于AbstractQueue,它本质上是一个FIFO(先进先出)的队列。
LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
LinkedBlockingQueue是通过单链表实现的:
head是链表的表头。取出数据时,都是从表头head处取出。
last是链表的表尾。新增数据时,都是从表尾last处插入。
count是链表的实际大小,即当前链表中包含的节点个数。
capacity是列表的容量,它是在创建链表时指定的。
putLock是插入锁,takeLock是取出锁;notEmpty是“非空条件”,notFull是“未满条件”。通过它们对链表进行并发控制。LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁。对于插入操作,通过“插入锁putLock”进行同步;对于取出操作,通过“取出锁takeLock”进行同步。此外,插入锁putLock和“非满条件notFull”相关联,取出锁takeLock和“非空条件notEmpty”相关联。通过notFull和notEmpty更细腻的控制锁。
若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。
若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。
LinkedBlockingQueue函数列表
LinkedBlockingQueue源码分析
下面从LinkedBlockingQueue的创建,添加,删除,遍历这几个方面对它进行分析。
1. 创建
下面以LinkedBlockingQueue(int capacity)来进行说明。
说明:
capacity是“LinkedBlockingQueue”的容量。
head和last是“LinkedBlockingQueue”的首节点和尾节点。它们在LinkedBlockingQueue中的声明如下:
链表的节点定义如下:
?2. 添加
下面以offer(E e)为例,对LinkedBlockingQueue的添加方法进行说明。
public boolean offer(E e) {
? ? if (e == null) throw new NullPointerException();
? ? // 如果“队列已满”,则返回false,表示插入失败。
? ? final AtomicInteger count = this.count;
? ? if (count.get() == capacity)
? ? ? ? return false;
? ? int c = -1;
? ? // 新建“节点e”
? ? Node<E> node = new Node(e);
? ? final ReentrantLock putLock = this.putLock;
? ? // 获取“插入锁putLock”
? ? putLock.lock();
? ? try {
? ? ? ? // 再次对“队列是不是满”的进行判断。
? ? ? ? // 若“队列未满”,则插入节点。
? ? ? ? if (count.get() < capacity) {
? ? ? ? ? ? // 插入节点
? ? ? ? ? ? enqueue(node);
? ? ? ? ? ? // 将“当前节点数量”+1,并返回“原始的数量”
? ? ? ? ? ? c = count.getAndIncrement();
? ? ? ? ? ? // 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。
? ? ? ? ? ? if (c + 1 < capacity)
? ? ? ? ? ? ? ? notFull.signal();
? ? ? ? }
? ? } finally {
? ? ? ? // 释放“插入锁putLock”
? ? ? ? putLock.unlock();
? ? }
? ? // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程
? ? if (c == 0)
? ? ? ? signalNotEmpty();
? ? return c >= 0;
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
说明:offer()的作用很简单,就是将元素E添加到队列的末尾。 enqueue()的源码如下:
private void enqueue(Node<E> node) {
? ? // assert putLock.isHeldByCurrentThread();
? ? // assert last.next == null;
? ? last = last.next = node;
}
1.
2.
3.
4.
5.
enqueue()的作用是将node添加到队列末尾,并设置node为新的尾节点! signalNotEmpty()的源码如下:
private void signalNotEmpty() {
? ? final ReentrantLock takeLock = this.takeLock;
? ? takeLock.lock();
? ? try {
? ? ? ? notEmpty.signal();
? ? } finally {
? ? ? ? takeLock.unlock();
? ? }
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
signalNotEmpty()的作用是唤醒notEmpty上的等待线程。
3. 取出
下面以take()为例,对LinkedBlockingQueue的取出方法进行说明。
说明:take()的作用是取出并返回队列的头。若队列为空,则一直等待。 dequeue()的源码如下:
private E dequeue() {
? ? // assert takeLock.isHeldByCurrentThread();
? ? // assert head.item == null;
? ? Node<E> h = head;
? ? Node<E> first = h.next;
? ? h.next = h; // help GC
? ? head = first;
? ? E x = first.item;
? ? first.item = null;
? ? return x;
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
dequeue()的作用就是删除队列的头节点,并将表头指向“原头节点的下一个节点”。 signalNotFull()的源码如下:
private void signalNotFull() {
? ? final ReentrantLock putLock = this.putLock;
? ? putLock.lock();
? ? try {
? ? ? ? notFull.signal();
? ? } finally {
? ? ? ? putLock.unlock();
? ? }
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
signalNotFull()的作用就是唤醒notFull上的等待线程。
4. 遍历
下面对LinkedBlockingQueue的遍历方法进行说明。
public Iterator<E> iterator() {
? return new Itr();
}
1.
2.
3.
iterator()实际上是返回一个Iter对象。 Itr类的定义如下:
private class Itr implements Iterator<E> {
? ? // 当前节点
? ? private Node<E> current;
? ? // 上一次返回的节点
? ? private Node<E> lastRet;
? ? // 当前节点对应的值
? ? private E currentElement;
? ? Itr() {
? ? ? ? // 同时获取“插入锁putLock” 和 “取出锁takeLock”
? ? ? ? fullyLock();
? ? ? ? try {
? ? ? ? ? ? // 设置“当前元素”为“队列表头的下一节点”,即为队列的第一个有效节点
? ? ? ? ? ? current = head.next;
? ? ? ? ? ? if (current != null)
? ? ? ? ? ? ? ? currentElement = current.item;
? ? ? ? } finally {
? ? ? ? ? ? // 释放“插入锁putLock” 和 “取出锁takeLock”
? ? ? ? ? ? fullyUnlock();
? ? ? ? }
? ? }
? ? // 返回“下一个节点是否为null”
? ? public boolean hasNext() {
? ? ? ? return current != null;
? ? }
? ? private Node<E> nextNode(Node<E> p) {
? ? ? ? for (;;) {
? ? ? ? ? ? Node<E> s = p.next;
? ? ? ? ? ? if (s == p)
? ? ? ? ? ? ? ? return head.next;
? ? ? ? ? ? if (s == null || s.item != null)
? ? ? ? ? ? ? ? return s;
? ? ? ? ? ? p = s;
? ? ? ? }
? ? }
? ? // 返回下一个节点
? ? public E next() {
? ? ? ? fullyLock();
? ? ? ? try {
? ? ? ? ? ? if (current == null)
? ? ? ? ? ? ? ? throw new NoSuchElementException();
? ? ? ? ? ? E x = currentElement;
? ? ? ? ? ? lastRet = current;
? ? ? ? ? ? current = nextNode(current);
? ? ? ? ? ? currentElement = (current == null) null : current.item;
? ? ? ? ? ? return x;
? ? ? ? } finally {
? ? ? ? ? ? fullyUnlock();
? ? ? ? }
? ? }
? ? // 删除下一个节点
? ? public void remove() {
? ? ? ? if (lastRet == null)
? ? ? ? ? ? throw new IllegalStateException();
? ? ? ? fullyLock();
? ? ? ? try {
? ? ? ? ? ? Node<E> node = lastRet;
? ? ? ? ? ? lastRet = null;
? ? ? ? ? ? for (Node<E> trail = head, p = trail.next;
? ? ? ? ? ? ? ? p != null;
? ? ? ? ? ? ? ? trail = p, p = p.next) {
? ? ? ? ? ? ? ? if (p == node) {
? ? ? ? ? ? ? ? ? ? unlink(p, trail);
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? fullyUnlock();
? ? ? ? }
? ? }
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
LinkedBlockingQueue示例
import java.util.*;
import java.util.concurrent.*;
/*
*? LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。
*
*? 下面是“多个线程同时操作并且遍历queue”的示例
*? (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。
*? (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
*
*/
public class LinkedBlockingQueueDemo1 {
? ? // TODO: queue是LinkedList对象时,程序会出错。
? ? //private static Queue<String> queue = new LinkedList<>();
? ? private static Queue<String> queue = new LinkedBlockingQueue<>();
? ? public static void main(String[] args) {
? ? ? ? // 同时启动两个线程对queue进行操作!
? ? ? ? new MyThread("ta").start();
? ? ? ? new MyThread("tb").start();
? ? }
? ? private static void printAll() {
? ? ? ? String value;
? ? ? ? Iterator iter = queue.iterator();
? ? ? ? while(iter.hasNext()) {
? ? ? ? ? ? value = (String)iter.next();
? ? ? ? ? ? System.out.print(value+", ");
? ? ? ? }
? ? ? ? System.out.println();
? ? }
? ? private static class MyThread extends Thread {
? ? ? ? MyThread(String name) {
? ? ? ? ? ? super(name);
? ? ? ? }
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? ? ? int i = 0;
? ? ? ? ? ? while (i++ < 6) {
? ? ? ? ? ? ? ? // “线程名” + "-" + "序号"
? ? ? ? ? ? ? ? String val = Thread.currentThread().getName()+i;
? ? ? ? ? ? ? ? queue.add(val);
? ? ? ? ? ? ? ? // 通过“Iterator”遍历queue。
? ? ? ? ? ? ? ? printAll();
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
其中一次运行结果:
tb1, ta1,
tb1, ta1, ta2,
tb1, ta1, ta2, ta3,
tb1, ta1, ta2, ta3, ta4,
tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5,
ta4, tb1, ta5, ta1, ta6,
ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2,
ta5, ta6, tb2,
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3,
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4,
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5,
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
结果说明: 示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingQueue进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingQueue中;接着,遍历并输出LinkedBlockingQueue中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。 当queue是LinkedBlockingQueue对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。
-----------------------------------
Android LinkedBlockingQueue 阻塞 linkedblockingqueue使用
https://blog.51cto.com/u_16099271/8843964