RabbitMQのディレイキューの使い方は?
RabbitMQの遅延キューは、プラグインであるrabbitmq_delayed_message_exchangeを使用して実装されています。以下に使用手順を示します。
- プラグインをインストールするには、まずrabbitmq_delayed_message_exchangeプラグインをインストールする必要があります。次のコマンドを使用してインストールできます。
- rabbitmq_delayed_message_exchangeを有効にする
- 延期スイッチを作成する:次のコマンドを使用して、延期スイッチを作成します。
- rabbitmqadminによって、名前がdelayed_exchangeでタイプがx-delayed-messageのexchangeを宣言し、引数は{“x-delayed-type”:”direct”}となります。
- 延長キューを作成する:以下のコマンドを使用して延長キューを作成し、遅延エクスチェンジにバインドします。
- rabbitmqadminコマンドを使用して、delayed_queueという名前のキューを宣言します。rabbitmqadminを使用して、delayed_exchangeからdelayed_queueへのbindingを宣言します。routing_keyはdelayed_routing_keyです。
- 遅延メッセージの送信: 以下のコードスニペットを使用して、遅延キューに遅延メッセージを送信します。
- import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))
channel = connection.channel()channel.basic_publish(exchange=’delayed_exchange’,
routing_key=’delayed_routing_key’,
body=’Delayed message’,
properties=pika.BasicProperties(
headers={‘x-delay’: 5000} # メッセージの遅延時間を設定、単位はミリ秒
))
connection.close() - 上記のコードでは、ヘッダー内のx-delayフィールドを設定することで遅延時間を指定し、単位はミリ秒です。
- 消費遅延メッセージ:以下のコードスニペットを使用して、遅延キュー内のメッセージを消費します。
- pikaをインポートします。def callback(ch, method, properties, body):
print(“受信したメッセージ:”, body)connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))
channel = connection.channel()channel.basic_consume(queue=’delayed_queue’,
on_message_callback=callback,
auto_ack=True)channel.start_consuming()
- 上記のコードでは、basic_consumeメソッドのqueueパラメーターを遅延キューの名前に指定し、auto_ackをTrueに設定して、メッセージを自動的に確認する。
注意:上記のコード片段中の ‘localhost’ と ‘delayed_routing_key’ は、実際の状況に合わせて変更してください。