javaでApache Kafkaにデータを書き込む

Javaでは、Kafkaが提供するProducer APIを使用して、Kafkaにデータを書き込むことができます。次に簡単なサンプルコードを示します。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 设置Kafka相关配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建一个Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 构建一个消息
String topic = "my-topic";
String key = "my-key";
String value = "Hello, Kafka!";
// 发送消息到Kafka
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println("消息发送成功,偏移量为:" + metadata.offset());
} else {
System.out.println("消息发送失败,原因为:" + exception.getMessage());
}
}
});
// 关闭Kafka生产者
producer.close();
}
}

上記コードでは、まずKafka関連の設定を含むPropertiesオブジェクトを作成し、その設定を使用してKafkaプロデューサーを作成しました。次に、メッセージを構築し、ProducerRecordを使用してそのメッセージを指定されたトピックに送信しました。最後に、close()メソッドを呼び出してKafkaプロデューサーを閉じました。

Kafkaのコンフィグレーションに従って、`bootstrap.servers`プロパティの値を変更する必要があります。また、正しいトピック名を指定してください。さらに、必要に応じてメッセージのキーと値を変更できます。

上記のコードでのメッセージ送信は非同期となっている点に注意して下さい。つまり、producer.send()関数は直ぐに返り値を返し、メッセージがKafkaに書き込まれるのを待ちません。もし、メッセージの送信を同期的に必要とする場合は、send().get()関数が使用できます。これは現在のスレッドをメッセージの送信が完了するまでブロックします。

また、コールバック関数のonCompletion()メソッドで送信結果を処理できます。メッセージがKafkaに正常に書き込まれた場合、metadataパラメータには、書き込まれたメッセージに関するメタデータ(トピック、パーティション、オフセットなど)が含まれます。送信が失敗した場合は、exceptionパラメータに失敗理由に関する例外情報が含まれます。

この情報がお役に立てば幸いです。

bannerAds