当前位置: 首页>后端>正文

Java并发学习笔记——第六章 Java并发容器和框架

Java并发学习笔记——第六章 Java并发容器和框架

ConcurrentHashMap的实现原理与使用

ConcurrentHashMap是高效且线程安全的HashMap

使用ConcurrentHashMap的原因

  • HashMap线程不安全
    • HashMap在并发执行put操作时会引起死循环,因为多线程会导致HashMapEntry链表形成环形数据结构,则链表永远没有节点的nextnull
  • HashTable效率低下
    • 使用synchronized保证线程安全,在多线程情况下效率非常低。当一个线程访问HashTable的同步方法,其他线程也访问同步方法时则会进入阻塞或轮询。如线程1访问put,线程2无法访问get
  • ConcurrentHashMap使用锁分段技术
    • 相比所有线程访问HashTable竞争一把锁,ConcurrentHashMap先将数据分段存储,然后为每段数据配置一把锁,当一个线程访问其中一段数据时,其他段数据也能被正常访问。

ConcurrentHashMap的结构

Java并发学习笔记——第六章 Java并发容器和框架,第1张
ConcurrentHashMap类图

ConcurrentHashMapSegment数组构成。

SegmentHashEntry数组构成,Segment继承了可重入锁ReentrantLock,在ConcurrentHashMap充当数据分段锁的角色。当对ConcurrentHashMap的一个操作需要对HashEntry中的节点进行修改时,会获取对应的Segment锁。

HashEntry则为链表节点,存储键值对。

ConcurrentHashMap的初始化

默认容量大小为16,负载因子0.75 。

Segment数组个数为ssizessize由初始变量concurrencyLevel(默认为16)计算得出,大于等于concurrencyLevel且为2的N次方。

sshift等于ssize从1向左移位的次数。若ssize为16则sshift为4。(1 << 4 = 16;默认为4)

segmentShift用于定位参与散列运算的位数,等于32减sshift。该参数用于取再散列后的用于做定位的hash位数。(默认为28)

segmentMask为散列运算掩码,等于ssize减1 。用于对再散列后的hash取高位做定位。(默认为15)

定位Segment

ConcurrentHashMap通过对hashcode使用 Wang/Jenkins hash 的变种算法进行再散列,目的是减少散列冲突。

通过变种算法的再散列后的数最大是32位二进制数据,

如存入"0001111"、"0011111"、"0111111"、"1111111",对这些数进行再散列将生成:

0100|0111|0110|0111|1101|1010|0100|1110

1111|0111|0100|0011|0000|0001|1011|1000

0111|0111|0110|1001|0100|0110|0011|1110

1000|0011|0000|0000|1100|1000|0001|1010

由于segmentShift为28,则取再散列数右移28位,将得到的数取segmentMask模,得4、15、7、8,可以发现没有冲突。若直接对15取模,低位相同的数输出都会为15 。

ConcurrentHashMap的操作

get操作

步骤:再散列→定位Segment→散列算法定位所在HashEntry链表。

get高效的原因在于不需要加锁,除非读到值为空才加锁重读。之所以不用加锁,原因在于:

  • 对表示Segment大小的count字段定义为volatile
  • 对存储值的HashEntryvalue定义为volatile

由JMM的先行发生原则可以保证get操作不会读到过期的值。

put操作

步骤:再散列→定位到Segment→判断是否需要对HashEntry数组进行扩容→定位添加元素位置,将其放在HashEntry数组里

ConcurrentHashMap在插入前判断扩容,HashMap在插入后判断扩容。若HashMap扩容后不再插入新值,则进行了一次无用的扩容。

扩容时,会创建一个是原来容量两倍的数组,然后将原数组的元素再散列后插入到新数组里。ConcurrentHashMap不会对整个容器进行扩容,而只对某个Segment进行扩容。

size操作

Segment中的modCountputremoveclean操作后都会加1 ,用来标记Segment的版本。

ConcurrentHashMap并没有采用直接锁住所有Segment的方式获取count和,它会先尝试2次不锁住Segment的方式统计各个Segment大小,若统计过程中容器的count发生变化,再将所有Segment锁住来求count和。

而在统计size前后比较modCount是否变化来获得count是否改变。

ConcurrentLinkedQueue

一种线程安全的队列。采用“wait-free“算法实现(即CAS)。

ConcurrentLinkedQueue的结构

Java并发学习笔记——第六章 Java并发容器和框架,第2张
ConcurrentLinkedQueue类图

入队列(优化入队列效率)

入队列就是将入队节点添加到队列的尾部。

入队主要做两件事:

  1. 将入队节点设置为当前队列尾结点的next节点。
  2. 使用CAS更新tail节点:若tail节点的next节点不为空,则将入队节点设置成tail节点;否则将入队节点设置为tailnext节点。所以tail节点不总是尾节点

不让tail永远作为队列尾节点的原因在于,若每次都需要使用循环CAS更新tail节点,会降低入队效率。Doug Lea大师的方法则是每添加HOPS+1(默认为1)个节点才调用一次Node.casNext()以此减少CAS更新tail的次数。tail和尾节点距离越长,使用CAS更新tail次数越少,但每次定位尾节点的时间就越长。但这样本质上是通过增加对volatile变量的读操作减少对volatile变量的写操作,而写操作的开销要远大于读操作,因此入队效率仍然能得到提升。

出队列(优化出队列效率)

出队列就是将队列头部节点弹出。

与入队列相同,出队列使用CAS保证更新head节点,同样不会每次出队都更新head节点。

首先获取头节点,判断头节点是否为空。若为空,证明已有线程执行出队操作拿走元素;若不为空,则使用CAS将头节点引用设置为null。若CAS成功,则直接返回头节点元素;若不成功,表示已有线程进行了出队操作并更新了head节点,需要重新获取头节点。

源码中使用Node.casItem()获取节点是否为最新;使用updateHead()更新head节点。

Java的阻塞队列

阻塞队列定义

阻塞队列是一个支持两个附加操作的队列。这两个附加操作支持阻塞的插入和移除方法。

  • 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者/消费者场景。

Java中的阻塞队列

JDK7提供了7个阻塞队列:

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列。

    • 可以通过增加一个重入锁(内含同步队列)使队列变得具有公平性。
  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列。

    • 默认和最大长度为Integer.MAX_VALUE
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。

    • 默认采取自然升序。
    • 可以指定构造参数Comparator类更改排序方式。
  • DelayQueue:使用优先级队列实现的支持延时获取元素的无界阻塞队列。

    • 队列中元素必须实现Delayed接口。

    • 在创建元素时可以指定多久才能从队列中获取当前元素。

    • 运用场景:

      • 缓存系统的设计:用DelayQueue保存缓存元素的有效期,用一个线程循环查询DelayQueue,能获取到元素则意味缓存有效期到了。
      • 定时任务调度:用DelayQueue保存当天将执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,如TimerQueue
    • 使用方法:

      • ScheduledThreadPoolExecutor中的ScheduledFutureTask类的实现为例,共有三步:

        ///第一步,创建对象时初始化基本数据。用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber表示元素在队列的先后顺序。
        private static final AtomicLong sequencer = new AtomicLong(0);
        
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }
        
        //第二步,实现getDelay方法,该方法返回当前元素还需要延长多少时间,单位为纳秒
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }
        
        //第三步,实现compareTo方法指定元素顺序。
        public int compareTo(Delayed other) {
            if (other == this)
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0) 
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) 0 : ((d < 0) -1 : 1);
        }
        
      • 实现延时阻塞队列。

        //当消费者获取元素发现元素没有达到延时时,就阻塞当前线程
        long delay = first.getDelay(TimeUnit.NANOSECONDS);
        if (delay <= 0)
            return q.poll();
        else if (leader != null)
            available.await();
        else {
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
                available.awaitNanos(delay);
            } finally {
                if (leader == thisThread)
                    leader = null;
            }
        }
        
  • SynchronousQueue:不存储元素的阻塞队列。

    • 每一个put操作必须等待一个take
    • 支持公平访问,默认非公平。
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列。

    • 相比其他阻塞队列,多了tryTransfertransfer方法。
      • transfer方法:若有消费者正在等待接收元素,该方法可以把生产者传入的元素立刻传输给消费者。立即返回。
      • tryTransfer方法:试探生产者传入的元素是否能直接给消费者。等到消费者了才会返回。
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

    • 可以运用在“工作窃取”模式中。

阻塞队列实现原理

使用通知模式实现。即生产者往满的队列添加元素时会阻塞住生产者,当消费者消费了一个队列元素时,通知生产者队列可用。

阻塞主要通过LockSupport.park(this)实现。

unsafe.park是个native方法。

Fork/Join 框架

Fork/Join框架定义

是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结果后得到大任务结果的框架。

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

  • 优点:充分利用线程进行并行计算,减少了线程间的竞争。
  • 缺点:当双端队列中仅存一个任务时,还是会存在线程间的竞争;该算法消耗了更多的系统资源,如创建多个线程完成多个双端任务。

Fork/Join框架设计

Fork/Join框架需要完成:

  1. 分割任务。直到将任务分割的足够小。
  2. 执行任务并合并结果。分割的子任务分别放在双端列表中。

Fork/Join使用两个类完成以上事情:

  • ForkJoinTask:使用ForkJoin框架首先需要创建一个ForkJoin任务,它提供在任务中执行fork()join()操作的机制。通常实现时继承该类的子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask:用于有返回结果的任务。
  • ForkJoinPoolForkJoinTask需要ForkJoinPool来执行。

当一个工作线程的双端队列中没有任务时,它会随机从一个双端队列获取任务进行执行。

Fork/Join框架原理

ForkJoinPoolForkJoinTask数组和ForkJoinWorkerThread数组组成,前者负责存放程序提交给ForkJoinPool的任务,后者负责执行这些任务。

ForkJoinTask.fork()实现原理:

当调用ForkJoinTask.fork()时,程序会调用ForkJoinWorkerThread.pushTask()异步执行该任务,并立即返回结果。

ForkJoinWorkerThread.pushTask()把当前任务存放在ForkJoinTask数组队列中,再调用ForkJoinPool.singalWork()唤醒或创建一个工作线程来执行任务。

ForkJoinTask.join()实现原理:

阻塞当前线程并等待获取结果。

总结

ConcurrentHashMap通过分段加锁提高了并发效率,且每次扩容只对某一段数据扩容。

ConcurrentLinkedQueue通过增加对volatile变量的读操作以减少对volatile的写操作以提高并发队列的入队、出队效率。

阻塞队列实现原理类似为生产者/消费者通信。

Fork/Join原理类似为分治算法,将分治算法中的递归上升到开新线程执行,且执行线程在执行完后会帮其他线程执行任务。


https://www.xamrdz.com/backend/34a1926783.html

相关文章: