Flink基础【代码概览】

首先

这篇文章是为Flink初学者准备的。我自己也正在学习Flink。

Flink 是什么?

这是一个为并行分散流处理而开发的开源软件。可以说它是为Java8虚拟机(JVM)设计的Java API。
Flink的编写简单易懂,可以通过API轻松进行故障恢复和分布式处理的部署配置,非常方便。

Flink的运行原理和源代码的关系

Flink1.png

在考虑了这些因素之后,Flink的源代码可以概括如下。

//決まり文句。初期設定。envを使って引数や並列数や時間の定義などを行えます。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//Source
DataStream<String> recordinput = env.readTextFile("/home/input.txt");

//Operator
DataStream<Tuple<String, Integer>> processedstream = recordinput.map(new MapFunction(){~}).keyBy(0).max(1).filter(new FilterFunction(){~});

//Sink
processedstream.writesaText("home/Output.txt");

//決まり文句。Flinkは最後にこの処理を書かないと実行されないようになっている。
env.execute();

如您所见,Flink的核心是Operator部分。我们会调用各种方法对它进行处理。
在上面的例子中,我们使用了诸如map、keyBy、max等方法。首先,已加载的记录input会将txt文件的一行作为字符串进行读取,因此我们使用map将其转换为Tuple的形式。然后,通过keyBy将数据流按id进行分组(类似于将河流分成几段)。然后,通过max计算每个流中value的最大值,并最后通过filter生成仅包含id为A的流。

尽管上述代码仅仅是个示意,实际上并没有正确的代码,但我认为您已经理解代码的执行流程了。

实际的代码

最后,我们将提供一个实际运行的示例代码。

package main.java.file;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

public class Streaming {
    public static void main(String[] args) throws Exception {

         //決まり文句。
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //時間の定義をprocessingtime(CPU側の時間)に設定
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);


         //Source。ソケット通信でlocalhostのポート9090からデータを受け取る。
        DataStream<String> stream1 = env.socketTextStream("localhost",9090);


        //一つ目のoperator。上の例ではoperatorを一つで書いているが、
        //別に複数個に分けて書くこともできる。メソッドの引数には
        //決められた形のクラスを与えることが多い。このmapでは
        //引数内でそのクラスの定義をしている。ローカルクラスと呼ばれる仕組み。
        //mapに渡すクラスはMapFunctionインターフェースを実装する。
        DataStream<Tuple2<String, Integer>> stream2 = stream1.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] words = value.split(",");
                return new Tuple2<>(words[0], Integer.parseInt(words[3]));
            }
        });

        //二つ目のoperator。keyByの引数では入力データの
        //どの位置を基準にグループ分けするかを示す。
        //その後windowメソッドで直近1msに到着したレコードごとに区切り、
        //aggregateメソッドで区切ったwindowごとに引数の
        //Countクラスに記述された演算を行わせる。
        //今回はローカルクラスでなく一番下でクラスを定義している。
        //どちらの方法でもよい。
        DataStream<Tuple2<String,Integer>> stream3 = stream2
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1)))
                .aggregate(new Count());

        //Sink
        stream3.writeAsText("/mnt/c/work/sample1.txt");

        //決まり文句。
        env.execute("sample_temp");

    }



    //上のaggregateメソッドの引数用に定義。aggregateに
    //渡すクラスはAggregateFunctionインターフェースを実装する。
    public static class Count implements AggregateFunction<Tuple2<String,Integer>, Tuple3<String,Integer,Integer>, Tuple2<String,Integer>>{

        @Override
        public Tuple3<String, Integer, Integer> createAccumulator() {
            return new Tuple3<>("",0,0);
        }

        @Override
        public Tuple3<String, Integer, Integer> add(Tuple2<String, Integer> input, Tuple3<String, Integer, Integer> acc) {
            acc.f0=input.f0;
            acc.f1+=input.f1;
            acc.f2+=1;

            return acc;
        }

        @Override
        public Tuple2<String, Integer> getResult(Tuple3<String, Integer, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0, accumulator.f2);
        }

        @Override
        public Tuple3<String, Integer, Integer> merge(Tuple3<String, Integer, Integer> acc1, Tuple3<String, Integer, Integer> acc2) {
            acc1.f1+=acc2.f1;
            acc1.f2+=acc2.f2;

            return acc1;
        }

    }






}

bannerAds