假设有A和B两个系统,分别可以处理任务A和任务B。此时系统A中存在一个业务流程,需要将任务A和任务B在同一个事务中处理。下面来介绍基于消息中间件来实现这种分布式事务。(服务间通信使用dubbo)
1)A任务若提交成功,则向事务队列(user:topic)中添加一条信息,若提交失败,则整个事务失败
2)向事务队列user:topic添加消息成功,则通知B系统,开始执行B任务。若向事务队列user:topic添加消息失败,则整个事务失败
3)B系统接收到通知消息时开始执行B任务,执行之前需要检查是否是重复通知,此处应该有幂等性。B任务若执行成功,则删除事务队列(user:topic)中相对应的数据。
1、WEB层,这里是测试的入口
@Controller
@RequestMapping(value = "/system")
public class UserController extends BaseController{
@Autowired
private UserService userService;
static int i = 0;
@RequestMapping("/test")
public void test(){
BaseSysUserPo sysUserPo = new BaseSysUserPo();
sysUserPo.setUserId(""+i++);
sysUserPo.setIsDelete("0");
sysUserPo.setAmount(new BigDecimal(1));
ResultBean resultBean1 = userService.addUser(sysUserPo);
System.out.println("==============="+resultBean1.getMessage());
}
}
2、SERVICE层,服务层,也就是上面所说的A系统,这里使用User模块来代替
@Autowired
private SysUserLogic sysUserLogic;
@Autowired
private RedisTemplate<String,String> redisTemplate;
/**
* 测试添加
* @param baseSysUserPo
* @return
*/
String channel = "user:topic";
public ResultBean addUser(BaseSysUserPo baseSysUserPo) {
ResultBean resultBean1 = new ResultBean();
BaseSysMenuPo menuPo = new BaseSysMenuPo();
menuPo.setMenuId(baseSysUserPo.getUserId());
menuPo.setIsDelete("0");
menuPo.setAmount(new BigDecimal(1));
try {
resultBean1 = sysUserLogic.add(baseSysUserPo);
if (resultBean1.isSuccess()) {
if (redisTemplate.opsForList().leftPush(channel, JSONObject.toJSONString(menuPo)) > 0) {//将消息放入保证队列,如果失败,则整个事务失败。
redisTemplate.convertAndSend(channel, JSONObject.toJSONString(menuPo));//通知消息到远程服务。
} else {
resultBean1.error("确认消息发送异常");
throw new RuntimeException("确认消息发送异常");//将消息放入保证队列,失败,整个事务失败。
}
}
} catch (LogicException e) {
e.printStackTrace();
return resultBean1;
}
return resultBean1;
}
3、SERVICE层,服务层,上面所说的B系统,这里使用Menu模块代替
package com.system.listener;
import com.alibaba.fastjson.JSONObject;
import com.fengyong.base.rely.ResultBean;
import com.fengyong.core.logic.LogicException;
import com.system.po.menu.base.BaseSysMenuPo;
import com.system.service.menu.logic.SysMenuLogic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
/**
* 描述:
*
* @author fengyong
* @date 2019-03-01.
*/
@Service
public class TopicMessageListener implements MessageListener {
private RedisTemplate<String,String> redisTemplate;
@Autowired
private SysMenuLogic sysMenuLogic;
public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();//请使用valueSerializer
byte[] channel = message.getChannel();
//请参考配置文件,本例中key,value的序列化方式均为string。
//其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
String itemValue = (String)redisTemplate.getValueSerializer().deserialize(body);
String topic = (String)redisTemplate.getStringSerializer().deserialize(channel);
System.out.println(itemValue+"------"+topic);
BaseSysMenuPo menuPo = JSONObject.parseObject(itemValue,BaseSysMenuPo.class);
ResultBean resultBean = null;
try {
//TODO 此处应该判断是否是重复通知
if(menuPo.getMenuId().equals("5"))
throw new RuntimeException("数据产生异常,回滚");
resultBean = sysMenuLogic.add(menuPo);
} catch (LogicException e) {
e.printStackTrace();
}
if(resultBean.isSuccess()){//事务成功,将保证队列的数据删掉。
redisTemplate.opsForList().remove(topic,0,itemValue);
}
}
}
4、Redis的消息订阅配置
<bean id="topicMessageListener" class="com.system.listener.TopicMessageListener">
<property name="redisTemplate" ref="redisTemplate"></property>
</bean>
<bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="taskExecutor">
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="3"></property>
</bean>
</property>
<property name="messageListeners">
<map>
<entry key-ref="topicMessageListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="user:topic"/>
</bean>
</entry>
</map>
</property>
</bean>
5、利用ab压测,由于是本地压测,并发就只设置100,请求1000次
bogon:~ fengyong$ ab -n 1000 -c 100 http://localhost:8080/system/test.html
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests
Server Software: Apache-Coyote/1.1
Server Hostname: localhost
Server Port: 8080
Document Path: /system/test.html
Document Length: 0 bytes
Concurrency Level: 100
Time taken for tests: 4.043 seconds
Complete requests: 1000
Failed requests: 0
Total transferred: 121000 bytes
HTML transferred: 0 bytes
Requests per second: 247.34 [#/sec] (mean)
Time per request: 404.309 [ms] (mean)
Time per request: 4.043 [ms] (mean, across all concurrent requests)
Transfer rate: 29.23 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 1 1.1 0 5
Processing: 23 386 69.9 369 677
Waiting: 17 386 70.0 369 677
Total: 23 387 69.9 370 680
Percentage of the requests served within a certain time (ms)
50% 370
66% 398
75% 411
80% 420
90% 460
95% 521
98% 597
99% 640
100% 680 (longest request)
bogon:~ fengyong$
6、查看数据库结果,可以很明显的看到,A系统的数据库(xdj)中的User表有1000条数据,B系统的数据库(xdj2)中的Menu表有999条数据,失败的是id等于5的一条数据,然后再看redis中user:topic队列里的数据,是一条id等于5的数据。
7、该条失败的数据,可以采用人工处理。由于本人在互联网金融行业工作,接过不少渠道,下游也有不少商户,由于网络或者上游某些原因,导致我们没有收到通知时,都是人工处理,绝对不会使用程序重发请求或者使用程序换渠道重发请求,因为如果某天夜里上游的网络通信故障,导致我们的请求批量重发,那造成的损失是以万为单位的计算的,血的教训。