flinkは、どのようにしてRedisからデータを読み込むのですか。
FlinkはRedisと接続してデータを読み取ることができます。以下はFlinkがRedisからデータを読み取る一般的な手順です。
- Flinkプロジェクトのpom.xmlファイルにRedisに関連する依存関係を追加します。例えば、
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
- Flinkの実行環境を作成する。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- Redisの接続設定を作成してください。
FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build();
- ソースを追加する()
DataStream<String> dataStream = env.addSource(new RedisSource<>(jedisConfig, new MyRedisMapper()));
MyRedisMapperは、RedisMapperインターフェースを実装したカスタムクラスであり、Redis内のデータ形式を指定し、データをFlinkデータストリームにマッピングするために使用されます。
- Redisマッパー
public class MyRedisMapper implements RedisMapper<String> {
@Override
public RedisCommandDescription getCommandDescription() {
// 指定Redis命令,例如GET key
return new RedisCommandDescription(RedisCommand.GET);
}
@Override
public String getKeyFromData(String data) {
// 从Redis中获取的数据中提取用于分区的键
return data;
}
@Override
public String getValueFromData(String data) {
// 从Redis中获取的数据中提取值
return data;
}
}
- プリント()
dataStream.print();
- 実行する (じっこうする)
env.execute("Read from Redis");
したがって、FlinkはRedisからデータを読み取り、処理することができます。実際の状況に応じて適切な調整と拡張を行ってください。