将Spark结构化流转移到生产环境中
将Apache Spark的结构化流式处理带入生产环境 – Databricks博客的翻译说明。
這是一系列關於如何使用Apache Spark進行複雜的流式分析的第五篇文章。
在Databricks中,我们在过去几个月里逐渐将生产流程转移到了结构化流处理。为了让客户能够在Databricks上快速建立生产流程,我们希望提供一种即可用的部署模型。
在产品应用中,自动化(云原生)方法对于监控、警报和故障恢复是必不可少的。本书不仅提供了可用于应对这些挑战的API的详细解释,还解释了Databricks如何简化生产环境中的结构化流处理。
指标和监控
在Apache Spark的结构化流处理中,我们提供了一个简单的程序API,用于获取有关当前正在执行的流的信息。为了获取与当前活动流有关的适当信息,存在两个主要命令可以对当前查询进行执行。一个是用于获取当前状态的命令,另一个是用于获取查询的最新进展的命令。
状况
可能的汉语翻译:
也许第一个问题是“我的流正在执行什么处理?”。状态保留了与流的当前状态相关的信息,并且可以通过返回的对象来访问查询开头。例如,假设有一个简单的计数流,通过下面的查询定义提供IOT设备的计数。
query = streamingCountsDF \
.writeStream \
.format("memory") \
.queryName("counts") \
.outputMode("complete") \
.start()
通过执行 query.status ,可以获取流的当前状态。这样可以详细了解流中目前发生的事情。
{
"message" : "Getting offsets from FileStreamSource[dbfs:/databricks-datasets/structured-streaming/events]",
"isDataAvailable" : true,
"isTriggerActive" : true
}

最新的进展
查询的状态无疑非常重要,但能够参考查询的历史进展同样重要。通过使用进展元数据,我们可以回答诸如“以多快的速度处理元组?”或“从源头以多快的速度到达元组?”等问题。

让我们探索为什么选择这些指标,以及为什么理解它们的重要性。
输入速率和处理速率

批处理周期

在流媒体作业中的生产警报。
指标和监控是不错的工具,但为了能快速响应任何出现的问题,而不用整天守着流式作业,就需要一个强大的报警机制。在Databricks上,我们可以将结构化流作为生产管道来运行,从而使报警变得简单。

请注意指定用于触发PagerDuty警报的电子邮件地址。当作业失败时,将触发产品的警报(或指定级别的警报)。
自动化故障恢复
尽管警报很方便,但即使在最佳情况下,仍然需要人类来处理障碍,并且在最糟糕的情况下,处理本身可能是不可能的。为了真正将结构化流式处理过渡到生产环境,您可能希望在保持数据一致性并且没有数据丢失的情况下,尽快从障碍中自动恢复。Databricks可以无缝实现这一点。只需简单设置无法恢复的故障之前的重试次数,Databricks将尝试自动恢复您的流式处理作业。您还可以选择在每次失败时触发通知作为生产障碍。
可以同时获得两个世界中最好的东西。系统会不断尝试自我修复,并与员工和开发者保持沟通。
更新应用
您需要更新流媒体应用程序的两种常见情况会产生充分的理由。(例如,输出模式等)在许多情况下,如果不更改重要的业务逻辑,则可以简单地重新启动流媒体作业,并指定相同的检查点目录。更新后的新流媒体应用程序将从上次的检查点位置继续,并继续运行。
然而,如果要更改有状态的操作(如聚合和输出模式),将需要更多的更新操作。您需要指定一个新的检查点目录,并启动一个全新的流来完成此操作。幸运的是,在Databricks中,您可以轻松地启动另一个流,同时进行迁移到新流的操作。
高级警报和监测
还有其他数据分析也支持的行为监控技术。例如,可以使用Datadog、Apache Kafka、Coda Hale Metrics等系统来输出通知。这些高级技术可以用于外部监控和警报系统的实施。
我们将在以下示例中创建一个StreamingQueryListener,将所有查询的进度信息传送到Kafka。
class KafkaMetrics(servers: String) extends StreamingQueryListener {
val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", servers)
kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](kafkaProperties)
def onQueryProgress(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent): Unit = {
producer.send(new ProducerRecord("streaming-metrics", event.progress.json))
}
def onQueryStarted(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent): Unit = {}
def onQueryTerminated(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
整理
在这篇文章中,我们通过使用Databricks来解释了从结构化流的原型到生产环境的转变有多简单。如果你想了解关于结构化流的其他方面,请查看我们的博客系列。
-
- Sparkの構造化ストリーミング
-
- Real-time Streaming ETL with Structured Streaming in Apache Spark 2.1
-
- Working with Complex Data Formats with Structured Streaming in Apache Spark 2.1
-
- Processing Data in Apache Kafka with Structured Streaming in Apache Spark 2.2
- Event-time Aggregation and Watermarking in Apache Spark’s Structured Streaming
你可以在Databricks的文档中学习如何使用流式操作,也可以注册进行免费试用。
Databricks 免费试用版
数据光辉免费试用