使用Kafka Connect将MQTT数据注册
由于关于Kafka Connect的文章较少,所以这次我花了很多时间来研究。Kafka Connect是最近附带在Kafka中的一个工具,使数据能够无需编写代码就能流入Kafka。在将数据流入Kafka时,使用了名为Arvo的模式和序列化器。由于我错误地以为它是无模式的,所以在进行修复时遇到了相当大的困难。
安装Apache Kafka
在安装 Kafka 时,Docker 是非常方便的,如果与 Zookeeper 等一起部署,能够轻松实现扩展。顺便提一下,据我调查,Confluent 和 LANDOOP 在 Kafka 行业中都非常知名,它们都开发了与 Kafka 相关的工具,并提供 PaaS 等服务。
本次我们将使用LANDOOP公司在Docker Hub上提供的fast-data-dev。
虽然也可以单独安装Kafka,但由于我们要使用Connect和Schema Registry,所以使用这个包非常方便。
顺便一提,这个包已经预装了称为Landoop Stream Reactor 25+ Connectors的连接器。虽然也可以使用第三方连接器,但我打算直接使用这个。
在Docke-Hub的网站上,有docker命令的示例,但在这里我们将使用docker-compose。请根据环境使用environment、volumes。可以使用docker-compose up -d命令来启动。
version: '2'
services:
landoop:
image: "landoop/fast-data-dev"
network_mode: "host"
container_name: "landoop"
mem_limit: 3G
environment:
ADV_HOST: "EXPORTするIPアドレス"
USER: "Basic認証のユーザ名"
PASSWORD: "Basic認証のパスワード"
volumes:
- /path/to/3rd/Party/Connector/Jars:/connectors
ports:
- "2181:2181"
- "3030:3030"
- "8081-8083:8081-8083"
- "9581-9585:9581-9585"
- "9092:9092"
MQTT的准备工作
使用Kafka Connect时,如果订阅和注册的数据模式不同,可能会导致连接失败,因此应该事先进行适当的格式调整。
在这里,我们将负载设置为JSON格式。这样一来,只需要使用Stream Reactor Connectors就可以将数据注册到Kafka中。您可以事先准备模式文件,并在连接器注册时指定,如果没有指定则会自动生成。Avro模式文件会被注册到模式注册表中,由于是JSON格式,所以您一看就明白了吧。

顺便说一句,MQTT拥有许多不同的主题名称,可以使用#或+等通配符进行灵活的订阅;而当注册到Kafka时,主题基本上只能是一个摄取主题。即使写了通配符也会被忽略。
通过Stream Reactor Connectors实现MQTT输入
根据前面提到的,我将尝试使用LANDOOP公司的原厂工具。虽然有说明书,但需要注意以免卡住。
如果按照正常的方式操作,可能会从UI的CONNECTOR链接中生成到http://localhost:3030之类的地址,但由于要在参数中输入名为KCQL的DSL,因此无法输入。这太荒谬了。

在中文中,注册的方法是使用CURL最好。例如,如下所示。如果使用上述的docker-compose.yml文件,Schema Registry会在8083端口处启动。我们要指定那个端口。MQTT Broker是假设在本地主机上运行的。
curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -H 'Accept: application/json' -d '{
"name":"mqtt-kafka",
"config":
{"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"tasks.max": "1",
"connect.mqtt.kcql": "INSERT INTO sensors SELECT * FROM /sensors WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
"connect.mqtt.service.quality": "0",
"connect.mqtt.password": "ブローカーのパスワード",
"connect.mqtt.username": "ユーザ名",
"connect.mqtt.hosts": "tcp://localhost:1883",
"connect.mqtt.converter.throw.on.error": "true"
}'
使用的输入JSON需要有name和config字段,并且后者中嵌套了各种参数的描述。
connect.mqtt.kcql是一个类似SQL的DSL。使用这种语法,将尝试使用名为/sensors的主题负载,并通过使用名为sensors的键将其注册到Kafka。在此过程中,将使用名为JsonSimpleConverter的解析器来处理MQTT的负载。
顺便提一下,也有一个叫做CONNECT CLI的工具可以使用,看起来也不错。
如果在注册时遇到问题,可以访问http://localhost:3030/logs/connect-distributed.log来查看错误日志。
使用自己制作的连接器
在fast-data-dev的情况下,如果在/connector路径下安装自定义连接器,则似乎会将其注册到类路径中。关于这一点,这个网站可以作为参考。