当前位置: 首页>后端>正文

springboot推送数据 springboot sse推送


springboot+redis+sse+vue实现分布式消息发布/通知

  • 一、需求说明
  • 二、架构选择
  • 三、代码实现
  • 1. sse集成
  • sse服务类代码
  • 客户端链接控制器
  • 前端实现(vue)
  • 方法调用
  • 2. redis实现订阅/发布
  • 监听类
  • redisConfig配置
  • 消息发送


一、需求说明

需求是实现web端的小红点通知,因为后端是两台机子做负载,所以需要实现分布式消息订阅发布

这里没有用消息中间件(rabbitmq…)和websoket,因为相对项目来说,这俩个比较重,所以用了相对较轻的redis和sse,都是项目自带的

二、架构选择

  1. redis(分布式发布订阅)
  2. sse (SseEmitter)

三、代码实现

1. sse集成

sse服务类代码

这里会话的key值存储可以不用这么复杂,我当时想着连接成功后可以直接将返回的sseEmitter扔到redis里去实现分布式,但是不行,序列化后取出来是发不了消息的,原因可能是存到redis里就相当于直接把连接扔了,哈哈

package com.smartvillage.framework.sse.serve;

import cn.hutool.core.collection.CollectionUtil;
import com.smartvillage.common.core.redis.RedisCache;
import com.smartvillage.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * @author wangyj
 * @className SseEmitterServer
 * @description 消息推送服务类
 * @date 22/11/9
 */

public class SseEmitterServer {

    private static final Logger log = LoggerFactory.getLogger(SseEmitterServer.class);

    private static final String KEY_PREFIX = "SseEmitter_";
    private static final String ONLINE_SESSION_COUNT = "OnlineSessionCount";

    /**
     * 当前连接数
     */
    // private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 创建用户连接并返回 SseEmitter
     *
     * @param sessionId 用户ID
     * @return SseEmitter
     */
    public static SseEmitter connect(String sessionId) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(sessionId));
        sseEmitter.onError(errorCallBack(sessionId));
        sseEmitter.onTimeout(timeoutCallBack(sessionId));
        // SpringUtils.getBean(RedisCache.class).setCacheObject(getCacheKey(sessionId), sseEmitter);

        // 数量+1
        SpringUtils.getBean(RedisCache.class).incr(ONLINE_SESSION_COUNT,1);
        sseEmitterMap.put(getCacheKey(sessionId),sseEmitter);

        log.info("创建新的sse连接,当前会话:{}", sessionId);
        return sseEmitter;
    }

    /**
     * 给指定用户发送信息  -- 单播
     */
    public static void sendMsg(String userId, String message) {
        sendMessage(getCacheKey(userId),message);
    }

    /**
     * 给指定用户发送信息
     */
    public static void sendMessage(String cacheKey, String message) {
        if (sseEmitterMap.containsKey(cacheKey)) {
        // if (SpringUtils.getBean(RedisCache.class).hasKey(cacheKey)) {
            try {
                // SseEmitter sseEmitter = SpringUtils.getBean(RedisCache.class).getCacheObject(cacheKey);
                SseEmitter sseEmitter = sseEmitterMap.get(cacheKey);

                sseEmitter.send(message,MediaType.APPLICATION_JSON);
                log.info("用户[{}]推送成功:{}", cacheKey, message);

            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", cacheKey, e.getMessage());
                removeUser(cacheKey);
            }
        }
    }

    /**
     * 向多人发布消息   -- 组播
     *
     * @param groupId 开头标识
     * @param message 消息内容
     */
    public static void groupSendMessage(String groupId, String message) {
        // Set<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + groupId + "*");
        Set<String> keys = sseEmitterMap.keySet().stream().filter(k -> k.startsWith(KEY_PREFIX + groupId)).collect(Collectors.toSet());
        if(CollectionUtil.isNotEmpty(keys)){
            batchSendMessage(message,keys);
        }
    }

    /**
     * 群发所有人   -- 广播
     */
    public static void batchSendMessage(String message) {
        // Set<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + "*");
        Set<String> keys = sseEmitterMap.keySet();
        if(CollectionUtil.isNotEmpty(keys)){
            batchSendMessage(message,keys);
        }
    }

    /**
     * 群发消息
     */
    public static void batchSendMessage(String message, Set<String> keys) {
        keys.forEach(key -> sendMessage(key, message));
    }

    /**
     * 移除用户连接
     */
    public static void removeUser(String cacheKey) {
        // SpringUtils.getBean(RedisCache.class).deleteObject(cacheKey);
        sseEmitterMap.remove(cacheKey);
        // 数量-1
        SpringUtils.getBean(RedisCache.class).decr(ONLINE_SESSION_COUNT,1);

        log.info("移除用户:{}", cacheKey);
    }

    /**
     * 获取当前连接信息
     */
    public static List<String> getIds() {
        Collection<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX);
        return keys.stream().map(k -> k.replace(KEY_PREFIX, "")).collect(Collectors.toList());
    }

    /**
     * 获取当前连接数量
     */
    public static int getUserCount() {
        return SpringUtils.getBean(RedisCache.class).getCacheObject(ONLINE_SESSION_COUNT);

    }

    private static Runnable completionCallBack(String userId) {
        return () -> {
            log.info("结束连接:{}", userId);
            removeUser(getCacheKey(userId));
        };
    }

    private static Runnable timeoutCallBack(String userId) {
        return () -> {
            log.info("连接超时:{}", userId);
            removeUser(getCacheKey(userId));
        };
    }

    private static Consumer<Throwable> errorCallBack(String userId) {
        return throwable -> {
            log.info("连接异常:{}", userId);
            removeUser(getCacheKey(userId));
        };
    }

    /**
     * 设置cache key
     *
     * @param configKey 参数键
     * @return 缓存键key
     */
    public static String getCacheKey(String configKey){
        return KEY_PREFIX + configKey;
    }
}

客户端链接控制器

/**
 * @author wangyj
 * @className AiWarningSseController
 * @description 警告消息订阅
 * @date 22/11/10
 */
@RestController
@RequestMapping("/test")
public class SseController {

    @Autowired
    RedisCache redisCache;

	/**
     * 客户端链接
     * @return
     */
    @GetMapping("/connect")
    public SseEmitter connect() {
        return SseEmitterServer.connect("test-key");
    }

	/**
     * 消息推送
     * @return
     */
    @PostMapping("/post")
    public AjaxResult postMessage(String msg) {
        // ... 业务逻辑
        
        // 推送消息
        SseEmitterServer.sendMsg("test-key", msg))

        return AjaxResult.success("推送成功");

    }
	
	/**
     * 链接关闭
     * @return
     */
    @GetMapping("/close")
    public AjaxResult close() {
        SseEmitterServer.removeUser("test-key");
        return AjaxResult.success();
    }

}

前端实现(vue)

这里使用了组件:vue-sse(自行安装哈)

方法调用

mounted() {
    // 组件挂载时订阅
    this.subscribeWarnMsg();
  },
  beforeDestroy() {
    // 组件销毁时记得关链接释放资源
    this.closeWarningMessage();
  },

methods: {
    //...
    // 消息订阅
    subscribeWarnMsg() {
      this.$sse
        .create({
          // format: "json", // 注掉就能接受消息
          polyfill: true,
          forcePolyfill: true,
          url: process.env.VUE_APP_BASE_API + "/test/connect",
          withCredentials: true,
          polyfillOptions: {
            // 超时时间,调长点,要不频繁重连
            heartbeatTimeout: 10 * 60 * 1000,
            // 携带认证token
            headers: {
              Authorization: 'Bearer ' + getToken(),
            },
          },
        })
        .on("message", (msg) => {
			console.log(msg)
        
        })
        .on("error", (err) =>
          console.error("Failed to parse or lost connection:", err)
        )
        .connect()
        .catch((err) => console.error("Failed make initial connection:", err));

    },
    // 关闭订阅
    closeMessage() {
	  return request({
	    url: '/test/close',
	    method: 'get',
	   }
	)
}

至此sse封装完成!单节点的项目就可以正常用了~

2. redis实现订阅/发布

监听类

/**
 * @author wangyj
 * @className TestListener
 * @description redis listener
 * @date 22/11/17
 */
@Component
public class TestListener{

    private static final Logger log = LoggerFactory.getLogger(TestListener.class);
    
    public void onMessage(String msg) {
        log.info(msg);

        JSONObject parseObject = JSON.parseObject(msg);
        Long deptId = parseObject.getLong("deptId");
        // 组播
        SseEmitterServer.groupSendMessage("deptId:" + deptId, msg);

        // 单播
        SseEmitterServer.sendMsg("test-key", msg));

    }

}

redisConfig配置

/**
 * redis配置
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport{

    // ... 其他序列化等配置
    
    @Bean
    // 这里要注入我们刚才写的监听者类
    public MessageListenerAdapter TestListenerAdapter(TestListener receiver) {
        // 这个"onMessage"要和监听者类里的方法名对应,因为是反射注入的,默认是"handleMessage"?可以看下源码
        return new MessageListenerAdapter(receiver,"onMessage");
    }

	/*@Bean
    public MessageListenerAdapter listenerAdapter1(TestListener1 receiver) {
        return new MessageListenerAdapter(receiver,"onMessage");
    }
	@Bean
    public MessageListenerAdapter listenerAdapter2(TestListener2 receiver) {
        return new MessageListenerAdapter(receiver,"onMessage");
    }*/

    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
  	 */
    @Bean
    public RedisMessageListenerContainer(RedisConnectionFactory connectionFactory,
    // 这个玩意可以后面跟多个哈,名字匹配自动注入的,MessageListenerAdapter aiWarningListenerAdapter,MessageListenerAdapter listenerAdapter1,MessageListenerAdapter listenerAdapter2,当然,要有对应名字的bean,看上面注释掉的代码
	MessageListenerAdapter testListenerAdapter) {
	
        RedisMessageListenerContainer = new RedisMessageListenerContainer();
       .setConnectionFactory(connectionFactory);
        //订阅了一个叫chat的通道
        //.addMessageListener(listenerAdapter1, new PatternTopic("chat"));
       .addMessageListener(aiWarningListenerAdapter, new PatternTopic(RedisChannel.AI_WARNING));
        return;
    }
}

消息发送

redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog);

结合上文SseController 里面消息推送代码:

public class SseController {

    @Autowired
    RedisCache redisCache;

	/**
     * 客户端链接
     * @return
     */
    @GetMapping("/connect")
    public SseEmitter connect() {
        return SseEmitterServer.connect("test-key");
    }

	/**
     * 消息推送
     * @return
     */
    @PostMapping("/post")
    public AjaxResult postMessage(String msg) {
        // ... 业务逻辑
        
        // 推送消息
        //SseEmitterServer.sendMsg("test-key", msg));
		// 先推到redis
		redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog);

        return AjaxResult.success("推送成功");

    }

redisCache

@Component
public class RedisCache
{
    @Autowired
    public RedisTemplate redisTemplate;

	// ...其他方法
	
    /**
     * 消息推送
     * @param channel
     * @param message
     */
    public void convertAndSend(String channel,Object message){
        redisTemplate.convertAndSend(channel,message);
    }
}

完活~

还有一种监听者配置方法,参考:

@Component
public class TestListener implements MessageListener{

    private static final Logger log = LoggerFactory.getLogger(TestListener.class);
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 订阅的频道名称
        String channel = new String(message.getChannel());
        // 消息体
        String msg = new String(message.getBody());
    }

}

redisConfig

@Bean
RedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, TestListener testListener ) {
	RedisMessageListenerContainer = new RedisMessageListenerContainer();
	container.setConnectionFactory(redisConnectionFactory);
	//订阅topic - subscribe
	container.addMessageListener(testListener ,new ChannelTopic("testChannel"));
	return;
}

注意:

  1. 多个实例在消费时,要注意消费时加锁,避免重复消费的情况
  2. nginx超时时长
  3. nginx iphash
  4. nginx配置
client_max_body_size 300m;     #设置nginx能处理的最大请求主体大小。
client_body_buffer_size 128k;  #请求主体的缓冲区大小。 
proxy_connect_timeout 600;
proxy_read_timeout 600;
proxy_send_timeout 600;
proxy_buffer_size 64k;
proxy_buffers   4 32k;
proxy_busy_buffers_size 64k;
proxy_temp_file_write_size 64k;

location /apis {
	rewrite ^.+apis/?(.*)$ / break;
	include uwsgi_params;
	proxy_pass http://192.168.5.127:8088/;
	# 关键参数
	proxy_buffering off;
}

注意:

  1. 要配置代理超时时间
  2. 不配置proxy_buffering off的话,会出现请求发出后,接口收到直接返回,无法保持长连接。
    参考网上说明:proxy_buffering这个参数用来控制是否打开后端响应内容的缓冲区,如果这个设置为off,那么proxy_buffers和proxy_busy_buffers_size这两个指令将会失效

如有问题请不吝指正~



https://www.xamrdz.com/backend/3er1922053.html

相关文章: