当前位置: 首页>前端>正文

java并发基础-同步控制工具

到计数器:CountDownLatch

让某一个线程等待直到倒计数结束,再开始执行。

  • demo
    假设有n个线程同时commit,只要有一个线程失败,其他所有线程回滚。
package com.jenson;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class CountDownLatchTest implements Runnable {
    public static int THREAD_COUNT = 3;
    public static final CountDownLatch LATCH = new CountDownLatch(THREAD_COUNT);
    public static AtomicBoolean successFlag = new AtomicBoolean(Boolean.TRUE);
    private final int seq;

    public CountDownLatchTest(int seq) {
        this.seq = seq;
    }

    @Override
    public void run() {
        System.out.println("thread seq : " + seq + ", doing something");
        try {
            // 每个执行一秒钟
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (seq == 1) {
            // 模拟某一个报错,将标志位置为false
            successFlag.set(Boolean.FALSE);
        }
        // 事情做完了,计数
        LATCH.countDown();
        // 等待别人做完
        try {
            LATCH.await();
            System.out.println("thread seq : " + seq + ",等待完成,继续...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (Boolean.TRUE.equals(successFlag.get())) {
            System.out.println("thread seq : " + seq + ",所有任务执行正常,commit...");
        } else {
            System.out.println("thread seq : " + seq + ",有任务执行失败,rollback...");
        }

    }

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            CountDownLatchTest demo = new CountDownLatchTest(i);
            Thread td = new Thread(demo);
            td.start();
        }
    }
}

循环栅栏:CyclicBarrier

可以实现线程间的计数等待,并且这个计数器可以反复使用
它可以接收一个参数作为barrierAction,当计数器一次计数完成后,系统会执行该动作。

  • demo
    使用 CyclicBarrier 实现上一个CountDownLatch的demo
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

public class CyclicBarrierTest {
    public static int THREAD_COUNT = 3;
    public static final CyclicBarrier BARRIER = new CyclicBarrier(THREAD_COUNT, new BarrierAction());
    public static AtomicBoolean successFlag = new AtomicBoolean(Boolean.TRUE);

    public static class BarrierDemo implements Runnable {
        private final int seq;

        public BarrierDemo(int seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            System.out.println("thread seq : " + seq + ", doing something");

            if (seq == 1) {
                // 模拟某一个报错,将标志位置为false
                successFlag.set(Boolean.FALSE);
            }
            // 等待别人做完
            try {
                BARRIER.await();
                System.out.println("thread seq : " + seq + ",等待完成,继续...");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            if (Boolean.TRUE.equals(successFlag.get())) {
                System.out.println("thread seq : " + seq + ",所有任务执行正常,commit...");
            } else {
                System.out.println("thread seq : " + seq + ",有任务执行失败,rollback...");
            }

        }
    }

    public static class BarrierAction implements Runnable {
        @Override
        public void run() {
            System.out.println("通知,有一组任务执行完了....");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int j = 0; j < 2; j++) {
            System.out.println("a:" + BARRIER.getNumberWaiting());
            System.out.println("--->第 " + j + " 组");
            for (int i = 0; i < THREAD_COUNT; i++) {
                BarrierDemo demo = new BarrierDemo(i);
                Thread td = new Thread(demo);
                td.start();
            }
            // 为了测试循环计数,这里简单用sleep分割两组
            Thread.sleep(1000);
            successFlag.set(Boolean.TRUE);
            System.out.println("b:" + BARRIER.getNumberWaiting());
        }
    }
}

信号量:Semaphore

可以指定多个线程,同时访问某一个资源

主要方法:

  • acquire()
    尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
    还提供了类似的acquireUninterruptibly()方法和tryAcquire()方法。

  • release()
    释放信号量

  • demo

package com.jenson;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest implements Runnable {
    public static final Semaphore semp = new Semaphore(5);

    @Override
    public void run() {
        try {
            semp.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemaphoreTest test = new SemaphoreTest();
        for (int i = 0; i < 20; i++) {
            exec.submit(test);
        }
    }
}

线程阻塞工具类:LockSupport

方法:

  • LockSupport.park();
    park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法
  • LockSupport.unpark(thread);
    唤醒指定线程

https://www.xamrdz.com/web/2ek1994650.html

相关文章: