试用Spring Cloud Stream(在Kafka中将一个消息分发到多个消息)
首先
我之前尝试了使用Spring Cloud Stream连接到Kafka的示例,但这次我想试试接收Kafka消息,并将接收到的消息分发到多个输出(multi output)。
环境
与上次相同
操作步骤
根据之前创建的每秒发送时间消息的示例,判断时间的秒数是偶数还是奇数,并根据偶数和奇数分配到对应的主题上,创建处理程序。
创建一个用于连接Kafka的项目。
前回,以上样例将被用于上述项目中。
进行kafka消息接收设置
在application.yml中添加输出目标的topic。
指定要写入偶数消息和奇数消息的主题。
spring:
cloud:
stream:
bindings:
output: spring.cloud.stream.test.topic
input: spring.cloud.stream.test.topic
even: spring.cloud.stream.test.even.topic # 偶数
odd: spring.cloud.stream.test.odd.topic # 奇数
创建一个定义用于写入偶数消息的通道的接口。
package com.example;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface EvenOutput {
String OUTPUT = "spring.cloud.stream.test.even.topic";
@Output("spring.cloud.stream.test.even.topic")
MessageChannel output();
}
创建一个定义用于写入奇数消息的通道的接口。
package com.example;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface OddOutput {
String OUTPUT = "spring.cloud.stream.test.odd.topic";
@Output("spring.cloud.stream.test.odd.topic")
MessageChannel output();
}
对之前制作的SampleSink进行修改。
在之前创建的SampleSink中添加一个将消息分类的处理。
上一次的状态
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@EnableBinding(Sink.class)
public class SampleSink {
private static Logger logger = LoggerFactory.getLogger(SampleSink.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void sink(Object payload) {
logger.info("Received: " + payload);
}
}
更改后
@EnableBiddingに{}(配列形式)でSink.classの他に、先ほど作成したEvenOutput.classとOddOutput.classを指定
@Autowiredで、EvenOutputとOddOutputのインターフェースを指定
LocalDateTimeの秒を取得して、偶数だったらevenOutput.output().send(…)、奇数だったらoddOutput.output().send(…)のように条件によって書き込むtopicを変更する
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.GenericMessage;
import java.time.LocalDateTime;
@EnableBinding({Sink.class, EvenOutput.class, OddOutput.class})
public class SampleSink {
private static Logger logger = LoggerFactory.getLogger(SampleSink.class);
@Autowired
private EvenOutput evenOutput;
@Autowired
private OddOutput oddOutput;
@ServiceActivator(inputChannel = Sink.INPUT)
public void sink(Object payload) {
logger.info("Received: " + payload);
LocalDateTime now = (LocalDateTime) payload;
if (now.getSecond() % 2 == 0) {
logger.info("output even: " + payload);
evenOutput.output().send(new GenericMessage<Object>(now));
} else {
logger.info("output odd : " + payload);
oddOutput.output().send(new GenericMessage<Object>(now));
}
}
}
试一下。
与上次一样,在事前已经执行以下步骤。
-
- zookeeperを立ち上げる
- kafkaを立ち上げる
SpringBootApplication的启动
当打开应用时,可以看到信息分别以偶数和奇数进行添加。
...
(省略)
...
com.example.SpringCloudStreamKafkaDemoApplication --spring.output.ansi.enabled=always
2015-12-06 10:54:49.168 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Starting SpringCloudStreamKafkaDemoApplication on MBP-15UAC-184.local with PID 53162 (/Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo/build/classes/main started by tgoto in /Users/tgoto/Develop/git/spring-cloud-stream-kafka-demo)
2015-12-06 10:54:49.172 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:49.233 INFO 53162 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce: startup date [Sun Dec 06 10:54:49 JST 2015]; root of context hierarchy
2015-12-06 10:54:49.738 INFO 53162 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [class org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$2d142414] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2015-12-06 10:54:49.916 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 1.065 seconds (JVM running for 1.76)
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.3.0.RELEASE)
2015-12-06 10:54:50.075 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : No profiles are active
2015-12-06 10:54:50.090 INFO 53162 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@7922d892: startup date [Sun Dec 06 10:54:50 JST 2015]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@647fd8ce
2015-12-06 10:54:51.035 INFO 53162 --- [ main] o.s.b.f.config.PropertiesFactoryBean : Loading properties file from URL [jar:file:/Users/tgoto/.gradle/caches/modules-2/files-2.1/org.springframework.integration/spring-integration-core/4.2.1.RELEASE/bb42e637833fd9c17df6092790d6209872e0bd65/spring-integration-core-4.2.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2015-12-06 10:54:51.039 INFO 53162 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined.
...
(省略)
...
2015-12-06 10:55:30.551 INFO 53162 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@6d5f4900
2015-12-06 10:55:30.553 INFO 53162 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$4 : Adding {message-handler:inbound.spring.cloud.stream.test.topic} as a subscriber to the 'bridge.spring.cloud.stream.test.topic' channel
2015-12-06 10:55:30.553 INFO 53162 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$4 : started inbound.spring.cloud.stream.test.topic
2015-12-06 10:55:30.555 INFO 53162 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2015-12-06 10:55:30.556 INFO 53162 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2015-12-06 10:55:30.626 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:43.528
2015-12-06 10:55:30.626 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output odd : 2015-12-05T12:10:43.528
2015-12-06 10:55:30.629 INFO 53162 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2015-12-06 10:55:30.632 INFO 53162 --- [ main] .e.SpringCloudStreamKafkaDemoApplication : Started SpringCloudStreamKafkaDemoApplication in 41.911 seconds (JVM running for 42.478)
2015-12-06 10:55:30.753 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.753 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output even: 2015-12-05T12:10:44.617
2015-12-06 10:55:30.858 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:45.622
2015-12-06 10:55:30.859 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output odd : 2015-12-05T12:10:45.622
2015-12-06 10:55:30.860 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : Received: 2015-12-05T12:10:46.623
2015-12-06 10:55:30.860 INFO 53162 --- [pool-4-thread-1] com.example.SampleSink : output even: 2015-12-05T12:10:46.623
确认该主题是否已被创建
~ ❯❯❯ /usr/local/bin/kafka-topics.sh --list --zookeeper localhost:2181
spring.cloud.stream.test.even.topic
spring.cloud.stream.test.odd.topic
spring.cloud.stream.test.topic
确认主题的内容
虽然无法看到LocalDateTime的值,但是它确实被写入了一些东西。
~ ❯❯❯ /usr/local/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic spring.cloud.stream.test.even.topic
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
????,
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
???.
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
????0
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?
????2
?
contentType8"application/x-java-object;type=java.time.LocalDateTime"
?