利用Celery和Redis实现Python任务的分布式处理

动机

我正在使用Django开发web应用程序,但是当在服务器上进行耗时的处理时,必须保持网页打开直到处理完成,这让我感到困扰。如果关闭页面,处理也会中止。
在这种情况下,可以使用异步处理。即使关闭页面,处理也会继续进行,并且之后可以打开页面来查看结果。

djangoで非同期処理を行う場合、ネットで検索して一番情報が多いのがCeleryとRedisを使う方法でした。しかし、色々な記事を読んでも、djangoへの組込み方は説明されているのですが、動作の仕組みや、タスクを別のホストで動かす方法(実際にはこれがしたかった)に言及されていなかったり、なんとなくしっくり来ませんでした。

そこで、とりあえずdjangoの話は置いておいて、Celery+RedisでPythonタスクをリモートホスト上で分散処理させる方法をまとめてみました。

環境

    • OS : macOS 12.1 Monterey

 

    • Python : 3.8.9 (macOS付属)

 

    Docker 20.10.12 (Redisサーバを動かす)

安装

Redis (简称)

まずは、Redisをインストールして動かします。
今回、RedisサーバはDocker上で動かして済ませてしまいました。Docker-Desktopでも、Docker-CLI + Docker-Engineでも構わないのですが、Dockerが既にインストールされている環境であれば、以下のコマンドでRedisのDockerイメージが取得され、コンテナが動作します。

docker run -d --rm -p 6379:6379 redis

如果不使用Docker,可以使用Homebrew来安装redis并启动redis-server也可以。

brew install redis
redis-server

Pythonモジュール

必须安装的Python模块是celery和Redis。我使用pip进行了安装。

pip3 install celery Redis [--user]

安装celery模块时,celery命令也会被安装。如果使用”–user”选项进行安装,则该命令将位于$HOME/Library/Python/3.8/bin/celery,建议将其加入环境变量。

首先在本地测试一下

準備要動的任務

你可以在一个名为 tasks.py 的文件中定义想要异步执行的任务。

from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.result_backend = 'redis://localhost:6379/0'

@app.task
def add(x, y):
    from time import sleep
    sleep(10)
    return x + y

在第二行中,创建了Celery类的一个实例,第一个参数是Celery对象(应用程序)的名称,第二个参数broker是代理的URL。

在这里,代理是一个机制,它将发布的任务排队并分配给执行任务的工人。我们指定了在Docker上运行的Redis。由于我们已经打开了6379端口,所以可以通过localhost:6379访问Docker中的Redis。

第三行的描述是为了将执行的任务结果存储在Redis中。如果没有这个描述,任务的执行结果将只会输出到工作人员的日志中。
在使用django时,可以通过设置result_backend将结果存储在数据库表中。

Redis的URL字段的端口号(6379)和数据库号(末尾的0)都是默认值,所以可以省略不写。

app.conf.result_backend = 'redis://localhost'

实际运行的任务是add函数。
add函数被修饰符@app.task修饰,这表示它是在Celery中执行的任务。

执行任务

当tasks.py完成后,将启动工作进程。
工作进程是在要执行任务的主机上持续运行的,我们将其一直保持运行。由于这次是在本地执行,所以我们在包含tasks.py的目录中执行以下命令。

celery -A tasks worker --loglevel=info --concurrency=5
...
[config]
.> app:         tasks:0x109a70730
.> transport:   redis://localhost:6379//
.> results:     redis://localhost:6379/
.> concurrency: 5 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery           exchange=celery(direct) key=celery
[tasks]
  . tasks.add
[2022-01-28 13:40:46,996: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-01-28 13:40:47,021: INFO/MainProcess] mingle: searching for neighbors
[2022-01-28 13:40:48,073: INFO/MainProcess] mingle: all alone
[2022-01-28 13:40:48,140: INFO/MainProcess] celery@myhost.local ready.

-A tasks指定参照tasks.py中Celery应用程序的定义。
–concurrency=5指定该工作程序最多同时执行5个任务。

在tasks.py所在的目录中启动Python,并输入以下内容。

python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)

importしたaddではなく、add.delay()を呼び出すことで、addがタスクとして発行されます。

ワーカー(celely)のログには、以下の1行目が出力され、10秒ほど経つと2行目が表示されます。これは、発行したタスクが「10秒スリープしてから足し算の答えを返す」ものだからで、2行目のログにも1234+5678の答えの6912が表示されています。

[2022-01-28 13:41:10,048: INFO/MainProcess] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] received
[2022-01-28 13:41:20,074: INFO/ForkPoolWorker-4] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] succeeded in 10.023096726000002s: 6912

现在,让我们在发出任务的Python终端中检查任务的结果。

>>> res.ready()
True
>>> res.get()
6912

(另一种形式的)任务定义

在之前的例子中,我们在tasks.py文件中定义了任务,并将它们作为app指定给了worker。现在我们可以将其结构改变为以下形式。

mkdir adder
mv tasks.py adder/celery.py

最终结果是,tasks.py被移动并重命名为adder/目录下的celery.py。
在这种状态下启动工作进程,并且启动方法会有以下更改。

celery -A adder worker --loglevel=info --concurrency=5

-A tasks已经改变为-A adder。当在celery中指定目录-A时,它似乎会引用该目录下的celery.py文件中的应用程序定义。

在这种情况下,任务的发布如下所示。

>>> from adder.celery import add
>>> res = add.delay(1234, 5678)

远程执行任务

我真正想做的是从这里开始。
在之前的例子中,发出的任务在同一主机上执行。我想要的是让它在另一个主机上执行。

在主机B上的准备

让我们将之前任务发布的localhost称为hostA,希望实际执行任务的主机称为hostB。在hostB上,我们还需要安装celery和Redis的Python模块。

pip3 install celery Redis [--user]

将在hostA上创建的tasks.py(或者adder/celery.py)文件复制到hostB,并做以下更改。

from celery import Celery
app = Celery('tasks', broker='redis://hostA:6379/0') # hostAに変更
app.conf.result_backend = 'redis://hostA:6379/0' # hostAに変更

@app.task
def add(x, y):
    from time import sleep
    sleep(10)
    return x + y

変更したのは、RedisサーバーのURLです。RedisはhostA上で(正確には、hostA上のDockerコンテナ上で)動いているので、URLのホスト名をhostAに変更します。

如果能够进行更改,将在hostB上运行worker。与在hostA(本地主机)上运行的方式完全相同。

celery -A tasks worker --loglevel=info --concurrency=5

在hostA上发布任务,并在hostB上执行任务。

在主机A上,我们发出完全与之前的例子相同的任务。

python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)

这一次,任务将在hostB上执行。
(如果没有停止hostA上的工作进程,它也可能在hostA上执行 :-))

让我们在hostA和hostB上都运行工作器,并尝试从hostA的Python中发出以下任务。

from tasks import add
results = []
for i in range(10):
    res = add(i, i)
    results.append(res)

我已经分发了10个任务。
hostA和hostB的任务分别指定了–concurrency=5,所以每个主机将执行5个任务(可能)。

最后

使用Celery+Redis,可以轻松编写Python的分布式处理。
虽然最初目的是在Django中进行异步处理,但我觉得还有其他很多用途。

bannerAds