让我们尝试使用pika库连接到通过Docker Compose创建的RabbitMQ
首先
-
- 仕事でRabbitMQを触る必要性が出てきたのでDocker Composeで立てて遊んでみる
- 尚、本格的に構築する場合にはクラスタリング等も検討したほうが良い: https://www.rabbitmq.com/clustering.html
概述
RabbitMQ是什么?
-
- 主にメッセージキューを実現するためのOSSで、メッセージブローカと呼ばれる
-
- Erlangで書かれている
-
- 紆余曲折あり、現在はVMware社がサポートを行っている
-
- 他の選択肢としては、OSSだとApache KafkaやActiveMQ、パブリッククラウドだとAmazon SQSやAmazon MQ(内部的にはRabbitMQやActiveMQが使われている)等がある
- GitHubリポジトリ: https://github.com/rabbitmq
消息队列是什么
- 送信側と受信側の間にキューを挟み、送信側が任意のタイミングでキューにデータを溜め込み、受信側が任意のタイミングでキューに溜まったデータを取り出すような通信形態のこと

-
- 主に以下のようなメリットを享受することが期待できる
送信側は受信側の処理を待つことなく後続の処理ができる
送信側と受信側の間にメッセージブローカを挟むことによって疎結合なアーキテクチャを実現できる
スケーラブルなアーキテクチャを実現できる
使用的协议
-
- プロトコルはデフォルトではAMQP(Advanced Message Queuing Protocol)が使用される
-
- AMQPは元々、金融機関向けのプロトコルとして提供されており、JPモルガンやバンク・オブ・アメリカでも利用されていた
-
- 現在はOASISが標準化している他、ISO/IEC 19464として国際標準にもなっている
-
- MQTT(Message Queuing Telemetry Transport)等、他のプロトコルを使用することもできる
- これらのプロトコルを実装したシステムは金融機関のシステムの他、WebやIoTの文脈でも利用されている
基本概念
-
- プロデューサー
メッセージを送信するプログラム
コンシューマー
メッセージを受信するプログラム
キュー
メッセージをバッファリングしておく仕組み
エクスチェンジ
この記事では触れないが、重要が概念であるため説明
どのメッセージをどのように配送するかを決定する仕組み
チャンネル
1つのTCPコネクションを分割して共有するためのパフォーマンス上の仕組み
ドキュメント: https://www.rabbitmq.com/channels.html
Docker Compose的配置
- Docker Compose以外のインストール方法: https://www.rabbitmq.com/download.html
services:
rabbitmq:
# -management を付与したイメージの場合、15672ポートでManagement Plugin(Web UIのようなもの)が利用できるようになる
# Management Pluginのドキュメント: https://www.rabbitmq.com/management.html
# イメージは現時点で最新のものを指定している
image: rabbitmq:3.11.7-management
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
# ユーザ名/パスワードの設定
# 特に設定しない場合、guest/guestが暗黙的に設定される
environment:
- RABBITMQ_DEFAULT_USER=root
- RABBITMQ_DEFAULT_PASS=password
# データの永続化
volumes:
- ./docker/rabbitmq/data:/var/lib/rabbitmq
-
- Webブラウザ上で http://localhost:15672 を開くとRabbitMQのWeb UIにアクセスできる
- username と password には docker-compose.yml に設定した RABBITMQ_DEFAULT_USER と RABBITMQ_DEFAULT_PASS を入力する

-
- ログイン後の画面
-
- RabbitMQの一通りの操作が可能
e.g. コネクションやキューの確認、メッセージの監視、etc …

用Python进行连接
-
- Pythonから接続する場合、pikaを利用する
-
- GitHubリポジトリ: https://github.com/pika/pika
以下はpikaを使ってメッセージの送信、受信をするサンプル
だいたいここに書いてあることと同じ: https://www.rabbitmq.com/tutorials/tutorial-one-python.html
安装
$ pip install pika
源代码
- 送信側
import pika
# コネクションの確立
# usernameとpasswordにguest/guestが設定されている場合は認証情報を指定しなくて良い
credentials = pika.PlainCredentials("root", "password")
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials))
# チャンネルの作成・キューの指定
queue_name = "greet"
channel = connection.channel()
channel.queue_declare(queue=queue_name)
# メッセージの送信
message = "hello, world!"
channel.basic_publish(exchange="", routing_key=queue_name, body=message)
print("Sent:", message)
# コネクションの切断
connection.close()
- 受信側
import pika
# コネクションの確立
# usernameとpasswordにguest/guestが設定されている場合は認証情報を指定しなくて良い
credentials = pika.PlainCredentials("root", "password")
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials))
# チャンネルの作成・キューの指定
queue_name = "greet"
channel = connection.channel()
channel.queue_declare(queue=queue_name)
# メッセージの受信
# ちなみに auto_ack=True にすると、メッセージ送信直後に正常に配信されたとみなされる
# スループットは向上するが、実際に配信されるよりも前にコネクションやチャンネルが切断されると、送信されたメッセージは失われるため注意
# ドキュメント: https://www.rabbitmq.com/confirms.html
def callback(ch, method, props, body):
print("Received:", body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print("Waiting for messages ...")
channel.start_consuming()
执行结果
$ python consumer.py
Waiting for messages ...
Received: b'hello, world!'
$ python producer.py
Sent: hello, world!
- きちんと受信できていることが分かる
网页用户界面
-
- せっかくなので確認しておく
-
- Overview
メッセージが配送・取得されていることが分かる

-
- Queues
ソースコードから指定したキューが作成されている

-
- Channels
greet キューに紐付いたチャンネルが作成されている
