使用Open Liberty和MicroProfile Reactive Messaging读取Kafka事件
在Java应用服务器Open Liberty中,可以通过使用称为MicroProfile Reactive Messaging的API规范来进行事件处理。
在这篇文章中,我们将尝试使用MicroProfile Reactive Messaging在Apache Kafka中实现读取和处理事件的消费者。关于事件发送的内容将在另一篇文章中介绍。

在本文中使用的组件- natively 这本报告使用的组件
本文中将使用以下组件版本。
由于将来可能无法与升级版本兼容,建议在投入生产之前在本地进行验证。
生成Open Liberty Java应用程序的模板
首先创建一个实现Consumer的Open Liberty应用程序。
使用Open Liberty网站上的雛形生成功能可以轻松创建应用程序的框架。
您可以从Starter应用程序的模板生成页面开始,按照下图的方式输入值,然后点击”生成项目”按钮。”Group”和”Artifact”可以填写任意值。

安装依存库
为了处理来自MicroProfile Reactive Messaging API的Kafka事件,请安装相应的依赖库。
在生成的基础项目中,我们已经选择了Maven作为依赖库的管理方法。在Maven的配置文件pom.xml的dependencies元素中,指定以下依赖库。安装将自动在Eclipse或IntelliJ等IDE上执行。
<!-- 必要なものだけ記載 -->
<dependencies>
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.36</version> <!-- 2.x.y はNG -->
</dependency>
</dependencies>
在Open Liberty中,我们使用slf4j-jdk14来输出MicroProfile Reactive Messaging和Kafka客户端库的日志。
特征的指定
接下来,我们将指定功能并加载Open Liberty以使用MicroProfile Reactive Messaging。
在Open Liberty中使用的模块被细分为”Feature”的形式。通过仅使用所需的Feature,可以保持应用程序的大小较小。
本次使用的MicroProfile Reactive Messaging还准备了专用的功能。
在Open Liberty的配置文件 server.xml 中指定 mpReactiveMessaging-1.0 Feature。
我們順便在 元素中啟用日誌輸出。
<!-- src/main/liberty/config/server.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<server description="My Liberty Server">
<!-- Enable features -->
<featureManager>
<feature>mpReactiveMessaging-1.0</feature>
</featureManager>
<!-- ...中略... -->
<!-- Logging -->
<logging traceSpecification="REACTIVEMESSAGE=all:org.apache.kafka.*=all"/>
</server>
定义Kafka的连接信息
安装了库和特性之后,接下来需要定义与Apache Kafka的连接信息。连接信息应在MicroProfile Config的配置文件microprofile-config.properties中定义。
请在<项目根目录>/src/main/resources/META-INF/microprofile-config.properties中定义以下设置值。
mp.messaging.incoming.purchases.connector=liberty-kafka
mp.messaging.incoming.purchases.bootstrap.servers=localhost:9092
mp.messaging.incoming.purchases.group.id=liberty-sample-consumer
mp.messaging.incoming.purchases.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.auto.offset.reset=earliest
在中文中,可以如下方式进行转述:
设定值需要按照以下格式进行定义:mp.messaging.<incoming|outgoing>.<频道名称>.<属性名称>=<设定值>。
如果您想要读取事件,请将第三个由点分隔的元素指定为”incoming”。相反,如果要发送事件,则将其指定为”outgoing”。
<属性名称>遵循Apache Kafka的配置值命名规则,如果您希望进一步详细配置,请参考Apache Kafka文档。
在这里,我们连接到 Kafka Broker localhost:9092,并定义事件的键和值分别为 String 类型。connector 属性是唯一的特殊属性,连接到 Kafka 时需要指定固定值 liberty-kafka。
参考:Apache Kafka 消费者配置
事件操作应用程序的实施
我們會實現一個從Kafka讀取事件的應用程序。我們將以日誌輸出指定爲購買商品名的事件值來進行實現。
以下是实施示例:
虽然只有大约20行的简单示例,但仅凭这个就可以处理事件。
// PurchaseResource.java
package com.example.resource;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
import java.util.logging.Logger;
@ApplicationScoped
public class PurchaseResource {
private static final Logger logger = Logger.getLogger(PurchaseResource.class.getName());
@Incoming("purchases")
public void consumePurchases(String item) {
final String itemMessage = String.format("Consumed item purchase: item name = %s", item);
logger.info(itemMessage);
}
}
最重要的部分是 @Incoming(“<频道名称>”)。
@Incoming 是由MicroProfile Reactive Messaging提供的用于读取事件的注解。指定的频道名称与在 microprofile-config.properties 中定义的频道名称相关联。
(1)指定参数为 Message 类型
(2)将参数的值强制转换为 ConsumerRecord 类型
(3)指定方法返回一个 CompletionStage 类型
有关详细示例,请参阅下面的博客文章。
博客-访问Kafka消息中的特定属性,并在Open Liberty 20.0.0.3中为Cookie设置SameSite属性。
应用程序的运行确认
由於應用程式準備就緒,所以現在可以連接到Kafka,並讀取事件。
在應用程式運行之前,請啟動Kafka Broker並創建名為”purchases”的主題。
此外,Starter应用程序已经集成了启动Open Liberty的Maven插件。通过以下命令启动Open Liberty。编译后的WAR文件将自动部署到Open Liberty服务器上。
$ mvn liberty:dev
如果成功加入具有“Generation”的组,则将输出大量日志,并且如果输出了成功加入“Generation”的日志,表示已连接到Kafka Broker并准备好读取事件。
让我们尝试从kafka-console-producer等生产者发送事件。
$ kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic purchases \
--property "parse.key=true" --property "key.separator=:"
>alice:apple
>bob:banana
>cris:orange
>^C
在Open Liberty应用程序中,事件的值几乎实时被记录输出。
...
[INFO] [2022/11/06 22:36:06:474 JST] 0000003c com.example.resource.PurchaseResource I Consumed item purchase: item name = apple
[INFO] [2022/11/06 22:36:08:018 JST] 0000003a com.example.resource.PurchaseResource I Consumed item purchase: item name = banana
[INFO] [2022/11/06 22:36:10:404 JST] 0000003d com.example.resource.PurchaseResource I Consumed item purchase: item name = orange
最终系统的构成
应用程序的最终目录结构如下所示:
“/path/to/project” 将填写任意项目的根路径。
$ tree /path/to/project
/path/to/project
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── example
│ └── resource
│ └── PurchaseResource.java
├── liberty
│ └── config
│ └── server.xml
└── resources
└── META-INF
└── microprofile-config.properties
请参考
-
- OpenLiberty – Creating reactive Java microservices
- Sending and receiving messages between microservices with MicroProfile Reactive Messaging