Trying stream processing with Apache Spark
The environment is as follows:
OS: Ubuntu 16 or 18
Hadoop version: hadoop-3.2.1.tar.gz
JDK (Java Development Kit) version: jdk-8u202-linux-x64.tar.gz
Spark version: spark-3.0.1-bin-hadoop3.2.tgz
NameNode
192.168.76.216: h-gpu05
DataNodes
192.168.76.210: h-gpu03
192.168.76.210: h-gpu04
Scala and sbt versions:
hadoop@h-gpu05:/mnt/data/hadoop$ scala -version
Scala code runner version 2.12.2 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
hadoop@h-gpu05:/mnt/data/hadoop$ sbt -version
sbt script version: 1.4.6
First, let's try Spark Streaming from the spark-shell.
In this example, the data to be processed is sent to port 9999 on localhost, where Spark Streaming picks it up.
Open the port (-l listens, -k keeps the listener alive across connections):
$ nc -lk localhost 9999
hadoop@h-gpu05:/mnt/data/hadoop/spark-3.0.1-bin-hadoop3.2/bin$ ./spark-shell
2021-01-07 11:01:12,652 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-01-07 11:01:17,935 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
Spark context Web UI available at http://h-gpu05:4045
Spark context available as 'sc' (master = local[*], app id = local-1609984878068).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkContext, SparkConf}
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel
scala> import org.apache.log4j.{Level, Logger}
import org.apache.log4j.{Level, Logger}
scala> Logger.getRootLogger.setLevel(Level.WARN)
scala> val ssc = new StreamingContext(sc, Seconds(10))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5008c5a
scala> val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@4c9a3ae
The lines below describe what should be done with the incoming data (held in lines):
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@191a8997
scala> val pairs = words.map((_, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1d67a1ad
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@215998b6
scala> wordCounts.print()
scala> ssc.start()
-------------------------------------------
Time: 1609985280000 ms
-------------------------------------------
Send some text like this:
$ nc -lk localhost 9999
hoge hoge hoge
On the Spark side:
-------------------------------------------
Time: 1609985290000 ms
-------------------------------------------
(hoge,3)
(`ー´)b
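One thing worth knowing before moving on: each 10-second batch is counted on its own (reduceByKey here carries no state between batches), and after ssc.start() the receiver keeps running in the background of the shell. To stop the stream without killing the SparkContext that spark-shell created, StreamingContext has a stop variant for exactly that (a quick sketch; run it in the same shell session):

scala> // Stop the streaming computation gracefully, but keep the shell's SparkContext alive
scala> ssc.stop(stopSparkContext = false, stopGracefully = true)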
Next, let's run Spark Streaming from a jar file.
The final output will look like this:
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999
2021-01-07 21:04:56,179 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1610021100000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021110000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021120000 ms
-------------------------------------------
Notes on building the jar file
With Scala 2.12.2 / sbt 1.4.6 and newer, sbt-assembly is no longer used here (unlike back in the Scala 2.11 days).
Running sbt assembly would require a plugins.sbt like the one below, but around Scala 2.11 that route kept running into version mismatches.
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ more plugins.sbt.bak
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.2.0")
With Scala 2.12.2 / sbt 1.4.6 or later,
$ sbt clean package
is enough to create a jar file that can be used with Spark Streaming.
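The build.sbt itself is not shown in this post. Judging from the artifact name target/scala-2.12/app_2.12-1.4.6.jar and the sbt log ("set current project to App"), a minimal one would look roughly like the sketch below; the exact scalaVersion and dependency list are inferred, not copied from the repository:

// Hypothetical minimal build.sbt; yields target/scala-2.12/app_2.12-1.4.6.jar
name := "App"
version := "1.4.6"
scalaVersion := "2.12.10"  // assumed; Spark 3.0.1 is built against Scala 2.12

// Spark jars are supplied by spark-submit at runtime, so mark them "provided"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "3.0.1" % "provided"
)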
Now let's look at the code. (It can be written very simply.)
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}

object StreamingFirst {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)

    val sparkConf = new SparkConf().setAppName("StreamingFirst")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))   // 10-second batches
    val lines = ssc.socketTextStream(args(0),         // host to read from
                                     args(1).toInt,   // port to read from
                                     StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
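Compared with the spark-shell session, the differences are small: the program builds its own SparkContext, takes the host and port from the command line (the localhost 9999 at the end of the spark-submit invocation becomes args(0) and args(1)), and blocks in ssc.awaitTermination() so the driver keeps running until it is stopped from outside.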
The Scala code is available at these two links:
https://github.com/RuoAndo/qiita/blob/master/spark/spark-streaming-1/main.scala
https://github.com/RuoAndo/qiita/tree/master/spark/spark-streaming-1
Let's try running it with spark-submit…
|(;゚Д゚|
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ sbt clean package
[info] Updated file /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project/build.properties: set sbt.version to 1.4.6
[info] welcome to sbt 1.4.6 (Private Build Java 1.8.0_275)
[info] loading project definition from /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project
[info] loading settings for project spark-streaming-1 from build.sbt ...
[info] set current project to App (in build file:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/)
[success] Total time: 0 s, completed Jan 7, 2021 9:18:47 PM
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] compiling 1 Scala source to /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/target/scala-2.12/classes ...
[success] Total time: 9 s, completed Jan 7, 2021 9:18:57 PM
Whoa!
hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999
Ah!
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999
2021-01-07 21:20:35,882 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1610022040000 ms
-------------------------------------------
|*゚Д゚|
hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999
hoge hoge hoge
( ´Д⊂ヽ)
-------------------------------------------
Time: 1610022090000 ms
-------------------------------------------
(hoge,3)
(`ー´)b