kafkaはオフセットをどのようにリセットしますか
Kafkaは、kafka-consumer-groups.shコマンドラインツールを使用するか、プログラムを使用して、2つの方法でオフセットをリセットします。
方法 1: kafka-consumer-groups.sh コマンドラインツールを使用します。
- ターミナルウィンドウを開く。
- Kafkaのインストールディレクトリのbinディレクトリに切り替えます。
- ./kafka-consumer-groups.sh –bootstrap-server –group <コンシューマー グループ> –reset-offsets –to-earliest –topic <トピック名> –execute
- Kafka ブローカー
- コンシューマー・グループ
- 種名
- 一番古い順
- –execute
方法2:プログラミングを使用したリセット
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ResetConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ResetConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;
public class KafkaOffsetReset {
public static void main(String[] args) throws Exception {
// Kafka broker地址
String bootstrapServers = "<kafka_broker>";
// 消费者组名称
String groupId = "<consumer_group>";
// 主题名称
String topic = "<topic_name>";
// 创建AdminClient
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AdminClient adminClient = AdminClient.create(properties);
// 获取消费者组描述
ConsumerGroupDescription consumerGroupDescription = adminClient.describeConsumerGroups(Collections.singleton(groupId)).all().get().get(groupId);
// 获取消费者组的偏移量
ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
options.topicPartitions(Collections.singleton(new TopicPartition(topic, 0))); // 这里假设只有一个分区
adminClient.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata().get().forEach((tp, om) -> {
System.out.println("Partition: " + tp.partition() + ", Offset: " + om.offset());
});
// 重置消费者组的偏移量
ResetConsumerGroupOffsetsOptions resetOptions = new ResetConsumerGroupOffsetsOptions();
resetOptions.topicPartitions(Collections.singletonMap(new TopicPartition(topic, 0), consumerGroupDescription));
ResetConsumerGroupOffsetsResult resetResult = adminClient.resetConsumerGroupOffsets(groupId, resetOptions);
resetResult.partitionsToOffsetAndMetadata().get().forEach((tp, om) -> {
System.out.println("Partition: " + tp.partition() + ", Offset: " + om.offset());
});
// 关闭AdminClient
adminClient.close();
}
}
コードの中では、はKafkaブローカーのアドレスに、はコンシューマーグループ名に、はトピック名に置き換える必要があります。このサンプルではパーティションは1つであると想定していますが、実際には必要に応じて修正してください。
どんな方法を使用する場合でもオフセットのリセットは注意して操作する必要があります。データの再コンシュームやデータの損失につながる可能性があります。本番環境では慎重にご利用ください。