Uplift如何使用Databricks Delta Live Tables来构建CDC和多路数据流?
如何通过Databricks Delta实时表提升 CDC 数据管道的规模 – Databricks博客
这篇文章由Uplift的Ruchira和Joydeep共同撰写,表达了对于Databricks Lakehouse平台的贡献和领导作用的感激之情。
Uplift是领先的“立即购买,以后付款”的解决方案提供商,旨在让人们从生活中获得更多,并能够在适当的时间进行购买。Uplift的灵活付款选项为购买者提供了简单且可靠的选择,既可以立即购买,又可以在后续期间进行支付。
Uplift的解决方案通过最高级别的安全性、隐私、数据管理和集成,优化了超过200个合作伙伴的支付流程。这样一来,客户可以在不感受到任何摩擦的情况下,通过在线、呼叫中心和面对面的方式享受购物体验。这个庞大的合作伙伴生态系统对我们的工程团队提出了数据工程和分析方面的挑战。由于数据是企业指数级增长的主要价值创造驱动因素,因此Uplift需要一个极度可扩展的解决方案来最小化基础设施和需要管理的”管理代码(janitor code)”。
通过整合数百个合作伙伴和数据来源,Uplift利用自身的核心数据管道来推动以下洞察和操作的实现。
ファンネルメトリクス – アプリケーション率、承認率、テイクアップ率、コンバージョン率、トランザクションの規模
ユーザーメトリクス – リピートユーザー率、トータルのアクティブユーザー、新規ユーザー、解約率、チャネル横断ショッピング
パートナーレポート – パートナーレベルでのファンネルメトリクス、収益メトリクス
ファンディング – 受給条件の評価指標、メトリクス、融資した資産に対するモニタリング
融資 – ロール率、過失のモニタリング、回復、クジレット/不正の承認ファンネル
カスタマーサポート – コールセンターの統計情報、キューのモニタリング、支払いポータルのアクティビティ
为了实现这一目标,Uplift利用Databricks的Lakehouse平台轻松地从Kafka和S3对象存储中导入数百个主题,并构建了一个强大的数据集成系统进行协调。每个数据源都被独立存储,但由应用工程团队自动检测和导入新的数据源,使每个数据源的数据能够独立演化,以供后续的分析团队使用。
在使用Lakehouse平台进行标准化之前,添加新数据源需要开发新的数据管道,因此在添加新数据源或团队之间的更改方面的沟通是手动的,可能会导致错误混入,并浪费时间。通过使用Delta Live Tables,他们的系统可以自动适应可扩展和变更,并减少了需要开发、管理和编排的笔记本数量(从100多个减少到两个管道),从而大大加快了获得洞察所需的时间。
在这个数据导入流水线中,提升(Uplift)有以下要求。
-
- 通过基于Delta Lake的技术,提供将100多个主题从Kafka/S3可扩展地导入到Lakehouse中的能力,使分析师能够以表格格式活用原始数据。
-
- 提供动态创建表格以适应潜在的新Kafka主题的灵活性,使新数据的发现和利用变得容易。
-
- 自动更新每个主题的模式,以适应来自Kafka的数据更改。
-
- 提供可以通过显式的表格规则来设置的后续层,以确保有效管理已经投入运营的表格,包括模式强制、数据质量期望、数据类型映射和默认值等。
-
- 具有处理所有显式设置的表格的SCD Type1的数据流水线。
- 能够与后续应用程序协同工作,以创建摘要统计信息和趋势聚合。
這些要求適用於稱為「多重化」設計模式的用例。當獨立的流集共享相同的源時,會使用多路徑處理。在這個例子中,我們從Kafka的訊息佇列和一系列的S3儲存桶中提取具有100個變更事件的生資料,並且並行解析它們到一個單一的Delta表。
请注意,多重化是一种复杂的流式设计模式,与传统的一对一源和目标流的典型模式不同。如果您认为需要多重化,但尚未实施,那么观看有关基本流媒体最佳实践以及涵盖了该设计模式在流式操作中的权衡的视频可能是一个好的起点。
让我们来审查利用Delta Lake的Medallion体系结构的这个用例中的两个通用解决方案。这将成为加强以下两种解决方案的基本框架。
多元化解决方案 (Duō ‘àn)
DatabricksにおけるSpark構造化ストリーミングはforeachBatchメソッドを用いて1対多のストリーミングを活用します。このソリューションはブロンズステージテーブルを読み込み、マイクロバッチの中で単一のストリームを複数のテーブルに分割します。
DatabricksのDelta Live Tables(DLT)は、並列で全てのストリームを作成し、管理するために用いられます。このプロセスでは、ブロンズテーブルのすべての一意のトピックを動的に識別し、トピックごとに明示的にコードを書いたりチェックポイントを管理することなしに、トピックごとに独立したストリームを生成します。
在接下来的章节中,假设大家已经对Spark结构化流与Delta Live Tables的基本概念有所了解。
在这个例子中,Delta Live Tables提供了一个声明式流水线,在高度灵活的托管架构中能够对所有表定义进行配置。DLT可以使用一个数据流水线来定义、流式处理和管理100个表,而且不会损失表级灵活性。例如,可以定期更新某个后续表,而其他表可以实时更新用于分析。所有这些可以在一个数据流水线中进行管理。
在深入研究Delta Live Tables(DLT)解决方案之前,让我们先探讨一下在Databricks中使用Spark Structured Streaming的现有解决方案设计。
解决方案1:通过Databricks Delta和Spark结构化流实现数据的多路复用。

在构建化流式任务中,流式数据从Kafka读取多个主题,并在foreachBatch语句中向多个表格输入一个流。以下的代码块展示了将一个流写入多个表格的示例。
df_bronze_stage_1 = spark.readStream.format(“json”).load()
def writeMultipleTables(microBatchDf, BatchId):
df_topic_1 = (microBatchDf
.filter(col("topic")== lit("topic_1"))
)
df_topic_2 = (microBatchDf
.filter(col("topic")== lit("topic_2"))
)
df_topic_3 = (microBatchDf
.filter(col("topic")== lit("topic_3"))
)
df_topic_4 = (microBatchDf
.filter(col("topic")== lit("topic_4"))
)
df_topic_5 = (microBatchDf
.filter(col("topic")== lit("topic_5"))
)
### Apply schemas
## Look up schema registry, check to see if the events in each event type are equal to the most recently registered schema, Register new schema
##### Write to sink location (in series within the microBatch)
df_topic_1.write.format("delta").mode("overwrite").option("path","/data/dlt_blog/bronze_topic_1").saveAsTable("bronze_topic_1")
df_topic_2.write.format("delta").option("mergeSchema", "true").option("path", "/data/dlt_blog/bronze_topic_2").mode("overwrite").saveAsTable("bronze_topic_2")
df_topic_3.write.format("delta").mode("overwrite").option("path", "/data/dlt_blog/bronze_topic_3").saveAsTable("bronze_topic_3")
df_topic_4.write.format("delta").mode("overwrite").option("path", "/data/dlt_blog/bronze_topic_4").saveAsTable("bronze_topic_4")
df_topic_5.write.format("delta").mode("overwrite").option("path", "/data/dlt_blog/bronze_topic_5").saveAsTable("bronze_topic_5")
return
### Using For each batch - microBatchMode
(df_bronze_stage_1 # This is a readStream data frame
.writeStream
.trigger(availableNow=True) # ProcessingTime='30 seconds'
.option("checkpointLocation", checkpoint_location)
.foreachBatch(writeMultipleTables)
.start()
)
在Spark構造化流數據處理方案中,有幾個關鍵的設計考慮事項。
要使用结构化流进行一对多的表格流式传输,需要使用foreachBatch函数,并且需要在每个微批处理函数中进行表格写入操作(请参考上面的示例)。这是一种非常强大的设计,但也有一些限制。
-
- 可扩展性:当表数量较少时,一对多的表写入很简单,但按照上述代码示例,默认情况下所有表都是串行写入的(Spark代码按顺序运行,下一步操作必须等待前一步完成)。因此,当表数量增加到100个时,无法实现扩展。这意味着每次添加表时作业的总执行时间会大大增加。
复杂性:写入处理是硬编码的,意味着没有简单的方法可以自动检测新主题并创建表。每当有新的数据源时,都需要进行代码发布。这是严重的时间浪费,使得流水线变得脆弱。虽然可能实现,但需要大量的开发工作量。
严格性:表可能需要以不同的周期进行更新。还可能根据不同的数据质量期望、分区和数据布局要求,需要不同的预处理逻辑。因此,需要为不同表组创建完全独立的作业。
效率:每个表的数据量可能不同,如果它们使用相同的流式集群,可能无法提高集群利用率。要实现这些流的负载平衡,需要更多的开发工作和更创造性的解决方案。
总体来说,这个解决方案运行得很好,但是通过使用单一的分布式账本技术管道,可以解决这些问题,并且进一步简化解决方案。
解决方案2:利用Databricks的Delta Live Tables(Python),进行数据复制和变更数据捕获(CDC)。
为了简化满足上述要求(自动检测新建表,在一个顶级上进行并行流处理,强制数据质量,每个表的模式演变,执行所有表的最终阶段CDC upsert),我们将使用Python进行Delta Live Tables的元编程模型,以便在每个阶段并行声明和创建所有表。

这个是由两个任务组成的一个作业来实现。
-
- 任务A:针对名为Bronze Stage 1的单个Delta表,从所有Kafka主题的原始数据中进行readStream操作。随后,任务A将为流检测到的每个单独主题创建一个视图。(还可以明确地存储解析每个主题的有效负载,并使用模式注册表来使用模式。该视图可以保存该模式注册表,也可以使用其他模式管理系统)。在这个示例中,我们简单地从每个主题的JSON有效负载动态推断出所有模式,并在Silver表中进行数据类型转换。
任务B:从Bronze Stage 1接收流的1个Delta Live Tables管道将使用在第一个任务中生成的视图作为配置,并使用元编程模型触发创建Bronze Stage 2表,针对视图中的所有主题。然后,同一个DLT管道将读取显式配置(在这种情况下为JSON配置),以注册“生产就绪”的表,使用更严格的数据质量期望和数据类型强制。在此阶段,管道将清洗所有Bronze Stage 2表,并使用APPLY CHANGES INTO方法将更新合并到最终的Silver Stage表。最后,从Silver Stage中汇总生成一个可以用于报告的分析表- Gold Stage。
使用Delta Live Tables进行复用和CDC的实施步骤。
以下是使用Delta Live Tables设置多路复用 + CDC的各个实现步骤。
-
- 从原始数据到Bronze Stage 1 – 从Kafka读取主题并将数据写入Bronze Stage 1 Delta表的代码示例。
创建主题/事件的唯一视图 – 从Bronze Stage 1创建视图。
从单个Bronze Stage 1分支到各个表 – 从视图创建Bronze Stage 2的代码示例(元编程)。
将Bronze Stage 2转换为Silver Stage – 使用Silver配置层和Silver表管理配置示例进行元编程模型演示的代码示例。
创建Gold汇总 – 使用Delta Live Tables的代码示例创建完整的Gold汇总表。
DLT管道的DAG – 测试和执行从Bronze Stage 1到Gold的DLT管道的代码示例。
DLT管道的配置 – 使用参数、集群定制和其他必要的设置更改来配置Delta Live Tables管道,以实现在生产环境中的部署。
创建多任务作业 – 将步骤1和步骤2-7(所有这些都是一个DLT管道)合并为单个Databricks作业,其中两个任务按顺序执行。
步骤1:从原始数据到青铜阶段1
startingOffsets = "earliest"
kafka = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
.option("subscribe", topic )
.option("startingOffsets", startingOffsets)
.load()
)
read_stream = (kafka.select(col("key").cast("string").alias("topic"), col("value").alias("payload"))
)
(read_stream
.writeStream
.format("delta")
.mode("append")
.option("checkpointLocation", checkpoint_location)
.option("path", )
saveAsTable("PreBronzeAllTypes")
)
步骤2:创建一个特定主题/事件的独特视图。
%sql
CREATE VIEW IF NOT EXISTS dlt_types_config AS
SELECT DISTINCT topic, sub_topic -- Other things such as schema from a registry, or other helpful metadata from Kafka
FROM PreBronzeAllTypes;

步骤3:从一个铜阶段1分支到各个个体桌子
%python
bronze_tables = spark.read.table("cody_uplift_dlt_blog.dlt_types_config")
## Distinct list is already managed for us via the view definition
topic_list = [[i[0],i[1]] for i in bronze_tables.select(col('topic'), col('sub_topic')).coalesce(1).collect()]
print(topic_list)
import re
def generate_bronze_tables(topic, sub_topic):
topic_clean = re.sub("/", "_", re.sub("-", "_", topic))
sub_topic_clean = re.sub("/", "_", re.sub("-", "_", sub_topic))
@dlt.table(
name=f"bronze_{topic_clean}_{sub_topic_clean}",
comment=f"Bronze table for topic: {topic_clean}, sub_topic:{sub_topic_clean}"
)
def create_call_table():
## For now this is the beginning of the DAG in DLT
df = spark.readStream.table('cody_uplift_dlt_blog.PreBronzeAllTypes').filter((col("topic") == topic) & (col("sub_topic") == sub_topic))
## Pass readStream into any preprocessing functions that return a streaming data frame
df_flat = _flatten(df, topic, sub_topic)
return df_flat
for topic, sub_topic in topic_list:
#print(f”Build table for {topic} with event type {sub_topic}”)
generate_bronze_tables(topic, sub_topic)
Step 4: 將Bronze Stage 2提升至Silver Stage。

生成DLT函数的定义,以进行Bronze Stage 2的转换处理和表设置。
def generate_bronze_transformed_tables(source_table, trigger_interval, partition_cols, zorder_cols, column_rename_logic = '', drop_column_logic = ''):
@dlt.table(
name=f"bronze_transformed_{source_table}",
table_properties={
"quality": "bronze",
"pipelines.autoOptimize.managed": "true",
"pipelines.autoOptimize.zOrderCols": zorder_cols,
"pipelines.trigger.interval": trigger_interval
}
)
def transform_bronze_tables():
source_delta = dlt.read_stream(source_table)
transformed_delta = eval(f"source_delta{column_rename_logic}{drop_column_logic}")
return transformed_delta
定义一个函数,用于在Delta Live Tables中生成带有CDC的Silver表。
def generate_silver_tables(target_table, source_table, merge_keys, where_condition, trigger_interval, partition_cols, zorder_cols, expect_all_or_drop_dict, column_rename_logic = '', drop_column_logic = ''):
#### Define DLT Table this way if we want to map columns
@dlt.view(
name=f"silver_source_{source_table}")
@dlt.expect_all_or_drop(expect_all_or_drop_dict)
def build_source_view():
#
source_delta = dlt.read_stream(source_table)
transformed_delta = eval(f"source_delta{column_rename_logic}{column_rename_logic}")
return transformed_delta
#return dlt.read_stream(f"bronze_transformed_{source_table}")
### Create the target table definition
dlt.create_target_table(name=target_table,
comment= f"Clean, merged {target_table}",
#partition_cols=["topic"],
table_properties={
"quality": "silver",
"pipelines.autoOptimize.managed": "true",
"pipelines.autoOptimize.zOrderCols": zorder_cols,
"pipelines.trigger.interval": trigger_interval
}
)
## Do the merge
dlt.apply_changes(
target = target_table,
source = f"silver_source_{source_table}",
keys = merge_keys,
#where = where_condition,#f"{source}.Column) <> col({target}.Column)"
sequence_by = col("timestamp"),#primary key, auto-incrementing ID of any kind that can be used to identity order of events, or timestamp
ignore_null_updates = False
)
return
获取Silver表设置并传递给合并函数
for table, config in silver_tables_config.items():
##### Build Transformation Query Logic from a Config File #####
#Desired format for renamed columns
result_renamed_columns = []
for renamed_column, coalesced_columns in config.get('renamed_columns')[0].items():
renamed_col_result = []
for i in range( 0 , len(coalesced_columns)):
renamed_col_result.append(f"col('{coalesced_columns[i]}')")
result_renamed_columns.append(f".withColumn('{renamed_column}', coalesce({','.join(renamed_col_result)}))")
#Drop renamed columns
result_drop_renamed_columns = []
for renamed_column, dropped_column in config.get('renamed_columns')[0].items():
for item in dropped_column:
result_drop_renamed_columns.append(f".drop(col('{item}'))")
#Desired format for pk NULL check
where_conditions = []
for item in config.get('upk'):
where_conditions.append(f"{item} IS NOT NULL")
source_table = config.get("source_table_name")
upks = config.get("upk")
### Table Level Properties
trigger_interval = config.get("trigger_interval")
partition_cols = config.get("partition_columns")
zorder_cols = config.get("zorder_columns")
column_rename_logic = ''.join(result_renamed_columns)
drop_column_logic = ''.join(result_drop_renamed_columns)
expect_all_or_drop_dict = config.get("expect_all_or_drop")
print(f"""Target Table: {table} \n
Source Table: {source_table} \n
ON: {upks} \n Renamed Columns: {result_renamed_columns} \n
Dropping Replaced Columns: {renamed_col_result} \n
With the following WHERE conditions: {where_conditions}.\n
Column Rename Logic: {column_rename_logic} \n
Drop Column Logic: {drop_column_logic}\n\n""")
### Do CDC Separate from Transformations
generate_silver_tables(target_table=table,
source_table=config.get("source_table_name"),
trigger_interval = trigger_interval,
partition_cols = partition_cols,
zorder_cols = zorder_cols,
expect_all_or_drop_dict = expect_all_or_drop_dict,
merge_keys = upks,
where_condition = where_conditions,
column_rename_logic= column_rename_logic,
drop_column_logic= drop_column_logic
)
步骤5: 创建金汇总
创建Gold统计表
@dlt.table(
name='Funnel_Metrics_By_Day',
table_properties={'quality': 'gold'}
)
def getFunnelMetricsByDay():
summary_df = (dlt.read("Silver_Finance_Update").groupBy(date_trunc('day', col("timestamp")).alias("Date")).agg(count(col("timestamp")).alias("DailyFunnelMetrics"))
)
return summary_df
步骤6:DLT管道的有向无环图
通过将所有内容汇总起来,创建以下的DLT管道。

第七步:设置DLT流水线
{
"id": "c44f3244-b5b6-4308-baff-5c9c1fafd37a",
"name": "UpliftDLTPipeline",
"storage": "dbfs:/pipelines/c44f3244-b5b6-4308-baff-5c9c1fafd37a",
"configuration": {
"pipelines.applyChangesPreviewEnabled": "true"
},
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"libraries": [
{
"notebook": {
"path": "/Streaming Demos/UpliftDLTWork/DLT - Bronze Layer"
}
},
{
"notebook": {
"path": "/Users/DataEngineering/Streaming Demos/UpliftDLTWork/DLT - Silver Layer"
}
}
],
"target": "uplift_dlt_blog",
"continuous": false,
"development": true
}
在这个设置中,您可以配置管道级参数、云设置(例如IAM实例配置文件)和集群设置等。有关可用的DLT设置的完整列表,请参阅此文档。
步骤八:创建多任务工作
将DLT管道和前处理组合为一个作业。

在Delta Live Tables中,您可以通过表设置独立地控制每个表的所有设置,而无需更改管道代码。这使得管道的修改变得简单,并通过高级自动扩展大大提高了可扩展性,并通过表的并行生成来提高效率。最后,支持具有100个以上表的整个管道的是一个作业,并且所有的流式基础设施都被抽象化为简单的设置,并且使用简单的用户界面来管理管道的所有表的数据质量。在Delta Live Tables之前,管理这样的管道数据质量和重审是手动且非常耗时的事情。
这是一个很好的例子,它展示了使用Delta Live Tables可以简化数据工程师的工作体验。如果数据工程师和分析师自己创建和管理这样一个精细的数据管道,将需要数百小时的时间。但是通过使用Delta Live Tables,这一过程变得简单了。
最终,通过Delta Live Tables,Uplift能够在不必担心管理代码和个别数据源的情况下,专注于为其合作伙伴提供更智能和有效的产品。
免费试用 Databricks
数据湖 数据原型