Flink基础【代码概览】
首先
这篇文章是为Flink初学者准备的。我自己也正在学习Flink。
Flink 是什么?
这是一个为并行分散流处理而开发的开源软件。可以说它是为Java8虚拟机(JVM)设计的Java API。
Flink的编写简单易懂,可以通过API轻松进行故障恢复和分布式处理的部署配置,非常方便。
Flink的运行原理和源代码的关系

在考虑了这些因素之后,Flink的源代码可以概括如下。
//決まり文句。初期設定。envを使って引数や並列数や時間の定義などを行えます。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Source
DataStream<String> recordinput = env.readTextFile("/home/input.txt");
//Operator
DataStream<Tuple<String, Integer>> processedstream = recordinput.map(new MapFunction(){~}).keyBy(0).max(1).filter(new FilterFunction(){~});
//Sink
processedstream.writesaText("home/Output.txt");
//決まり文句。Flinkは最後にこの処理を書かないと実行されないようになっている。
env.execute();
如您所见,Flink的核心是Operator部分。我们会调用各种方法对它进行处理。
在上面的例子中,我们使用了诸如map、keyBy、max等方法。首先,已加载的记录input会将txt文件的一行作为字符串进行读取,因此我们使用map将其转换为Tuple的形式。然后,通过keyBy将数据流按id进行分组(类似于将河流分成几段)。然后,通过max计算每个流中value的最大值,并最后通过filter生成仅包含id为A的流。
尽管上述代码仅仅是个示意,实际上并没有正确的代码,但我认为您已经理解代码的执行流程了。
实际的代码
最后,我们将提供一个实际运行的示例代码。
package main.java.file;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
public class Streaming {
public static void main(String[] args) throws Exception {
//決まり文句。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//時間の定義をprocessingtime(CPU側の時間)に設定
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//Source。ソケット通信でlocalhostのポート9090からデータを受け取る。
DataStream<String> stream1 = env.socketTextStream("localhost",9090);
//一つ目のoperator。上の例ではoperatorを一つで書いているが、
//別に複数個に分けて書くこともできる。メソッドの引数には
//決められた形のクラスを与えることが多い。このmapでは
//引数内でそのクラスの定義をしている。ローカルクラスと呼ばれる仕組み。
//mapに渡すクラスはMapFunctionインターフェースを実装する。
DataStream<Tuple2<String, Integer>> stream2 = stream1.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] words = value.split(",");
return new Tuple2<>(words[0], Integer.parseInt(words[3]));
}
});
//二つ目のoperator。keyByの引数では入力データの
//どの位置を基準にグループ分けするかを示す。
//その後windowメソッドで直近1msに到着したレコードごとに区切り、
//aggregateメソッドで区切ったwindowごとに引数の
//Countクラスに記述された演算を行わせる。
//今回はローカルクラスでなく一番下でクラスを定義している。
//どちらの方法でもよい。
DataStream<Tuple2<String,Integer>> stream3 = stream2
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(1)))
.aggregate(new Count());
//Sink
stream3.writeAsText("/mnt/c/work/sample1.txt");
//決まり文句。
env.execute("sample_temp");
}
//上のaggregateメソッドの引数用に定義。aggregateに
//渡すクラスはAggregateFunctionインターフェースを実装する。
public static class Count implements AggregateFunction<Tuple2<String,Integer>, Tuple3<String,Integer,Integer>, Tuple2<String,Integer>>{
@Override
public Tuple3<String, Integer, Integer> createAccumulator() {
return new Tuple3<>("",0,0);
}
@Override
public Tuple3<String, Integer, Integer> add(Tuple2<String, Integer> input, Tuple3<String, Integer, Integer> acc) {
acc.f0=input.f0;
acc.f1+=input.f1;
acc.f2+=1;
return acc;
}
@Override
public Tuple2<String, Integer> getResult(Tuple3<String, Integer, Integer> accumulator) {
return new Tuple2<>(accumulator.f0, accumulator.f2);
}
@Override
public Tuple3<String, Integer, Integer> merge(Tuple3<String, Integer, Integer> acc1, Tuple3<String, Integer, Integer> acc2) {
acc1.f1+=acc2.f1;
acc1.f2+=acc2.f2;
return acc1;
}
}
}