我在 AKS 上使用 Confluent for Kubernetes 配置了 Confluent Platform,并进行了连接器插件的编程
簡述
Confluent for Kubernetes(CFK)是一个云原生的控制面板,用于在私有云环境(本例为Azure Kubernetes Service(AKS))中部署和管理Confluent。它提供了声明式API,以标准且简单的接口来定制、部署和管理Confluent平台。
使用CFK在AKS上部署的Confluent Platform,为配置Connector设计的工作流概述如下。
-
- 准备Kubernetes环境(通过预先准备完成)
-
- 部署Confluent for Kubernetes(通过预先准备完成)
-
- 部署Confluent Platform(通过预先准备完成)
-
- 使用Connector Plugin容器重新部署Confluent Platform
- 验证Connector Plugin

本地环境
-
- macOS Monterey 12.3.1
-
- python 3.8.12
-
- Azure CLI 2.34.1
-
- helm v3.6.3
- kubectl v1.21.3
预先准备
-
- 执行此文章时,AKS集群环境已被建立。
执行此文章时,已在AKS上部署了Confluent平台。
参考此文章时,新定义的Connector插件已包含在Connect容器映像中,并已在DockerHub上发布。
在连接器插件容器中重新部署Confluent平台。
更改Confluent Platform的配置
将上述预先准备的第二步中的 “confluent_platform_ccc.yaml” 更改为使用含有 CosmosDB 连接器插件的 Docker 镜像。
##以下の部分の変更
apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
name: connect
namespace: akscfk231
spec:
replicas: 1
image:
application: keisz/confluent_connector:1.0 # <--- ここを変更
init: confluentinc/confluent-init-container:2.3.0
dependencies:
kafka:
bootstrapEndpoint: kafka:9071
重新部署Confluent平台
$ kubectl apply -f confluent_platform_ccc.yaml
zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
connect.platform.confluent.io/connect configured
ksqldb.platform.confluent.io/ksqldb unchanged
controlcenter.platform.confluent.io/controlcenter unchanged
schemaregistry.platform.confluent.io/schemaregistry unchanged
kafkarestproxy.platform.confluent.io/kafkarestproxy unchanged
插件连接器的验证
更改Pod / svc。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
confluent-operator-76d7677b8c-pdp8s 1/1 Running 0 13m
connect-0 1/1 Running 0 111s
controlcenter-0 1/1 Running 0 8m20s
kafka-0 1/1 Running 0 9m51s
kafka-1 1/1 Running 0 9m51s
kafka-2 1/1 Running 0 9m51s
kafkarestproxy-0 1/1 Running 0 8m21s
ksqldb-0 1/1 Running 0 8m20s
schemaregistry-0 1/1 Running 0 8m20s
zookeeper-0 1/1 Running 0 12m
zookeeper-1 1/1 Running 0 12m
zookeeper-2 1/1 Running 0 12m
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
confluent-operator ClusterIP 10.1.0.231 <none> 7778/TCP 15m
connect ClusterIP None <none> 8083/TCP,7203/TCP,7777/TCP,7778/TCP 14m
connect-0-internal ClusterIP 10.1.0.232 <none> 8083/TCP,7203/TCP,7777/TCP,7778/TCP 14m
controlcenter ClusterIP None <none> 9021/TCP,7203/TCP,7777/TCP,7778/TCP 10m
controlcenter-0-internal ClusterIP 10.1.0.95 <none> 9021/TCP,7203/TCP,7777/TCP,7778/TCP 10m
kafka ClusterIP None <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 11m
kafka-0-internal ClusterIP 10.1.0.181 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 11m
kafka-1-internal ClusterIP 10.1.0.103 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 11m
kafka-2-internal ClusterIP 10.1.0.27 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 11m
kafkarestproxy ClusterIP None <none> 8082/TCP,7203/TCP,7777/TCP,7778/TCP 10m
kafkarestproxy-0-internal ClusterIP 10.1.0.107 <none> 8082/TCP,7203/TCP,7777/TCP,7778/TCP 10m
ksqldb ClusterIP None <none> 8088/TCP,7203/TCP,7777/TCP,7778/TCP 10m
ksqldb-0-internal ClusterIP 10.1.0.22 <none> 8088/TCP,7203/TCP,7777/TCP,7778/TCP 10m
schemaregistry ClusterIP None <none> 8081/TCP,7203/TCP,7777/TCP,7778/TCP 10m
schemaregistry-0-internal ClusterIP 10.1.0.227 <none> 8081/TCP,7203/TCP,7777/TCP,7778/TCP 10m
zookeeper ClusterIP None <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 14m
zookeeper-0-internal ClusterIP 10.1.0.66 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 14m
zookeeper-1-internal ClusterIP 10.1.0.21 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 14m
zookeeper-2-internal ClusterIP 10.1.0.118 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 14m
连接器的确认
为确认,将连接重定向到本地主机。
$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
※ CTRL+C で終了できます
打开另一个终端,并确认存在Connector的Plug-in(确认CosmosDB的Plugin)。
$ curl http://localhost:8083/connector-plugins | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 504 100 504 0 0 2030 0 --:--:-- --:--:-- --:--:-- 2032
[
{
"class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
"type": "sink",
"version": "null"
},
{
"class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"type": "source",
"version": "null"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
在Confluent Control Center进行确认
$ kubectl confluent dashboard controlcenter
http://localhost:9021

处理后
Pod / secret / namespace 的卸载方法
## Pod : confluent-operator
$ helm delete confluent-operator
## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
## namespace の削除方法(namespace配下のPodは全て削除される)
$ kubectl delete namespace akscfk231
停止和启动AKS集群。
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
检查事件中是否有任何错误消息
kubectl get events
总结
我已经确认在已安装Connector Plugin(自定义配置)的Docker镜像中,可以配置Confluent Platform。
参考信息
我已参考了以下信息。
Kubernetes的汇聚平台