I set up Confluent Platform on AKS with Confluent for Kubernetes: topic creation

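Overview

CFK (Confluent for Kubernetes) is a cloud-native control plane for deploying and managing Confluent in private cloud environments (for example, Azure Kubernetes Service (AKS)). It provides a standard, simple interface based on a declarative API for customizing, deploying, and managing Confluent Platform.

The workflow for creating topics on Confluent Platform deployed on AKS with CFK is outlined below.

    1. Prepare the Kubernetes environment (completed in advance)
    2. Deploy Confluent for Kubernetes (completed in advance)
    3. Deploy Confluent Platform (completed in advance)
    4. Configure additional settings for Confluent Platform
    5. Perform the additional deployment to Confluent Platform (topic creation)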

Local environment

    • macOS Monterey 12.3.1
    • python 3.8.12
    • Azure CLI 2.34.1
    • helm v3.6.3
    • kubectl v1.21.3

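Prerequisites

    1. An AKS cluster environment has already been built by following this article.
    2. Confluent Platform has already been deployed on AKS by following this article.


Additional configuration of Confluent Platform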

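Creating the bearer authentication secret

Topics are created through a Kafka Rest Class, which needs credentials. Create a file that defines the ID and password used for authentication (restapibearer.txt, the file passed to kubectl create secret later).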

username=kafka
password=kafka-secret
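
Before turning this file into a secret, it can help to confirm that it really contains both keys. The following is a minimal Python sketch of such a check. The function name parse_credentials is hypothetical and not part of CFK; it only assumes the key=value layout shown above.

```python
def parse_credentials(text):
    """Parse key=value lines (as in restapibearer.txt) into a dict."""
    creds = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {line!r}")
        creds[key.strip()] = value.strip()
    # Both keys appear in the file shown above, so require both.
    missing = {"username", "password"} - set(creds)
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    return creds


sample = "username=kafka\npassword=kafka-secret\n"
print(parse_credentials(sample))
```

In practice the text would be read from restapibearer.txt instead of the inline sample.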

YAML file for the Kafka Rest Class (Admin REST API): kafka_rest_class.yaml

apiVersion: platform.confluent.io/v1beta1
kind: KafkaRestClass
metadata:
  name: default
  namespace: akscfk231
spec:
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential

YAML file defining the topics: kafka_topic.yaml

---
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic001
  namespace: akscfk231
spec:
  replicas: 3
  partitionCount: 1
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
  configs:
    cleanup.policy: "delete"
---
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic002
  namespace: akscfk231
spec:
  replicas: 3
  partitionCount: 1
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: rest-credential
  configs:
    cleanup.policy: "delete"
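
The two definitions above differ only in the topic name, so with more topics the file could be generated instead of written by hand. The following Python sketch is one possible way to do that. The helper render_topics and its parameters are my own illustration, not something CFK provides; the field values are copied from the YAML above.

```python
TOPIC_TEMPLATE = """\
---
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: {name}
  namespace: {namespace}
spec:
  replicas: {replicas}
  partitionCount: {partitions}
  kafkaRest:
    authentication:
      type: bearer
      bearer:
        secretRef: {secret}
  configs:
    cleanup.policy: "delete"
"""


def render_topics(names, namespace="akscfk231", replicas=3, partitions=1,
                  secret="rest-credential"):
    """Return one multi-document YAML string with a KafkaTopic per name."""
    return "".join(
        TOPIC_TEMPLATE.format(name=n, namespace=namespace, replicas=replicas,
                              partitions=partitions, secret=secret)
        for n in names
    )


if __name__ == "__main__":
    # Prints the same two documents as the hand-written file above.
    print(render_topics(["topic001", "topic002"]), end="")
```

Redirecting the output to kafka_topic.yaml should give a file equivalent to the one shown above.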

Additional deployment to the Confluent Platform environment

Create the credentials as a secret in the AKS environment.

$ kubectl create secret generic rest-credential --from-file=bearer.txt=./restapibearer.txt
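
To make clear what this command stores: with --from-file=bearer.txt=./restapibearer.txt, the file content ends up base64-encoded under the key bearer.txt in the Secret's data. The Python sketch below builds an equivalent manifest as an illustration. The function secret_manifest is hypothetical, and the sample content assumes the file ends with a newline.

```python
import base64
import json


def secret_manifest(name, key, content, namespace=None):
    """Build a Secret manifest like `kubectl create secret generic --from-file=key=path`."""
    metadata = {"name": name}
    if namespace:
        metadata["namespace"] = namespace
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": metadata,
        "type": "Opaque",
        # Secret data values are base64-encoded strings.
        "data": {key: base64.b64encode(content.encode("utf-8")).decode("ascii")},
    }


manifest = secret_manifest("rest-credential", "bearer.txt",
                           "username=kafka\npassword=kafka-secret\n")
print(json.dumps(manifest, indent=2))
```

The key name bearer.txt matters here because the command above uses that key for the bearer credentials.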

Deploying the Admin REST API

Create the Admin REST API (the KafkaRestClass) in the AKS cluster.

$ kubectl apply -f kafka_rest_class.yaml

Creating the Confluent Platform topics

Create the topics through the Admin REST API by applying the custom resources.

$ kubectl apply -f kafka_topic.yaml

Checking the created topics

$ kubectl get topic 
NAME       REPLICAS   PARTITION   STATUS    CLUSTERID                AGE
topic001   3          1           CREATED   b7gKmMeBQK2NEKC139fsog   42s
topic002   3          1           CREATED   b7gKmMeBQK2NEKC139fsog   42s
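
With many topics, scanning the STATUS column by eye becomes tedious. The following Python sketch parses output in the form shown above and lists any topic that is not CREATED. The function names are my own. It assumes no column is empty, because it splits on whitespace.

```python
def parse_table(output):
    """Split whitespace-aligned kubectl output into a list of row dicts."""
    lines = [ln for ln in output.splitlines() if ln.strip()]
    header = lines[0].split()
    return [dict(zip(header, ln.split())) for ln in lines[1:]]


def topics_not_created(output):
    """Return names of topics whose STATUS column is not CREATED."""
    return [row["NAME"] for row in parse_table(output)
            if row["STATUS"] != "CREATED"]


sample = """\
NAME       REPLICAS   PARTITION   STATUS    CLUSTERID                AGE
topic001   3          1           CREATED   b7gKmMeBQK2NEKC139fsog   42s
topic002   3          1           CREATED   b7gKmMeBQK2NEKC139fsog   42s
"""
print(topics_not_created(sample))  # prints [] for the output above
```

An empty list means every topic reached the CREATED state.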

Check the deployed Confluent Platform resources.

$ kubectl get confluent
NAME                                        REPLICAS   PARTITION   STATUS    CLUSTERID                AGE
kafkatopic.platform.confluent.io/topic001   3          1           CREATED   b7gKmMeBQK2NEKC139fsog   2m35s
kafkatopic.platform.confluent.io/topic002   3          1           CREATED   b7gKmMeBQK2NEKC139fsog   2m35s

NAME                                                  REPLICAS   READY   STATUS    AGE
schemaregistry.platform.confluent.io/schemaregistry   1          1       RUNNING   63m

NAME                                        REPLICAS   READY   STATUS    AGE
zookeeper.platform.confluent.io/zookeeper   3          3       RUNNING   63m

NAME                                                  REPLICAS   READY   STATUS    AGE
kafkarestproxy.platform.confluent.io/kafkarestproxy   1          1       RUNNING   63m

NAME                                REPLICAS   READY   STATUS    AGE
kafka.platform.confluent.io/kafka   3          3       RUNNING   63m

NAME                                           AGE
kafkarestclass.platform.confluent.io/default   4m55s

NAME                                  REPLICAS   READY   STATUS    AGE
ksqldb.platform.confluent.io/ksqldb   1          1       RUNNING   63m

NAME                                                REPLICAS   READY   STATUS    AGE
controlcenter.platform.confluent.io/controlcenter   1          1       RUNNING   63m

NAME                                    REPLICAS   READY   STATUS    AGE
connect.platform.confluent.io/connect   1          1       RUNNING   63m


## Check the details of the kafka rest class
$ kubectl describe kafkarestclass
Name:         default
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         KafkaRestClass
Metadata:
  Creation Timestamp:  2022-06-29T06:28:19Z
  Finalizers:
    kafkarestclass.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:kafkaRest:
          .:
          f:authentication:
            .:
            f:bearer:
              .:
              f:secretRef:
            f:type:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-06-29T06:28:19Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"kafkarestclass.finalizers.platform.confluent.io":
    Manager:      manager
    Operation:    Update
    Time:         2022-06-29T06:28:19Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:endpoint:
        f:kafkaClusterID:
    Manager:         manager
    Operation:       Update
    Subresource:     status
    Time:            2022-06-29T06:28:20Z
  Resource Version:  27349
  UID:               15aaae84-8330-4459-89b2-a3f19d2a1dd5
Spec:
  Kafka Rest:
    Authentication:
      Bearer:
        Secret Ref:  rest-credential
      Type:          bearer
Status:
  Endpoint:          http://kafka.akscfk231.svc.cluster.local:8090
  Kafka Cluster ID:  b7gKmMeBQK2NEKC139fsog
Events:              <none>


## Check the details of the kafkatopic
$ kubectl describe kafkatopic
Name:         topic001
     :
    (omitted)
     :


Name:         topic002
Namespace:    akscfk231
Labels:       <none>
Annotations:  platform.confluent.io/last-spec-hash: WzQ9Vg/QcNB+03C54Sk+/m2scCu9E6oQpByPckcUWJ0=
API Version:  platform.confluent.io/v1beta1
Kind:         KafkaTopic
Metadata:
  Creation Timestamp:  2022-06-29T06:30:39Z
  Finalizers:
    kafkatopic.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:configs:
          .:
          f:cleanup.policy:
        f:kafkaRest:
          .:
          f:authentication:
            .:
            f:bearer:
              .:
              f:secretRef:
            f:type:
        f:partitionCount:
        f:replicas:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-06-29T06:30:39Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          f:platform.confluent.io/last-spec-hash:
        f:finalizers:
          .:
          v:"kafkatopic.finalizers.platform.confluent.io":
        f:ownerReferences:
          .:
          k:{"uid":"86f23b17-6529-4e6f-974d-ea6af118c9a0"}:
    Manager:    manager
    Operation:  Update
    Time:       2022-06-29T06:30:40Z
  Owner References:
    API Version:           platform.confluent.io/v1beta1
    Block Owner Deletion:  true
    Controller:            true
    Kind:                  Kafka
    Name:                  kafka
    UID:                   86f23b17-6529-4e6f-974d-ea6af118c9a0
  Resource Version:        28028
  UID:                     df9489f5-b097-4607-9c0a-c50e97355dcf
Spec:
  Configs:
    cleanup.policy:  delete
  Kafka Rest:
    Authentication:
      Bearer:
        Secret Ref:  rest-credential
      Type:          bearer
  Partition Count:   1
  Replicas:          3
Status:
  Kafka Cluster:        akscfk231/kafka
  Kafka Cluster ID:     b7gKmMeBQK2NEKC139fsog
  Kafka Rest Endpoint:  http://kafka.akscfk231.svc.cluster.local:8090
  Partition Count:      1
  Replicas:             3
  State:                CREATED
Events:                 <none>
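
The describe output shows the requested values under Spec and the observed values under Status. As a sketch, the comparison could be automated in Python on the JSON form of the resource (for example from kubectl get kafkatopic topic002 -o json). The function topic_in_sync is hypothetical, and the key names under "status" are an assumption inferred from the describe output above.

```python
def topic_in_sync(topic):
    """Return True if the topic is CREATED and its status matches its spec."""
    spec, status = topic.get("spec", {}), topic.get("status", {})
    return (
        status.get("state") == "CREATED"
        and status.get("partitionCount") == spec.get("partitionCount")
        and status.get("replicas") == spec.get("replicas")
    )


# Values copied from the describe output for topic002; the JSON key names
# under "status" are an assumption inferred from that output.
topic002 = {
    "spec": {"partitionCount": 1, "replicas": 3},
    "status": {"state": "CREATED", "partitionCount": 1, "replicas": 3},
}
print(topic_in_sync(topic002))
```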

Checking with the Confluent plugin CLI tool

## Check the versions of the Confluent components
$ kubectl confluent version
COMPONENT       NAME            VERSION  OPERATOR-VERSION
Zookeeper       zookeeper       7.1.0    v0.435.23
Kafka           kafka           7.1.0    v0.435.23
Connect         connect         1.0      v0.435.23
SchemaRegistry  schemaregistry  7.1.0    v0.435.23
KsqlDB          ksqldb          7.1.0    v0.435.23
ControlCenter   controlcenter   7.1.0    v0.435.23


## Check the endpoints available for accessing the Confluent components
$ kubectl confluent http-endpoints
COMPONENT       NAME            ACCESS    ADDRESS                                                 AUTH   AUTHORIZATION
Kafka           kafka-rest      INTERNAL  http://kafka.akscfk231.svc.cluster.local:8090                  
Connect         connect         INTERNAL  http://connect.akscfk231.svc.cluster.local:8083                
SchemaRegistry  schemaregistry  INTERNAL  http://schemaregistry.akscfk231.svc.cluster.local:8081         
KsqlDB          ksqldb          INTERNAL  http://ksqldb.akscfk231.svc.cluster.local:8088                 
ControlCenter   controlcenter   INTERNAL  http://controlcenter.akscfk231.svc.cluster.local:9021   basic  
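
The kafka-rest endpoint listed above is the one the KafkaRestClass uses. As a rough sketch, the URL for listing topics through the Admin REST API v3 can be built from that endpoint and the cluster ID as follows. The function topics_url is my own helper, and I am assuming the /kafka/v3/clusters/{cluster_id}/topics path here. The address is INTERNAL, so it should only be reachable from inside the cluster.

```python
from urllib.parse import quote


def topics_url(endpoint, cluster_id):
    """Build the Admin REST API v3 URL that lists the topics of a cluster."""
    base = endpoint.rstrip("/")
    return f"{base}/kafka/v3/clusters/{quote(cluster_id, safe='')}/topics"


# Endpoint and cluster ID copied from the outputs in this article.
url = topics_url("http://kafka.akscfk231.svc.cluster.local:8090",
                 "b7gKmMeBQK2NEKC139fsog")
print(url)
```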

Verifying the connection to Confluent Control Center

$ kubectl confluent dashboard controlcenter
http://localhost:9021

Cleanup

How to uninstall the Pods / secret / namespace

## Pod : confluent-platform
## (delete the custom resources while the operator is still running, so that their finalizers can be processed)
$ kubectl delete -f kafka_topic.yaml
$ kubectl delete -f kafka_rest_class.yaml
$ kubectl delete -f confluent_platform_ccc.yaml

## Pod : confluent-operator
$ helm delete confluent-operator

## Secret
$ kubectl delete secret rest-credential

## Delete the namespace (all Pods under the namespace are deleted)
$ kubectl delete namespace akscfk231

Stopping and starting the AKS cluster

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01

Check the events for any error messages

$ kubectl get events
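
When the event list is long, filtering for warnings makes errors easier to spot. The Python sketch below does this on the JSON form of the event list (as produced by kubectl get events -o json). The function name and the sample events are hypothetical; only the fields type, reason, message and involvedObject are taken from the Kubernetes Event object.

```python
def warning_events(event_list):
    """Return (object, reason, message) for each event of type Warning."""
    found = []
    for ev in event_list.get("items", []):
        if ev.get("type") == "Warning":
            obj = ev.get("involvedObject", {})
            found.append((f"{obj.get('kind')}/{obj.get('name')}",
                          ev.get("reason"), ev.get("message")))
    return found


# Hypothetical sample shaped like `kubectl get events -o json` output.
sample = {"items": [
    {"type": "Normal", "reason": "Pulled", "message": "image pulled",
     "involvedObject": {"kind": "Pod", "name": "kafka-0"}},
    {"type": "Warning", "reason": "BackOff", "message": "restarting container",
     "involvedObject": {"kind": "Pod", "name": "connect-0"}},
]}
print(warning_events(sample))
```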

Summary

I confirmed that I could create an Admin REST API in the AKS cluster and create topics through that API using custom resources.

References

I referred to the following information.

Confluent for Kubernetes
Manage Confluent Admin REST API
confluent-kubernetes-examples/security/production-secure-deploy/
Kafka REST API deployment example
Example for creating topics
