CyclicBarrier通常称为循环屏障。它和CountDownLatch很相似,都可以使线程先等待然后再执行。不过CountDownLatch是使一批线程等待另一批线程执行完后再执行;而CyclicBarrier只是使等待的线程达到一定数目后再让它们继续执行。故而CyclicBarrier内部也有一个计数器,计数器的初始值在创建对象时通过构造参数指定,每调用一次await()方法都将使阻塞的线程数+1,只有阻塞的线程数达到设定值时屏障才会打开,允许阻塞的所有线程继续执行。除此之外,CyclicBarrier还有几点需要注意的地方:
- CyclicBarrier的计数器可以重置而CountDownLatch不行,这意味着CyclicBarrier实例可以被重复使用而CountDownLatch只能被使用一次。而这也是循环屏障循环二字的语义所在。
- CyclicBarrier允许用户自定义barrierAction操作,这是个可选操作,可以在创建CyclicBarrier对象时指定。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
一旦用户在创建CyclicBarrier对象时设置了barrierAction参数,则在阻塞线程数达到设定值屏障打开前,会调用barrierAction的run()方法完成用户自定义的操作。
场景一
公司组织旅游,大家都有经历过,10个人,中午到饭点了,需要等到10个人都到了才能开饭,先到的人坐那等着,代码如下:
try {
System.out.println("准备吃饭了======================>" + name);
TimeUnit.SECONDS.sleep(sleep);
//调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
cyclicBarrier2.await();
System.out.println("开饭=======================>" + name);
} catch (Exception e) {
e.printStackTrace();
}
代码中模拟了10个员工上桌吃饭的场景,等待所有员工都到齐了才能开发,可以看到第10个员工最慢,前面的都在等待第10个员工,员工1等待了9秒,上面代码中调用cyclicBarrier.await();会让当前线程等待。当10个员工都调用了cyclicBarrier.await();之后,所有处于等待中的员工都会被唤醒,然后继续运行。
场景二:重复使用CyclicBarrier
对示场景一进行改造一下,吃饭完毕之后,所有人都去车上,待所有人都到车上之后,驱车去下一景点玩。
package com.shiguiwu.springmybatis.thread;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
/**
* @description: 循环
* @author: stone
* @date: Created by 2021/6/23 18:48
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.thread
*/
public class CyclicBarrierTests {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
//最后一个线程执行改函数
private static CyclicBarrier cyclicBarrier2 = new CyclicBarrier(10,()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("w我迟到了,哈哈哈");
});
@AllArgsConstructor
@NoArgsConstructor
@Data
public static class CyclicBarrierDemo implements Runnable {
private String name;
private int sleep;
@Override
public void run() {
this.eat();
this.drive();
}
void eat() {
try {
System.out.println("准备吃饭了======================>" + name);
TimeUnit.SECONDS.sleep(sleep);
//调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
cyclicBarrier2.await();
System.out.println("开饭=======================>" + name);
} catch (Exception e) {
e.printStackTrace();
}
}
void drive() {
try {
System.out.println("准备开车了======================>" + name);
TimeUnit.SECONDS.sleep(sleep);
//调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
cyclicBarrier2.await();
System.out.println("开车了=======================>" + name);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierDemo("t" + i, i)).start();
}
}
}
坑,又是员工10最慢,要提升效率了,不能吃的太多,得减肥。
代码中CyclicBarrier相当于使用了2次,第一次用于等待所有人到达后开饭,第二次用于等待所有人上车后驱车去下一景点。注意一些先到的员工会在其他人到达之前,都处于等待状态(cyclicBarrier.await();会让当前线程阻塞),无法干其他事情,等到最后一个人到了会唤醒所有人,然后继续。
CyclicBarrier内部相当于有个计数器(构造方法传入的),每次调用await();后,计数器会减1,并且await()方法会让当前线程阻塞,等待计数器减为0的时候,所有在await()上等待的线程被唤醒,然后继续向下执行,此时计数器又会被还原为创建时的值,然后可以继续再次使用。
CyclicBarrier原理
它没有像CountDownLatch和ReentrantLock使用AQS的state变量,而CyclicBarrier是直接借助ReentrantLock加上Condition 等待唤醒的功能 进而实现的,在构建CyclicBarrier时,传入的值会赋值给CyclicBarrier内部维护count变量,也会赋值给parties变量(这是可以复用的关键),每次调用await时,会将count -1 ,操作count值是直接使用ReentrantLock来保证线程安全性,如果count不为0,则添加则condition队列中,如果count等于0时,则把节点从condition队列添加至AQS的队列中进行全部唤醒,并且将parties的值重新赋值为count的值(实现复用)
小结:CyclicBarrier则利用ReentrantLock和Condition,自身维护了count和parties变量。每次调用await将count-1,并将线程加入到condition队列上。等到count为0时,则将condition队列的节点移交至AQS队列,并全部释放。
总结
主管相当于 CountDownLatch,干活的小弟相当于做事情的线程。
老板交给主管了一个任务,让主管搞完之后立即上报给老板。主管下面有10个小弟,接到任务之后将任务划分为10个小任务分给每个小弟去干,主管一直处于等待状态(主管会调用await()方法,此方法会阻塞当前线程),让每个小弟干完之后通知一下主管(调用countDown()方法通知主管,此方法会立即返回),主管等到所有的小弟都做完了,会被唤醒,从await()方法上苏醒,然后将结果反馈给老板。期间主管会等待,会等待所有小弟将结果汇报给自己。
而CyclicBarrier是一批线程让自己等待,等待所有的线程都准备好了,自己才能继续。