我尝试使用Spring WebFlux + Redis创建了一个实时聊天应用

通过使用 WebFlux 可以使用 Server-Sent-Events,在结合 Redis 之后,我尝试创建了类似实时聊天的东西。

环境

    • Spring Boot 2.2.0.RC1

 

    • (Spring Framework 5.2.0.RELEASE)

 

    • (Spring Data Redis 2.2.0.RELEASE)

 

    Redis 5.0.5

要执行Server-Sent-Events,需要执行以下操作。

似乎只需要将Flux设置为body即可。

public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route()
                .GET("/sse", this::sse)
                .build();
    }

    public Mono<ServerResponse> sse(ServerRequest request) {
        return ServerResponse.ok()
                .body(Flux.interval(Duration.ofMillis(1000)).take(10)
                        .map(l -> ServerSentEvent.builder(l).event("sse").build()), ServerSentEvent.class);
    }
}
@SpringBootApplication
public class HelloWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebfluxApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes() {
        return new HelloHandler().routes();
    }
}

当使用curl命令访问localhost:8080\sse时,可以确认数据以固定间隔获取的情况。

sse.gif

使用Redis的发布/订阅功能

如果能够在Flux中动态添加数据,它可能会变得像聊天一样。虽然我觉得可以自己实现,但Redis有支持响应式的API,所以可以尝试使用这个功能来实现。不仅仅使用Redis客户端lettuce,还可以使用Spring Data Redis Reactive。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

聊天消息可通过以下类来表示。

public class Message {

    private String name;
    private String text;

    @JsonCreator
    public Message(@JsonProperty("name") String name, @JsonProperty("text") String text) {
        this.name = name;
        this.text = text;
    }

    public String getName() {
        return name;
    }

    public String getText() {
        return text;
    }

}

为了使用Jackson在JSON中传输数据,进行以下配置。

@SpringBootApplication
public class HelloWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebfluxApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
        return new RedisHandler(reactiveRedisTemplate(factory)).route();
    }

    @Bean
    public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);
        RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }
}

请参考以下链接:https://www.baeldung.com/spring-data-redis-reactive

接下来,创建一个Handler。

public class RedisHandler {
    private final ReactiveRedisTemplate<String, Message> template;

    public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
        this.template = template;
    }

    public RouterFunction route() {
        return RouterFunctions.route()
                .GET("/redis/sse", this::sse)
                .POST("/redis/post", this::post)
                .build();
    }

    public Mono<ServerResponse> post(ServerRequest request) {
        return request.bodyToMono(Message.class).flatMap(m ->
                template.convertAndSend("messages", m).flatMap(l -> ServerResponse.ok().body(Mono.just("done"), String.class))
        );
    }

    public Mono<ServerResponse> sse(ServerRequest request) {
        return ServerResponse.ok()
                .body(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage)
                        .map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
    }
}

可以使用 ReactiveRedisTemplate 类的 convertAndSend 方法进行发布,并可以使用 listenToChannel 方法进行订阅。
由于 listenToChannel 方法的返回值为 Flux<? extends ReactiveSubscription.Message<String, V>>,因此需要进行转换。

请参考以下链接:https://docs.spring.io/spring-data/redis/docs/2.2.0.RELEASE/reference/html/#redis:reactive:pubsub

在访问localhost:8080/redis/sse的情况下,当向localhost:8080/redis/post发送数据时,可以立即看到发送的数据被显示出来。

pubsub.gif

我想要保存数据。

Redis的Pub/Sub功能是用于交换消息的,而不是用于持久化数据。
目前的状态下,无法像IRC那样查看过去的消息。
所以,我想要将消息保存在Redis中。

我增加了一个订阅者,并尝试在收到消息时保存数据。

public class RedisHandler {
    private final ReactiveRedisTemplate<String, Message> template;

    public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
        this.template = template;
        this.template.listenToChannel("messages")
                .flatMap(o -> this.template.opsForList().rightPush("messages", o.getMessage()))
                .subscribe();
    }

    // 省略
}

在这个班里做这个处理似乎不是必要的,但先试试实现吧。
如果忘记执行subscribe()方法,数据将不会被保存,所以要注意。

在返回SSE处理中,我们将合并保存的数据并返回。

public class RedisHandler {
    // 省略

    public Mono<ServerResponse> sse(ServerRequest request) {
        Flux<Message> messages = this.template.opsForList().range("messages", 0, -1);
        return ServerResponse.ok()
                .body(messages.concatWith(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage))
                        .map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
    }
}

使用concatWith方法将多个Flux连接起来。这样一来,就可以获取保存的数据了。

persistence.gif

无论数据保存是否失败,已连接的用户都会收到消息。

使用Redis Stream

我注意到 ReactiveRedisTemplate 中有一个名为 opsForStream 的方法,它指向了 Redis Stream 这个存在。

Redis Stream 和 Pub/Sub 不同,它似乎具有持久化数据的功能。
然而,Pub/Sub 是从服务器(Redis)端向客户端推送数据,而 Redis Stream 则需要客户端进行轮询。

此外,作为数据结构,它看起来像一个列表,但似乎可以像哈希一样保持多个字段。

虽然轮询很麻烦,但是有一个叫做 StreamReciever 的类可以很好地进行轮询,并在无限流中发出消息。
似乎没有 AutoConfigurer,所以需要自己定义 Bean。

不过,自Spring Data Redis 2.2.0起,已支持Redis Stream。

@SpringBootApplication
public class HelloWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebfluxApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
        return new RedisHandler(reactiveRedisTemplate(factory), streamReceiver(factory)).route();
    }

    @Bean
    public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);
        RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }

    @Bean
    public StreamReceiver streamReceiver(ReactiveRedisConnectionFactory factory) {
        return StreamReceiver.create(factory);
    }

}

据说可以以以下方式使用。

public class RedisHandler {
    private final ReactiveRedisTemplate<String, Message> template;
    private final StreamReceiver<String, MapRecord<String, String, String>> receiver;

    public RedisHandler(ReactiveRedisTemplate<String, Message> template, StreamReceiver streamReceiver) {
        this.template = template;
        this.receiver = streamReceiver;
    }

    public RouterFunction route() {
        return RouterFunctions.route()
                .GET("/stream", this::stream)
                .POST("/stream", this::streamPost)
                .build();
    }

    public Mono<ServerResponse> streamPost(ServerRequest request) {
        return request.bodyToMono(Message.class).map(m -> {
            Map<String, String> map = new HashMap<>();
            map.put("name", m.getName());
            map.put("text", m.getText());
            return MapRecord.create("message-stream", map);
        }).flatMap(o -> ServerResponse.ok().body(this.template.opsForStream().add(o), RecordId.class));
    }

    public Mono<ServerResponse> stream(ServerRequest request) {
        return ServerResponse.ok().body(this.receiver.receive(StreamOffset.fromStart("message-stream"))
                .map(e -> new Message(e.getValue().get("name"), e.getValue().get("text")))
                .map(m -> ServerSentEvent.builder(m).event("stream").build()), ServerSentEvent.class);
    }
}

听说有两种数据存储方式,一种是以Map形式保存数据的MapRecord<S, K, V>,另一种是以对象形式保存数据的ObjectRecord<S, V>,但是由于不了解后者的用法,只能一一进行Map和Message的转换处理。这样很麻烦。

数据的注册与以往一样,仍然使用 ReactiveRedisTemplate,但使用的方法已更改为 opsForStream().add()。
数据的获取使用 StreamReceiver 的 receive 方法。可以获取的数据是 Map<K,V> 类型,因此进行了转换为 Message 类型。

尝试使用这个功能,您发布的消息将通过SSE实时传送,并且即使断开连接后重新连接,也能够获取保存的消息。

stream.gif

最后

如果使用R2DBC,是否也可以在关系型数据库上做类似的事情呢?

广告
将在 10 秒后关闭
bannerAds