我在「Confluent + InfluxDB + Grafana」中尝试将IoT数据进行流式处理并可视化
第二步:确认从InfluxDB的Broker收到数据
总结
我使用Confluent Platform的”cp-all-in-one”基础构建了一个本地的Docker容器环境,并通过IoT数据生成的Python程序将数据发送给Confluent进行流式处理,最后通过InfluxDB和Grafana实时可视化。我使用了相应的Sink Connector来完成这个过程。


以下是以本次 STEP-2 为例的三个步骤,按顺序说明上述内容:
STEP-1. 在Docker容器环境中添加Confluent平台的配置
STEP-2. 在InfluxDB中确认从经纪人接收到的数据
STEP-3. 通过InfluxDB传递给Grafana进行实时数据可视化确认
本地环境
macOS Big Sur 11.3 的版本
Python 3.8.3
Docker 版本 20.10.7,构建版本 f0df350(CPU:8 个,内存:10GB,交换空间:1GB)
InfluxDB 的配置
- 在创建的本地Docker环境中,从本地终端连接到”influxdb”并检查信息。
$ docker exec -it influxdb /bin/bash
root@influxdb:/#
root@influxdb:/# uname -srn
Linux influxdb 5.10.25-linuxkit
root@influxdb:/#
root@influxdb:/# influxd version
InfluxDB v1.8.6 (git: 1.8 v1.8.6)
root@influxdb:/#
root@influxdb:/# influx -version
InfluxDB shell version: 1.8.6
root@influxdb:/#
root@influxdb:/# exit
exit
2. 创建名为「IoTSample」的数据库在「InfluxDB」中。
$ docker exec -it influxdb influx
Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
>
> create database IoTSample
>
> show databases
name: databases
name
----
_internal
IoTSample
>
创建连接器

カテゴリ入力項目値Which topics do you want to get data from?topicstopic_201, topic_202How should we connect to your data?nameInfluxDBSinkConnector_1Commontasks.max1CommonValue converter classorg.apache.kafka.connect.json.JsonConverterInfluxDBInfluxDB API URL*http://influxdb:8086WriteInfluxDB DatabaseIoTSampleWriteMeasurement Name Format${topic}Additional Propertiesvalue.converter.schemas.enablefalse


在InfluxDB中进行确认
- 在本地终端上连接到”InfluxDB”,确认数据库”IoTSample”中已自动创建表”topic_201″和”topic_202″。
$ docker exec -it influxdb influx
Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
>
> use IoTSample
Using database IoTSample
>
> show measurements
name: measurements
name
----
topic_201
topic_202
>
2. 在本地计算机上执行此处的物联网数据生成程序。生成50条数据。
$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1
3. 然后确认每个主要运动(表格)是否已写入数据。
>
> select * from topic_201
name: topic_201
time id iot_num iot_state proc section vol_1 vol_2
---- -- ------- --------- ---- ------- ----- -----
1626241925549000000 0 226-2461 島根県 111 M 198.1845397767455 57.268514136837176
1626241926554000000 1 145-2621 宮崎県 111 A 121.70584421224085 75.71752774135611
1626241927559000000 2 852-7187 熊本県 111 A 126.41080104674175 72.15478589353621
:
:
:
1626408886875000000 47 440-1826 兵庫県 111 J 116.80492255443481 58.16002504591993
1626408887881000000 48 238-2420 長野県 111 Q 168.35017898970486 68.57868248992546
1626408888886000000 49 175-8107 高知県 111 S 132.90782936265464 86.22280607722584
>
>
> select * from topic_202
name: topic_202
time IOT_NUM IOT_STATE PROC SECTION VOL_1 VOL_2 ZZTIME
---- ------- --------- ---- ------- ----- ----- ------
1626408840741000000 327-9411 埼玉県 111 C 127.4471407715012 71.60478903611397 2021-07-16T13:13:59.693142
1626408848776000000 910-2198 兵庫県 111 E 194.46476675784402 70.33439224608537 2021-07-16T13:13:59.693268
1626408863809000000 134-0246 北海道 111 E 123.3766530207224 61.78121028564689 2021-07-16T13:13:59.693468
1626408876870000000 214-7209 埼玉県 111 C 101.40696510178657 88.97291678787539 2021-07-16T13:13:59.693639
1626408878878000000 103-2715 福島県 111 C 160.6706712489061 53.167263544338155 2021-07-16T13:13:59.693665
>
我已经确认通过Broker的主题可以正常接收到InfluxDB的数据。
接下来,我将尝试确认在Grafana中可以实时可视化此度量(表)的数据。
本课题的步骤信息
步骤1:在Docker容器环境中重新构建Confluent平台
步骤2:确认从Broker接收数据在InfluxDB中
步骤3:确认通过InfluxDB在Grafana中实时可视化数据