Trying out Confluent Platform on AKS with Confluent for Kubernetes – KSQL stream creation
Overview
Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in private cloud environments (here, Azure Kubernetes Service (AKS)). It provides a standard, simple interface based on a declarative API for customizing, deploying, and managing Confluent Platform.
The workflow for creating a KSQL stream on Confluent Platform deployed to AKS via CFK is outlined below.
- Prepare the Kubernetes environment (already prepared)
- Deploy Confluent for Kubernetes (already prepared)
- Deploy Confluent Platform, including the required Connector plugins (already prepared)
- Additional Confluent Platform deployment: create topics (already prepared)
- Additional Confluent Platform deployment: create source/sink Connectors (already prepared)
- Additional stream configuration for Confluent Platform
- Additional stream deployment for Confluent Platform: create the streams

Local environment
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
Prerequisites

Adding configuration to Confluent Platform
Changing the Confluent Platform configuration
To use the KSQL REST API (ksqldb-cli), add a "configOverrides: server:" section under the existing "kind: KsqlDB" definition in the previously created "confluent_platform_ccc.yaml".
apiVersion: platform.confluent.io/v1beta1
kind: KsqlDB
metadata:
  name: ksqldb
  namespace: akscfk231
spec:
  replicas: 1
  image:
    application: confluentinc/cp-ksqldb-server:7.1.0
    init: confluentinc/confluent-init-container:2.3.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081
      - listeners=http://ksqldb.akscfk231.svc.cluster.local:8088
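Once these overrides are applied, the ksqlDB server is reachable at the URL set by "listeners". As a minimal sketch (Python, with illustrative values; the endpoint URL is the one configured above, and the request shape follows the ksqlDB REST API's /ksql endpoint), this is the JSON body a REST client would send to list streams:

```python
import json

# ksqlDB REST endpoint as configured via the "listeners" override above
KSQL_ENDPOINT = "http://ksqldb.akscfk231.svc.cluster.local:8088/ksql"

# The /ksql endpoint accepts a JSON body with the statement to run and
# optional per-request streams properties.
payload = {"ksql": "SHOW STREAMS;", "streamsProperties": {}}
body = json.dumps(payload)

# A client inside the cluster would POST `body` to KSQL_ENDPOINT with
# Content-Type: application/vnd.ksql.v1+json
print(body)
```

This is only a sketch of the request shape; the actual POST requires in-cluster DNS resolution of the service name.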
Additional deployment to Confluent Platform
Apply the additional settings to the ksqldb component of Confluent Platform.
Re-apply the "confluent_platform_ccc.yaml" with the definition added above. Only the ksqldb resource is reported as configured.
$ kubectl apply -f confluent_platform_ccc.yaml
zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
connect.platform.confluent.io/connect unchanged
ksqldb.platform.confluent.io/ksqldb configured # <--- only this resource is configured
controlcenter.platform.confluent.io/controlcenter unchanged
schemaregistry.platform.confluent.io/schemaregistry unchanged
kafkarestproxy.platform.confluent.io/kafkarestproxy unchanged
Checking the deployed Confluent Platform resources
ksqldb ConfigMap information
## Get the list of ConfigMaps
$ kubectl get configmap
NAME DATA AGE
connect-init-config 3 26m
connect-shared-config 5 26m
controlcenter-init-config 3 23m
controlcenter-shared-config 6 23m
kafka-init-config 3 24m
kafka-shared-config 5 24m
kafkarestproxy-init-config 3 23m
kafkarestproxy-shared-config 4 23m
ksqldb-init-config 3 23m
ksqldb-shared-config 5 23m
kube-root-ca.crt 1 29m
schemaregistry-init-config 3 23m
schemaregistry-shared-config 4 23m
zookeeper-init-config 3 26m
zookeeper-shared-config 6 26m
## ksqldb ConfigMap details
$ kubectl describe configmap ksqldb-shared-config
Name: ksqldb-shared-config
Namespace: akscfk231
Labels: app.kubernetes.io/managed-by=confluent-operator
confluent-platform=true
cr-name=ksqldb
type=ksqldb
Annotations: platform.confluent.io/cr-name: ksqldb
platform.confluent.io/last-applied:
eyJkYXRhIjp7ImRpc2stdXNhZ2UtYWdlbnQucHJvcGVydGllcyI6ImRpc2suZGF0YT0vbW50L2RhdGEvZGF0YVxuc2VydmljZS5uYW1lPWtzcWxkYlxuIiwiam14LWV4cG9ydGVyLn...
platform.confluent.io/namespace: akscfk231
platform.confluent.io/type: ksqldb
Data
====
jvm.config:
----
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=7203
-Dcom.sun.management.jmxremote.rmi.port=7203
-Dcom.sun.management.jmxremote.ssl=false
-Djava.awt.headless=true
-Djdk.tls.ephemeralDHKeySize=2048
-Djdk.tls.server.enableSessionTicketExtension=false
-XX:+ExplicitGCInvokesConcurrent
-XX:+PrintFlagsFinal
-XX:+UnlockDiagnosticVMOptions
-XX:+UseG1GC
-XX:ConcGCThreads=1
-XX:G1HeapRegionSize=16
-XX:InitiatingHeapOccupancyPercent=35
-XX:MaxGCPauseMillis=20
-XX:MaxMetaspaceFreeRatio=80
-XX:MetaspaceSize=96m
-XX:MinMetaspaceFreeRatio=50
-XX:ParallelGCThreads=1
-server
ksqldb.properties:
----
authentication.skip.paths=/chc/live,/chc/ready
bootstrap.servers=kafka.akscfk231.svc.cluster.local:9071
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
confluent.support.metrics.enable=true
ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081 # <--- added setting
ksql.service.id=akscfk231.ksqldb_
ksql.sink.replicas=3
ksql.streams.num.standby.replicas=1
ksql.streams.producer.confluent.batch.expiry.ms=9223372036854775807
ksql.streams.producer.max.block.ms=9223372036854775807
ksql.streams.producer.request.timeout.ms=300000
ksql.streams.producer.retries=2147483647
ksql.streams.replication.factor=3
ksql.streams.state.dir=/mnt/data/data/ksql-state
listeners=http://ksqldb.akscfk231.svc.cluster.local:8088 # <--- added setting
request.timeout.ms=20000
retry.backoff.ms=500
security.protocol=PLAINTEXT
log4j.properties:
----
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%p] %d [%t] %c %M - %m%n
log4j.rootLogger=INFO, stdout
disk-usage-agent.properties:
----
disk.data=/mnt/data/data
service.name=ksqldb
jmx-exporter.yaml:
----
lowercaseOutputLabelNames: false
lowercaseOutputName: true
ssl: false
Events: <none>
KsqlDB details
$ kubectl describe ksqldb ksqldb
Name: ksqldb
Namespace: akscfk231
Labels: <none>
Annotations: <none>
API Version: platform.confluent.io/v1beta1
Kind: KsqlDB
Metadata:
Creation Timestamp: 2022-08-18T05:56:26Z
Finalizers:
ksqldb.finalizers.platform.confluent.io
Generation: 1
Managed Fields:
API Version: platform.confluent.io/v1beta1
Fields Type: FieldsV1
fieldsV1:
f:metadata:
f:annotations:
.:
f:kubectl.kubernetes.io/last-applied-configuration:
f:spec:
.:
f:configOverrides:
.:
f:server:
f:dataVolumeCapacity:
f:image:
.:
f:application:
f:init:
f:replicas:
Manager: kubectl-client-side-apply
Operation: Update
Time: 2022-08-18T05:56:26Z
API Version: platform.confluent.io/v1beta1
Fields Type: FieldsV1
fieldsV1:
f:metadata:
f:finalizers:
.:
v:"ksqldb.finalizers.platform.confluent.io":
Manager: manager
Operation: Update
Time: 2022-08-18T05:56:26Z
API Version: platform.confluent.io/v1beta1
Fields Type: FieldsV1
fieldsV1:
f:status:
.:
f:clusterName:
f:clusterNamespace:
f:conditions:
f:currentReplicas:
f:internalTopicNames:
f:kafka:
.:
f:bootstrapEndpoint:
f:operatorVersion:
f:phase:
f:readyReplicas:
f:replicas:
f:restConfig:
.:
f:internalEndpoint:
f:selector:
f:serviceID:
Manager: manager
Operation: Update
Subresource: status
Time: 2022-08-18T06:04:03Z
Resource Version: 5691
UID: 3cc5b96a-245c-4982-a21d-28a178c1a1e5
Spec:
Config Overrides:
Server:
ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081 # <--- added setting
listeners=http://ksqldb.akscfk231.svc.cluster.local:8088 # <--- added setting
Data Volume Capacity: 10Gi
Image:
Application: confluentinc/cp-ksqldb-server:7.1.0
Init: confluentinc/confluent-init-container:2.3.0
Replicas: 1
Status:
Cluster Name: ksqldb
Cluster Namespace: akscfk231
Conditions:
Last Probe Time: 2022-08-18T06:01:01Z
Last Transition Time: 2022-08-18T06:04:03Z
Message: Deployment has minimum availability.
Reason: MinimumReplicasAvailable
Status: True
Type: platform.confluent.io/statefulset-available
Last Probe Time: 2022-08-18T06:01:01Z
Last Transition Time: 2022-08-18T06:04:03Z
Message: Kubernetes resources ready.
Reason: KubernetesResourcesReady
Status: True
Type: platform.confluent.io/resources-ready
Last Probe Time: 2022-08-18T06:01:01Z
Last Transition Time: 2022-08-18T06:01:01Z
Message: Cluster is not being garbage collected
Reason: Garbage Collection not triggered
Status: False
Type: platform.confluent.io/garbage-collecting
Current Replicas: 1
Internal Topic Names:
_confluent-ksql-akscfk231.ksqldb__command_topic
_confluent-ksql-akscfk231.ksqldb__configs
Kafka:
Bootstrap Endpoint: kafka.akscfk231.svc.cluster.local:9071
Operator Version: v0.435.23
Phase: RUNNING
Ready Replicas: 1
Replicas: 1
Rest Config:
Internal Endpoint: http://ksqldb.akscfk231.svc.cluster.local:8088
Selector: app=ksqldb,clusterId=akscfk231,confluent-platform=true,type=ksqldb
Service ID: akscfk231.ksqldb_
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning Warning 39m (x19 over 44m) ksqlDB waiting for at-least one kafka pod availability
Normal SuccessfulCreate 39m (x2 over 39m) ksqlDB resource type *v1.Service successfully created
Normal SuccessfulCreate 39m (x2 over 39m) ksqlDB resource type *v1.ConfigMap successfully created
Normal SuccessfulCreate 39m ksqlDB resource type *v1.PersistentVolumeClaim successfully created
Normal SuccessfulCreate 39m ksqlDB resource type *v1.StatefulSet successfully created
Additional stream configuration
- Define the following two queries in a ConfigMap
  - "queries001.sql": the query for "topic001" → "stream001"; only the required columns are forwarded to stream001
  - "queries002.sql": the query for "stream002" → "topic002", based on "stream001"; only rows whose section column is 'C', 'E', 'F', or 'W' are extracted and forwarded to topic002
Working Pod configuration
Pod definition file that defines the two queries in a ConfigMap
---
apiVersion: v1
kind: Pod
metadata:
  name: ksql-client2
  namespace: akscfk231
spec:
  containers:
  - name: ksql-client2
    image: confluentinc/ksqldb-cli:latest
    env:
    - name: KSQL_BOOTSTRAP_SERVERS
      value: PLAINTEXT://kafka.akscfk231.svc.cluster.local:9092
    - name: KSQL_KSQL_SCHEMA_REGISTRY_URL
      value: http://schemaregistry.akscfk231.svc.cluster.local:8081
    volumeMounts:
    - mountPath: /ituru
      name: ksql-queries
  volumes:
  - name: ksql-queries
    configMap:
      name: demo-ksql-client-queries-configmap
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: demo-ksql-client-queries-configmap
  namespace: akscfk231
data: # <--- the two queries are defined below
  queries001.sql: |-
    CREATE STREAM stream001 (
      id VARCHAR,
      ctid BIGINT,
      section VARCHAR,
      iot_state VARCHAR,
      val_1 DOUBLE,
      val_2 DOUBLE,
      created_at VARCHAR
    ) WITH (kafka_topic = 'topic001', value_format = 'avro');
  queries002.sql: |-
    CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS
      SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
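The WHERE clause in queries002.sql keeps only rows whose section is 'C', 'E', 'F', or 'W'. A small Python sketch of the equivalent predicate (field names follow the stream definition above; the sample records are made up for illustration) can help sanity-check which rows will reach topic002:

```python
# Predicate equivalent to the WHERE clause in queries002.sql
ALLOWED_SECTIONS = {"C", "E", "F", "W"}

def passes_filter(record: dict) -> bool:
    """Return True when the record would be forwarded to topic002."""
    return record.get("section") in ALLOWED_SECTIONS

# Illustrative sample records (values invented for this sketch)
records = [
    {"id": "a1", "section": "C"},
    {"id": "a2", "section": "Z"},
    {"id": "a3", "section": "W"},
]
forwarded = [r for r in records if passes_filter(r)]
print([r["id"] for r in forwarded])  # → ['a1', 'a3']
```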
Starting the working Pod
## Create the ksqldb-cli Pod
$ kubectl apply -f ksql-client2.yaml
pod/ksql-client2 created
configmap/demo-ksql-client-queries-configmap created
Checking the working Pod
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
confluent-operator-76d7677b8c-ll6rf 1/1 Running 0 17m
connect-0 1/1 Running 1 (11m ago) 16m
controlcenter-0 1/1 Running 0 12m
kafka-0 1/1 Running 0 13m
kafka-1 1/1 Running 0 13m
kafka-2 1/1 Running 0 13m
kafkarestproxy-0 1/1 Running 0 12m
ksql-client2 1/1 Running 0 54s
ksqldb-0 1/1 Running 1 (10m ago) 12m
schemaregistry-0 1/1 Running 0 12m
zookeeper-0 1/1 Running 0 16m
zookeeper-1 1/1 Running 0 16m
zookeeper-2 1/1 Running 0 16m
Checking the ConfigMap
## Check the ksqldb-cli ConfigMap settings
$ kubectl get configmap demo-ksql-client-queries-configmap
NAME DATA AGE
demo-ksql-client-queries-configmap 2 112s
## Check the ksqldb-cli ConfigMap details
$ kubectl describe configmap demo-ksql-client-queries-configmap
Name: demo-ksql-client-queries-configmap
Namespace: akscfk231
Labels: <none>
Annotations: <none>
Data
====
queries001.sql:
----
CREATE STREAM stream001 (
id VARCHAR,
ctid BIGINT,
section VARCHAR,
iot_state VARCHAR,
val_1 DOUBLE,
val_2 DOUBLE,
created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');
queries002.sql:
----
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
Events: <none>
Stream setup
Creating streams with the ksqldb CLI
Connect to the working Pod created above and use the ksqldb CLI to set up the streams.
## Connect to the working Pod
$ kubectl exec -it ksql-client2 -- /bin/bash
## Check the query file (stream001)
[appuser@ksql-client2 ~]$ cat /ituru/queries001.sql
CREATE STREAM stream001 (
id VARCHAR,
ctid BIGINT,
section VARCHAR,
iot_state VARCHAR,
val_1 DOUBLE,
val_2 DOUBLE,
created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');
## Check the query file (stream002)
[appuser@ksql-client2 ituru]$ cat /ituru/queries002.sql
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
## Get the current list of topics
[appuser@ksql-client2 ~]$ ksql --execute "SHOW TOPICS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------
_schemas_schemaregistry_akscfk231 | 1 | 3
akscfk231.connect-configs | 1 | 3
akscfk231.connect-offsets | 25 | 3
akscfk231.connect-status | 5 | 3
topic001 | 1 | 3
topic002 | 1 | 3
---------------------------------------------------------------------
## Create stream001
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Message
----------------
Stream created
----------------
## Check stream001
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream001 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Name : STREAM001
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : AVRO
Kafka topic : topic001 (partitions: 1, replication: 3)
Statement : CREATE STREAM STREAM001 (ID STRING, CTID BIGINT, SECTION STRING, IOT_STATE STRING, VAL_1 DOUBLE, VAL_2 DOUBLE, CREATED_AT STRING) WITH (KAFKA_TOPIC='topic001', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');
Field | Type
------------------------------
ID | VARCHAR(STRING)
CTID | BIGINT
SECTION | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VAL_1 | DOUBLE
VAL_2 | DOUBLE
CREATED_AT | VARCHAR(STRING)
------------------------------
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic001)
## Create stream002
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries002.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Message
----------------------------------------
Created query with ID CSAS_STREAM002_1
----------------------------------------
## Check stream002
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream002 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Name : STREAM002
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON_SR
Kafka topic : topic002 (partitions: 1, replication: 3)
Statement : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT *
FROM STREAM001 STREAM001
WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W'))
EMIT CHANGES;
Field | Type
------------------------------
ID | VARCHAR(STRING)
CTID | BIGINT
SECTION | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VAL_1 | DOUBLE
VAL_2 | DOUBLE
CREATED_AT | VARCHAR(STRING)
------------------------------
Queries that write from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic002)
Consumer Groups summary:
Consumer Group : _confluent-ksql-akscfk231.ksqldb_query_CSAS_STREAM002_1
<no offsets committed by this group yet>
## Get the list of streams
[appuser@ksql-client2 ~]$ ksql --execute "SHOW STREAMS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------
STREAM001 | topic001 | KAFKA | AVRO | false
STREAM002 | topic002 | KAFKA | JSON_SR | false
------------------------------------------------------------------
## Note: attempting to create an existing stream (stream001) again results in an error
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Cannot add stream 'STREAM001': A stream with the same name already exists
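This duplicate-creation error can be avoided by making the DDL idempotent: ksqlDB supports CREATE STREAM IF NOT EXISTS, which is a no-op when the stream already exists. A sketch of the statement from queries001.sql rewritten this way, wrapped as a REST payload (the endpoint body shape follows the ksqlDB REST API; whether you submit it via the CLI or REST is up to you):

```python
import json

# Idempotent variant of the statement in queries001.sql:
# re-running it against an existing stream succeeds instead of erroring.
statement = (
    "CREATE STREAM IF NOT EXISTS stream001 ("
    " id VARCHAR, ctid BIGINT, section VARCHAR, iot_state VARCHAR,"
    " val_1 DOUBLE, val_2 DOUBLE, created_at VARCHAR"
    ") WITH (kafka_topic = 'topic001', value_format = 'avro');"
)
payload = json.dumps({"ksql": statement, "streamsProperties": {}})
print(payload)
```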
Checking in Confluent Control Center
$ kubectl confluent dashboard controlcenter
http://localhost:9021

Cleanup
How to remove the Pods / secret / namespace
## Pod : confluent-operator
$ helm delete confluent-operator
## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
## Pod : working Pod
$ kubectl delete -f ksql-client2.yaml
## Delete the namespace (all Pods under the namespace are deleted as well)
$ kubectl delete namespace akscfk231
Stopping / starting the AKS cluster
## Stop the AKS cluster
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
## Start the AKS cluster
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
Checking for error messages
$ kubectl get events
$ kubectl logs ksql-client2
Summary
I confirmed that streams can be configured on Confluent Platform using the ksql CLI. However, once data actually starts flowing, the data transformation approach will need further adjustment. Next time I want to try actually streaming data.
Related information
I referred to the following information.
Confluent Platform advanced configuration options
Example KSQL application deployment on Kubernetes
Configure the ksqlDB CLI
ksqlDB
ksqlDB documentation
CREATE STREAM
KSQL examples
Configuring key and value converters
Using Kafka Connect with Schema Registry