使用Apache Flink进行数据处理

首先

你好!我是LOB的@RyosukeKawamura。
這篇文章是LOB Advent Calendar 2018第19天的文章。

这是我的第二次发帖。
由于我目前正在工作中使用Apache Flink进行数据处理,所以这次我想写一些关于它的内容。

关于Flink,具体的实例在网上并不多,即使有,也只能找到一些“试一试”或者“Flink的◯◯API解释”之类的东西。所以到底该怎么办呢?这让我很难理解。但是正好我身边有一个非常懂的同事,我们在一起学习的过程中,我终于明白了“如果灵活运用的话,这确实非常方便!”所以为了整理自己的知识,我想把它总结起来。

中文:Apache Flink 是什么

这个家的文档非常充实。如果仔细阅读,大致就能理解(应该)。总的来说,它是一个”分布式流处理平台”。虽然有很多类似的东西(比如Storm和Spark等),现在已经变得很难区分了,但听说Flink从流处理进化到渗透到批处理等其他领域。

Flink的优点

我认为以下三点大致能触及到其真髓,虽然还未完全理解。

流式处理和批处理都得到支持,并且可以使用类似的处理方式来实现。

在处理实时流数据、文件协作和读取表格时,能够以大致相同的方式进行实现是非常令人高兴的。我们目前正在进行的是处理与tsv文件协作的批处理,但这可能会发生变化,例如通过Kafka等方式发送日志而不是文件。在推进项目时,我认为会经常发生较大的结构变更,这是一个非常令人高兴的点。然而,由于有一些只能在流处理中使用的API,所以我们将当前处理有限的文件协作数据作为流处理进行实现(详细信息请参见下文的“相关问题”)。

在数据汇总等方面,API非常丰富且能够满足大部分需求。

这个真的让我很吃惊。就像处理数组一样,只需顺利地连接方法,就能实现聚合等功能。熟悉函数式语言的人会更加容易上手。(我没经验。。。)
这次是用Java来实现的,不过Scala也可以使用Flink,听说那边的效果更好。
样例代码是这样的。

当障碍发生时,能够智能地恢复并从此处重新开始处理的实施方法是可能的。

不好意思,這部分目前還未實現,所以我並不了解細節。
根據實現的情況,似乎可以設置一個類似檢查點的機制,可以從那裡重新開始,達到更好的效果。
這樣似乎更容易保證冪等性,這真是令人高興啊。
Flink集群預設提供的控制台也非常方便,可以用來查看作業進度,這對我們非常有幫助。

image.png

卡在那里

将CSV等有限的数据作为流处理。

我們需要處理以有限文件為輸入的處理流程。由於需要滿足”想要使用流式處理的API”和”可能會轉變成流式處理”的要求,我們必須做出相應的調整。
我們的解決方法是:讀取文件->將其作為表格載入一次->從表格中載入數據->將結果作為數據流進行操作。
這可能有點棘手,但只要好好搜索官方文檔,你就會發現其實它早已有所提及,所以還是要仔細閱讀一下…實現的話就會是這種感覺。

// ストリーム処理・テーブルの処理に必要な環境を読み込む
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv); 

// ファイル読み込み
TableSource source =  new CsvTableSource.Builder() 
    .path("/path/to/user_data.tsv")
    .ignoreFirstLine() // ヘッダー読み飛ばしがこのメソッド書くだけでOKなの地味に嬉しい 
    .ignoreParseErrors() // 不正なデータはこれで無視できる
    .fieldDelimiter("\t") // タブ区切り
    .field("cookie", Types.STRING) 
    .field("date", Types.TIMESTAMP)
    .build(); 

// テーブルのデータソースとして登録
tEnv.registerTableSource("users", source);

// テーブルとしてデータを取得
Table table = tEnv
    .scan("users"); // ここに.filter()とか.select()とか繋ぐとSQLチックな処理もできる

// ストリーミングに変換
DataStream<UserEntity> ds = tEnv.toAppendStream(table, Row.class) // Entityを定義しておけばその型のストリームとして読み込める

ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...); // ここは省略してます(FlatMapしてaddSinkにわたすみたいなのがよくある実装の形らしい)
sEnv.execute(); // ここ忘れるとなにも動かない

初次见面的时候很难想象它到底如何运作。

这就是了。哈哈

ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...);

根据我的理解,这段话的意思是:在并行处理的情况下,通过流式处理来将hogeFlatMap()函数应用在parallelNum个元素上,并将hogeFlatMap()内部的输出传递给addSink函数。这种处理方式就像在命令行中使用管道连接多个操作一样,十分方便。刚开始时我完全不理解这个意思。由于flatMap函数会在每次调用时都被执行,所以在其中创建连接会导致无法进行套接字连接而导致程序崩溃(因此需要将其传递给构造函数),总之,最初的困惑点是很难理解每个操作是在何时执行的,即使使用单步调试也不容易观察到。

最后

只要掌握了窍门(尽管我还没有掌握),Flink可能成为数据管道中强大的助手,虽然理解和掌握它的形象有点困难。由于有机会接触广告科技的大数据,我希望能够更多地学习并掌握它的使用。

如果你对我们公司能够运用不仅限于Flink的新数据处理技术抱有兴趣并且希望一同合作,我们公司提供了许多这样的机会。特别是由于我们可以处理其他公司无法应对的大量和高质量的数据,如果你对此感兴趣并希望合作,请一定与我们携手合作!

我们正在热烈招募志同道合的伙伴,为了创造能够改变流通形态的广告平台!我们每天都在开展战略会议,致力于建立高效的数据基础设施!请点击链接 https://lob-inc.com/recruit/。

bannerAds