我尝试使用Azure Kubernetes Service (AKS)和Confluent Platform的Helm Chart来运行应用程序
概要 – 简要版的描述
我参考了这篇文章,用一点点理解的 Helm v3,总结了使用 Confuent Platform 的 Helm Chart 和 ACR 中的容器镜像(rabbitmq)来在 AKS 上运行容器应用程序的步骤。

我将使用以下两个步骤按顺序解释上述内容。这次我将解释第2步。
第1步:在AKS上部署Confluent Platform环境
第2步:在AKS上运行容器应用程序
本地环境
macOS Big Sur 11.3: 苹果操作系统Big Sur 11.3
Python 3.8.3: Python 3.8.3
Helm 3.6.3: Helm 3.6.3
事前准备
-
- 参考にして、已经搭建好了AKS和ACR,并且已经确认在AKS集群上运行的节点。
参考此文章,将容器镜像推送到ACR。
确认在此文章中已经完成了“Helm准备”。
将设定传送到Pod
确认Pod
$ kubectl get pod -n akscp01
NAME READY STATUS RESTARTS AGE
cp600-cp-control-center-5b55c5676f-tn5w9 1/1 Running 3 3m46s
cp600-cp-kafka-0 2/2 Running 1 3m46s
cp600-cp-kafka-1 2/2 Running 0 3m8s
cp600-cp-kafka-2 2/2 Running 0 2m46s
cp600-cp-kafka-connect-764c9bd6cd-n84qw 2/2 Running 3 3m46s
cp600-cp-ksql-server-5948c75b8b-dnfcm 2/2 Running 4 3m46s
cp600-cp-schema-registry-5d79b8c57-r4r5p 2/2 Running 3 3m46s
cp600-cp-zookeeper-0 2/2 Running 0 3m46s
cp600-cp-zookeeper-1 2/2 Running 0 3m8s
cp600-cp-zookeeper-2 2/2 Running 0 2m32s
cp600db-influxdb-5ff9b5cfbc-zkvjb 1/1 Running 0 33m
cp600gf-grafana-59cff8f44b-rwlwx 1/1 Running 0 13m
cp600mq-rabbitmq-0 1/1 Running 0 6m22s
创建 Kafka 主题
## Kafka-client への接続
$ kubectl exec -it kafka-client -n akscp01 -- /bin/bash
## 環境変数の定義
$ export RELEASE_NAME=cp600
$ export ZOOKEEPERS=${RELEASE_NAME}-cp-zookeeper:2181
## topic_201 / topic_202 の作成
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_201 --partitions 3 --replication-factor 1
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_202 --partitions 3 --replication-factor 1
## topic の確認
$ kafka-topics --zookeeper $ZOOKEEPERS --list
KSQL 的配置
## Ksql-client への接続
$ kubectl exec -it ksql-client -n akscp01 -- /bin/bash
## 環境変数の定義
$ export RELEASE_NAME=cp600
$ export KSQLDB=${RELEASE_NAME}-cp-ksql-server:8088
## KsqlDB への接続
$ ksql http://$KSQLDB
## topic の確認
ksql> list topics;
Kafka Topic | Partitions | Partition Replicas
-----------------------------------------------------------------
cp600-cp-kafka-connect-config | 1 | 3
cp600-cp-kafka-connect-offset | 25 | 3
cp600-cp-kafka-connect-status | 5 | 3
topic_201 | 3 | 1
topic_202 | 3 | 1
-----------------------------------------------------------------
## stream_201 / stream_202 の作成
ksql> CREATE STREAM stream_201 (id BIGINT, time VARCHAR, proc VARCHAR, section VARCHAR, iot_num VARCHAR, iot_state VARCHAR, vol_1 DOUBLE, vol_2 DOUBLE) WITH (KAFKA_TOPIC = 'topic_201', VALUE_FORMAT='JSON');
ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W';
## stream の確認
ksql> show streams;
Stream Name | Kafka Topic | Format
------------------------------------
STREAM_201 | topic_201 | JSON
STREAM_202 | topic_202 | JSON
------------------------------------
连线器的设置
使用了ACR的容器映像,所以需要确认是否已安装了RabbitMQ/InfluxDB的连接器插件。
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600-cp-kafka-connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
※ CTRL+C で終了できます
打开另一个终端窗口,确认RabbitMQSourceConnector和InfluxDBSinkConnector的插件已经存在。
$ curl http://localhost:8083/connector-plugins | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 793 100 793 0 0 6007 0 --:--:-- --:--:-- --:--:-- 5962
[
{
"class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "io.confluent.influxdb.InfluxDBSinkConnector",
"type": "sink",
"version": "unknown"
},
:
中略
:
]
创建RabbitMQSourceConnector
RabbitMQSourceConnector 的定义文件如下所示。
{
"name" : "RabbitMQSourceConnector_1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"rabbitmq.host": "cp600mq-rabbitmq",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"rabbitmq.port": "5672",
"kafka.topic": "topic_201",
"rabbitmq.queue": "IoTHub"
}
}
创建一个新的RabbitMQSourceConnector
$ curl -s -X POST -H 'Content-Type: application/json' --data @RabbitMQSourceConnector.json http://localhost:8083/connectors
{"name":"RabbitMQSourceConnector_1","config":{"connector.class":"io.confluent.connect.rabbitmq.RabbitMQSourceConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter","rabbitmq.host":"cp600mq-rabbitmq","rabbitmq.username":"guest","rabbitmq.password":"guest","rabbitmq.port":"5672","kafka.topic":"topic_201","rabbitmq.queue":"IoTHub","name":"RabbitMQSourceConnector_1"},"tasks":[],"type":"source"}
创建 InfluxDBSinkConnector
InfluxDBSinkConnector的定义文件如下所示。
{
"name": "InfluxDBSinkConnector_1",
"config" : {
"value.converter.schemas.enable": "false",
"connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
"tasks.max": "1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "topic_201, topic_202",
"influxdb.url": "http://cp600db-influxdb:8086",
"influxdb.db": "IoTSample",
"measurement.name.format": "${topic}"
}
}
创建一个新的InfluxDBSinkConnector
$ curl -s -X POST -H 'Content-Type: application/json' --data @InfluxDBSinkConnector.json http://localhost:8083/connectors
{"name":"InfluxDBSinkConnector_1","config":{"value.converter.schemas.enable":"false","connector.class":"io.confluent.influxdb.InfluxDBSinkConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.json.JsonConverter","topics":"topic_201, topic_202","influxdb.url":"http://influxdb:8086","influxdb.db":"IoTSample","measurement.name.format":"${topic}","name":"InfluxDBSinkConnector_1"},"tasks":[],"type":"sink"}
连接器的确认
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | InfluxDBSinkConnector_1 | RUNNING | RUNNING | io.confluent.influxdb.InfluxDBSinkConnector
source | RabbitMQSourceConnector_1 | RUNNING | RUNNING | io.confluent.connect.rabbitmq.RabbitMQSourceConnector
InfluxDB的配置
## influxdb への接続
$ kubectl exec -it svc/cp600db-influxdb -n akscp01 -- influx
Connected to http://localhost:8086 version 1.8.5
InfluxDB shell version: 1.8.5
>
## データベース「IoTSample」の作成と確認
> create database IoTSample
> show databases
name: databases
name
----
_internal
IoTSample
Grafana的设置
为了通过GUI进行Grafana设置,我们需要进行Grafana GUI的端口转发配置。
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
Forwarding from 127.0.0.1:3000 -> 3000
Forwarding from [::1]:3000 -> 3000
※ CTRL+C で終了できます
将在这篇文章中进行”连接Grafana和InfluxDB的设置”和”创建Grafana仪表板”的配置。
但是,请将URL设置值从http://influxdb:8086更改为http://cp600db-influxdb:8086。
RabbitMQ的配置
由于使用了ACR的容器映像,所以没有任何配置位置。
应用程序的运行确认
设置端口转发
由于容器应用程序不进行外部公开,所以需要对充当外部接口的容器应用程序 rabbitmq 和 grafana 进行端口转发的设置。
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000 &
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672 &
应用程序的运行
与”实时可视化通过Grafana仪表盘”中提到的相同,我们可以通过本地终端执行IoT数据生成程序。我们生成了100条数据,每秒生成一条。
$ python IoTSampleData-v5.py --mode mq --count 100 --wait 1

整理事后的工作
停止端口转发
## 該当のプロセスを検索します
$ ps -axl | grep kubectl
501 35260 8527 4006 0 31 5 5052880 34332 - SN 0 ttys000 0:00.23 kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
501 35261 8527 4006 0 31 5 5053160 33008 - SN 0 ttys000 0:00.20 kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672
501 35276 18433 4006 0 31 0 4399480 828 - S+ 0 ttys002 0:00.00 grep kubectl
## 該当のプロセスを kill します
$ kill 35260
$ kill 35261
卸载Pod
$ helm delete cp600gf -n akscp01
$ helm delete cp600db -n akscp01
$ helm delete cp600mq -n akscp01
$ helm delete cp600 -n akscp01
删除命名空间
$ kubectl delete namespace akscp01
总结
我以相当土气的方式强行实现了容器应用程序,但我希望花点时间,利用 ConfigMap 和各种配置文件加载等方法,追求时尚的实现方式。
本课题的步骤信息
第一步:在AKS上构建Confluent平台环境
第二步:在AKS上运行容器应用程序
参考资料
感谢您提供的以下信息。我将参考它们。推荐使用Helm v3。
Helm是用于Kubernetes的事实上的标准部署工具。