在Kafka中,使用Golang进行Producer和Consumer操作

最初

由于可能要使用Kafka,因此我们尝试为几种不同的编程语言编写生产者和消费者代码。这次我们选择使用Golang编写。连接到Kafka时,我们将使用sarama[https://github.com/Shopify/sarama]库。

※ 我不会对卡夫卡本身进行解释。

构成

Golang版本为1.9,sarama版本为1.16.0,VSCode版本为1.22。

产物

制片人

参考sarama的测试,我尝试从生产者开始写。sarama似乎有”SyncProducer”和”AsyncProducer”两种,但由于这次要在不同的线程中运行,所以我选择了”AsynProducer”。

// Producer部分のみ抜粋

var (
    // kafkaのアドレス
    bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)

// SendMessage 送信メッセージ
type SendMessage struct {
    Message   string `json:"message"`
    Timestamp int64  `json:"timestamp"`
}

func main() {
    flag.Parse()

    if *bootstrapServers == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    brokers := strings.Split(*bootstrapServers, ",")
    config := sarama.NewConfig()

    config.Producer.Return.Errors = true
    config.Producer.Return.Successes = true
    config.Producer.Retry.Max = 3

    producer, err := sarama.NewAsyncProducer(brokers, config)
    if err != nil {
        panic(err)
    }

    defer producer.AsyncClose()

    // プロデューサールーチン
    go func() {
    PRODUCER_FOR:
        for {
            time.Sleep(10000 * time.Millisecond)

            timestamp := time.Now().UnixNano()

            send := &SendMessage{
                Message:   "Hello",
                Timestamp: timestamp,
            }

            jsBytes, err := json.Marshal(send)
            if err != nil {
                panic(err)
            }

            msg := &sarama.ProducerMessage{
                Topic: "test.A",
                Key:   sarama.StringEncoder(strconv.FormatInt(timestamp, 10)),
                Value: sarama.StringEncoder(string(jsBytes)),
            }

            producer.Input() <- msg

            select {
            case <-producer.Successes():
                fmt.Println(fmt.Sprintf("success send. message: %s, timestamp: %d", send.Message, send.Timestamp))
            case err := <-producer.Errors():
                fmt.Println(fmt.Sprintf("fail send. reason: %v", err.Msg))
            case <-ctx.Done():
                break PRODUCER_FOR
            }
        }
    }()

    fmt.Println("go-kafka-example start.")

    <-signals

    fmt.Println("go-kafka-example stop.")
}

每10秒发送一次将”SendMessage”转换为JSON格式的消息。
producer.Input() <- 我觉得msg有点不舒服…。

消费者 zhě)

接下来,我将写一些消费者的内容。

// Consumer部分のみ抜粋

var (
    // kafkaのアドレス
    bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)

// ConsumedMessage 受信メッセージ
type ConsumedMessage struct {
    Message   string `json:"message"`
    Timestamp int64  `json:"timestamp"`
}

func main() {
    flag.Parse()

    if *bootstrapServers == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    brokers := strings.Split(*bootstrapServers, ",")
    config := sarama.NewConfig()

    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer(brokers, config)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := consumer.Close(); err != nil {
            panic(err)
        }
    }()

    partition, err := consumer.ConsumePartition("test.A", 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }

    // コンシューマールーチン
    go func() {
    CONSUMER_FOR:
        for {
            select {
            case msg := <-partition.Messages():
                var consumed ConsumedMessage
                if err := json.Unmarshal(msg.Value, &consumed); err != nil {
                    fmt.Println(err)
                }
                fmt.Println(fmt.Sprintf("consumed message. message: %s, timestamp: %d", consumed.Message, consumed.Timestamp))
            case <-ctx.Done():
                break CONSUMER_FOR
            }
        }
    }()

    fmt.Println("go-kafka-example start.")

    <-signals

    fmt.Println("go-kafka-example stop.")
}

消費者似乎是从”Consumer”结构体中生成分区的。它等待接收消息,然后将其从JSON转换为”ConsumedMessage”。

执行

构建并执行混淆了生产者和消费者的东西(main.go),应该在控制台上输出如下内容(运行30秒后停止)。

go-kafka-example start.
success send. message: Hello, timestamp: 1523932794754645700
consumed message. message: Hello, timestamp: 1523932794754645700
success send. message: Hello, timestamp: 1523932804761916500
consumed message. message: Hello, timestamp: 1523932804761916500
success send. message: Hello, timestamp: 1523932814765186700
consumed message. message: Hello, timestamp: 1523932814765186700
go-kafka-example stop.

结束

生产者和消费者都在goroutine的封闭世界中编写,但如果想要将其导出到外部,需要巧妙地使用通道。此外,如果想要处理偏移量等问题,可能需要更复杂的操作(参考)。

添加附注

还有一个名为confluent-kafka-go的confluent库作为另一个客户端库可供选择,但是由于需要安装C库librdkafka,因此本次没有尝试。

我已经试过了,内容和sarama版本没什么大差别,所以我不会发表帖子,但是我会放置源代码的链接。
https://github.com/lightstaff/confluent-kafka-go-example

bannerAds