如何在StructuredStreaming中反映JOIN静态Dataframe的更新
首先
在Structured Streaming中,我们正在通过Kafka接收流数据并与存储在HDFS上的表进行JOIN操作。我们使用每天一次的批处理来更新HDFS上的表。
由于在这种情况下将表格更新反映到流结果中感到困难,所以我们分享这个经验。
首先先声明一下,这次的帖子并没有百分之百的自信,所以请自行负责参考。相反,如果有更好的方法,请指出来。
然而,目前为止,我们能够正确处理这个问题。
借鉴的资讯
Spark的邮件列表由databricks的人回答问题,所以我相信没有问题。但是,因为是2017年3月的信息,可能有点过时。
在那边讨论了一下,他们正在使用StructuredStreaming将Kafka的流数据和S3的数据进行JOIN处理,但S3每周更新一次,要如何将其反映出来。最后得出的结论是需要按照以下步骤进行操作,换句话说,就是要重新开始做整个处理的过程。
-
- static DataFrameを再作成する(例ではS3)
-
- streaming DataFrameを再作成する(例ではKafka)
- queryをrestart
如果可以容忍10秒的延迟,而无需整个应用程序重启的话,那就没有问题。
以下是对原文的中文翻译:关于如何重启查询,你可以参考在GitHub上提供的一个示例代码,该代码在”How to Shutdown a Spark Streaming Job Gracefully”一文中介绍。虽然这篇文章是在2017年2月发布的,有点旧了,但由于是databricks的人发布的,所以应该是可信的。
只不过,由于使用的是Spark Streaming,所以需要将其转换为Structured Streaming的API。
我们通过以下两种方法进行了实现,并且都进行了实际的运行确认。
定期重新启动 de
我的编程语言是Python。
def launch():
kafkaDataFrame = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers","......") \
.option("subscribe", "....") \
.load()
thresholdDataFrame = spark.read.format("parquet") \
.load("hdfs://....")
# persistする
thresholdDataFrame.persist()
# いろいろ処理
# いろいろ処理後、JOIN
withThreshold = kafkaDataFrame \
.join(
thresholdDataFrame,
(hour(col("window.start")) == thresholdDataFrame.hour)
&
....
) \
.select(...)
kafkaQuery = withThreshold \
.selectExpr("to_json(struct(*)) AS value") \
.writeStream \
.outputMode("append") \
.format("kafka") \
.......
.start()
isStopped = False
# 半日に一度restartする
checkIntervalSeconds = 12 * 60 * 60
print("calling awaitTermination")
isStopped = kafkaQuery.awaitTermination(checkIntervalSeconds)
if isStopped:
print("confirmed! The streaming context is stopped. Exiting application...")
else:
print("Streaming App is still running. Timeout...")
if not isStopped:
print("stopping kafkaQuery right now")
kafkaQuery.stop()
print("kafkaQuery is stopped!!!!!!!")
print("restarting query!!!!!!!")
launch()
这里只是有一些不显眼的迷人之处,我花了4、5个小时。
一旦理解了,非常简单,awaitTermination的Scala API需要指定毫秒,而Python的API则需要指定秒。
我只看过Scala的文档,所以没有注意到这一点,我试图将其设置为1分钟作为测试,但没有任何返回结果,所以我放弃了,并且无奈地想到了下一章中引发异常的方法。
而且,老实说,在写这篇文章的过程中,我注意到了上面所提到的差异笑出声来。
在更新时引发异常,并在catch中重新启动。
def launch():
kafkaDataFrame = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers","......") \
.option("subscribe", "....") \
.load()
thresholdDataFrame = spark.read.format("parquet") \
.load("hdfs://....")
# persistしない!!!!
# thresholdDataFrame.persist()
# いろいろ処理
# いろいろ処理後、JOIN
# ...
# ...
try:
kafkaQuery.awaitTermination()
except StreamingQueryException as e:
# おそらく不要だが念のため
kafkaQuery.stop()
# バッチ処理によってthresholdDataFrameが更新された場合リスタート
if re.search(r"java\.io\.FileNotFoundException.*part.*parquet", traceback.format_exc()):
print("===================================================")
print("detected static dataframe was updated!")
print("restarting query")
print("===================================================")
launch()
这是一个有点棘手的方法。
如果thresholdDataFrame引用的HDFS文件使用mode(“overwrite”)指定了路径来进行覆盖操作,那么在更新时,如果没有调用persist方法来对已加载的dataframe进行持久化,将会发生以下异常。
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 22.0 failed 1 times, most recent failure: Lost task 1.0 in stage 22.0 (TID 1440, localhost, executor driver): java.io.FileNotFoundException: File does not exist: hdfs://server/path/to/part-00034-afcdb8cd-0d97-4a47-81fa-99540fdc6f0c-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
這個例外不僅僅適用於StructuredStreaming。
df=spark.read.parquet("hdfs://server/path/to")
# 他のターミナルでhdfs://server/path/toを上書きする
df.show()
即使进行类似处理,也会发生相同的异常。这可能是因为RDD处理中的文件读取被延迟了,但实际上在spark.read的时候,它已经将文件路径等元数据作为缓存信息保留了下来。
如果想要避免这个例外情况,只需要在写入时使用mode(“append”),但这样做文件所表示的意义也会发生变化。
另外,如果调用df.persist,它会将实际数据初始化并持久化,这样就不会发生异常。
在这里,我们会逆向利用这个特性,有意诱发并捕获异常,并在仅当发生了由于HDFS文件更新而导致的异常时尝试重新启动。
哪种更好?
尽管后一种方法可以立即在批量更新时进行反映,但从设计角度来看并不可取,不能使用persist,基本上选择前一种方法就好了吧。