カフカがメッセージ送信の成功をどのように判断するか

Kafkaは分散メッセージキューシステムで、メッセージが正常に送信されたかを判断する様々な方法を提供しています。次に一般的な方法をいくつか示します。

  1. 同期送信方式:メッセージを送信するためにプロデューサーのsend()メソッドを使用して、返されたFutureオブジェクトのget()メソッドを使用してブロックして待機します。get()メソッドが正常に戻った場合、メッセージ送信に成功したことになり、そうでない場合、送信に失敗しました。
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功,offset:" + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    System.err.println("消息发送失败:" + e.getMessage());
}
  1. 非同期送信方式: Producerのsend()メソッドを使用してメッセージを送信し、メッセージの送信成否時にコールされるCallbackオブジェクトを渡す。
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("消息发送成功,offset:" + metadata.offset());
        } else {
            System.err.println("消息发送失败:" + exception.getMessage());
        }
    }
});
  1. 消息发送确认机制:Kafka提供了消息发送确认机制,可以确保消息被成功发送到指定的分区并写入磁盘。在Producer的配置中设置”acks”参数来指定确认机制的级别:
  1. acks=0: メッセージを送信する前には、プロデューサーは確認を待ちません。
  2. acks=1: リーダーレプリカにメッセージが届いたらプロデューサーは確認を受け取り、他のレプリカの確認を待ちません。
  3. プロデューサーは、レプリカのすべてが複製に関与し、確認を送信したときに初めて確認を受け取ります。

確認メカニズムを使用すると、一定程度でメッセージ送信の確実性を確保できます。ただし、確認メカニズムはメッセージ送信の遅延を増やすことに注意してください。そのため、パフォーマンス要件が高い場合は、acks = 1 のレベルを使用することを検討してください。

いずれの方法でも送信したメッセージが正常に送信できたかどうかは、返却される RecordMetadata オブジェクトの offset 値を確認することでわかります。offset が -1 でない場合は正常に送信され、それ以外の場合は送信に失敗しています。また、例外情報からも送信失敗の原因を確認できます。

bannerAds