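Setting Up Confluent Platform on AKS with Confluent for Kubernetes: Creating Topics
Overview
CFK (Confluent for Kubernetes) is a cloud-native control plane for deploying and managing Confluent in private cloud environments such as Azure Kubernetes Service (AKS). It provides a standard, simple interface built on a declarative API for customizing, deploying, and managing Confluent Platform.
The workflow for creating topics on a Confluent Platform deployed to AKS with CFK is outlined below.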
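- Prepare the Kubernetes environment (completed beforehand)
- Deploy Confluent for Kubernetes (completed beforehand)
- Deploy Confluent Platform (completed beforehand)
- Apply the additional Confluent Platform configuration
- Run the additional Confluent Platform deployment (topic creation)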

Local environment
- macOS Monterey 12.3.1
- Python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
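Prerequisites
- The AKS cluster environment has already been built by following the earlier article.
- Confluent Platform has already been deployed on AKS by following the earlier article.
Additional Confluent Platform configuration
Creating the bearer authentication secret
Topics are created through a KafkaRestClass, which needs credentials. First create a file that defines the ID and password used for authentication (restapibearer.txt, the file passed to kubectl when the secret is created later).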
username=kafka
password=kafka-secret
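A typo in this file only shows up later, when the topic resources fail to authenticate, so it can be worth checking the file before creating the secret. The following is a minimal sketch, assuming the two-key `key=value` layout shown above is all that is required. The helper names `parse_credentials` and `check_bearer_file` are hypothetical and not part of CFK.

```python
from pathlib import Path


def parse_credentials(text):
    """Parse key=value lines into a dict, skipping blank lines and # comments."""
    creds = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        # Assumption: surrounding whitespace is not part of the credential.
        creds[key.strip()] = value.strip()
    return creds


def check_bearer_file(path):
    """Return the parsed credentials, or raise if username/password is missing."""
    creds = parse_credentials(Path(path).read_text(encoding="utf-8"))
    missing = {"username", "password"} - creds.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    return creds
```

Calling `check_bearer_file("restapibearer.txt")` returns the parsed dictionary when both keys are present and raises `ValueError` otherwise.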
YAML file for the KafkaRestClass (Admin REST API)
apiVersion: platform.confluent.io/v1beta1
kind: KafkaRestClass
metadata:
  name: default
  namespace: akscfk231
spec:
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
YAML file defining the topics to create
---
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic001
  namespace: akscfk231
spec:
  replicas: 3
  partitionCount: 1
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
  configs:
    cleanup.policy: "delete"
---
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic002
  namespace: akscfk231
spec:
  replicas: 3
  partitionCount: 1
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
  configs:
    cleanup.policy: "delete"
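The two KafkaTopic documents above differ only in `metadata.name`, so for a longer list of topics the file could be generated rather than written by hand. The sketch below is one possible way to do that with plain string templating. The function name `render_topics` and its parameters are hypothetical, and the defaults simply mirror the values used in this article.

```python
# Template that mirrors the KafkaTopic manifest used in this article.
TOPIC_TEMPLATE = """\
---
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: {name}
  namespace: {namespace}
spec:
  replicas: {replicas}
  partitionCount: {partitions}
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: {secret}
  configs:
    cleanup.policy: "{cleanup_policy}"
"""


def render_topics(names, namespace="akscfk231", replicas=3, partitions=1,
                  secret="rest-credential", cleanup_policy="delete"):
    """Return a multi-document YAML string with one KafkaTopic per name."""
    return "".join(
        TOPIC_TEMPLATE.format(
            name=name,
            namespace=namespace,
            replicas=replicas,
            partitions=partitions,
            secret=secret,
            cleanup_policy=cleanup_policy,
        )
        for name in names
    )
```

Printing `render_topics(["topic001", "topic002"])` and redirecting the output to kafka_topic.yaml should reproduce the file shown above. Plain templating is used here to avoid a YAML library dependency; it does no escaping, so it is only suitable for simple topic names.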
Additional Confluent Platform deployment
Create the authentication secret in the AKS environment.
$ kubectl create secret generic rest-credential --from-file=bearer.txt=./restapibearer.txt
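For readers who prefer a declarative manifest over the imperative command, `kubectl create secret generic ... --from-file=bearer.txt=...` stores the file contents base64-encoded under the `bearer.txt` key of an Opaque Secret. The following sketch builds an equivalent manifest as JSON, which `kubectl apply -f` also accepts. The function name `secret_manifest` is hypothetical.

```python
import base64
import json


def secret_manifest(name, file_key, file_bytes, namespace=None):
    """Build a Secret manifest holding one file, base64-encoded under file_key."""
    metadata = {"name": name}
    if namespace:
        metadata["namespace"] = namespace
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": metadata,
        "type": "Opaque",
        "data": {file_key: base64.b64encode(file_bytes).decode("ascii")},
    }


def secret_manifest_json(name, file_key, file_bytes, namespace=None):
    """Serialize the manifest so it can be saved and passed to kubectl apply -f."""
    return json.dumps(secret_manifest(name, file_key, file_bytes, namespace), indent=2)
```

For example, `secret_manifest_json("rest-credential", "bearer.txt", open("restapibearer.txt", "rb").read(), "akscfk231")` yields a document that could be saved as a `.json` file and applied. Because the result contains the credentials in a trivially decodable form, it should be handled like the original text file.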
Deploying the Admin REST API
Create the Admin REST API (KafkaRestClass) in the AKS cluster.
$ kubectl apply -f kafka_rest_class.yaml
Creating the Confluent Platform topics
Create the topics through the Admin REST API by applying the custom resources.
$ kubectl apply -f kafka_topic.yaml
Verifying the created topics
$ kubectl get topic
NAME REPLICAS PARTITION STATUS CLUSTERID AGE
topic001 3 1 CREATED b7gKmMeBQK2NEKC139fsog 42s
topic002 3 1 CREATED b7gKmMeBQK2NEKC139fsog 42s
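When this check is scripted, for example in a deployment pipeline, the table can be parsed instead of read by eye. The sketch below assumes the column layout shown above and that no cell is empty or contains spaces, since it splits on whitespace. The function names are hypothetical.

```python
def parse_table(output):
    """Turn whitespace-separated kubectl table output into a list of row dicts."""
    lines = [ln for ln in output.splitlines() if ln.strip()]
    if not lines:
        return []
    header = lines[0].split()
    return [dict(zip(header, ln.split())) for ln in lines[1:]]


def topics_not_created(output):
    """Return the names of topics whose STATUS column is not CREATED."""
    return [
        row["NAME"]
        for row in parse_table(output)
        if row.get("STATUS") != "CREATED"
    ]
```

Feeding it the captured output of `kubectl get topic` returns an empty list when every topic has reached the CREATED state.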
Check the deployed Confluent Platform resources.
$ kubectl get confluent
NAME REPLICAS PARTITION STATUS CLUSTERID AGE
kafkatopic.platform.confluent.io/topic001 3 1 CREATED b7gKmMeBQK2NEKC139fsog 2m35s
kafkatopic.platform.confluent.io/topic002 3 1 CREATED b7gKmMeBQK2NEKC139fsog 2m35s
NAME REPLICAS READY STATUS AGE
schemaregistry.platform.confluent.io/schemaregistry 1 1 RUNNING 63m
NAME REPLICAS READY STATUS AGE
zookeeper.platform.confluent.io/zookeeper 3 3 RUNNING 63m
NAME REPLICAS READY STATUS AGE
kafkarestproxy.platform.confluent.io/kafkarestproxy 1 1 RUNNING 63m
NAME REPLICAS READY STATUS AGE
kafka.platform.confluent.io/kafka 3 3 RUNNING 63m
NAME AGE
kafkarestclass.platform.confluent.io/default 4m55s
NAME REPLICAS READY STATUS AGE
ksqldb.platform.confluent.io/ksqldb 1 1 RUNNING 63m
NAME REPLICAS READY STATUS AGE
controlcenter.platform.confluent.io/controlcenter 1 1 RUNNING 63m
NAME REPLICAS READY STATUS AGE
connect.platform.confluent.io/connect 1 1 RUNNING 63m
## Show the KafkaRestClass details
$ kubectl describe kafkarestclass
Name: default
Namespace: akscfk231
Labels: <none>
Annotations: <none>
API Version: platform.confluent.io/v1beta1
Kind: KafkaRestClass
Metadata:
  Creation Timestamp: 2022-06-29T06:28:19Z
  Finalizers:
    kafkarestclass.finalizers.platform.confluent.io
  Generation: 1
  Managed Fields:
    API Version: platform.confluent.io/v1beta1
    Fields Type: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:kafkaRest:
          .:
          f:authentication:
            .:
            f:bearer:
              .:
              f:secretRef:
            f:type:
    Manager: kubectl-client-side-apply
    Operation: Update
    Time: 2022-06-29T06:28:19Z
    API Version: platform.confluent.io/v1beta1
    Fields Type: FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"kafkarestclass.finalizers.platform.confluent.io":
    Manager: manager
    Operation: Update
    Time: 2022-06-29T06:28:19Z
    API Version: platform.confluent.io/v1beta1
    Fields Type: FieldsV1
    fieldsV1:
      f:status:
        .:
        f:endpoint:
        f:kafkaClusterID:
    Manager: manager
    Operation: Update
    Subresource: status
    Time: 2022-06-29T06:28:20Z
  Resource Version: 27349
  UID: 15aaae84-8330-4459-89b2-a3f19d2a1dd5
Spec:
  Kafka Rest:
    Authentication:
      Bearer:
        Secret Ref: rest-credential
      Type: bearer
Status:
  Endpoint: http://kafka.akscfk231.svc.cluster.local:8090
  Kafka Cluster ID: b7gKmMeBQK2NEKC139fsog
Events: <none>
## Show the KafkaTopic details
$ kubectl describe kafkatopic
Name: topic001
:
(omitted)
:
Name: topic002
Namespace: akscfk231
Labels: <none>
Annotations: platform.confluent.io/last-spec-hash: WzQ9Vg/QcNB+03C54Sk+/m2scCu9E6oQpByPckcUWJ0=
API Version: platform.confluent.io/v1beta1
Kind: KafkaTopic
Metadata:
  Creation Timestamp: 2022-06-29T06:30:39Z
  Finalizers:
    kafkatopic.finalizers.platform.confluent.io
  Generation: 1
  Managed Fields:
    API Version: platform.confluent.io/v1beta1
    Fields Type: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:configs:
          .:
          f:cleanup.policy:
        f:kafkaRest:
          .:
          f:authentication:
            .:
            f:bearer:
              .:
              f:secretRef:
            f:type:
        f:partitionCount:
        f:replicas:
    Manager: kubectl-client-side-apply
    Operation: Update
    Time: 2022-06-29T06:30:39Z
    API Version: platform.confluent.io/v1beta1
    Fields Type: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          f:platform.confluent.io/last-spec-hash:
        f:finalizers:
          .:
          v:"kafkatopic.finalizers.platform.confluent.io":
        f:ownerReferences:
          .:
          k:{"uid":"86f23b17-6529-4e6f-974d-ea6af118c9a0"}:
    Manager: manager
    Operation: Update
    Time: 2022-06-29T06:30:40Z
  Owner References:
    API Version: platform.confluent.io/v1beta1
    Block Owner Deletion: true
    Controller: true
    Kind: Kafka
    Name: kafka
    UID: 86f23b17-6529-4e6f-974d-ea6af118c9a0
  Resource Version: 28028
  UID: df9489f5-b097-4607-9c0a-c50e97355dcf
Spec:
  Configs:
    cleanup.policy: delete
  Kafka Rest:
    Authentication:
      Bearer:
        Secret Ref: rest-credential
      Type: bearer
  Partition Count: 1
  Replicas: 3
Status:
  Kafka Cluster: akscfk231/kafka
  Kafka Cluster ID: b7gKmMeBQK2NEKC139fsog
  Kafka Rest Endpoint: http://kafka.akscfk231.svc.cluster.local:8090
  Partition Count: 1
  Replicas: 3
  State: CREATED
Events: <none>
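The Status section above reports both the Admin REST endpoint and the Kafka cluster ID, which is enough to query the topic list directly from the REST API v3 path `/kafka/v3/clusters/{cluster_id}/topics`. The sketch below is an illustration under several assumptions: the endpoint is a cluster-internal address, so the call would have to run from inside the cluster or through a port-forward; any authentication headers are left to the caller because the required scheme depends on how the cluster is secured; and the function names are hypothetical.

```python
import json
import urllib.request


def topics_url(endpoint, cluster_id):
    """Build the REST API v3 URL that lists the topics of one Kafka cluster."""
    return f"{endpoint.rstrip('/')}/kafka/v3/clusters/{cluster_id}/topics"


def list_topic_names(endpoint, cluster_id, headers=None):
    """Fetch the topic list and return the topic names.

    Assumption: the endpoint is reachable from where this runs and any
    required auth headers are supplied by the caller.
    """
    request = urllib.request.Request(
        topics_url(endpoint, cluster_id), headers=headers or {}
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        body = json.load(response)
    # Each entry of the "data" array describes one topic.
    return [item["topic_name"] for item in body.get("data", [])]
```

With the values from this article, `topics_url("http://kafka.akscfk231.svc.cluster.local:8090", "b7gKmMeBQK2NEKC139fsog")` gives the URL to request. If the call succeeds, topic001 and topic002 would be expected among the returned names.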
Verification with the Confluent plugin CLI
## Check the versions of the Confluent components
$ kubectl confluent version
COMPONENT NAME VERSION OPERATOR-VERSION
Zookeeper zookeeper 7.1.0 v0.435.23
Kafka kafka 7.1.0 v0.435.23
Connect connect 1.0 v0.435.23
SchemaRegistry schemaregistry 7.1.0 v0.435.23
KsqlDB ksqldb 7.1.0 v0.435.23
ControlCenter controlcenter 7.1.0 v0.435.23
## Check the endpoints available for accessing the Confluent components
$ kubectl confluent http-endpoints
COMPONENT NAME ACCESS ADDRESS AUTH AUTHORIZATION
Kafka kafka-rest INTERNAL http://kafka.akscfk231.svc.cluster.local:8090
Connect connect INTERNAL http://connect.akscfk231.svc.cluster.local:8083
SchemaRegistry schemaregistry INTERNAL http://schemaregistry.akscfk231.svc.cluster.local:8081
KsqlDB ksqldb INTERNAL http://ksqldb.akscfk231.svc.cluster.local:8088
ControlCenter controlcenter INTERNAL http://controlcenter.akscfk231.svc.cluster.local:9021 basic
Verifying the connection to Confluent Control Center
$ kubectl confluent dashboard controlcenter
http://localhost:9021

Cleanup
How to uninstall the Pods, secret, and namespace
## Pod : confluent-operator
$ helm delete confluent-operator
## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
$ kubectl delete -f kafka_rest_class.yaml
$ kubectl delete -f kafka_topic.yaml
## Secret
$ kubectl delete secret rest-credential
## Delete the namespace (all Pods under the namespace are deleted)
$ kubectl delete namespace akscfk231
Stopping and starting the AKS cluster
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
Check the events for any error messages
$ kubectl get events
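Summary
I confirmed that an Admin REST API can be created in the AKS cluster and that topics can be created through it using custom resources.
References
I referred to the following information.
Confluent for Kubernetes
Manage Confluent Admin REST API
confluent-kubernetes-examples/security/production-secure-deploy/
Kafka REST API deployment example
Example for creating topics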