KafkaからデータをJavaで取得する方法

Javaでは、Apache KafkaのJavaクライアントライブラリを使用してKakfaからデータを読み取ることができます。簡単なサンプルコードを次に示します。

まずプロジェクトにKafkaのJavaクライアントライブラリの依存関係を追加する必要があります。依存関係は次の依存関係を構築ツールの(MavenもしくはGradleなど)の設定ファイルに追加することで追加できます。

<!-- Kafka client -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

その場合、下記のコードでKafkaからデータを読み取ることができます。

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Kafka集群的地址
        String bootstrapServers = "localhost:9092";
        // 消费者组的ID
        String groupId = "my-group";
        // 要消费的主题
        String topic = "my-topic";

        // 配置消费者的属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", groupId);
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 订阅主题
        consumer.subscribe(Collections.singletonList(topic));

        // 无限循环从Kafka中读取消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }
}

上記コードでは、コンシューマーを作成して、トピックを購読しました。その後、 consumer.poll(1000)を呼び出して、kafkaからデータをプルしています。この例では、単に受信したメッセージをコンソールに出力しています。

BootstrapServers、groupId、topicを接続するKafkaクラスタのアドレス、コンシューマーグループのID、コンシュームするトピックに置き換えて下さい。

お役に立てれば幸いです!

bannerAds