到计数器:CountDownLatch
让某一个线程等待直到倒计数结束,再开始执行。
- demo
假设有n个线程同时commit,只要有一个线程失败,其他所有线程回滚。
package com.jenson;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class CountDownLatchTest implements Runnable {
public static int THREAD_COUNT = 3;
public static final CountDownLatch LATCH = new CountDownLatch(THREAD_COUNT);
public static AtomicBoolean successFlag = new AtomicBoolean(Boolean.TRUE);
private final int seq;
public CountDownLatchTest(int seq) {
this.seq = seq;
}
@Override
public void run() {
System.out.println("thread seq : " + seq + ", doing something");
try {
// 每个执行一秒钟
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (seq == 1) {
// 模拟某一个报错,将标志位置为false
successFlag.set(Boolean.FALSE);
}
// 事情做完了,计数
LATCH.countDown();
// 等待别人做完
try {
LATCH.await();
System.out.println("thread seq : " + seq + ",等待完成,继续...");
} catch (InterruptedException e) {
e.printStackTrace();
}
if (Boolean.TRUE.equals(successFlag.get())) {
System.out.println("thread seq : " + seq + ",所有任务执行正常,commit...");
} else {
System.out.println("thread seq : " + seq + ",有任务执行失败,rollback...");
}
}
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
CountDownLatchTest demo = new CountDownLatchTest(i);
Thread td = new Thread(demo);
td.start();
}
}
}
循环栅栏:CyclicBarrier
可以实现线程间的计数等待,并且这个计数器可以反复使用
它可以接收一个参数作为barrierAction,当计数器一次计数完成后,系统会执行该动作。
- demo
使用 CyclicBarrier 实现上一个CountDownLatch的demo
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
public class CyclicBarrierTest {
public static int THREAD_COUNT = 3;
public static final CyclicBarrier BARRIER = new CyclicBarrier(THREAD_COUNT, new BarrierAction());
public static AtomicBoolean successFlag = new AtomicBoolean(Boolean.TRUE);
public static class BarrierDemo implements Runnable {
private final int seq;
public BarrierDemo(int seq) {
this.seq = seq;
}
@Override
public void run() {
System.out.println("thread seq : " + seq + ", doing something");
if (seq == 1) {
// 模拟某一个报错,将标志位置为false
successFlag.set(Boolean.FALSE);
}
// 等待别人做完
try {
BARRIER.await();
System.out.println("thread seq : " + seq + ",等待完成,继续...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
if (Boolean.TRUE.equals(successFlag.get())) {
System.out.println("thread seq : " + seq + ",所有任务执行正常,commit...");
} else {
System.out.println("thread seq : " + seq + ",有任务执行失败,rollback...");
}
}
}
public static class BarrierAction implements Runnable {
@Override
public void run() {
System.out.println("通知,有一组任务执行完了....");
}
}
public static void main(String[] args) throws InterruptedException {
for (int j = 0; j < 2; j++) {
System.out.println("a:" + BARRIER.getNumberWaiting());
System.out.println("--->第 " + j + " 组");
for (int i = 0; i < THREAD_COUNT; i++) {
BarrierDemo demo = new BarrierDemo(i);
Thread td = new Thread(demo);
td.start();
}
// 为了测试循环计数,这里简单用sleep分割两组
Thread.sleep(1000);
successFlag.set(Boolean.TRUE);
System.out.println("b:" + BARRIER.getNumberWaiting());
}
}
}
信号量:Semaphore
可以指定多个线程,同时访问某一个资源
主要方法:
acquire()
尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
还提供了类似的acquireUninterruptibly()方法和tryAcquire()方法。release()
释放信号量demo
package com.jenson;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest implements Runnable {
public static final Semaphore semp = new Semaphore(5);
@Override
public void run() {
try {
semp.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + ":done!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semp.release();
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemaphoreTest test = new SemaphoreTest();
for (int i = 0; i < 20; i++) {
exec.submit(test);
}
}
}
线程阻塞工具类:LockSupport
方法:
- LockSupport.park();
park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法 - LockSupport.unpark(thread);
唤醒指定线程