Writing messages to Kafka with Spring Cloud Stream, using Spring Boot WebFlux and Kotlin

Introduction

How to write messages to Kafka via Spring Cloud Stream from a Spring Boot WebFlux application.

Environment

    • Development environment: Mac

    • IDE: IntelliJ

    • Language: Kotlin

Prerequisites

Install Kafka on your Mac.

You can install Kafka on a Mac by following some very simple steps (Kafka is message queuing software).
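
If you use Homebrew, a minimal install sketch looks like this (assuming Homebrew is available; Kafka 1.x also needs ZooKeeper running):

$ brew install kafka
$ brew services start zookeeper
$ brew services start kafka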

Procedure

Creating the project

Using IntelliJ, I'll create a Spring Boot project from SPRING INITIALIZR.

File -> New -> Project…

(screenshot)

Project Metadata

    • Language: Kotlin

(screenshot)

Dependencies

    • Spring Boot: 2.0.3

    • Web -> Reactive Web

    • Cloud Messaging -> Reactive Cloud Stream

(screenshot)

Project name and location

Fill in the name and location as you like and finish the wizard.

(screenshot)

The project takes a little while to create, so please wait.

When it finishes, it will look something like this.

(screenshot)

Checking and fixing the pom file

Check the settings in the pom file of the project created from SPRING INITIALIZR.

spring-cloud.version should be set to Finchley.RELEASE.

  <properties>
    ...
    <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
  </properties>

spring-boot-starter-webflux and spring-cloud-stream-reactive should be declared inside <dependencies>.

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    ...
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-reactive</artifactId>
    </dependency>
    ...
  </dependencies>

Adding the Kafka connector configuration to the pom file

Add spring-cloud-starter-stream-kafka.
Note that without this dependency, the application cannot connect to Kafka.

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

Add a <repositories> element so that spring-cloud-starter-stream-kafka can be resolved.

  <repositories>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

Adding the code

File structure

Add Routes.kt, Handler.kt, and Producer.kt.

21:03:59 ~/IdeaProjects/spring-cloud-stream-kafka-demo/src/main/kotlin/com/example/springcloudstreamkafkademo 
$ tree
.
├── Handler.kt
├── Producer.kt
├── Routes.kt
└── SpringCloudStreamKafkaDemoApplication.kt

Routes.kt

At the path /api/putText, it accepts a JSON request and calls the handler (Handler).

package com.example.springcloudstreamkafkademo

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.server.router

@Configuration
class Routes(
        private val handler: Handler
) {

    // Functional router DSL: POST /api/putText with a JSON Accept header goes to Handler#output.
    @Bean
    fun router() = router {
        "/api".nest {
            accept(MediaType.APPLICATION_JSON).nest {
                POST("/putText", handler::output)
            }
        }
    }
}

Handler.kt

It converts the request JSON into an object (RequestMessage) and passes it to the Kafka producer.

package com.example.springcloudstreamkafkademo

import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.body
import reactor.core.publisher.Mono

data class RequestMessage(
        val id: Long,
        val text: String
)

@Component
class Handler(private val producer: Producer) {

    // Deserialize the JSON body into a RequestMessage, hand it to the producer,
    // and echo the message back as the response body.
    fun output(req: ServerRequest) =
            req.bodyToMono(RequestMessage::class.java)
                    .flatMap { ServerResponse.ok().body(Mono.just(producer.output(it))) }

}

Producer.kt

This is the key part. For background, see the Spring Cloud Stream reference guide. After a fair amount of trial and error I arrived at an approach that works, though there may well be better ways.

In the output function, the message is handed to fluxSink. fluxSink is the emitter obtained when the Flux is created; once the Flux returned by the emit function is subscribed, the messages passed to fluxSink are processed in order. (A standalone sketch of this pattern follows the class below.)

package com.example.springcloudstreamkafkademo

import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.messaging.Source
import org.springframework.cloud.stream.reactive.StreamEmitter
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

@Component
@EnableBinding(Source::class)
class Producer {

    // The emitter captured when the Flux below is created; null until then.
    private var fluxSink: FluxSink<RequestMessage>? = null

    // BUFFER strategy: messages pushed into the sink are buffered until consumed downstream.
    private var flux: Flux<RequestMessage> = Flux.create<RequestMessage>(
            { emitter -> this.fluxSink = emitter },
            FluxSink.OverflowStrategy.BUFFER)

    // Called from the handler: push the message into the Flux and return it unchanged.
    fun output(message: RequestMessage): RequestMessage {
        this.fluxSink?.next(message)
        return message
    }

    // Spring Cloud Stream subscribes to this Flux and forwards each element
    // to the binding named "output" (Source.OUTPUT), i.e. the Kafka topic.
    @StreamEmitter
    @Output(Source.OUTPUT)
    fun emit(): Flux<RequestMessage> {
        return flux
    }

}
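
To see the Flux.create/FluxSink pattern in isolation, here is a minimal standalone Reactor sketch (no Spring; main, sink, and the string payloads are illustrative names, not part of the project):

import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

fun main() {
    var sink: FluxSink<String>? = null

    // The first argument runs when the Flux is subscribed and hands us the emitter.
    val flux = Flux.create<String>(
            { emitter -> sink = emitter },
            FluxSink.OverflowStrategy.BUFFER)

    // Subscribing triggers the lambda above, so sink is now populated.
    flux.subscribe { println("received: $it") }

    // Everything pushed into the sink is delivered to the subscriber in order.
    sink?.next("first")
    sink?.next("second")
}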

Verifying that it works

Start Spring Boot by running SpringCloudStreamKafkaDemoApplication.
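
Alternatively, it should also be possible to start it from the command line using the Maven wrapper that SPRING INITIALIZR generates (a sketch, assuming the wrapper scripts are present):

$ ./mvnw spring-boot:run
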
From the log below, it appears to connect to a topic named output; the Kafka connection settings are printed along with it.

...
2018-06-23 21:33:28.809  INFO 26767 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: output
2018-06-23 21:33:28.812  INFO 26767 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2018-06-23 21:33:28.898  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-06-23 21:33:28.898  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.209  INFO 26767 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2018-06-23 21:33:29.235  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-06-23 21:33:29.235  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.242  INFO 26767 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2018-06-23 21:33:29.254  INFO 26767 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.output' has 1 subscriber(s).
2018-06-23 21:33:29.255  INFO 26767 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-06-23 21:33:29.273  INFO 26767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-06-23 21:33:29.273  INFO 26767 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2018-06-23 21:33:29.274  INFO 26767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2018-06-23 21:33:29.274  INFO 26767 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2018-06-23 21:33:29.274  INFO 26767 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483547
2018-06-23 21:33:29.360  INFO 26767 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-06-23 21:33:29.360  INFO 26767 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2018-06-23 21:33:29.363  INFO 26767 --- [           main] .SpringCloudStreamKafkaDemoApplicationKt : Started SpringCloudStreamKafkaDemoApplicationKt in 6.443 seconds 
...
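
The topic name here defaults to the binding name, output. To write to a different topic or broker, Spring Cloud Stream properties along the following lines can go in application.properties (a sketch; my-topic and the broker address are placeholder values):

spring.cloud.stream.bindings.output.destination=my-topic
spring.cloud.stream.kafka.binder.brokers=localhost:9092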

Calling the API

I'll try calling the API with curl.

$ curl -X POST -H 'Content-Type: application/json' localhost:8080/api/putText -d '{"id":100, "text":"aa"}'
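
Since the handler echoes the converted message back as the response body, the curl call above should return the same JSON it sent, something like the following (inferred from the handler code rather than captured output):

{"id":100,"text":"aa"}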

If you start consuming the topic beforehand, you can confirm that the messages arrive.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic output
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}