让我们尝试使用RabbitMQ + PostgreSQL + Celery进行异步任务处理

整体形象

    1. 客户端应用程序将任务注册到RabbitMQ。

 

    1. Celery的工作节点从RabbitMQ获取任务并执行。

 

    1. 执行结果保存到PostgreSQL。

 

    1. 执行结果返回到RabbitMQ。

 

    1. 客户端应用程序从PostgreSQL获取执行结果。

 

    Flower监控工作节点和任务的执行状态。

应用技术

兔子消息队列

    • メッセージキューを実現するためのOSSで、メッセージブローカと呼ばれる

 

    詳しくはDocker Composeで立てたRabbitMQにpikaで接続してみるを参照

西芹

    • Pythonで非同期タスク処理を行うライブラリ

 

    • RabbitMQ以外にもRedis等、様々なメッセージブローカに対応している

 

    GitHub: https://github.com/celery/celery

    • Celery専用のモニタリングツール

 

    GitHub: https://github.com/mher/flower

PostgreSQL是一种数据库管理系统。

    • いわずもがなRDBMS

 

    • 今回は実行結果の永続化に使用する

 

    実行結果を永続化しない場合、クライアントアプリケーションから実行結果を取得できなくなる

配置环境

Docker Compose的设置

    RabbitMQとPostgreSQLのDockerコンテナを起動する
services:

  rabbitmq:
    image: rabbitmq:3.11.7-management
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      - RABBITMQ_DEFAULT_USER=root
      - RABBITMQ_DEFAULT_PASS=password
    volumes:
      - ./docker/rabbitmq/data:/var/lib/rabbitmq

  postgresql:
    image: postgres:latest
    container_name: postgresql
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=celery
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - PGDATA=/var/lib/postgresql/data/pgdata
    volumes:
      - ./docker/postgresql/data:/var/lib/postgresql/data
# Dockerコンテナの起動
$ docker compose up -d

# Dockerコンテナが起動していることを確認
$ docker compose ps
NAME                COMMAND                  SERVICE             STATUS              PORTS
postgresql          "docker-entrypoint.s…"   postgresql          running             0.0.0.0:5432->5432/tcp
rabbitmq            "docker-entrypoint.s…"   rabbitmq            running             0.0.0.0:5672->5672/tcp, 0.0.0.0:15672->15672/tcp

# RabbitMQに接続できることを確認
$ open http://localhost:15672

# PostgreSQLに接続できることを確認
$ psql -h localhost -p 5432 -U postgres -d celery
Password for user postgres: 
psql (14.5 (Homebrew), server 15.2 (Debian 15.2-1.pgdg110+1))
WARNING: psql major version 14, server major version 15.
         Some psql features might not work.
Type "help" for help.

celery=# 

安装图书馆

$ pipenv install celery
$ pipenv install flower

# CeleryからPostgreSQLを操作するために必要
$ pipenv install SQLAlchemy
$ pipenv install psycopg2

启动Flower

$ celery --broker=amqp://root:password@localhost:5672// flower
$ open localhost:5555
FireShot Capture 157 - Flower - localhost.png

任务的执行

from celery import Celery

# brokerにはRabbitMQ、backendにはPostgreSQLの接続情報を記述する
celery = Celery(
    "tasks",
    broker="amqp://root:password@localhost:5672//",
    backend="db+postgresql://postgres:postgres@localhost:5432/celery",
)

# 名前を受け取って挨拶を返すタスク
# Celery.taskデコレータを付与することで、Celeryのタスクとして登録される
@celery.task
def greet(name: str):
    return "hello, {name}!".format(name=name)

工人的启动

$ celery -A tasks worker --loglevel=INFO

# ワーカーの並列数を指定する場合
$ celery -A tasks worker --concurrency 4 --loglevel=INFO
    • ワーカーの並列数を指定した場合、Flower上の「Max concurrency」が指定した値になっている

ちなみにデフォルトの並列数は8

任务执行

from tasks import greet

# タスクの実行
res = greet.delay("takashi")

# タスクの実行結果の取得
msg = res.get(timeout=1)
print(msg)
$ python client.py
hello, takashi!

确认Flower和PostgreSQL

    Flower上の「Processed」と「Succeeded」が増加している
    PostgreSQL上に実行結果を保存するテーブルが作成されている
$ psql -h localhost -p 5432 -U postgres -d celery
Password for user postgres: 
psql (14.5 (Homebrew), server 15.2 (Debian 15.2-1.pgdg110+1))
WARNING: psql major version 14, server major version 15.
         Some psql features might not work.
Type "help" for help.

celery=# \dt
               List of relations
 Schema |        Name        | Type  |  Owner   
--------+--------------------+-------+----------
 public | celery_taskmeta    | table | postgres
 public | celery_tasksetmeta | table | postgres
(2 rows)

bannerAds