RabbitMQ 同期レスポンスの実現方法

RabbitMQのRPC機能を利用すると、同期的な結果の返却を実現できます。RPC(リモートプロシージャコール)は、あるプログラムがまるでローカル関数であるかのように、別のプログラム内の関数やメソッドを呼び出す、プロセス間通信の一種です。

RabbitMQで同期的に返答結果を取得するための一般的な手順を以下に示します。

  1. RPCクライアントおよびRPCサーバーを作成します。
  2. クライアントはリクエストメッセージをユニークなコールバックキューと一緒にサーバーに送信します。
  3. サーバーからリクエストを受けた後、リクエストを処理し、結果をクライアント指定のコールバックキューに送信します。
  4. クライアントは結果を受け取り、それを呼び出し元に返します。

具体化の手順は次のとおりです。

  1. サーバから返される結果を受け取るためのコールバックキューを定義します。
  2. クライアントがリクエストメッセージを送信する際は、コールバックキューの名前をメッセージ属性の値としてサーバーに送信する。
  3. サーバーがリクエストを処理するとき、結果はコールバックキューに送信されます。
  4. クライアントはリクエストの送信後、コールバックキューの監視を開始し、結果の受信を待ちます。
  5. クライアントは結果を受け取ると、呼び出し元に結果を返します。

以下には簡単なサンプルコードを示します。

RPC クライアント

import pika
import uuid

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.correlation_id == props.correlation_id:
            self.response = body

    def call(self, message):
        self.response = None
        self.correlation_id = str(uuid.uuid4())

        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.correlation_id,
            ),
            body=message)

        while self.response is None:
            self.connection.process_data_events()

        return self.response


rpc_client = RpcClient()
response = rpc_client.call('Hello, World!')
print(response)

RPC サーバー:

import pika

class RpcServer:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()

        self.channel.queue_declare(queue='rpc_queue')

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue='rpc_queue', on_message_callback=self.on_request)

    def on_request(self, ch, method, props, body):
        message = body.decode()
        response = self.process_request(message)

        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(correlation_id=props.correlation_id),
            body=str(response))

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def process_request(self, message):
        # 处理请求的逻辑
        response = 'Hello, ' + message
        return response


rpc_server = RpcServer()
rpc_server.channel.start_consuming()

上記サンプルコードでは、クライアントはリクエストメッセージを送信時にコールバックキューの名前をメッセージプロパティの値としてサーバーに送信します。サーバーはリクエストを処理すると、その結果をコールバックキューに送信します。クライアントはリクエスト送信後、コールバックキューの監視を開始し、結果の受信を待ちます。クライアントは結果を受信後、呼び出し元にその結果を返します。

注意点;上のサンプルコードはPythonのpikaライブラリを使用して実装しています、他のプログラミング言語をご利用の場合は、対応する言語のRabbitMQ RPCの実装方法を確認してください。

bannerAds