Writing Streaming Queries with ksqlDB (Part 1)

Goal

Continuing on from running the ksqlDB Quickstart last time, I tried creating the streaming queries referenced below. These are a beginner's working notes taken while following along, so I recommend reading them alongside the original material. One caveat: the direct link from the Japanese page goes to an older version of the ksqlDB tutorial, so watch out for that. (It cost me a very, very painful amount of effort, but let's bury all of that in darkness.)

Original material: Write streaming queries against Apache Kafka® using ksqlDB (local).

Related Qiita articles:

1. Running the ksqlDB Quickstart
2. Writing streaming queries with ksqlDB (Part 1)
3. Writing streaming queries with ksqlDB (Part 2)
4. Processing MySQL table update events with ksql on Kafka

Environment

    • Ubuntu 20.04 (on WSL2)

    • Confluent Platform Community Edition 6.2.1

    • Kafka cluster running in a minimal configuration (the *.properties files are mostly untouched)

    In addition to the ksqlDB Server, the Schema Registry is also required. Uncomment the following line in ksql-server.properties:
#------ Schema Registry -------
# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
ksql.schema.registry.url=http://localhost:8081

Write Streaming Queries with ksqlDB (Part 1)

Create topics and generate data.

As long as the Confluent Platform is installed, an application called ksql-datagen is available that continuously produces messages; I ran it to feed data into the topics that ksql will process. (Note: displaying its help failed while trying to create a /usr/logs directory, so to be safe I created that directory beforehand.)

This time I used the two demo presets of ksql-datagen that the Quickstart specifies: users and pageviews. I didn't like leaving them running, so I stopped them right away and restarted them as needed for the later ksql runs.

gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=pageviews format=json topic=pageviews msgRate=5
...
[1635213107712L] --> ([ 1635213107712L | 'User_4' | 'Page_90' ]) ts:1635213108125
[1635213108142L] --> ([ 1635213108142L | 'User_3' | 'Page_78' ]) ts:1635213108143
[1635213108145L] --> ([ 1635213108145L | 'User_7' | 'Page_50' ]) ts:1635213108145
...
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
...
['User_6'] --> ([ 1490859483194L | 'User_6' | 'Region_4' | 'OTHER' ]) ts:1635213240163
['User_6'] --> ([ 1500448592685L | 'User_6' | 'Region_5' | 'FEMALE' ]) ts:1635213240523
['User_1'] --> ([ 1490186139886L | 'User_1' | 'Region_8' | 'FEMALE' ]) ts:1635213241484
...

Start the ksqlDB CLI / inspect Kafka topics with the SHOW and PRINT statements.

Conveniently, this takes fewer keystrokes than kafka-topics.

ksql> SHOW TOPICS;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 CURRENTLOCATION             | 1          | 1
 RIDERSNEARMOUNTAINVIEW      | 1          | 1
 default_ksql_processing_log | 1          | 1
 locations                   | 1          | 1
 pageviews                   | 1          | 1
 users                       | 1          | 1
---------------------------------------------------------------

PRINT apparently reads from the current offset: with ksql-datagen stopped, nothing came back. I forced output by adding the FROM BEGINNING clause.

ksql> PRINT pageviews FROM BEGINNING;
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 01:51:48.125 Z, key: 1635213107712, value: {"viewtime":1635213107712,"userid":"User_4","pageid":"Page_90"}, partition: 0
rowtime: 2021/10/26 01:51:48.143 Z, key: 1635213108142, value: {"viewtime":1635213108142,"userid":"User_3","pageid":"Page_78"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_7","pageid":"Page_50"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_5","pageid":"Page_63"}, partition: 0
...
ksql> PRINT users FROM BEGINNING;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 01:54:00.163 Z, key: User_6, value: {"registertime": 1490859483194, "userid": "User_6", "regionid": "Region_4", "gender": "OTHER"}, partition: 0
rowtime: 2021/10/26 01:54:00.523 Z, key: User_6, value: {"registertime": 1500448592685, "userid": "User_6", "regionid": "Region_5", "gender": "FEMALE"}, partition: 0
rowtime: 2021/10/26 01:54:01.484 Z, key: User_1, value: {"registertime": 1490186139886, "userid": "User_1", "regionid": "Region_8", "gender": "FEMALE"}, partition: 0
...

Create Streams and Tables

First, create a Stream over the JSON-formatted pageviews topic and describe it. No key is declared explicitly for this Stream.

CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
    (kafka_topic='pageviews', value_format='JSON');

The DESCRIBE output is as follows.

ksql> DESCRIBE pageviews_original;

Name                 : PAGEVIEWS_ORIGINAL
 Field    | Type
----------------------------
 VIEWTIME | BIGINT
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
----------------------------

Next, create a Table over users, whose values are Avro-encoded. This CREATE statement says: take the message key as the id column, and add the remaining columns from the message value according to its Avro schema.

CREATE TABLE users_original (id VARCHAR PRIMARY KEY) WITH
    (kafka_topic='users', value_format='AVRO');

The DESCRIBE output below shows a primary key named id in the table, with the other columns created from the Avro schema. Judging from the messages shown earlier, the ID and USERID columns hold the same values.

ksql> DESCRIBE users_original;

Name                 : USERS_ORIGINAL
 Field        | Type
-----------------------------------------------
 ID           | VARCHAR(STRING)  (primary key)
 REGISTERTIME | BIGINT
 USERID       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 GENDER       | VARCHAR(STRING)
-----------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

Display the data

The SELECT against users_original is shown below. The tutorial treats this as Pull Query processing, but since EMIT CHANGES is specified it looks like a Push Query to me. It did read from the beginning of the topic without any special settings, so perhaps queries against a Table are treated as Pull Queries? The values stored in the ID column are derived from the message key.

ksql> SELECT * FROM users_original EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|ID                               |REGISTERTIME                     |USERID                           |REGIONID                         |GENDER                           |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|User_9                           |1496749900085                    |User_9                           |Region_8                         |OTHER                            |
|User_6                           |1488503784213                    |User_6                           |Region_4                         |OTHER                            |
|User_4                           |1492836860050                    |User_4                           |Region_4                         |MALE                             |
|User_7                           |1505758611024                    |User_7                           |Region_7                         |OTHER                            |
|User_2                           |1513237144819                    |User_2                           |Region_9                         |MALE                             |
Limit Reached
Query terminated

The SELECT against pageviews_original is shown below. This one is treated as Push Query processing, and unless you change the session property (SET 'auto.offset.reset'='earliest'), only messages after the topic's current offset are displayed.

ksql> SELECT * FROM pageviews_original emit changes LIMIT 3;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|VIEWTIME                                                 |USERID                                                   |PAGEID                                                   |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635215795084                                            |User_3                                                   |Page_58                                                  |
|1635215795419                                            |User_6                                                   |Page_49                                                  |
|1635215795419                                            |User_4                                                   |Page_63                                                  |
Limit Reached
Query terminated
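As a sketch of the session property change mentioned above (auto.offset.reset is a standard ksqlDB property; the query just repeats the one shown), the push query can be made to replay the topic from its beginning:

```sql
-- Read from the earliest offset for subsequent queries in this CLI session
SET 'auto.offset.reset'='earliest';

-- The push query now starts from the beginning of the topic
SELECT * FROM pageviews_original EMIT CHANGES LIMIT 3;
```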

Write queries

Create a SELECT statement that enriches pageviews_original (Stream) with user information by LEFT JOINing users_original (Table). Note that the column renamed with AS in the SELECT, users_original.id, is the one derived from the message key.

SELECT users_original.id AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users_original
      ON pageviews_original.userid = users_original.id
    EMIT CHANGES
    LIMIT 5;

The JOIN result is below; the join ties each page view to the viewer's region ID and gender.

+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID                                    |PAGEID                                    |REGIONID                                  |GENDER                                    |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_4                                    |Page_22                                   |Region_3                                  |FEMALE                                    |
|User_2                                    |Page_66                                   |Region_2                                  |OTHER                                     |
|User_7                                    |Page_73                                   |Region_9                                  |MALE                                      |
|User_5                                    |Page_88                                   |Region_4                                  |MALE                                      |
|User_6                                    |Page_99                                   |Region_1                                  |MALE                                      |
Limit Reached
Query terminated

Persist this SELECT statement as a Stream. Apparently the ON condition must include at least one column tied to the message key.

CREATE STREAM pageviews_enriched AS
  SELECT users_original.id AS userid, pageid, regionid, gender
  FROM pageviews_original
  LEFT JOIN users_original
    ON pageviews_original.userid = users_original.id
  EMIT CHANGES;

For good measure, DESCRIBE it as well.

ksql> DESCRIBE pageviews_enriched;

Name                 : PAGEVIEWS_ENRICHED
 Field    | Type
-----------------------------------
 USERID   | VARCHAR(STRING)  (key)
 PAGEID   | VARCHAR(STRING)
 REGIONID | VARCHAR(STRING)
 GENDER   | VARCHAR(STRING)
-----------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

A SELECT against the new Stream returns results similar to the previous ksql. This ksql keeps running continuously, writing its results into the Stream (Topic).

ksql> SELECT * FROM pageviews_enriched EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID                                    |PAGEID                                    |REGIONID                                  |GENDER                                    |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8                                    |Page_71                                   |Region_9                                  |OTHER                                     |
|User_5                                    |Page_47                                   |Region_1                                  |MALE                                      |
|User_5                                    |Page_14                                   |Region_1                                  |MALE                                      |
|User_2                                    |Page_34                                   |Region_2                                  |OTHER                                     |
|User_2                                    |Page_61                                   |Region_2                                  |OTHER                                     |
Limit Reached
Query terminated

Create a Stream (pageviews_female) that selects only the female viewers from that Stream.

CREATE STREAM pageviews_female AS
  SELECT * FROM pageviews_enriched
  WHERE gender = 'FEMALE'
  EMIT CHANGES;

Create a Stream (pageviews_female_like_89) that selects only viewers living in Region_8 or Region_9.

CREATE STREAM pageviews_female_like_89
  WITH (kafka_topic='pageviews_enriched_r8_r9') AS
  SELECT * FROM pageviews_female
  WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
  EMIT CHANGES;

Selecting from it yields the following results, restricted to female viewers in those regions.

ksql> select * from pageviews_female_like_89 EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID                                    |PAGEID                                    |REGIONID                                  |GENDER                                    |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8                                    |Page_77                                   |Region_9                                  |FEMALE                                    |
|User_7                                    |Page_84                                   |Region_9                                  |FEMALE                                    |
|User_7                                    |Page_79                                   |Region_9                                  |FEMALE                                    |
|User_7                                    |Page_33                                   |Region_9                                  |FEMALE                                    |

Next, create a Table grouped by gender and region ID. Internally this CREATE uses the gender and regionid columns together as the message key, so KEY_FORMAT='json' is required to allow multiple columns in the message key.

CREATE TABLE pageviews_regions
  WITH (KEY_FORMAT='json') AS
SELECT gender, regionid , COUNT(*) AS numusers
FROM pageviews_enriched
  WINDOW TUMBLING (size 30 second)
GROUP BY gender, regionid
EMIT CHANGES;

The DESCRIBE output shows both gender and regionid as primary key columns; the two columns are stored in the message key in JSON format.

ksql> DESCRIBE pageviews_regions;

Name                 : PAGEVIEWS_REGIONS
 Field    | Type
-------------------------------------------------------------------
 GENDER   | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 REGIONID | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 NUMUSERS | BIGINT
-------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

The other highlight of this CREATE is WINDOW TUMBLING (SIZE 30 SECONDS). According to the ksql documentation, it performs aggregation over fixed time intervals. Let's try a SELECT.

ksql> SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER                           |REGIONID                         |WINDOWSTART                      |WINDOWEND                        |NUMUSERS                         |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE                           |Region_1                         |1635222870000                    |1635222900000                    |1                                |
|MALE                             |Region_8                         |1635222870000                    |1635222900000                    |3                                |
|MALE                             |Region_9                         |1635222870000                    |1635222900000                    |3                                |
|MALE                             |Region_2                         |1635222870000                    |1635222900000                    |5                                |
|FEMALE                           |Region_3                         |1635222870000                    |1635222900000                    |4                                |

The WINDOWSTART/WINDOWEND columns are appended automatically and represent the time interval (though the raw epoch-millisecond timestamps are hard to read).
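Since the raw window bounds are epoch milliseconds, a sketch using ksqlDB's TIMESTAMPTOSTRING scalar function can render them readably (the format pattern and 'UTC' time zone here are illustrative):

```sql
-- Format the tumbling-window bounds as human-readable UTC timestamps
SELECT gender, regionid,
       TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_start,
       TIMESTAMPTOSTRING(WINDOWEND,   'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_end,
       numusers
FROM pageviews_regions
EMIT CHANGES
LIMIT 5;
```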

Next I tried issuing a Pull Query; both gender and regionid need to be specified.

ksql> SELECT * FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER                           |REGIONID                         |WINDOWSTART                      |WINDOWEND                        |NUMUSERS                         |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE                           |Region_4                         |1635222900000                    |1635222930000                    |1                                |
Query terminated

Incidentally, a Pull Query without a WHERE clause produces the following complaint. If you really want to run one, you can try SET 'ksql.query.pull.table.scan.enabled'='true'.

ksql> SELECT * FROM pageviews_regions;
Missing WHERE clause.  See https://cnfl.io/queries for more info.
Add EMIT CHANGES if you intended to issue a push query.
Pull queries require a WHERE clause that:
 - includes a key equality expression, e.g. `SELECT * FROM X WHERE <key-column>=Y;`.
 - in the case of a multi-column key, is a conjunction of equality expressions that cover all key columns.
 - (optionally) limits the time bounds of the windowed table.
         Bounds on [`WINDOWSTART`, `WINDOWEND`] are supported
         Supported operators are [EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL]
If more flexible queries are needed, table scans can be enabled by setting ksql.query.pull.table.scan.enabled=true.
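As the error message itself suggests, a sketch of enabling a full table scan for the session (using exactly the property named above) would be:

```sql
-- Allow pull queries without a key-equality WHERE clause in this session
SET 'ksql.query.pull.table.scan.enabled'='true';

-- A full scan of the windowed table is now permitted
SELECT * FROM pageviews_regions;
```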

The WINDOWSTART column can also be specified.

SELECT NUMUSERS FROM pageviews_regions WHERE
  gender='FEMALE' AND regionid='Region_1' AND WINDOWSTART=1635222870000;
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|NUMUSERS                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1                                                                                                                                                                                |
Query terminated

A range over WINDOWSTART/WINDOWEND can also be given.

SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE
gender='FEMALE' AND regionid='Region_3' 
AND 1635222870000 <= WINDOWSTART 
AND WINDOWSTART <= 1635222930000; 
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|WINDOWSTART                                              |WINDOWEND                                                |NUMUSERS                                                 |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635222870000                                            |1635222900000                                            |4                                                        |
Query terminated

You can also list the sinks that persistent Stream/Table queries write to with the following command.

ksql> SHOW QUERIES;

 Query ID                          | Query Type | Status    | Sink Name                | Sink Kafka Topic         | Query String                                                                                                                                                                                                                                                                                                                                                                                                                         
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CTAS_RIDERSNEARMOUNTAINVIEW_5     | PERSISTENT | RUNNING:1 | RIDERSNEARMOUNTAINVIEW   | RIDERSNEARMOUNTAINVIEW   | CREATE TABLE RIDERSNEARMOUNTAINVIEW WITH (KAFKA_TOPIC='RIDERSNEARMOUNTAINVIEW', PARTITIONS=1, REPLICAS=1) AS SELECT   ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) DISTANCEINMILES,   COLLECT_LIST(CURRENTLOCATION.PROFILEID) RIDERS,   COUNT(*) COUNT FROM CURRENTLOCATION CURRENTLOCATION GROUP BY ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) EMIT CHANGES;
 CSAS_PAGEVIEWS_ENRICHED_113       | PERSISTENT | RUNNING:1 | PAGEVIEWS_ENRICHED       | PAGEVIEWS_ENRICHED       | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT   USERS_ORIGINAL.ID USERID,   PAGEVIEWS_ORIGINAL.PAGEID PAGEID,   USERS_ORIGINAL.REGIONID REGIONID,   USERS_ORIGINAL.GENDER GENDER FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL LEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.ID)) EMIT CHANGES;                          
 CSAS_PAGEVIEWS_FEMALE_115         | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE') EMIT CHANGES;                                                                                                                                                                                                                     
 CTAS_PAGEVIEWS_REGIONS_119        | PERSISTENT | RUNNING:1 | PAGEVIEWS_REGIONS        | PAGEVIEWS_REGIONS        | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT   PAGEVIEWS_ENRICHED.GENDER GENDER,   PAGEVIEWS_ENRICHED.REGIONID REGIONID,   COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS )  GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;                                           
 CSAS_PAGEVIEWS_FEMALE_LIKE_89_117 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE WHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9')) EMIT CHANGES;                                                                                                                                                             
 CTAS_CURRENTLOCATION_3            | PERSISTENT | RUNNING:1 | CURRENTLOCATION          | CURRENTLOCATION          | CREATE TABLE CURRENTLOCATION WITH (KAFKA_TOPIC='CURRENTLOCATION', PARTITIONS=1, REPLICAS=1) AS SELECT   RIDERLOCATIONS.PROFILEID PROFILEID,   LATEST_BY_OFFSET(RIDERLOCATIONS.LATITUDE) LA,   LATEST_BY_OFFSET(RIDERLOCATIONS.LONGITUDE) LO FROM RIDERLOCATIONS RIDERLOCATIONS GROUP BY RIDERLOCATIONS.PROFILEID EMIT CHANGES;                                                                                                       
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

Adding the EXTENDED option to DESCRIBE shows, in the Consumer Groups summary, how many messages have been processed.

ksql> DESCRIBE PAGEVIEWS_REGIONS EXTENDED;

Name                 : PAGEVIEWS_REGIONS
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : JSON
Value format         : JSON
Kafka topic          : PAGEVIEWS_REGIONS (partitions: 1, replication: 1)
Statement            : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT
  PAGEVIEWS_ENRICHED.GENDER GENDER,
  PAGEVIEWS_ENRICHED.REGIONID REGIONID,
  COUNT(*) NUMUSERS
FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED
WINDOW TUMBLING ( SIZE 30 SECONDS )
GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID
EMIT CHANGES;

 Field    | Type
-------------------------------------------------------------------
 GENDER   | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 REGIONID | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 NUMUSERS | BIGINT
-------------------------------------------------------------------

Queries that write from this TABLE
-----------------------------------
CTAS_PAGEVIEWS_REGIONS_119 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT   PAGEVIEWS_ENRICHED.GENDER GENDER,   PAGEVIEWS_ENRICHED.REGIONID REGIONID,   COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS )  GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:         0   total-messages:       102     last-message: 2021-10-26T04:35:30.3Z

(Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119

Kafka topic          : PAGEVIEWS_ENRICHED
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 0            | 319        | 319    | 0
------------------------------------------------------

Kafka topic          : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119-Aggregate-GroupBy-repartition
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 175          | 175        | 175    | 0
------------------------------------------------------

Summary (Part 1)

– When persisting ksql processing with CREATE, I'm still not sure how to choose between a Stream and a Table. The latter apparently needs some kind of key…
– I'm also unclear on the precise definitions of Pull Query and Push Query… For now I take EMIT CHANGES to generally mean a Push Query, though my test results suggest it isn't quite that simple.
– SELECT with WINDOW TUMBLING (SIZE x SECONDS) aggregates over x-second windows.
– The columns used in GROUP BY apparently become the message key of the resulting Table.

This has gotten quite long, so the second half will be in a separate article.
