Trying Out Confluent Platform on AKS with Confluent for Kubernetes – Creating KSQL Streams

Overview

Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in a private cloud environment (in this case, Azure Kubernetes Service (AKS)). It provides a standard, simple interface based on a declarative API for customizing, deploying, and managing Confluent Platform.

The workflow for creating a Stream with the KSQL of a Confluent Platform deployed on AKS using CFK is outlined below.

    1. Prepare the Kubernetes environment (already completed beforehand)
    2. Deploy Confluent for Kubernetes (already completed beforehand)
    3. Deploy Confluent Platform (including the required connector plugins) (already completed beforehand)
    4. Additional Confluent Platform deployment (topic creation) (already completed beforehand)
    5. Additional Confluent Platform deployment (source/sink connector creation) (already completed beforehand)
    6. Additional Confluent Platform configuration for streams
    7. Additional Confluent Platform deployment for streams (stream creation)

Local Environment

    • macOS Monterey 12.3.1
    • python 3.8.12
    • Azure CLI 2.34.1
    • helm v3.6.3
    • kubectl v1.21.3

Prerequisites

Steps 1 through 5 of the workflow above were completed in advance.

Additional Configuration of Confluent Platform

Changing the Confluent Platform configuration

In order to use the KSQL REST API (from ksqldb-cli), add a "configOverrides:" block with "server:" entries under "spec:" of the "kind: KsqlDB" definition in the "confluent_platform_ccc.yaml" file used so far.

apiVersion: platform.confluent.io/v1beta1
kind: KsqlDB
metadata:
  name: ksqldb
  namespace: akscfk231
spec:
  replicas: 1
  image:
    application: confluentinc/cp-ksqldb-server:7.1.0
    init: confluentinc/confluent-init-container:2.3.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081
      - listeners=http://ksqldb.akscfk231.svc.cluster.local:8088
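
Once the edit is in place, what would change on the cluster can be previewed before the re-apply in the next step. A minimal sketch using the standard kubectl diff command (assuming the file name and namespace used in this article):

## Preview the change against the live objects; only the KsqlDB resource should show a diff
## (kubectl diff exits with a non-zero code when differences are found)
$ kubectl diff -f confluent_platform_ccc.yaml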

Additional Deployment to Confluent Platform

Apply the additional settings to the Confluent Platform ksqldb.

Re-apply the "confluent_platform_ccc.yaml" containing the definition added above. Only the ksqldb resource is reconfigured; everything else remains unchanged.

$ kubectl apply -f confluent_platform_ccc.yaml
zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
connect.platform.confluent.io/connect unchanged
ksqldb.platform.confluent.io/ksqldb configured       # <--- only this resource is configured
controlcenter.platform.confluent.io/controlcenter unchanged
schemaregistry.platform.confluent.io/schemaregistry unchanged
kafkarestproxy.platform.confluent.io/kafkarestproxy unchanged
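
CFK rolls the ksqldb StatefulSet so that the new configOverrides take effect. As a quick check, a minimal sketch (assuming the StatefulSet name, namespace, and internal listener address from this deployment; curl is run from a temporary pod because the listener is cluster-internal):

## Wait for the ksqldb StatefulSet rollout to complete
$ kubectl rollout status statefulset/ksqldb -n akscfk231

## Call the ksqlDB REST API /info endpoint from inside the cluster
## (the response should include the server version and the ksql.service.id)
$ kubectl run curl-test --rm -it --restart=Never --image=curlimages/curl -n akscfk231 -- \
    curl -s http://ksqldb.akscfk231.svc.cluster.local:8088/info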

Checking the Deployed Confluent Platform Resources

ksqldb ConfigMap information

## Get the list of ConfigMaps
$ kubectl get configmap                 
NAME                           DATA   AGE
connect-init-config            3      26m
connect-shared-config          5      26m
controlcenter-init-config      3      23m
controlcenter-shared-config    6      23m
kafka-init-config              3      24m
kafka-shared-config            5      24m
kafkarestproxy-init-config     3      23m
kafkarestproxy-shared-config   4      23m
ksqldb-init-config             3      23m
ksqldb-shared-config           5      23m
kube-root-ca.crt               1      29m
schemaregistry-init-config     3      23m
schemaregistry-shared-config   4      23m
zookeeper-init-config          3      26m
zookeeper-shared-config        6      26m

## ksqldb ConfigMap details
$ kubectl describe configmap ksqldb-shared-config
Name:         ksqldb-shared-config
Namespace:    akscfk231
Labels:       app.kubernetes.io/managed-by=confluent-operator
              confluent-platform=true
              cr-name=ksqldb
              type=ksqldb
Annotations:  platform.confluent.io/cr-name: ksqldb
              platform.confluent.io/last-applied:
                eyJkYXRhIjp7ImRpc2stdXNhZ2UtYWdlbnQucHJvcGVydGllcyI6ImRpc2suZGF0YT0vbW50L2RhdGEvZGF0YVxuc2VydmljZS5uYW1lPWtzcWxkYlxuIiwiam14LWV4cG9ydGVyLn...
              platform.confluent.io/namespace: akscfk231
              platform.confluent.io/type: ksqldb

Data
====
jvm.config:
----
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=7203
-Dcom.sun.management.jmxremote.rmi.port=7203
-Dcom.sun.management.jmxremote.ssl=false
-Djava.awt.headless=true
-Djdk.tls.ephemeralDHKeySize=2048
-Djdk.tls.server.enableSessionTicketExtension=false
-XX:+ExplicitGCInvokesConcurrent
-XX:+PrintFlagsFinal
-XX:+UnlockDiagnosticVMOptions
-XX:+UseG1GC
-XX:ConcGCThreads=1
-XX:G1HeapRegionSize=16
-XX:InitiatingHeapOccupancyPercent=35
-XX:MaxGCPauseMillis=20
-XX:MaxMetaspaceFreeRatio=80
-XX:MetaspaceSize=96m
-XX:MinMetaspaceFreeRatio=50
-XX:ParallelGCThreads=1
-server

ksqldb.properties:
----
authentication.skip.paths=/chc/live,/chc/ready
bootstrap.servers=kafka.akscfk231.svc.cluster.local:9071
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
confluent.support.metrics.enable=true
ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081   # <--- added setting
ksql.service.id=akscfk231.ksqldb_
ksql.sink.replicas=3
ksql.streams.num.standby.replicas=1
ksql.streams.producer.confluent.batch.expiry.ms=9223372036854775807
ksql.streams.producer.max.block.ms=9223372036854775807
ksql.streams.producer.request.timeout.ms=300000
ksql.streams.producer.retries=2147483647
ksql.streams.replication.factor=3
ksql.streams.state.dir=/mnt/data/data/ksql-state
listeners=http://ksqldb.akscfk231.svc.cluster.local:8088                          # <--- added setting
request.timeout.ms=20000
retry.backoff.ms=500
security.protocol=PLAINTEXT

log4j.properties:
----
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%p] %d [%t] %c %M - %m%n
log4j.rootLogger=INFO, stdout

disk-usage-agent.properties:
----
disk.data=/mnt/data/data
service.name=ksqldb

jmx-exporter.yaml:
----
lowercaseOutputLabelNames: false
lowercaseOutputName: true
ssl: false

Events:  <none>

ksqldb resource details

$ kubectl describe ksqldb ksqldb
Name:         ksqldb
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         KsqlDB
Metadata:
  Creation Timestamp:  2022-08-18T05:56:26Z
  Finalizers:
    ksqldb.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:configOverrides:
          .:
          f:server:
        f:dataVolumeCapacity:
        f:image:
          .:
          f:application:
          f:init:
        f:replicas:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-08-18T05:56:26Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"ksqldb.finalizers.platform.confluent.io":
    Manager:      manager
    Operation:    Update
    Time:         2022-08-18T05:56:26Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:clusterName:
        f:clusterNamespace:
        f:conditions:
        f:currentReplicas:
        f:internalTopicNames:
        f:kafka:
          .:
          f:bootstrapEndpoint:
        f:operatorVersion:
        f:phase:
        f:readyReplicas:
        f:replicas:
        f:restConfig:
          .:
          f:internalEndpoint:
        f:selector:
        f:serviceID:
    Manager:         manager
    Operation:       Update
    Subresource:     status
    Time:            2022-08-18T06:04:03Z
  Resource Version:  5691
  UID:               3cc5b96a-245c-4982-a21d-28a178c1a1e5
Spec:
  Config Overrides:
    Server:
      ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081   # <--- added setting
      listeners=http://ksqldb.akscfk231.svc.cluster.local:8088                          # <--- added setting
  Data Volume Capacity:  10Gi
  Image:
    Application:  confluentinc/cp-ksqldb-server:7.1.0
    Init:         confluentinc/confluent-init-container:2.3.0
  Replicas:       1
Status:
  Cluster Name:       ksqldb
  Cluster Namespace:  akscfk231
  Conditions:
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:04:03Z
    Message:               Deployment has minimum availability.
    Reason:                MinimumReplicasAvailable
    Status:                True
    Type:                  platform.confluent.io/statefulset-available
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:04:03Z
    Message:               Kubernetes resources ready.
    Reason:                KubernetesResourcesReady
    Status:                True
    Type:                  platform.confluent.io/resources-ready
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:01:01Z
    Message:               Cluster is not being garbage collected
    Reason:                Garbage Collection not triggered
    Status:                False
    Type:                  platform.confluent.io/garbage-collecting
  Current Replicas:        1
  Internal Topic Names:
    _confluent-ksql-akscfk231.ksqldb__command_topic
    _confluent-ksql-akscfk231.ksqldb__configs
  Kafka:
    Bootstrap Endpoint:  kafka.akscfk231.svc.cluster.local:9071
  Operator Version:      v0.435.23
  Phase:                 RUNNING
  Ready Replicas:        1
  Replicas:              1
  Rest Config:
    Internal Endpoint:  http://ksqldb.akscfk231.svc.cluster.local:8088
  Selector:             app=ksqldb,clusterId=akscfk231,confluent-platform=true,type=ksqldb
  Service ID:           akscfk231.ksqldb_
Events:
  Type     Reason            Age                 From    Message
  ----     ------            ----                ----    -------
  Warning  Warning           39m (x19 over 44m)  ksqlDB  waiting for at-least one kafka pod availability
  Normal   SuccessfulCreate  39m (x2 over 39m)   ksqlDB  resource type *v1.Service successfully created
  Normal   SuccessfulCreate  39m (x2 over 39m)   ksqlDB  resource type *v1.ConfigMap successfully created
  Normal   SuccessfulCreate  39m                 ksqlDB  resource type *v1.PersistentVolumeClaim successfully created
  Normal   SuccessfulCreate  39m                 ksqlDB  resource type *v1.StatefulSet successfully created

Additional Configuration for Streams

    • The following two queries are defined in a ConfigMap:
        • "queries001.sql": the query from "topic001" to "stream001"; only the required columns are carried over into stream001
        • "queries002.sql": the query that builds "stream002" on top of "stream001" and writes to "topic002"; only rows whose section column is 'C', 'E', 'F', or 'W' are extracted and forwarded to topic002

Working Pod configuration

The Pod manifest that defines the two queries via a ConfigMap (ksql-client2.yaml):

---
apiVersion: v1
kind: Pod
metadata:
  name: ksql-client2
  namespace: akscfk231
spec:
  containers:
  - name: ksql-client2
    image: confluentinc/ksqldb-cli:latest
    env:
      - name: KSQL_BOOTSTRAP_SERVERS
        value: PLAINTEXT://kafka.akscfk231.svc.cluster.local:9092
      - name: KSQL_KSQL_SCHEMA_REGISTRY_URL
        value: http://schemaregistry.akscfk231.svc.cluster.local:8081
    volumeMounts:
      - mountPath: /ituru
        name: ksql-queries
  volumes:
  - name: ksql-queries
    configMap:
      name: demo-ksql-client-queries-configmap
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: demo-ksql-client-queries-configmap
  namespace: akscfk231
data:                         # <--- the two queries are defined below
  queries001.sql: |-
    CREATE STREAM stream001 (
      id VARCHAR,
      ctid BIGINT,
      section VARCHAR,
      iot_state VARCHAR,
      val_1 DOUBLE,
      val_2 DOUBLE,
      created_at VARCHAR
    ) WITH (kafka_topic = 'topic001', value_format = 'avro');
  queries002.sql: |-
    CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS 
    SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';

Starting the Working Pod

## Create the ksqldb-cli Pod
$ kubectl apply -f ksql-client2.yaml 
pod/ksql-client2 created
configmap/demo-ksql-client-queries-configmap created

Checking the Working Pod

$ kubectl get pods
NAME                                  READY   STATUS    RESTARTS      AGE
confluent-operator-76d7677b8c-ll6rf   1/1     Running   0             17m
connect-0                             1/1     Running   1 (11m ago)   16m
controlcenter-0                       1/1     Running   0             12m
kafka-0                               1/1     Running   0             13m
kafka-1                               1/1     Running   0             13m
kafka-2                               1/1     Running   0             13m
kafkarestproxy-0                      1/1     Running   0             12m
ksql-client2                          1/1     Running   0             54s
ksqldb-0                              1/1     Running   1 (10m ago)   12m
schemaregistry-0                      1/1     Running   0             12m
zookeeper-0                           1/1     Running   0             16m
zookeeper-1                           1/1     Running   0             16m
zookeeper-2                           1/1     Running   0             16m

Checking the ConfigMap

## Check the ksqldb-cli ConfigMap settings
$ kubectl get configmap demo-ksql-client-queries-configmap
NAME                                 DATA   AGE
demo-ksql-client-queries-configmap   2      112s


## Check the ksqldb-cli ConfigMap details
$ kubectl describe configmap demo-ksql-client-queries-configmap
Name:         demo-ksql-client-queries-configmap
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>

Data
====
queries001.sql:
----
CREATE STREAM stream001 (
  id VARCHAR,
  ctid BIGINT,
  section VARCHAR,
  iot_state VARCHAR,
  val_1 DOUBLE,
  val_2 DOUBLE,
  created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');
queries002.sql:
----
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS 
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
Events:  <none>

Stream Setup

Creating the streams with the ksqldb CLI

Connect to the working Pod created above and configure the streams with the ksqldb CLI.

## Connect to the working Pod
$ kubectl exec -it ksql-client2 -- /bin/bash


## Check the query file (stream001)
[appuser@ksql-client2 ~]$ cat /ituru/queries001.sql
CREATE STREAM stream001 (
  id VARCHAR,
  ctid BIGINT,
  section VARCHAR,
  iot_state VARCHAR,
  val_1 DOUBLE,
  val_2 DOUBLE,
  created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');

## Check the query file (stream002)
[appuser@ksql-client2 ituru]$ cat /ituru/queries002.sql
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS 
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';


## Get the list of current topics
[appuser@ksql-client2 ~]$ ksql --execute "SHOW TOPICS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Kafka Topic                       | Partitions | Partition Replicas 
---------------------------------------------------------------------
 _schemas_schemaregistry_akscfk231 | 1          | 3                  
 akscfk231.connect-configs         | 1          | 3                  
 akscfk231.connect-offsets         | 25         | 3                  
 akscfk231.connect-status          | 5          | 3                  
 topic001                          | 1          | 3                  
 topic002                          | 1          | 3                  
---------------------------------------------------------------------


## Create stream001
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Message        
----------------
 Stream created 
----------------

## Check stream001
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream001 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM001
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
Kafka topic          : topic001 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM001 (ID STRING, CTID BIGINT, SECTION STRING, IOT_STATE STRING, VAL_1 DOUBLE, VAL_2 DOUBLE, CREATED_AT STRING) WITH (KAFKA_TOPIC='topic001', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic topic001)


## Create stream002
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries002.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Message                                
----------------------------------------
 Created query with ID CSAS_STREAM002_1 
----------------------------------------

## Check stream002
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream002 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM002
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON_SR
Kafka topic          : topic002 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT *
FROM STREAM001 STREAM001
WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W'))
EMIT CHANGES;

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic topic002)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-akscfk231.ksqldb_query_CSAS_STREAM002_1
<no offsets committed by this group yet>


## Get the list of streams
[appuser@ksql-client2 ~]$ ksql --execute "SHOW STREAMS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Stream Name | Kafka Topic | Key Format | Value Format | Windowed 
------------------------------------------------------------------
 STREAM001   | topic001    | KAFKA      | AVRO         | false    
 STREAM002   | topic002    | KAFKA      | JSON_SR      | false    
------------------------------------------------------------------


## Note: trying to create an existing stream (stream001) again results in an error
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Cannot add stream 'STREAM001': A stream with the same name already exists
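
If the query files are meant to be re-runnable, ksqlDB also supports an IF NOT EXISTS clause. A sketch of an alternative form of queries001.sql (not what was used above):

## Idempotent variant: create stream001 only if it does not already exist
[appuser@ksql-client2 ~]$ ksql --execute "CREATE STREAM IF NOT EXISTS stream001 (id VARCHAR, ctid BIGINT, section VARCHAR, iot_state VARCHAR, val_1 DOUBLE, val_2 DOUBLE, created_at VARCHAR) WITH (kafka_topic = 'topic001', value_format = 'avro');" -- http://ksqldb.akscfk231.svc.cluster.local:8088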

Checking in Confluent Control Center

$ kubectl confluent dashboard controlcenter
http://localhost:9021
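
If the kubectl confluent plugin is not installed, a plain port-forward to the Control Center pod works as well (a sketch; pod name and port 9021 as seen in this deployment):

## Forward local port 9021 to the Control Center pod, then open http://localhost:9021
$ kubectl port-forward controlcenter-0 9021:9021 -n akscfk231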

Cleanup

Removing the Pods / secrets / namespace

## Pod : confluent-operator
$ helm delete confluent-operator             

## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml

## Pod : working pod
$ kubectl delete -f ksql-client2.yaml

## Deleting the namespace (all Pods under the namespace are deleted)
$ kubectl delete namespace akscfk231

Stop: shut down the AKS cluster
Start: start the AKS cluster

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01

Check for any error messages.

$ kubectl get events
$ kubectl logs ksql-client2
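
When a query misbehaves, the ksqlDB server log is usually the most informative place to look, and the namespace events can be sorted chronologically. A sketch, with the pod name taken from the pod list above:

## Recent warnings/errors from the ksqlDB server pod
$ kubectl logs ksqldb-0 -n akscfk231 | grep -iE "error|warn"

## Namespace events in chronological order
$ kubectl get events -n akscfk231 --sort-by=.metadata.creationTimestamp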

Summary

I confirmed that streams can be configured on Confluent Platform using the ksql CLI. However, once data actually starts flowing, the data transformation approach and related settings will need further adjustment. Next time, I would like to try streaming real data.

Reference Information

I referred to the following resources.

Advanced Configuration Options for Confluent Platform
Example KSQL application deployment on Kubernetes
Configure ksqlDB CLI
ksqlDB
ksqlDB Documentation
CREATE STREAM
KSQL examples
Configuring Key and Value Converters
Using Kafka Connect with Schema Registry
