在「Confluent + RabbitMQ」中尝试对物联网数据进行流处理

步骤3:确认接收流处理后的数据。

基本概念

image.png

我們將按照以下三個步驟來逐一說明以上內容。這次我們將進行 STEP-3 的說明。
STEP-1:在 Docker 容器環境中搭建 Confluent Platform。
STEP-2:通過 RabbitMQ 作為 Broker 來進行數據接收確認。
STEP-3:確認經過流式處理後的數據接收。

本地环境 dì

macOS Big Sur 11.3
python 3.8.3
Docker 版本 20.10.7,构建 f0df350(CPU:8,内存:10GB,交换空间:1GB)

创建数据接收主题来接收流处理后的数据。

image.png

在Producer那一方创建KsqlDB Stream。

image.png

生产者通过Stream接收数据。

    在本地计算机上运行STEP-2的物联网数据生成程序。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1
image.png

创建Consumer端的KsqlDB Stream

    1. 要使用KsqlDB查询功能从数据中提取信息,无法在Confluent Platform的Control-Center上创建,所以需要通过“ksqldb-cli”来操作“ksqldb-server”。

首先,连接到“ksqldb-cli”。

$ docker exec -it ksqldb-cli /bin/bash
[appuser@56b432e8a452 ~]$

2. 连接到“ksqldb-server”。

[appuser@56b432e8a452 ~]$ ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v6.0.0, Server v6.0.0 located at http://ksqldb-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

3. 创建包含流处理查询的流。 还将进行创建后的确认。
– 在“stream_201”中,按照以下条件提取流数据,并将结果发送到“topic-202”中创建一个名为“stream_202”的流。
– 提取条件: section=’E’ OR section=’C’ OR section=’W’

ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W';

 Message                                 
-----------------------------------------
 Created query with ID CSAS_STREAM_202_0 
-----------------------------------------
ksql> 
ksql> describe extended stream_202;

Name                 : STREAM_202
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : topic_202 (partitions: 1, replication: 1)
Statement            : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
  S201.SECTION SECTION,
  S201.TIME ZZTIME,
  S201.PROC PROC,
  S201.IOT_NUM IOT_NUM,
  S201.IOT_STATE IOT_STATE,
  S201.VOL_1 VOL_1,
  S201.VOL_2 VOL_2
FROM STREAM_201 S201
WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W'))
EMIT CHANGES;

 Field     | Type            
-----------------------------
 SECTION   | VARCHAR(STRING) 
 ZZTIME    | VARCHAR(STRING) 
 PROC      | VARCHAR(STRING) 
 IOT_NUM   | VARCHAR(STRING) 
 IOT_STATE | VARCHAR(STRING) 
 VOL_1     | DOUBLE          
 VOL_2     | DOUBLE          
-----------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_STREAM_202_0 (RUNNING) : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT   S201.SECTION SECTION,   S201.TIME ZZTIME,   S201.PROC PROC,   S201.IOT_NUM IOT_NUM,   S201.IOT_STATE IOT_STATE,   S201.VOL_1 VOL_1,   S201.VOL_2 VOL_2 FROM STREAM_201 S201 WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------


(Statistics of the local KSQL server interaction with the Kafka topic topic_202)
ksql> 

4. 我们来确认一下创建的两个流。

ksql> show streams;

 Stream Name         | Kafka Topic                 | Format 
------------------------------------------------------------
 KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON   
 STREAM_201          | topic_201                   | JSON   
 STREAM_202          | topic_202                   | JSON   
------------------------------------------------------------
ksql> 
image.png

在消费者端流上接收数据。

    在本地计算机上运行STEP-2的物联网数据生成程序。由于定义了抽取条件,因此会生成50条数据。
$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1
image.png
image.png

使用这个方法,我们可以通过流处理(查询处理)来处理“topic_201”的数据,并且可以在“topic_202”中查看数据提取结果。

最后

经过三个步骤,我们通过IoT数据生成程序发送数据,通过RabbitMQ接收来自代理商“topic_201”的数据,并确认可以通过流处理提取数据。

本课题的步骤信息

步骤1:在Docker容器环境中构建Confluent平台
步骤2:通过RabbitMQ中间件检查数据接收情况
步骤3:检查流处理后的数据接收情况。

广告
将在 10 秒后关闭
bannerAds