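Simple message sending and receiving with the Kafka component of the Apache Camel framework
Introduction
The Kafka component of Apache Camel makes it easy to send messages to and receive messages from Kafka.
In this article we use it to build a program that does the following.
- Sends one message per second to the topic (test_queue).
- Receives the messages from the topic (test_queue) and writes them to the log.
※ This article defines the routes with the XML DSL; the following article covers the Java DSL version. It also contains more information than this one, so please refer to it.
- Simple message sending and receiving with the Kafka component of the Apache Camel framework (Java DSL edition)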
Editing pom.xml
First, add the Kafka libraries to pom.xml.
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-kafka</artifactId>
  <version>2.21.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0</version>
</dependency>
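Creating the producer
The producer sends one message per second to the topic (test_queue).
The message has the form "${date:now}: test message", a string that includes the send time.
The route is written in the XML DSL as follows.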
<route id="producer_test">
  <from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
  <setBody>
    <simple>${date:now}: test message</simple>
  </setBody>
  <to uri="kafka:test_queue?brokers=192.168.10.122:9092" />
  <log message="producer_test end" />
</route>
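The two non-Kafka parts of the route above, the timer's cron option and the message body, can be illustrated in plain Java. This is only a sketch under assumptions: the class and method names are hypothetical and not part of Camel, and the body format is inferred from the log output shown later in this article, which looks like the result of Date.toString().

```java
import java.util.Arrays;
import java.util.Date;
import java.util.List;

// Hypothetical helper for illustration only; not part of Camel or of this article's project.
final class ProducerRouteSketch {

    // In the endpoint URI, '+' stands in for the spaces of the cron expression.
    // The resulting Quartz fields are: seconds, minutes, hours, day-of-month, month, day-of-week.
    static List<String> cronFields(String uriCron) {
        return Arrays.asList(uriCron.replace('+', ' ').split(" "));
    }

    // Assumption: mirrors the shape of "${date:now}: test message",
    // i.e. Date.toString() followed by the fixed text.
    static String buildBody(Date now) {
        return now + ": test message";
    }

    public static void main(String[] args) {
        // "0/1" in the seconds field means: starting at second 0, fire every 1 second.
        System.out.println(cronFields("0/1+*+*+*+*+?"));
        System.out.println(buildBody(new Date()));
    }
}
```

Reading the cron option this way shows that only the seconds field is restricted, which is why the route fires once per second.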
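The <to> element is the producer configuration; its URI takes the form "kafka:topic name?brokers=broker host:port".
If you want to change settings such as the number of replicas of the topic, create the topic in advance.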
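The URI format described above can be assembled from its parts as in the following minimal sketch. The class and method names are hypothetical and exist only for illustration; Camel itself just receives the finished URI string.

```java
// Hypothetical helper for illustration only; Camel only needs the resulting string.
final class KafkaUriSketch {

    // Builds "kafka:<topic>?brokers=<host>:<port>".
    static String kafkaUri(String topic, String brokerHost, int brokerPort) {
        return "kafka:" + topic + "?brokers=" + brokerHost + ":" + brokerPort;
    }

    public static void main(String[] args) {
        // Values taken from the producer route in this article.
        System.out.println(kafkaUri("test_queue", "192.168.10.122", 9092));
    }
}
```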
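Creating the consumer
The consumer receives the messages from the topic (test_queue) and writes them to the log.
The route is written in the XML DSL as follows.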
<route id="consumer_test">
  <from uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" />
  <log message="body = ${body}" />
</route>
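The <from> element with the kafka: URI is the consumer configuration.
Because there is only one consumer this time, the groupId property could also be left unset. Note that, as with the producer, the topic is specified in the form "kafka:topic name?brokers=broker host:port".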
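The consumer URI carries more than one option, and inside an XML attribute the & between options has to be written as &amp;. The following sketch shows both steps. The class and method names are hypothetical, and the escaping covers only the characters relevant to a double-quoted attribute value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Camel.
final class ConsumerUriSketch {

    // Builds "kafka:<topic>?key1=value1&key2=value2" in the map's iteration order.
    static String kafkaUri(String topic, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("kafka:").append(topic);
        char separator = '?';
        for (Map.Entry<String, String> option : options.entrySet()) {
            uri.append(separator).append(option.getKey()).append('=').append(option.getValue());
            separator = '&';
        }
        return uri.toString();
    }

    // Escapes the characters that may not appear raw in a double-quoted XML attribute.
    // '&' is replaced first so that the entities added afterwards are not escaped again.
    static String escapeForXmlAttribute(String value) {
        return value.replace("&", "&amp;").replace("<", "&lt;").replace("\"", "&quot;");
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "192.168.10.122:9092");
        options.put("groupId", "TESTGROUP");

        String uri = kafkaUri("test_queue", options);
        System.out.println(uri);                        // form used in Java code
        System.out.println(escapeForXmlAttribute(uri)); // form used in the XML DSL
    }
}
```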
Running the Camel application
When you run the Camel application, you can confirm that a log line such as "body = Sun Sep 02 10:15:01 JST 2018: test message" is written once per second.
[2018-09-02 10:15:01.041], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:01 JST 2018: test message
[2018-09-02 10:15:02.011], [INFO ], producer_test, Camel (camel-1) thread #3 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:02.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:02 JST 2018: test message
[2018-09-02 10:15:03.011], [INFO ], producer_test, Camel (camel-1) thread #4 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:03.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:03 JST 2018: test message
[2018-09-02 10:15:04.011], [INFO ], producer_test, Camel (camel-1) thread #5 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:04.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:04 JST 2018: test message
[2018-09-02 10:15:05.005], [INFO ], producer_test, Camel (camel-1) thread #6 - KafkaProducer[test_queue2], producer_test, producer_test end
The Kafka component has many properties besides the ones used here; see the following page for details.
- Kafka Component (official site)
Full source
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sample</groupId>
  <artifactId>kafka-camel_simple</artifactId>
  <version>1.0.0</version>
  <name>kafka-camel_simple</name>
  <description>kafka-camel_simple</description>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <camel.version>2.22.0</camel.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-spring</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-quartz2</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-kafka</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>
</project>
camel-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
  xmlns:util="http://www.springframework.org/schema/util"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://camel.apache.org/schema/spring
    http://camel.apache.org/schema/spring/camel-spring.xsd
    http://www.springframework.org/schema/util
    http://www.springframework.org/schema/util/spring-util-4.2.xsd">
  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="consumer_test">
      <from uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" />
      <log message="body = ${body}" />
    </route>
    <route id="producer_test">
      <from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
      <setBody>
        <simple>${date:now}: test message</simple>
      </setBody>
      <to uri="kafka:test_queue?brokers=192.168.10.122:9092" />
      <log message="producer_test end" />
    </route>
  </camelContext>
</beans>
TestMain.java
package sample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TestMain {

    private static final Logger logger = LoggerFactory.getLogger(TestMain.class);

    public static void main(String[] args) throws Exception {
        TestMain main = new TestMain();
        main.start();
    }

    private void start() throws Exception {
        logger.info("start ");
        // Load the routes from camel-context.xml on the classpath;
        // try-with-resources closes the context when the block exits.
        try (ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("camel-context.xml")) {
            applicationContext.start();
            // Keep the routes running for 60 seconds, then shut down.
            Thread.sleep(60000);
        }
    }
}