kafkaでキュー内のメッセージ数を取得する方法
Kafkaキュー内のメッセージ数を調べる方法は、以下のようにいくつかあります。
- kafka-console-consumer をネイティブ日本語に言い換えてください。
- 東京で大きな地震が起こった.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic-name> --from-beginning
これにより、すべてのメッセージが端末に表示されて、最後に読み込まれたメッセージの総数が表示されます。
- Kafkaが提供する独自のJMXインターフェイスを使用して、キュー内のメッセージ数を取得できます。KafkaのJMXインターフェイスには、Kafkaクラスタの監視や管理に使用できます。JMXクライアントツール(JConsoleやVisualVMなど)を使用してKafkaのJMXインターフェイスに接続すると、kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSecという指標がキュー内のメッセージ数を表す指標であることがわかります。
- カフカのJavaクライアントAPIを使用してキュー内のメッセージ数を取得するには、Javaプログラムを作成してカフカのJavaクライアントAPIを使用してKafkaクラスタに接続し、Consumerオブジェクトの`position()`メソッドを使用して現在のコンシューマ位置を取得し(つまり、消費したメッセージ数)、`beginningOffsets()`メソッドで返される開始オフセットを差し引くことができます。メッセージ数を取得できます。以下は、サンプルコードです。
import org.apache.kafka.clients.consumer.*;
import java.util.*;
public class KafkaMessageCount {
public static void main(String[] args) {
String topicName = "<topic-name>";
String bootstrapServers = "localhost:9092";
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "test-group");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topicName));
consumer.poll(0); // 必须在订阅后调用poll()方法,以获取分区分配
long beginningOffset = consumer.beginningOffsets(consumer.assignment()).values().iterator().next();
long currentPosition = consumer.position(consumer.assignment()).values().iterator().next();
long messageCount = currentPosition - beginningOffset;
System.out.println("队列中的消息数: " + messageCount);
} catch (Exception e) {
e.printStackTrace();
}
}
}
topic名と localhost:9092 を実際のトピックとKafkaサーバーアドレスに置き換えて実行すれば、キューのメッセージ数がわかります。
消費メッセージを介してキュー内のメッセージ数を取得するため、クラスターに一定の消耗を引き起こす可能性がある点に注意してください。メッセージを消費せずにメッセージ数だけを取得する必要がある場合は、Kafka の管理ツール、JMX インターフェイス、または Kafka の Java クライアント API の KafkaConsumer#endOffsets() メソッドを使用して、各パーティションの最新のオフセットを取得し、合計して総メッセージ数を求めます。