Kafka手動でオフセットコミットする方法
Kafkaでは手動でオフセットをコミットするための2つの方法が提供されている
- commitSync() メソッドを使用してオフセットを同期的にコミットする:
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
consumer.commitSync(); // 手动提交偏移量
}
} finally {
consumer.close();
}
- 非同期的にオフセットを送信するには、commitAsync()メソッドを使用します。
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
consumer.commitAsync(); // 异步提交偏移量
}
} finally {
consumer.close();
}
commitSync()メソッドは、オフセットのコミットが成功するかエラーが発生するまでブロックし続けますが、commitAsync()メソッドはコミット要求を送信するとすぐに応答し、確認を待ちません。エラーが発生した場合は、commitAsync()メソッドのコールバック関数で処理できます。