尝试进行 ksqlDB 快速入门
目标
由于需要接触ksqlDB,因此创建了Confluent特制的快速开始指南,并记录了相关备注。原始资料:ksqlDB快速开始。
這個中文問題讓我到達瞬間短路,但我會試試看。
相關的Qiita文章
1. 尝试进行ksqlDB快速入门
2. 创建使用ksqlDB的流查询(前半部分)
3. 创建使用ksqlDB的流查询(后半部分)
4. 在Kafka上处理MySQL表的更新信息,使用ksql处理
环境
-
- Ubuntu 20.04 (on WSL2)
-
- Confluent Platform Community Edition 6.2.1
- Kafkaクラスターは最小構成で稼働(*.propertiesは殆どいじっていない)
ksqlDB 快速入门
1. 获取 Confluent 平台
忽略不计。
2. 启动 ksqlDB 的服务器
在启动ksqlDB服务器之前,先启动zookeeper和broker。由于没有使用Docker,所以直接使用命令进行启动。没有对ksql-server.properties进行任何特殊更改。执行命令:bin/ksql-server-start etc/ksqldb/ksql-server.properties。
...
[2021-10-19 14:38:38,630] INFO Waiting until monitored service is ready for metrics collection (io.confluent.support.metrics.BaseMetricsReporter:171)
[2021-10-19 14:38:38,630] INFO Monitored service is now ready (io.confluent.support.metrics.BaseMetricsReporter:183)
[2021-10-19 14:38:38,630] INFO Attempting to collect and submit metrics (io.confluent.support.metrics.BaseMetricsReporter:142)
[2021-10-19 14:38:38,633] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:375)
[2021-10-19 14:38:38,634] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:90)
[2021-10-19 14:38:39,648] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
ksql服务器启动后,大约创建了三个用于内部处理的主题。
– 交易状态
– confluent-ksql-default__command_topic
– 默认_ksql处理日志
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
__transaction_state
_confluent-ksql-default__command_topic
_schemas
default_ksql_processing_log
3. 启动 ksqlDB 的交互式 CLI
由于没有使用Docker,所以直接通过命令启动。
启动ksqlDB CLI。
按照上述指南,预先创建用于存储ksql日志的目录,并通过环境变量传递。
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2021 Confluent Inc.
CLI v6.2.1, Server v6.2.1 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
我喜欢这样的ASCII艺术!(巴啊啊啊啊啊)
4. 创建一个流
创建一个名为stream的东西。类似于为kafka主题添加模式的东西。给予以下属性。
-
- 名前: streamの名前
-
- kafka-topic: このstreamに対応するtopic名
-
- value_format: topicに格納したメッセージのEncoding。JSONって書いとけばよさそう。
- partitions: topic内のパーティション数
以下是从ksql CLI创建的流的属性。
-
- 名前: riderLocations(ライダーの居場所みたいなもん)
-
- 属性値
profileId VARCHAR(プロファイルID値)
latitude DOUBLE(緯度)
longitude DOUBLE(経度)
kafka-topic: locations
value_format: json
partitions: 1
ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
> WITH (kafka_topic='locations', value_format='json', partitions=1);
Message
----------------
Stream created
----------------
5. 创建物化视图
创建两个表。第一个表currentLocation仅存储从riderLocation流中获取的最新位置。LATEST_BY_OFFSET作为SQL聚合函数被使用,它返回(topic/partition中)最新偏移量的值,即latitude/longitude。EMIT CHANGES表示这个SQL查询是一个推送查询,表明SQL处理将持续执行的类型。
CREATE TABLE currentLocation AS
SELECT profileId,
LATEST_BY_OFFSET(latitude) AS la,
LATEST_BY_OFFSET(longitude) AS lo
FROM riderlocations
GROUP BY profileId
EMIT CHANGES;
第二个表格riderNearMountainView有点复杂,但我们努力解读一下。
CREATE TABLE ridersNearMountainView AS
SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
COLLECT_LIST(profileId) AS riders,
COUNT(*) AS count
FROM currentLocation
GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
GEO_DISTANCE(la,lo,37.4133,-122.1162)是一个返回经纬度之间大圆距离(单位:千米)的标量函数。
将xxx四舍五入至最接近的整数,精度为零。如果精度为负数,则将数值四舍五入至小数点右侧,但我不太明白它的意思。
COLLECT_LIST(profileId)是一种聚合函数,将所有值作为数组返回。
总行数
返回一个统计函数的行数
通过指定FROM和GROUP BY的SQL语句,这似乎是要执行一个操作,即在最新时间点(currentLocation)上,按照距离(distanceInMiles)从某一地点起的每个距离返回所有骑手的个人资料名(riders)和人数(count)。顺便提一下,该地点(纬度37.4133、经度-122.1162)看起来像是加利福尼亚的MountainView。使用ksqlcli创建表没有遇到特别问题,顺利完成。
ksql> CREATE TABLE currentLocation AS
> SELECT profileId,
> LATEST_BY_OFFSET(latitude) AS la,
> LATEST_BY_OFFSET(longitude) AS lo
> FROM riderlocations
> GROUP BY profileId
> EMIT CHANGES;
Message
----------------------------------------------
Created query with ID CTAS_CURRENTLOCATION_3
----------------------------------------------
ksql> CREATE TABLE ridersNearMountainView AS
> SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
> COLLECT_LIST(profileId) AS riders,
> COUNT(*) AS count
> FROM currentLocation
> GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
Message
-----------------------------------------------------
Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------
6. 在流上运行一个推送查询
在MoutainView发出push查询来流(stream)请求。从距离山视5公里以内的位置信息开始逐个显示(Cofluent网站上使用英里表示,但GEO_DISTANCE函数默认使用公里)。由于指定了EMIT CHANGES,所以看起来会持续执行下去。
-----------------------------------------------------
ksql> -- Mountain View lat, long: 37.4133, -122.1162
ksql> SELECT * FROM riderLocations
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
7. 开启另一个命令行会话
在一个新的窗口打开ksqlCLI会话。
8. 用事件填充流
从新窗口启动的ksqlCLI向流中插入6条数据。
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
通过这个操作,只有距离MountainView不超过5公里的记录会在ksqlCLI的结果中显示出来。
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|4ab5cbad |37.3952 |-122.0813 |
|8b6eae59 |37.3944 |-122.0813 |
|4a7c7b41 |37.4049 |-122.0822 |
9. 对物化视图运行Pull查询
最后,尝试从ridersNearMountainView表中提取距离在10公里以内的记录。通过设置参数来执行SQL查询,而不是使用push方式。额外尝试提取所有记录,但是仍然不太清楚ROUND函数的舍入方式。
ksql> SET 'ksql.query.pull.table.scan.enabled'='true';
Successfully changed local property 'ksql.query.pull.table.scan.enabled' to 'true'. Use the UNSET command to revert your change.
ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES |RIDERS |COUNT |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3 |
|10.0 |[18f4ea86] |1 |
ksql> SELECT * from ridersNearMountainView;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES |RIDERS |COUNT |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3 |
|10.0 |[18f4ea86] |1 |
|50.0 |[c2309eec, 4ddad000] |2 |
总结
-
- 大元のtopicをstreamとして定義
-
- streamからtableも作れるし、tableからtableも作れる。これをmaterialized viewと呼ぶ?
-
- ksqlの実行はpush型とpull型の2つがある
-
- pushで作ったtableをpull→pullとmaterialized viewで引張ったりもできる
- 関数はかなり沢山ありそう