事件驱动架构
今天开始,我们将转换话题,讨论使用 Kafka 进行异步处理。
我们迄今使用gRPC进行同步通信,并以常见模式将数据保存到数据库中。
另一方面,我们公司采用基于流处理平台(例如Kafka)的事件驱动架构。
我们将数据作为日志推送到队列作为Source of Truth,确保即使数据库发生故障,我们也可以通过重新读取日志来还原数据。
此外,在微服务之间的通信中,我们非常频繁地使用基于事件队列的异步方法。
大多数团队使用以Apache Kafka为基础的软件,但并没有特别规定,也有一些团队使用Benthos或其他工具。
LinkedIn, the developer of Kafka, provides a very informative article on logs and event-driven architecture, which is highly recommended for those interested. Although it is a bit lengthy, please give it a read as it offers valuable insights that can change your perspective on logs.
活动形式
我打算在事件队列中使用 protocol buffer 来保留数据,这样可以以什么样的格式保留数据呢?
即使使用协议缓冲,如果团队之间的格式各不相同,那么在希望在无论领域如何都保留共同的数据,如“事件发送时间”等时,它的实用性并不是很好。在这种情况下,拥有一个公共格式,可以将各团队自由定义的实际事件包装起来,并提供元数据等,会很方便。
由于我公司使用的格式并不是开源的,所以不能直接进行介绍,但是下面给出的是相似的感觉。
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";
message Event {
string id = 1;
google.protobuf.Timestamp timestamp = 2;
google.protobuf.Any payload = 3;
}
我們將實際事件作為有效資訊傳遞。
這次我們打算使用這份。
编译Envelope
在Makefile的protos任务中添加以下描述并进行编译。
diff --git a/Makefile b/Makefile
index 0510df4..f5b531d 100644
--- a/Makefile
+++ b/Makefile
@@ -6,15 +6,22 @@ LINKFLAGS := -X main.gitHash=$(GIT_HASH)
PROTO_DIR := ./proto
GENERATED_DIR := ./internal/pb
GENERATED_SERVICE_DIR := $(GENERATED_DIR)/service
+GENERATED_ENVELOPE_DIR := $(GENERATED_DIR)/envelope
+ENVELOPE_PROTO_MAPPINGS := Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types
.PHONY: protos
protos:
- mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR)
+ mkdir -pv $(GENERATED_DIR) $(GENERATED_SERVICE_DIR) $(GENERATED_ENVELOPE_DIR)
protoc \
-I $(PROTO_DIR) \
-I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
--gogoslick_out=plugins=grpc:$(GENERATED_SERVICE_DIR) \
service.proto
+ protoc \
+ -I $(PROTO_DIR) \
+ -I $(GOPATH)/src:$(GOPATH)/src/github.com/gogo/protobuf/protobuf \
+ --gogoslick_out=paths=source_relative,$(ENVELOPE_PROTO_MAPPINGS):$(GENERATED_ENVELOPE_DIR) \
+ envelope.proto
mockgen-install:
只要无事 envelope.pb.go 文件被创建在 internal/pb/envelope 下面,就没有问题。
今天是导入的事情,到这里为止。
明天我想部署Kafka。