使用Spring Boot WebFlux和Kotlin结合使用Spring-Cloud-Stream将消息写入到Kafka中
简介
使用Spring Boot的WebFlux,在Spring Cloud Stream中使用Kafka将消息写入的方法。
环境- 這個詞彙在中文中以较为直接的方式表達為「外界條件」或「周圍的狀況」。
-
- 開発環境: Mac
-
- IDE: IntelliJ
- 言語: Kotlin
预先准备
在Mac上安装Kafka。
请按照以下非常简明的步骤在Mac上安装Kafka(Kafka是一种消息队列软件)。
操作步骤
项目创建
我将使用IntelliJ,从SPRING INITIALIZR中创建一个Spring Boot项目。
文件 -> 新建 -> 项目…

项目元数据
- Language: Kotlin

依赖关系
-
- Spring Boot: 2.0.3
-
- Web -> Reactive Web
- Cloud Messaging -> Reactive Cloud Stream

项目名称,地点
请随意填写姓名和位置以完成。

在项目正在创建中,请稍等一下。
当完成时,会变成这种感觉。

检查和修正Pom文件
请确认通过SPRING INITIALIZR创建的项目的pom文件的设置内容。
spring-cloud.version应该指定为Finchley.RELEASE
<properties>
...
<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
</properties>
应该指定了中的spring-boot-starter-webflux和spring-cloud-stream-reactive。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
...
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
...
</dependencies>
添加 Kafka 连接器的配置至 pom 文件。
请添加 spring-cloud-starter-stream-kafka。
注意,如果没有这个配置,将无法连接到Kafka。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
添加元素,以便能够加载spring-cloud-starter-stream-kafka。
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
添加代码
文件结构
添加 Routes.kt、Handler.kt 和 Producer.kt。
21:03:59 ~/IdeaProjects/spring-cloud-stream-kafka-demo/src/main/kotlin/com/example/springcloudstreamkafkademo
$ tree
.
├── Handler.kt
├── Producer.kt
├── Routes.kt
└── SpringCloudStreamKafkaDemoApplication.kt
Routes.kt 的中文译文可以是:路由.kt
在/ api / putText的路径上,接受JSON请求并调用处理程序(Handler)。
package com.example.springcloudstreamkafkademo
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
@Configuration
class Routes(
private val handler: Handler
) {
@Bean
fun router() = org.springframework.web.reactive.function.server.router {
"/api".nest {
accept(MediaType.APPLICATION_JSON).nest {
POST("/putText", handler::output)
}
}
}
}
处理程序.kt
将请求的JSON转换为对象(RequestMessage),并传递给Kafka生产者。
package com.example.springcloudstreamkafkademo
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.body
import reactor.core.publisher.Mono
data class RequestMessage(
val id: Long,
val text: String
)
@Component
class Handler(private val producer: Producer) {
fun output(req: ServerRequest) =
req.bodyToMono(RequestMessage::class.java)
.flatMap { ServerResponse.ok().body(Mono.just(producer.output(it))) }
}
制片人.kt
这里是关键。对于说明,可以参考Spring Cloud Stream参考指南。然而,在经过多次尝试之后,我找到了一种有效的方法,但也可能有其他更好的方式。
在output函数中,我们将消息传递给fluxSink。而fluxSink是在生成Flux时获取的emitter,由flux的emit函数订阅后,会按顺序处理fluxSink传递的消息。
package com.example.springcloudstreamkafkademo
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.messaging.Source
import org.springframework.cloud.stream.reactive.StreamEmitter
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
@Component
@EnableBinding(Source::class)
class Producer {
private var fluxSink: FluxSink<RequestMessage>? = null
private var flux: Flux<RequestMessage> = Flux.create<RequestMessage>(
{ emitter -> this.fluxSink = emitter },
FluxSink.OverflowStrategy.BUFFER)
fun output(message: RequestMessage): RequestMessage {
this.fluxSink?.next(message)
return message
}
@StreamEmitter
@Output(Source.OUTPUT)
fun emit(): Flux<RequestMessage> {
return flux
}
}
确认运转
使用SpringCloudStreamKafkaDemoApplication启动Spring Boot。
通过以下日志,似乎连接到了名为output的主题,并附带了Kafka的连接信息。
...
2018-06-23 21:33:28.809 INFO 26767 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: output
2018-06-23 21:33:28.812 INFO 26767 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2018-06-23 21:33:28.898 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-06-23 21:33:28.898 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.209 INFO 26767 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2018-06-23 21:33:29.235 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-06-23 21:33:29.235 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.242 INFO 26767 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2018-06-23 21:33:29.254 INFO 26767 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.output' has 1 subscriber(s).
2018-06-23 21:33:29.255 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-06-23 21:33:29.273 INFO 26767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-06-23 21:33:29.273 INFO 26767 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483547
2018-06-23 21:33:29.360 INFO 26767 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-06-23 21:33:29.360 INFO 26767 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2018-06-23 21:33:29.363 INFO 26767 --- [ main] .SpringCloudStreamKafkaDemoApplicationKt : Started SpringCloudStreamKafkaDemoApplicationKt in 6.443 seconds
...
调用API
我将尝试使用curl调用API.
$ curl -X POST -H 'Content-Type: application/json' localhost:8080/api/putText -d '{"id":100, "text":"aa"}'
通过预先消费主题,可以确认消息的到达。
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic output
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}