Python与SensorTag、Kafka、Spark Streaming的流处理- 第6部分: 在Jupyter中使用PySpark Streaming进行窗口聚合
终于准备好执行标题代码了。将SensorTag的数据发送到Kafka,并使用PySpark Streaming进行窗口聚合。我们将使用Jupyter作为Python的交互式执行环境。
預備
我们将使用之前准备好的Python脚本和Kafka、Spark集群。
Part 2で書いたRaspberry Pi 3からSensorTagのデータをJSONフォーマットでKafkaに送信するスクリプトとConfluent Open Sourceクラスタ
Part 5で構築したSpark Standalone ClusterとJupyter
笔记本
我们将在Jupyter Notebook中以交互式方式运行和确认代码。以下段落分别对应一个单元格。在Web浏览器中打开Jupyter,然后选择右上方的New按钮,然后选择Python 3。
- http://<仮想マシンのパブリックIPアドレス>:8888
请将以下内容以中文翻译,并且只需提供一个选项:
PYSPARK_SUBMIT_ARG
根据Spark Streaming + Kafka Integration Guide,连接从Spark Streaming到Kafka所需的Jar文件有两个,Scala的Jar文件非常严格。这两个文件分别是spark-streaming-kafka-0-8(适用于Kafka 0.8.2.1及以上版本)和spark-streaming-kafka-0-10(适用于Kafka 0.10及以上版本)。虽然我们所使用的Kafka版本是0.10.2.1,但我们将使用支持Python的spark-streaming-kafka-0-8。
每个版本的名称应如下所示:spark-streaming-kafka-<Kafka版本>_<Scala版本>:<Spark版本>。
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'
魔法指令
使用Jupyter的魔术命令安装kafka-python包。
!pip install kafka-python
导入
导入Spark Streaming和Kafka所需的包。
import json
import pytz
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from kafka import KafkaProducer
Spark的上下文
从Spark 2.x开始,Spark的上下文变得更加复杂,但我们可以使用SparkSession.builder作为入口点。我们可以在1分钟的批处理间隔内创建StreamingContext。
spark = (
SparkSession
.builder
.getOrCreate()
)
sc = spark.sparkContext
ssc = StreamingContext(sc, 60)
Kafka 生产者
定义Kafka代理服务器列表、输入和输出的主题名称。为了将窗口聚合结果输出到Kafka主题,还需要创建生产者。
brokers = "<Kafka BrokerのIPアドレス>:9092"
sourceTopic = "sensortag"
sinkTopic = "sensortag-sink"
producer = KafkaProducer(bootstrap_servers=brokers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
窗口总计 jì)
这是处理本次脚本主要操作的函数。它将应用StructType模式到转换Kafka的JSON为RDD的DataFrame,并创建DataFrame。使用DataFrame的窗口聚合函数,在2分钟窗口内计算周围温度(ambient)和湿度(rh)的平均值。
为了使窗口聚合的结果更易于理解,DataFrame在处理过程中删除了时区,并添加了 Asia/Tokyo 时区。
def windowAverage(rdd):
schema = StructType([
StructField('ambient', DoubleType(), True),
StructField('bid', StringType(), True),
StructField('humidity', DoubleType(), True),
StructField('objecttemp', DoubleType(), True),
StructField('rh', DoubleType(), True),
StructField('time', TimestampType(), True),
])
streamingInputDF = spark.createDataFrame(
rdd, schema=schema
)
print('1分バッチのDataFrame')
streamingInputDF.show(truncate=False)
averageDF = (
streamingInputDF
.groupBy(
streamingInputDF.bid,
window("time", "2 minute"))
.avg("ambient","rh")
)
sinkRDD = averageDF.rdd.map(lambda x: {'bid': x[0],
'time': pytz.utc.localize(x[1]['end']).astimezone(pytz.timezone('Asia/Tokyo')).isoformat(),
'ambient': x[2],
'rh': x[3]})
if not sinkRDD.isEmpty():
print('2分ウィンドウの平均値')
sinkList = sinkRDD.collect()
print(sinkList)
for sink in sinkList:
producer.send(sinkTopic, sink)
创建Kafka流中的数据流。
指定Kafka Broker的IP地址和从Raspberry Pi 3发送SensorTag的JSON字符串的主题。将JSON字符串逐行反序列化并创建pyspark.sql.Row。将时间字段从UNIX时间戳转换为Python的datetime,并删除时区。
kafkaStream = KafkaUtils.createDirectStream(
ssc, [sourceTopic], {"metadata.broker.list":brokers})
rowStream = (
kafkaStream
.map(lambda line: json.loads(line[1]))
.map(lambda x: Row(
ambient=x['ambient'],
bid=x['bid'],
humidity=x['humidity'],
objecttemp=x['objecttemp'],
rh=x['rh'],
time=datetime.fromtimestamp(x['time']).replace(tzinfo=None),
)
)
)
rowStream.foreachRDD(windowAverage)
开始StreamingContext
最後,启动StreamingContext并等待程序停止。
ssc.start()
ssc.awaitTermination()
运行脚本
在树莓派3上执行Part 2中编写的Python脚本。
输出结果 (shū chū jié guǒ)
以下是显示的输出。与DataFrame的输出不同,窗口聚合的结果附带了时区。
1分バッチのDataFrame
+--------+-----------------+-----------------+----------+---------------+---------------------+
|ambient |bid |humidity |objecttemp|rh |time |
+--------+-----------------+-----------------+----------+---------------+---------------------+
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.96875 |75.714111328125|2017-08-01 23:44:03.0|
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.90625 |75.714111328125|2017-08-01 23:44:13.0|
|28.75 |B0:B4:48:BD:DA:03|28.72314453125 |22.875 |75.616455078125|2017-08-01 23:44:23.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.15625 |75.616455078125|2017-08-01 23:44:34.0|
|28.75 |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125 |75.616455078125|2017-08-01 23:44:44.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.125 |75.616455078125|2017-08-01 23:44:55.0|
+--------+-----------------+-----------------+----------+---------------+---------------------+
2分ウィンドウの平均値
[{'bid': 'B0:B4:48:BD:DA:03', 'time': '2017-08-02T08:46:00+09:00', 'ambient': 28.760416666666668, 'rh': 75.64900716145833}]