当前位置: 首页>后端>正文

2.4.1 例子countDownLatch、CyclicBarrier

原始对账系统,单线程

2.4.1 例子countDownLatch、CyclicBarrier,第1张
2.4.1 例子countDownLatch、CyclicBarrier,第2张

while(存在未对账订单){

????? pos = getPOrders();???// 查询未对账订单

? ????dos = getDOrders();? ?? // 查询派送单

? ????diff = check(pos, dos);? ?? // 执行对账操作

? ????save(diff);? ?? // 差异写入差异库

}

改为多线程:join

2.4.1 例子countDownLatch、CyclicBarrier,第3张

while(存在未对账订单){

? // 查询未对账订单

? Thread T1 = new Thread(()->{

? ? pos = getPOrders();

? });

? T1.start();

? // 查询派送单

? Thread T2 = new Thread(()->{

? ? dos = getDOrders();

? });

? T2.start();

? // 等待T1、T2结束

? T1.join();

? T2.join();

? diff = check(pos, dos);? ?// 执行对账操作

? save(diff);? ??// 差异写入差异库

}

创建线程耗时,用线程池优化:?CountDownLatch?

Executor executor =Executors.newFixedThreadPool(2);??// 创建2个线程的线程池

while(存在未对账订单){

? CountDownLatch latch =new CountDownLatch(2);???// 计数器初始化为2

? // 查询未对账订单

? executor.execute(()-> {

? ? pos = getPOrders();

? ? latch.countDown();

? });

? // 查询派送单

? executor.execute(()-> {

? ? dos = getDOrders();

? ? latch.countDown();

? });

? latch.await();??// 等待两个查询操作结束

? diff = check(pos, dos);???// 执行对账操作

? save(diff);??? // 差异写入差异库

}

前面我们将 getPOrders() 和 getDOrders() 这两个查询操作并行了,但这两个查询操作和对账操作 check()、save() 之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作,这个过程可以形象化地表述为下面这幅示意图。

2.4.1 例子countDownLatch、CyclicBarrier,第4张

那接下来我们再来思考一下如何实现这步优化,两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果,这明显有点生产者 - 消费者的意思,两次查询操作是生产者,对账操作是消费者。既然是生产者 - 消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。不过针对对账这个项目,我设计了两个队列,并且两个队列的元素之间还有对应关系。具体如下图所示,订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。

2.4.1 例子countDownLatch、CyclicBarrier,第5张

下面再来看如何用双队列来实现完全的并行。一个最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。下面这幅图形象地描述了上面的意图:线程 T1 和线程 T2 只有都生产完 1 条数据的时候,才能一起向下执行,也就是说,线程 T1 和线程 T2 要互相等待,步调要一致;同时当线程 T1 和 T2 都生产完一条数据的时候,还要能够通知线程 T3 执行对账操作。

Vector<P> pos;? ?// 订单队列

Vector<D> dos;? ?// 派送单队列

Executor executor = Executors.newFixedThreadPool(1);??// 执行回调的线程池

final CyclicBarrier barrier = new CyclicBarrier(2, ()->{

? ????? executor.execute(()->check());? ? ?//等到barrier.await()减为0才执行

? });

void check(){

? P p = pos.remove(0);

? D d = dos.remove(0);

? diff = check(p, d);? ?// 执行对账操作

? save(diff);??? // 差异写入差异库

}

void checkAll(){

? // 循环查询订单库

? Thread T1 = new Thread(()->{

? ? while(存在未对账订单){

? ? ????? pos.add(getPOrders());??? ? ? // 查询订单库

? ? ? ????barrier.await();??? ? ? // 等待

? ? }

? });

? T1.start();?

? // 循环查询运单库

? Thread T2 = new Thread(()->{

? ? while(存在未对账订单){

? ? ????? dos.add(getDOrders());???// 查询运单库

? ? ? ????barrier.await();??? ? ? // 等待

? ? }

? });

? T2.start();

}

1.为啥要用线程池,而不是在回调函数中直接调用?

2.线程池为啥使用单线程的?

我的考虑:

1.使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。

2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。


https://www.xamrdz.com/backend/37m1997348.html

相关文章: