尝试使用 Azure Synapse Link for Azure Cosmos DB API for MongoDB

首先

这篇文章是关于在GitHub上公开的Azure Synapse Link for Azure Cosmos DB API for MongoDB示例的解释文章。
原文是用英语写的。我们在这个存储库上进行了日语翻译,请随意选择访问哪个版本。

    • 本家 (English): Azure-Samples/Synapse

日本語訳 (非公式): ymasaoka/Synapse

另外,Azure Cosmos DB API for MongoDB 的 Azure Synapse Link 示例已经准备在 /Notebooks/PySpark/Synapse Link for Cosmos DB samples/MongoDB 上。如果您想要了解更多详情,请务必查看这里。

建立环境

执行此示例需要以下环境。

    • Azure Cosmos DB アカウント (API for MongoDB)

 

    Spark プールがある Azure Synapse ワークスペース

请参考以下文章以了解创建环境的方法。
Cosmos DB 的请求单位(RU/s)400 RU/s已足够。

    Azure Synapse Link を試してみる

请务必在创建 Azure Cosmos DB 帐户时选择 MongoDB 用 Azure Cosmos DB API。

スクリーンショット 2020-09-23 14.34.49.png

开始使用 Azure Cosmos DB 的 MongoDB API 和 Synapse Link

这次的发展方向

本次将按照以下步骤验证 Azure Synapse Link 的操作。

    1. 従来の MongoDB クライアントを使用してデータセットを Azure Cosmos DB トランザクションストアに挿入

 

    1. トランザクションストアから ETL された分析ストア内のデータに対して集計クエリを実行

 

    1. 別のデータセットを Azure Cosmos DB トランザクションストアに挿入

 

    分析ストア内にある両方のデータセットを統合し、集計クエリを実行

预先准备

Cosmos DB コレクションの作成

Azure Cosmos DB 側で、データベースとコレクションを作成しましょう。
1 点注意は、MongoDB の場合、パーティションキーは シャードキー1というものに置き換えられます。
また、2020/09/25 時点で、本家のサンプル記載ではシャードキーを設定するとありますが、シャードキーは設定しないでください。

データベース: test

コレクション: htap

Storage capacity: Fixed (10 GB)

分析ストア: On

スクリーンショット 2020-09-24 1.32.25.png
スクリーンショット 2020-09-25 20.00.38.png

Synapse 側で Linked services を作成

こちらは、SQL API の時と同様に、Azure Synapse ワークスペースより Linked services を設定します。
リンクされたサービス名の名前は任意のものを設定してください。
この記事では、CosmosDBMongoTestという名前を使用します。

スクリーンショット 2020-09-24 1.50.24.png
スクリーンショット 2020-09-24 1.52.56.png

新しい Spark プールを作成

このサンプルでは Spark プールを使用して PySpark で Cosmos DB の分析ストアにアクセスします。
サンプルの実行にあたり、いくつかの Python ライブラリを Spark プール内にインストールしますが、既に別の用途で Spark プールを作成している場合、新しい Spark プールを作成することを推奨します。
Spark プールに Python ライブラリをインストールすることで、他に影響を与えないためです。
使用していない Spark プールについては、アイドル期間中はコンピューティングの課金は発生しません。(自動一時停止の設定)
デフォルトでは 15 分になっていますが、こちらを無効化しないようにしておくのがベストです。アイドル状態までの時間は任意で変更してください。

スクリーンショット 2020-09-24 2.00.02.png
スクリーンショット 2020-09-24 2.01.32.png

サンプル実行に必要なライブラリをインストール

在新创建的Spark池上,安装执行示例所需的Python库。所需的库列表如示例笔记本中所述。

pymongo==2.8.1
aenum==2.1.2
backports-abc==0.5
bson==0.5.10

请按照pip freeze的参考文档所描述的格式,使用要求文件来安装到Spark池中。可以在创建Spark池时或通过Azure门户或Synapse工作区的专用界面来执行安装操作,而不是通过pip进行安装。

首先,在本地准备好requirements.txt文件。

pymongo==2.8.1
aenum==2.1.2
backports-abc==0.5
bson==0.5.10

这次我们将通过Synapse工作区界面,使用这个requirements.txt文件来安装Python模块。
转到[管理] -> [Apache Spark池]界面,并选择要在示例中使用的Spark池。

スクリーンショット 2020-09-24 3.20.40.png

Spark プールを選択すると、プロパティ画面が表示されますので、[Packages] 欄にある Upload environment config file ボタンを選択し、先ほど作成した requirements.txt をアップロードしましょう。

スクリーンショット 2020-09-24 3.22.00.png

当成功上传后,您应该能够收到以下通知,并确认requirements.txt文件已经被应用。

スクリーンショット 2020-09-24 3.25.15.png
スクリーンショット 2020-09-24 3.25.44.png

您可以使用以下代码在 Synapse 工作区笔记本中执行以确认已安装的 Python 库。

import pip # pip 関数を使用するために必要です。
for i in pip.get_installed_distributions(local_only=True):
    print(i)

在执行时,请确保使用的 Spark 池和语言已设置为 PySpark。

スクリーンショット 2020-09-24 3.32.43.png

requirements.txt に記載したパッケージは、Spark プールの開始時に PyPI からダウンロードされる仕組みになっており、Spark プールから Spark インスタンスが作成される度に使用されるようになります。
もし、出力結果に requirements.txt に記載した Python ライブラリが表示されていない場合は、Spark プールを再起動してみてください。
なお、requirements.txt のアップロード後は、Spark プールの再起動が内部的に実行されるため、環境によっては反映までに時間を要する場合があります。

初始化MongoDB客户端并插入数据。

Python 用の MongoDB クライアントである pymongo を使用して、Cosmos DB のトランザクションストアにデータを挿入します。
MongoDB クライアントを使用するにあたって、CosmosDB のユーザー名とプライマリ/セカンダリ パスワードを先に取得しておきます。

スクリーンショット 2020-09-25 20.26.56.png

一旦获得后,将其设定为变量。

DATABASE_ACCOUNT_NAME = '<ここにユーザー名を入力>'
DATABASE_ACCOUNT_READWRITE_KEY = '<ここにプライマリ/セカンダリ パスワードを入力>'

一旦设置完毕,我们将初始化MongoDB客户端。

from pymongo import MongoClient
from bson import ObjectId # ObjectId が機能するため

client = MongoClient("mongodb://{account}.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb".format(account = DATABASE_ACCOUNT_NAME)) # 独自のデータベースアカウントのエンドポイント。
db = client.test    # データベースを選択
db.authenticate(name=DATABASE_ACCOUNT_NAME,password=DATABASE_ACCOUNT_READWRITE_KEY) # データベースアカウント名と任意の読み取り/書き込みキーを使用します。

MongoDB クライアントを初期化したら、データを挿入します。500 アイテムを生成します。

from random import randint
import time

orders = db["htap"]

items = ['Pizza','Sandwich','Soup', 'Salad', 'Tacos']
prices = [2.99, 3.49, 5.49, 12.99, 54.49]

for x in range(1, 501):
    order = {
        'item' : items[randint(0, (len(items)-1))],
        'price' : prices[randint(0, (len(prices)-1))],
        'rating' : randint(1, 5),
        'timestamp' : time.time()
    }

    result=orders.insert(order)

print('500 個の注文の作成が終了しました')

分析ストアからデータを確認

确认在事务存储中插入的数据可以从分析存储中进行确认。
由于数据在后台自动进行ETL,因此在插入数据后立即运行以下代码将导致错误。
请等待大约5分钟后再执行。

# 分析ストアのデータをデータフレームにロードする
# シークレットを使用してセルを実行し、DATABASE_ACCOUNT_NAME および DATABASE_ACCOUNT_READWRITE_KEY 変数を取得します。
df = spark.read.format("cosmos.olap")\
    .option("spark.cosmos.accountEndpoint", "https://{account}.documents.azure.com:443/".format(account = DATABASE_ACCOUNT_NAME))\
    .option("spark.cosmos.accountKey", DATABASE_ACCOUNT_READWRITE_KEY)\
    .option("spark.cosmos.database", "test")\
    .option("spark.cosmos.container", "htap")\
    .load()

# ピザの注文からのすべての収益を調べてみましょう
df.groupBy(df.item.string).sum().show()

# df[df.item.string == 'Pizza'].show(10) 
# df.select(df['item'] == Struct).show(10) 
# df.select("timestamp.float64").show(10)
# df.select("timestamp.string", when(df.timestamp.string != null)).show(10)

等到结果返回来之后,我们一起来确认一下架构信息吧。

df.schema
StructType(List(StructField(_rid,StringType,true),
StructField(_ts,LongType,true),StructField(id,StringType,true),
StructField(_etag,StringType,true),
StructField(_id,StructType(List(StructField(objectId,StringType,true))),true),
StructField(item,StructType(List(StructField(string,StringType,true))),true),
StructField(price,StructType(List(StructField(float64,DoubleType,true))),true),
StructField(rating,StructType(List(StructField(int32,IntegerType,true))),true),
StructField(timestamp,StructType(List(StructField(float64,DoubleType,true))),true),
StructField(_partitionKey,StructType(List(StructField(string,StringType,true))),true)))

以不同的形式插入时间戳数据。

さらにデータを投入していきます。
追加のデータ投入では、timestampデータの部分は、先ほどはtime.time()を指定していましたが、今回はstrftime(“%Y-%m-%d %H:%M:%S”)を指定してみます。

from random import randint
from time import strftime

orders = db["htap"]

items = ['Pizza','Sandwich','Soup', 'Salad', 'Tacos']
prices = [2.99, 3.49, 5.49, 12.99, 54.49]

for x in range(1, 501):
    order = {
        'item' : items[randint(0, (len(items)-1))],
        'price' : prices[randint(0, (len(prices)-1))],
        'rating' : randint(1, 5),
        'timestamp' : strftime("%Y-%m-%d %H:%M:%S")
    }

    result=orders.insert(order)

print('500 個の注文の作成が終了しました')

もし、上記のコードの実行に時間がかかっている、またはエラーが返された場合、MongoDB クライアントをもう一度初期化した後、再実行してみてください。

确认追加的数据

完成数据输入后,请再次检查分析存储中的数据。
如前所述,由于数据正在从事务存储中以后台方式进行ETL到分析存储,因此请稍等片刻后再执行。

# 分析ストアのデータをデータフレームにロードする
# シークレットを使用してセルを実行し、DATABASE_ACCOUNT_NAME および DATABASE_ACCOUNT_READWRITE_KEY 変数を取得します。
df = spark.read.format("cosmos.olap")\
    .option("spark.cosmos.accountEndpoint", "https://{account}.documents.azure.com:443/".format(account = DATABASE_ACCOUNT_NAME))\
    .option("spark.cosmos.accountKey", DATABASE_ACCOUNT_READWRITE_KEY)\
    .option("spark.cosmos.database", "test")\
    .option("spark.cosmos.container", "htap")\
    .load()

# ピザの注文からのすべての収益を調べてみましょう
df.filter( (df.timestamp.string != "")).show(10)

ここでのポイントは、timestamp.string パラメーターを指定していることです。これを指定することによって、time.time() で挿入されたデータではなく、strftime(“%Y-%m-%d %H:%M:%S”) で挿入された、ISO 文字列の日付が入っているデータのみを個別に読み取ることができます。
合わせて、先ほど同様、スキーマ情報も確認してみましょう。

df.schema
StructType(List(StructField(_rid,StringType,true),
StructField(_ts,LongType,true),
StructField(id,StringType,true),
StructField(_etag,StringType,true),
StructField(_id,StructType(List(StructField(objectId,StringType,true))),true),
StructField(item,StructType(List(StructField(string,StringType,true))),true),
StructField(price,StructType(List(StructField(float64,DoubleType,true))),true),
StructField(rating,StructType(List(StructField(int32,IntegerType,true))),true),
StructField(timestamp,StructType(List(StructField(float64,DoubleType,true),StructField(string,StringType,true))),true),
StructField(_partitionKey,StructType(List(StructField(string,StringType,true))),true)))

我认为你可以确认`timestamp`部分已经被更改为`StructField(timestamp,StructType(List(StructField(float64,DoubleType,true),StructField(string,StringType,true))),true)`。

我认为通过这个,可以确认Azure Cosmos DB分析存储的模式管理中的模式信息已经被更改。


我建议阅读《MongoDB的扩展书籍》,虽然这是一本旧书,但可以了解有关MongoDB分片的内容。请注意,这本书是基于MongoDB 1.x版本的。(现在的最新版本是4.4)
bannerAds