实现即时消息的方法有很多种比如websocket,sse; 而sse 又有spring mvc 实现的也有webflux 实现的。mvc实现的网上已经有很多了,而webflux 实现的不是很多,也不是很全,因此本文主要做的是webflux 实现的即时消息,sse 这里不多讲,如果有不理解的可以自行百度,谷歌。
maven 依赖在最下面
下面是最简单的实现也是应用场景最少的实现
@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
// 每两秒推送一次
return Flux.interval(Duration.ofSeconds(2)).map(seq->
Tuples.of(seq, LocalDateTime.now())).log()
.map(data-> ServerSentEvent.<String>builder().id("1").data(data.getT2().toString()).build());
}
上面的适合股票之类的,周期性的消息。比如每两秒发送一次消息;这样的场景是合适的,但是如果是非周期性的消息呢?比如我需要再应用里发一个公告,这个公告是突然的,不确定的,那么这个逻辑就不合适了。
下面介绍非周期性消息
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/sse")
public class MessageController implements ApplicationListener {
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();
@GetMapping(value = "/message",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux<String> getMessage(){
return Flux.create(stringFluxSink -> {
MessageHandler messageHandler = message -> stringFluxSink.next(String.class.cast(message.getPayload()));
// 用户断开的时候取消订阅
stringFluxSink.onCancel(()->subscribableChannel.unsubscribe(messageHandler));
// 订阅消息
subscribableChannel.subscribe(messageHandler);
}, FluxSink.OverflowStrategy.LATEST);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
subscribableChannel.send(new GenericMessage<>(event.getSource()));
}
@PostMapping("/publish")
public void publish(@RequestParam String message){
subscribableChannel.send(new GenericMessage<>(message));
}
}
这里有个局限性 就是单服务的消息,那如果是多服务的集群消息怎么解决呢?
下面代码是使用redis 的发布订阅模式来实现webflux 的sse 集群
import indi.houhaoran.webflux.domian.MessageDTO;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/flux")
@RequiredArgsConstructor
public class FluxMessageController {
private final RedissonClient redissonClient;
public static final String USER_TOPIC = "user:";
public static final String BROADCAST_TOPIC = "broadcast_topic";
@GetMapping(path = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<MessageDTO> getFolderWatch(@PathVariable String userId) {
return Flux.create(sink -> {
// 订阅 广播
redissonClient.getTopic(BROADCAST_TOPIC).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
// 监听 用户主题 单个
redissonClient.getTopic(USER_TOPIC + userId).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
//加入监听如果断开链接就移除redis 的订阅
sink.onCancel(() -> {
// 断开移除
System.out.println("退出 userId:" + userId);
redissonClient.getTopic(USER_TOPIC + userId).removeAllListeners();
redissonClient.getTopic(BROADCAST_TOPIC).removeListener((Integer) redissonClient.getMap(BROADCAST_TOPIC).get(userId));
});
}, FluxSink.OverflowStrategy.LATEST);
}
@PostMapping("/publish")
public void publish(@RequestBody MessageDTO messageDTO) {
redissonClient.getTopic(BROADCAST_TOPIC).publish(messageDTO);
}
}
redisson 配置
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 这个地方不可使用 json 序列化,否则会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
}
@Slf4j
@Configuration
public class RedissonConfigure {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://localhost:6379");
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
config.setCodec(new JsonJacksonCodec(objectMapper));
return Redisson.create(config);
}
}
其他类
import java.io.Serializable;
/**
* @author haoran
*/
@Data
public class MessageDTO implements Serializable {
private String message;
}
调试:
由此可见当我从8080 服务发送消息,8080,8081两个服务都接收到消息了
maven 依赖
<parent>
<artifactId>webfluxdemo</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.57</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
父pom
<groupId>org.example</groupId>
<artifactId>webfluxdemo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
</parent>
<modules>
<module>client</module>
<module>server</module>
<module>RxJava</module>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
</dependencyManagement>
<!-- ... -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<url>https://repo.spring.io/snapshot</url>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
参考·1 Reactor 3 参考文档 (htmlpreview.github.io)
参考·2 https://www.lefer.cn/posts/30624/
结语:百度真垃圾,查了半天也没找到,终归要google;本文只是简单的实现了sse 在真实场景下会有很多不足,比如redis 加入订阅的是通过lamda 表达式实现的,这里最好有个实现类来实现订阅发送消息的业务。
题外话:webflux 如何实现响应式报表?