springboot+redis+sse+vue实现分布式消息发布/通知
- 一、需求说明
- 二、架构选择
- 三、代码实现
- 1. sse集成
- sse服务类代码
- 客户端链接控制器
- 前端实现(vue)
- 方法调用
- 2. redis实现订阅/发布
- 监听类
- redisConfig配置
- 消息发送
一、需求说明
需求是实现web端的小红点通知,因为后端是两台机子做负载,所以需要实现分布式消息订阅发布
这里没有用消息中间件(rabbitmq…)和websoket,因为相对项目来说,这俩个比较重,所以用了相对较轻的redis和sse,都是项目自带的
二、架构选择
- redis(分布式发布订阅)
- sse (SseEmitter)
三、代码实现
1. sse集成
sse服务类代码
这里会话的key值存储可以不用这么复杂,我当时想着连接成功后可以直接将返回的sseEmitter扔到redis里去实现分布式,但是不行,序列化后取出来是发不了消息的,原因可能是存到redis里就相当于直接把连接扔了,哈哈
package com.smartvillage.framework.sse.serve;
import cn.hutool.core.collection.CollectionUtil;
import com.smartvillage.common.core.redis.RedisCache;
import com.smartvillage.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* @author wangyj
* @className SseEmitterServer
* @description 消息推送服务类
* @date 22/11/9
*/
public class SseEmitterServer {
private static final Logger log = LoggerFactory.getLogger(SseEmitterServer.class);
private static final String KEY_PREFIX = "SseEmitter_";
private static final String ONLINE_SESSION_COUNT = "OnlineSessionCount";
/**
* 当前连接数
*/
// private static AtomicInteger count = new AtomicInteger(0);
/**
* 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建用户连接并返回 SseEmitter
*
* @param sessionId 用户ID
* @return SseEmitter
*/
public static SseEmitter connect(String sessionId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(sessionId));
sseEmitter.onError(errorCallBack(sessionId));
sseEmitter.onTimeout(timeoutCallBack(sessionId));
// SpringUtils.getBean(RedisCache.class).setCacheObject(getCacheKey(sessionId), sseEmitter);
// 数量+1
SpringUtils.getBean(RedisCache.class).incr(ONLINE_SESSION_COUNT,1);
sseEmitterMap.put(getCacheKey(sessionId),sseEmitter);
log.info("创建新的sse连接,当前会话:{}", sessionId);
return sseEmitter;
}
/**
* 给指定用户发送信息 -- 单播
*/
public static void sendMsg(String userId, String message) {
sendMessage(getCacheKey(userId),message);
}
/**
* 给指定用户发送信息
*/
public static void sendMessage(String cacheKey, String message) {
if (sseEmitterMap.containsKey(cacheKey)) {
// if (SpringUtils.getBean(RedisCache.class).hasKey(cacheKey)) {
try {
// SseEmitter sseEmitter = SpringUtils.getBean(RedisCache.class).getCacheObject(cacheKey);
SseEmitter sseEmitter = sseEmitterMap.get(cacheKey);
sseEmitter.send(message,MediaType.APPLICATION_JSON);
log.info("用户[{}]推送成功:{}", cacheKey, message);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", cacheKey, e.getMessage());
removeUser(cacheKey);
}
}
}
/**
* 向多人发布消息 -- 组播
*
* @param groupId 开头标识
* @param message 消息内容
*/
public static void groupSendMessage(String groupId, String message) {
// Set<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + groupId + "*");
Set<String> keys = sseEmitterMap.keySet().stream().filter(k -> k.startsWith(KEY_PREFIX + groupId)).collect(Collectors.toSet());
if(CollectionUtil.isNotEmpty(keys)){
batchSendMessage(message,keys);
}
}
/**
* 群发所有人 -- 广播
*/
public static void batchSendMessage(String message) {
// Set<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + "*");
Set<String> keys = sseEmitterMap.keySet();
if(CollectionUtil.isNotEmpty(keys)){
batchSendMessage(message,keys);
}
}
/**
* 群发消息
*/
public static void batchSendMessage(String message, Set<String> keys) {
keys.forEach(key -> sendMessage(key, message));
}
/**
* 移除用户连接
*/
public static void removeUser(String cacheKey) {
// SpringUtils.getBean(RedisCache.class).deleteObject(cacheKey);
sseEmitterMap.remove(cacheKey);
// 数量-1
SpringUtils.getBean(RedisCache.class).decr(ONLINE_SESSION_COUNT,1);
log.info("移除用户:{}", cacheKey);
}
/**
* 获取当前连接信息
*/
public static List<String> getIds() {
Collection<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX);
return keys.stream().map(k -> k.replace(KEY_PREFIX, "")).collect(Collectors.toList());
}
/**
* 获取当前连接数量
*/
public static int getUserCount() {
return SpringUtils.getBean(RedisCache.class).getCacheObject(ONLINE_SESSION_COUNT);
}
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("结束连接:{}", userId);
removeUser(getCacheKey(userId));
};
}
private static Runnable timeoutCallBack(String userId) {
return () -> {
log.info("连接超时:{}", userId);
removeUser(getCacheKey(userId));
};
}
private static Consumer<Throwable> errorCallBack(String userId) {
return throwable -> {
log.info("连接异常:{}", userId);
removeUser(getCacheKey(userId));
};
}
/**
* 设置cache key
*
* @param configKey 参数键
* @return 缓存键key
*/
public static String getCacheKey(String configKey){
return KEY_PREFIX + configKey;
}
}
客户端链接控制器
/**
* @author wangyj
* @className AiWarningSseController
* @description 警告消息订阅
* @date 22/11/10
*/
@RestController
@RequestMapping("/test")
public class SseController {
@Autowired
RedisCache redisCache;
/**
* 客户端链接
* @return
*/
@GetMapping("/connect")
public SseEmitter connect() {
return SseEmitterServer.connect("test-key");
}
/**
* 消息推送
* @return
*/
@PostMapping("/post")
public AjaxResult postMessage(String msg) {
// ... 业务逻辑
// 推送消息
SseEmitterServer.sendMsg("test-key", msg))
return AjaxResult.success("推送成功");
}
/**
* 链接关闭
* @return
*/
@GetMapping("/close")
public AjaxResult close() {
SseEmitterServer.removeUser("test-key");
return AjaxResult.success();
}
}
前端实现(vue)
这里使用了组件:vue-sse(自行安装哈)
方法调用
mounted() {
// 组件挂载时订阅
this.subscribeWarnMsg();
},
beforeDestroy() {
// 组件销毁时记得关链接释放资源
this.closeWarningMessage();
},
methods: {
//...
// 消息订阅
subscribeWarnMsg() {
this.$sse
.create({
// format: "json", // 注掉就能接受消息
polyfill: true,
forcePolyfill: true,
url: process.env.VUE_APP_BASE_API + "/test/connect",
withCredentials: true,
polyfillOptions: {
// 超时时间,调长点,要不频繁重连
heartbeatTimeout: 10 * 60 * 1000,
// 携带认证token
headers: {
Authorization: 'Bearer ' + getToken(),
},
},
})
.on("message", (msg) => {
console.log(msg)
})
.on("error", (err) =>
console.error("Failed to parse or lost connection:", err)
)
.connect()
.catch((err) => console.error("Failed make initial connection:", err));
},
// 关闭订阅
closeMessage() {
return request({
url: '/test/close',
method: 'get',
}
)
}
至此sse封装完成!单节点的项目就可以正常用了~
2. redis实现订阅/发布
监听类
/**
* @author wangyj
* @className TestListener
* @description redis listener
* @date 22/11/17
*/
@Component
public class TestListener{
private static final Logger log = LoggerFactory.getLogger(TestListener.class);
public void onMessage(String msg) {
log.info(msg);
JSONObject parseObject = JSON.parseObject(msg);
Long deptId = parseObject.getLong("deptId");
// 组播
SseEmitterServer.groupSendMessage("deptId:" + deptId, msg);
// 单播
SseEmitterServer.sendMsg("test-key", msg));
}
}
redisConfig配置
/**
* redis配置
*/
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport{
// ... 其他序列化等配置
@Bean
// 这里要注入我们刚才写的监听者类
public MessageListenerAdapter TestListenerAdapter(TestListener receiver) {
// 这个"onMessage"要和监听者类里的方法名对应,因为是反射注入的,默认是"handleMessage"?可以看下源码
return new MessageListenerAdapter(receiver,"onMessage");
}
/*@Bean
public MessageListenerAdapter listenerAdapter1(TestListener1 receiver) {
return new MessageListenerAdapter(receiver,"onMessage");
}
@Bean
public MessageListenerAdapter listenerAdapter2(TestListener2 receiver) {
return new MessageListenerAdapter(receiver,"onMessage");
}*/
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*/
@Bean
public RedisMessageListenerContainer(RedisConnectionFactory connectionFactory,
// 这个玩意可以后面跟多个哈,名字匹配自动注入的,MessageListenerAdapter aiWarningListenerAdapter,MessageListenerAdapter listenerAdapter1,MessageListenerAdapter listenerAdapter2,当然,要有对应名字的bean,看上面注释掉的代码
MessageListenerAdapter testListenerAdapter) {
RedisMessageListenerContainer = new RedisMessageListenerContainer();
.setConnectionFactory(connectionFactory);
//订阅了一个叫chat的通道
//.addMessageListener(listenerAdapter1, new PatternTopic("chat"));
.addMessageListener(aiWarningListenerAdapter, new PatternTopic(RedisChannel.AI_WARNING));
return;
}
}
消息发送
redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog);
结合上文SseController 里面消息推送代码:
public class SseController {
@Autowired
RedisCache redisCache;
/**
* 客户端链接
* @return
*/
@GetMapping("/connect")
public SseEmitter connect() {
return SseEmitterServer.connect("test-key");
}
/**
* 消息推送
* @return
*/
@PostMapping("/post")
public AjaxResult postMessage(String msg) {
// ... 业务逻辑
// 推送消息
//SseEmitterServer.sendMsg("test-key", msg));
// 先推到redis
redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog);
return AjaxResult.success("推送成功");
}
redisCache
@Component
public class RedisCache
{
@Autowired
public RedisTemplate redisTemplate;
// ...其他方法
/**
* 消息推送
* @param channel
* @param message
*/
public void convertAndSend(String channel,Object message){
redisTemplate.convertAndSend(channel,message);
}
}
完活~
还有一种监听者配置方法,参考:
@Component
public class TestListener implements MessageListener{
private static final Logger log = LoggerFactory.getLogger(TestListener.class);
@Override
public void onMessage(Message message, byte[] pattern) {
// 订阅的频道名称
String channel = new String(message.getChannel());
// 消息体
String msg = new String(message.getBody());
}
}
redisConfig
@Bean
RedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, TestListener testListener ) {
RedisMessageListenerContainer = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
//订阅topic - subscribe
container.addMessageListener(testListener ,new ChannelTopic("testChannel"));
return;
}
注意:
- 多个实例在消费时,要注意消费时加锁,避免重复消费的情况
- nginx超时时长
- nginx iphash
- nginx配置
client_max_body_size 300m; #设置nginx能处理的最大请求主体大小。
client_body_buffer_size 128k; #请求主体的缓冲区大小。
proxy_connect_timeout 600;
proxy_read_timeout 600;
proxy_send_timeout 600;
proxy_buffer_size 64k;
proxy_buffers 4 32k;
proxy_busy_buffers_size 64k;
proxy_temp_file_write_size 64k;
location /apis {
rewrite ^.+apis/?(.*)$ / break;
include uwsgi_params;
proxy_pass http://192.168.5.127:8088/;
# 关键参数
proxy_buffering off;
}
注意:
- 要配置代理超时时间
- 不配置proxy_buffering off的话,会出现请求发出后,接口收到直接返回,无法保持长连接。
参考网上说明:proxy_buffering这个参数用来控制是否打开后端响应内容的缓冲区,如果这个设置为off,那么proxy_buffers和proxy_busy_buffers_size这两个指令将会失效
如有问题请不吝指正~