RabbitMQ的发布者和消费者(使用Golang)
首先
鉴于Docker上无法正常运行Kafka,出于无聊的理由,尝试使用RabbitMQ。这次我们将使用Go语言编写,客户端使用github.com/streadway/amqp。
※不会对RabbitMQ本身进行解释。
构成
Golang:版本1.9
RabbitMQ:版本3.7(Docker仓库)
streadway/amqp:主分支
任务完成的结果
模特儿
首先,我们先随意定义一个消息模型。
package protocol
// Protocol メッセージモデル
type Protocol struct {
Message string
Timestamp int64
}
出版商
接下来是Publisher。在Kafka中被称为Producer。
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"time"
"github.com/lightstaff/go-rabbitmq-example/protocol"
"github.com/streadway/amqp"
)
var (
// RabbitMQのURLはパラメータで指定
rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)
func main() {
flag.Parse()
if *rabbitmqURL == "" {
log.Fatalln("[ERROR] require rabbitmqUrl")
}
log.Println("publisher start")
// amqpだから・・・
url := fmt.Sprintf("amqp://%s", *rabbitmqURL)
// ダイアルして・・・
conn, err := amqp.Dial(url)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer conn.Close()
// チャンネル開いて・・・
ch, err := conn.Channel()
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer ch.Close()
// Exchangeを作って・・・
if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// とりあえず3回・・・
for i := 0; i < 3; i++ {
// メッセージ作って・・・
p := &protocol.Protocol{
Message: fmt.Sprintf("Hello. No%d", i),
Timestamp: time.Now().UnixNano(),
}
// バイナリ化して・・・
bytes, err := json.Marshal(p)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
continue
}
// Publish!!
if err := ch.Publish("test", "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: bytes,
}); err != nil {
log.Printf("[ERROR] %s", err.Error())
continue
}
log.Printf("[INFO] send message. msg: %v", p)
}
log.Println("publisher stop")
}
这部分内容与代码的注释一样。似乎还能做一些https://www.rabbitmq.com/confirms.html上的操作,但本次将省略。
消费者 zhě)
接下来是消费者。
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"github.com/lightstaff/go-rabbitmq-example/protocol"
"github.com/streadway/amqp"
)
var (
// RabbitMQのURLはパラメータで指定
rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)
func main() {
flag.Parse()
if *rabbitmqURL == "" {
log.Fatalln("[ERROR] require rabbitmqUrl")
}
log.Println("consumer start")
// amqpだから・・・
url := fmt.Sprintf("amqp://%s", *rabbitmqURL)
// goroutineかけるので・・・
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 終了待機するので・・・
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// ダイアルして・・・
conn, err := amqp.Dial(url)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer conn.Close()
// チャンネル開いて・・・
ch, err := conn.Channel()
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
defer ch.Close()
// Exchangeを作って・・・
if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// Queueを作って・・・
q, err := ch.QueueDeclare("", false, true, true, false, nil)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// QueueにExchangeをBindして・・・
if err := ch.QueueBind(q.Name, "", "test", false, nil); err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// Consume!!
msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
if err != nil {
log.Printf("[ERROR] %s", err.Error())
return
}
// メッセージ受付ルーチン
go func() {
CONSUMER_FOR:
for {
select {
case <-ctx.Done():
break CONSUMER_FOR
case m, ok := <-msgs:
if ok {
// モデル化して・・・
var p protocol.Protocol
if err := json.Unmarshal(m.Body, &p); err != nil {
log.Printf("[ERROR] %s", err.Error())
continue CONSUMER_FOR
}
log.Printf("[INFO] success consumed. tag: %d, body: %v", m.DeliveryTag, &p)
}
}
}
}()
<-signals
log.Println("consumer stop")
}
这也是依据代码注释的内容,但与Publisher相比,步骤增加了。Publisher和Consumer都使用ch.ExchangeDeclare(…)来创建Exchange,根据GoDoc中的说明,ExchangeDeclare在服务器上声明一个交换机。如果交换机不存在,服务器将创建一个新的。如果交换机已存在,服务器会验证它的类型、持久性和自动删除标志是否与提供的相符。因此,如果Publisher和Consumer之间存在差异,将会抛出错误。
进行
我們將在兩個不同的控制台中分別運行發布者(Publisher)和消費者(Consumer)。順便一提,我們會啟動兩個消費者。
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:15 publisher start
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:16 publisher stop
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:03 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:31 consumer stop
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:07 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:34 consumer stop
以这样的方式进行输出。
结束
我认为RabbitMQ的教程拥有丰富的各种语言示例,这很好(因为它是官方编写的,所以更加可信)。我打算用Scala或C#来编写下一篇教程。