利用Celery和Redis实现Python任务的分布式处理
动机
我正在使用Django开发web应用程序,但是当在服务器上进行耗时的处理时,必须保持网页打开直到处理完成,这让我感到困扰。如果关闭页面,处理也会中止。
在这种情况下,可以使用异步处理。即使关闭页面,处理也会继续进行,并且之后可以打开页面来查看结果。
djangoで非同期処理を行う場合、ネットで検索して一番情報が多いのがCeleryとRedisを使う方法でした。しかし、色々な記事を読んでも、djangoへの組込み方は説明されているのですが、動作の仕組みや、タスクを別のホストで動かす方法(実際にはこれがしたかった)に言及されていなかったり、なんとなくしっくり来ませんでした。
そこで、とりあえずdjangoの話は置いておいて、Celery+RedisでPythonタスクをリモートホスト上で分散処理させる方法をまとめてみました。
環境
-
- OS : macOS 12.1 Monterey
-
- Python : 3.8.9 (macOS付属)
- Docker 20.10.12 (Redisサーバを動かす)
安装
Redis (简称)
まずは、Redisをインストールして動かします。
今回、RedisサーバはDocker上で動かして済ませてしまいました。Docker-Desktopでも、Docker-CLI + Docker-Engineでも構わないのですが、Dockerが既にインストールされている環境であれば、以下のコマンドでRedisのDockerイメージが取得され、コンテナが動作します。
docker run -d --rm -p 6379:6379 redis
如果不使用Docker,可以使用Homebrew来安装redis并启动redis-server也可以。
brew install redis
redis-server
Pythonモジュール
必须安装的Python模块是celery和Redis。我使用pip进行了安装。
pip3 install celery Redis [--user]
安装celery模块时,celery命令也会被安装。如果使用”–user”选项进行安装,则该命令将位于$HOME/Library/Python/3.8/bin/celery,建议将其加入环境变量。
首先在本地测试一下
準備要動的任務
你可以在一个名为 tasks.py 的文件中定义想要异步执行的任务。
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.result_backend = 'redis://localhost:6379/0'
@app.task
def add(x, y):
from time import sleep
sleep(10)
return x + y
在第二行中,创建了Celery类的一个实例,第一个参数是Celery对象(应用程序)的名称,第二个参数broker是代理的URL。
在这里,代理是一个机制,它将发布的任务排队并分配给执行任务的工人。我们指定了在Docker上运行的Redis。由于我们已经打开了6379端口,所以可以通过localhost:6379访问Docker中的Redis。
第三行的描述是为了将执行的任务结果存储在Redis中。如果没有这个描述,任务的执行结果将只会输出到工作人员的日志中。
在使用django时,可以通过设置result_backend将结果存储在数据库表中。
Redis的URL字段的端口号(6379)和数据库号(末尾的0)都是默认值,所以可以省略不写。
app.conf.result_backend = 'redis://localhost'
实际运行的任务是add函数。
add函数被修饰符@app.task修饰,这表示它是在Celery中执行的任务。
执行任务
当tasks.py完成后,将启动工作进程。
工作进程是在要执行任务的主机上持续运行的,我们将其一直保持运行。由于这次是在本地执行,所以我们在包含tasks.py的目录中执行以下命令。
celery -A tasks worker --loglevel=info --concurrency=5
...
[config]
.> app: tasks:0x109a70730
.> transport: redis://localhost:6379//
.> results: redis://localhost:6379/
.> concurrency: 5 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2022-01-28 13:40:46,996: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-01-28 13:40:47,021: INFO/MainProcess] mingle: searching for neighbors
[2022-01-28 13:40:48,073: INFO/MainProcess] mingle: all alone
[2022-01-28 13:40:48,140: INFO/MainProcess] celery@myhost.local ready.
-A tasks指定参照tasks.py中Celery应用程序的定义。
–concurrency=5指定该工作程序最多同时执行5个任务。
在tasks.py所在的目录中启动Python,并输入以下内容。
python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)
importしたaddではなく、add.delay()を呼び出すことで、addがタスクとして発行されます。
ワーカー(celely)のログには、以下の1行目が出力され、10秒ほど経つと2行目が表示されます。これは、発行したタスクが「10秒スリープしてから足し算の答えを返す」ものだからで、2行目のログにも1234+5678の答えの6912が表示されています。
[2022-01-28 13:41:10,048: INFO/MainProcess] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] received
[2022-01-28 13:41:20,074: INFO/ForkPoolWorker-4] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] succeeded in 10.023096726000002s: 6912
现在,让我们在发出任务的Python终端中检查任务的结果。
>>> res.ready()
True
>>> res.get()
6912
(另一种形式的)任务定义
在之前的例子中,我们在tasks.py文件中定义了任务,并将它们作为app指定给了worker。现在我们可以将其结构改变为以下形式。
mkdir adder
mv tasks.py adder/celery.py
最终结果是,tasks.py被移动并重命名为adder/目录下的celery.py。
在这种状态下启动工作进程,并且启动方法会有以下更改。
celery -A adder worker --loglevel=info --concurrency=5
-A tasks已经改变为-A adder。当在celery中指定目录-A时,它似乎会引用该目录下的celery.py文件中的应用程序定义。
在这种情况下,任务的发布如下所示。
>>> from adder.celery import add
>>> res = add.delay(1234, 5678)
远程执行任务
我真正想做的是从这里开始。
在之前的例子中,发出的任务在同一主机上执行。我想要的是让它在另一个主机上执行。
在主机B上的准备
让我们将之前任务发布的localhost称为hostA,希望实际执行任务的主机称为hostB。在hostB上,我们还需要安装celery和Redis的Python模块。
pip3 install celery Redis [--user]
将在hostA上创建的tasks.py(或者adder/celery.py)文件复制到hostB,并做以下更改。
from celery import Celery
app = Celery('tasks', broker='redis://hostA:6379/0') # hostAに変更
app.conf.result_backend = 'redis://hostA:6379/0' # hostAに変更
@app.task
def add(x, y):
from time import sleep
sleep(10)
return x + y
変更したのは、RedisサーバーのURLです。RedisはhostA上で(正確には、hostA上のDockerコンテナ上で)動いているので、URLのホスト名をhostAに変更します。
如果能够进行更改,将在hostB上运行worker。与在hostA(本地主机)上运行的方式完全相同。
celery -A tasks worker --loglevel=info --concurrency=5
在hostA上发布任务,并在hostB上执行任务。
在主机A上,我们发出完全与之前的例子相同的任务。
python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)
这一次,任务将在hostB上执行。
(如果没有停止hostA上的工作进程,它也可能在hostA上执行 :-))
让我们在hostA和hostB上都运行工作器,并尝试从hostA的Python中发出以下任务。
from tasks import add
results = []
for i in range(10):
res = add(i, i)
results.append(res)
我已经分发了10个任务。
hostA和hostB的任务分别指定了–concurrency=5,所以每个主机将执行5个任务(可能)。
最后
使用Celery+Redis,可以轻松编写Python的分布式处理。
虽然最初目的是在Django中进行异步处理,但我觉得还有其他很多用途。