在Kafka中,使用Golang进行Producer和Consumer操作
最初
由于可能要使用Kafka,因此我们尝试为几种不同的编程语言编写生产者和消费者代码。这次我们选择使用Golang编写。连接到Kafka时,我们将使用sarama[https://github.com/Shopify/sarama]库。
※ 我不会对卡夫卡本身进行解释。
构成
Golang版本为1.9,sarama版本为1.16.0,VSCode版本为1.22。
产物
制片人
参考sarama的测试,我尝试从生产者开始写。sarama似乎有”SyncProducer”和”AsyncProducer”两种,但由于这次要在不同的线程中运行,所以我选择了”AsynProducer”。
// Producer部分のみ抜粋
var (
// kafkaのアドレス
bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)
// SendMessage 送信メッセージ
type SendMessage struct {
Message string `json:"message"`
Timestamp int64 `json:"timestamp"`
}
func main() {
flag.Parse()
if *bootstrapServers == "" {
flag.PrintDefaults()
os.Exit(1)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
brokers := strings.Split(*bootstrapServers, ",")
config := sarama.NewConfig()
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 3
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
panic(err)
}
defer producer.AsyncClose()
// プロデューサールーチン
go func() {
PRODUCER_FOR:
for {
time.Sleep(10000 * time.Millisecond)
timestamp := time.Now().UnixNano()
send := &SendMessage{
Message: "Hello",
Timestamp: timestamp,
}
jsBytes, err := json.Marshal(send)
if err != nil {
panic(err)
}
msg := &sarama.ProducerMessage{
Topic: "test.A",
Key: sarama.StringEncoder(strconv.FormatInt(timestamp, 10)),
Value: sarama.StringEncoder(string(jsBytes)),
}
producer.Input() <- msg
select {
case <-producer.Successes():
fmt.Println(fmt.Sprintf("success send. message: %s, timestamp: %d", send.Message, send.Timestamp))
case err := <-producer.Errors():
fmt.Println(fmt.Sprintf("fail send. reason: %v", err.Msg))
case <-ctx.Done():
break PRODUCER_FOR
}
}
}()
fmt.Println("go-kafka-example start.")
<-signals
fmt.Println("go-kafka-example stop.")
}
每10秒发送一次将”SendMessage”转换为JSON格式的消息。
producer.Input() <- 我觉得msg有点不舒服…。
消费者 zhě)
接下来,我将写一些消费者的内容。
// Consumer部分のみ抜粋
var (
// kafkaのアドレス
bootstrapServers = flag.String("bootstrapServers", "localhost:9092", "kafka address")
)
// ConsumedMessage 受信メッセージ
type ConsumedMessage struct {
Message string `json:"message"`
Timestamp int64 `json:"timestamp"`
}
func main() {
flag.Parse()
if *bootstrapServers == "" {
flag.PrintDefaults()
os.Exit(1)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
brokers := strings.Split(*bootstrapServers, ",")
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
}
}()
partition, err := consumer.ConsumePartition("test.A", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
// コンシューマールーチン
go func() {
CONSUMER_FOR:
for {
select {
case msg := <-partition.Messages():
var consumed ConsumedMessage
if err := json.Unmarshal(msg.Value, &consumed); err != nil {
fmt.Println(err)
}
fmt.Println(fmt.Sprintf("consumed message. message: %s, timestamp: %d", consumed.Message, consumed.Timestamp))
case <-ctx.Done():
break CONSUMER_FOR
}
}
}()
fmt.Println("go-kafka-example start.")
<-signals
fmt.Println("go-kafka-example stop.")
}
消費者似乎是从”Consumer”结构体中生成分区的。它等待接收消息,然后将其从JSON转换为”ConsumedMessage”。
执行
构建并执行混淆了生产者和消费者的东西(main.go),应该在控制台上输出如下内容(运行30秒后停止)。
go-kafka-example start.
success send. message: Hello, timestamp: 1523932794754645700
consumed message. message: Hello, timestamp: 1523932794754645700
success send. message: Hello, timestamp: 1523932804761916500
consumed message. message: Hello, timestamp: 1523932804761916500
success send. message: Hello, timestamp: 1523932814765186700
consumed message. message: Hello, timestamp: 1523932814765186700
go-kafka-example stop.
结束
生产者和消费者都在goroutine的封闭世界中编写,但如果想要将其导出到外部,需要巧妙地使用通道。此外,如果想要处理偏移量等问题,可能需要更复杂的操作(参考)。
添加附注
还有一个名为confluent-kafka-go的confluent库作为另一个客户端库可供选择,但是由于需要安装C库librdkafka,因此本次没有尝试。
我已经试过了,内容和sarama版本没什么大差别,所以我不会发表帖子,但是我会放置源代码的链接。
https://github.com/lightstaff/confluent-kafka-go-example