kafkaで最新のオフセットを取得する方法

KafkaのJavaクライアントAPIを使用して、Kafkaトピックの各パーティションの最後のオフセットを取得できます。

まず、KafkaConsumerインスタンスを作成し、bootstrap.servers、group.idなどの必要な構成プロパティを設定します。

その後、コンシューマーのassign()メソッドを使用して、オフセットを取得したいトピックパーティションをコンシューマーに割り当てます。

次に、コンシューマの seekToEnd() メソッドを呼び出してコンシューマの位置をパーティションの最後のオフセットに設定します

最後に、各パーティションの最後のオフセットをconsumerのposition()メソッドで取得する。

以下は、Kafka トピックの各パーティションの最後のオフセットを取得する方法を示すサンプルコードです。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class KafkaOffsetExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<PartitionInfo> partitions = consumer.partitionsFor("test-topic");

        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partition : partitions) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }

        consumer.assign(topicPartitions);
        consumer.seekToEnd(topicPartitions);

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        for (TopicPartition topicPartition : topicPartitions) {
            endOffsets.put(topicPartition, consumer.position(topicPartition));
        }

        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            System.out.println("Partition: " + entry.getKey() + ", Last Offset: " + entry.getValue());
        }

        consumer.close();
    }
}

上記のサンプルでは、Kafkaクラスタのブートストラップサーバーアドレスとしてlocalhost:9092を使用し、test-groupをコンシューマグループIDとして使用し、test-topicをオフセットを取得するトピックとして使用します。

Kafka クラスタアドレス、トピック、コンシューマグループ ID が、コードで正しく設定されていることを確認してください。

bannerAds