使用Open Liberty和MicroProfile Reactive Messaging读取Kafka事件

在Java应用服务器Open Liberty中,可以通过使用称为MicroProfile Reactive Messaging的API规范来进行事件处理。

在这篇文章中,我们将尝试使用MicroProfile Reactive Messaging在Apache Kafka中实现读取和处理事件的消费者。关于事件发送的内容将在另一篇文章中介绍。

スクリーンショット 2022-11-06 21.54.44.png

在本文中使用的组件- natively 这本报告使用的组件

本文中将使用以下组件版本。
由于将来可能无法与升级版本兼容,建议在投入生产之前在本地进行验证。

コンポーネント名バージョンJDKOpenJDK 17.0.4.1 (IBM Semeru Runtime Open Edition)Open Liberty22.0.0.11

生成Open Liberty Java应用程序的模板

首先创建一个实现Consumer的Open Liberty应用程序。
使用Open Liberty网站上的雛形生成功能可以轻松创建应用程序的框架。

您可以从Starter应用程序的模板生成页面开始,按照下图的方式输入值,然后点击”生成项目”按钮。”Group”和”Artifact”可以填写任意值。

スクリーンショット 2022-11-06 21.56.54.png
请注意,在撰写本文时,如果将Java EE / Jakarta EE版本指定为“9.1”,它将无法作为消费者正常运行。

安装依存库

为了处理来自MicroProfile Reactive Messaging API的Kafka事件,请安装相应的依赖库。
在生成的基础项目中,我们已经选择了Maven作为依赖库的管理方法。在Maven的配置文件pom.xml的dependencies元素中,指定以下依赖库。安装将自动在Eclipse或IntelliJ等IDE上执行。

    <!-- 必要なものだけ記載 -->
    <dependencies>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>2.0.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.7.36</version> <!-- 2.x.y はNG -->
        </dependency>
    </dependencies>

在Open Liberty中,我们使用slf4j-jdk14来输出MicroProfile Reactive Messaging和Kafka客户端库的日志。

在2022年11月的本文撰写时,slf4j-jdk14有一个更高的2.x.y版本可供使用。但是,如果指定了2.x版本,应用程序可能无法正常启动,因此建议指定上述版本以确保安全。

特征的指定

接下来,我们将指定功能并加载Open Liberty以使用MicroProfile Reactive Messaging。

在Open Liberty中使用的模块被细分为”Feature”的形式。通过仅使用所需的Feature,可以保持应用程序的大小较小。

本次使用的MicroProfile Reactive Messaging还准备了专用的功能。
在Open Liberty的配置文件 server.xml 中指定 mpReactiveMessaging-1.0 Feature。

我們順便在 元素中啟用日誌輸出。

<!-- src/main/liberty/config/server.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<server description="My Liberty Server">

    <!-- Enable features -->
    <featureManager>
        <feature>mpReactiveMessaging-1.0</feature>
    </featureManager>

    <!-- ...中略... -->

    <!-- Logging -->
    <logging traceSpecification="REACTIVEMESSAGE=all:org.apache.kafka.*=all"/>

</server>

定义Kafka的连接信息

安装了库和特性之后,接下来需要定义与Apache Kafka的连接信息。连接信息应在MicroProfile Config的配置文件microprofile-config.properties中定义。

请在<项目根目录>/src/main/resources/META-INF/microprofile-config.properties中定义以下设置值。

mp.messaging.incoming.purchases.connector=liberty-kafka
mp.messaging.incoming.purchases.bootstrap.servers=localhost:9092
mp.messaging.incoming.purchases.group.id=liberty-sample-consumer
mp.messaging.incoming.purchases.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.auto.offset.reset=earliest

在中文中,可以如下方式进行转述:

设定值需要按照以下格式进行定义:mp.messaging.<incoming|outgoing>.<频道名称>.<属性名称>=<设定值>。

如果您想要读取事件,请将第三个由点分隔的元素指定为”incoming”。相反,如果要发送事件,则将其指定为”outgoing”。
<属性名称>遵循Apache Kafka的配置值命名规则,如果您希望进一步详细配置,请参考Apache Kafka文档。

在这里,我们连接到 Kafka Broker localhost:9092,并定义事件的键和值分别为 String 类型。connector 属性是唯一的特殊属性,连接到 Kafka 时需要指定固定值 liberty-kafka。

参考:Apache Kafka 消费者配置

事件操作应用程序的实施

我們會實現一個從Kafka讀取事件的應用程序。我們將以日誌輸出指定爲購買商品名的事件值來進行實現。

以下是实施示例:
虽然只有大约20行的简单示例,但仅凭这个就可以处理事件。

// PurchaseResource.java
package com.example.resource;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
import java.util.logging.Logger;

@ApplicationScoped
public class PurchaseResource {
    private static final Logger logger = Logger.getLogger(PurchaseResource.class.getName());

    @Incoming("purchases")
    public void consumePurchases(String item) {
        final String itemMessage = String.format("Consumed item purchase: item name = %s", item);
        logger.info(itemMessage);
    }
}

最重要的部分是 @Incoming(“<频道名称>”)。
@Incoming 是由MicroProfile Reactive Messaging提供的用于读取事件的注解。指定的频道名称与在 microprofile-config.properties 中定义的频道名称相关联。

在此示例代码中,我们只指定了添加了 @Incoming 注解的方法的事件值参数。如果想获取事件的键或主题名称,可以添加以下三个实现。
(1)指定参数为 Message 类型
(2)将参数的值强制转换为 ConsumerRecord 类型
(3)指定方法返回一个 CompletionStage 类型
有关详细示例,请参阅下面的博客文章。
博客-访问Kafka消息中的特定属性,并在Open Liberty 20.0.0.3中为Cookie设置SameSite属性。

应用程序的运行确认

由於應用程式準備就緒,所以現在可以連接到Kafka,並讀取事件。
在應用程式運行之前,請啟動Kafka Broker並創建名為”purchases”的主題。

此外,Starter应用程序已经集成了启动Open Liberty的Maven插件。通过以下命令启动Open Liberty。编译后的WAR文件将自动部署到Open Liberty服务器上。

$ mvn liberty:dev

如果成功加入具有“Generation”的组,则将输出大量日志,并且如果输出了成功加入“Generation”的日志,表示已连接到Kafka Broker并准备好读取事件。

让我们尝试从kafka-console-producer等生产者发送事件。

$ kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic purchases \
    --property "parse.key=true" --property "key.separator=:"

>alice:apple
>bob:banana
>cris:orange
>^C

在Open Liberty应用程序中,事件的值几乎实时被记录输出。

...
[INFO] [2022/11/06 22:36:06:474 JST] 0000003c com.example.resource.PurchaseResource                        I Consumed item purchase: item name = apple
[INFO] [2022/11/06 22:36:08:018 JST] 0000003a com.example.resource.PurchaseResource                        I Consumed item purchase: item name = banana
[INFO] [2022/11/06 22:36:10:404 JST] 0000003d com.example.resource.PurchaseResource                        I Consumed item purchase: item name = orange

最终系统的构成

应用程序的最终目录结构如下所示:
“/path/to/project” 将填写任意项目的根路径。

$ tree /path/to/project

/path/to/project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── example
        │           └── resource
        │               └── PurchaseResource.java
        ├── liberty
        │   └── config
        │       └── server.xml
        └── resources
            └── META-INF
                └── microprofile-config.properties

请参考

    • OpenLiberty – Creating reactive Java microservices

 

    Sending and receiving messages between microservices with MicroProfile Reactive Messaging
广告
将在 10 秒后关闭
bannerAds