kafkaで最新のオフセットを取得する方法
KafkaのJavaクライアントAPIを使用して、Kafkaトピックの各パーティションの最後のオフセットを取得できます。
まず、KafkaConsumerインスタンスを作成し、bootstrap.servers、group.idなどの必要な構成プロパティを設定します。
その後、コンシューマーのassign()メソッドを使用して、オフセットを取得したいトピックパーティションをコンシューマーに割り当てます。
次に、コンシューマの seekToEnd() メソッドを呼び出してコンシューマの位置をパーティションの最後のオフセットに設定します
最後に、各パーティションの最後のオフセットをconsumerのposition()メソッドで取得する。
以下は、Kafka トピックの各パーティションの最後のオフセットを取得する方法を示すサンプルコードです。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class KafkaOffsetExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<PartitionInfo> partitions = consumer.partitionsFor("test-topic");
List<TopicPartition> topicPartitions = new ArrayList<>();
for (PartitionInfo partition : partitions) {
topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
consumer.assign(topicPartitions);
consumer.seekToEnd(topicPartitions);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
for (TopicPartition topicPartition : topicPartitions) {
endOffsets.put(topicPartition, consumer.position(topicPartition));
}
for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
System.out.println("Partition: " + entry.getKey() + ", Last Offset: " + entry.getValue());
}
consumer.close();
}
}
上記のサンプルでは、Kafkaクラスタのブートストラップサーバーアドレスとしてlocalhost:9092を使用し、test-groupをコンシューマグループIDとして使用し、test-topicをオフセットを取得するトピックとして使用します。
Kafka クラスタアドレス、トピック、コンシューマグループ ID が、コードで正しく設定されていることを確認してください。