使用传感器数据的Spark流应用程序

這是關於這個筆記本畫廊的傳感器數據流應用程序的操作步驟。

 

笔记本的翻译版在这里。

 

创建假感测数据

生成由100个文件组成的100个记录的数据。

%scala

// --- データを生成するパスを設定・適宜変更してください ---
val path = "/tmp/takaakiyayoidatabrickscom/Streaming/sdevices/"
dbutils.fs.mkdirs(path)

val numFiles = 100
val numDataPerFile = 100

import scala.util.Random

val deviceTypes = Seq("SensorTypeA", "SensorTypeB", "SensorTypeC", "SensorTypeD")
val startTime = System.currentTimeMillis
dbutils.fs.rm(path, true)

(1 to numFiles).par.foreach { fileId =>
  val file = s"$path/file-$fileId.json"
  val data = (1 to numDataPerFile).map { x => 
    val timestamp = new java.sql.Timestamp(startTime + (fileId * 60000) + (Random.nextInt() % 10000))
    val deviceId = Random.nextInt(100)
    val deviceType = deviceTypes(Random.nextInt(deviceTypes.size))
    val signalStrength = math.abs(Random.nextDouble % 100)
    s"""{"timestamp":"$timestamp","deviceId":$deviceId,"deviceType":"$deviceType","signalStrength":$signalStrength}"""
  }.mkString("\n")
  dbutils.fs.put(file, data)
}
dbutils.fs.head(dbutils.fs.ls(s"$path/file-1.json").head.path)

流媒体传感器数据处理

在构建端到端的连续应用程序的目标中,结构化流是一个强大的功能。在高级别上,它提供以下功能:

    1. 所有的数据记录都被保存在前缀(分区)中,并且按顺序进行处理和计数,所以输出表始终保持一致性。

容错性包括与输出同步的交互,并通过结构化流处理整体。

具有处理延迟数据和不按顺序的数据能力。

1- 酱料

我们可以将输入数据源视为“输入表”。流中到达的所有数据项都会被视为输入表中的新行添加进去。

2-连续处理和查询

接下来,开发者会定义一个查询,将该源或输入表视为静态表,以计算将被写入输出 sink 的最终结果表。Spark 会自动将这种批处理查询转换为流处理执行计划,这称为增量更新:Spark 会确定需要维护什么样的状态来更新结果,每当记录到达时。最后,开发者会指定触发器以控制何时更新结果。每次触发器被执行时,Spark 会检查新数据(输入表的新行),并以增量方式更新结果。

3-洗碗.

这个模型的最后部分是输出模式。每当结果表更新时,开发人员希望能够将更改写入外部系统,如S3、HDFS和数据库。通常情况下,希望以增量方式进行输出写入。鉴于此,结构化流处理提供了三种输出模式。

Append: 最後のトリガー以降に結果テーブルに追加された新規行のみが、外部ストレージに書き込まれます。

Complete: 集計のようにアップデートされた結果テーブル全体が外部ストレージに書き込まれます。

Update: 最後のトリガー以降に結果テーブル更新された行のみが、外部ストレージ上で変更されます。

undefined

使用PySpark的结构化流API来创建连续处理应用程序的示例。

设置输出、检查点和不良记录的文件路径。

# Cmd3の4行目のパスから /sdevices/ を除外してください
base_path = "/tmp/takaakiyayoidatabrickscom/Streaming"

output_path = f"{base_path}/out/iot-stream/"
checkpoint_path = f"{base_path}/out/iot-stream-checkpoint"
#
# チェックポイントパスの作成
#
dbutils.fs.rm(checkpoint_path,True) # チェックポイントの上書き
dbutils.fs.mkdirs(checkpoint_path)
#
#
bad_records_path = f"{base_path}/badRecordsPath/streaming-sensor/"
dbutils.fs.rm(bad_records_path, True) # ディレクトリを空に
dbutils.fs.mkdirs(bad_records_path)

传感器的数据是怎样的呢?

sensor_path = f"{base_path}/sdevices/"
sensor_file_name= sensor_path + "file-1.json"
dbutils.fs.head(sensor_file_name, 233)
Screen Shot 2022-08-08 at 13.25.56.png

定义输入流和输出流的架构

为了性能考虑,良好的最佳实践是在Spark中定义模式而不是推断模式。如果没有模式,则Spark会启动几个作业:一个用于读取头文件的作业,一个用于读取部分分区以验证数据匹配模式的作业。

如果出现问题,可以立即生成错误,并且当存在缺失值或数据类型不匹配时,可以设置一个选项来决定是否接受,或用NaN或null进行替换。

from pyspark.sql.functions import *
from pyspark.sql.types import *

# オリジナルの入力スキーマ
jsonSchema = (
  StructType()
  .add("timestamp", TimestampType()) # ソースのイベント時間
  .add("deviceId", LongType())
  .add("deviceType", StringType())
  .add("signalStrength", DoubleType())
)
# いくつかのETL(変換およびカラムの追加)を行うのでカラムを追加してスキーマを変更します。
# この変換データは、処理やレポート生成に使用できるようにSQLテーブルを作成元としてのParquetファイルに格納されます。
parquetSchema = (
  StructType()
  .add("timestamp", TimestampType()) # ソースのイベント時間
  .add("deviceId", LongType())
  .add("deviceType", StringType())
  .add("signalStrength", DoubleType())
  .add("INPUT_FILE_NAME", StringType()) # このデータアイテムを読み込んだファイル名
  .add("PROCESSED_TIME", TimestampType())) # 処理中のエグゼキューターの時間

从对象存储源读取流

在这种情况下,我们可以通过一次性从文件中读取数据来模拟Kafka实时流。但是,我们也可以将其作为Apache Kafka的主题。

请留意:
由于教程需要,我们故意将处理速度放慢了。
inputDF = ( spark 
          .readStream 
          .schema(jsonSchema) 
          .option("maxFilesPerTrigger", 1)  # チュートリアルのために処理を遅くしています
          .option("badRecordsPath", bad_records_path) # いかなる不正レコードはこちらに格納されます
          .json(sensor_path) # ソース
          .withColumn("INPUT_FILE_NAME", input_file_name()) # ファイルパスを保持
          .withColumn("PROCESSED_TIME", current_timestamp()) # 処理時刻のタイムスタンプを追加
          .withWatermark("PROCESSED_TIME", "1 minute") # オプション: 順序が遅れたデータに対するウィンドウ
         )

向Parquet文件同步写入流。

query = (inputDF
         .writeStream
         .format("parquet") # 後段処理あるいは必要に応じてバッチクエリーのために保存を行うシンク
         .option("path", output_path)
         .option("checkpointLocation", checkpoint_path) # 障害復旧のためのチェックポイントの追加
         .outputMode("append")
         .queryName("devices") # オプションとして、クエリーを実行する際に指定するクエリー名を指定
         .trigger(processingTime='5 seconds')
         .start() 
        )
Screen Shot 2022-08-08 at 13.28.55.png

从输入流创建一个临时表,以便能够快速执行SQL查询。

inputDF.createOrReplaceTempView("parquet_sensors")

对由输入流创建的临时表执行查询

%sql select * from parquet_sensors where deviceType = 'SensorTypeD' or deviceType = 'SensorTypeA'
Screen Shot 2022-08-08 at 13.30.05.png

对保存在输入流中的Parquet文件进行附加处理,并执行查询。

spark.conf.set("spark.sql.shuffle.partitions", "1") # 優れたクエリー性能のためにシャッフルサイズを小さく維持
devices = (spark.readStream
           .schema(parquetSchema)
           .format("parquet")
           .option("maxFilesPerTrigger", 1) # デモのために遅くしています
           .load(output_path)
           .withWatermark("PROCESSED_TIME", "1 minute") # 順序を守らないデータに対するウィンドウ
          )

 # より複雑な集計クエリーを行うために一時テーブルを作成
devices.createOrReplaceTempView("sensors")

即使降低了质量,我们还是会继续执行流查询。

哪个文件被处理了?

display(
  devices.
  select("INPUT_FILE_NAME", "PROCESSED_TIME")
  .groupBy("INPUT_FILE_NAME", "PROCESSED_TIME")
  .count()
  .orderBy("PROCESSED_TIME", ascending=False)
)
Screen Shot 2022-08-08 at 13.31.38.png

有多少数据经过了?

%sql select count(*) from sensors
Screen Shot 2022-08-08 at 13.32.14.png

每种传感器类型的强度最小值、最大值和平均值分别如何?

在Spark SQL中使用min()、max()和avg()函数。

请注意,在Python笔记本中,您可以使用%sql魔术命令来使用SQL。
%sql 

select count(*), deviceType, min(signalStrength), max(signalStrength), avg(signalStrength) 
  from sensors 
    group by deviceType 
    order by deviceType asc
Screen Shot 2022-08-08 at 13.33.22.png

让我们创建一个流来汇总设备和每5秒窗口的信号计数。

请注意,这是一个弹性窗口,而不是滑动窗口。大小为5秒。

例如,对于尺寸为5的时、分的滚动窗口,如下所示。

[(00:00 – 00:05), (00:05 – 00:10), (00:10 – 00:15)] 可提供一种选择:

该活动将属于这些滚动窗口中的任意一个。

(devices
 .groupBy(
   window("timestamp", "5 seconds"),
   "deviceId"
 )
 .count()
 .createOrReplaceTempView("sensor_counts")) # データフレームを用いて一時ビューを作成

在这5秒的时间窗口内,哪个设备经历了信号的丢失?

%sql select * from sensor_counts where count < 5 order by window.start desc
Screen Shot 2022-08-08 at 13.34.39.png

可能存在着传输信号未发送的感应器警报可能要发送警报。

让我们从临时表sensor_counts中创建一个数据帧。

lost_sensor_signals = (spark.table("sensor_counts")
         .filter(col("count") < 5)
         .select("window.start", "window.end", "deviceId", "count")
         )

# データフレームの表示
display(lost_sensor_signals)
Screen Shot 2022-08-08 at 13.35.14.png
为了将工人的日志写入,利用foreach机制。

这个可以用于监控的目的。另一个工作可以扫描日志以获得警报,并将其发布到Kafka的主题中,或者将其发布到Ganglia。如果有可用的Kafka服务器或可以通过REST API使用的Ganglia服务,那么尝试一下会是一个不错的练习。

def processRow(row):
  # 今時点ではログファイルに書き込みを行いますが、このロジックは容易にKafkaのトピックや、GangliaやPagerDutyのようなモニタリング、ページングサービスにアラートを発行するように拡張することができます
  print("ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(row.start, row.end, row.deviceId, row[3]))
  
(lost_sensor_signals
 .writeStream
 .outputMode("complete") # モニタリングのためにKafkaの"alerts"トピックにすることもできます
 .foreach(processRow)
 .start()
)

这是一个生成结果的示例。

数据清理

获取所有正在运行的查询

sqm = spark.streams
[q.name for q in sqm.active]
Screen Shot 2022-08-08 at 13.36.28.png

停止流媒体

[q.stop() for q in sqm.active]

Databricks 免费试用

Databricks 免费试用

广告
将在 10 秒后关闭
bannerAds