在本地使用Ruby尝试操作Cloud Pub/Sub
在 Cloud Pub/Sub 上玩耍。有一个模拟器可以在本地进行游玩。
我对中间件不是很了解,但我认为它类似于Apache Kafka或AWS的Kinesis Stream。
实际上,根据使用方式和成本考量,我觉得SQS更相似,但我并没有使用过Kafka或Kinesis,所以不太清楚。
有一个用于在本地环境进行确认的 Cloud Pub/Sub 的模拟器。
要启动服务器,可以使用 gcloud beta emulators pubsub start 进行执行。
如果客户端设定了环境变量PUBSUB_EMULATOR_HOST,那么连接将会被重定向到模拟器。通过执行命令eval $(gcloud beta emulators pubsub env-init),可以简便地进行设定。
当pubsub向topic发布消息后,能够将消息传递给已订阅该topic的subscriber。
subscriber接收消息有两种方式可供选择,即拉模式和推模式。
在拉模式下,subscriber需要定期向服务器请求是否有新消息可获取。
此外,还需要将处理完成的消息通知服务器。只需发送ack即可。如果不发送ack,一定时间后将重新获取事件。
当使用推送(push)模式时,系统会将消息以POST方式发送到预先注册的URL(终端点)。
类似于触发Webhook的机制。
根据发送POST请求后返回的状态码,来确定是否代替了ack确认。
我来试一试
提前启动模拟器。
请先安装 gcloud。
假设你正在使用repl进行操作。
在启动repl之前,
使用”eval $(gcloud beta emulators pubsub env-init)”命令来设置环境变量。
我会使用gcloud运行irb或者pry。
提前准备好PubSub::Service类的实例,以便方便进行作业操作。
gcloud = Gcloud.new "project-id"
pubsub = gcloud.pubsub
如果是模拟器的话,即使”project-id”是一个不存在的项目ID也没有关系。
创建一个主题
我试着创建一个名为我的主题(my-topic)的话题。
topic = pubsub.create_topic "my-topic"
以pull方式接收消息
首先,创建一个订阅(subscription)。订阅之后即可创建。
将其命名为pull-subscriber-1。
subscription1 = topic.subscribe "pull-subscriber-1"
既然机会难得,就试着做两个。
subscription2 = topic.subscribe "pull-subscriber-2"
消息已经处于可以接收的状态。
试着拉取消息。
subscription1.pull.each do |message|
p message.message.data
message.acknowledge!
end # => []
我试着发送一条消息。
topic.publish "hello"
试着再取得一次。
subscription1.pull.each do |message|
p message.message.data
message.acknowledge!
end # => [ReciveMessage的なの]
应该输出为”你好”。
也让我试试在subscription2订阅上收取。
subscription2.pull.each do |message|
p message.message.data
end # => [ReciveMessage的なの]
当输出为”hello”时,由于没有发送ack,稍等一会儿再执行会重新输出。subscription1不再收到任何消息。
通过Push方式接收消息
需要准备一个可以接收的应用程序。
启动另一个shell,并执行以下Ruby代码。
require "sinatra"
require "json"
require "base64"
post "/push" do
message = JSON.parse request.body.read
data = Base64.decode64 message["message"]["data"]
logger.info data
response.status = 204
end
$ ruby app.rb
Sinatra的默认端口为4567,因此要求进行POST的URL为http://localhost:4567/push。
当准备好后,返回REPL并创建订阅。
subscription1 = topic.subscribe "push-subscriber-1"
subscription1.endpoint = "http://localhost:4567/push"
当指定了endpoint时,它将变为推送。
(当将endpoint作为subscribe的参数传入:”http://localhost:4567/push”,但无法正常工作。看起来像是个bug。)
之后只需发布消息。
topic.publish "hoge"
topic.publish "goro"
在网络应用程序中显示”hoge”和”goro”。
如果准备好第二个push类型的订阅,消息将发送两次。
subscription2 = topic.subscribe "push-subscriber-2"
subscription2.endpoint = "http://localhost:4567/push"
topic.publish "hoge"
topic.publish "goro"
这个程序会输出两次『hoge』和两次『goro』。
概括一下
因为在本地测试不需要进行TLS设置,所以可以更容易地确认推送式的操作。当实际使用时,可以通过GCP的Web控制台创建主题(topic)和订阅(subscription)。