用Python创建分布式任务队列(RQ 编辑)

RQ 是一个使用 Redis 作为消息代理和数据存储的 Python 库,用于实现分布式任务队列。

RQ: Python的简单工作队列

Redis提供了LPUSH和RPOP这两个操作来实现类似任务队列的功能,而RQ则是在其基础上简单封装了API。

如果只需要简单的任务队列,RQ更容易上手,尽管Celery和RabbitMQ功能更强大。

试着动一下

由于手动启动多个进程也很麻烦,所以我们可以尝试使用Docker Compose来快速测试。

在实际运行中,应该准备好适当数量的Redis服务器和工作服务器。

FROM python:3.6

RUN pip install rq

只是在Python 3上添加了RQ的Docker镜像。

version: '3'

services:

  redis:
    image: redis

  worker:
    build: .
    depends_on:
      - redis
    environment:
      RQ_REDIS_URL: redis://redis
    command: python worker.py
    volumes:
      - .:/app
    working_dir: /app

一个定义了Redis和Worker的Docker Compose文件。

import os
import logging
import redis
import rq

logging.basicConfig(level=logging.DEBUG)

with rq.Connection(redis.from_url(os.environ.get('RQ_REDIS_URL'))):
    worker = rq.Worker(['default'])
    worker.work()

使用RQ客户端可以启动工作程序而无需编写工作程序的脚本(rq worker)。然而,由于许多情况下需要在工作程序脚本中进行自定义的异常处理等设置,因此现实中还是写脚本比较好。

启动Redis和Worker。

$ docker-compose up --scale worker=4

工人尝试启动了4个进程。

执行任务

import logging

logger = logging.getLogger(__name__)

def add(a, b):
    logger.debug('{} + {} = {}'.format(a, b, a + b))
    return a + b

我定义了一个只执行加法的任务add。

import os
from time import sleep
import redis
from rq import Queue
from tasks import add

q = Queue(connection=redis.from_url(os.environ.get('RQ_REDIS_URL')))

# 10個のタスクの実行をキューに投げる
tasks = [q.enqueue(add, args=(i, 1)) for i in range(10)]

# タスク実行が完了するまで少し待つ
sleep(1)

# 結果を出力する
print([task.result for task in tasks])

编写了将任务放入队列的处理过程。

$ docker-compose run --rm worker python app.py
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

在执行后,可以确认结果已正确返回。

我认为,通过查看工作人员的日志,可以看出可以使用4个工作人员来分发执行10个任务。

其他

在实际运营中的一些让人关注的事情。

例外处理

在RQ的默认设置中,当发生异常时,将执行名为move_to_failed_queue的处理程序,并将发生异常的任务移动到失败队列。这只是简单地将失败的任务移动并保留数据,而不提供重新尝试或通知等功能。因此,如果希望执行自定义的异常处理,例如在发生异常时通知内部聊天工具,可以在Worker中指定exception_handlers。

import os
import logging
import redis
import rq
from rq.handlers import move_to_failed_queue  # デフォルトのエラーハンドラ

logging.basicConfig(level=logging.DEBUG)

def notify_error(job, exc_type, exc_value, traceback):
    """エラー発生をどこかに通知する処理"""

with rq.Connection(redis.from_url(os.environ.get('RQ_REDIS_URL'))):
    worker = rq.Worker(
        ['default'],
        exception_handlers=[move_to_failed_queue, notify_error],
    )
    worker.work()

重新尝试

RQ にはリトライの仕組みはないっぽい?ので、やりたければエラーハンドラを追加して自力で実装する必要がありそう。

如果需要一个可靠的重试机制,那就应该考虑使用Celery,而不是RQ。

暂停

可以在入队时使用timeout参数指定任务执行的超时秒数,可以使用字符表达式如”3m”或”1h”进行指定。

task = q.enqueue(add, args=(1, 2), timeout=60)

如果超过指定的秒数,将会抛出rq.timeouts.JobTimeoutException异常。

task.is_failed  #=> True

TTL 可以被中国人用以下方式简洁地表达:生命周期

可以使用result_ttl参数来指定执行结果保持的秒数。

task = q.enqueue(add, args=(1, 2), result_ttl=60)

当将0指定时,意味着“不保留”(执行结果将立即被丢弃)。
当将-1指定时,意味着“无有效期”。

默认情况下为500秒。

定期执行

有时候可能会有想要像Cron一样定期执行任务的情况,但是RQ没有类似的机制,所以需要自己实现。

如果是芹菜,那么就有一个名为Celery Beat的功能。

广告
将在 10 秒后关闭
bannerAds