JavaでKafkaメッセージを取得するにはどうしますか。
JavaでKafkaメッセージを受信するには、KafkaのJavaクライアントライブラリを使用する必要があります。一般的なKafkaメッセージの受信方法を以下に示します。
- まず、プロジェクトにApache Kafkaの公式のクライアントライブラリやSpring KafkaなどのKafkaのJava クライアントライブラリが確実に導入されていることを確認してください。
- Kafkaのアドレス、ポート等のKafkaクラスターの接続設定を行い、Kafkaコンシューマーオブジェクトを作成します。
- Properties props = new Properties();
props.put(“bootstrap.servers”, “kafka1:9092,kafka2:9092”); // カフカ集群のアドレスとポート
props.put(“group.id”, “group1”); // コンシューマーグループのID
props.put(“auto.offset.reset”, “latest”); // コンシューマーが最新のメッセージから消費を開始
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); - Kafka のトピックを 1 つ以上サブスクライブします。
- コンシューマー.サブスクライブ(Arrays.asList(“topic1”, “topic2”)); // トピックリストを購読
- ループ内でKafkaから継続的にメッセージをプルして処理します。
- while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Kafkaからメッセージを取得
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();// 受信したメッセージを処理
System.out.printf(“メッセージを受け取りました: トピック = %s, パーティション = %d, オフセット = %d, キー = %s, 値 = %s\n”,
topic, partition, offset, key, value);
}
} - 実際利用する際には、各種例外処理に注意し、プログラム終了時にコンシューマーを確実に閉じましょう。
上記の手順で、JavaでKafkaメッセージを受信することができます。