我在 AKS 上使用 Confluent for Kubernetes 配置了 Confluent Platform,并进行了连接器插件的编程

簡述

Confluent for Kubernetes(CFK)是一个云原生的控制面板,用于在私有云环境(本例为Azure Kubernetes Service(AKS))中部署和管理Confluent。它提供了声明式API,以标准且简单的接口来定制、部署和管理Confluent平台。

使用CFK在AKS上部署的Confluent Platform,为配置Connector设计的工作流概述如下。

    1. 准备Kubernetes环境(通过预先准备完成)

 

    1. 部署Confluent for Kubernetes(通过预先准备完成)

 

    1. 部署Confluent Platform(通过预先准备完成)

 

    1. 使用Connector Plugin容器重新部署Confluent Platform

 

    验证Connector Plugin
image.png

本地环境

    • macOS Monterey 12.3.1

 

    • python 3.8.12

 

    • Azure CLI 2.34.1

 

    • helm v3.6.3

 

    kubectl v1.21.3

预先准备

    1. 执行此文章时,AKS集群环境已被建立。

执行此文章时,已在AKS上部署了Confluent平台。

参考此文章时,新定义的Connector插件已包含在Connect容器映像中,并已在DockerHub上发布。


在连接器插件容器中重新部署Confluent平台。

更改Confluent Platform的配置

将上述预先准备的第二步中的 “confluent_platform_ccc.yaml” 更改为使用含有 CosmosDB 连接器插件的 Docker 镜像。


##以下の部分の変更
apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect
  namespace: akscfk231
spec:
  replicas: 1
  image:
    application: keisz/confluent_connector:1.0         # <--- ここを変更
    init: confluentinc/confluent-init-container:2.3.0
  dependencies:
    kafka:
      bootstrapEndpoint: kafka:9071

重新部署Confluent平台

$ kubectl apply -f confluent_platform_ccc.yaml

zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
connect.platform.confluent.io/connect configured
ksqldb.platform.confluent.io/ksqldb unchanged
controlcenter.platform.confluent.io/controlcenter unchanged
schemaregistry.platform.confluent.io/schemaregistry unchanged
kafkarestproxy.platform.confluent.io/kafkarestproxy unchanged

插件连接器的验证

更改Pod / svc。

$ kubectl get pods

NAME                                  READY   STATUS    RESTARTS   AGE
confluent-operator-76d7677b8c-pdp8s   1/1     Running   0          13m
connect-0                             1/1     Running   0          111s
controlcenter-0                       1/1     Running   0          8m20s
kafka-0                               1/1     Running   0          9m51s
kafka-1                               1/1     Running   0          9m51s
kafka-2                               1/1     Running   0          9m51s
kafkarestproxy-0                      1/1     Running   0          8m21s
ksqldb-0                              1/1     Running   0          8m20s
schemaregistry-0                      1/1     Running   0          8m20s
zookeeper-0                           1/1     Running   0          12m
zookeeper-1                           1/1     Running   0          12m
zookeeper-2                           1/1     Running   0          12m


$ kubectl get svc

NAME                        TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                          AGE
confluent-operator          ClusterIP   10.1.0.231   <none>        7778/TCP                                                         15m
connect                     ClusterIP   None         <none>        8083/TCP,7203/TCP,7777/TCP,7778/TCP                              14m
connect-0-internal          ClusterIP   10.1.0.232   <none>        8083/TCP,7203/TCP,7777/TCP,7778/TCP                              14m
controlcenter               ClusterIP   None         <none>        9021/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
controlcenter-0-internal    ClusterIP   10.1.0.95    <none>        9021/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
kafka                       ClusterIP   None         <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafka-0-internal            ClusterIP   10.1.0.181   <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafka-1-internal            ClusterIP   10.1.0.103   <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafka-2-internal            ClusterIP   10.1.0.27    <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafkarestproxy              ClusterIP   None         <none>        8082/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
kafkarestproxy-0-internal   ClusterIP   10.1.0.107   <none>        8082/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
ksqldb                      ClusterIP   None         <none>        8088/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
ksqldb-0-internal           ClusterIP   10.1.0.22    <none>        8088/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
schemaregistry              ClusterIP   None         <none>        8081/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
schemaregistry-0-internal   ClusterIP   10.1.0.227   <none>        8081/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
zookeeper                   ClusterIP   None         <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m
zookeeper-0-internal        ClusterIP   10.1.0.66    <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m
zookeeper-1-internal        ClusterIP   10.1.0.21    <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m
zookeeper-2-internal        ClusterIP   10.1.0.118   <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m

连接器的确认

为确认,将连接重定向到本地主机。

$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ CTRL+C で終了できます

打开另一个终端,并确认存在Connector的Plug-in(确认CosmosDB的Plugin)。

$ curl http://localhost:8083/connector-plugins | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   504  100   504    0     0   2030      0 --:--:-- --:--:-- --:--:--  2032
[
  {
    "class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "type": "sink",
    "version": "null"
  },
  {
    "class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "type": "source",
    "version": "null"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

在Confluent Control Center进行确认

$ kubectl confluent dashboard controlcenter
http://localhost:9021
スクリーンショット 2022-06-29 14.51.19.png

处理后

Pod / secret / namespace 的卸载方法

## Pod : confluent-operator
$ helm delete confluent-operator

## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml

## namespace の削除方法(namespace配下のPodは全て削除される)
$ kubectl delete namespace akscfk231

停止和启动AKS集群。

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01

检查事件中是否有任何错误消息

kubectl get events

总结

我已经确认在已安装Connector Plugin(自定义配置)的Docker镜像中,可以配置Confluent Platform。

参考信息

我已参考了以下信息。

Kubernetes的汇聚平台

bannerAds