由于Spring Boot 2.2支持RSocket,因此我会尝试使用它
你好。由于Spring Boot 2.2支持RSocket,所以我试着使用了它。
RSocket是什么?
-
- マイクロサービス間の通信用に設計されたバイナリプロトコル。
-
- Reactive Socket。Reactive Streamsをサポート。
-
- TCP、WebSocket、Aeron、そしてHTTP/2 streamsをトランスポートプロトコルとして使用できる。
-
- 以下の4つのインタラクションモデルが使用できる。
request-response (1リクエスト、1レスポンス)
request-stream (1リクエスト、複数レスポンス)
fire-and-forget (1リクエスト、Noレスポンス
channel (複数リクエスト、複数レスポンス)
開発言語に依存しない。Java、JavaScriot、GO、.NET など
Spring 5.2、Spring Boot 2.2から正式サポート。
公式サイト : RSocketの公式サイト
与其他协议相比较
与REST相比
-
- RSocketは、RESTよりも通信上のオーバーヘッドが少ない。
- RESTで上記4つのインタラクションを実現したい場合、ポーリングなどの仕組みが必要。
与gRPC相比
- gRPCはブラウザとの通信がサポートされていない。ブラウザと通信したい場合、RESTに変換するプロキシが必要。
请参考下一个网站以获取更详细信息。
RSocketでRESTに安息(Rest)を
Differences between gRPC and RSocket
马上试一试
我已经大致了解了一下官方网站等,所以将创建一个简单的应用程序。
RSocket 是用于服务间通信的协议,所以我们将使用它。
-
- リクエストを送る側(rsocket-client)
- リクエストを受けてレスポンスを返す側(rsocket-backend)
我们需要两个东西。使用rsocket-client和rsocket-backend分别作为服务器,并且暂时进行服务器间的通信试验。应用程序的顺序图如下所示。

我想在浏览器中访问并查看结果,所以将rsocket-client应用程序作为具有HTTP端点的应用程序。
1至4是”请求-响应”的交互,5至9是”请求-流”的交互。
我希望9的响应能够随时返回,所以使用了SSE(服务器推送事件)。
项目结构
rsocket-client和rsocket-backend将作为各自的子项目进行创建。
为了避免在每个项目中分别定义请求和响应所使用的数据模型(DTO),我们还将创建名为rsocket-model的子项目来统一定义。
|- build.gradle
|- rsocket-model/
| |- build.gradle
| :
|- rsocket-client/
| |- build.gradle
| :
└ rsocket-backend/
|- build.gradle
:
共同使用的build.gradle如下:
将spring-boot-starter-rsocket和spring-boot-starter-webflux添加为依赖项。
buildscript {
repositories {
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
}
dependencies {
classpath "org.springframework.boot:spring-boot-gradle-plugin:2.2.2.RELEASE"
classpath "io.spring.gradle:dependency-management-plugin:1.0.8.RELEASE"
}
}
allprojects {
repositories {
mavenCentral()
}
}
subprojects {
group = 'sandbox'
version = '0.0.1-SNAPSHOT'
apply plugin: "java"
apply plugin: "java-library"
apply plugin: "org.springframework.boot"
apply plugin: "io.spring.dependency-management"
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
dependencyManagement {
dependencies {
dependency "org.springframework.boot:spring-boot-starter-rsocket:2.2.2.RELEASE"
dependency "org.springframework.boot:spring-boot-starter-webflux:2.2.2.RELEASE"
dependency "org.springframework.boot:spring-boot-devtools:2.2.2.RELEASE"
}
}
}
RSocket模型
rsocket-model的build.gradle文件。
project(":rsocket-model") {
dependencies {
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
}
bootJar {
enabled = false
}
jar {
enabled = true
}
}
请求DTO的定义。由于Java对象的编码和解码将使用Jackson,所以不要忘记创建默认构造函数。
package sandbox.rsocket;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class RequestData {
String message;
}
通过定义DTO来响应。
package sandbox.rsocket;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class ResponseData {
String message;
}
rsocket后端
rsocket-bankend的build.gradle文件。
project(":rsocket-backend") {
dependencies {
implementation project(':rsocket-model')
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-devtools'
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
}
}
在application.yml中指定RSocket使用的端口号(7000)。
您还可以使用YAML指定传输协议。有关这些定义,请参阅Spring Boot文档。
8.2. RSocket服务器自动配置。
spring:
rsocket:
server:
port: 7000
# Remove commented out if enable RSocket over websocket. (Using tcp as default.)
# See the link for the details. https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/html/spring-boot-features.html#boot-features-rsocket-strategies-auto-configuration
# mapping-path: /rsocket
# transport: websocket
在RSocket服务器的控制器中定义。
使用@MessageMapping注解设置RSocket的端点名称。
getMono返回单个数据,因此将响应存储为Mono;getFlux返回多个数据(流),因此将响应存储为Flux。
package sandbox.rsocket;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@AllArgsConstructor
@Controller
public class RSocketServerController {
/**
* Get response data with mono.
*/
@MessageMapping("getMono")
public Mono<ResponseData> getMono(RequestData requestData) {
log.info("Calling getMono method. request={}", requestData);
return Mono.just(new ResponseData(requestData.getMessage()));
}
/**
* Get response data with flux.
* Responds one of the response data every seconds.
*/
@MessageMapping("getFlux")
public Flux<ResponseData> getFlux(RequestData requestData) {
log.info("Calling getFlux method. request={}", requestData);
final List<ResponseData> list =
IntStream.rangeClosed(1, 10)
.boxed()
.map(i -> new ResponseData(requestData.getMessage() + '_' + i))
.collect(Collectors.toList());
return Flux.fromIterable(list)
.delayElements(Duration.ofSeconds(1));
}
}
RSocket客户端
rsocket-client的build.gradle。
project(":rsocket-client") {
dependencies {
implementation project(':rsocket-model')
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-devtools'
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
}
}
在application.yml文件中,可以指定HTTP端口号(8081)的端点。
顺便提一下,将客户端的日志级别设置为DEBUG,可以将RSocket帧信息输出到控制台上。
server:
port: 8081
# Remove commented out if u want to see RSocket frame on console log.
# logging:
# level:
# root: DEBUG
以下是在请求getMono时在控制台输出的DEBUG日志。看起来是个相似的日志!
2020-01-08 23:15:00.853 DEBUG 6776 --- [ctor-http-nio-2] o.s.http.codec.cbor.Jackson2CborEncoder : Encoding [RequestData(message=test)]
2020-01-08 23:15:00.879 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 36
Metadata:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 08 07 67 65 74 4d 6f 6e 6f |.....getMono |
+--------+-------------------------------------------------+----------------+
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.015 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger : receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 21
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.cbor.Jackson2CborDecoder : Decoded [ResponseData(message=test)]
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.json.Jackson2JsonEncoder : [e539e702] Encoding [ResponseData(message=test)]
为客户端定义 Configuration。设置 RSocketRequester 的 RSocket 服务器主机名(localhost)和端口号(7000)。由于提供了 Builder 类,可以在几行代码内完成。
package sandbox.rsocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@AllArgsConstructor
@Configuration
public class ClientConfiguration {
private final RSocketRequester.Builder builder;
@Bean
public RSocketRequester rSocketRequester() {
return builder.connectTcp("localhost", 7000)
.doOnNext(socket -> log.info("Connected to RSocket."))
.block();
}
}
定义RSocket客户端的服务类。在这里设置RSocket服务器的端点名称。
package sandbox.rsocket;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;
import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@AllArgsConstructor
@Service
public class RSocketClientService {
private final RSocketRequester rSocketRequester;
public Mono<ResponseData> getMono(RequestData data) {
return rSocketRequester.route("getMono")
.data(data)
.retrieveMono(ResponseData.class);
}
public Flux<ResponseData> getFlux(RequestData data) {
return rSocketRequester.route("getFlux")
.data(data)
.retrieveFlux(ResponseData.class);
}
}
RSocket客户端的HTTP端点定义。
package sandbox.rsocket;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@AllArgsConstructor
@RestController
public class RSocketClientController {
private final RSocketClientService clientService;
/**
* Get response mono data.
*/
@GetMapping("/mono")
public Mono<ResponseData> mono(@RequestParam String message) {
return clientService.getMono(new RequestData(message));
}
/**
* Get response flux data with server sent events.
*/
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/flux")
public Flux<ResponseData> flux(@RequestParam String message) {
return clientService.getFlux(new RequestData(message));
}
}
在本地运行rsocket-backend和rsocket-client,并访问HTTP端点。
以下是访问http://localhost:8081/flux时的结果。数据每秒都会被响应!

总结
-
- Spring BootとRSocketを使った簡単なアプリケーションを作成して、サーバ間の通信を試してみました。
-
- 非常に少ないコードで簡単に作成することはできましたが、ブラックボックスな部分が多く、実際使うときは詳細な理解が必要だな。と思いました。(小並感)
-
- あと、この記事には書いてありませんが、”RSocket over WebSocket”でブラウザとの通信も試してみました。Webアプリ開発者としては、RESTに変わる手段ができてとてもうれしい感があります。(これも小並感)
- 今回作成したソースコードはGitHub: RSocket sandbox に保存しています。参考にしていただければ幸いです。
请拿这个作为参考
-
- Tutorial: Reactive Spring Boot Part 9 – Java RSocket Client
- Youtube: Spring Tips: RSocket Messaging in Spring Boot 2.2