使用Spring Cloud Stream将Kafka的分区按实例读取

事件

当使用Spring-Cloud-Stream在多个实例上连接Kafka时,即使将消费者组指定为相同的值,也会出现每个实例从相同的分区读取消息的情况。因此,需要记录消费者组的配置方法。

    org.springframework.cloud:spring-cloud-stream-parent:1.0.0.M4

设置步骤

查看公式文件

最近的官方文件特别易懂,比阅读代码更有收获。

将下述四个项目的说明结合起来,我们就能理解对应的方法。

    • instanceCount=<インスタンス数>

 

    • instanceIndex=<インスタンスの連番(0からインスタンス数-1)>

 

    • bindings..partitioned=true

 

    bindings..group=<コンシューマーグループ名>(もしかしたら不要)

设置示例

应用程序配置文件的示例

spring:
  cloud:
    stream:
      instanceCount: 2
      instanceIndex: 0
      bindings:
        input:
          destination: test.topic
          group: test_consumer_group
          partitioned: true

application.properties的例子。

spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
spring.cloud.stream.bindings.input.destination=test.topic
spring.cloud.stream.bindings.input.group=test_consumer_group
spring.cloud.stream.bindings.input.partitioned=true
设置instanceCount和instanceIndex。

instanceCount和instanceIndex是根据不同的环境(开发机、测试、生产)而有所不同的,因此可以通过外部环境变量进行指定。

环境变量的例子
export SPRING_CLOUD_STREAM_INSTANCECOUNT=2
export SPRING_CLOUD_STREAM_INSTANCEINDEX=0
Java启动选项的示例。
-Dspring.cloud.stream.instanceCount=2 -Dspring.cloud.stream.instanceIndex=0

请记下以下公式文件的描述部分。

2.4 消费者群体

只指定消费者组,并不会对同一分区允许多个消费者(实例)进行访问起效果。

4.1 春季云流属性

在Spring Cloud Stream的配置中,存在instanceCount和instanceIndex两个设置,对于Kafka,这是必需的配置(由于有默认设置,因此不需要显式指定)。

比如说,如果在一台服务器上启动一个实例,那么当有两台服务器时,instanceCount为2,instanceIndex分别为0和1。
当有三台服务器时,instanceCount为3,instanceIndex分别为0、1和2。

就光是這個設定,效果並不顯著。

4.2.2 消费者属性

在 “Partitioned” 的解释中提到了 “是否使用分区制作者”,但仅凭这句话无法理解其含义。然而,这是必要的。

在说明中,

spring.cloud.stream.bindings.<channelName>.consumer.partitioned=true

好像需要消费者的元素,但事实上并不是这样。

spring.cloud.stream.bindings.<channelName>.partitioned=true

这个设置生效了。这可能是因为版本的差异所致。

8.2 实例索引和实例数量

这里也有instanceCount和instanceIndex的说明。如果部署到Spring Cloud Dataflow中,这些值将自动设置。但是,对于普通的Spring Cloud Stream应用程序,需要准确地进行设置。

配置分区的输入绑定

这一部分提供了具体的设置方法。虽然不清楚为什么会在不同的地方出现类似的说明,但通过阅读这一部分,可以了解设置方法。
为(例如input)设置partitioned=true,并适当设置instanceCount等参数,否则将无法正确地从分区读取消息,所以需要注意。

spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

行动的结果

验证条件

    • kafkaのpartition数:3

 

    コンシューマーのインスタンス数:2

应用程序的yml配置内容

spring:
  cloud:
    stream:
      instanceCount: 2
      bindings:
        input:
          destination: test.topic
          group: test_consumer_group
          partitioned: true

日志记录

日志级别设置为DEBUG。

第一个实例的日志

使用以下命令以-Dspring.cloud.stream.instanceIndex=0选项启动应用程序:

id=0はpartitionのインデックスを示す。

@3の3の数字は、メッセージのオフセットを示す。何番目のメッセージまで読んだかがわかる。
インスタンス1で受け取っているメッセージは、インデックスが0と2のpartition。

2016-04-09 15:21:20.626 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.724 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.726 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.826 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.828 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.927 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14

第二個實例的日誌

使用-Dspring.cloud.stream.instanceIndex=1选项启动

    インスタンス2で受け取っているメッセージは、インデックスが1のpartition。
2016-04-09 15:15:20.039 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
2016-04-09 15:15:20.142 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
2016-04-09 15:15:20.244 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33

我的感受

考虑到消费者实例数量的变化,需要在每个实例中重新设置instanceCount和instanceIndex的配置,这是相当繁琐的工作。如果考虑引入Spring Cloud DataFlow,它可能会自动进行配置,但是考虑到引入的成本,即使运维工作有点繁琐,也应该忍受下去。

bannerAds