Creating Streaming Queries with ksqlDB (Part 2)

Goal

Continuing from Part 1, I work through the "Use a nested schema (STRUCT) in ksqlDB" section of the source below. (That link has since gone stale; the indescribable pain I went through is all buried in darkness now.)

Write streaming queries against Apache Kafka® using ksqlDB (local)

Related Qiita articles

1. Trying the ksqlDB quickstart
2. Creating streaming queries with ksqlDB (Part 1)
3. Creating streaming queries with ksqlDB (Part 2)
4. Processing MySQL table update events with ksql on Kafka

Using a nested schema (STRUCT) in ksqlDB

This time, the source data for the Stream/Table is the orders preset of ksql-datagen.

bin/ksql-datagen quickstart=orders format=avro topic=orders msgRate=1

The log output from ksql-datagen looks like this:

[0] --> ([ 1504174143763L | 0 | 'Item_878' | 4.036972663514597 | Struct{city=City_67,state=State_25,zipcode=71437} ]) ts:1635225586518
[1] --> ([ 1509355271065L | 1 | 'Item_670' | 9.154643686738641 | Struct{city=City_32,state=State_25,zipcode=81180} ]) ts:1635225586796
[2] --> ([ 1487743369101L | 2 | 'Item_682' | 9.913937197671148 | Struct{city=City_99,state=State_25,zipcode=99831} ]) ts:1635225587794
[3] --> ([ 1514028266544L | 3 | 'Item_915' | 7.027966334147111 | Struct{city=City_68,state=State_78,zipcode=47239} ]) ts:1635225588795
[4] --> ([ 1500111527090L | 4 | 'Item_533' | 9.268916469646818 | Struct{city=City_71,state=State_72,zipcode=15953} ]) ts:1635225589795
[5] --> ([ 1492193039132L | 5 | 'Item_294' | 7.781222807562362 | Struct{city=City_46,state=State_66,zipcode=29977} ]) ts:1635225590795
[6] --> ([ 1495043772684L | 6 | 'Item_322' | 1.2480686742580827 | Struct{city=City_86,state=State_69,zipcode=18948} ]) ts:1635225591795
[7] --> ([ 1505289388278L | 7 | 'Item_233' | 0.15332131177604907 | Struct{city=City_29,state=State_98,zipcode=64327} ]) ts:1635225592795
[8] --> ([ 1515383680311L | 8 | 'Item_521' | 6.810614048446687 | Struct{city=City_63,state=State_28,zipcode=32247} ]) ts:1635225593795
[9] --> ([ 1515133573377L | 9 | 'Item_981' | 4.464408695223682 | Struct{city=City_81,state=State_13,zipcode=87019} ]) ts:1635225594795
[10] --> ([ 1506411308569L | 10 | 'Item_366' | 4.2732457996601845 | Struct{city=City_29,state=State_95,zipcode=93103} ]) ts:1635225595795
[11] --> ([ 1492697874803L | 11 | 'Item_688' | 4.155429455625357 | Struct{city=City_77,state=State_47,zipcode=48054} ]) ts:1635225596795
[12] --> ([ 1498066277947L | 12 | 'Item_438' | 8.852426005487066 | Struct{city=City_35,state=State_52,zipcode=36616} ]) ts:1635225597795

I found the subject name from the Schema Registry logs and then looked up the schema through the following API. The jq incantation is something I learned from my predecessors.

curl -X GET http://localhost:8081/subjects/orders-value/versions/1 | jq .schema | sed -e 's/^"//' -e 's/"$//' -e 's/\\//g' | jq .

Looking at the structure of the Avro schema, the KsqlDataSourceSchema record contains a nested record named KsqlDataSourceSchema_address, which appears to hold the city, state, and zipcode fields.

{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "ordertime",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "orderid",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "itemid",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "orderunits",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "address",
      "type": [
        "null",
        {
          "type": "record",
          "name": "KsqlDataSourceSchema_address",
          "fields": [
            {
              "name": "city",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "state",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "zipcode",
              "type": [
                "null",
                "long"
              ],
              "default": null
            }
          ]
        }
      ],
      "default": null
    }
  ]
}

Observing the messages with the ksqlDB CLI's PRINT command, you can see that the message key carries the raw orderid value, and that city, state, and zipcode are nested inside the address element.

Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 05:19:46.518 Z, key: 0, value: {"ordertime": 1504174143763, "orderid": 0, "itemid": "Item_878", "orderunits": 4.036972663514597, "address": {"city": "City_67", "state": "State_25", "zipcode": 71437}}, partition: 0
...

The CREATE statement that comes next puzzled me at first, but I take it to be an example of declaring the nested address element by hand, without leveraging the precious Avro schema.

CREATE STREAM ORDERS
  (
    ORDERTIME BIGINT,
    ORDERID INT,
    ITEMID STRING,
    ORDERUNITS DOUBLE,
    ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
  )
   WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');

The DESCRIBE output looks like this:

ksql> DESCRIBE ORDERS;

Name                 : ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------

In fact, you can also have the schema structure read directly from the Avro schema. Since this is a stream, specifying a key is not always required.

CREATE STREAM ORDERS WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');

When created this way, DESCRIBE shows that the nested structure is correctly extracted from the Avro schema.

ksql> DESCRIBE ORDERS;

Name                 : ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

SELECTing from the stream also works. To pick out a nested child element, you reference it with the ASCII-art-style arrow operator ->, which is rather fun.

SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|ORDERID                                                                                |CITY                                                                                   |
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|0                                                                                      |City_75                                                                                |
|1                                                                                      |City_29                                                                                |
|2                                                                                      |City_75                                                                                |
|3                                                                                      |City_24                                                                                |
|4                                                                                      |City_19                                                                                |
Limit Reached
Query terminated
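
The same arrow syntax extends to every child field. A sketch that flattens the whole ADDRESS struct into top-level columns (the column aliases are my own, not from the original walkthrough):

```sql
SELECT ORDERID,
       ADDRESS->CITY    AS CITY,
       ADDRESS->STATE   AS STATE,
       ADDRESS->ZIPCODE AS ZIPCODE
FROM ORDERS EMIT CHANGES LIMIT 5;
```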

Merging streams with INSERT INTO

As usual, use the ksql-datagen tool to create two topics based on the orders preset.

bin/ksql-datagen quickstart=orders format=json topic=orders_local msgRate=2
bin/ksql-datagen quickstart=orders format=json topic=orders_3rdparty msgRate=2

Viewing the contents with the ksqlDB CLI, you can see they are stored in JSON format (though it looks the same as the Avro case).

ksql> PRINT orders_local FROM BEGINNING;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 05:37:41.369 Z, key: 0, value: {"ordertime":1503864575926,"orderid":0,"itemid":"Item_434","orderunits":4.2864657664768755,"address":{"city":"City_69","state":"State_35","zipcode":46591}}, partition: 0
....

Next, create streams over these topics.

CREATE STREAM ORDERS_SRC_LOCAL
 (
   ORDERTIME BIGINT,
   ORDERID INT,
   ITEMID STRING,
   ORDERUNITS DOUBLE,
   ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
 )
  WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');

CREATE STREAM ORDERS_SRC_3RDPARTY
 (
   ORDERTIME BIGINT,
   ORDERID INT,
   ITEMID STRING,
   ORDERUNITS DOUBLE,
   ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
 )
  WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');

Both streams' contents can be viewed with SELECT, so everything looks fine. Next, create a stream ALL_ORDERS from the stream ORDERS_SRC_LOCAL, adding an SRC column that carries the string 'LOCAL'.
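
The quick SELECT sanity check on the source streams can be sketched as (the LIMIT is my own addition):

```sql
SELECT * FROM ORDERS_SRC_LOCAL EMIT CHANGES LIMIT 3;
SELECT * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES LIMIT 3;
```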

CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES;

Running DESCRIBE on it shows that the SRC column has been added.

ksql> DESCRIBE ALL_ORDERS;

Name                 : ALL_ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 SRC        | VARCHAR(STRING)
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

Next, use INSERT to feed ORDERS_SRC_3RDPARTY into the already-created ALL_ORDERS stream. A SELECT as the body of an INSERT feels rather unsettling.

INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;

DESCRIBE shows nothing particularly different. SELECTing from ALL_ORDERS, however, shows the two streams being merged.

+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|SRC                        |ORDERTIME                  |ORDERID                    |ITEMID                     |ORDERUNITS                 |ADDRESS                    |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|LOCAL                      |1491607148202              |0                          |Item_898                   |2.295635561371047          |{CITY=City_75, STATE=State_|
|                           |                           |                           |                           |                           |51, ZIPCODE=99961}         |
|LOCAL                      |1502213077209              |1                          |Item_140                   |2.5099291074319723         |{CITY=City_31, STATE=State_|
|                           |                           |                           |                           |                           |85, ZIPCODE=60896}         |
|LOCAL                      |1499994360002              |2                          |Item_389                   |0.2058594684735549         |{CITY=City_87, STATE=State_|
|                           |                           |                           |                           |                           |73, ZIPCODE=21220}         |
|LOCAL                      |1496100207758              |3                          |Item_847                   |7.477474294032186          |{CITY=City_86, STATE=State_|
|                           |                           |                           |                           |                           |32, ZIPCODE=22647}         |
|LOCAL                      |1505599782301              |4                          |Item_315                   |8.881110008286903          |{CITY=City_96, STATE=State_|
|                           |                           |                           |                           |                           |92, ZIPCODE=55875}         |
|LOCAL                      |1513625276385              |5                          |Item_125                   |1.951656779215225          |{CITY=City_68, STATE=State_|
|                           |                           |                           |                           |                           |48, ZIPCODE=48094}         |
|3RD PARTY                  |1516534876769              |0                          |Item_276                   |9.00734689040726           |{CITY=City_79, STATE=State_|
|                           |                           |                           |                           |                           |69, ZIPCODE=90948}         |
|3RD PARTY                  |1491684084899              |1                          |Item_724                   |9.804594098542523          |{CITY=City_85, STATE=State_|
|                           |                           |                           |                           |                           |44, ZIPCODE=41963}         |
|LOCAL                      |1493513975029              |6                          |Item_364                   |0.3857448600632095         |{CITY=City_83, STATE=State_|
|                           |                           |                           |                           |                           |48, ZIPCODE=91698}         |
|3RD PARTY                  |1510339405388              |2                          |Item_184                   |5.594021107440936          |{CITY=City_56, STATE=State_|
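
The merged stream can then be queried like any other. For instance, a push query counting orders per source (a sketch, not part of the original walkthrough):

```sql
SELECT SRC, COUNT(*) AS ORDER_COUNT
FROM ALL_ORDERS
GROUP BY SRC
EMIT CHANGES;
```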

Summary (Part 2)

    • Nested child structures are declared at CREATE time as columns using STRUCT<>, as in ADDRESS STRUCT<...>; when using an Avro schema, though, the structure can simply be pulled in as-is.

    • To SELECT a nested child element, reference the column with ->, as in ADDRESS->CITY.

    • INSERT INTO + SELECT lets you merge one stream into another (though I suspect matching schemas are a prerequisite).