概要地回顾Python构建的实时处理基础设施,以满足”我想了解目前的数据!”的需求
这篇文章是NTT通信官方圣诞节日历的最后一天的文章。没想到我竟然担任了最后一天的文章责任,真是不好意思。
该作者目前在技术开发部门工作,负责进行公司内外各种数据分析和异常检测技术的研究与开发。最近的研究成果已被AAAI的IAAI2018会议采纳。
首先
为了迅速向团队内的各种服务部门提供分析结果,我们正在团队中建立使用开源软件构建的分析基础设施。在进行报告分析时,需要进行数据汇总和分析处理。根据观看者的不同,也有各种报告间隔(分析结果更新频率)的要求。例如,
-
- 定期的なメール配信、データの最新化、機械学習のモデルの更新(1日に1回のバッチ処理)
-
- 1日に何度かデータにアクセスして、データの状況を確認する(◯分間 / ◯時間でのバッチ処理)
- 常に今のデータをモニタリングしたい(ニアリアルタイム処理、ストリーミング処理)
特别是最近的社交谈话中,经常会涉及到希望实时查看数据或者想知道现在的情况这样的话题。实际上,每天都不会一直盯着监视器。但是,在服务出现故障时,可能会需要紧盯监视器,渴望尽快知道数据是否已经到了或者现在的情况如何。另外,如果能够在演示中展示实时运行的话,也会显得很酷!
我们在各种开源软件架构下实施了这样的实时分析,并进行了验证和引入。在JANOG2 39th中,我们的同事@__kaname__负责了使用pmacct->kafka->presto->re:dash进行快速流分析的报告。我们公司也发布了有关网络流量数据的实时分析案例。除了这样的网络流量数据分析外,我们还使用在互联网上安装的传感器(探测器)收集不同的网络流量数据,并在各种角度上进行分析。因此,在本文中,我想分享关于搭建基于Kafka+SparkStreaming的实时处理平台的经验。由于对许多技术是第一次接触,我在学习的同时也在实践中积累经验。
在解释时,会涉及到fluentd、Docker、Kafka、Spark等术语,但在这方面我会尽量用平易的语言进行解释,但不会过多展开。
建筑设计
假设在多个地点安装了传感器,并且通过Fluentd将数据保存在云上的虚拟机中。
在这种情况下,需要的条件是,
-
- SQLベースで書ける処理が必要。すでに分析者が利用しているSQLクエリが複数存在するので、リアルタイムに集計する際にもそれを利用できるようにしたい。
-
- Pythonベースで構築したい。チームのスキルセット的にPythonで構築したほうがメンテナンスコストがかからないのと、基盤上で機械学習モデルAPIのインタフェースをすぐに利用できるためです。
-
- 分析環境の再現性の確保のため、Dockerコンテナで構築したい。
- Kibanaでの可視化のため、Elasticsearchにデータを格納したい。
基于上述要求和当前趋势,我们决定使用Docker容器在Kafka和SparkStreaming上进行部署,并使用Python构建所有处理过程。
选择SparkStreaming的原因包括以下几点:不要求实时粒度到几秒的程度,SparkSQL能够使用SQL处理流入的数据,同时可以直接在Python中使用机器学习API。
以下是Ponzi图的内容。本文排除了节点数、集群规模以及机器学习API等方面的内容。

我們使用的版本如下所列。
-
- fluentd : 0.12.40
-
- Kafka : 0.10.2.1
-
- Spark : 2.2
- Elasticsearch : 5.6.2
目前,Kafka集群和Elasticsearch集群已经启动。
从云端的Fluentd传送到Kafka
为了将从收集传感器发送到Kafka,我们使用云环境的fluentd作为中继。为此,我们将编辑fluent.conf,并使用fluent-plugin-kafka作为输出插件发送到Kafka。以下是一个示例:
<match aaa>
<store>
@type kafka_buffered
brokers XXX.XXX.XXX.XXX:9092
default_topic TOPIC_NAME
<snip>
# See fluentd document for buffer related parameters
max_send_retries 1
required_acks -1
ack_timeout nil
compression_codec gzip
kafka_agg_max_bytes 4096
kafka_agg_max_messages nil
max_send_limit_bytes 1000000
discard_kafka_delivery_failed false
monitoring_list []
</store>
</match>
参数调整按照文档进行,但也进行了设置并限制了 max_send_limit_bytes。设置后重新加载后,频繁出现了 forward error error=# error_class=Fluent::BufferQueueLimitError 的错误。参考过处理类似错误的方法(在 td-agent 中处理队列大小超过限制的方法),对 Fluentd 的虚拟机内存进行增加来处理。
从云上的Kafka连接到基础设施上的Kafka。
将云端的Kafka发送数据到基础Kafka。需要进行数据清洗处理,以便更容易进行分析。这包括为原始数据添加节点标签信息、时间信息和统计信息等。
为了使用SparkStreaming进行处理,首先要确保在任何环境下都能保证可重现性,所以需要构建Spark的Docker容器。虽然已经有公开的Spark Docker镜像文件,但是由于包含了不必要的库等原因,没有简单的镜像可用,所以需要准备Dockerfile。对于使用Docker Swarm构建Apache Spark集群的参考也很有帮助。
我只会提取Dockerfile中的重要部分。
安装Spark
# install spark
RUN cd /tmp \
&& curl -LO http://ftp.jaist.ac.jp/pub/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz \
&& tar zxf spark-2.2.0-bin-hadoop2.7.tgz \
&& mv spark-2.2.0-bin-hadoop2.7 /spark \
&& rm spark-2.2.0-bin-hadoop2.7.tgz
使用Spark时,需要下载外部的JAR文件。这些文件包括apache-log4j-extras-1.2.17.jar,用于以JST时区显示日志,以及spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar,用于从Kafka加载数据。
另外,如果使用Structured Streaming,需要参考Structured Streaming + Kafka Integration Guide(Kafka broker版本0.10.0或更高版本),并使用spark-sql-kafka-0-10_2.11-2.2.0.jar。
# add jar files
RUN curl -o /spark/jars/apache-log4j-extras-1.2.17.jar -L "https://www.apache.org/dist/logging/log4j/extras/1.2.17/apache-log4j-extras-1.2.17.jar"
RUN curl -o /spark/jars/spark-sql-kafka-0-10_2.11-2.2.0.jar -L "http://central.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.2.0/spark-sql-kafka-0-10_2.11-2.2.0.jar"
RUN curl -o /spark/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar "http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.2.0/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar"
这是一个用于将数据发布到Kafka的Python库,适用于Python在Spark中的版本设置环境变量。
# python packages
RUN pip3 install --upgrade pip setuptools
RUN pip3 install kafka-python
# set python3 for spark
ENV PYSPARK_PYTHON=python3
构建Dockerfile并生成镜像。
$ docker build -t spark-streaming-load:latest .
使用docker-compose进行启动。在此过程中,将挂载Spark的配置文件并启动。
spark-streaming-load:
image: spark-streaming-load:latest
container_name: spark-streaming-load
hostname: spark-streaming-load
volumes:
- /data:/data
- /spark/conf/load.spark-defaults.conf:/spark/conf/defaults.conf
- /spark/conf/load.log4j.properties:/spark/conf/log4j.properties
- /spark/log:/spark/log
在load.spark-defaults.conf中设置与Spark性能有关的参数。在官方文档中查看参数的内容。以下是示例。关于参数调优的详细信息将在后面介绍。
spark.master local[2]
spark.driver.memory 8g
spark.executor.memory 8g
spark.eventLog.enabled true
spark.streaming.concurrentJobs 1
通过设置日志文件,可以设置日志的输出位置和格式,以便于进行调试。
进行将日志输出到文件的设置(参考:Spark Streaming 日志配置)。每个文件最多存储50MB数据,如果超过,将写入到另一个文件中。设置保存最多5个文件。
# Set everything to be logged to the console
#log4j.rootCategory=INFO,console
log4j.rootCategory=INFO,rolling
# logged to the rolling setting
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.rolling.layout.conversionPattern=[%d{ISO8601}{GMT+9}]%-5p - %m%n
log4j.appender.rolling.maxFileSize=50MB
log4j.appender.rolling.maxBackupIndex=5
log4j.appender.rolling.file=/var/log/spark/streaming_load.log
log4j.appender.rolling.encoding=UTF-8
对于时间刻度,将按照以下格式输出。注释被默认设置为禁用。
#log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss/zzz} %p %c{1}: %m%n
log4j.appender.console.layout.ConversionPattern=[%d{ISO8601}{GMT+9}]%-5p - %m%n
结果,日志被成功输出到文件中。
<snip>
[2017-11-01 14:25:26,413]INFO - Running Spark version 2.2.0
<snip>
我們已經完成了基本的設置,接下來準備Python腳本。以下是一個範例,我們將使用Direct Kafka來載入數據。
# -*- coding:utf-8 -*-
import sys,json
from datetime import datetime
from kafka import KafkaProducer
from collections import defaultdict
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext,SparkSession
def process(dstream):
# process
<省略>
# Output
BROKER_HOSTS = "XXX.XXX.XXX.XXX:9092"
producer = KafkaProducer(bootstrap_servers = BROKER_HOSTS,
value_serializer = lambda v : json.dumps(v).encode('utf-8'))
producer.send("TOPIC_NAME", data)
if __name__ == "__main__":
# 30秒毎のバッチ処理を繰り返すStreamingContext
spark = (SparkSession.builder.getOrCreate())
sc = spark.sparkContext
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 30)
# kafka Direct Stream
def my_utf8_decoder(s):
try :
if s is None:
return "{}"
return s.decode('utf-8')
except UnicodeDecodeError :
return "{}"
BROKERS_HOSTS = "XXX.XXX.XXX.XXX:9092"
kafkaStream = KafkaUtils.createDirectStream(ssc = ssc,
topics = ["TOPIC_NAME"],
kafkaParams = {"metadata.broker.list" : BROKERS_HOSTS},
valueDecoder = my_utf8_decoder)
# Parse
lines = kafkaStream.map(lambda x: x[1])
lines.foreachRDD(process)
# ストリーム処理を開始
ssc.start()
ssc.awaitTermination()
由于Spark有一个示例文件,所以基本上可以通过参考它来运行。但是在持续运行时,会发生来自Kafka的数据加载时的EOFError。这似乎是由于无法解码数据的字符编码引起的。因此,在valueDecoder中添加了异常处理。发生这个的原因是传感器数据获取的错误,导致偶尔出现字符串数据。在现实问题中,这种情况经常发生。参考:连接Flume推特流到Spark时出现UTF-8编码错误。
为了避免任务堆积,进行参数调整。
为了稳定地运行SparkStreaming作业,必须监视作业是否堆积等情况。如果置之不理,作业处理将赶不上,导致数据延迟。例如,在某个时间点数据急剧增加或者添加了繁重的处理任务时会发生这种情况。
在Spark的调试日志中,会出现”INFO JobScheduler:已添加时间为◯◯◯毫秒的作业”,从中可以意识到出现了这种情况。
当初,我注意到这个日志一直在频繁输出,于是我搜索了一下,发现像是在我的Spark独立集群中连续添加了一些任务,需要说明的是,这是在处理线程不足的情况下产生的INFO日志。
虽然仅仅增加Spark的工作节点数量并不能解决问题,但接下来我们开始研究性能调优。
在我进行搜索并找到的文章中,特别有帮助的是Linkedin的技术博客《Spark Streaming : Performance Tuning With Kafka and Mesos》。根据我的理解,它的摘要如下:
-
- Receiver baseより Direct base推奨。理由は、Kafkaのデータパーティションごとに、SparkのRDDを1対1で対応させることができる。結果、サーバのコア数をフルに活用して処理がはやくなる。Kafkaのデータパーティションはサーバのコア数の約2〜3倍のほうがフルにCPUを利用できる(らしい)
Batch Interval Parameterを少しずつ大きくして調整する。これは、Sparkのミニバッチの間隔のことで、ssc = StreamingContext(sc, XXX)のXXX部分を調整します。これはジョブの大きさや要件(RTB広告配信などではかなりキモになる)によって最適なのを調整します。
ConcurrentJobs Parameter を大きくする。ジョブの実行数のことで、ここを大きくすると、あるジョブの処理が遅れていても、それが完了するのを待つことなく次のジョブを実行することができます。ただし、私の意見でもありますが、本来1であるのが望ましいと思います。理由は、そもそもジョブがなぜ遅れているのかの原因がログ上で見えにくくなるためです。
KafkaのPatitionがあるなら、1ミニバッチの1パーティションあたりの処理メッセージ数の制限値maxRatePerPartitionを設定(デフォルトはnot set)する。これはあるパーティションで急激なデータが増えても、制限値でコントロールできるようにするためのパラメータです。
除此之外,MapR的《Apache Kafka/Spark Streaming系统的性能优化-电信案例研究》也对系统中工作节点的内存分配等进行了介绍,对我非常有参考价值。
将卡夫卡中的数据导出到Elasticsearch
我們開始將數據積累到基礎設施上的 Kafka。關於如何使用這些數據,這次我們將使用另一個 SparkStreaming 來將這些數據存儲到 ElasticSearch 中,以供實時可視化使用。以下是示例代碼。
# -*- coding:utf-8 -*-
import json
from datetime import datetime
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext,SparkSession,Row
from utils.Schema import Schema
from utils.Select import Select
from utils.ElasticSpark import ElasticSpark
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
def process(dstream):
# dstream load
spark_dstream = getSparkSessionInstance(dstream.context.getConf())
row = dstream.map(lambda x: Row(
date = json.loads(x)["DATE"]
key = json.loads(x)["KEY"],
value = float(json.loads(x)["VALUE"]))
)
# schema setting
schema = Schema()
row_df = spark_dstream.createDataFrame(row, schema.get())
row_df.show()
# Select and Filtering
ps = Select(spark = spark, df = row_df)
sql_df = ps.select(keyword = "AAA")
sql_df.show()
# To ES
es = ElasticSpark()
es.df_write(sql_df, "BBB", "BBB")
if __name__ == "__main__":
# StreamingContext by Each 60 sec
spark = (SparkSession.builder.getOrCreate())
sc = spark.sparkContext
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 60)
# kafka Direct Stream
BROKERS_HOSTS = "XXX.XXX.XXX.XXX:9092"
kafkaStream = KafkaUtils.createDirectStream(ssc = ssc,
topics = ["TOPIC_NAME"],
kafkaParams = {"metadata.broker.list" : BROKERS_HOSTS})
# Batch Process Define
lines = kafkaStream.map(lambda x: x[1])
lines.foreachRDD(process)
# Start
ssc.start()
ssc.awaitTermination()
大致上与Spark示例代码相同,但为了确保其能够在实际分析中使用,我进行了一些处理。以下是大致的流程说明。
为了使用Spark的DataFrame类型处理数据,首先将RDD格式以Row类型进行描述,然后构建DataFrame。
接下来,我们将使用自定义的Schema类来定义每个数据列的模式(例如:字符串、浮点数和日期类型)。示例如下:
schema = StructType([
StructField("date", TimestampType(), True),
StructField("key", StringType(), True),
StructField("value", FloatType(), True)
])
接下来,我们可以使用自定义的Select类基于Spark SQL从数据中选择所需的列,或者仅提取与条件匹配的行,还可以使用Groupby等方法进行统计处理以生成数据。以下是一个例子:
sql_df = self.spark.sql("""
select *
from df
where key rlike '^[a-z]+[0-9]{2}$' AND
(value > 0 AND value < 100)
""")
最後,將DataFrame數據導入到Elasticsearch中。在這裡,數據將通過Elasticsearch的基本身份驗證進行存儲。這裡需要注意的是,我發現es.nodes.wan.only的默認值似乎是False,但我在自己的環境中將其設置為True。此外,我還將添加異常處理,以確保在Elasticsearch服務宕機時仍然能夠繼續處理。
try :
df.write\
.format("org.elasticsearch.spark.sql")\
.mode('append')\
.option("es.nodes",self.urls)\
.option("es.port",self.port)\
.option("es.net.http.auth.user",self.user)\
.option("es.net.http.auth.pass",self.password)\
.option("es.nodes.wan.only", True)\
.option("es.resource",index+"/"+mapping)\
.save()
except Py4JJavaError :
print("ERROR - Something wrong to import ES")
最终结果是以DataFrame形式的数据流入,并被导入到Elasticsearch中。
<snip>
INFO DAGScheduler: Job 93 finished: showString at NativeMethodAccessorImpl.java:0, took 0.162547 s
+---------------------+-------------+-----------+
| date| key| value|
+---------------------+-------------+-----------+
| 2017-12-12 09:58:50| abcc12| 3.72|
+---------------------+-------------+-----------+
INFO SparkContext: Starting job: runJob at EsSparkSQL.scala:101
<snip>
然后可以使用Kibana按照个人喜好来查看数据。省略了此设置。
回顾构建后的情况时
-
- Kafka+SparkStreamingを今回一から作りました。サンプルコードは色々あるのですが、それを実環境で動かすまでのプロセスはそれの組み合わせだけじゃ解決しないことも多かったです。その辺は、幅広い周辺知識をキャッチアップしながら、あとはStack Overflowの検索力。
-
- データの発生からのESへの格納までのボトルネック箇所の分析はこれから必要になりそうです。Kafka、Spark、Elasticsearchとボトルネック箇所となり得るのが複数考えられるため、切り分けのために各種プロセスのモニタリングが必要になります。Spark Streamingの概要と検証シナリオは読んでいて、ボトルネック箇所を調べる勉強になりました。
-
- どれくらいの前のデータを見たいというニーズにこのアーキテクチャだとどこまで耐えれるのかはまだまだわかりません。例えば、10秒後のデータがみたいといったことに耐えれるのかなど。
- セキュリティ要件を今回は省いていますが、SSL化等でパフォーマンスがどの程度落ちるのかも気になります。
我們在運用中理解要點,現在達到了僅使用Python就能完成處理的能力,所以將來會朝著更好的方向努力。
在最后
在本篇文章中,作者回顾了自己在Python中构建实时处理基础设施的经验。最近一直在进行异常检测技术的研究和论文写作,因此能够以一种新鲜的心态来处理基础设施的构建。回顾过程中,确实存在一些不足之处,但通过参与节日的机会,能够将其输出。对于为什么会做这样的事情等一些朴素的意见和问题,我们非常欢迎!享受数据吧!
通过基于JavaScript的随机函数进行公平判断来决定。↩
日本网络运营商协会的缩写。旨在通过讨论、研究和介绍与互联网技术及相关操作事项有关的事宜,为日本的互联网技术人员和用户做出贡献的组织(官方网页)↩
可以定期将文件写入HDFS并作为冷数据进行永久保存,也可以通过机器学习将分析结果存储在ES中考虑。↩