事务注解@Transactional在多线程下的体现
使用默认的事务传播规则,测试代码如下
ChickServiceImpl.java 将列表插入分成n个线程并发操作
......
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean batchConcurrentSave(List<Chick> chickList) {
// 分成n个循环调用异步函数
CompletableFuture[] completableFutureArr = new CompletableFuture[chickList.size()];
for (int i = 0; i < chickList.size(); i++) {
// 获取返回结果
CompletableFuture<Long> chickId = chickAsyncRepository.insertAsyncChick(chickList.get(i));
completableFutureArr[i] = chickId;
}
//join() 的作用:让“主线程”等待“子线程”结束之后才能继续运行
System.out.println("异步调用完,join前.......");
CompletableFuture.allOf(completableFutureArr).join();
System.out.println("异步调用完,join后.......");
return Boolean.TRUE;
}
......
ChickAsyncRepositoryImpl.insertAsyncChick 插入数据,在插入函数中故意写个报错的代码,测试数据库回滚的情况
@Component
public class ChickAsyncRepositoryImpl implements ChickAsyncRepository {
@Autowired
private ChickMapper chickMapper;
@Override
@Async("executorBeanName")
// @Transactional(rollbackFor = Exception.class)
public CompletableFuture<Long> insertAsyncChick(Chick chick) {
Long chickId = chickMapper.addChick(chick);
System.out.println("当前线程号 : "+Thread.currentThread().getName());
// 故意写个报错代码,测试回滚情况
if("c".equals(chick.getName())){
int i =1/0;
}
return CompletableFuture.completedFuture(chickId);
}
}
用以下数据进行测试,当插入第三条name=c的数据时会报错
[
{
"name":"a",
"weight":"10"
},
{
"name":"b",
"weight":"11"
},
{
"name":"c",
"weight":"12"
},
{
"name":"d",
"weight":"13"
}
]
- ChickAsyncRepositoryImpl.insertAsyncChick 函数上不加@Transactional注解
代码报错,但四条数据均插入成功,name为c的线程没有发生回滚,说明并发时没有继承到调用者的事务 - ChickAsyncRepositoryImpl.insertAsyncChick 函数上加@Transactional注解
代码报错,只有三条数据插入成功,name为c的数据回滚了,没有插入成功,说明并发时该函数没有继承到调用者的事务,自己各自创建了新事物
尽量保证多线程下的事务一致性
先尽量保证并发线程间的事务一致,流程图如下:
代码实现:
ChickServiceImpl.java 将列表插入分成n个线程并发操作
使用 AtomicBoolean 作为线程间的共享变量
使用 CountDownLatch 完成线程间的互相等待
......
@Override
public Boolean batchAtomicConcurrentSave(List<Chick> chickList) {
AtomicBoolean successFlag = new AtomicBoolean(Boolean.TRUE);
// 分成n个循环调用异步函数
CompletableFuture[] completableFutureArr = new CompletableFuture[chickList.size()];
CountDownLatch countDownLatch = new CountDownLatch(chickList.size());
for (int i = 0; i < chickList.size(); i++) {
// 获取返回结果
CompletableFuture<Long> chickId =
chickAsyncRepository.newInsertAsyncChick(chickList.get(i),
countDownLatch,
successFlag);
completableFutureArr[i] = chickId;
}
//join() 的作用:让“主线程”等待“子线程”结束之后才能继续运行
System.out.println("异步调用完,join前.......");
CompletableFuture.allOf(completableFutureArr).join();
System.out.println("异步调用完,join后.......");
return successFlag.get();
}
......
ChickAsyncRepositoryImpl.newInsertAsyncChick 插入数据,在插入函数中故意写个报错的代码,测试数据库回滚的情况
使用 PlatformTransactionManager 来手动控制事务、提交和回滚
/**
* @author Jenson
*/
@Component
@Slf4j
public class ChickAsyncRepositoryImpl implements ChickAsyncRepository {
@Autowired
private ChickMapper chickMapper;
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Override
@Async("executorBeanName")
public CompletableFuture<Long> newInsertAsyncChick(Chick chick,
CountDownLatch countDownLatch,
AtomicBoolean successFlag) {
System.out.println("当前线程号 : " + Thread.currentThread().getName());
// 创建事务
TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
Long chickId = -1L;
try {
chickId = chickMapper.addChick(chick);
// 故意写个报错代码,测试回滚情况
if ("c".equals(chick.getName())) {
int i = 1 / 0;
}
} catch (Exception e) {
System.out.println("报错了,报错的线程 : " + Thread.currentThread().getName());
successFlag.set(Boolean.FALSE);
// log.error("error {}", e);
}
// // 测试CountDownLatch是否生效
// if ("c".equals(chick.getName())) {
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
countDownLatch.countDown();
try {
System.out.println(Thread.currentThread().getName() + "开始await : " + new Date());
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束await : " + new Date());
// 最后提交/回滚
if(Boolean.TRUE.equals(successFlag.get())){
platformTransactionManager.commit(transactionStatus);
}
else {
platformTransactionManager.rollback(transactionStatus);
}
return CompletableFuture.completedFuture(chickId);
}
}