Writing Streaming Queries with ksqlDB (Part 1)
Goal
As a follow-up to the previous ksqlDB Quickstart run, I tried creating the streaming queries referenced below. These are a beginner's working notes, so I recommend reading them alongside the original material. One caution: the direct link to the Japanese page jumps to an older version of the ksqlDB tutorial, so watch out for that. (Because of it I went through a truly enormous struggle, but I'll bury that whole story in darkness.)
Original material: Write Streaming Queries Against Apache Kafka® Using ksqlDB (Local).
Related Qiita articles:
1. Trying out the ksqlDB Quickstart
2. Writing Streaming Queries with ksqlDB (Part 1)
3. Writing Streaming Queries with ksqlDB (Part 2)
4. Processing MySQL table update events with ksql on Kafka
Environment
- Ubuntu 20.04 (on WSL2)
- Confluent Platform Community Edition 6.2.1
- The Kafka cluster runs in a minimal configuration (the *.properties files are mostly untouched)
- In addition to ksqlDB Server, Schema Registry is also required; uncomment the following in ksql-server.properties:
#------ Schema Registry -------
# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
ksql.schema.registry.url=http://localhost:8081
Writing Streaming Queries with ksqlDB (Part 1)
Create the topics and generate some data
As long as the Confluent Platform is installed, an application called ksql-datagen that continuously produces messages is available; run it to feed data into the topics that ksql will process. (Note: displaying its help failed while trying to create a /usr/logs directory, so to be safe I created that directory in advance.)
This time I used the two demo presets that ksql-datagen ships with and that the Quickstart specifies: users and pageviews. I didn't want to leave it running, so I stopped it right away, and restarted it as needed for the later ksql runs.
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=pageviews format=json topic=pageviews msgRate=5
...
[1635213107712L] --> ([ 1635213107712L | 'User_4' | 'Page_90' ]) ts:1635213108125
[1635213108142L] --> ([ 1635213108142L | 'User_3' | 'Page_78' ]) ts:1635213108143
[1635213108145L] --> ([ 1635213108145L | 'User_7' | 'Page_50' ]) ts:1635213108145
...
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
...
['User_6'] --> ([ 1490859483194L | 'User_6' | 'Region_4' | 'OTHER' ]) ts:1635213240163
['User_6'] --> ([ 1500448592685L | 'User_6' | 'Region_5' | 'FEMALE' ]) ts:1635213240523
['User_1'] --> ([ 1490186139886L | 'User_1' | 'Region_8' | 'FEMALE' ]) ts:1635213241484
...
Start the ksqlDB CLI / inspect the Kafka topics with SHOW and PRINT statements
Conveniently, this takes fewer keystrokes than the kafka-topics command.
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
CURRENTLOCATION | 1 | 1
RIDERSNEARMOUNTAINVIEW | 1 | 1
default_ksql_processing_log | 1 | 1
locations | 1 | 1
pageviews | 1 | 1
users | 1 | 1
---------------------------------------------------------------
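For comparison, listing the same topics outside the CLI would look something like this (a sketch; it assumes the broker is listening on localhost:9092):
bin/kafka-topics --list --bootstrap-server localhost:9092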
PRINT apparently reads from the current offset, so with ksql-datagen stopped, nothing came back. I forced it to display the messages by adding the FROM BEGINNING clause.
ksql> PRINT pageviews FROM BEGINNING;
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 01:51:48.125 Z, key: 1635213107712, value: {"viewtime":1635213107712,"userid":"User_4","pageid":"Page_90"}, partition: 0
rowtime: 2021/10/26 01:51:48.143 Z, key: 1635213108142, value: {"viewtime":1635213108142,"userid":"User_3","pageid":"Page_78"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_7","pageid":"Page_50"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_5","pageid":"Page_63"}, partition: 0
...
ksql> PRINT users FROM BEGINNING;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 01:54:00.163 Z, key: User_6, value: {"registertime": 1490859483194, "userid": "User_6", "regionid": "Region_4", "gender": "OTHER"}, partition: 0
rowtime: 2021/10/26 01:54:00.523 Z, key: User_6, value: {"registertime": 1500448592685, "userid": "User_6", "regionid": "Region_5", "gender": "FEMALE"}, partition: 0
rowtime: 2021/10/26 01:54:01.484 Z, key: User_1, value: {"registertime": 1490186139886, "userid": "User_1", "regionid": "Region_8", "gender": "FEMALE"}, partition: 0
...
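Incidentally, PRINT also appears to accept a LIMIT clause, which would stop the output automatically instead of requiring Ctrl+C. A sketch, not taken from the tutorial:
PRINT pageviews FROM BEGINNING LIMIT 3;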
Create a Stream and a Table
First, create a Stream over the JSON-formatted pageviews topic and DESCRIBE it. No key is explicitly declared for this stream.
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
(kafka_topic='pageviews', value_format='JSON');
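Since the earlier PRINT showed the message key is a KAFKA_BIGINT carrying the same value as viewtime, the key could presumably be bound explicitly with the KEY keyword instead. A hypothetical variant (pageviews_keyed is my own name, not part of the tutorial); with this declaration, viewtime would be read from the message key rather than from the JSON value:
CREATE STREAM pageviews_keyed (viewtime BIGINT KEY, userid VARCHAR, pageid VARCHAR) WITH
(kafka_topic='pageviews', value_format='JSON');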
The DESCRIBE output follows.
ksql> DESCRIBE pageviews_original;
Name : PAGEVIEWS_ORIGINAL
Field | Type
----------------------------
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
----------------------------
Next, create a Table over the Avro-formatted users topic. This CREATE effectively says: take what is extracted from the message key as the id column, and add columns from the message value according to its Avro schema.
CREATE TABLE users_original (id VARCHAR PRIMARY KEY) WITH
(kafka_topic='users', value_format='AVRO');
The DESCRIBE output below shows a primary key named id, with the other columns created from the Avro schema. Judging from the earlier messages, the ID and USERID columns hold the same values.
ksql> DESCRIBE users_original;
Name : USERS_ORIGINAL
Field | Type
-----------------------------------------------
ID | VARCHAR(STRING) (primary key)
REGISTERTIME | BIGINT
USERID | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
-----------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
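Since the value columns were pulled from Schema Registry, the registered Avro schema itself should also be viewable directly. With the default subject naming, the users topic's value schema lives under users-value (a sketch, untested here):
curl http://localhost:8081/subjects/users-value/versions/latest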
Display the data
The SELECT result for users_original is below. It is labeled as Pull Query processing, but since EMIT CHANGES is specified it looks like a Push Query to me. It does read from the beginning of the topic without any special settings; is a query against a Table simply treated as a Pull Query? The values stored in the ID column come from the message keys.
ksql> SELECT * FROM users_original EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|ID |REGISTERTIME |USERID |REGIONID |GENDER |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|User_9 |1496749900085 |User_9 |Region_8 |OTHER |
|User_6 |1488503784213 |User_6 |Region_4 |OTHER |
|User_4 |1492836860050 |User_4 |Region_4 |MALE |
|User_7 |1505758611024 |User_7 |Region_7 |OTHER |
|User_2 |1513237144819 |User_2 |Region_9 |MALE |
Limit Reached
Query terminated
The SELECT result for pageviews_original is below. This one is labeled a Push Query, but unless you change the session property (SET 'auto.offset.reset'='earliest'), only messages after the topic's current offset are shown.
ksql> SELECT * FROM pageviews_original emit changes LIMIT 3;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|VIEWTIME |USERID |PAGEID |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635215795084 |User_3 |Page_58 |
|1635215795419 |User_6 |Page_49 |
|1635215795419 |User_4 |Page_63 |
Limit Reached
Query terminated
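For reference, that property is set per CLI session as below; subsequent push queries then read from the earliest available offset (a sketch, not executed in this session):
SET 'auto.offset.reset'='earliest';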
Write queries
Create a SELECT that enriches pageviews_original (a Stream) with user information by LEFT JOINing users_original (a Table). In the SELECT, the column renamed with the AS keyword (users_original.id) is presumably the one that comes from the message key.
SELECT users_original.id AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original
ON pageviews_original.userid = users_original.id
EMIT CHANGES
LIMIT 5;
The JOINed query results are below; the join associates each page view with the viewer's region ID and gender.
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID |PAGEID |REGIONID |GENDER |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_4 |Page_22 |Region_3 |FEMALE |
|User_2 |Page_66 |Region_2 |OTHER |
|User_7 |Page_73 |Region_9 |MALE |
|User_5 |Page_88 |Region_4 |MALE |
|User_6 |Page_99 |Region_1 |MALE |
Limit Reached
Query terminated
Persist this SELECT as a Stream. The ON condition apparently must involve at least one column tied to the message key (or so it seems).
CREATE STREAM pageviews_enriched AS
SELECT users_original.id AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original
ON pageviews_original.userid = users_original.id
EMIT CHANGES;
For good measure, DESCRIBE this one too.
ksql> DESCRIBE pageviews_enriched;
Name : PAGEVIEWS_ENRICHED
Field | Type
-----------------------------------
USERID | VARCHAR(STRING) (key)
PAGEID | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
-----------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
SELECTing from this Stream returns results similar to the earlier ksql. This ksql keeps running continuously and writes its results to the Stream (Topic).
ksql> SELECT * FROM pageviews_enriched EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID |PAGEID |REGIONID |GENDER |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8 |Page_71 |Region_9 |OTHER |
|User_5 |Page_47 |Region_1 |MALE |
|User_5 |Page_14 |Region_1 |MALE |
|User_2 |Page_34 |Region_2 |OTHER |
|User_2 |Page_61 |Region_2 |OTHER |
Limit Reached
Query terminated
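Because the results are also being written to the backing Kafka topic, they should be viewable with PRINT as well (a sketch; output omitted):
PRINT PAGEVIEWS_ENRICHED FROM BEGINNING;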
Create a Stream (pageviews_female) that selects only the female viewers from that Stream.
CREATE STREAM pageviews_female AS
SELECT * FROM pageviews_enriched
WHERE gender = 'FEMALE'
EMIT CHANGES;
Create a Stream (pageviews_female_like_89) that further selects only the viewers living in Region_8 or Region_9.
CREATE STREAM pageviews_female_like_89
WITH (kafka_topic='pageviews_enriched_r8_r9') AS
SELECT * FROM pageviews_female
WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
EMIT CHANGES;
A SELECT returns the following results, restricted to both female viewers and the two regions.
ksql> select * from pageviews_female_like_89 EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID |PAGEID |REGIONID |GENDER |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8 |Page_77 |Region_9 |FEMALE |
|User_7 |Page_84 |Region_9 |FEMALE |
|User_7 |Page_79 |Region_9 |FEMALE |
|User_7 |Page_33 |Region_9 |FEMALE |
Next, create a Table grouped by gender and region ID. Internally this CREATE uses the gender and regionid columns together as the message key, so KEY_FORMAT='json' is needed to allow multiple columns in the key.
CREATE TABLE pageviews_regions
WITH (KEY_FORMAT='json') AS
SELECT gender, regionid , COUNT(*) AS numusers
FROM pageviews_enriched
WINDOW TUMBLING (size 30 second)
GROUP BY gender, regionid
EMIT CHANGES;
The DESCRIBE output below shows that both gender and regionid have become primary key columns, and the two columns are stored in the message key as JSON.
ksql> DESCRIBE pageviews_regions;
Name : PAGEVIEWS_REGIONS
Field | Type
-------------------------------------------------------------------
GENDER | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
REGIONID | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
NUMUSERS | BIGINT
-------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
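To see that two-column JSON key on the wire, the sink topic could be PRINTed as well (a sketch; output omitted — note that the key of a windowed table also carries the window bounds):
PRINT PAGEVIEWS_REGIONS FROM BEGINNING LIMIT 2;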
The other key point of this CREATE is probably WINDOW TUMBLING (SIZE 30 SECONDS). According to the ksql documentation, it performs aggregation over fixed time intervals. Let's try a SELECT.
ksql> SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER |REGIONID |WINDOWSTART |WINDOWEND |NUMUSERS |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE |Region_1 |1635222870000 |1635222900000 |1 |
|MALE |Region_8 |1635222870000 |1635222900000 |3 |
|MALE |Region_9 |1635222870000 |1635222900000 |3 |
|MALE |Region_2 |1635222870000 |1635222900000 |5 |
|FEMALE |Region_3 |1635222870000 |1635222900000 |4 |
The WINDOWSTART/WINDOWEND columns are appended automatically and represent the time interval (though I wasn't sure how to read the timestamps).
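Those values look like Unix epoch milliseconds, so ksqlDB's TIMESTAMPTOSTRING function should render them human-readable. A sketch, not part of the tutorial:
SELECT gender, regionid,
       TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_start,
       numusers
FROM pageviews_regions EMIT CHANGES LIMIT 3;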
Next, try issuing a Pull Query; both gender and regionid must be specified.
ksql> SELECT * FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER |REGIONID |WINDOWSTART |WINDOWEND |NUMUSERS |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE |Region_4 |1635222900000 |1635222930000 |1 |
Query terminated
Incidentally, issuing a Pull Query without a WHERE clause produces the complaint below. If you really want to run one, you can try SET 'ksql.query.pull.table.scan.enabled'='true'.
ksql> SELECT * FROM pageviews_regions;
Missing WHERE clause. See https://cnfl.io/queries for more info.
Add EMIT CHANGES if you intended to issue a push query.
Pull queries require a WHERE clause that:
- includes a key equality expression, e.g. `SELECT * FROM X WHERE <key-column>=Y;`.
- in the case of a multi-column key, is a conjunction of equality expressions that cover all key columns.
- (optionally) limits the time bounds of the windowed table.
Bounds on [`WINDOWSTART`, `WINDOWEND`] are supported
Supported operators are [EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL]
If more flexible queries are needed, table scans can be enabled by setting ksql.query.pull.table.scan.enabled=true.
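Following that hint, enabling table scans for the session and re-running the unfiltered SELECT would look like this (a sketch, untested here):
SET 'ksql.query.pull.table.scan.enabled'='true';
SELECT * FROM pageviews_regions;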
The WINDOWSTART column can also be specified.
SELECT NUMUSERS FROM pageviews_regions WHERE
gender='FEMALE' AND regionid='Region_1' AND WINDOWSTART=1635222870000;
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|NUMUSERS |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |
Query terminated
A range over WINDOWSTART/WINDOWEND can be specified as well.
SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE
gender='FEMALE' AND regionid='Region_3'
AND 1635222870000 <= WINDOWSTART
AND WINDOWSTART <= 1635222930000;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|WINDOWSTART |WINDOWEND |NUMUSERS |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635222870000 |1635222900000 |4 |
Query terminated
In addition, the following command lists the persistent queries together with the Stream or Table sinks they write to.
ksql> SHOW QUERIES;
Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CTAS_RIDERSNEARMOUNTAINVIEW_5 | PERSISTENT | RUNNING:1 | RIDERSNEARMOUNTAINVIEW | RIDERSNEARMOUNTAINVIEW | CREATE TABLE RIDERSNEARMOUNTAINVIEW WITH (KAFKA_TOPIC='RIDERSNEARMOUNTAINVIEW', PARTITIONS=1, REPLICAS=1) AS SELECT ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) DISTANCEINMILES, COLLECT_LIST(CURRENTLOCATION.PROFILEID) RIDERS, COUNT(*) COUNT FROM CURRENTLOCATION CURRENTLOCATION GROUP BY ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) EMIT CHANGES;
CSAS_PAGEVIEWS_ENRICHED_113 | PERSISTENT | RUNNING:1 | PAGEVIEWS_ENRICHED | PAGEVIEWS_ENRICHED | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT USERS_ORIGINAL.ID USERID, PAGEVIEWS_ORIGINAL.PAGEID PAGEID, USERS_ORIGINAL.REGIONID REGIONID, USERS_ORIGINAL.GENDER GENDER FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL LEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.ID)) EMIT CHANGES;
CSAS_PAGEVIEWS_FEMALE_115 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE') EMIT CHANGES;
CTAS_PAGEVIEWS_REGIONS_119 | PERSISTENT | RUNNING:1 | PAGEVIEWS_REGIONS | PAGEVIEWS_REGIONS | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;
CSAS_PAGEVIEWS_FEMALE_LIKE_89_117 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE WHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9')) EMIT CHANGES;
CTAS_CURRENTLOCATION_3 | PERSISTENT | RUNNING:1 | CURRENTLOCATION | CURRENTLOCATION | CREATE TABLE CURRENTLOCATION WITH (KAFKA_TOPIC='CURRENTLOCATION', PARTITIONS=1, REPLICAS=1) AS SELECT RIDERLOCATIONS.PROFILEID PROFILEID, LATEST_BY_OFFSET(RIDERLOCATIONS.LATITUDE) LA, LATEST_BY_OFFSET(RIDERLOCATIONS.LONGITUDE) LO FROM RIDERLOCATIONS RIDERLOCATIONS GROUP BY RIDERLOCATIONS.PROFILEID EMIT CHANGES;
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
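As the footer suggests, a single query can be drilled into with EXPLAIN plus an ID from the list above, e.g. (output omitted):
EXPLAIN CTAS_PAGEVIEWS_REGIONS_119;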
Running DESCRIBE with the EXTENDED option shows, in the Consumer Groups summary, how many messages the query's consumer group has processed.
ksql> DESCRIBE PAGEVIEWS_REGIONS EXTENDED;
Name : PAGEVIEWS_REGIONS
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : JSON
Value format : JSON
Kafka topic : PAGEVIEWS_REGIONS (partitions: 1, replication: 1)
Statement : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT
PAGEVIEWS_ENRICHED.GENDER GENDER,
PAGEVIEWS_ENRICHED.REGIONID REGIONID,
COUNT(*) NUMUSERS
FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED
WINDOW TUMBLING ( SIZE 30 SECONDS )
GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID
EMIT CHANGES;
Field | Type
-------------------------------------------------------------------
GENDER | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
REGIONID | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
NUMUSERS | BIGINT
-------------------------------------------------------------------
Queries that write from this TABLE
-----------------------------------
CTAS_PAGEVIEWS_REGIONS_119 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0 total-messages: 102 last-message: 2021-10-26T04:35:30.3Z
(Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)
Consumer Groups summary:
Consumer Group : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119
Kafka topic : PAGEVIEWS_ENRICHED
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 319 | 319 | 0
------------------------------------------------------
Kafka topic : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119-Aggregate-GroupBy-repartition
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 175 | 175 | 175 | 0
------------------------------------------------------
Summary (Part 1)
- When persisting ksql processing with CREATE, I'm still not sure how to choose between Stream and Table; the latter apparently needs some kind of key...
- The exact definitions of Pull Query and Push Query are also unclear to me... For now I take EMIT CHANGES to generally mean Push Query, but the test results suggest it is not quite that simple.
- WINDOW TUMBLING (SIZE x SECONDS) in a SELECT performs windowed aggregation at x-second intervals.
- The columns used in GROUP BY appear to become the message key of the resulting Table.
Since this has gotten quite long, the second half continues in a separate article.