由于Spring Boot 2.2支持RSocket,因此我会尝试使用它

你好。由于Spring Boot 2.2支持RSocket,所以我试着使用了它。

RSocket是什么?

    • マイクロサービス間の通信用に設計されたバイナリプロトコル。

 

    • Reactive Socket。Reactive Streamsをサポート。

 

    • TCP、WebSocket、Aeron、そしてHTTP/2 streamsをトランスポートプロトコルとして使用できる。

 

    • 以下の4つのインタラクションモデルが使用できる。

request-response (1リクエスト、1レスポンス)
request-stream (1リクエスト、複数レスポンス)
fire-and-forget (1リクエスト、Noレスポンス
channel (複数リクエスト、複数レスポンス)

開発言語に依存しない。Java、JavaScriot、GO、.NET など
Spring 5.2、Spring Boot 2.2から正式サポート。
公式サイト : RSocketの公式サイト

与其他协议相比较

与REST相比

    • RSocketは、RESTよりも通信上のオーバーヘッドが少ない。

 

    RESTで上記4つのインタラクションを実現したい場合、ポーリングなどの仕組みが必要。

与gRPC相比

    gRPCはブラウザとの通信がサポートされていない。ブラウザと通信したい場合、RESTに変換するプロキシが必要。

请参考下一个网站以获取更详细信息。

RSocketでRESTに安息(Rest)を
Differences between gRPC and RSocket

马上试一试

我已经大致了解了一下官方网站等,所以将创建一个简单的应用程序。
RSocket 是用于服务间通信的协议,所以我们将使用它。

    • リクエストを送る側(rsocket-client)

 

    リクエストを受けてレスポンスを返す側(rsocket-backend)

我们需要两个东西。使用rsocket-client和rsocket-backend分别作为服务器,并且暂时进行服务器间的通信试验。应用程序的顺序图如下所示。

スクリーンショット 2020-01-08 22.20.47.png

我想在浏览器中访问并查看结果,所以将rsocket-client应用程序作为具有HTTP端点的应用程序。
1至4是”请求-响应”的交互,5至9是”请求-流”的交互。
我希望9的响应能够随时返回,所以使用了SSE(服务器推送事件)。

项目结构

rsocket-client和rsocket-backend将作为各自的子项目进行创建。
为了避免在每个项目中分别定义请求和响应所使用的数据模型(DTO),我们还将创建名为rsocket-model的子项目来统一定义。

|- build.gradle
|- rsocket-model/
|  |- build.gradle
|  :
|- rsocket-client/
|  |- build.gradle
|  :
└  rsocket-backend/
   |- build.gradle
   :

共同使用的build.gradle如下:
将spring-boot-starter-rsocket和spring-boot-starter-webflux添加为依赖项。

buildscript {
    repositories {
        mavenCentral()
        maven { url "https://plugins.gradle.org/m2/" }
    }

    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:2.2.2.RELEASE"
        classpath "io.spring.gradle:dependency-management-plugin:1.0.8.RELEASE"
    }
}

allprojects {
    repositories {
        mavenCentral()
    }
}

subprojects {
    group = 'sandbox'
    version = '0.0.1-SNAPSHOT'

    apply plugin: "java"
    apply plugin: "java-library"
    apply plugin: "org.springframework.boot"
    apply plugin: "io.spring.dependency-management"

    sourceCompatibility = JavaVersion.VERSION_11
    targetCompatibility = JavaVersion.VERSION_11

    dependencyManagement {
        dependencies {
            dependency "org.springframework.boot:spring-boot-starter-rsocket:2.2.2.RELEASE"
            dependency "org.springframework.boot:spring-boot-starter-webflux:2.2.2.RELEASE"
            dependency "org.springframework.boot:spring-boot-devtools:2.2.2.RELEASE"
        }
    }
}

RSocket模型

rsocket-model的build.gradle文件。

project(":rsocket-model") {
    dependencies {
        compileOnly("org.projectlombok:lombok")
        annotationProcessor("org.projectlombok:lombok")
    }
    bootJar {
        enabled = false
    }
    jar {
        enabled = true
    }
}

请求DTO的定义。由于Java对象的编码和解码将使用Jackson,所以不要忘记创建默认构造函数。

package sandbox.rsocket;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class RequestData {
    String message;
}

通过定义DTO来响应。

package sandbox.rsocket;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class ResponseData {
    String message;
}

rsocket后端

rsocket-bankend的build.gradle文件。

project(":rsocket-backend") {
    dependencies {
        implementation project(':rsocket-model')
        implementation 'org.springframework.boot:spring-boot-starter-rsocket'
        implementation 'org.springframework.boot:spring-boot-starter-webflux'
        implementation 'org.springframework.boot:spring-boot-devtools'

        compileOnly("org.projectlombok:lombok")
        annotationProcessor("org.projectlombok:lombok")
    }
}

在application.yml中指定RSocket使用的端口号(7000)。
您还可以使用YAML指定传输协议。有关这些定义,请参阅Spring Boot文档。
8.2. RSocket服务器自动配置。

spring:
  rsocket:
    server:
      port: 7000
      # Remove commented out if enable RSocket over websocket. (Using tcp as default.)
      # See the link for the details. https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/html/spring-boot-features.html#boot-features-rsocket-strategies-auto-configuration
      # mapping-path: /rsocket
      # transport: websocket

在RSocket服务器的控制器中定义。
使用@MessageMapping注解设置RSocket的端点名称。
getMono返回单个数据,因此将响应存储为Mono;getFlux返回多个数据(流),因此将响应存储为Flux。

package sandbox.rsocket;

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@AllArgsConstructor
@Controller
public class RSocketServerController {
    /**
     * Get response data with mono.
     */
    @MessageMapping("getMono")
    public Mono<ResponseData> getMono(RequestData requestData) {
        log.info("Calling getMono method. request={}", requestData);
        return Mono.just(new ResponseData(requestData.getMessage()));
    }

    /**
     * Get response data with flux.
     * Responds one of the response data every seconds.
     */
    @MessageMapping("getFlux")
    public Flux<ResponseData> getFlux(RequestData requestData) {
        log.info("Calling getFlux method. request={}", requestData);
        final List<ResponseData> list =
                IntStream.rangeClosed(1, 10)
                         .boxed()
                         .map(i -> new ResponseData(requestData.getMessage() + '_' + i))
                         .collect(Collectors.toList());
        return Flux.fromIterable(list)
                   .delayElements(Duration.ofSeconds(1));
    }
}

RSocket客户端

rsocket-client的build.gradle。

project(":rsocket-client") {
    dependencies {
        implementation project(':rsocket-model')
        implementation 'org.springframework.boot:spring-boot-starter-rsocket'
        implementation 'org.springframework.boot:spring-boot-starter-webflux'
        implementation 'org.springframework.boot:spring-boot-devtools'

        compileOnly("org.projectlombok:lombok")
        annotationProcessor("org.projectlombok:lombok")
    }
}

在application.yml文件中,可以指定HTTP端口号(8081)的端点。
顺便提一下,将客户端的日志级别设置为DEBUG,可以将RSocket帧信息输出到控制台上。

server:
  port: 8081
# Remove commented out if u want to see RSocket frame on console log.
# logging:
#  level:
#    root: DEBUG

以下是在请求getMono时在控制台输出的DEBUG日志。看起来是个相似的日志!

2020-01-08 23:15:00.853 DEBUG 6776 --- [ctor-http-nio-2] o.s.http.codec.cbor.Jackson2CborEncoder  : Encoding [RequestData(message=test)]
2020-01-08 23:15:00.879 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 36
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 08 07 67 65 74 4d 6f 6e 6f             |.....getMono    |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff    |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.015 DEBUG 6776 --- [actor-tcp-nio-1] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 21
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| bf 67 6d 65 73 73 61 67 65 64 74 65 73 74 ff    |.gmessagedtest. |
+--------+-------------------------------------------------+----------------+
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.cbor.Jackson2CborDecoder  : Decoded [ResponseData(message=test)]
2020-01-08 23:15:01.023 DEBUG 6776 --- [actor-tcp-nio-1] o.s.http.codec.json.Jackson2JsonEncoder  : [e539e702] Encoding [ResponseData(message=test)]

为客户端定义 Configuration。设置 RSocketRequester 的 RSocket 服务器主机名(localhost)和端口号(7000)。由于提供了 Builder 类,可以在几行代码内完成。

package sandbox.rsocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AllArgsConstructor
@Configuration
public class ClientConfiguration {
    private final RSocketRequester.Builder builder;

    @Bean
    public RSocketRequester rSocketRequester() {
        return builder.connectTcp("localhost", 7000)
                      .doOnNext(socket -> log.info("Connected to RSocket."))
                      .block();
    }
}

定义RSocket客户端的服务类。在这里设置RSocket服务器的端点名称。

package sandbox.rsocket;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@Service
public class RSocketClientService {
    private final RSocketRequester rSocketRequester;

    public Mono<ResponseData> getMono(RequestData data) {
        return rSocketRequester.route("getMono")
                               .data(data)
                               .retrieveMono(ResponseData.class);
    }

    public Flux<ResponseData> getFlux(RequestData data) {
        return rSocketRequester.route("getFlux")
                               .data(data)
                               .retrieveFlux(ResponseData.class);
    }
}

RSocket客户端的HTTP端点定义。

package sandbox.rsocket;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import lombok.AllArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@RestController
public class RSocketClientController {
    private final RSocketClientService clientService;

    /**
     * Get response mono data.
     */
    @GetMapping("/mono")
    public Mono<ResponseData> mono(@RequestParam String message) {
        return clientService.getMono(new RequestData(message));
    }

    /**
     * Get response flux data with server sent events.
     */
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/flux")
    public Flux<ResponseData> flux(@RequestParam String message) {
        return clientService.getFlux(new RequestData(message));
    }
}

在本地运行rsocket-backend和rsocket-client,并访问HTTP端点。
以下是访问http://localhost:8081/flux时的结果。数据每秒都会被响应!

Flux_demo.gif

总结

    • Spring BootとRSocketを使った簡単なアプリケーションを作成して、サーバ間の通信を試してみました。

 

    • 非常に少ないコードで簡単に作成することはできましたが、ブラックボックスな部分が多く、実際使うときは詳細な理解が必要だな。と思いました。(小並感)

 

    • あと、この記事には書いてありませんが、”RSocket over WebSocket”でブラウザとの通信も試してみました。Webアプリ開発者としては、RESTに変わる手段ができてとてもうれしい感があります。(これも小並感)

 

    今回作成したソースコードはGitHub: RSocket sandbox に保存しています。参考にしていただければ幸いです。

请拿这个作为参考

    • Tutorial: Reactive Spring Boot Part 9 – Java RSocket Client

 

    Youtube: Spring Tips: RSocket Messaging in Spring Boot 2.2
广告
将在 10 秒后关闭
bannerAds