Creating Streaming Queries with ksqlDB (Part 2)
Goal
Continuing from Part 1, this post works through the "Use a nested schema (STRUCT) in ksqlDB" section of the source below. (That link has since gone stale; the indescribable pain this caused me is all buried in darkness now.)
Write Streaming Queries Against Apache Kafka® Using ksqlDB (Local)
Related Qiita articles
1. Trying the ksqlDB Quickstart
2. Creating Streaming Queries with ksqlDB (Part 1)
3. Creating Streaming Queries with ksqlDB (Part 2)
4. Processing MySQL Table Updates with ksql on Kafka
Using a nested schema (STRUCT) in ksqlDB
This time, the raw data source for the Stream/Table is ksql-datagen's orders preset.
bin/ksql-datagen quickstart=orders format=avro topic=orders msgRate=1
The log output from ksql-datagen looks like this:
[0] --> ([ 1504174143763L | 0 | 'Item_878' | 4.036972663514597 | Struct{city=City_67,state=State_25,zipcode=71437} ]) ts:1635225586518
[1] --> ([ 1509355271065L | 1 | 'Item_670' | 9.154643686738641 | Struct{city=City_32,state=State_25,zipcode=81180} ]) ts:1635225586796
[2] --> ([ 1487743369101L | 2 | 'Item_682' | 9.913937197671148 | Struct{city=City_99,state=State_25,zipcode=99831} ]) ts:1635225587794
[3] --> ([ 1514028266544L | 3 | 'Item_915' | 7.027966334147111 | Struct{city=City_68,state=State_78,zipcode=47239} ]) ts:1635225588795
[4] --> ([ 1500111527090L | 4 | 'Item_533' | 9.268916469646818 | Struct{city=City_71,state=State_72,zipcode=15953} ]) ts:1635225589795
[5] --> ([ 1492193039132L | 5 | 'Item_294' | 7.781222807562362 | Struct{city=City_46,state=State_66,zipcode=29977} ]) ts:1635225590795
[6] --> ([ 1495043772684L | 6 | 'Item_322' | 1.2480686742580827 | Struct{city=City_86,state=State_69,zipcode=18948} ]) ts:1635225591795
[7] --> ([ 1505289388278L | 7 | 'Item_233' | 0.15332131177604907 | Struct{city=City_29,state=State_98,zipcode=64327} ]) ts:1635225592795
[8] --> ([ 1515383680311L | 8 | 'Item_521' | 6.810614048446687 | Struct{city=City_63,state=State_28,zipcode=32247} ]) ts:1635225593795
[9] --> ([ 1515133573377L | 9 | 'Item_981' | 4.464408695223682 | Struct{city=City_81,state=State_13,zipcode=87019} ]) ts:1635225594795
[10] --> ([ 1506411308569L | 10 | 'Item_366' | 4.2732457996601845 | Struct{city=City_29,state=State_95,zipcode=93103} ]) ts:1635225595795
[11] --> ([ 1492697874803L | 11 | 'Item_688' | 4.155429455625357 | Struct{city=City_77,state=State_47,zipcode=48054} ]) ts:1635225596795
[12] --> ([ 1498066277947L | 12 | 'Item_438' | 8.852426005487066 | Struct{city=City_35,state=State_52,zipcode=36616} ]) ts:1635225597795
Find the subject name from the Schema Registry logs, then look up the schema with the API below. The jq usage here was learned from those who came before.
curl -X GET http://localhost:8081/subjects/orders-value/versions/1 | jq .schema | sed -e 's/^"//' -e 's/"$//' -e 's/\\//g' | jq .
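The sed/jq gymnastics are needed because the registry wraps the Avro schema as a JSON-encoded string inside its JSON response. A minimal Python sketch of the same decoding, using an abbreviated, hypothetical registry payload (the real response carries the full schema shown below):

```python
import json

# Abbreviated, hypothetical response from
# GET /subjects/orders-value/versions/1 on a local Schema Registry.
payload = {
    "subject": "orders-value",
    "version": 1,
    "id": 1,
    # "schema" is itself a JSON-encoded string -- this is exactly what the
    # sed pipeline above strips quotes and backslashes to undo.
    "schema": json.dumps({
        "type": "record",
        "name": "KsqlDataSourceSchema",
        "fields": [{"name": "orderid", "type": ["null", "int"], "default": None}],
    }),
}

def decode_schema(registry_payload: dict) -> dict:
    """Parse the double-encoded Avro schema out of a registry response."""
    return json.loads(registry_payload["schema"])

schema = decode_schema(payload)
print(schema["name"])  # KsqlDataSourceSchema
```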
Looking at the Avro schema structure, the KsqlDataSourceSchema record contains a nested record named KsqlDataSourceSchema_address, which holds the city, state, and zipcode fields.
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "ordertime",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "orderid",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "itemid",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "orderunits",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "address",
      "type": [
        "null",
        {
          "type": "record",
          "name": "KsqlDataSourceSchema_address",
          "fields": [
            {
              "name": "city",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "state",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "zipcode",
              "type": [
                "null",
                "long"
              ],
              "default": null
            }
          ]
        }
      ],
      "default": null
    }
  ]
}
Inspecting the messages with the ksqlDB CLI's PRINT command, you can see the message key carries the raw orderid value, and the address element nests city, state, and zipcode.
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 05:19:46.518 Z, key: 0, value: {"ordertime": 1504174143763, "orderid": 0, "itemid": "Item_878", "orderunits": 4.036972663514597, "address": {"city": "City_67", "state": "State_25", "zipcode": 71437}}, partition: 0
...
The purpose of the next CREATE puzzled me at first, but I understand it as just an example of how to declare the nested address element by hand, without making use of the precious Avro schema.
CREATE STREAM ORDERS
(
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');
The DESCRIBE result is as follows.
ksql> DESCRIBE ORDERS;
Name : ORDERS
Field | Type
----------------------------------------------------------------------------------
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
In fact, the schema structure can also be read directly from the Avro schema. Since this is a stream, a key does not always have to be specified.
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');
When created this way, DESCRIBE shows the nested structure correctly extracted from the Avro schema.
ksql> DESCRIBE ORDERS;
Name : ORDERS
Field | Type
----------------------------------------------------------------------------------
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
Selecting from the stream also works. When you want to pick out a nested child element, you join parent and child with the ASCII-art arrow ->, which is fun.
SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|ORDERID |CITY |
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|0 |City_75 |
|1 |City_29 |
|2 |City_75 |
|3 |City_24 |
|4 |City_19 |
Limit Reached
Query terminated
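For comparison, the -> accessor corresponds to plain nested access once a message value is decoded on the consumer side. A small illustration using the first orders value printed earlier:

```python
import json

# Value of the first `orders` message, as rendered by PRINT above.
raw = ('{"ordertime": 1504174143763, "orderid": 0, "itemid": "Item_878", '
       '"orderunits": 4.036972663514597, '
       '"address": {"city": "City_67", "state": "State_25", "zipcode": 71437}}')

order = json.loads(raw)
# ksqlDB's ADDRESS->CITY is the same walk into the nested struct:
city = order["address"]["city"]
print(city)  # City_67
```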
Insert Into
As usual, use the ksql-datagen tool to create two topics based on the orders preset.
bin/ksql-datagen quickstart=orders format=json topic=orders_local msgRate=2
bin/ksql-datagen quickstart=orders format=json topic=orders_3rdparty msgRate=2
Viewing the contents with the ksqlDB CLI, you can see it is stored in JSON format. (Though it looks the same as the Avro version when printed.)
ksql> PRINT orders_local FROM BEGINNING;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 05:37:41.369 Z, key: 0, value: {"ordertime":1503864575926,"orderid":0,"itemid":"Item_434","orderunits":4.2864657664768755,"address":{"city":"City_69","state":"State_35","zipcode":46591}}, partition: 0
....
Next, create streams on top of these topics.
CREATE STREAM ORDERS_SRC_LOCAL
(
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');
CREATE STREAM ORDERS_SRC_3RDPARTY
(
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');
Both streams' contents can be viewed with SELECT, so all seems well. Next, create the stream ALL_ORDERS from the stream ORDERS_SRC_LOCAL, adding a SRC column carrying the string 'LOCAL'.
CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES;
DESCRIBE shows that the SRC column has been added.
ksql> DESCRIBE ALL_ORDERS;
Name : ALL_ORDERS
Field | Type
----------------------------------------------------------------------------------
SRC | VARCHAR(STRING)
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
Next, use INSERT INTO to feed the ORDERS_SRC_3RDPARTY stream into the already-created stream ALL_ORDERS. A SELECT as the body of an INSERT feels quite unsettling.
INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;
DESCRIBE shows no particular change. A SELECT on ALL_ORDERS, however, shows the two streams being merged.
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|SRC |ORDERTIME |ORDERID |ITEMID |ORDERUNITS |ADDRESS |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|LOCAL |1491607148202 |0 |Item_898 |2.295635561371047 |{CITY=City_75, STATE=State_|
| | | | | |51, ZIPCODE=99961} |
|LOCAL |1502213077209 |1 |Item_140 |2.5099291074319723 |{CITY=City_31, STATE=State_|
| | | | | |85, ZIPCODE=60896} |
|LOCAL |1499994360002 |2 |Item_389 |0.2058594684735549 |{CITY=City_87, STATE=State_|
| | | | | |73, ZIPCODE=21220} |
|LOCAL |1496100207758 |3 |Item_847 |7.477474294032186 |{CITY=City_86, STATE=State_|
| | | | | |32, ZIPCODE=22647} |
|LOCAL |1505599782301 |4 |Item_315 |8.881110008286903 |{CITY=City_96, STATE=State_|
| | | | | |92, ZIPCODE=55875} |
|LOCAL |1513625276385 |5 |Item_125 |1.951656779215225 |{CITY=City_68, STATE=State_|
| | | | | |48, ZIPCODE=48094} |
|3RD PARTY |1516534876769 |0 |Item_276 |9.00734689040726 |{CITY=City_79, STATE=State_|
| | | | | |69, ZIPCODE=90948} |
|3RD PARTY |1491684084899 |1 |Item_724 |9.804594098542523 |{CITY=City_85, STATE=State_|
| | | | | |44, ZIPCODE=41963} |
|LOCAL |1493513975029 |6 |Item_364 |0.3857448600632095 |{CITY=City_83, STATE=State_|
| | | | | |48, ZIPCODE=91698} |
|3RD PARTY |1510339405388 |2 |Item_184 |5.594021107440936 |{CITY=City_56, STATE=State_|
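Conceptually, the CSAS plus INSERT INTO … SELECT pattern behaves like a continuous UNION ALL: each source query appends its rows, tagged with its own SRC literal, into the same output stream. A rough, purely illustrative Python analogy (not ksqlDB internals):

```python
# Illustrative only: model the two source streams as lists of records and
# tag each record with its origin, as the SRC column does in the queries above.
local_orders = [{"orderid": 0, "itemid": "Item_898"},
                {"orderid": 1, "itemid": "Item_140"}]
thirdparty_orders = [{"orderid": 0, "itemid": "Item_276"}]

all_orders = (
    [{"src": "LOCAL", **o} for o in local_orders] +
    [{"src": "3RD PARTY", **o} for o in thirdparty_orders]
)
print(len(all_orders))  # 3
```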
Summary (Part 2)
- Nested child structures are declared at CREATE time with STRUCT<>, as in ADDRESS STRUCT<...>, but with an Avro schema you can simply pull the structure in as-is.
- When SELECTing a nested child element, reference it with ->, as in ADDRESS->CITY.
- INSERT INTO + SELECT lets you merge one stream into another (though I suppose a matching schema is a prerequisite).