Simple message sending and receiving with the Kafka component of the Apache Camel framework

Introduction

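The Kafka component of Apache Camel makes it easy to send messages to and receive messages from Kafka.
In this article we use it to build a program that does the following.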

Send one message per second to the topic (test_queue).
Receive messages from the topic (test_queue) and write them to the log.

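Note: this article defines the routes with the XML DSL. The next article implements the same program with the Java DSL and contains more information than this one, so please refer to it as well.

    Simple message sending and receiving with the Kafka component of the Apache Camel framework (Java DSL edition)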

Modifying pom.xml

First, add the libraries required for Kafka to pom.xml.

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>2.22.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

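Creating the producer

Send one message per second to the topic (test_queue).
The message body has the form "${date:now}: test message", a string that includes the send time.

The route is written in the XML DSL as follows.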

        <route id="producer_test">
            <from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
            <setBody>
                <simple>${date:now}: test message</simple>
            </setBody>
            <to
                uri="kafka:test_queue?brokers=192.168.10.122:9092" />
            <log message="producer_test end" />
        </route>
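
The once-per-second trigger in the route above comes from the cron option of the quartz2 endpoint. In a Camel endpoint URI the spaces of a Quartz cron expression are written as "+", so "0/1+*+*+*+*+?" stands for "0/1 * * * * ?". The following plain-Java sketch only decodes that string and labels its fields; the class and method names are made up for illustration and are not part of Camel or Quartz.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Camel or Quartz API) that decodes the cron value used in the route. */
final class CronUriSketch {

    // Quartz cron field order; the seventh field (year) is optional and unused in this article.
    static final List<String> FIELDS = Arrays.asList(
            "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year");

    /** Converts the URI form (fields joined by '+') into a space-separated cron expression. */
    static String toCronExpression(String uriValue) {
        return uriValue.replace('+', ' ');
    }

    public static void main(String[] args) {
        String cron = toCronExpression("0/1+*+*+*+*+?");
        System.out.println(cron);
        String[] parts = cron.split(" ");
        // Print each field next to its meaning, e.g. "seconds = 0/1".
        for (int i = 0; i < parts.length && i < FIELDS.size(); i++) {
            System.out.println(FIELDS.get(i) + " = " + parts[i]);
        }
    }
}
```

The seconds field "0/1" means "starting at second 0, every 1 second", which is why the route fires once per second.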

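The <to uri="kafka:test_queue?brokers=192.168.10.122:9092" /> element is the producer configuration. Its uri is written in the form "kafka:<topic name>?brokers=<broker host>:<port>".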

If you need to change topic settings such as the number of replicas, create the topic in advance.
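
To make the URI format concrete, here is a minimal sketch that assembles a Kafka endpoint URI from a topic name and a set of options. The helper class and its method are hypothetical (Camel does not require or provide them); only the resulting string format comes from this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Camel) that assembles a Kafka endpoint URI string. */
final class KafkaUriSketch {

    /** Builds "kafka:<topic>?key=value&key=value", keeping the options in insertion order. */
    static String kafkaUri(String topic, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        // Omit the "?" when there are no options at all.
        return query.length() == 0 ? "kafka:" + topic : "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "192.168.10.122:9092");
        // prints: kafka:test_queue?brokers=192.168.10.122:9092
        System.out.println(kafkaUri("test_queue", options));
    }
}
```

The sketch does no URL encoding, which is enough for the simple values used in this article.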

Creating the consumer

Receive messages from the topic (test_queue) and write them to the log.
The route is written in the XML DSL as follows.

        <route id="consumer_test">
            <from
                uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" />
            <log message="body = ${body}" />
        </route>

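The <from uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" /> element is the consumer configuration.
Because there is only one consumer this time, the groupId option could also be left unset. As with the producer, the topic is specified in the form "kafka:<topic name>?brokers=<broker host>:<port>".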
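
One detail of the consumer URI is easy to miss: the options are separated by "&", but inside an XML attribute that character has to be written as "&amp;". The following sketch shows the conversion with a small hand-written escaping function; the class and method names are assumptions for illustration, and a real project would more likely rely on an XML library or the editor.

```java
/** Hypothetical helper (not part of Camel) that prepares an endpoint URI for use in the XML DSL. */
final class ConsumerUriSketch {

    /** Escapes the characters that are not allowed verbatim inside a double-quoted XML attribute. */
    static String escapeForXmlAttribute(String value) {
        // "&" must be replaced first so that the entities added below are not escaped again.
        return value.replace("&", "&amp;")
                    .replace("<", "&lt;")
                    .replace("\"", "&quot;");
    }

    public static void main(String[] args) {
        String uri = "kafka:test_queue?brokers=192.168.10.122:9092&groupId=TESTGROUP";
        // prints: <from uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" />
        System.out.println("<from uri=\"" + escapeForXmlAttribute(uri) + "\" />");
    }
}
```

When the XML is parsed, "&amp;" turns back into "&", so Camel sees the same URI in either form.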

Running the Camel application

When you run the application, you can confirm that a log line such as "body = Sun Sep 02 10:15:01 JST 2018: test message" is output once per second.

[2018-09-02 10:15:01.041], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:01 JST 2018: test message
[2018-09-02 10:15:02.011], [INFO ], producer_test, Camel (camel-1) thread #3 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:02.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:02 JST 2018: test message
[2018-09-02 10:15:03.011], [INFO ], producer_test, Camel (camel-1) thread #4 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:03.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:03 JST 2018: test message
[2018-09-02 10:15:04.011], [INFO ], producer_test, Camel (camel-1) thread #5 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:04.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:04 JST 2018: test message
[2018-09-02 10:15:05.005], [INFO ], producer_test, Camel (camel-1) thread #6 - KafkaProducer[test_queue2], producer_test, producer_test end
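
Judging from the log lines above, "${date:now}" without a format pattern appears to render in the same style as java.util.Date.toString(). That is an inference from the output, not something taken from the Camel documentation. Under that assumption, the message body and the consumer's log text can be mimicked in plain Java as follows; the class and method names are hypothetical.

```java
import java.util.Date;

/** Hypothetical plain-Java imitation of the strings produced by the two routes. */
final class MessageBodySketch {

    /** Mirrors the simple expression "${date:now}: test message" (assumed to use Date.toString()). */
    static String body(Date now) {
        return now + ": test message";
    }

    /** Mirrors the consumer's log message "body = ${body}". */
    static String logText(String body) {
        return "body = " + body;
    }

    public static void main(String[] args) {
        // The exact date text depends on the current time and the JVM's default time zone.
        System.out.println(logText(body(new Date())));
    }
}
```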

The Kafka component has many options besides the ones used here; see the following page for details.

    Kafka Component (official site)

Full source

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>sample</groupId>
    <artifactId>kafka-camel_simple</artifactId>
    <version>1.0.0</version>
    <name>kafka-camel_simple</name>
    <description>kafka-camel_simple</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <camel.version>2.22.0</camel.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-quartz2</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

</project>

camel-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
          http://camel.apache.org/schema/spring
          http://camel.apache.org/schema/spring/camel-spring.xsd
          http://www.springframework.org/schema/util
          http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route id="consumer_test">
            <from
                uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" />
            <log message="body = ${body}" />
        </route>

        <route id="producer_test">
            <from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
            <setBody>
                <simple>${date:now}: test message</simple>
            </setBody>
            <to
                uri="kafka:test_queue?brokers=192.168.10.122:9092" />
            <log message="producer_test end" />
        </route>
    </camelContext>
</beans>

TestMain.java

package sample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

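public class TestMain {

    private static final Logger logger = LoggerFactory.getLogger(TestMain.class);

    public static void main(String[] args) throws Exception {
        TestMain main = new TestMain();
        main.start();
    }

    private void start() throws Exception {
        logger.info("start ");
        // Load camel-context.xml from the classpath; try-with-resources closes the
        // Spring context (and with it the Camel routes) when the block exits.
        try (ClassPathXmlApplicationContext applicationContext =
                  new ClassPathXmlApplicationContext("camel-context.xml")) {
            applicationContext.start();
            // Keep the routes running for 60 seconds before shutting down.
            Thread.sleep(60000);
        }
    }
}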