Trying Out Stream Processing with Apache Spark

The environment is as follows:
OS: Ubuntu 16 or 18
Hadoop version: hadoop-3.2.1.tar.gz
JDK (Java Development Kit) version: jdk-8u202-linux-x64.tar.gz
Spark version: spark-3.0.1-bin-hadoop3.2.tgz

NameNode
192.168.76.216: h-gpu05

DataNodes
192.168.76.210: h-gpu03
192.168.76.210: h-gpu04

Scala and sbt versions


hadoop@h-gpu05:/mnt/data/hadoop$ scala -version
Scala code runner version 2.12.2 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
hadoop@h-gpu05:/mnt/data/hadoop$ sbt -version
sbt script version: 1.4.6

First, let's try Spark Streaming from spark-shell.
This time, the data to be processed is sent to port 9999 on localhost, where Spark Streaming reads it.

Open the port:


$ nc -lk localhost 9999
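
(As an aside: if nc is not available, a small stand-alone Scala program can play the listener's role. This is a sketch added for illustration; the object name TestFeeder and the payload are my own assumptions, not part of the original run.)


// Hypothetical stand-in for `nc -lk localhost 9999`: listens on port 9999
// and writes one line of test input per second to whichever client connects.
import java.io.PrintWriter
import java.net.ServerSocket

object TestFeeder {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    while (true) {
      val socket = server.accept() // Spark's socket receiver connects here
      val out = new PrintWriter(socket.getOutputStream, true)
      try {
        (1 to 100).foreach { _ =>
          out.println("hoge hoge hoge") // same test payload as used later
          Thread.sleep(1000)
        }
      } finally socket.close()
    }
  }
}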

hadoop@h-gpu05:/mnt/data/hadoop/spark-3.0.1-bin-hadoop3.2/bin$ ./spark-shell 
2021-01-07 11:01:12,652 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-01-07 11:01:17,935 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
Spark context Web UI available at http://h-gpu05:4045
Spark context available as 'sc' (master = local[*], app id = local-1609984878068).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.{SparkContext, SparkConf}                                                                                                                                                  
import org.apache.spark.{SparkContext, SparkConf}

scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

scala> import org.apache.spark.storage.StorageLevel   
import org.apache.spark.storage.StorageLevel

scala> import org.apache.log4j.{Level, Logger}  
import org.apache.log4j.{Level, Logger}

scala> Logger.getRootLogger.setLevel(Level.WARN)

scala>  val ssc = new StreamingContext(sc, Seconds(10)) 
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5008c5a

scala> val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)                                                                                                              
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@4c9a3ae

In the lines below, we describe how the incoming data (held in lines) should be processed.



scala> val words = lines.flatMap(_.split(" "))                                                                                                                                                            
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@191a8997

scala> val pairs = words.map((_, 1))                                                                                                                                                                      
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1d67a1ad

scala> val wordCounts = pairs.reduceByKey(_ + _)                                                                                                                                                          
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@215998b6

scala> wordCounts.print()

scala> ssc.start()

-------------------------------------------                                     
Time: 1609985280000 ms
-------------------------------------------

An (empty) batch boundary is printed every 10 seconds even with no input. Now send data as follows:


$ nc -lk localhost 9999
hoge hoge hoge

Spark's response:


-------------------------------------------                                     
Time: 1609985290000 ms
-------------------------------------------
(hoge,3)
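
A note on this output: (hoge,3) counts a single 10-second batch only; reduceByKey keeps no state between batches, so the count resets each interval. For a running total across batches, Spark Streaming provides updateStateByKey. Below is a minimal sketch (the checkpoint path is a placeholder of my own; stateful operations require one, and it must be set before ssc.start()):


// Checkpointing is required for stateful operations (hypothetical path).
ssc.checkpoint("/tmp/streaming-checkpoint")

// Fold each batch's counts into a running total per word.
val totals = pairs.updateStateByKey[Int] { (batch: Seq[Int], state: Option[Int]) =>
  Some(batch.sum + state.getOrElse(0))
}
totals.print()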

(`ー´)b

Next, let's try running Spark Streaming from a jar file.
The final output will look like this:


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999
2021-01-07 21:04:56,179 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1610021100000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021110000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021120000 ms
-------------------------------------------

Notes on creating the jar file

With Scala 2.12.2 / sbt 1.4.6 and newer, sbt-assembly is no longer used here.

Running the sbt assembly command would require a plugins.sbt file like the one below, but on Scala 2.11 and similar setups it ran into version-mismatch trouble.


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ more plugins.sbt.bak                                                                                                    
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.2.0")

With Scala 2.12.2 / sbt 1.4.6 or later, running


$sbt clean package

produces a jar file that can be used with Spark Streaming.
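
For reference, a build.sbt along the following lines would produce the jar name seen below (target/scala-2.12/app_2.12-1.4.6.jar). The actual file is not shown in this post, so treat this as a reconstruction: the project name App matches the sbt log further down, and the Spark dependencies are marked "provided" because spark-submit supplies them at runtime.


// Reconstructed build.sbt -- an assumption, not the post's actual file.
name := "App"
version := "1.4.6"
scalaVersion := "2.12.2"

// Spark jars are supplied by spark-submit at runtime, hence "provided".
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "3.0.1" % "provided"
)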

Let's take a look at the code. (It can be written very simply.)


import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}

object StreamingFirst {
  def main(args: Array[String]) {
    Logger.getRootLogger.setLevel(Level.WARN)

    val sparkConf = new SparkConf().setAppName("StreamingFirst")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val lines = ssc.socketTextStream(args(0),
                                     args(1).toInt,
                                     StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
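
One design note: ssc.awaitTermination() blocks forever, so the job runs until it is killed from outside. For a bounded test run, awaitTerminationOrTimeout is an alternative; a sketch, with a 60-second limit of my own choosing:


// Variant of the tail of main(): stop after 60 s if still running.
ssc.start()
if (!ssc.awaitTerminationOrTimeout(60000)) {
  // The timeout elapsed; shut the streams and the SparkContext down cleanly.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}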

The full Scala code is available at these links:

https://github.com/RuoAndo/qiita/blob/master/spark/spark-streaming-1/main.scala
https://github.com/RuoAndo/qiita/tree/master/spark/spark-streaming-1


Let's try executing it (spark-submit)…

|(;゚Д゚|


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ sbt clean package
[info] Updated file /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project/build.properties: set sbt.version to 1.4.6
[info] welcome to sbt 1.4.6 (Private Build Java 1.8.0_275)
[info] loading project definition from /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project
[info] loading settings for project spark-streaming-1 from build.sbt ...
[info] set current project to App (in build file:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/)
[success] Total time: 0 s, completed Jan 7, 2021 9:18:47 PM
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] compiling 1 Scala source to /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/target/scala-2.12/classes ...
[success] Total time: 9 s, completed Jan 7, 2021 9:18:57 PM

Whoa!


hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999

Ah!


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999                                                                                                                                                                                                       

2021-01-07 21:20:35,882 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Time: 1610022040000 ms

|*゚Д゚|


hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999
hoge hoge hoge

( ´Д⊂ヽ)


-------------------------------------------
Time: 1610022090000 ms
-------------------------------------------
(hoge,3)

(`ー´)b