RabbitMQ的发布者和消费者(使用Golang)

首先

鉴于Docker上无法正常运行Kafka,出于无聊的理由,尝试使用RabbitMQ。这次我们将使用Go语言编写,客户端使用github.com/streadway/amqp。

※不会对RabbitMQ本身进行解释。

构成

Golang:版本1.9
RabbitMQ:版本3.7(Docker仓库)
streadway/amqp:主分支

任务完成的结果

模特儿

首先,我们先随意定义一个消息模型。

package protocol

// Protocol メッセージモデル
type Protocol struct {
    Message   string
    Timestamp int64
}

出版商

接下来是Publisher。在Kafka中被称为Producer。

package main

import (
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/lightstaff/go-rabbitmq-example/protocol"
    "github.com/streadway/amqp"
)

var (
    // RabbitMQのURLはパラメータで指定
    rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)

func main() {
    flag.Parse()

    if *rabbitmqURL == "" {
        log.Fatalln("[ERROR] require rabbitmqUrl")
    }

    log.Println("publisher start")

    // amqpだから・・・
    url := fmt.Sprintf("amqp://%s", *rabbitmqURL)

    // ダイアルして・・・
    conn, err := amqp.Dial(url)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer conn.Close()

    // チャンネル開いて・・・
    ch, err := conn.Channel()
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer ch.Close()

    // Exchangeを作って・・・
    if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // とりあえず3回・・・
    for i := 0; i < 3; i++ {
        // メッセージ作って・・・
        p := &protocol.Protocol{
            Message:   fmt.Sprintf("Hello. No%d", i),
            Timestamp: time.Now().UnixNano(),
        }

        // バイナリ化して・・・
        bytes, err := json.Marshal(p)
        if err != nil {
            log.Printf("[ERROR] %s", err.Error())
            continue
        }

        // Publish!!
        if err := ch.Publish("test", "", false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        bytes,
        }); err != nil {
            log.Printf("[ERROR] %s", err.Error())
            continue
        }

        log.Printf("[INFO] send message. msg: %v", p)
    }

    log.Println("publisher stop")
}

这部分内容与代码的注释一样。似乎还能做一些https://www.rabbitmq.com/confirms.html上的操作,但本次将省略。

消费者 zhě)

接下来是消费者。

package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"

    "github.com/lightstaff/go-rabbitmq-example/protocol"
    "github.com/streadway/amqp"
)

var (
    // RabbitMQのURLはパラメータで指定
    rabbitmqURL = flag.String("rabbitmqUrl", "localhost:5672", "Your RabbtMQ URL")
)

func main() {
    flag.Parse()

    if *rabbitmqURL == "" {
        log.Fatalln("[ERROR] require rabbitmqUrl")
    }

    log.Println("consumer start")

    // amqpだから・・・
    url := fmt.Sprintf("amqp://%s", *rabbitmqURL)

    // goroutineかけるので・・・
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 終了待機するので・・・
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // ダイアルして・・・
    conn, err := amqp.Dial(url)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer conn.Close()

    // チャンネル開いて・・・
    ch, err := conn.Channel()
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }
    defer ch.Close()

    // Exchangeを作って・・・
    if err := ch.ExchangeDeclare("test", "fanout", false, true, false, false, nil); err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // Queueを作って・・・
    q, err := ch.QueueDeclare("", false, true, true, false, nil)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // QueueにExchangeをBindして・・・
    if err := ch.QueueBind(q.Name, "", "test", false, nil); err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // Consume!!
    msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
    if err != nil {
        log.Printf("[ERROR] %s", err.Error())
        return
    }

    // メッセージ受付ルーチン
    go func() {
    CONSUMER_FOR:
        for {
            select {
            case <-ctx.Done():
                break CONSUMER_FOR
            case m, ok := <-msgs:
                if ok {
                    // モデル化して・・・
                    var p protocol.Protocol
                    if err := json.Unmarshal(m.Body, &p); err != nil {
                        log.Printf("[ERROR] %s", err.Error())
                        continue CONSUMER_FOR
                    }

                    log.Printf("[INFO] success consumed. tag: %d, body: %v", m.DeliveryTag, &p)
                }
            }
        }
    }()

    <-signals

    log.Println("consumer stop")
}

这也是依据代码注释的内容,但与Publisher相比,步骤增加了。Publisher和Consumer都使用ch.ExchangeDeclare(…)来创建Exchange,根据GoDoc中的说明,ExchangeDeclare在服务器上声明一个交换机。如果交换机不存在,服务器将创建一个新的。如果交换机已存在,服务器会验证它的类型、持久性和自动删除标志是否与提供的相符。因此,如果Publisher和Consumer之间存在差异,将会抛出错误。

进行

我們將在兩個不同的控制台中分別運行發布者(Publisher)和消費者(Consumer)。順便一提,我們會啟動兩個消費者。

$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:15 publisher start
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] send message. msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:16 publisher stop
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:03 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:31 consumer stop
$: go run main.go -rabbitmqUrl="xxx.xxx.xxx.xxx:5672"
2018/04/28 15:51:07 consumer start
2018/04/28 15:51:16 [INFO] success consumed. tag: 1, msg: &{Hello. No0 1524898276122079800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 2, msg: &{Hello. No1 1524898276122579800}
2018/04/28 15:51:16 [INFO] success consumed. tag: 3, msg: &{Hello. No2 1524898276123079900}
2018/04/28 15:51:34 consumer stop

以这样的方式进行输出。

结束

我认为RabbitMQ的教程拥有丰富的各种语言示例,这很好(因为它是官方编写的,所以更加可信)。我打算用Scala或C#来编写下一篇教程。

bannerAds