在kafka的Ruby客户端phobos中,快速启动消费者(*也可作为生产者*)

我想要更轻松地处理Kafka,所以我在寻找一些好用的Rubygems。但是常常被推荐的RubyGems大多数都是与Rails的集成前提相关的。

只是想要订阅主题并轻松地进行分发,这个目的可能有点繁重。

在这其中,我找到了Phobos。只需几行代码就能创建一个Consumer,感觉非常开心。

    phobos/phobos: Simplifying Kafka for ruby apps

这样的话,我甚至可以像写脚本一样立即做出反应。所以我来介绍一下。

准备用于测试的Kafka-broker。

从这里开始引用,并在本地运行kafka-broker。

    https://docs.confluent.io/current/installation/docker/docs/config-reference.html

稍微调整一下端口和环境变量。

和动物园管理员一起,

$ docker run -d --rm \
  -p 2181:2181 \
  --name=zookeeper \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  -e ZOOKEEPER_TICK_TIME=2000 \
  -e ZOOKEEPER_SYNC_LIMIT=2 \
  confluentinc/cp-zookeeper:5.1.0

我是经纪人。

$ docker run -d --rm \
  -p 9092:9092 \
  --name=kafka \
  --link=zookeeper \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false \
  confluentinc/cp-kafka:5.1.0

如果使用docker-compose的话,会是这样的感觉。

---
version: "3.1"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.0
    ports:
      - 2181:2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_SYNC_LIMIT=2
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_BROKER_ID=2
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false

现在,我们已经准备好了kafka-broker,让我们来谈谈phobos吧。

Phobos的安装设置

安装Rubygems的phobos。

$ bundle init
$ echo "gem 'phobos'" >> Gemfile 
$ bundle install --binstubs --path vendor/bundle

我会确认是否会出现帮助提示。

$ ./bin/phobos 
Commands:
  phobos help [COMMAND]  # Describe available commands or one specific command
  phobos init            # Initialize your project with Phobos
  phobos start           # Starts Phobos
  phobos version         # Outputs the version number. Can be used with: phobos -v or phobos --version

让我们使用phobos init来准备初始文件。

$ ./bin/phobos init
      create  config/phobos.yml
      create  phobos_boot.rb

在这个时候,你已经非常友善了。

请简单确认一下phobos的运行情况。

将整体配置写入config/phobos.yml。

    • 接続情報などグローバルな設定

 

    • Producer/Consumerのデフォルト設定

 

    Consumerの購読先(複数可)とリアクション

我稍微看一下由init精心制作的config/phobos.yml文件。

listeners:
  - handler: Phobos::EchoHandler
    topic: test

]测试’这个主题的订阅,定义了一个内置处理程序Phobos::EchoHandler来处理。

如果kafka-broker已在本地启动,就不需要进行任何更改,请尝试使用phobos start来启动。

$ ./bin/phobos start
______ _           _
| ___ \ |         | |
| |_/ / |__   ___ | |__   ___  ___
|  __/| '_ \ / _ \| '_ \ / _ \/ __|
| |   | | | | (_) | |_) | (_) \__ \
\_|   |_| |_|\___/|_.__/ \___/|___/

phobos_boot.rb - find this file at /Users/sawanoboriyu/develop/src/sandbox/qiita_ruby-phobos/phobos_boot.rb

[2019-02-03T18:11:59:345+0900Z] INFO  -- Phobos : <Hash> {:message=>"Phobos configured", :env=>"N/A"}
[2019-02-03T18:11:59:374+0900Z] INFO  -- Phobos : <Hash> {:message=>"Listener started", :listener_id=>"442f72", :group_id=>"test-1", :topic=>"test", :handler=>"Phobos::EchoHandler"}

你想尝试一些东西吗?这是从最后一行可以看出来,作为一个关于”test”主题的消费者在运作。

让我们使用kafkacat进行数据流入。我们可以使用Homebrew之类的工具来安装它。

    edenhill/kafkacat: Generic command line non-JVM Apache Kafka producer and consumer

实际上,kafkacat已经可以作为一个休闲的消费者进行操作,不过暂时先不管它。

$ echo aa | kafkacat -P -b localhost -t test

Phobos::EchoHandler是一个简单的处理程序,它只是将接收到的消息记录在日志中。确实收到了消息:message=>”aa”。

$ ./bin/phobos start
______ _           _
| ___ \ |         | |
| |_/ / |__   ___ | |__   ___  ___
|  __/| '_ \ / _ \| '_ \ / _ \/ __|

# -- snip --

[2019-02-03T18:12:14:481+0900Z] INFO  -- Phobos : <Hash> {:message=>"aa", :listener_id=>"442f72", :group_id=>"test-1", :topic=>"test", :handler=>"Phobos::EchoHandler", :key=>nil, :partition=>0, :offset=>0, :retry_count=>0}

创建处理程序来处理消息。

我们来看一下由 phobos init 创建的另一个文件,phobos_boot.rb。

只能放置不管。

# Use this file to load your code
puts <<~ART
  ______ _           _
  | ___ \\ |         | |
  | |_/ / |__   ___ | |__   ___  ___
  |  __/| '_ \\ / _ \\| '_ \\ / _ \\/ __|
  | |   | | | | (_) | |_) | (_) \\__ \\
  \\_|   |_| |_|\\___/|_.__/ \\___/|___/
ART
puts "
phobos_boot.rb - find this file at #{File.expand_path(__FILE__)}

"

这是phobos默认引用的入口点。如果想要做些什么,可以继续在这里添加。

暂时将以下代码作为替代Phobos::Handler使用的处理器写入,请确保包含#consume方法即可。我已经这样写了一个处理器,名为MyHandler,它只是将内容打印到标准输出中。

class MyHandler
  include Phobos::Handler

  def consume(payload, metadata)
    puts 'your message is ' + payload
  end
end

将在config/phobos.yml文件中更改配置以使用此处理程序。

listeners:
  - handler: MyHandler
    # handler: Phobos::EchoHandler
    topic: test

我会启动Phobos,并且试着在背后注入消息。

$ ./bin/phobos start
______ _           _
| ___ \ |         | |
| |_/ / |__   ___ | |__   ___  ___
|  __/| '_ \ / _ \| '_ \ / _ \/ __|
| |   | | | | (_) | |_) | (_) \__ \
\_|   |_| |_|\___/|_.__/ \___/|___/

phobos_boot.rb - find this file at /Users/sawanoboriyu/develop/src/sandbox/qiita_ruby-phobos/phobos_boot.rb

[2019-02-03T18:45:52:276+0900Z] INFO  -- Phobos : <Hash> {:message=>"Phobos configured", :env=>"N/A"}
[2019-02-03T18:45:52:287+0900Z] INFO  -- Phobos : <Hash> {:message=>"Listener started", :listener_id=>"b59783", :group_id=>"test-1", :topic=>"test", :handler=>"MyHandler"}

# 裏で `echo aa | kafkacat -P -b localhost -t test`

your message is aa

太好了。

附带一提,如果包括Phobos::Producer,您可以将消息流传到任何主题。这也很简单,请试试吧。

这个例外怎么样呢?

当Consumer处理时出现异常时,将使用简单的Backoff算法进行随机等待后进行重试。
但是,由于ConsumerGroup的机制,它好像记得处理了多少内容,导致对此消息的重试会一直重复。
因此,我们需要快速地将日志记录下来并结束,或者将其通知或将有效负载发送到用于重试的主题等,以便能够处理下一个任务。

结束

当我发现kafka如此易于操作时,我感觉它变得更加贴近我了。直觉上说,它的轻松性与redis相当。对于将其嵌入到现有的某个东西中并以库的方式使用,我认为使用方式类似于amqp + bunny。

Phobos本身能够通过一个参数在线程中进行并行处理,也可以单独处理多个监听器,非常方便。此外,它拥有很好的group_id和kafka机制,因此可以轻松进行扩展。