使用Apache Flink和Scala对传感器数据进行窗口聚合

我打算尝试使用Apache Flink这个流处理框架,继续学习Spark Streaming和Kafka Streams之后。由于我已经使用Python编写了Spark Streaming,使用Java编写了Kafka Streams,所以想尝试用Scala编写Apache Flink。

Apache Flink与Kafka一样,也是使用Scala编写的。它强调了Scala的独特的向后兼容性,进行着积极的开发。因此,可以在网络上找到的信息很快就会过时,API也容易被声明为Deprecated或PublicEvolving,对初学者来说有点难以入门的情况。虽然很难找到适合学习的好文章,但我参考了THE RISE OF BIG DATA STREAMING中的传感器数据窗口聚合的写法。

项目模板

在中国人中使用Apache Flink项目时,使用Scala和SBT创建一个Flink项目模板非常方便。让我们首先以模板中的WordCount作为示例来了解如何使用。请克隆该模板。

$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git

这里有几个例子,但我们将使用WordCount.scala。

$ tree flink-project
flink-project/
├── build.sbt
├── idea.sbt
├── project
│   ├── assembly.sbt
│   └── build.properties
├── README
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── org
                └── example
                    ├── Job.scala
                    ├── SocketTextStreamWordCount.scala
                    └── WordCount.scala

如果要使用ENSIME,请参考这里创建一个.ensime文件,并在Emacs中使用M-x ensime命令。

$ cd ~/scala_apps/flink-project
$ sbt
> ensimeConfig

以下是WordCount.scala的代码。它将计算包含在示例文本中的英文单词数量。

package org.example
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()

  }
}

在项目目录中运行sbt的run命令。由于有几个实现了main方法的类,所以输入WordCount 3。

$ cd ~/scala_apps/flink-project
$ sbt
> run
Multiple main classes detected, select one to run:

 [1] org.example.Job
 [2] org.example.SocketTextStreamWordCount
 [3] org.example.WordCount

Enter number:3

运行后,将计算并输出文本中包含的英文单词数量。

 (a,1)
 (fortune,1)
 (in,1)
 (mind,1)
 (or,2)
 (question,1)
 (slings,1)
 (suffer,1)
 (take,1)
 (that,1)
 (to,4)

文本数据是通过ExecutionEnvironment的fromElements方法创建的DataSource。

    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

在Apache Flink的Scala中,尽管可以更简洁地编写代码,但有时候很难理解下划线、map和groupBy中出现的0和1代表的是什么。在Apache Flink的元组中,当通过field指定时,是从零开始索引的,所以顺序依次是0和1。

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

使用flatMap和正则表达式将文本数据分割成单词,并创建一个DataSet。使用map()函数创建一个由单词(_)和数字(1)组成的Tuple的DataSet。使用groupBy()函数,并指定字段0来按照单词进行分组,创建一个按单词分组的GroupedDataSet。最后,使用sum()函数的参数指定字段1,将单词和单词总数组成一个Tuple的AggregateDataSet。

窗口汇总

使用此模板项目,编写一个程序,使用60秒的滚动窗口聚合传感器数据,并计算周围温度(ambient)的平均值。由于使用Kafka作为数据源,所以参考此文档将来自Raspberry Pi 3的SensorTag数据发送到Kafka。

Raspberry Pi 3 -> Source (Kafka) -> ストリーム処理 -> Sink (Kafka)

克隆模板项目后,删除现有文件。

$ cd ~/scala_apps
$ git clone https://github.com/tillrohrmann/flink-project.git streams-flink-scala-examples
$ cd streams-flink-scala-examples
$ rm -fr src/main/scala/org/

创建Scala的包目录。

$ mkdir -p src/main/scala/com/github/masato/streams/flink

构建文件.sbt

Kafka 使用与我们相同的 landoop/fast-data-dev Docker 映像。 版本为 0.10.2.1。 我们将添加支持 Kafka 0.10 的软件包。

val flinkVersion = "1.3.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion)

val otherDependencies = Seq(
   "com.typesafe" % "config" % "1.3.1"
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= otherDependencies
  )

mainClass in assembly := Some("com.github.masato.streams.flink.App")

App.scala 的中文释义是什么?

以下是实现了主要方法的程序的完整内容。Kafka连接信息等使用config进行配置,并在设置文件中定义。源代码也存储在代码库中。

package com.github.masato.streams.flink

import java.util.Properties
import java.time.ZoneId;
import java.time.format.DateTimeFormatter
import java.util.Date

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010,FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema,SimpleStringSchema}
import org.apache.flink.api.common.functions.AggregateFunction

import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import scala.util.parsing.json.JSONObject
import com.typesafe.config.ConfigFactory

case class Accumulator(time: Long, bid: String, var sum: Double, var count: Int)

class Aggregate extends AggregateFunction[(String, Double), Accumulator,Accumulator] {

  override def createAccumulator(): Accumulator = {
    return Accumulator(0L, "", 0.0, 0)
  }

  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    a.sum += b.sum
    a.count += b.count
    return a
  }

  override def add(value: (String, Double), acc: Accumulator): Unit = {
    acc.sum += value._2
    acc.count += 1
  }

  override def getResult(acc: Accumulator): Accumulator = {
    return acc
  }
}

object App {
  val fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("app.bootstrap-servers")
  val groupId = conf.getString("app.group-id")
  val sourceTopic = conf.getString("app.source-topic")
  val sinkTopic = conf.getString("app.sink-topic")

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = new FlinkKafkaConsumer010[ObjectNode](
      sourceTopic, new JSONDeserializationSchema(), props)

    val events = env.addSource(source).name("events")

    val timestamped = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
        override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
      })

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }
      .keyBy(v => v._1)
      .timeWindow(Time.seconds(60))
      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )
      .map { v =>
        val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
        val time = fmt.format(zdt)
        val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
        val retval = JSONObject(json).toString()
        println(retval)
        retval
      }
      .addSink(new FlinkKafkaProducer010[String](
        bootstrapServers,
        sinkTopic,
        new SimpleStringSchema)
      ).name("kafka")
    env.execute()
  }
}

我们将按顺序查看main()函数的处理。首先进行与Kafka连接的设置。由于连接的Kafka版本为0.10,我们将使用FlinkKafkaConsumer010。来自树莓派3的传感器数据采用以下JSON格式。

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1503527847, 'humidity': 26.55792236328125, 'objecttemp': 22.3125, 'ambient': 26.375, 'rh': 76.983642578125}

使用JSONDeserializationSchema进行反序列化。

    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = new FlinkKafkaConsumer010[ObjectNode](
      sourceTopic, new JSONDeserializationSchema(), props)

Apache Flink的时间模型被设置为事件时间(TimeCharacteristic.EventTime)。我们将传感器数据的time字段用作时间戳和水印。

    val events = env.addSource(source).name("events")

    val timestamped = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
        override def extractTimestamp(element: ObjectNode): Long = element.get("time").asLong * 1000
      })

从传感器中可以获取到一些数据,但在这里我们只使用环境温度的值。使用map()函数以SensorTag的蓝牙地址作为键来创建一个新的Tuple。

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }

使用DataStream的keyBy()方法,以Tuple的索引1指定BD地址作为键创建了一个KeyedStream。

      .keyBy(v => v._1)

使用timeWindow()方法创建了一个设定为60秒的滚动窗口的WindowedStream。

      .timeWindow(Time.seconds(60))

在1.3版本中,apply()已被弃用。之前,可以这样写:

      .apply(
        (0L, "", 0.0, 0),
        (acc: (Long, String, Double, Int),
         v: (String, Double)) => { (0L, v._1, acc._3 + v._2, acc._4 + 1) },
        ( window: TimeWindow,
          counts: Iterable[(Long, String, Double, Int)],
          out: Collector[(Long, String, Double, Int)] ) => {
            var count = counts.iterator.next()
            out.collect((window.getEnd, count._2, count._3/count._4, count._4))
          }
      )

另外,由于fold()也被标记为弃用,我们推荐使用aggregate()进行尝试。aggregate()实现了AggregateFunction。可以像apply()的示例一样使用元组,但将其转换为case类会更易读一些。

      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )

为了方便与外部系统进行协作,数据流会被映射为带有时区的ISO-8601格式的JSON字符串,并使用UNIX时间进行map()操作。这里为了调试目的,JSON字符串被输出到标准输出。

      .map { v =>
        val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
        val time = fmt.format(zdt)
        val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
        val retval = JSONObject(json).toString()
        println(retval)
        retval
      }

在最后,将数据流Sink到Kafka中。如果给Sink命名为name(”kafka”),则会在运行时的日志中显示。

      .addSink(new FlinkKafkaProducer010[String](
        bootstrapServers,
        sinkTopic,
        new SimpleStringSchema)
      ).name("kafka")
    env.execute()

运行sbt

我将转到项目并执行sbt的run命令。

$ cd ~/scala_apps/streams-flink-scala-examples
$ sbt
> run

通过60秒的滚动窗口对周围环境温度进行了平均值的收集,并将结果输出到标准输出。

{"time" : "2017-08-24T08:10:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.203125}
{"time" : "2017-08-24T08:11:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.234375}
{"time" : "2017-08-24T08:12:00+09:00", "bid" : "B0:B4:48:BE:5E:00", "ambient" : 26.26875}
bannerAds