【入门级别】ApacheSpark基础教程-SparkStreaming
上一次是在这里。
https://qiita.com/SHA_AKA/items/f57cbe11b208282103e3
在基础编辑的最后,我们将尝试实现SparkStreaming。
环境与上一次相同。
目录
-
- 更新pom.xml文件和下载ncat
-
- 无状态和有状态
-
- 第一段(无状态)
-
- 窗口处理(有状态)
-
- 写入MySQL
- 其他
更新pom.xml文件和下载ncat软件
在pom.xml中添加以下的依赖关系
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.13</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>
为了监视本地端口并获取数据,需要下载ncat。
sudo apt-get -y install ncat
安装完成后,打开一个新的终端,并执行以下命令。
nc -lk 8080
无状态和有状态
流媒体处理本质上是批处理。只是处理时间间隔非常短,所以外观上看起来是”连续”的。
Spark Streaming中有两种转换操作:无状态和有状态。
简单地说,
・无状态:仅针对本次获取的数据进行操作,与前后的数据无关。
・有状态:不仅涉及本次数据,还涉及前后的数据或其计算结果。
首先,在初始代码中提供无状态的示例。
最初的代码(无状态)
实例操作:提取包含错误的记录。
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class SparkStreaming {
public static void main(String[] args) throws InterruptedException {
/*
conf設定、local[2]とは2 thread
JavaStreamingContext設定、Durations.seconds(5)とは処理間隔が5秒
*/
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreamIng");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(5));
/*
InputDStream:IP:localhost,PORT:8080
errorを含む行を抽出
*/
JavaReceiverInputDStream<String> inputDStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
JavaDStream<String> errorLine = inputDStream.filter(s -> s.contains("error"));
//errorLineをプリント
errorLine.print();
/*
監視開始
*/
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
}

窗口处理(有状态)
在stateful操作中,需要使用window函数来指定时间间隔。
– window函数:用于处理固定时间间隔的数据。
– window length:窗口的长度,即时间间隔。
– sliding interval:处理频率。
此外,在stateful操作中需要检查点。
代码示例:每30秒对每个IP地址进行访问次数计数,并每10秒执行一次。
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class SparkStreamingCount {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
//check point
javaStreamingContext.checkpoint("/opt/data1");
JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
/*
window関数:一定時間間隔のデータを処理する
window length:windowの長さ、つまり時間間隔、ここでは30秒
sliding interval:処理の頻度、ここでは10秒
*/
JavaPairDStream<String, Long> result = namedStream.reduceByKeyAndWindow(new Add(), new Minus(), Durations.seconds(30), Durations.seconds(10));
//print
result.print();
//start
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
static class Add implements Function2<Long,Long,Long> {
@Override
public Long call(Long v1, Long v2) throws Exception{
return v1 + v2;
}
}
static class Minus implements Function2<Long,Long,Long> {
@Override
public Long call(Long v1, Long v2) throws Exception{
return v1 - v2;
}
}
}
将数据写入MySQL数据库
常见的场景是将从流媒体获取的数据输出到数据库中。
在与数据库的连接中,建议创建静态连接池。
(以避免由分布式处理导致的连接爆炸成本)
首先是连接池的代码。
package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
public class ConnectionPool {
//リンクリストを作成
private static LinkedList<Connection> connectionQueue;
static {
try{
Class.forName("com.mysql.jdbc.Driver");
}catch (Exception e){
e.printStackTrace();
}
}
public synchronized static Connection getConnection(){
try {
if(connectionQueue == null){
connectionQueue = new LinkedList<>();
}
//connectionは五つまでQueueにpush
for (int i = 0; i < 5; i++){
Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8080",
"root","password");
connectionQueue.push(conn);
}
}catch (Exception e){
e.printStackTrace();
}
//Queueからconnectionを取得
return connectionQueue.poll();
}
public static void returnConnection(Connection conn){
connectionQueue.push(conn);
}
}
主要处理:每30秒计算一次IP访问次数并写入到MySQL。
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.Statement;
public class SparkStreamingJDBC {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(30));
//check point
javaStreamingContext.checkpoint("/opt/data1");
JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
//30秒内IPごとのアクセスをカウント
JavaPairDStream<String, Long> result = namedStream.reduceByKey((v1,v2)->v1+v2);
//結果print
result.print();
//MySQLに書き込み
result.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = ConnectionPool.getConnection();
Tuple2<String, Long> ipcount;
while (partitionOfRecords.hasNext()) {
ipcount = partitionOfRecords.next();
String sql = "insert into dtable(ip,count)" + "values ('" + ipcount._1 + "'," + ipcount._2 + ")";
Statement statement = connection.createStatement();
statement.executeUpdate(sql);
}
});
});
try{
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}catch (Exception e){
e.printStackTrace();
}
}
}
最近天气炎热,感觉很懒,因为还没有安装MySQL,所以结果暂时省略。
其他
虽然与Kafka集成是常见的例子,但目前经常使用云服务来处理数据(如GCP的pubsub,AWS的MSK),在实现Dataproc时再讨论一下。
基础部分就到此为止,下一次将讨论应用部分,在Spark的机器学习中的实现。