Kafkaのデータを重複排除するには、flinkをどう使えばいいですか?

Flink で Kafka のデータの重複を除去する方法は次の通りです:

  1. キーバイ
  2. 削減する
  3. 集計する
  4. 畳む (たたむ)
DataStream<MyData> stream = env.addSource(new FlinkKafkaConsumer<>(...));

DataStream<MyData> deduplicatedStream = stream
    .keyBy(data -> data.getId())  // 按照 id 字段进行分组
    .reduce((data1, data2) -> data1);  // 使用 reduce 操作符将相同 id 的数据去重
  1. キーバイ
  2. プロセス関数
  3. 豊かなFlatMapFunction
DataStream<MyData> stream = env.addSource(new FlinkKafkaConsumer<>(...));

DataStream<MyData> deduplicatedStream = stream
    .keyBy(data -> data.getUniqueId())  // 按照唯一标识符进行分组
    .process(new DeduplicateFunction());  // 自定义 ProcessFunction 实现去重逻辑

public static class DeduplicateFunction extends ProcessFunction<MyData, MyData> {
    private ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) throws Exception {
        seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(MyData data, Context ctx, Collector<MyData> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(data);
        }
    }
}

一つの選択肢:
注意すべきポイントは、上記の方法は隣接するデータにのみ適用されるということです。データ量が大きい場合やデータの分布が偏っている場合は、性能の問題が発生する可能性があります。Kafka全体のデータを重複排除する必要がある場合は、Flinkの状態バックエンドであるRocksDBを使用することを検討すると良いでしょう。その際には、処理済みデータの識別子を状態に保存し、定期的に状態から期限切れのデータを削除するようにしてください。

bannerAds