用Redis实现JobQueue
最近不得已写了一个用MySQL来管理作业的代码,但我查了一下发现像sidekq和Celery这样的著名工具通常都使用Redis作为后端。
实现方式
工作将存储在Redis的LIST数据类型中。
ジョブを追加する際はRPUSHでリストの後ろから追加し、ジョブを取得する際はBLPOPを使って先頭から取得する。
BLPOPはリストからの先頭の要素を取得しつつ削除するコマンドだが、リストが空の場合はリストに要素が追加されるかタイムアウトまで待つ(ブロッキング)。このコマンドのおかげでワーカーは何度もジョブの有無を確認(ポーリング)しなくて済むし、ジョブが追加されたら即座に取得して実行できる。
使用命令行
コマンド動作BLPOP
(Blocking Left Pop?)リストの先頭から要素を取得しつつ削除する。
リストが空の場合は追加されるかタイムアウトまで待つ。RPUSH
(Right Push?)リストの後ろに要素を追加する。CLIENT ID現在のConnectionを識別するclient_idを取得する。CLIENT UNBLOCKclient_idのブロッキングを解除する。
(Blocking Left Pop?)リストの先頭から要素を取得しつつ削除する。
リストが空の場合は追加されるかタイムアウトまで待つ。RPUSH
(Right Push?)リストの後ろに要素を追加する。CLIENT ID現在のConnectionを識別するclient_idを取得する。CLIENT UNBLOCKclient_idのブロッキングを解除する。
代码 (Mandarin Chinese)
我使用Python Redis库进行了实现。
我使用两个线程运行了工作程序,并将从标准输入获取的字符串作为任务添加到队列中。
工作人员在结束时不等待BLPOP超时,导致无法正常结束并感到困惑,但是通过使用CLIENT UNBLOCK命令解除了阻塞状态,立即实现了结束。
import redis
from threading import Thread
class WorkerThread(Thread):
def __init__(self, key):
super().__init__()
# single_connection_clientにTrueをセットすると、RedisClientが使うConnectionが固定される
# Connectionを固定しないとclient_idの違うConnectionでBLPOPが実行される可能性があり、CLIENT UNBLOCKが実行できなくなる
self.client = redis.Redis(
host='redis', port=6379, db=0, single_connection_client=True
)
self.client_id = self.client.client_id()
self.key = key
self.running = True
def run(self):
while self.running:
message = self.client.blpop(self.key, timeout=30)
if (message):
print("{}: {}".format(self.client_id, message))
def stop(self):
self.running = False
# self.clientはConnectionが固定されているので、ブロッキング中に追加でコマンドを実行できない
# CLIENT UNBLOCK実行のため新しいクライアントを作成する
client = redis.Redis(host='redis', port=6379, db=0)
client.client_unblock(self.client_id)
def main(n_workers=2):
client = redis.Redis(host='redis', port=6379, db=0)
threads = [WorkerThread("queue") for _ in range(n_workers)]
try:
for t in threads: t.start()
# 空文字入力で終了
s = input()
while s:
client.rpush("queue", s)
s = input()
finally:
print("waiting for workers stopped")
for t in threads:
t.stop()
for t in threads: t.join()
if __name__ == '__main__':
main()
执行结果 (shí jié guǒ)
当输入文字时,可以确认两个工人会依次打印出来。
/app # python3 redis_queue.py
A
125: (b'queue', b'A')
B
126: (b'queue', b'B')
C
125: (b'queue', b'C')
D
126: (b'queue', b'D')
waiting for workers stopped