尝试进行 ksqlDB 快速入门

目标

由于需要接触ksqlDB,因此创建了Confluent特制的快速开始指南,并记录了相关备注。原始资料:ksqlDB快速开始。

這個中文問題讓我到達瞬間短路,但我會試試看。

相關的Qiita文章

1. 尝试进行ksqlDB快速入门
2. 创建使用ksqlDB的流查询(前半部分)
3. 创建使用ksqlDB的流查询(后半部分)
4. 在Kafka上处理MySQL表的更新信息,使用ksql处理

环境

    • Ubuntu 20.04 (on WSL2)

 

    • Confluent Platform Community Edition 6.2.1

 

    Kafkaクラスターは最小構成で稼働(*.propertiesは殆どいじっていない)

ksqlDB 快速入门

1. 获取 Confluent 平台

忽略不计。

2. 启动 ksqlDB 的服务器

在启动ksqlDB服务器之前,先启动zookeeper和broker。由于没有使用Docker,所以直接使用命令进行启动。没有对ksql-server.properties进行任何特殊更改。执行命令:bin/ksql-server-start etc/ksqldb/ksql-server.properties。

...
[2021-10-19 14:38:38,630] INFO Waiting until monitored service is ready for metrics collection (io.confluent.support.metrics.BaseMetricsReporter:171)
[2021-10-19 14:38:38,630] INFO Monitored service is now ready (io.confluent.support.metrics.BaseMetricsReporter:183)
[2021-10-19 14:38:38,630] INFO Attempting to collect and submit metrics (io.confluent.support.metrics.BaseMetricsReporter:142)
[2021-10-19 14:38:38,633] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:375)
[2021-10-19 14:38:38,634] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:90)
[2021-10-19 14:38:39,648] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)

ksql服务器启动后,大约创建了三个用于内部处理的主题。
– 交易状态
– confluent-ksql-default__command_topic
– 默认_ksql处理日志

gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
__transaction_state
_confluent-ksql-default__command_topic
_schemas
default_ksql_processing_log

3. 启动 ksqlDB 的交互式 CLI

由于没有使用Docker,所以直接通过命令启动。
启动ksqlDB CLI。
按照上述指南,预先创建用于存储ksql日志的目录,并通过环境变量传递。
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088

gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ LOG_DIR=./ksql_logs bin/ksql http://localhost:8088

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v6.2.1, Server v6.2.1 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

我喜欢这样的ASCII艺术!(巴啊啊啊啊啊)

4. 创建一个流

创建一个名为stream的东西。类似于为kafka主题添加模式的东西。给予以下属性。

    • 名前: streamの名前

 

    • kafka-topic: このstreamに対応するtopic名

 

    • value_format: topicに格納したメッセージのEncoding。JSONって書いとけばよさそう。

 

    partitions: topic内のパーティション数

以下是从ksql CLI创建的流的属性。

    • 名前: riderLocations(ライダーの居場所みたいなもん)

 

    • 属性値

profileId VARCHAR(プロファイルID値)
latitude DOUBLE(緯度)
longitude DOUBLE(経度)

kafka-topic: locations
value_format: json
partitions: 1

ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
>  WITH (kafka_topic='locations', value_format='json', partitions=1);

 Message
----------------
 Stream created
----------------

5. 创建物化视图

创建两个表。第一个表currentLocation仅存储从riderLocation流中获取的最新位置。LATEST_BY_OFFSET作为SQL聚合函数被使用,它返回(topic/partition中)最新偏移量的值,即latitude/longitude。EMIT CHANGES表示这个SQL查询是一个推送查询,表明SQL处理将持续执行的类型。

CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;

第二个表格riderNearMountainView有点复杂,但我们努力解读一下。

CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);

GEO_DISTANCE(la,lo,37.4133,-122.1162)是一个返回经纬度之间大圆距离(单位:千米)的标量函数。

将xxx四舍五入至最接近的整数,精度为零。如果精度为负数,则将数值四舍五入至小数点右侧,但我不太明白它的意思。

COLLECT_LIST(profileId)是一种聚合函数,将所有值作为数组返回。

总行数
返回一个统计函数的行数

通过指定FROM和GROUP BY的SQL语句,这似乎是要执行一个操作,即在最新时间点(currentLocation)上,按照距离(distanceInMiles)从某一地点起的每个距离返回所有骑手的个人资料名(riders)和人数(count)。顺便提一下,该地点(纬度37.4133、经度-122.1162)看起来像是加利福尼亚的MountainView。使用ksqlcli创建表没有遇到特别问题,顺利完成。

ksql> CREATE TABLE currentLocation AS
>  SELECT profileId,
>         LATEST_BY_OFFSET(latitude) AS la,
>         LATEST_BY_OFFSET(longitude) AS lo
>  FROM riderlocations
>  GROUP BY profileId
>  EMIT CHANGES;

 Message
----------------------------------------------
 Created query with ID CTAS_CURRENTLOCATION_3
----------------------------------------------
ksql> CREATE TABLE ridersNearMountainView AS
>  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
>         COLLECT_LIST(profileId) AS riders,
>         COUNT(*) AS count
>  FROM currentLocation
>  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);

 Message
-----------------------------------------------------
 Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------

6. 在流上运行一个推送查询

在MoutainView发出push查询来流(stream)请求。从距离山视5公里以内的位置信息开始逐个显示(Cofluent网站上使用英里表示,但GEO_DISTANCE函数默认使用公里)。由于指定了EMIT CHANGES,所以看起来会持续执行下去。

-----------------------------------------------------
ksql> -- Mountain View lat, long: 37.4133, -122.1162
ksql> SELECT * FROM riderLocations
>  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID                                                                    |LATITUDE                                                                     |LONGITUDE                                                                    |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+

7. 开启另一个命令行会话

在一个新的窗口打开ksqlCLI会话。

8. 用事件填充流

从新窗口启动的ksqlCLI向流中插入6条数据。

ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);

通过这个操作,只有距离MountainView不超过5公里的记录会在ksqlCLI的结果中显示出来。

+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID                                                                    |LATITUDE                                                                     |LONGITUDE                                                                    |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|4ab5cbad                                                                     |37.3952                                                                      |-122.0813                                                                    |
|8b6eae59                                                                     |37.3944                                                                      |-122.0813                                                                    |
|4a7c7b41                                                                     |37.4049                                                                      |-122.0822                                                                    |

9. 对物化视图运行Pull查询

最后,尝试从ridersNearMountainView表中提取距离在10公里以内的记录。通过设置参数来执行SQL查询,而不是使用push方式。额外尝试提取所有记录,但是仍然不太清楚ROUND函数的舍入方式。

ksql> SET 'ksql.query.pull.table.scan.enabled'='true';
Successfully changed local property 'ksql.query.pull.table.scan.enabled' to 'true'. Use the UNSET command to revert your change.
ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES                                          |RIDERS                                                   |COUNT                                                    |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0                                                      |[4ab5cbad, 8b6eae59, 4a7c7b41]                           |3                                                        |
|10.0                                                     |[18f4ea86]                                               |1                                                        |
ksql> SELECT * from ridersNearMountainView;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES                                          |RIDERS                                                   |COUNT                                                    |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0                                                      |[4ab5cbad, 8b6eae59, 4a7c7b41]                           |3                                                        |
|10.0                                                     |[18f4ea86]                                               |1                                                        |
|50.0                                                     |[c2309eec, 4ddad000]                                     |2                                                        |

总结

    • 大元のtopicをstreamとして定義

 

    • streamからtableも作れるし、tableからtableも作れる。これをmaterialized viewと呼ぶ?

 

    • ksqlの実行はpush型とpull型の2つがある

 

    • pushで作ったtableをpull→pullとmaterialized viewで引張ったりもできる

 

    関数はかなり沢山ありそう
bannerAds