カフカがメッセージ送信の成功をどのように判断するか
Kafkaは分散メッセージキューシステムで、メッセージが正常に送信されたかを判断する様々な方法を提供しています。次に一般的な方法をいくつか示します。
- 同期送信方式:メッセージを送信するためにプロデューサーのsend()メソッドを使用して、返されたFutureオブジェクトのget()メソッドを使用してブロックして待機します。get()メソッドが正常に戻った場合、メッセージ送信に成功したことになり、そうでない場合、送信に失敗しました。
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功,offset:" + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
System.err.println("消息发送失败:" + e.getMessage());
}
- 非同期送信方式: Producerのsend()メソッドを使用してメッセージを送信し、メッセージの送信成否時にコールされるCallbackオブジェクトを渡す。
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功,offset:" + metadata.offset());
} else {
System.err.println("消息发送失败:" + exception.getMessage());
}
}
});
- 消息发送确认机制:Kafka提供了消息发送确认机制,可以确保消息被成功发送到指定的分区并写入磁盘。在Producer的配置中设置”acks”参数来指定确认机制的级别:
- acks=0: メッセージを送信する前には、プロデューサーは確認を待ちません。
- acks=1: リーダーレプリカにメッセージが届いたらプロデューサーは確認を受け取り、他のレプリカの確認を待ちません。
- プロデューサーは、レプリカのすべてが複製に関与し、確認を送信したときに初めて確認を受け取ります。
確認メカニズムを使用すると、一定程度でメッセージ送信の確実性を確保できます。ただし、確認メカニズムはメッセージ送信の遅延を増やすことに注意してください。そのため、パフォーマンス要件が高い場合は、acks = 1 のレベルを使用することを検討してください。
いずれの方法でも送信したメッセージが正常に送信できたかどうかは、返却される RecordMetadata オブジェクトの offset 値を確認することでわかります。offset が -1 でない場合は正常に送信され、それ以外の場合は送信に失敗しています。また、例外情報からも送信失敗の原因を確認できます。