红移流式摄入的使用方法汇总
这是AWS Analytics Advent Calendar 2022的第八篇文章。
首先
AWS re:Invent 2022 参加会议上,宣布了 Redshift 的流式数据导入功能的 GA 版本。流式数据导入是指可以直接从 Amazon Kinesis Data Streams 和 Amazon Managed Streaming for Apache Kafka (MSK) 实时投入数据到 Redshift 的功能。
在此功能推出之前,要实现在 Redshift 上实时导入数据,需要使用 Amazon Kinesis Data Firehose 先将数据暂存到 S3 中,然后逐个使用 Redshift 的COPY语句进行导入。这需要实现和操作各种复杂的机制,比如逐个执行COPY语句、排除已加载到S3上的文件等等。
通过这个功能,可以摆脱这些实现和操作的麻烦,并且能够以更接近实时的新鲜度投入数据,甚至比以往的方法更快速。
本文中将总结使用Amazon Kinesis Data Streams进行流式数据摄取时的基本用法和注意事项,针对的是已启用Redshift的情况。
2. 如何使用
以下是大致的使用方法:
-
- 给与与Redshift集群关联的IAM角色对Amazon Kinesis Data Streams流的访问权限
-
- 在Redshift上创建用于读取流数据的Materialized View
- 定期更新Materialized View以从流中获取数据
1. 授予访问权限
为了访问 Redshift 到 Amazon Kinesis Data Streams 的流,请将与 Redshift 集群关联的 IAM 角色授予对 Kinesis 的访问权限的 IAM 策略附加到 Redshift。IAM 策略如下所示。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadStream",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:*:0123456789:stream/*" ←ここに読み込み元の stream の ARN を記載する
},
{
"Sid": "ListStream",
"Effect": "Allow",
"Action": [
"kinesis:ListStreams",
"kinesis:ListShards"
],
"Resource": "*"
}
]
}
创建材料化视图
在创建 Materialized View 之前,需要确保 Redshift 集群和 Amazon Kinesis 数据流之间可以进行网络通信。换句话说,如果集群所属的 VPC 的子网组可以通过 Internet Gateway 或 NAT Gateway 访问互联网,那么即使通过互联网也是可以的,需要确保这一点。另外,如果希望在 AWS 内部的网络上封闭,需要使用 Kinesis interface VPC Endpoint。在这里,我们将介绍如何创建 interface VPC Endpoint。
创建 VPC 终端节点
以下是重点内容。
-
- Amazon Kinesis Data Streams にアクセスするための Endpoint なのでサービスとして kinesis-streams を選択
-
- VPC は Redshift のクラスターが配置されている場所を選択
-
- サブネットはクラスターが配置されているサブネットグループに含まれるサブネットを選択
サブネットグループについての詳細は Managing cluster subnet groups using the console のドキュメントを参考にしてください
创建物化视图
用Redshift查询编辑器(查询编辑器v2)创建外部SCHEMA。
CREATE EXTERNAL SCHEMA {外部 SCHEMA 名}
FROM KINESIS
IAM_ROLE 'iam-role-arn'; # デフォルト IAM ロールが設定されている場合は IAM_ROLE default でも可
之后,我会创建一个Materialized View。
CREATE MATERIALIZED VIEW {VIEW 名} AS
SELECT
... # ここでデータのマッピングを行う
FROM {外部 SCHEMA 名}.{stream 名};
由于{流名}中包含以下元数据,可以利用这些元数据将其映射到表的列上。
由于 kinesis_data 被保存为二进制数据,所以在将其映射到列时需要使用 from_varbyte 函数将其转换为字符串。此外,由于输入到流中的数据通常是以 json 或 csv 格式为主,因此可以使用 json_extract_path_text 函数或 split_part 函数将其拆分成列。以下是一个例子:
json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), '{フィールド名}')::int as {カラム名}
更新材料化视图以导入数据。
通过执行以下查询,更新并导入数据至Materialized View。
REFRESH MATERIALIZED VIEW {VIEW 名};
这个查询只会导入自上次更新时点到当前更新时点所增加的数据。已经导入的数据将不会被更新。另外,如果是初始更新时刻没有上次更新时刻,将会导入存储在流中的所有数据。因此,如果删除了视图,只有当前保存在流中的数据会被导入。流的保留期限过期的数据将无法再次导入。
定期更新的物化视图
除非使用REFRESH进行更新,否则数据将无法导入,为了实时导入数据,需要频繁且定期地进行更新。在配置好的Redshift集群中,可以使用查询计划设置来定时执行REFRESH语句。然而,该调度器的最小单位是分钟,并且在执行时会使用计算资源。因此,更新频率需要考虑实时需求和计算资源的可接受程度来确定。
12/08更新
在创建物化视图时,可以使用物化视图的自动更新功能,无需主动执行 REFRESH 操作即可实现自动更新。在这种情况下,创建物化视图时需要添加 AUTO REFRESH YES。
CREATE MATERIALIZED VIEW {VIEW 名} AUTO REFRESH YES AS
SELECT
... # ここでデータのマッピングを行う
FROM {外部 SCHEMA 名}.{stream 名};
这个功能可以与手动刷新同时使用。
注意点是,由于自动更新是由AWS一方的逻辑来执行的,因此无法通过自身的逻辑来控制更新。据我亲身使用的经验,即使数据被输入流中,更新也不会立即生效(可能需要几分钟才会更新)。如果需要频繁更新,可能需要手动进行刷新。
总结
我已经解释了在经过配置的Redshift集群上使用流数据摄取的基本方法。虽然Redshift Serverless也可以使用流数据摄取,在更新时消耗资源单位(RPU)可能会被用掉,并且无法使用查询调度设置,所以如果不使用Materialized View的自动更新功能,则需要自行准备一种更新Materialized View的方法。
引用资料
-
- https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion.html
-
- https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion-getting-started.html
-
- https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-refresh.html#materialized-view-auto-refresh
- https://aws.amazon.com/jp/blogs/aws/new-for-amazon-redshift-general-availability-of-streaming-ingestion-for-kinesis-data-streams-and-managed-streaming-for-apache-kafka/