Redis遅延キューの実装方法
Redisの遅延キューは、遅延タスクを格納するためにソート済みセットデータ構造を利用できます。具体的には、以下の手順で実装します。
- 遅延の実行時刻をスコア、タスクの内容をメンバーとして、ソートセットに追加する
- タイマータスクを利用してSorted Set内のタスクをポーリングで調べ、実行すべきタスクを探す。
- 遂行する。
- タスクが完了したら、ソートセットからタスクを削除します。
簡単な実装例は次のとおりです。
- キューに遅延ジョブを追加する:
import time
import redis
def add_delayed_task(queue_name, task, delay):
r = redis.Redis()
score = time.time() + delay
r.zadd(queue_name, {task: score})
- タイマードポーリングで実行する遅延ジョブ:
import redis
def check_delayed_tasks(queue_name):
r = redis.Redis()
current_time = time.time()
tasks = r.zrangebyscore(queue_name, 0, current_time)
for task in tasks:
# 执行任务
execute_task(task)
# 从队列中删除任务
r.zrem(queue_name, task)
def execute_task(task):
# 执行任务的逻辑
pass
定期に check_delayed_tasks 関数を呼び出すには、例えば apscheduler を用いたタスクのスケジューリングにより、cron ジョブを利用する。
このサンプルはあくまでシンプルな実装例であり、実際の利用時には、タスク並行実行の考慮やタスクの再試行のメカニズムなどが必要となる場合があります。また、スケジューラーの選択も実際のニーズに応じて検討してください。