RabbitMQのディレイキューの使い方は?

RabbitMQの遅延キューは、プラグインであるrabbitmq_delayed_message_exchangeを使用して実装されています。以下に使用手順を示します。

  1. プラグインをインストールするには、まずrabbitmq_delayed_message_exchangeプラグインをインストールする必要があります。次のコマンドを使用してインストールできます。
  2. rabbitmq_delayed_message_exchangeを有効にする
  3. 延期スイッチを作成する:次のコマンドを使用して、延期スイッチを作成します。
  4. rabbitmqadminによって、名前がdelayed_exchangeでタイプがx-delayed-messageのexchangeを宣言し、引数は{“x-delayed-type”:”direct”}となります。
  5. 延長キューを作成する:以下のコマンドを使用して延長キューを作成し、遅延エクスチェンジにバインドします。
  6. rabbitmqadminコマンドを使用して、delayed_queueという名前のキューを宣言します。rabbitmqadminを使用して、delayed_exchangeからdelayed_queueへのbindingを宣言します。routing_keyはdelayed_routing_keyです。
  7. 遅延メッセージの送信: 以下のコードスニペットを使用して、遅延キューに遅延メッセージを送信します。
  8. import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))
    channel = connection.channel()

    channel.basic_publish(exchange=’delayed_exchange’,
    routing_key=’delayed_routing_key’,
    body=’Delayed message’,
    properties=pika.BasicProperties(
    headers={‘x-delay’: 5000} # メッセージの遅延時間を設定、単位はミリ秒
    ))
    connection.close()

  9. 上記のコードでは、ヘッダー内のx-delayフィールドを設定することで遅延時間を指定し、単位はミリ秒です。
  10. 消費遅延メッセージ:以下のコードスニペットを使用して、遅延キュー内のメッセージを消費します。
  11. pikaをインポートします。def callback(ch, method, properties, body):
    print(“受信したメッセージ:”, body)

    connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))
    channel = connection.channel()

    channel.basic_consume(queue=’delayed_queue’,
    on_message_callback=callback,
    auto_ack=True)

    channel.start_consuming()

  12. 上記のコードでは、basic_consumeメソッドのqueueパラメーターを遅延キューの名前に指定し、auto_ackをTrueに設定して、メッセージを自動的に確認する。

注意:上記のコード片段中の ‘localhost’ と ‘delayed_routing_key’ は、実際の状況に合わせて変更してください。

bannerAds