使用Apache Flink和Scala对传感器数据进行窗口聚合
我打算尝试使用Apache Flink这个流处理框架,继续学习Spark Streaming和Kafka Streams之后。由于我已经使用Python编写了Spark Streaming,使用Java编写了Kafka Streams,所以想尝试用Scala编写Apache Flink。
Apache Flink与Kafka一样,也是使用Scala编写的。它强调了Scala的独特的向后兼容性,进行着积极的开发。因此,可以在网络上找到的信息很快就会过时,API也容易被声明为Deprecated或PublicEvolving,对初学者来说有点难以入门的情况。虽然很难找到适合学习的好文章,但我参考了THE RISE OF BIG DATA STREAMING中的传感器数据窗口聚合的写法。
项目模板
在中国人中使用Apache Flink项目时,使用Scala和SBT创建一个Flink项目模板非常方便。让我们首先以模板中的WordCount作为示例来了解如何使用。请克隆该模板。
$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git
这里有几个例子,但我们将使用WordCount.scala。
$ tree flink-project
flink-project/
├── build.sbt
├── idea.sbt
├── project
│ ├── assembly.sbt
│ └── build.properties
├── README
└── src
└── main
├── resources
│ └── log4j.properties
└── scala
└── org
└── example
├── Job.scala
├── SocketTextStreamWordCount.scala
└── WordCount.scala
如果要使用ENSIME,请参考这里创建一个.ensime文件,并在Emacs中使用M-x ensime命令。
$ cd ~/scala_apps/flink-project
$ sbt
> ensimeConfig
以下是WordCount.scala的代码。它将计算包含在示例文本中的英文单词数量。
package org.example
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.print()
}
}
在项目目录中运行sbt的run命令。由于有几个实现了main方法的类,所以输入WordCount 3。
$ cd ~/scala_apps/flink-project
$ sbt
> run
Multiple main classes detected, select one to run:
[1] org.example.Job
[2] org.example.SocketTextStreamWordCount
[3] org.example.WordCount
Enter number:3
运行后,将计算并输出文本中包含的英文单词数量。
(a,1)
(fortune,1)
(in,1)
(mind,1)
(or,2)
(question,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(to,4)
文本数据是通过ExecutionEnvironment的fromElements方法创建的DataSource。
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
在Apache Flink的Scala中,尽管可以更简洁地编写代码,但有时候很难理解下划线、map和groupBy中出现的0和1代表的是什么。在Apache Flink的元组中,当通过field指定时,是从零开始索引的,所以顺序依次是0和1。
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
使用flatMap和正则表达式将文本数据分割成单词,并创建一个DataSet。使用map()函数创建一个由单词(_)和数字(1)组成的Tuple的DataSet。使用groupBy()函数,并指定字段0来按照单词进行分组,创建一个按单词分组的GroupedDataSet。最后,使用sum()函数的参数指定字段1,将单词和单词总数组成一个Tuple的AggregateDataSet。
窗口汇总
使用此模板项目,编写一个程序,使用60秒的滚动窗口聚合传感器数据,并计算周围温度(ambient)的平均值。由于使用Kafka作为数据源,所以参考此文档将来自Raspberry Pi 3的SensorTag数据发送到Kafka。
Raspberry Pi 3 -> Source (Kafka) -> ストリーム処理 -> Sink (Kafka)
克隆模板项目后,删除现有文件。
$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git streams-flink-scala-examples
$ cd streams-flink-scala-examples
$ rm -fr src/main/scala/org/
创建Scala的包目录。
$ mkdir -p src/main/scala/com/github/masato/streams/flink
构建文件.sbt
Kafka 使用与我们相同的 landoop/fast-data-dev Docker 映像。 版本为 0.10.2.1。 我们将添加支持 Kafka 0.10 的软件包。
val flinkVersion = "1.3.2"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion)
val otherDependencies = Seq(
"com.typesafe" % "config" % "1.3.1"
)
lazy val root = (project in file(".")).
settings(
libraryDependencies ++= flinkDependencies,
libraryDependencies ++= otherDependencies
)
mainClass in assembly := Some("com.github.masato.streams.flink.App")
App.scala 的中文释义是什么?
以下是实现了主要方法的程序的完整内容。Kafka连接信息等使用config进行配置,并在设置文件中定义。源代码也存储在代码库中。
package com.github.masato.streams.flink
import java.util.Properties
import java.time.ZoneId;
import java.time.format.DateTimeFormatter
import java.util.Date
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010,FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema,SimpleStringSchema}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import scala.util.parsing.json.JSONObject
import com.typesafe.config.ConfigFactory
case class Accumulator(time: Long, bid: String, var sum: Double, var count: Int)
class Aggregate extends AggregateFunction[(String, Double), Accumulator,Accumulator] {
override def createAccumulator(): Accumulator = {
return Accumulator(0L, "", 0.0, 0)
}
override def merge(a: Accumulator, b: Accumulator): Accumulator = {
a.sum += b.sum
a.count += b.count
return a
}
override def add(value: (String, Double), acc: Accumulator): Unit = {
acc.sum += value._2
acc.count += 1
}
override def getResult(acc: Accumulator): Accumulator = {
return acc
}
}
object App {
val fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME
val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("app.bootstrap-servers")
val groupId = conf.getString("app.group-id")
val sourceTopic = conf.getString("app.source-topic")
val sinkTopic = conf.getString("app.sink-topic")
def main(args: Array[String]): Unit = {
val props = new Properties()
props.setProperty("bootstrap.servers", bootstrapServers)
props.setProperty("group.id", groupId)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = new FlinkKafkaConsumer010[ObjectNode](
sourceTopic, new JSONDeserializationSchema(), props)
val events = env.addSource(source).name("events")
val timestamped = events.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
})
timestamped
.map { v =>
val key = v.get("bid").asText
val ambient = v.get("ambient").asDouble
(key, ambient)
}
.keyBy(v => v._1)
.timeWindow(Time.seconds(60))
.aggregate(new Aggregate(),
( key: String,
window: TimeWindow,
input: Iterable[Accumulator],
out: Collector[Accumulator] ) => {
var in = input.iterator.next()
out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
}
)
.map { v =>
val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
val time = fmt.format(zdt)
val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
val retval = JSONObject(json).toString()
println(retval)
retval
}
.addSink(new FlinkKafkaProducer010[String](
bootstrapServers,
sinkTopic,
new SimpleStringSchema)
).name("kafka")
env.execute()
}
}
我们将按顺序查看main()函数的处理。首先进行与Kafka连接的设置。由于连接的Kafka版本为0.10,我们将使用FlinkKafkaConsumer010。来自树莓派3的传感器数据采用以下JSON格式。
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1503527847, 'humidity': 26.55792236328125, 'objecttemp': 22.3125, 'ambient': 26.375, 'rh': 76.983642578125}
使用JSONDeserializationSchema进行反序列化。
val props = new Properties()
props.setProperty("bootstrap.servers", bootstrapServers)
props.setProperty("group.id", groupId)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = new FlinkKafkaConsumer010[ObjectNode](
sourceTopic, new JSONDeserializationSchema(), props)
Apache Flink的时间模型被设置为事件时间(TimeCharacteristic.EventTime)。我们将传感器数据的time字段用作时间戳和水印。
val events = env.addSource(source).name("events")
val timestamped = events.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
})
从传感器中可以获取到一些数据,但在这里我们只使用环境温度的值。使用map()函数以SensorTag的蓝牙地址作为键来创建一个新的Tuple。
timestamped
.map { v =>
val key = v.get("bid").asText
val ambient = v.get("ambient").asDouble
(key, ambient)
}
使用DataStream的keyBy()方法,以Tuple的索引1指定BD地址作为键创建了一个KeyedStream。
.keyBy(v => v._1)
使用timeWindow()方法创建了一个设定为60秒的滚动窗口的WindowedStream。
.timeWindow(Time.seconds(60))
在1.3版本中,apply()已被弃用。之前,可以这样写:
.apply(
(0L, "", 0.0, 0),
(acc: (Long, String, Double, Int),
v: (String, Double)) => { (0L, v._1, acc._3 + v._2, acc._4 + 1) },
( window: TimeWindow,
counts: Iterable[(Long, String, Double, Int)],
out: Collector[(Long, String, Double, Int)] ) => {
var count = counts.iterator.next()
out.collect((window.getEnd, count._2, count._3/count._4, count._4))
}
)
另外,由于fold()也被标记为弃用,我们推荐使用aggregate()进行尝试。aggregate()实现了AggregateFunction。可以像apply()的示例一样使用元组,但将其转换为case类会更易读一些。
.aggregate(new Aggregate(),
( key: String,
window: TimeWindow,
input: Iterable[Accumulator],
out: Collector[Accumulator] ) => {
var in = input.iterator.next()
out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
}
)
为了方便与外部系统进行协作,数据流会被映射为带有时区的ISO-8601格式的JSON字符串,并使用UNIX时间进行map()操作。这里为了调试目的,JSON字符串被输出到标准输出。
.map { v =>
val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
val time = fmt.format(zdt)
val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
val retval = JSONObject(json).toString()
println(retval)
retval
}
在最后,将数据流Sink到Kafka中。如果给Sink命名为name(”kafka”),则会在运行时的日志中显示。
.addSink(new FlinkKafkaProducer010[String](
bootstrapServers,
sinkTopic,
new SimpleStringSchema)
).name("kafka")
env.execute()
运行sbt
我将转到项目并执行sbt的run命令。
$ cd ~/scala_apps/streams-flink-scala-examples
$ sbt
> run
通过60秒的滚动窗口对周围环境温度进行了平均值的收集,并将结果输出到标准输出。
{"time" : "2017-08-24T08:10:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.203125}
{"time" : "2017-08-24T08:11:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.234375}
{"time" : "2017-08-24T08:12:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.26875}