在「Confluent + RabbitMQ」中尝试对物联网数据进行流处理
步骤3:确认接收流处理后的数据。
基本概念

我們將按照以下三個步驟來逐一說明以上內容。這次我們將進行 STEP-3 的說明。
STEP-1:在 Docker 容器環境中搭建 Confluent Platform。
STEP-2:通過 RabbitMQ 作為 Broker 來進行數據接收確認。
STEP-3:確認經過流式處理後的數據接收。
本地环境 dì
macOS Big Sur 11.3
python 3.8.3
Docker 版本 20.10.7,构建 f0df350(CPU:8,内存:10GB,交换空间:1GB)
创建数据接收主题来接收流处理后的数据。

在Producer那一方创建KsqlDB Stream。

生产者通过Stream接收数据。
- 在本地计算机上运行STEP-2的物联网数据生成程序。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1

创建Consumer端的KsqlDB Stream
-
- 要使用KsqlDB查询功能从数据中提取信息,无法在Confluent Platform的Control-Center上创建,所以需要通过“ksqldb-cli”来操作“ksqldb-server”。
首先,连接到“ksqldb-cli”。
$ docker exec -it ksqldb-cli /bin/bash
[appuser@56b432e8a452 ~]$
2. 连接到“ksqldb-server”。
[appuser@56b432e8a452 ~]$ ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v6.0.0, Server v6.0.0 located at http://ksqldb-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
3. 创建包含流处理查询的流。 还将进行创建后的确认。
– 在“stream_201”中,按照以下条件提取流数据,并将结果发送到“topic-202”中创建一个名为“stream_202”的流。
– 提取条件: section=’E’ OR section=’C’ OR section=’W’
ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W';
Message
-----------------------------------------
Created query with ID CSAS_STREAM_202_0
-----------------------------------------
ksql>
ksql> describe extended stream_202;
Name : STREAM_202
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : topic_202 (partitions: 1, replication: 1)
Statement : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
S201.SECTION SECTION,
S201.TIME ZZTIME,
S201.PROC PROC,
S201.IOT_NUM IOT_NUM,
S201.IOT_STATE IOT_STATE,
S201.VOL_1 VOL_1,
S201.VOL_2 VOL_2
FROM STREAM_201 S201
WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W'))
EMIT CHANGES;
Field | Type
-----------------------------
SECTION | VARCHAR(STRING)
ZZTIME | VARCHAR(STRING)
PROC | VARCHAR(STRING)
IOT_NUM | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VOL_1 | DOUBLE
VOL_2 | DOUBLE
-----------------------------
Queries that write from this STREAM
-----------------------------------
CSAS_STREAM_202_0 (RUNNING) : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT S201.SECTION SECTION, S201.TIME ZZTIME, S201.PROC PROC, S201.IOT_NUM IOT_NUM, S201.IOT_STATE IOT_STATE, S201.VOL_1 VOL_1, S201.VOL_2 VOL_2 FROM STREAM_201 S201 WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W')) EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic_202)
ksql>
4. 我们来确认一下创建的两个流。
ksql> show streams;
Stream Name | Kafka Topic | Format
------------------------------------------------------------
KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON
STREAM_201 | topic_201 | JSON
STREAM_202 | topic_202 | JSON
------------------------------------------------------------
ksql>

在消费者端流上接收数据。
- 在本地计算机上运行STEP-2的物联网数据生成程序。由于定义了抽取条件,因此会生成50条数据。
$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1


使用这个方法,我们可以通过流处理(查询处理)来处理“topic_201”的数据,并且可以在“topic_202”中查看数据提取结果。
最后
经过三个步骤,我们通过IoT数据生成程序发送数据,通过RabbitMQ接收来自代理商“topic_201”的数据,并确认可以通过流处理提取数据。
本课题的步骤信息
步骤1:在Docker容器环境中构建Confluent平台
步骤2:通过RabbitMQ中间件检查数据接收情况
步骤3:检查流处理后的数据接收情况。