当前位置: 首页>大数据>正文

CyclicBarrier 代码示例和底层原理

CyclicBarrier 是 Java.util.concurrent 包中提供的一个同步工具类,它允许一组线程在某个共同点处相互等待,并在所有线程都达到某个条件时继续执行。

CyclicBarrier 代码示例 1 :

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        final int numberOfThreads = 3;
        final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("所有线程都已到达屏障点,继续执行后续操作");
        });

        for (int i = 0; i < numberOfThreads; i++) {
            final int threadNumber = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + threadNumber + " 执行任务");
                    Thread.sleep(1000); // 模拟任务执行时间
                    System.out.println("线程 " + threadNumber + " 到达屏障点");
                    barrier.await(); // 线程到达屏障点,等待其他线程
                    System.out.println("线程 " + threadNumber + " 继续执行后续操作");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在上面的示例中,首先创建了一个 CyclicBarrier 对象,指定了需要等待的线程数量为 3,同时指定了当所有线程到达屏障点后要执行的操作。然后创建了 3 个线程,每个线程执行一段任务,然后调用 await() 方法等待其他线程。当所有线程都到达屏障点时,执行操作的回调函数会被触发,并输出相应的信息。

运行上述代码,你会看到类似以下的输出:

线程 0 执行任务
线程 1 执行任务
线程 2 执行任务
线程 2 到达屏障点
线程 0 到达屏障点
线程 1 到达屏障点
所有线程都已到达屏障点,继续执行后续操作
线程 0 继续执行后续操作
线程 2 继续执行后续操作
线程 1 继续执行后续操作

可以看到,当所有线程都到达屏障点后,执行操作的回调函数被触发,并且所有线程继续执行后续操作。

CyclicBarrier 代码示例 2 :

这是一个使用 CyclicBarrier 的示例代码,模拟收集龙珠的场景。当收集到7颗龙珠后,会触发一个动作(召唤神龙)。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("召唤神龙");
        });

        for (int i = 1; i <= 7; i++) {
            final int temp = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 收集到第" + temp + "颗龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

在这个示例中,我们创建了一个 CyclicBarrier 对象,指定计数器的值为 7,当 7 个线程都调用了 await() 方法后,会触发传入的 Runnable 对象,即召唤神龙的动作。

for 循环中,我们创建了 7 个线程,并分别模拟收集到不同的龙珠。每个线程执行完收集龙珠的操作后,调用 cyclicBarrier.await() 方法,等待其他线程。

当所有线程都调用了 await() 方法后,CyclicBarrier 中的计数器会达到设定的值,触发召唤神龙的动作。输出结果可能类似于以下内容:

1    收集到第1颗龙珠
2    收集到第2颗龙珠
3    收集到第3颗龙珠
4    收集到第4颗龙珠
5    收集到第5颗龙珠
6    收集到第6颗龙珠
7    收集到第7颗龙珠
召唤神龙

这个示例展示了如何使用 CyclicBarrier 来等待多个工作线程执行完毕,并在所有线程完成后触发一个动作。

CyclicBarrier 底层原理涉及到线程同步和共享变量的操作

CyclicBarrier 的底层原理是基于 AQS(AbstractQueuedSynchronizer)实现的。 CyclicBarrier 内部维护了一个共享的同步状态(state)和一个等待队列。每个线程在调用 await() 方法时,会将线程加入等待队列,并检查当前状态是否为初始状态。如果是初始状态,则线程会进入休眠状态,等待其他线程的到达。当所有线程都调用 await() 方法后,它们会相互唤醒,继续执行后续的任务。同时,CyclicBarrier 的状态会被重置为初始状态,可以继续使用。这样就实现了线程之间的协调与等待的功能。

在 CyclicBarrier 中,有一个计数器(count)和一个屏障点(barrier)用于实现线程的同步。计数器的初始值由构造函数传入,每当一个线程调用 await() 方法时,计数器的值会减一。当计数器的值变为零时,表示所有线程都已到达屏障点,可以继续执行后续操作。

CyclicBarrier 使用了共享变量和内置的线程同步机制来实现等待和通知的功能。具体来说,它使用了以下几个关键的方法和数据结构:

  1. await():线程调用 await() 方法时,会尝试获取内置的锁,然后将计数器的值减一。如果计数器的值不为零,那么线程会被阻塞,等待其他线程到达屏障。如果计数器的值变为零,那么所有在屏障上等待的线程将会被唤醒,可以继续执行后续操作。
  2. 内置的同步机制:CyclicBarrier 内部使用了内置的同步机制,如锁、条件变量等,来实现线程的等待和通知机制。
  3. 循环使用的特性:与 CountDownLatch 不同,CyclicBarrier 是可以循环使用的。当所有线程都到达屏障后,计数器会被重置为初始值,可以进行下一轮的等待和通知。

在底层实现中,CyclicBarrier 使用了类似于 ReentrantLock 和 Condition 的机制来实现线程的等待和唤醒。当一个线程调用 await() 方法时,它会尝试获取内置的锁,然后检查计数器的值。如果计数器的值不为零,线程会通过条件变量进入等待状态。当计数器的值变为零时,最后一个到达屏障的线程会通过条件变量唤醒其他等待的线程,使它们继续执行。

需要注意的是,CyclicBarrier 是可重入的,即同一个线程可以多次调用 await() 方法等待其他线程到达屏障。只有当所有线程都调用了 await() 方法,计数器的值才会变为零,触发屏障的打开。

总结起来,CyclicBarrier 的底层原理是通过共享变量和内置的线程同步机制来实现线程的等待和通知。它提供了一种循环使用的屏障机制,可以在某个屏障点上同步等待,然后同时继续执行后续的任务。


https://www.xamrdz.com/bigdata/7az1997603.html

相关文章: