我在「Confluent + InfluxDB + Grafana」中尝试将IoT数据进行流式处理并可视化

第二步:确认从InfluxDB的Broker收到数据

总结

我使用Confluent Platform的”cp-all-in-one”基础构建了一个本地的Docker容器环境,并通过IoT数据生成的Python程序将数据发送给Confluent进行流式处理,最后通过InfluxDB和Grafana实时可视化。我使用了相应的Sink Connector来完成这个过程。

image.png
image.png

以下是以本次 STEP-2 为例的三个步骤,按顺序说明上述内容:
STEP-1. 在Docker容器环境中添加Confluent平台的配置
STEP-2. 在InfluxDB中确认从经纪人接收到的数据
STEP-3. 通过InfluxDB传递给Grafana进行实时数据可视化确认

本地环境

macOS Big Sur 11.3 的版本
Python 3.8.3
Docker 版本 20.10.7,构建版本 f0df350(CPU:8 个,内存:10GB,交换空间:1GB)

InfluxDB 的配置

    在创建的本地Docker环境中,从本地终端连接到”influxdb”并检查信息。
$ docker exec -it influxdb /bin/bash

root@influxdb:/# 
root@influxdb:/# uname -srn
Linux influxdb 5.10.25-linuxkit
root@influxdb:/# 
root@influxdb:/# influxd version
InfluxDB v1.8.6 (git: 1.8 v1.8.6)
root@influxdb:/# 
root@influxdb:/# influx -version
InfluxDB shell version: 1.8.6
root@influxdb:/# 
root@influxdb:/# exit
exit

2. 创建名为「IoTSample」的数据库在「InfluxDB」中。

$ docker exec -it influxdb influx

Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
> 
> create database IoTSample
> 
> show databases
name: databases
name
----
_internal
IoTSample
> 

创建连接器

image.png
カテゴリ入力項目値Which topics do you want to get data from?topicstopic_201, topic_202How should we connect to your data?nameInfluxDBSinkConnector_1Commontasks.max1CommonValue converter classorg.apache.kafka.connect.json.JsonConverterInfluxDBInfluxDB API URL*http://influxdb:8086WriteInfluxDB DatabaseIoTSampleWriteMeasurement Name Format${topic}Additional Propertiesvalue.converter.schemas.enablefalse
image.png
image.png

在InfluxDB中进行确认

    在本地终端上连接到”InfluxDB”,确认数据库”IoTSample”中已自动创建表”topic_201″和”topic_202″。
$ docker exec -it influxdb influx

Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
> 
> use IoTSample
Using database IoTSample
> 
> show measurements
name: measurements
name
----
topic_201
topic_202
> 

2. 在本地计算机上执行此处的物联网数据生成程序。生成50条数据。

$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1

3. 然后确认每个主要运动(表格)是否已写入数据。

> 
> select * from topic_201
name: topic_201
time                id iot_num  iot_state proc section vol_1              vol_2
----                -- -------  --------- ---- ------- -----              -----
1626241925549000000 0  226-2461 島根県       111  M       198.1845397767455  57.268514136837176
1626241926554000000 1  145-2621 宮崎県       111  A       121.70584421224085 75.71752774135611
1626241927559000000 2  852-7187 熊本県       111  A       126.41080104674175 72.15478589353621
    :
    :
    :
1626408886875000000 47 440-1826 兵庫県       111  J       116.80492255443481 58.16002504591993
1626408887881000000 48 238-2420 長野県       111  Q       168.35017898970486 68.57868248992546
1626408888886000000 49 175-8107 高知県       111  S       132.90782936265464 86.22280607722584
> 
> 
> select * from topic_202
name: topic_202
time                IOT_NUM  IOT_STATE PROC SECTION VOL_1              VOL_2              ZZTIME
----                -------  --------- ---- ------- -----              -----              ------
1626408840741000000 327-9411 埼玉県       111  C       127.4471407715012  71.60478903611397  2021-07-16T13:13:59.693142
1626408848776000000 910-2198 兵庫県       111  E       194.46476675784402 70.33439224608537  2021-07-16T13:13:59.693268
1626408863809000000 134-0246 北海道       111  E       123.3766530207224  61.78121028564689  2021-07-16T13:13:59.693468
1626408876870000000 214-7209 埼玉県       111  C       101.40696510178657 88.97291678787539  2021-07-16T13:13:59.693639
1626408878878000000 103-2715 福島県       111  C       160.6706712489061  53.167263544338155 2021-07-16T13:13:59.693665
>

我已经确认通过Broker的主题可以正常接收到InfluxDB的数据。
接下来,我将尝试确认在Grafana中可以实时可视化此度量(表)的数据。

本课题的步骤信息

步骤1:在Docker容器环境中重新构建Confluent平台
步骤2:确认从Broker接收数据在InfluxDB中
步骤3:确认通过InfluxDB在Grafana中实时可视化数据

bannerAds