kafkaのトピックデータを削除する方法

Kafka のトピックデータを削除する方法について説明します。

  1. kafka-topics.sh
kafka-topics.sh --zookeeper <zookeeper地址> --topic <主题名称> --delete --if-exists

このコマンドにより、指定したトピックのすべてのパティションのデータが削除されます。

  1. Adminクライアント
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ClearTopic {
   public static void main(String[] args) {
      // 设置Kafka连接配置
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<Kafka服务器地址>");

      // 创建AdminClient
      try (AdminClient adminClient = AdminClient.create(props)) {
         // 获取主题的分区信息
         ListOffsetsResult listOffsetsResult = adminClient.listOffsets(Collections.singletonMap(new TopicPartition("<主题名称>", 0), ListOffsetsResult.EARLIEST_TIMESTAMP));
         Map<TopicPartition, ListOffsetsResultInfo> topicOffsets = listOffsetsResult.all().get();
         
         // 删除主题的数据
         DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(topicOffsets);
         KafkaFuture<Map<TopicPartition, DeletedRecords>> deletedRecords = deleteRecordsResult.deletedRecords();
         deletedRecords.get();
         
         System.out.println("主题数据已清空");
      } catch (InterruptedException | ExecutionException e) {
         e.printStackTrace();
      }
   }
}

このコードは指定したトピックのすべてのパーティションデータを削除します。

テーマデータを空にする操作は危険なので注意が必要です。一度削除されると復元できません。そのため、空にする操作を行う前に、必ず操作に間違いがないかを確認し、重要なデータはバックアップしてください。

bannerAds