使用传感器数据的Spark流应用程序
這是關於這個筆記本畫廊的傳感器數據流應用程序的操作步驟。
笔记本的翻译版在这里。
创建假感测数据
生成由100个文件组成的100个记录的数据。
%scala
// --- データを生成するパスを設定・適宜変更してください ---
val path = "/tmp/takaakiyayoidatabrickscom/Streaming/sdevices/"
dbutils.fs.mkdirs(path)
val numFiles = 100
val numDataPerFile = 100
import scala.util.Random
val deviceTypes = Seq("SensorTypeA", "SensorTypeB", "SensorTypeC", "SensorTypeD")
val startTime = System.currentTimeMillis
dbutils.fs.rm(path, true)
(1 to numFiles).par.foreach { fileId =>
val file = s"$path/file-$fileId.json"
val data = (1 to numDataPerFile).map { x =>
val timestamp = new java.sql.Timestamp(startTime + (fileId * 60000) + (Random.nextInt() % 10000))
val deviceId = Random.nextInt(100)
val deviceType = deviceTypes(Random.nextInt(deviceTypes.size))
val signalStrength = math.abs(Random.nextDouble % 100)
s"""{"timestamp":"$timestamp","deviceId":$deviceId,"deviceType":"$deviceType","signalStrength":$signalStrength}"""
}.mkString("\n")
dbutils.fs.put(file, data)
}
dbutils.fs.head(dbutils.fs.ls(s"$path/file-1.json").head.path)
流媒体传感器数据处理
在构建端到端的连续应用程序的目标中,结构化流是一个强大的功能。在高级别上,它提供以下功能:
-
- 所有的数据记录都被保存在前缀(分区)中,并且按顺序进行处理和计数,所以输出表始终保持一致性。
容错性包括与输出同步的交互,并通过结构化流处理整体。
具有处理延迟数据和不按顺序的数据能力。

1- 酱料
我们可以将输入数据源视为“输入表”。流中到达的所有数据项都会被视为输入表中的新行添加进去。
2-连续处理和查询
接下来,开发者会定义一个查询,将该源或输入表视为静态表,以计算将被写入输出 sink 的最终结果表。Spark 会自动将这种批处理查询转换为流处理执行计划,这称为增量更新:Spark 会确定需要维护什么样的状态来更新结果,每当记录到达时。最后,开发者会指定触发器以控制何时更新结果。每次触发器被执行时,Spark 会检查新数据(输入表的新行),并以增量方式更新结果。

3-洗碗.
这个模型的最后部分是输出模式。每当结果表更新时,开发人员希望能够将更改写入外部系统,如S3、HDFS和数据库。通常情况下,希望以增量方式进行输出写入。鉴于此,结构化流处理提供了三种输出模式。
Append: 最後のトリガー以降に結果テーブルに追加された新規行のみが、外部ストレージに書き込まれます。
Complete: 集計のようにアップデートされた結果テーブル全体が外部ストレージに書き込まれます。
Update: 最後のトリガー以降に結果テーブル更新された行のみが、外部ストレージ上で変更されます。

使用PySpark的结构化流API来创建连续处理应用程序的示例。
设置输出、检查点和不良记录的文件路径。
# Cmd3の4行目のパスから /sdevices/ を除外してください
base_path = "/tmp/takaakiyayoidatabrickscom/Streaming"
output_path = f"{base_path}/out/iot-stream/"
checkpoint_path = f"{base_path}/out/iot-stream-checkpoint"
#
# チェックポイントパスの作成
#
dbutils.fs.rm(checkpoint_path,True) # チェックポイントの上書き
dbutils.fs.mkdirs(checkpoint_path)
#
#
bad_records_path = f"{base_path}/badRecordsPath/streaming-sensor/"
dbutils.fs.rm(bad_records_path, True) # ディレクトリを空に
dbutils.fs.mkdirs(bad_records_path)
传感器的数据是怎样的呢?
sensor_path = f"{base_path}/sdevices/"
sensor_file_name= sensor_path + "file-1.json"
dbutils.fs.head(sensor_file_name, 233)

定义输入流和输出流的架构
为了性能考虑,良好的最佳实践是在Spark中定义模式而不是推断模式。如果没有模式,则Spark会启动几个作业:一个用于读取头文件的作业,一个用于读取部分分区以验证数据匹配模式的作业。
如果出现问题,可以立即生成错误,并且当存在缺失值或数据类型不匹配时,可以设置一个选项来决定是否接受,或用NaN或null进行替换。
from pyspark.sql.functions import *
from pyspark.sql.types import *
# オリジナルの入力スキーマ
jsonSchema = (
StructType()
.add("timestamp", TimestampType()) # ソースのイベント時間
.add("deviceId", LongType())
.add("deviceType", StringType())
.add("signalStrength", DoubleType())
)
# いくつかのETL(変換およびカラムの追加)を行うのでカラムを追加してスキーマを変更します。
# この変換データは、処理やレポート生成に使用できるようにSQLテーブルを作成元としてのParquetファイルに格納されます。
parquetSchema = (
StructType()
.add("timestamp", TimestampType()) # ソースのイベント時間
.add("deviceId", LongType())
.add("deviceType", StringType())
.add("signalStrength", DoubleType())
.add("INPUT_FILE_NAME", StringType()) # このデータアイテムを読み込んだファイル名
.add("PROCESSED_TIME", TimestampType())) # 処理中のエグゼキューターの時間
从对象存储源读取流
在这种情况下,我们可以通过一次性从文件中读取数据来模拟Kafka实时流。但是,我们也可以将其作为Apache Kafka的主题。
由于教程需要,我们故意将处理速度放慢了。
inputDF = ( spark
.readStream
.schema(jsonSchema)
.option("maxFilesPerTrigger", 1) # チュートリアルのために処理を遅くしています
.option("badRecordsPath", bad_records_path) # いかなる不正レコードはこちらに格納されます
.json(sensor_path) # ソース
.withColumn("INPUT_FILE_NAME", input_file_name()) # ファイルパスを保持
.withColumn("PROCESSED_TIME", current_timestamp()) # 処理時刻のタイムスタンプを追加
.withWatermark("PROCESSED_TIME", "1 minute") # オプション: 順序が遅れたデータに対するウィンドウ
)
向Parquet文件同步写入流。
query = (inputDF
.writeStream
.format("parquet") # 後段処理あるいは必要に応じてバッチクエリーのために保存を行うシンク
.option("path", output_path)
.option("checkpointLocation", checkpoint_path) # 障害復旧のためのチェックポイントの追加
.outputMode("append")
.queryName("devices") # オプションとして、クエリーを実行する際に指定するクエリー名を指定
.trigger(processingTime='5 seconds')
.start()
)

从输入流创建一个临时表,以便能够快速执行SQL查询。
inputDF.createOrReplaceTempView("parquet_sensors")
对由输入流创建的临时表执行查询
%sql select * from parquet_sensors where deviceType = 'SensorTypeD' or deviceType = 'SensorTypeA'

对保存在输入流中的Parquet文件进行附加处理,并执行查询。
spark.conf.set("spark.sql.shuffle.partitions", "1") # 優れたクエリー性能のためにシャッフルサイズを小さく維持
devices = (spark.readStream
.schema(parquetSchema)
.format("parquet")
.option("maxFilesPerTrigger", 1) # デモのために遅くしています
.load(output_path)
.withWatermark("PROCESSED_TIME", "1 minute") # 順序を守らないデータに対するウィンドウ
)
# より複雑な集計クエリーを行うために一時テーブルを作成
devices.createOrReplaceTempView("sensors")
即使降低了质量,我们还是会继续执行流查询。
哪个文件被处理了?
display(
devices.
select("INPUT_FILE_NAME", "PROCESSED_TIME")
.groupBy("INPUT_FILE_NAME", "PROCESSED_TIME")
.count()
.orderBy("PROCESSED_TIME", ascending=False)
)

有多少数据经过了?
%sql select count(*) from sensors

每种传感器类型的强度最小值、最大值和平均值分别如何?
在Spark SQL中使用min()、max()和avg()函数。
%sql
select count(*), deviceType, min(signalStrength), max(signalStrength), avg(signalStrength)
from sensors
group by deviceType
order by deviceType asc

让我们创建一个流来汇总设备和每5秒窗口的信号计数。
例如,对于尺寸为5的时、分的滚动窗口,如下所示。
[(00:00 – 00:05), (00:05 – 00:10), (00:10 – 00:15)] 可提供一种选择:
该活动将属于这些滚动窗口中的任意一个。
(devices
.groupBy(
window("timestamp", "5 seconds"),
"deviceId"
)
.count()
.createOrReplaceTempView("sensor_counts")) # データフレームを用いて一時ビューを作成
在这5秒的时间窗口内,哪个设备经历了信号的丢失?
%sql select * from sensor_counts where count < 5 order by window.start desc

可能存在着传输信号未发送的感应器警报可能要发送警报。
让我们从临时表sensor_counts中创建一个数据帧。
lost_sensor_signals = (spark.table("sensor_counts")
.filter(col("count") < 5)
.select("window.start", "window.end", "deviceId", "count")
)
# データフレームの表示
display(lost_sensor_signals)

为了将工人的日志写入,利用foreach机制。
这个可以用于监控的目的。另一个工作可以扫描日志以获得警报,并将其发布到Kafka的主题中,或者将其发布到Ganglia。如果有可用的Kafka服务器或可以通过REST API使用的Ganglia服务,那么尝试一下会是一个不错的练习。
def processRow(row):
# 今時点ではログファイルに書き込みを行いますが、このロジックは容易にKafkaのトピックや、GangliaやPagerDutyのようなモニタリング、ページングサービスにアラートを発行するように拡張することができます
print("ALERT from Sensors: Between {} and {}, device {} reported only {} times".format(row.start, row.end, row.deviceId, row[3]))
(lost_sensor_signals
.writeStream
.outputMode("complete") # モニタリングのためにKafkaの"alerts"トピックにすることもできます
.foreach(processRow)
.start()
)
这是一个生成结果的示例。

数据清理
获取所有正在运行的查询
sqm = spark.streams
[q.name for q in sqm.active]

停止流媒体
[q.stop() for q in sqm.active]
Databricks 免费试用
Databricks 免费试用