javaプログラムでKafkaのトピックを取得する方法
Javaプログラムは、Kafkaが提供するJavaクライアント経由で、Kafkaのトピックにアクセスできます。以下は、トピックリストを取得するサンプルコードです。
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaTopicExample {
public static void main(String[] args) {
// Kafka配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// 创建AdminClient对象
try (AdminClient adminClient = AdminClient.create(properties)) {
// 获取topic列表
ListTopicsResult topicsResult = adminClient.listTopics();
// 获取Future对象
KafkaFuture<Collection<TopicListing>> topicListingFuture = topicsResult.listings();
// 获取topic列表
Collection<TopicListing> topicListings = topicListingFuture.get();
// 遍历输出每个topic
for (TopicListing topicListing : topicListings) {
System.out.println(topicListing.name());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
上記のコードでは、最初にAdminClientオブジェクトを作成し、Kafkaの設定を渡します。次に、listTopicsメソッドを使用してListTopicsResultオブジェクトを取得します。このオブジェクトには、トピックのリストを取得する方法が含まれています。listingsメソッドを呼び出すことで、トピックのリストを取得する非同期プロセスを表すKafkaFutureオブジェクトを取得します。最後に、getメソッドを呼び出して実際のトピックのリストを取得し、各トピックの名前を出力します。
なお、ここでは、Kafkaクラスターのアドレスを指定するために『bootstrap.servers』パラメーターを使用しています。お使いの実際の、Kafkaクラスターの構成に合わせて変更してください。