红移流式摄入的使用方法汇总

这是AWS Analytics Advent Calendar 2022的第八篇文章。

首先

AWS re:Invent 2022 参加会议上,宣布了 Redshift 的流式数据导入功能的 GA 版本。流式数据导入是指可以直接从 Amazon Kinesis Data Streams 和 Amazon Managed Streaming for Apache Kafka (MSK) 实时投入数据到 Redshift 的功能。

在此功能推出之前,要实现在 Redshift 上实时导入数据,需要使用 Amazon Kinesis Data Firehose 先将数据暂存到 S3 中,然后逐个使用 Redshift 的COPY语句进行导入。这需要实现和操作各种复杂的机制,比如逐个执行COPY语句、排除已加载到S3上的文件等等。

通过这个功能,可以摆脱这些实现和操作的麻烦,并且能够以更接近实时的新鲜度投入数据,甚至比以往的方法更快速。

本文中将总结使用Amazon Kinesis Data Streams进行流式数据摄取时的基本用法和注意事项,针对的是已启用Redshift的情况。

2. 如何使用

以下是大致的使用方法:

    1. 给与与Redshift集群关联的IAM角色对Amazon Kinesis Data Streams流的访问权限

 

    1. 在Redshift上创建用于读取流数据的Materialized View

 

    定期更新Materialized View以从流中获取数据

1. 授予访问权限

为了访问 Redshift 到 Amazon Kinesis Data Streams 的流,请将与 Redshift 集群关联的 IAM 角色授予对 Kinesis 的访问权限的 IAM 策略附加到 Redshift。IAM 策略如下所示。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:0123456789:stream/*" ←ここに読み込み元の stream の ARN を記載する
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

创建材料化视图

在创建 Materialized View 之前,需要确保 Redshift 集群和 Amazon Kinesis 数据流之间可以进行网络通信。换句话说,如果集群所属的 VPC 的子网组可以通过 Internet Gateway 或 NAT Gateway 访问互联网,那么即使通过互联网也是可以的,需要确保这一点。另外,如果希望在 AWS 内部的网络上封闭,需要使用 Kinesis interface VPC Endpoint。在这里,我们将介绍如何创建 interface VPC Endpoint。

创建 VPC 终端节点

Screen Shot 2022-12-06 at 18.25.55 copy.jpg

以下是重点内容。

    • Amazon Kinesis Data Streams にアクセスするための Endpoint なのでサービスとして kinesis-streams を選択

 

    • VPC は Redshift のクラスターが配置されている場所を選択

 

    • サブネットはクラスターが配置されているサブネットグループに含まれるサブネットを選択

サブネットグループについての詳細は Managing cluster subnet groups using the console のドキュメントを参考にしてください

创建物化视图

用Redshift查询编辑器(查询编辑器v2)创建外部SCHEMA。

CREATE EXTERNAL SCHEMA {外部 SCHEMA 名}
FROM KINESIS
IAM_ROLE 'iam-role-arn'; # デフォルト IAM ロールが設定されている場合は IAM_ROLE default でも可

之后,我会创建一个Materialized View。

CREATE MATERIALIZED VIEW {VIEW 名} AS
SELECT
... # ここでデータのマッピングを行う
FROM {外部 SCHEMA 名}.{stream 名};

由于{流名}中包含以下元数据,可以利用这些元数据将其映射到表的列上。

メタデータ名型説明approximate_arrival_timestampTIMESTAMP各レコードが Kinesis stream に投入されたおおよその時間partition_keyVARCHAR(256)Kinesis stream に投入する時に指定した partition_keyshard_idCHAR(20)Kinesis stream 内の shard の idsequence_numberVARCHAR(128)shard 内で一意の値で、時間とともに増加kinesis_dataVARBYTE(1024000)Kinesis stream に投入された実データrefresh_timeTIMESTAMPMaterialized View が REFRESH を開始した時間 (= Redshift に取り込みを開始した時間)

由于 kinesis_data 被保存为二进制数据,所以在将其映射到列时需要使用 from_varbyte 函数将其转换为字符串。此外,由于输入到流中的数据通常是以 json 或 csv 格式为主,因此可以使用 json_extract_path_text 函数或 split_part 函数将其拆分成列。以下是一个例子:

json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), '{フィールド名}')::int as {カラム名}

更新材料化视图以导入数据。

通过执行以下查询,更新并导入数据至Materialized View。

REFRESH MATERIALIZED VIEW {VIEW 名};

这个查询只会导入自上次更新时点到当前更新时点所增加的数据。已经导入的数据将不会被更新。另外,如果是初始更新时刻没有上次更新时刻,将会导入存储在流中的所有数据。因此,如果删除了视图,只有当前保存在流中的数据会被导入。流的保留期限过期的数据将无法再次导入。

定期更新的物化视图

除非使用REFRESH进行更新,否则数据将无法导入,为了实时导入数据,需要频繁且定期地进行更新。在配置好的Redshift集群中,可以使用查询计划设置来定时执行REFRESH语句。然而,该调度器的最小单位是分钟,并且在执行时会使用计算资源。因此,更新频率需要考虑实时需求和计算资源的可接受程度来确定。

12/08更新

在创建物化视图时,可以使用物化视图的自动更新功能,无需主动执行 REFRESH 操作即可实现自动更新。在这种情况下,创建物化视图时需要添加 AUTO REFRESH YES。

CREATE MATERIALIZED VIEW {VIEW 名} AUTO REFRESH YES AS
SELECT
... # ここでデータのマッピングを行う
FROM {外部 SCHEMA 名}.{stream 名};

这个功能可以与手动刷新同时使用。

注意点是,由于自动更新是由AWS一方的逻辑来执行的,因此无法通过自身的逻辑来控制更新。据我亲身使用的经验,即使数据被输入流中,更新也不会立即生效(可能需要几分钟才会更新)。如果需要频繁更新,可能需要手动进行刷新。

总结

我已经解释了在经过配置的Redshift集群上使用流数据摄取的基本方法。虽然Redshift Serverless也可以使用流数据摄取,在更新时消耗资源单位(RPU)可能会被用掉,并且无法使用查询调度设置,所以如果不使用Materialized View的自动更新功能,则需要自行准备一种更新Materialized View的方法。

引用资料

    • https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion.html

 

    • https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion-getting-started.html

 

    • https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-refresh.html#materialized-view-auto-refresh

 

    https://aws.amazon.com/jp/blogs/aws/new-for-amazon-redshift-general-availability-of-streaming-ingestion-for-kinesis-data-streams-and-managed-streaming-for-apache-kafka/