sparkはどのようにしてkafkaからデータを読み込むか?
スパークでのカフカデータの読み込みには、スパークの公式Kafka統合ライブラリであるスパークストリーミングカフカを使用することができます。
最初に、SparkプロジェクトにSpark Streaming Kafkaの依存関係を追加する必要があります。Mavenプロジェクトの場合、以下の依存関係をpom.xmlファイルに追加することができます。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.2</version>
</dependency>
その後、SparkSessionオブジェクトを使用してStreamingContextを作成し、バッチ処理の間隔を指定できます。
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaStreamingExample {
public static void main(String[] args) throws InterruptedException {
// 创建SparkConf对象
SparkConf sparkConf = new SparkConf().setAppName("KafkaStreamingExample").setMaster("local[*]");
// 创建JavaStreamingContext对象
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
// 设置Kafka参数
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "test-group");
// 创建Kafka主题列表
Collection<String> topics = Arrays.asList("topic1", "topic2");
// 创建Kafka输入流
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
// 处理Kafka数据
kafkaStream.foreachRDD(rdd -> {
// 在这里对每个RDD进行处理
rdd.foreach(record -> {
System.out.println("Key: " + record.key() + ", Value: " + record.value());
});
});
// 启动流处理程序
streamingContext.start();
// 等待流处理程序终止
streamingContext.awaitTermination();
}
}
上記の例では、まずSparkConfオブジェクトとJavaStreamingContextオブジェクトを作成しました。 次に、Kafkaのパラメータを設定しました。これには、Kafkaサーバーのアドレス、キーと値の逆シリアル化クラス、およびコンシューマーグループIDが含まれます。 次に、Kafka入力ストリームを作成し、購読するトピックとKafkaのパラメータを指定しました。 最後に、foreachRDDメソッドを使用して、各RDDを処理し、各レコードのキーと値を取得しました。
上記の例では、createDirectStreamメソッドはKafka 0.10バージョンおよびそれ以降のバージョンに適用されます。古いバージョンのKafkaを使用している場合は、別のオーバーロードバージョンのcreateDirectStreamメソッドを使用することができます。また、必要に応じて他のパラメータや処理ロジックを調整することも可能です。