KafkaデータをRedisに書き込むには

KafkaのデータをRedisに書き込む場合は、以下の手順に従ってください。

  1. Kafkaコンシューマーを作成してKafkaトピックのデータを読み取ります。
  2. Redisとやり取りするためのRedisクライアントを作成します。
  3. Kafka のメッセージを解析して対応するデータを Redis に書き込む

以下のサンプルコードでは、KafkaのデータをRedisに書き込む方法を示しています。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.Properties;

public class KafkaToRedisExample {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String KAFKA_TOPIC = "your-kafka-topic";
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        // 创建Kafka消费者配置
        Properties kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-redis-example-group");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建Redis客户端
        Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);

        // 创建Kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));

        // 持续从Kafka消费消息并写入Redis
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // 解析Kafka消息
                String key = record.key();
                String value = record.value();

                // 写入Redis
                jedis.set(key, value);
                System.out.println("Wrote to Redis: " + key + " - " + value);
            }
            // 提交消费位移
            consumer.commitAsync();
        }
    }
}

これらの設定を変更してください: KAFKA_BOOTSTRAP_SERVERS、KAFKA_TOPIC、REDIS_HOST、REDIS_PORT。このサンプルコードは、JavaクライアントとJedisライブラリを使ってKafkaとRedisを接続します。

bannerAds