使用Python操作Azure Cosmos DB的数据

首先

由于我在Python中检查了关于Azure Cosmos DB的数据操作,我想把它们整理在备忘录中。(虽然开始总是一样的措辞…)

最近我涉足了无服务器架构(AWS AppSync+Lambda+DynamoDB),然后我就好奇如果用Azure的话会是怎样的情况,所以我进行了一番调查。
在Azure实现无服务器架构的话,我认为需要两个组件:Azure Functions(HttpTrigger/Graphene)和CosmosDB。

我們首先希望實現有關 Azure CosmosDB 的數據操作。

关于Azure CosmosDB,官方文档提供了关于概述和详细信息的丰富内容。此外,关于使用Azure Cosmos DB: Azure Cosmos DB SQL API创建Python应用程序的官方文档提供了设置、实施方法和教程,建议参考此文档以加深理解。

创建Azure Cosmos DB账户

为了使用Azure Cosmos DB,首先需要创建一个Azure Cosmos DB帐户。
Azure Cosmos DB帐户类似于一个DB集群,您可以在帐户下创建多个数据库和多个容器(表)。
还有一些连接信息,如指定虚拟网络,为连接提供的端点URI和密钥等。

API的类型被设定为核心(SQL)。还为MongoDB的API、Cassandra、Azure表和Gremlin(图形)等API准备了其它选项。
(我认为是关于在CosmosClient的QueryItems中使用哪种API类型来执行查询)

Azure Cosmos DB可以通过Azure Portal或者ARM模板部署,但这次我们将使用Azure Portal。

cosmosdb01.png
cosmosdb02.png

3. 您将看到标签设置选项,但这次我们将选择不指定。

cosmosdb03.png
cosmosdb05.png

3. 准备Python执行环境

因为我将来会尝试使用Azure Functions,所以我希望能够在Python3.6虚拟环境中运行进行CosmosDB数据操作的Python程序。

首先,创建并激活虚拟环境。

python3.6 -m venv .env
source .env/bin/activate

然后,我们将安装本次使用的Python模块(azure-cosmos)。

在命令行中运行以下命令安装:
pip3.6 install azure-cosmos

4. 在Python中操作CosmosDB的数据。

4.1. 执行样本

這次準備的程式碼在這裡。
#這份程式碼在這裡的說明會不時進行更新,因此GitHub上的版本是最新的。

在cosmosdb/config.py文件中准备了环境信息,并记录了CosmosDB的URI和主要密钥。
(配置内容可在AzurePortal-> CosmosDB-> 从键中通过URI和主要密钥确认)

git clone https://github.com/moyota/python-cosmosdb.git
cd python-cosmosdb
python3.6 -m venv .env
source .env/bin/activate
pip install -r requirements.txt
vi cosmosdb/config.py
python -m Program

由于还有官方文档和GitHub(Azure/azure-cosmos-python)可供参考,所以这两者也是有用的。

4.2. 关于对Cosmos DB的数据操作的实现

我会参考 Azure SDK for Python(azure.cosmos相关)的文档进行实现。

我們提供了一個在構造函數內進行數據庫定義和容器(表格定義)的initialize_xxx方法,以及執行項目(記錄)的CRUD操作的xxx_item方法。(數據庫連接在構造函數內實現)

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
from .config import config
from logging import getLogger
logger = getLogger(__name__)

class DatabaseConnection():
    def __init__(self):
        # Initialize the Cosmos client
        self.client = cosmos_client.CosmosClient(url_connection=config['ENDPOINT'],
            auth={'masterKey': config['PRIMARYKEY']})

        # Read a database
        self.database_link = 'dbs/' + config['DATABASE']

        # Read a container
        self.container_link = self.database_link + '/colls/{0}'.format(config['CONTAINER'])


    def get_options(self):
        options = {
            'enableCrossPartitionQuery': True,
            'maxItemCount': 5,
        }
        return options


    # Create a database
    def initialize_database(self, database_id=config['DATABASE']):
        try:        
            return self.client.CreateDatabase({'id': database_id})
            # database_link = self.database_link
            # database_link = db['_self']
        except errors.HTTPFailure as e:
            if e.status_code == 409:
               print('A database with id \'{0}\' already exists'.format(self.database_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    # Create a container
    def initialize_container(self, database_id=config['DATABASE'], container_id=config['CONTAINER']):
        try:
            database_link = 'dbs/' + database_id
            container_definition = {
                'id': container_id,
                "indexingPolicy": {
                    "indexingMode": "consistent", # consistent or lazy
                    "automatic": True,
                },
                "partitionKey": {
                    "paths": [
                      "/partitionKey"
                    ],
                    "kind": "Hash",
                }
            }
            return self.client.CreateContainer(database_link, container_definition, {'offerThroughput': 400})
        except errors.CosmosError as e:
            if e.status_code == 409:
                logger.error('A collection with id \'{0}\' already exists'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code) 
        except Exception as e:
            raise e


    # Create and add a item to the container
    def create_item(self, item):
        query = {'query': 'SELECT * FROM c WHERE c.id = "{0}"'.format(item["id"])}

        try:
            self.client.CreateItem(self.container_link, item)
            results = list(self.client.QueryItems(self.container_link, query, self.get_options()))
            for item in results:
                logger.info(item)
            return results
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            elif e.status_code == 409:
                logger.error('A Item with id \'{0}\' already exists'.format(item['id']))            
            else: 
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    # Delete a item from the container
    def delete_item(self, item):
        query = {'query': 'SELECT * FROM c WHERE c.id = "{0}"'.format(item["id"])}

        try:
            results = self.client.QueryItems(self.container_link, query, self.get_options())
            for item in list(results):
                logger.info(item)
                options = self.get_options()
                options['partitionKey'] = item['partitionKey']
                self.client.DeleteItem(item['_self'], options)
            return results
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    # Upsert a item from the container
    def upsert_item(self, item):
        try:
            result = self.client.UpsertItem(self.container_link, item, self.get_options())
            logger.info(result)
            return result
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    def read_item(self, id):
        query = {'query': 'SELECT * FROM c WHERE c.id = "{0}"'.format(id)}

        try:
            results = list(self.client.QueryItems(self.container_link, query, self.get_options()))
            return results
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
raise e


    def read_items(self):
        try:
            itemList = list(self.client.ReadItems(self.container_link, {'maxItemCount': 10}))

            logger.info('Found {0} documents'.format(itemList.__len__()))

            for item in itemList:
                logger.info(item)
            return itemList
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


# Sample Data1
def getReplacedItem(id):
    return {
        'id': 'id{0}'.format(id),
        'partitionKey': id,
        'message': 'Hello World CosmosDB!',
        'addition': 'test replace {0}'.format(id),
    }
# Sample Data2
def getItem(id):
    return {
        'id': 'id{0}'.format(id),
        'partitionKey': id,
        'message': 'Hello World CosmosDB!',
    }

在容器定义(CreateContainer)中,您可以在定义时指定分区键、索引策略和吞吐量(RU)。
(似乎也可以指定唯一键策略,但需要确认)

同时,CosmosClient的CreateItem/DeleteItem/UpsertItem/ReadItems方法需要指定容器链接,可以使用’dbs/数据库名/colls/容器名’或者ReadContainer来获取并指定。

UpsertItem方法是一个执行更新/插入操作的方法,如果项目具有相同的id,那么将更新该值,如果id不同,则会创建一个新项目。

如果数据库/容器/项目已经存在,则会发生异常(errors.HTTPFailure.status_code为409),所以请适当地添加异常处理。

请注意,对于cosmos_client的DeleteItem方法,必须在选项中指定分区键的值。如果没有进行此指定,将会引发以下异常(参考)。如果项目中没有指定分区键,则请使用upsert方法或其他方法来注册键值。

将上述准备好的代码放入Azure Functions中,并作为样例代码运行。getItem和getReplacedItem是为了生成虚拟数据而准备的函数。

from cosmosdb.cosmosdb import DatabaseConnection
from cosmosdb.cosmosdb import getItem, getReplacedItem

if __name__ == "__main__":
    print("---")
    dbConnection = DatabaseConnection()

    print(dbConnection.initialize_database())
    print(dbConnection.initialize_container())

    dbConnection.create_item(getItem("1"))
    dbConnection.create_item(getItem("2"))
    dbConnection.create_item(getItem("3"))
    dbConnection.upsert_item(getReplacedItem("3"))
    dbConnection.upsert_item(getReplacedItem("4"))
    dbConnection.delete_item(getItem("2"))

    print("---")
    itemList = dbConnection.read_items()
    for item in itemList:
        print(item)

执行一次并查看数据浏览器,结果如下:

python -m Program

cosmosdb04.png

总结

这次准备使用Python操作CosmosDB,但由于官方文档非常完善,我们实际上将在实施过程中参考文档进行操作。
关于容器定义和选项参数的详细信息可能没有在文档中列出,所以我们需要在摸索中进行调查。
此外,我希望将执行环境设置为Azure Functions,并开始实现GraphQL的API。

请提供相关资料

用 Azure Cosmos DB SQL API 帐户来构建 Python 应用程序:Azure Cosmos DB 说明文档 (官方)、Azure Cosmos DB Python 示例、Azure/azure-cosmos-python(GitHub)、Python 的 Azure SDK (azure.cosmos.cosmos_client)。

bannerAds