将Spark结构化流转移到生产环境中

将Apache Spark的结构化流式处理带入生产环境 – Databricks博客的翻译说明。

这本书是节选译,不保证内容的准确性。有关准确内容,请参考原文。
请注意,这是一篇2017年的文章。

這是一系列關於如何使用Apache Spark進行複雜的流式分析的第五篇文章。

在Databricks中,我们在过去几个月里逐渐将生产流程转移到了结构化流处理。为了让客户能够在Databricks上快速建立生产流程,我们希望提供一种即可用的部署模型。

在产品应用中,自动化(云原生)方法对于监控、警报和故障恢复是必不可少的。本书不仅提供了可用于应对这些挑战的API的详细解释,还解释了Databricks如何简化生产环境中的结构化流处理。

指标和监控

在Apache Spark的结构化流处理中,我们提供了一个简单的程序API,用于获取有关当前正在执行的流的信息。为了获取与当前活动流有关的适当信息,存在两个主要命令可以对当前查询进行执行。一个是用于获取当前状态的命令,另一个是用于获取查询的最新进展的命令。

状况

可能的汉语翻译:
也许第一个问题是“我的流正在执行什么处理?”。状态保留了与流的当前状态相关的信息,并且可以通过返回的对象来访问查询开头。例如,假设有一个简单的计数流,通过下面的查询定义提供IOT设备的计数。

query = streamingCountsDF \
    .writeStream \
    .format("memory") \
    .queryName("counts") \
    .outputMode("complete") \
    .start()

通过执行 query.status ,可以获取流的当前状态。这样可以详细了解流中目前发生的事情。

{
  "message" : "Getting offsets from FileStreamSource[dbfs:/databricks-datasets/structured-streaming/events]",
  "isDataAvailable" : true,
  "isTriggerActive" : true
}

最新的进展

查询的状态无疑非常重要,但能够参考查询的历史进展同样重要。通过使用进展元数据,我们可以回答诸如“以多快的速度处理元组?”或“从源头以多快的速度到达元组?”等问题。

让我们探索为什么选择这些指标,以及为什么理解它们的重要性。

输入速率和处理速率

批处理周期

在流媒体作业中的生产警报。

指标和监控是不错的工具,但为了能快速响应任何出现的问题,而不用整天守着流式作业,就需要一个强大的报警机制。在Databricks上,我们可以将结构化流作为生产管道来运行,从而使报警变得简单。

请注意指定用于触发PagerDuty警报的电子邮件地址。当作业失败时,将触发产品的警报(或指定级别的警报)。

自动化故障恢复

尽管警报很方便,但即使在最佳情况下,仍然需要人类来处理障碍,并且在最糟糕的情况下,处理本身可能是不可能的。为了真正将结构化流式处理过渡到生产环境,您可能希望在保持数据一致性并且没有数据丢失的情况下,尽快从障碍中自动恢复。Databricks可以无缝实现这一点。只需简单设置无法恢复的故障之前的重试次数,Databricks将尝试自动恢复您的流式处理作业。您还可以选择在每次失败时触发通知作为生产障碍。

可以同时获得两个世界中最好的东西。系统会不断尝试自我修复,并与员工和开发者保持沟通。

更新应用

您需要更新流媒体应用程序的两种常见情况会产生充分的理由。(例如,输出模式等)在许多情况下,如果不更改重要的业务逻辑,则可以简单地重新启动流媒体作业,并指定相同的检查点目录。更新后的新流媒体应用程序将从上次的检查点位置继续,并继续运行。

然而,如果要更改有状态的操作(如聚合和输出模式),将需要更多的更新操作。您需要指定一个新的检查点目录,并启动一个全新的流来完成此操作。幸运的是,在Databricks中,您可以轻松地启动另一个流,同时进行迁移到新流的操作。

高级警报和监测

还有其他数据分析也支持的行为监控技术。例如,可以使用Datadog、Apache Kafka、Coda Hale Metrics等系统来输出通知。这些高级技术可以用于外部监控和警报系统的实施。

我们将在以下示例中创建一个StreamingQueryListener,将所有查询的进度信息传送到Kafka。

class KafkaMetrics(servers: String) extends StreamingQueryListener {
  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](kafkaProperties)

  def onQueryProgress(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent): Unit = {
    producer.send(new ProducerRecord("streaming-metrics", event.progress.json))
  }
  def onQueryStarted(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent): Unit = {}
  def onQueryTerminated(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
翻译注:现在PySpark也支持流查询的监听器。

整理

在这篇文章中,我们通过使用Databricks来解释了从结构化流的原型到生产环境的转变有多简单。如果你想了解关于结构化流的其他方面,请查看我们的博客系列。

    • Sparkの構造化ストリーミング

 

    • Real-time Streaming ETL with Structured Streaming in Apache Spark 2.1

 

    • Working with Complex Data Formats with Structured Streaming in Apache Spark 2.1

 

    • Processing Data in Apache Kafka with Structured Streaming in Apache Spark 2.2

 

    Event-time Aggregation and Watermarking in Apache Spark’s Structured Streaming

你可以在Databricks的文档中学习如何使用流式操作,也可以注册进行免费试用。

Databricks 免费试用版

数据光辉免费试用

广告
将在 10 秒后关闭
bannerAds