我尝试使用Spring WebFlux + Redis创建了一个实时聊天应用
通过使用 WebFlux 可以使用 Server-Sent-Events,在结合 Redis 之后,我尝试创建了类似实时聊天的东西。
环境
-
- Spring Boot 2.2.0.RC1
-
- (Spring Framework 5.2.0.RELEASE)
-
- (Spring Data Redis 2.2.0.RELEASE)
- Redis 5.0.5
要执行Server-Sent-Events,需要执行以下操作。
似乎只需要将Flux设置为body即可。
public class HelloHandler {
public RouterFunction<ServerResponse> routes() {
return RouterFunctions.route()
.GET("/sse", this::sse)
.build();
}
public Mono<ServerResponse> sse(ServerRequest request) {
return ServerResponse.ok()
.body(Flux.interval(Duration.ofMillis(1000)).take(10)
.map(l -> ServerSentEvent.builder(l).event("sse").build()), ServerSentEvent.class);
}
}
@SpringBootApplication
public class HelloWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWebfluxApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes() {
return new HelloHandler().routes();
}
}
当使用curl命令访问localhost:8080\sse时,可以确认数据以固定间隔获取的情况。

使用Redis的发布/订阅功能
如果能够在Flux中动态添加数据,它可能会变得像聊天一样。虽然我觉得可以自己实现,但Redis有支持响应式的API,所以可以尝试使用这个功能来实现。不仅仅使用Redis客户端lettuce,还可以使用Spring Data Redis Reactive。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
聊天消息可通过以下类来表示。
public class Message {
private String name;
private String text;
@JsonCreator
public Message(@JsonProperty("name") String name, @JsonProperty("text") String text) {
this.name = name;
this.text = text;
}
public String getName() {
return name;
}
public String getText() {
return text;
}
}
为了使用Jackson在JSON中传输数据,进行以下配置。
@SpringBootApplication
public class HelloWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWebfluxApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
return new RedisHandler(reactiveRedisTemplate(factory)).route();
}
@Bean
public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
}
请参考以下链接:https://www.baeldung.com/spring-data-redis-reactive
接下来,创建一个Handler。
public class RedisHandler {
private final ReactiveRedisTemplate<String, Message> template;
public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
this.template = template;
}
public RouterFunction route() {
return RouterFunctions.route()
.GET("/redis/sse", this::sse)
.POST("/redis/post", this::post)
.build();
}
public Mono<ServerResponse> post(ServerRequest request) {
return request.bodyToMono(Message.class).flatMap(m ->
template.convertAndSend("messages", m).flatMap(l -> ServerResponse.ok().body(Mono.just("done"), String.class))
);
}
public Mono<ServerResponse> sse(ServerRequest request) {
return ServerResponse.ok()
.body(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage)
.map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
}
}
可以使用 ReactiveRedisTemplate 类的 convertAndSend 方法进行发布,并可以使用 listenToChannel 方法进行订阅。
由于 listenToChannel 方法的返回值为 Flux<? extends ReactiveSubscription.Message<String, V>>,因此需要进行转换。
请参考以下链接:https://docs.spring.io/spring-data/redis/docs/2.2.0.RELEASE/reference/html/#redis:reactive:pubsub
在访问localhost:8080/redis/sse的情况下,当向localhost:8080/redis/post发送数据时,可以立即看到发送的数据被显示出来。

我想要保存数据。
Redis的Pub/Sub功能是用于交换消息的,而不是用于持久化数据。
目前的状态下,无法像IRC那样查看过去的消息。
所以,我想要将消息保存在Redis中。
我增加了一个订阅者,并尝试在收到消息时保存数据。
public class RedisHandler {
private final ReactiveRedisTemplate<String, Message> template;
public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
this.template = template;
this.template.listenToChannel("messages")
.flatMap(o -> this.template.opsForList().rightPush("messages", o.getMessage()))
.subscribe();
}
// 省略
}
在这个班里做这个处理似乎不是必要的,但先试试实现吧。
如果忘记执行subscribe()方法,数据将不会被保存,所以要注意。
在返回SSE处理中,我们将合并保存的数据并返回。
public class RedisHandler {
// 省略
public Mono<ServerResponse> sse(ServerRequest request) {
Flux<Message> messages = this.template.opsForList().range("messages", 0, -1);
return ServerResponse.ok()
.body(messages.concatWith(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage))
.map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
}
}
使用concatWith方法将多个Flux连接起来。这样一来,就可以获取保存的数据了。

无论数据保存是否失败,已连接的用户都会收到消息。
使用Redis Stream
我注意到 ReactiveRedisTemplate 中有一个名为 opsForStream 的方法,它指向了 Redis Stream 这个存在。
Redis Stream 和 Pub/Sub 不同,它似乎具有持久化数据的功能。
然而,Pub/Sub 是从服务器(Redis)端向客户端推送数据,而 Redis Stream 则需要客户端进行轮询。
此外,作为数据结构,它看起来像一个列表,但似乎可以像哈希一样保持多个字段。
虽然轮询很麻烦,但是有一个叫做 StreamReciever 的类可以很好地进行轮询,并在无限流中发出消息。
似乎没有 AutoConfigurer,所以需要自己定义 Bean。
不过,自Spring Data Redis 2.2.0起,已支持Redis Stream。
@SpringBootApplication
public class HelloWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWebfluxApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
return new RedisHandler(reactiveRedisTemplate(factory), streamReceiver(factory)).route();
}
@Bean
public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
@Bean
public StreamReceiver streamReceiver(ReactiveRedisConnectionFactory factory) {
return StreamReceiver.create(factory);
}
}
据说可以以以下方式使用。
public class RedisHandler {
private final ReactiveRedisTemplate<String, Message> template;
private final StreamReceiver<String, MapRecord<String, String, String>> receiver;
public RedisHandler(ReactiveRedisTemplate<String, Message> template, StreamReceiver streamReceiver) {
this.template = template;
this.receiver = streamReceiver;
}
public RouterFunction route() {
return RouterFunctions.route()
.GET("/stream", this::stream)
.POST("/stream", this::streamPost)
.build();
}
public Mono<ServerResponse> streamPost(ServerRequest request) {
return request.bodyToMono(Message.class).map(m -> {
Map<String, String> map = new HashMap<>();
map.put("name", m.getName());
map.put("text", m.getText());
return MapRecord.create("message-stream", map);
}).flatMap(o -> ServerResponse.ok().body(this.template.opsForStream().add(o), RecordId.class));
}
public Mono<ServerResponse> stream(ServerRequest request) {
return ServerResponse.ok().body(this.receiver.receive(StreamOffset.fromStart("message-stream"))
.map(e -> new Message(e.getValue().get("name"), e.getValue().get("text")))
.map(m -> ServerSentEvent.builder(m).event("stream").build()), ServerSentEvent.class);
}
}
听说有两种数据存储方式,一种是以Map形式保存数据的MapRecord<S, K, V>,另一种是以对象形式保存数据的ObjectRecord<S, V>,但是由于不了解后者的用法,只能一一进行Map和Message的转换处理。这样很麻烦。
数据的注册与以往一样,仍然使用 ReactiveRedisTemplate,但使用的方法已更改为 opsForStream().add()。
数据的获取使用 StreamReceiver 的 receive 方法。可以获取的数据是 Map<K,V> 类型,因此进行了转换为 Message 类型。
尝试使用这个功能,您发布的消息将通过SSE实时传送,并且即使断开连接后重新连接,也能够获取保存的消息。

最后
如果使用R2DBC,是否也可以在关系型数据库上做类似的事情呢?