Trying Spring Cloud Stream (routing one message to multiple topics with Kafka)

Introduction

I previously tried a sample that connects Spring Cloud Stream to Kafka. This time I want to try receiving messages from Kafka and distributing each received message to multiple outputs (multi output).

Environment

Same as last time.

Steps

Building on the sample created last time, which sends the current time as a message every second, I will check whether the second of the received time is even or odd and write the message to the corresponding topic.

Create a project that connects to Kafka.

The sample from the previous post is used as the base for this project.
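
For reference, a per-second time source like the one from the previous post could look roughly like this (a minimal sketch, assuming the default Source binding and a 1-second poller; the actual class from the previous post may differ):

package com.example;

import java.time.LocalDateTime;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;

// Sketch: emits the current time to the output channel once per second
@EnableBinding(Source.class)
public class SampleSource {

  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000"))
  public LocalDateTime timerMessageSource() {
    return LocalDateTime.now();
  }
}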

Set up Kafka message reception

Add the output destination topics to application.yml.

Specify the topics that even and odd messages will be written to.

spring:
  cloud:
    stream:
      bindings:
        output: spring.cloud.stream.test.topic
        input: spring.cloud.stream.test.topic
        even: spring.cloud.stream.test.even.topic # even
        odd: spring.cloud.stream.test.odd.topic # odd
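
Incidentally, this short binding-name-to-topic form is what the milestone version used here expects; in later Spring Cloud Stream releases the same mapping is written with a destination property per binding, roughly like this (shown only for comparison, not needed for this sample):

spring:
  cloud:
    stream:
      bindings:
        even:
          destination: spring.cloud.stream.test.even.topic
        odd:
          destination: spring.cloud.stream.test.odd.topic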

Create an interface that defines the channel for writing even messages.

package com.example;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Output channel bound to the topic for even-second messages
public interface EvenOutput {
  String OUTPUT = "spring.cloud.stream.test.even.topic";

  @Output(EvenOutput.OUTPUT)
  MessageChannel output();
}

Create an interface that defines the channel for writing odd messages.

package com.example;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Output channel bound to the topic for odd-second messages
public interface OddOutput {
  String OUTPUT = "spring.cloud.stream.test.odd.topic";

  @Output(OddOutput.OUTPUT)
  MessageChannel output();
}

Modify the SampleSink created earlier

Add processing to the previously created SampleSink that routes messages to the even or odd topic.

Before the change

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding(Sink.class)
public class SampleSink {

  private static Logger logger = LoggerFactory.getLogger(SampleSink.class);

  @ServiceActivator(inputChannel = Sink.INPUT)
  public void sink(Object payload) {
    logger.info("Received: " + payload);
  }
}

After the change

- In @EnableBinding, specify EvenOutput.class and OddOutput.class (created above) in addition to Sink.class, using the array form {}
- Inject the EvenOutput and OddOutput interfaces with @Autowired
- Get the second from the LocalDateTime and switch the target topic on it: evenOutput.output().send(…) when it is even, oddOutput.output().send(…) when it is odd

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.GenericMessage;

import java.time.LocalDateTime;

@EnableBinding({Sink.class, EvenOutput.class, OddOutput.class})
public class SampleSink {

  private static Logger logger = LoggerFactory.getLogger(SampleSink.class);

  @Autowired
  private EvenOutput evenOutput;

  @Autowired
  private OddOutput oddOutput;

  // Receive the time message and route it to the even or odd topic based on its second value
  @ServiceActivator(inputChannel = Sink.INPUT)
  public void sink(Object payload) {
    logger.info("Received: " + payload);
    LocalDateTime now = (LocalDateTime) payload;
    if (now.getSecond() % 2 == 0) {
      logger.info("output even: " + payload);
      evenOutput.output().send(new GenericMessage<Object>(now));
    } else {
      logger.info("output odd : " + payload);
      oddOutput.output().send(new GenericMessage<Object>(now));
    }
  }
}
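
As an aside, consuming one of these topics from another Spring Cloud Stream application would just be the mirror image of the interfaces above: an @Input channel instead of @Output. A sketch (not part of this sample; the name EvenInput is made up):

package com.example;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

// Sketch: hypothetical input-side counterpart to EvenOutput (not used in this sample)
public interface EvenInput {
  String INPUT = "spring.cloud.stream.test.even.topic";

  @Input(EvenInput.INPUT)
  SubscribableChannel input();
}

A class annotated with @EnableBinding(EvenInput.class) could then receive the even-second messages with @ServiceActivator(inputChannel = EvenInput.INPUT), just like SampleSink does for the main topic.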

Try it out

As before, the following were done in advance (rough commands are sketched below):

- Start ZooKeeper
- Start Kafka
- Start the Spring Boot application
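
For reference, on a Homebrew-installed Kafka that amounts to something like the following (the paths, config files, and Gradle task are assumptions that depend on your environment):

# Start ZooKeeper and Kafka (paths/config files are environment-specific)
/usr/local/bin/zookeeper-server-start.sh /usr/local/etc/kafka/zookeeper.properties
/usr/local/bin/kafka-server-start.sh /usr/local/etc/kafka/server.properties

# Start the Spring Boot application
./gradlew bootRun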

Once the application starts, you can see the messages being logged and routed to the even and odd outputs.

...
(省略)
...
com.example.SpringCloudStreamKafkaDemoApplication --spring.output.ansi.enabled=always
2015-12-06 10:54:49.168  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : Starting SpringCloudStreamKafkaDemoApplication on MBP-15UAC-184.local with PID 53162 (/Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo/build/classes/main started by tgoto in /Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo)
2015-12-06 10:54:49.172  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:49.233  INFO 53162 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce: startup date [Sun Dec 06 10:54:49 JST 2015]; root of context hierarchy
2015-12-06 10:54:49.738  INFO 53162 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [class org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$2d142414] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2015-12-06 10:54:49.916  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 1.065 seconds (JVM running for 1.76)

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.0.RELEASE)

2015-12-06 10:54:50.075  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:50.090  INFO 53162 --- [           main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7922d892: startup date [Sun Dec 06 10:54:50 JST 2015]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce
2015-12-06 10:54:51.035  INFO 53162 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/tgoto/.gradle/caches/modules-2/files-2.1/org.springframework.integration/spring-integration-core/4.2.1.RELEASE/bb42e637833fd9c17df6092790d6209872e0bd65/spring-integration-core-4.2.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2015-12-06 10:54:51.039  INFO 53162 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. 
...
(省略)
...
2015-12-06 10:55:30.551  INFO 53162 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6d5f4900
2015-12-06 10:55:30.553  INFO 53162 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Adding {message-handler:inbound.spring.cloud.stream.test.topic} as a subscriber to the 'bridge.spring.cloud.stream.test.topic' channel
2015-12-06 10:55:30.553  INFO 53162 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : started inbound.spring.cloud.stream.test.topic
2015-12-06 10:55:30.555  INFO 53162 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2015-12-06 10:55:30.556  INFO 53162 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2015-12-06 10:55:30.626  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:43.528
2015-12-06 10:55:30.626  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output odd : 2015-12-05T12:10:43.528
2015-12-06 10:55:30.629  INFO 53162 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2015-12-06 10:55:30.632  INFO 53162 --- [           main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 41.911 seconds (JVM running for 42.478)
2015-12-06 10:55:30.753  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.753  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output even: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.858  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:45.622
2015-12-06 10:55:30.859  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output odd : 2015-12-05T12:10:45.622
2015-12-06 10:55:30.860  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : Received: 2015-12-05T12:10:46.623
2015-12-06 10:55:30.860  INFO 53162 --- [pool-4-thread-1] com.example.SampleSink                   : output even: 2015-12-05T12:10:46.623

Check that the topics have been created

~ ❯❯❯ /usr/local/bin/kafka-topics.sh  --list --zookeeper localhost:2181
spring.cloud.stream.test.even.topic
spring.cloud.stream.test.odd.topic
spring.cloud.stream.test.topic

Check the contents of the topics

The LocalDateTime values themselves are not human-readable (the payload is written as a serialized Java object, as the application/x-java-object contentType header shows), but something is clearly being written to each topic.

~ ❯❯❯ /usr/local/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic spring.cloud.stream.test.even.topic
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

????,
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

???.
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

????0
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?

????2
?
 contentType8"application/x-java-object;type=java.time.LocalDateTime"
                                                                     ?