Integrating Apache Flume with Spark Streaming

Real-time stream processing of web server access logs is a common use case. This article describes how to integrate Apache Flume, a log collection platform, with Spark Streaming running on an E-MapReduce cluster.

    Prerequisites
    • EMR-3.16.0
    • Cluster type: Hadoop
    • Hardware configuration (Header): 1 × ecs.sn1ne.2xlarge
    • Hardware configuration (Worker): 3 × ecs.sn1ne.2xlarge

The software versions on the cluster are as follows:
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Wed Nov 28 11:09:28 CST 2018
From source with checksum 63b5d03c9afd862ff786f7826ffe55d0
# hadoop version
Hadoop 2.7.2
Subversion http://gitlab.alibaba-inc.com/soe/emr-hadoop.git -r d2cd70f951304b8ca3d12991262e7e0d321abefc
Compiled by root on 2018-11-30T09:31Z
Compiled with protoc 2.5.0
From source with checksum 4447ed9f24dcd981df7daaadd5bafc0
This command was run using /opt/apps/ecm/service/hadoop/2.7.2-1.3.2/package/hadoop-2.7.2-1.3.2/share/hadoop/common/hadoop-common-2.7.2.jar

    Flume configuration

Flume usage itself is not covered here; interested readers should refer to the official documentation. The Flume configuration file is shown below. The source is a spooling directory (spooldir) and the sink is an Avro client. Note that the sink's hostname and port must match the values passed to FlumeUtils.createStream in the Spark application later.


# Name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: watch a spool directory for new log files
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/spool
a1.sources.r1.fileHeader = true

# Sink: push events to an Avro endpoint (the Spark Streaming receiver)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9906

# Channel: buffer events in memory between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
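
In this push-style setup, the Spark Streaming application (started later) acts as the Avro server that the sink connects to, so the agent will simply log connection errors and retry until the receiver is up. If you want to confirm that the receiver is accepting connections before (re)starting the agent, here is a minimal standalone sketch (check_receiver.py is a hypothetical helper, not part of the original walkthrough):

# check_receiver.py -- hypothetical helper: confirm that something is
# listening on the Avro sink's target (the Spark Streaming Flume receiver).
import socket

def is_listening(host="localhost", port=9906, timeout=2.0):
    try:
        s = socket.create_connection((host, port), timeout=timeout)
        s.close()
        return True
    except (socket.error, OSError):
        return False

if __name__ == "__main__":
    print("receiver is up" if is_listening() else "nothing listening on 9906")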

    About Spark Streaming

Spark Streaming provides micro-batch stream processing: the job is executed repeatedly at short intervals ranging from a few seconds to a few minutes. The intervals used in this example are as follows:

    • DStream batch interval: 1 second
    • Sliding interval: 1 second
    • Window size: 300 seconds

The application below parses each access-log line with a regular expression and counts requests per URL over this sliding window:

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

# Regular expression for the Apache combined log format, field by field
parts = [
    r'(?P<host>\S+)',      # client host
    r'\S+',                # identd (unused)
    r'(?P<user>\S+)',      # authenticated user
    r'\[(?P<time>.+)\]',   # timestamp
    r'"(?P<request>.+)"',  # request line, e.g. "GET /index.html HTTP/1.0"
    r'(?P<status>[0-9]+)', # HTTP status code
    r'(?P<size>\S+)',      # response size
    r'"(?P<referer>.*)"',  # referer
    r'"(?P<agent>.*)"',    # user agent
]
pattern = re.compile(r'\s+'.join(parts) + r'\s*\Z')

def extractURLRequest(line):
    # Return the URL part of the request line ("GET /foo HTTP/1.0" -> "/foo").
    # Lines that do not match the pattern yield None.
    exp = pattern.match(line)
    if exp:
        request = exp.groupdict()["request"]
        if request:
            requestFields = request.split()
            if len(requestFields) > 1:
                return requestFields[1]

if __name__ == "__main__":

    sc = SparkContext(appName="StreamingFlumeLogAggregator")
    sc.setLogLevel("ERROR")
    # Batch interval: 1 second
    ssc = StreamingContext(sc, 1)
    # Push-based receiver: Flume's Avro sink delivers events to localhost:9906
    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9906)
    # Each Flume event is a (headers, body) pair; keep the body (the log line)
    lines = flumeStream.map(lambda x: x[1])
    urls = lines.map(extractURLRequest)
    # Windowed count per URL: add counts entering the 300-second window and
    # subtract counts leaving it, sliding every 1 second. Using an inverse
    # (subtract) function requires checkpointing, enabled below.
    urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(
        lambda x, y: x + y, lambda x, y: x - y, 300, 1)

    # Sort by count, descending, and print the top entries of each batch
    sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
    sortedResults.pprint()
    ssc.checkpoint("/root/checkpoint")
    ssc.start()
    ssc.awaitTermination()
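
To sanity-check the parsing logic outside of Spark, the extraction function can be exercised directly against a line from the access log shown later. A quick offline test (it assumes pattern and extractURLRequest are defined exactly as above):

# Offline check of the parsing logic; `extractURLRequest` as defined above.
sample = ('46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] '
          '"POST /xmlrpc.php HTTP/1.0" 200 370 "-" '
          '"Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"')

assert extractURLRequest(sample) == '/xmlrpc.php'   # URL field extracted
assert extractURLRequest('not a log line') is None  # non-matching line -> None
print('parser OK')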

    Running it

Import the access log to be analyzed into the spool directory (/root/spool), then start Flume NG with the command below. After that, launch the Spark Streaming application shown above in a separate process.

# bin/flume-ng agent --conf conf --conf-file ~/sparkstreamingflume.conf --name a1 -Dflume.root.logger=INFO,console

spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.3.2 SparkFlume.py

The access log being fed in looks like this:

# tail access_log.txt
46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:57 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:59 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
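
The spooling directory source expects each file to be complete and immutable once it appears in the spool directory. To simulate a continuous stream rather than dropping the whole log at once, one option is to split the log into chunks and rename them into the directory; the following feed_spool.py is a hypothetical sketch, not part of the original setup:

# feed_spool.py -- hypothetical helper: drip-feed an access log into the
# Flume spool directory in chunks, simulating a live stream.
import os
import time

SPOOL_DIR = "/root/spool"        # must match a1.sources.r1.spoolDir
STAGE_DIR = "/root/spool-stage"  # staging area on the same filesystem
CHUNK_LINES = 1000               # lines per spooled file

def flush(lines, n):
    # Write the chunk outside the watched directory, then rename it in:
    # the spooldir source must never see a file that is still being written.
    tmp = os.path.join(STAGE_DIR, "chunk-%05d.log" % n)
    dst = os.path.join(SPOOL_DIR, "chunk-%05d.log" % n)
    with open(tmp, "w") as f:
        f.writelines(lines)
    os.rename(tmp, dst)

if __name__ == "__main__":
    if not os.path.isdir(STAGE_DIR):
        os.makedirs(STAGE_DIR)
    buf, n = [], 0
    with open("access_log.txt") as src:
        for line in src:
            buf.append(line)
            if len(buf) >= CHUNK_LINES:
                flush(buf, n)
                buf, n = [], n + 1
                time.sleep(1)  # pace the stream
    if buf:
        flush(buf, n)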

The stream is now processed once per second in near real time, producing a list of the top 10 URLs by access count within the window. Note that the results at 14:02:20 and 14:02:21 are identical: once the entire log has been ingested, the contents of the 300-second window no longer change.

-------------------------------------------
Time: 2019-03-15 14:02:19
-------------------------------------------
(u'/xmlrpc.php', 8509)
(u'/wp-login.php', 1798)
(u'/', 119)
(u'/robots.txt', 44)
(u'/blog/', 36)
(u'/page-sitemap.xml', 29)
(u'/post-sitemap.xml', 29)
(u'/category-sitemap.xml', 29)
(u'/sitemap_index.xml', 29)
(u'http://51.254.206.142/httptest.php', 26)
...

-------------------------------------------
Time: 2019-03-15 14:02:20
-------------------------------------------
(u'/xmlrpc.php', 68415)
(u'/wp-login.php', 1923)
(u'/', 440)
(u'/blog/', 138)
(u'/robots.txt', 123)
(u'/post-sitemap.xml', 118)
(u'/sitemap_index.xml', 118)
(u'/page-sitemap.xml', 117)
(u'/category-sitemap.xml', 117)
(u'/orlando-headlines/', 95)
...

-------------------------------------------
Time: 2019-03-15 14:02:21
-------------------------------------------
(u'/xmlrpc.php', 68415)
(u'/wp-login.php', 1923)
(u'/', 440)
(u'/blog/', 138)
(u'/robots.txt', 123)
(u'/post-sitemap.xml', 118)
(u'/sitemap_index.xml', 118)
(u'/page-sitemap.xml', 117)
(u'/category-sitemap.xml', 117)
(u'/orlando-headlines/', 95)
...

    In closing

That concludes this walkthrough of a stream-processing system combining Spark Streaming and Apache Flume, along with its verification results. Beyond web access logs, the same approach should lend itself to real-time analysis and monitoring of everyday business logs, traffic logs, mail data, and more, so I hope you will consider giving it a try.
