Confluent平台备忘录-(3)模式注册简易测试

首先

这是使用Confluent平台的记录。
这次我们将尝试使用Schema Registry进行消息的发送和接收。

相关文章

Confluent平台备忘录 – (1)环境设置
Confluent平台备忘录 – (2)简易消息发送和接收测试
Confluent平台备忘录 – (3)Schema注册表简易测试

可以考虑的信息来源

模式注册概述
Avro模式序列化器和反序列化器
Apache Avro™ 1.10.2规范
Avro,SchemaRegistry入门

关于Schema Registry

image.png

我认为会有这样的流程。

在消息发送时:
准备消息格式的Schema(Avro,JSON等)。
生产者根据Schema对数据进行序列化。
将使用的Schema注册到Schema Registry后,会返回一个用于标识该Schema的id,然后将id和数据一起发送到Kafka的Topic。
换句话说,会将“正在使用此Schema”(id)和序列化后的数据作为消息发送。

收到消息时:
由于Schema id和序列化数据一同接收到,因此消费者端需要从Schema注册表中获取与id关联的Schema,并对数据进行反序列化。

现在,无论是生产者还是消费者,都可以按照预定的模式(消息格式)处理消息。

顺便提一下,在上图中,Schema Registry被描绘为保留Schema信息的概念图,但实际上Schema Registry似乎在内部使用Kafka上的Topic作为Schema信息的存储位置。参考:Schema Registry概述-Kafka后端。

Kafka被用作Schema Registry的存储后端。一个特殊的Kafka主题(默认为_schemas),只有一个分区,被用作高度可用的预写日志。所有的模式、主题/版本和ID元数据以及兼容性设置都会作为消息追加到这个日志中。因此,一个Schema Registry实例既是_schemas主题下的消息生产者,也是消息消费者。

关于Avro Schema

Avro是一个关于数据格式的规范,可以使用Avro Schema定义数据类型。我认为,类似于JSON和JSON Schema的关系,我们应该理解Avro和Avro Schema的关系。尽管Avro Schema是用JSON格式编写的,这可能有些复杂,但是Avro数据格式和JSON数据格式是不同的。

在Schema Registry中,有几种可用的数据类型(Schema类型),其中之一是Avro。关于Avro,为Producer和Consumer分别提供了kafka-avro-console-producer和kafka-avro-console-consumer命令,使用这些命令可以从Shell中简单地确认消息的发送和接收。此外,这些命令可以进行Avro <=> JSON转换,因此在实际的数据输入和确认时可以使用JSON格式(内部会自动转换为Avro)。

kafka-avro-console-producer –help
这个工具帮助从标准输入读取数据并将其发布到Kafka。
选项 描述
—— ———–
–batch-size <整数: 大小> 如果未同步发送,表示在单个批次中发送的消息数量。(默认值: 200)
–bootstrap-server <字符串: 连接到的服务器> 除非指定–broker-list(已弃用),否则必须指定。要连接的服务器(们)。经纪人列表字符串的格式为 主机1:端口1,主机2:端口2。
–broker-list <字符串: 经纪人列表> 已弃用,请使用–bootstrap-server代替;如果指定了–bootstrap-server,则忽略。经纪人列表字符串的格式为 主机1:端口1,主机2:端口2。
–compression-codec [字符串: 压缩编解码器] 压缩编解码器:’none’、’gzip’、’snappy’、’lz4’或’zstd’。如果没有指定值,则默认为’gzip’。
–help 打印用法信息。
–line-reader <字符串: 读取器类> 用于从标准输入读取行的类名。默认情况下,每行作为一个单独的消息读取。(默认值: kafka.tools.ConsoleProducer$LineMessageReader)
–max-block-ms <整数: 最大阻塞时间> 生产者在发送请求期间最多阻塞的时间(默认值: 60000)
–max-memory-bytes <整数: 总内存字节数> 生产者用于缓冲等待发送到服务器的记录的总内存。(默认值: 33554432)
–max-partition-memory-bytes <整数: 每个分区的内存字节数> 为分区分配的缓冲区大小。当接收到小于此大小的记录时,生产者将尝试将其乐观地合并在一起,直到达到此大小。(默认值: 16384)
–message-send-max-retries <整数> 经纪人可能由于多种原因而无法接收消息,并且短暂不可用只是其中之一。此属性指定在生产者放弃并放弃此消息之前的重试次数。(默认值: 3)
–metadata-expiry-ms <整数: 元数据过期间隔> 即使没有看到任何领导者更改,超时后我们也会强制刷新元数据的时间段(默认值: 300000)
–producer-property <字符串: 生产者属性> 以键=值的形式传递用户定义的属性机制给生产者。
–producer.config <字符串: 配置文件> 生产者配置属性文件。注意,[producer-property]优先于此配置。
–property <字符串: 属性> 以键=值的形式传递用户定义的属性机制给消息读取器。这允许为用户定义的消息读取器进行自定义配置。默认属性包括:
parse.key=true|false
key.separator=<键分隔符>
ignore.error=true|false
–request-required-acks <字符串: 请求所需的确认> 生产者请求所需的确认(默认值: 1)
–request-timeout-ms <整数: 请求超时时间> 生产者请求的确认超时时间。值必须是非负且非零的(默认值: 1500)
–retry-backoff-ms <整数> 在每次重试之前,生产者会刷新相关主题的元数据。由于领导人选举需要一段时间,此属性指定生产者在刷新元数据之前等待的时间量。(默认值: 100)
–socket-buffer-size <整数: 大小> TCP接收大小。(默认值: 102400)
–sync 如果设置了此选项,则对经纪人的消息发送请求是同步的,一次发送一个。
–timeout <整数: 超时时间> 如果设置并且生产者在异步模式下运行,这是消息在排队等待足够的批量大小的最大时间。值以毫秒为单位。(默认值: 1000)
–topic <字符串: 主题> 必需:要发送消息的主题。
–version 显示Kafka版本。
kafka-avro-console-consumer –help
此工具用于从Kafka主题中读取数据并输出到标准输出。
选项 描述
—— ———–
–bootstrap-server
–consumer-property
–consumer.config 消费者配置文件。注意,[consumer-property]的优先级高于此配置。
–enable-systest-events 除了记录已消费消息外,还记录消费者的生命周期事件。(这只适用于系统测试。)
–formatter 用于格式化Kafka消息以供显示的类的名称。(默认值:kafka.tools.DefaultMessageFormatter)
–from-beginning 如果消费者尚未有已建立的偏移量来消费,从日志中最早的消息开始而不是最新的消息。
–group 消费者的消费者组ID。
–help 打印使用信息。
–isolation-level 设置为read_committed以过滤出未提交的事务消息。设置为read_uncommitted以读取所有消息。(默认值:read_uncommitted)
–key-deserializer
–max-messages 在退出之前要消费的最大消息数量。如果未设置,则持续消费。
–offset 要从中消费的偏移量ID(非负数),或者’earliest’表示从开头开始,或者’latest’表示从结尾开始。(默认值:latest)
–partition 要消费的分区。如果没有指定’–offset’,消费将从分区的末尾开始。
–property 用于初始化消息格式化程序的属性。默认属性包括:
print.timestamp=true|false
print.key=true|false
print.offset=true|false
print.partition=true|false
print.headers=true|false
print.value=true|false
key.separator=
line.separator= headers.separator= null.literal=
key.deserializer=
value.deserializer=
header.deserializer= 用户还可以传入自定义的属性来配置其格式化程序;具体来说,用户可以传入以’key.deserializer.’,’value.deserializer.’和’headers.deserializer.’为前缀的属性来配置其反序列化器。
–skip-message-on-error 如果处理消息时发生错误,跳过该消息而不是停止。
–timeout-ms 如果指定,则在指定的时间间隔内没有可供消费的消息时退出。
–topic 要消费的主题ID。
–value-deserializer
–version 显示Kafka版本。
–whitelist 指定要包括在消费中的主题的正则表达式。

此外,据看起来,Schema Registry还提供了用于管理的REST API,可以使用curl等工具进行访问。Schema Registry API参考资料。

一个操控的实例

我将尝试按照以下所示的示例进行操作:Avro模式序列化和反序列化 – Avro模式测试驱动。

确认模式类型

使用REST API可以确认已在Schema Registry中注册的模式类型。

[root@test12 ~]# curl http://localhost:8081/schemas/types
["JSON","PROTOBUF","AVRO"]

可以确认AVRO被处理。
(默认情况下,似乎在Schema注册表中已经安装了上述3种类型的插件。)

Schema指定でのメッセージ送受信の例1 / シンプルな場合:

试着使用简单的架构进行消息的发送和接收。

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "f1",
      "type": "string"
    }
  ]
}

以上的示例将整个消息定义为record类型(一种复杂类型)。在”fields”部分定义了具体字段。以上示例表示只有一个名为”f1″的String类型字段的消息。

要启动一个按照这个模式发送消息到 t1-a 主题的生产者,需要执行以下命令。(如果主题尚未创建,将会自动创建)

kafka-avro-console-producer --broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic t1-a \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

与kafka-console-producer命令不同,此命令没有提示信息,因此您需要输入要连续发送的消息并按下Enter键。

{"f1": "value1-a"}

使用Ctrl+C退出。
接下来尝试启动Consumer。请指定以下的Topic名称和Schema Registry。

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--from-beginning --topic t1-a \
--property schema.registry.url=http://localhost:8081

然后,将返回以下消息。

{"f1":"value1-a"}

由于发送的消息被原样返回,所以并没有太多的乐趣,但在背后应该进行了序列化/反序列化。

在消费者执行时,尝试使用kafka-avro-console-consumer命令,并使用–property print.schema.ids=true参数进行指定。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-a --property schema.registry.url=http://localhost:8081 --property print.schema.ids=true
{"f1":"value1-a"}       1

通过上述消息可知,与Schema id为1相关联。
尝试使用curl从Schema Registry获取schema id为1的信息,返回如下所示。

[root@test12 ~]# curl http://localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"}

順便提一下,如果使用kafka-console-consumer命令而不进行反序列化来查看这个Topic,会发生什么呢?

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-a
value1-a

变成了这样。

Schema指定でのメッセージ送受信例2 / キー指定
使用指定模式的消息发送和接收示例2 / 指定键

在Kafka的消息中,我们可以添加Key。让我们试试发送和接收使用Key和Value分别使用架构的消息。

用于密钥的架构

{"type":"string"}

Value的模式(与上例相同)。

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "f1",
      "type": "string"
    }
  ]
}

要启动一个Producer来发送具有遵循上述架构的Key和Value的消息到名为t2-a的Topic,需要执行以下命令。

kafka-avro-console-producer --broker-list localhost:9092 \
  --topic t2-a \
  --property parse.key=true \
  --property "key.separator= "\
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
  --property schema.registry.url=http://localhost:8081

请将以下内容输入为邮件消息。

"key1" {"f1": "value2-a"}
"AAAAA" {"f1": "TTTTT"}
"TEST01" {"f1": "XXXXXX"}

在另一个终端运行Consumer。

kafka-avro-console-consumer --from-beginning --topic t2-a \
   --bootstrap-server localhost:9092 \
   --property print.key=true \
   --property print.schema.ids=true \
   --property schema.id.separator=: \
   --property schema.registry.url=http://localhost:8081

然后,将返回以下结果。

"key1":2        {"f1":"value2-a"}:1
"AAAAA":2       {"f1":"TTTTT"}:1
"TEST01":2      {"f1":"XXXXXX"}:1

我用curl查看模式编号为2的信息。

[root@test12 ~]# curl http://localhost:8081/schemas/ids/2
{"schema":"\"string\""}

顺便说一句,如果你用kafka-console-consumer命令不进行反序列化查看,结果大致如下。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t2-a
value2-a

TTTTT

XXXXXX

在Schema指定下的消息收發示例3 / 複雜Schema。

我会尝试使用稍微复杂一些的模式。
有关Avro Schema的定义方法,请参考此处。
Apache Avro™ 1.10.2规范

Key的模式设计

{"type":"string"}

用于Value的模式。

{
  "type": "record",
  "name": "userInfo",
    "fields" : [
      {"name": "username", "type": "string"},
      {"name": "age", "type": "int"},
      {"name": "phone", "type": "string"},
      {"name": "address", "type": {
        "type": "record",
        "name" : "user_address",
          "fields" : [
            {"name": "street", "type": "string"},
            {"name": "city", "type": "string"},
            {"name": "country", "type": "string"},
            {"name": "zip", "type": "string"}
          ]
        }
      }
    ]
}  

利用上述的架构,对名为`test3’的主题进行消息的收发。
由于架构很长,因此将其格式化为上述形式的Schema,创建一个名为avroSchema3.json的文件,并用jq命令将其转换为一行,然后将其设置为环境变量。

[root@test12 ~/Kafka]# avroSchema3=$(jq --compact-output . avroSchema3.json)

[root@test12 ~/Kafka]# echo ${avroSchema3}
{"type":"record","name":"userInfo","fields":[{"name":"username","type":"string"},{"name":"age","type":"int"},{"name":"phone","type":"string"},{"name":"address","type":{"type":"record","name":"user_address","fields":[{"name":"street","type":"string"},{"name":"city","type":"string"},{"name":"country","type":"string"},{"name":"zip","type":"string"}]}}]}

使用以下命令启动生产者。

kafka-avro-console-producer --broker-list localhost:9092 --topic test3 \
  --property parse.key=true \
  --property "key.separator= "\
  --property key.schema='{"type":"string"}' \
  --property value.schema=${avroSchema3} \
  --property schema.registry.url=http://localhost:8081

在另外一个终端窗口中启动Consumer。

kafka-avro-console-consumer --from-beginning --topic test3 \
   --bootstrap-server localhost:9092 \
   --property print.key=true \
   --property print.schema.ids=true \
   --property schema.id.separator=: \
   --property schema.registry.url=http://localhost:8081

我将尝试从以下的生产者发送以下的数据。

"A001" {"username": "Yamada Ichiro", "age": 31, "phone": "1234-5678", "address":{"street": "1-1", "city": "chiba", "country": "Japan", "zip": "111-2222"}}
"A002" {"username": "Tanaka Jiro", "age": 32, "phone": "2222-2222", "address":{"street": "2-2", "city": "tokyo", "country": "Japan", "zip": "111-2222"}}

消费者接收到了以下的数据。

"A001":2        {"username":"Yamada Ichiro","age":31,"phone":"1234-5678","address":{"street":"1-1","city":"chiba","country":"Japan","zip":"111-2222"}}:3
"A002":2        {"username":"Tanaka Jiro","age":32,"phone":"2222-2222","address":{"street":"2-2","city":"tokyo","country":"Japan","zip":"111-2222"}}:3

尝试使用kafka-console-consumer命令接收Topictest3的消息,没有进行反序列化,结果如下。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test3
Yamada Ichiro>1234-56781-1
chiba
Japan111-2222
Tanaka Jiro@2222-22222-2
tokyo
Japan111-2222

也许Avro以二进制形式保存数据,换行可能没有正确显示。从这可以得出,内部可能正常使用了Schema进行序列化/反序列化。

顺便说一下,当将缺少字段的数据插入到用于Avro的Producer/Consumer的Schema定义中时,会导致Producer出错。

发送邮件

"A003" {"username": "Suzuki Saburo"}

制片人因错误而结束。

org.apache.kafka.common.errors.SerializationException: Error deserializing json {"username": "Suzuki Saburo"} to Avro of schema {"type":"record","name":"userInfo","fields":[{"name":"username","type":"string"},{"name":"age","type":"int"},{"name":"phone","type":"string"},{"name":"address","type":{"type":"record","name":"user_address","fields":[{"name":"street","type":"string"},{"name":"city","type":"string"},{"name":"country","type":"string"},{"name":"zip","type":"string"}]}}]}
Caused by: org.apache.avro.AvroTypeException: Expected int. Got END_OBJECT
        at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:511)
        at org.apache.avro.io.JsonDecoder.readInt(JsonDecoder.java:165)
        at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
        at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:551)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:195)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:190)
        at io.confluent.kafka.formatter.AvroMessageReader.readFrom(AvroMessageReader.java:121)
        at io.confluent.kafka.formatter.SchemaMessageReader.readMessage(SchemaMessageReader.java:344)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:51)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

整理模式信息

如前所述,我们可以通过REST从Schema Registry中获取模式信息。
例如,假设我们获取了以下模式信息(示例为检索模式 id 为 5 的信息)。

[root@test12 ~]# curl http://localhost:8081/schemas/ids/5
{"schema":"{\"type\":\"record\",\"name\":\"FILEA\",\"namespace\":\"value.SOURCEDB.VSAM.TEST01\",\"fields\":[{\"name\":\"STAT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"STAT\",\"length\":1},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"NUMB\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"NUMB\",\"length\":6},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"NAME\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"NAME\",\"length\":20},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"ADDRX\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"ADDRX\",\"length\":20},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"PHONE\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"PHONE\",\"length\":8},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"DATEX\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"DATEX\",\"length\":8},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"AMOUNT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"AMOUNT\",\"length\":8},\"null\"],\"doc\":\"\",\"default\":\"\"},{\"name\":\"COMMENT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"COMMENT\",\"length\":9},\"null\"],\"doc\":\"\",\"default\":\"\"}]}"}

如此,结果将以 JSON 格式返回,在”schema”元素中以一行字符串的形式返回模式信息。例如:{“schema”:”xxx”}。这个“xxx”部分是所需的模式,以 JSON 格式表示为一个字符串数据元素,并且双引号已经被转义(带有反斜杠)。当返回的模式比较复杂时,会变得非常难以阅读。
因此,通过下面的命令,可以将模式信息进行整洁的格式化输出。

[root@test12 ~]# curl -s  http://localhost:8081/schemas/ids/5 | jq -r .schema | jq .
{
  "type": "record",
  "name": "FILEA",
  "namespace": "value.SOURCEDB.VSAM.TEST01",
  "fields": [
    {
      "name": "STAT",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "STAT",
          "length": 1
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "NUMB",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "NUMB",
          "length": 6
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "NAME",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "NAME",
          "length": 20
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "ADDRX",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "ADDRX",
          "length": 20
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "PHONE",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "PHONE",
          "length": 8
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "DATEX",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "DATEX",
          "length": 8
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "AMOUNT",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "AMOUNT",
          "length": 8
        },
        "null"
      ],
      "doc": "",
      "default": ""
    },
    {
      "name": "COMMENT",
      "type": [
        {
          "type": "string",
          "logicalType": "CHARACTER",
          "dbColumnName": "COMMENT",
          "length": 9
        },
        "null"
      ],
      "doc": "",
      "default": ""
    }
  ]
}

我会简单地补充一下上面的命令。
使用curl获取信息,然后使用jq命令提取所需部分。具体操作如下所示。

    • curlで情報取得

 

    • jqでschema要素の値のみ抽出

 

    schema要素の値を改めてjqで整形して表示

请向以下内容查看

脚本

#!/bin/bash

sudo /opt/confluent-6.2.0/bin/kafka-topics --list --bootstrap-server localhost:9092
#!/bin/bash

if [ $# -lt 1 ] ; then
        echo "Usage: "
        echo "  " $0 " <topicName>"
        exit 0
fi

topicName=$1

sudo /opt/confluent-6.2.0/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic ${topicName} --property schema.registry.url=http://localhost:8081

#sudo /opt/confluent-6.2.0/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic ${topicName} --property schema.registry.url=http://localhost:8081 --property print.schema.ids=true
#!/bin/bash

if [ $# -lt 1 ] ; then
        echo "Usage: "
        echo "  " $0 " <schemaID>"
        exit 0
fi

schemaID=$1

curl --silent  http://localhost:8081/schemas/ids/${schemaID} | jq -r .schema | jq .

bannerAds