Java并发学习笔记——第八章 Java的并发工具类
JDK并发包提供了一些很有用的并发工具类,如CountDownLatch
、CyclicBarrier
、Semaphore
工具类提供了一种并发流程控制的手段;Exchanger
工具类提供了在线程间交换数据的一种手段。
等待多线程完成的CountDownLatch
CountDownLatch
允许一个或多个线程等待其他线程完成操作,该工具类可实现的功能比使用Thread.join()
更多。
public class CountDownLatchTest {
static CountDownLatch c = new CountDownLatch(2); //构造函数接收一个int类型作为计数器,等待N个点完成,就传入N
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println(3);
}
}
// 输出
// 1
// 2
// 3
当调用一次CountDownLatch.countDown()
时,N就会减1 。CountDownLatch.await()
会阻塞线程,直到N为0 。
CountDownLatch.await(long time, TimeUnit unit)
可以在等待特定时间后不再阻塞当前线程。
同步屏障CyclicBarrier
CyclicBarrier
的作用是:让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被阻塞的线程才继续运行。
CyclicBarrier
可以用于多线程计算数据,最后合并计算结果的场景。
CyclicBarrier
有两个构造方法与一个阻塞方法:
-
CyclicBarrier(int parties)
:参数表示屏障拦截的线程数量。 -
CyclicBarrier(int parties, Runnable barrierAction)
:在线程到达屏障时,优先执行barrierAction。 -
CyclicBarrier.await()
:线程执行该方法,通知CyclicBarrier
有一个线程到达了屏障。
示例
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable { //只会在有线程第一次到达时会记录A,因此只会执行一次。
@Override
public void run() {
System.out.println(3);
}
}
}
///输出
// 3 3
// 1 or 2
// 2 1
CyclicBarrier与CountDownLatch的区别
后者计数器只能使用一次,但是前者计数器可以用reset()
重置。
CyclicBarrier的其他方法
CyclicBarrier
还提供了一些其他的方法,如:
-
getNumberWaiting()
:可以获得CyclicBarrier
阻塞的线程数量。 -
isBroken()
:可以了解CyclicBarrier
阻塞的线程中是否有线程被中断。
控制并发线程数的Semaphore
Semaphore
(信号量)用来控制同时访问特定资源的线程数量。
应用场景
Semaphore
可以做流量控制,特别是共用资源有限的应用场景,如数据库连接。
如有一个需求,要读取几万个文件的数据,由于是IO密集型任务,因此可以启动数十个线程并发读取,但读到内存后,还需要存储到数据库中,而数据库连接数默认只有10个。若不控制可以获取到数据库连接的数量,则会报错无法获取数据库连接。此时,可以使用Semaphore
进行流量控制。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for(int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
通过调用acquire()
获取一个许可证,使用完后调用release()
将许可证归还。
还可以使用tryAcquire()
尝试获取许可证。
Semaphore的其他方法
-
int availablePermits()
:返回此Semaphore
当前可用的许可证数。 -
int getQueueLength()
:返回正在等待获取此Semaphore
许可证的线程数。 -
boolean hasQueuedThreads()
:是否有线程正在等待获取许可证。 -
void reducePermits(int reduction)
:减少reduction
个许可证。 -
Collection getQueuedThreads()
:返回所有正在等待获取许可证的线程集合。
线程间交互数据的Exchanger
Exchanger
是一个用于线程间协作、进行线程间数据交换的工具类。
它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。若第一个线程执行exchange()
方法,它就会等待第二个线程也执行exchange()
方法。
应用场景
Exchanger
可以用于校对工作。如采用AB两人进行数据录入,录入到Excel后系统需要加载这两个Excel,并对两个Excel进行校对,看看是否录入一致。
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "数据A";
exgr.exchange(A); //A录入交换数据
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "数据B";
String A = exgr.exchange(B); //B录入交换数据,同时获得其他线程传来的数据
System.out.println("A和B数据是否一致:" + A.equals(B));
} catch (InterruptedException e) {
}
}
});
threadPool.shotdown();
}
}
若两个线程有一个没有执行exchange()
方法,则会一直等待。
为了避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)
设置最大等待时长。
总结
灵活使用这一章介绍的并发工具类,可以完成一些并发环境下的特定需求的功能。