当前位置: 首页>后端>正文

JavaGuide知识点整理——java常见并发容器解决

JDK提供的这些并发容器大部分在java.util.concurrent包下。也就是我们常说的JUC。

  • ConcurrentHashMap:线程安全的HashMap
  • CopyOnWriteArrayList:线程安全的List,在读多写少的场合性能非常好,远远好于Vector.
  • ConcurrentLinkedQueue:高效的并发队列,使用链表实现。可以看做一个线程安全的LinkedList,这是一个非阻塞队列。
  • BlockingQueue:这是一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常适合用于作为数据共享的通道。
  • ConcurrentSkipListMap:跳表的实现。这个一个map,使用跳跳表的数据结构进行快速查找。

ConcurrentHashMap

我们知道HashMap不是线程安全的,在并发的场景下如果要保证一种可行的方式是使用Collections.synchronizedMap()方法来包装我们的HashMap,但是这是通过使用一个全局的锁来同步不同线程之间的并发访问,因为会带来不可忽视的性能问题。

所以就有了HashMap的线程安全版本ConcurrentHashMap的诞生。
在ConcurrentHashMap中,无论是读操作还是写操作都能保证很高的性能,在进行读操作时(几乎)不需要加锁,而在写操作时是通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

CopyOnWriteArrayList

CopyOnWriteArrayList简介

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable 

在很多场景中,读操作会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问List的内部数据,毕竟读操作是安全的。
这和我们之前在多线程章节讲过的ReentrantReadWriteLock读写锁的思想非常类似,也是读读共享,写写互斥,读写互斥,写读互斥。JDK中提供了CopyOnWriteArrayList类比相比于在读写锁的思想上更进一步。为了将读的性能发挥到极致,CopyOnWriteArrayList读取是完全不用加锁的,更厉害的是写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来读操作的性能就会大幅度提升。

CopyOnWriteArrayList是如何实现的

CopyOnWriteArrayList类的所有可变操作(add/set等)都是通过创建底层数组的新副本来实现的。当List需要被修改的时候,并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后将修改后的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。
从CopyOnWriteArrayList的名字就能看出来CopyOnWriteArrayList是满足CopyOnWrite的,所谓的CopyOnWrite也就是说:在计算机,如果想要对一块内存进行修改时,我们不再原有内存块中进行操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将原本内存指针指向新的内存,原来的内存就可以被回收了。

CopyOnWriteArrayList读取和写入源码简单分析

CopyOnWriteArrayList读取操作的实现
读取操作没有任何同步控制和锁操作。理由就是内部数组array不会发生修改,只会被另外一个array替换,因此可以保证数据安全。
下面是源码(因为get方法调用了别的方法,截图没办法在一张图上,所以用赋值的方式把用到的源码都粘到一起了):

    private transient volatile Object[] array;

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        return array;
    }

    @SuppressWarnings("unchecked")
    private E get(Object[] a, int index) {
        return (E) a[index];
    }

    /**
     * {@inheritDoc}
     *
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public E get(int index) {
        return get(getArray(), index);
    }

很容易看出CopyOnWriteArrayList中有一个对象数组。然后get 的时候就是获取这个数组的传入下标的元素。逻辑很简单。

CopyOnWriteArrayList写入操作的实现
上面也说过了CopyOnWriteArrayList这个数据类型写的时候是创建一个副本,修改完直接把引用指向副本,所以写入操作add的时候加了锁,保证了同步,避免多线程写的时候会copy出多个副本。下面是源码:

    final void setArray(Object[] a) {
        array = a;
    }
    public E set(int index, E element) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            E oldValue = get(elements, index);

            if (oldValue != element) {
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len);
                newElements[index] = element;
                setArray(newElements);
            } else {
                // Not quite a no-op; ensures volatile write semantics
                setArray(elements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the specified position in this
     * list. Shifts the element currently at that position (if any) and
     * any subsequent elements to the right (adds one to their indices).
     *
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public void add(int index, E element) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            if (index > len || index < 0)
                throw new IndexOutOfBoundsException("Index: "+index+
                                                    ", Size: "+len);
            Object[] newElements;
            int numMoved = len - index;
            if (numMoved == 0)
                newElements = Arrays.copyOf(elements, len + 1);
            else {
                newElements = new Object[len + 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index, newElements, index + 1,
                                 numMoved);
            }
            newElements[index] = element;
            setArray(newElements);
        } finally {
            lock.unlock();
        }
    }

其实这三个方法的set/add代码很多是重复性的。但是主要思想就是在副本上修改,然后修改指针。用到了Arrays.copyOf,System.arraycopy等方法。也没什么好说的。

ConcurrentLinkedQueue

java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列。其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue.在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。阻塞队列可以通过加锁来实现,非阻塞队列可以通过CAS操作实现。
从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构。ConcurrentLinkedQueue应该算是高并发环境中性能最好的队列了,它之所以能有很好的性能,是因为其内部复杂的实现。内部代码就不分析了,感兴趣的自己去找资料。我们知道ConcurrentLinkedQueue主要使用CAS非阻塞算法来实现线程安全的就好。
ConcurrentLinkedQueue适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的ConcurrentLinkedQueue来代替。

BlockingQueue

BolckingQueue简介

上面我们提到了ConcurrentLinkedQueue作为高性能的非阻塞队列,下面我们要讲的就是阻塞队列BlockingQueue。阻塞队列被广泛使用在生产者-消费者问题中。其原因是BlockingQueue提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞。直到队列未满。当队列容器为空时,消费者线程会被阻塞,直到队列非空时为止。
BlockingQueue是一个接口,继承自Queue,其所有的实现类也可以作为Queue的实现来使用。而Queue又继承自Collection接口,下面是BlockingQueue的相关实现类:


JavaGuide知识点整理——java常见并发容器解决,第1张
BlockingQueue相关实现类

下面主要介绍三个常见的类

ArrayBlockingQueue

ArrayBlockingQueue是BlockingQueue接口的有界队列实现类。底层采用数组来实现。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

ArrayBlockingQueue一旦创建,容量不能改变。其并发控制采用可重入锁ReentrantLock。不管是插入操作还是读取操作都需要获取锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞。尝试从一个空队列中获取一个元素也会同样阻塞。

ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的时间顺序,即先等待的线程先访问到ArrayBlockingQueue。非公平性指访问ArrayBlockingQueue的顺序不是严格遵守时间顺序,有可能存在当ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问。
因为如果保证公平性通常会降低吞吐量。如果需要获取公平性的ArrayBlockingQueue,可以如下配置:

private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
JavaGuide知识点整理——java常见并发容器解决,第2张
fair:是否公平

LinkedBlockingQueue

LinkedBlockingQueue底层是基于单向链表实现的阻塞队列,可以当做无界也可以当做有界队列来使用,同样满足先进先出的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量。为了防止LinkedBlockingQueue容量迅速增加,损耗大量内存,通常在创建LinkedBlockingQueue对象时会指定其大小,如果未指定,容量等于int最大值。

LinkedBlockingQueue有三个构造方法:
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

重点就是没传容量就默认int最大值。所以我们创建的时候最好指定大小。

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序。也可以通过自定义类实现 compareTo()方法来指定元素排序规则。或者初始化时通过构造器参数Comparator来指定排序规则。
PriorityBlockingQueue并发控制采用的是可重入锁ReentrantLock。队列为无界队列(ArrayBlockingQueue是有界队列,LinkedBlockingQueue也可以通过在构造函数中传入capacity指定队列最大容量,但是PriorityBlockingQueue只能指定初始队列大小,后面插入元素的时候,如果空间不够的话会自动扩容。)
简单的说,它就是PriorityQueue的线程安全版本,不可以插入null值,同时插入队列的对象必须是课比较大小的,否则报ClassCastException异常。它的插入操作put方法不会block,因为他是无界队列。但是take方法在队列为空的时候会阻塞。

ConcurrentSkipListMap

ConcurrentSkipListMap使用了跳表,这里先简单说一下跳表。
对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表。这样效率自然就会很低。
跳表就不一样了,跳表是一种可以用来快速查找的数据结构。有点类似于平衡树它们都可以对元素进行快速的查找。但是一个重要的区别是:对平衡树的插入和删除往往可能导致平衡树进行一次全局的调整,而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。
这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,只需要部分锁即可。这样在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是O(logN)的,所以在并发数据结构中,JDK使用跳表来实现一个Map。
跳表的本质是同时维护了多个链表,并且链表是分层的。

JavaGuide知识点整理——java常见并发容器解决,第3张
跳表

最底层的链表维护了跳表内所有元素。每上面一层链表都是下面一层的子集。
跳表内的所有链表的元素都是排序的,查找时可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找,这也就是说查找的过程中,搜说是跳跃式的。如上面的图想要查找10.则路线如下图:
JavaGuide知识点整理——java常见并发容器解决,第4张
6次查找到10

其实这个链表长度越长,效率提升的越明显,比如查找18:
JavaGuide知识点整理——java常见并发容器解决,第5张
7次查找18

从上面可以很容易看出来:跳表是一种利用空间换时间的算法。
使用跳表实现Map和使用哈希算法实现Map的另一个不同之处是:哈希并不会保存元素的顺序,而跳表内的所有元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果,所以如果你的应用需要有序性,那么跳表是不二选择。
JDK中实现这一数据结构的类就是ConcurrentSkipListMap。

本篇笔记就记到这里,如果稍微帮到你了记得点个喜欢点个关注,也祝大家工作顺顺利利,每天进步一点点~!


https://www.xamrdz.com/backend/3u71942723.html

相关文章: