【入门级别】ApacheSpark基础教程-SparkStreaming

上一次是在这里。
https://qiita.com/SHA_AKA/items/f57cbe11b208282103e3
在基础编辑的最后,我们将尝试实现SparkStreaming。
环境与上一次相同。

目录

    1. 更新pom.xml文件和下载ncat

 

    1. 无状态和有状态

 

    1. 第一段(无状态)

 

    1. 窗口处理(有状态)

 

    1. 写入MySQL

 

    其他

更新pom.xml文件和下载ncat软件

在pom.xml中添加以下的依赖关系

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.13</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>

为了监视本地端口并获取数据,需要下载ncat。

sudo apt-get -y install ncat

安装完成后,打开一个新的终端,并执行以下命令。

nc -lk 8080

无状态和有状态

流媒体处理本质上是批处理。只是处理时间间隔非常短,所以外观上看起来是”连续”的。
Spark Streaming中有两种转换操作:无状态和有状态。
简单地说,
・无状态:仅针对本次获取的数据进行操作,与前后的数据无关。
・有状态:不仅涉及本次数据,还涉及前后的数据或其计算结果。

首先,在初始代码中提供无状态的示例。

最初的代码(无状态)

实例操作:提取包含错误的记录。

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStreaming {
    public static void main(String[] args) throws InterruptedException {
        /*
        conf設定、local[2]とは2 thread
        JavaStreamingContext設定、Durations.seconds(5)とは処理間隔が5秒
         */
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreamIng");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(5));

        /*
        InputDStream:IP:localhost,PORT:8080
        errorを含む行を抽出
         */
        JavaReceiverInputDStream<String> inputDStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
        JavaDStream<String> errorLine = inputDStream.filter(s -> s.contains("error"));

        //errorLineをプリント
        errorLine.print();

        /*
        監視開始
         */
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}
error.png

窗口处理(有状态)

在stateful操作中,需要使用window函数来指定时间间隔。
– window函数:用于处理固定时间间隔的数据。
– window length:窗口的长度,即时间间隔。
– sliding interval:处理频率。
此外,在stateful操作中需要检查点。
代码示例:每30秒对每个IP地址进行访问次数计数,并每10秒执行一次。

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class SparkStreamingCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        //check point
        javaStreamingContext.checkpoint("/opt/data1");

        JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
        JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
        /*
        window関数:一定時間間隔のデータを処理する
        window length:windowの長さ、つまり時間間隔、ここでは30秒
        sliding interval:処理の頻度、ここでは10秒
         */
        JavaPairDStream<String, Long> result = namedStream.reduceByKeyAndWindow(new Add(), new Minus(), Durations.seconds(30), Durations.seconds(10));
        //print
        result.print();
        //start
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
    static class Add implements Function2<Long,Long,Long> {
        @Override
        public Long call(Long v1, Long v2) throws Exception{
            return v1 + v2;
        }
    }

    static class Minus implements Function2<Long,Long,Long> {
        @Override
        public Long call(Long v1, Long v2) throws Exception{
            return v1 - v2;
        }
    }

}

将数据写入MySQL数据库

常见的场景是将从流媒体获取的数据输出到数据库中。
在与数据库的连接中,建议创建静态连接池。
(以避免由分布式处理导致的连接爆炸成本)
首先是连接池的代码。

package com;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {
    //リンクリストを作成
    private static LinkedList<Connection> connectionQueue;

    static {
        try{
            Class.forName("com.mysql.jdbc.Driver");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection(){
        try {
            if(connectionQueue == null){
                connectionQueue = new LinkedList<>();
            }
            //connectionは五つまでQueueにpush
            for (int i = 0; i < 5; i++){
                Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8080",
                        "root","password");
                connectionQueue.push(conn);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        //Queueからconnectionを取得
        return connectionQueue.poll();
    }

    public static void returnConnection(Connection conn){
        connectionQueue.push(conn);
    }

}

主要处理:每30秒计算一次IP访问次数并写入到MySQL。

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.Statement;

public class SparkStreamingJDBC {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("SparkCount").setMaster("local[2]");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(30));

        //check point
        javaStreamingContext.checkpoint("/opt/data1");

        JavaDStream<String> dStream = javaStreamingContext.socketTextStream("xiexiaofeng-virtual-machine",8080);
        JavaPairDStream<String,Long> namedStream = dStream.mapToPair(s-> new Tuple2(s,1l));
        //30秒内IPごとのアクセスをカウント
        JavaPairDStream<String, Long> result = namedStream.reduceByKey((v1,v2)->v1+v2);
        //結果print
        result.print();
        //MySQLに書き込み
        result.foreachRDD(rdd -> {
            rdd.foreachPartition(partitionOfRecords -> {
                Connection connection = ConnectionPool.getConnection();
                Tuple2<String, Long> ipcount;

                while (partitionOfRecords.hasNext()) {
                    ipcount = partitionOfRecords.next();
                    String sql = "insert into dtable(ip,count)" + "values ('" + ipcount._1 + "'," + ipcount._2 + ")";
                    Statement statement = connection.createStatement();
                    statement.executeUpdate(sql);
                }
            });
        });

        try{
            javaStreamingContext.start();
            javaStreamingContext.awaitTermination();
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

最近天气炎热,感觉很懒,因为还没有安装MySQL,所以结果暂时省略。

其他

虽然与Kafka集成是常见的例子,但目前经常使用云服务来处理数据(如GCP的pubsub,AWS的MSK),在实现Dataproc时再讨论一下。
基础部分就到此为止,下一次将讨论应用部分,在Spark的机器学习中的实现。

广告
将在 10 秒后关闭
bannerAds