使用Spring Cloud Stream将Kafka的分区按实例读取
事件
当使用Spring-Cloud-Stream在多个实例上连接Kafka时,即使将消费者组指定为相同的值,也会出现每个实例从相同的分区读取消息的情况。因此,需要记录消费者组的配置方法。
- org.springframework.cloud:spring-cloud-stream-parent:1.0.0.M4
设置步骤
查看公式文件
最近的官方文件特别易懂,比阅读代码更有收获。
将下述四个项目的说明结合起来,我们就能理解对应的方法。
-
- instanceCount=<インスタンス数>
-
- instanceIndex=<インスタンスの連番(0からインスタンス数-1)>
-
- bindings..partitioned=true
- bindings..group=<コンシューマーグループ名>(もしかしたら不要)
设置示例
应用程序配置文件的示例
spring:
cloud:
stream:
instanceCount: 2
instanceIndex: 0
bindings:
input:
destination: test.topic
group: test_consumer_group
partitioned: true
application.properties的例子。
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
spring.cloud.stream.bindings.input.destination=test.topic
spring.cloud.stream.bindings.input.group=test_consumer_group
spring.cloud.stream.bindings.input.partitioned=true
设置instanceCount和instanceIndex。
instanceCount和instanceIndex是根据不同的环境(开发机、测试、生产)而有所不同的,因此可以通过外部环境变量进行指定。
环境变量的例子
export SPRING_CLOUD_STREAM_INSTANCECOUNT=2
export SPRING_CLOUD_STREAM_INSTANCEINDEX=0
Java启动选项的示例。
-Dspring.cloud.stream.instanceCount=2 -Dspring.cloud.stream.instanceIndex=0
请记下以下公式文件的描述部分。
2.4 消费者群体
只指定消费者组,并不会对同一分区允许多个消费者(实例)进行访问起效果。
4.1 春季云流属性
在Spring Cloud Stream的配置中,存在instanceCount和instanceIndex两个设置,对于Kafka,这是必需的配置(由于有默认设置,因此不需要显式指定)。
比如说,如果在一台服务器上启动一个实例,那么当有两台服务器时,instanceCount为2,instanceIndex分别为0和1。
当有三台服务器时,instanceCount为3,instanceIndex分别为0、1和2。
就光是這個設定,效果並不顯著。
4.2.2 消费者属性
在 “Partitioned” 的解释中提到了 “是否使用分区制作者”,但仅凭这句话无法理解其含义。然而,这是必要的。
在说明中,
spring.cloud.stream.bindings.<channelName>.consumer.partitioned=true
好像需要消费者的元素,但事实上并不是这样。
spring.cloud.stream.bindings.<channelName>.partitioned=true
这个设置生效了。这可能是因为版本的差异所致。
8.2 实例索引和实例数量
这里也有instanceCount和instanceIndex的说明。如果部署到Spring Cloud Dataflow中,这些值将自动设置。但是,对于普通的Spring Cloud Stream应用程序,需要准确地进行设置。
配置分区的输入绑定
这一部分提供了具体的设置方法。虽然不清楚为什么会在不同的地方出现类似的说明,但通过阅读这一部分,可以了解设置方法。
为(例如input)设置partitioned=true,并适当设置instanceCount等参数,否则将无法正确地从分区读取消息,所以需要注意。
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5
行动的结果
验证条件
-
- kafkaのpartition数:3
- コンシューマーのインスタンス数:2
应用程序的yml配置内容
spring:
cloud:
stream:
instanceCount: 2
bindings:
input:
destination: test.topic
group: test_consumer_group
partitioned: true
日志记录
日志级别设置为DEBUG。
第一个实例的日志
使用以下命令以-Dspring.cloud.stream.instanceIndex=0选项启动应用程序:
id=0はpartitionのインデックスを示す。
@3の3の数字は、メッセージのオフセットを示す。何番目のメッセージまで読んだかがわかる。
インスタンス1で受け取っているメッセージは、インデックスが0と2のpartition。
2016-04-09 15:21:20.626 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.724 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.726 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.826 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.828 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.927 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
第二個實例的日誌
使用-Dspring.cloud.stream.instanceIndex=1选项启动
- インスタンス2で受け取っているメッセージは、インデックスが1のpartition。
2016-04-09 15:15:20.039 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
2016-04-09 15:15:20.142 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
2016-04-09 15:15:20.244 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
我的感受
考虑到消费者实例数量的变化,需要在每个实例中重新设置instanceCount和instanceIndex的配置,这是相当繁琐的工作。如果考虑引入Spring Cloud DataFlow,它可能会自动进行配置,但是考虑到引入的成本,即使运维工作有点繁琐,也应该忍受下去。