当前位置: 首页>移动开发>正文

事务注解@Transactional在多线程下的体现和尽量保证多线程下的事务一致性

事务注解@Transactional在多线程下的体现

使用默认的事务传播规则,测试代码如下

ChickServiceImpl.java 将列表插入分成n个线程并发操作

......

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Boolean batchConcurrentSave(List<Chick> chickList) {
        // 分成n个循环调用异步函数
        CompletableFuture[] completableFutureArr = new CompletableFuture[chickList.size()];

        for (int i = 0; i < chickList.size(); i++) {
            // 获取返回结果
            CompletableFuture<Long> chickId = chickAsyncRepository.insertAsyncChick(chickList.get(i));
            completableFutureArr[i] = chickId;
        }

        //join() 的作用:让“主线程”等待“子线程”结束之后才能继续运行
        System.out.println("异步调用完,join前.......");
        CompletableFuture.allOf(completableFutureArr).join();
        System.out.println("异步调用完,join后.......");

        return Boolean.TRUE;
    }

......

ChickAsyncRepositoryImpl.insertAsyncChick 插入数据,在插入函数中故意写个报错的代码,测试数据库回滚的情况

@Component
public class ChickAsyncRepositoryImpl implements ChickAsyncRepository {

    @Autowired
    private ChickMapper chickMapper;

    @Override
    @Async("executorBeanName")
    // @Transactional(rollbackFor = Exception.class)
    public CompletableFuture<Long> insertAsyncChick(Chick chick) {
        Long chickId = chickMapper.addChick(chick);
        System.out.println("当前线程号 : "+Thread.currentThread().getName());
        // 故意写个报错代码,测试回滚情况
        if("c".equals(chick.getName())){
            int i =1/0;
        }
        return CompletableFuture.completedFuture(chickId);
    }
}

用以下数据进行测试,当插入第三条name=c的数据时会报错

[
{
    "name":"a",
    "weight":"10"
},
{
    "name":"b",
    "weight":"11"
},
{
    "name":"c",
    "weight":"12"
},
{
    "name":"d",
    "weight":"13"
}
]
  1. ChickAsyncRepositoryImpl.insertAsyncChick 函数上不加@Transactional注解
    代码报错,但四条数据均插入成功,name为c的线程没有发生回滚,说明并发时没有继承到调用者的事务
  2. ChickAsyncRepositoryImpl.insertAsyncChick 函数上@Transactional注解
    代码报错,只有三条数据插入成功,name为c的数据回滚了,没有插入成功,说明并发时该函数没有继承到调用者的事务,自己各自创建了新事物

尽量保证多线程下的事务一致性

先尽量保证并发线程间的事务一致,流程图如下:


事务注解@Transactional在多线程下的体现和尽量保证多线程下的事务一致性,第1张
并发事务一致性流程图.png

代码实现:

ChickServiceImpl.java 将列表插入分成n个线程并发操作
使用 AtomicBoolean 作为线程间的共享变量
使用 CountDownLatch 完成线程间的互相等待

......

    @Override
    public Boolean batchAtomicConcurrentSave(List<Chick> chickList) {
        AtomicBoolean successFlag = new AtomicBoolean(Boolean.TRUE);
        // 分成n个循环调用异步函数
        CompletableFuture[] completableFutureArr = new CompletableFuture[chickList.size()];
        CountDownLatch countDownLatch = new CountDownLatch(chickList.size());
        for (int i = 0; i < chickList.size(); i++) {
            // 获取返回结果
            CompletableFuture<Long> chickId =
                    chickAsyncRepository.newInsertAsyncChick(chickList.get(i),
                            countDownLatch,
                            successFlag);
            completableFutureArr[i] = chickId;
        }

        //join() 的作用:让“主线程”等待“子线程”结束之后才能继续运行
        System.out.println("异步调用完,join前.......");
        CompletableFuture.allOf(completableFutureArr).join();
        System.out.println("异步调用完,join后.......");

        return successFlag.get();
    }

......

ChickAsyncRepositoryImpl.newInsertAsyncChick 插入数据,在插入函数中故意写个报错的代码,测试数据库回滚的情况
使用 PlatformTransactionManager 来手动控制事务、提交和回滚

/**
 * @author Jenson
 */
@Component
@Slf4j
public class ChickAsyncRepositoryImpl implements ChickAsyncRepository {

    @Autowired
    private ChickMapper chickMapper;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;

    @Override
    @Async("executorBeanName")
    public CompletableFuture<Long> newInsertAsyncChick(Chick chick,
                                                       CountDownLatch countDownLatch,
                                                       AtomicBoolean successFlag) {
        System.out.println("当前线程号 : " + Thread.currentThread().getName());
        // 创建事务
        TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
        Long chickId = -1L;
        try {
            chickId = chickMapper.addChick(chick);
            // 故意写个报错代码,测试回滚情况
            if ("c".equals(chick.getName())) {
                int i = 1 / 0;
            }
        } catch (Exception e) {
            System.out.println("报错了,报错的线程 : " + Thread.currentThread().getName());
            successFlag.set(Boolean.FALSE);
//            log.error("error {}", e);
        }
//        // 测试CountDownLatch是否生效
//        if ("c".equals(chick.getName())) {
//            try {
//                Thread.sleep(10000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }
        countDownLatch.countDown();
        try {
            System.out.println(Thread.currentThread().getName() + "开始await : " + new Date());
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "结束await : " + new Date());
        // 最后提交/回滚
        if(Boolean.TRUE.equals(successFlag.get())){
            platformTransactionManager.commit(transactionStatus);
        }
        else {
            platformTransactionManager.rollback(transactionStatus);
        }
        return CompletableFuture.completedFuture(chickId);
    }
}

https://www.xamrdz.com/mobile/4jy1995206.html

相关文章: