Running an Application on Azure Kubernetes Service (AKS) with the Confluent Platform Helm Charts

Overview – a brief description

Referring to this article, and drawing on my modest understanding of Helm v3, I have summarized the steps for running a container application on AKS using the Confluent Platform Helm Charts and a container image (rabbitmq) stored in ACR.


I will explain the above in the following two steps, in order. This article covers Step 2.
Step 1: Deploy the Confluent Platform environment on AKS
Step 2: Run the container application on AKS

Local environment

macOS Big Sur 11.3
Python 3.8.3
Helm 3.6.3

Prerequisites

    1. Referring to this article, AKS and ACR have already been set up, and the nodes running on the AKS cluster have been confirmed.

    2. Referring to this article, the container image has been pushed to ACR.

    3. The "Helm preparation" described in this article has been completed.


Applying the configuration to the Pods

Checking the Pods

$ kubectl get pod -n akscp01

NAME                                       READY   STATUS    RESTARTS   AGE
cp600-cp-control-center-5b55c5676f-tn5w9   1/1     Running   3          3m46s
cp600-cp-kafka-0                           2/2     Running   1          3m46s
cp600-cp-kafka-1                           2/2     Running   0          3m8s
cp600-cp-kafka-2                           2/2     Running   0          2m46s
cp600-cp-kafka-connect-764c9bd6cd-n84qw    2/2     Running   3          3m46s
cp600-cp-ksql-server-5948c75b8b-dnfcm      2/2     Running   4          3m46s
cp600-cp-schema-registry-5d79b8c57-r4r5p   2/2     Running   3          3m46s
cp600-cp-zookeeper-0                       2/2     Running   0          3m46s
cp600-cp-zookeeper-1                       2/2     Running   0          3m8s
cp600-cp-zookeeper-2                       2/2     Running   0          2m32s
cp600db-influxdb-5ff9b5cfbc-zkvjb          1/1     Running   0          33m
cp600gf-grafana-59cff8f44b-rwlwx           1/1     Running   0          13m
cp600mq-rabbitmq-0                         1/1     Running   0          6m22s

Creating Kafka topics

## Connect to kafka-client
$ kubectl exec -it kafka-client -n akscp01 -- /bin/bash

## Define environment variables
$ export RELEASE_NAME=cp600
$ export ZOOKEEPERS=${RELEASE_NAME}-cp-zookeeper:2181

## Create topic_201 / topic_202
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_201 --partitions 3 --replication-factor 1
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_202 --partitions 3 --replication-factor 1

## Verify the topics
$ kafka-topics --zookeeper $ZOOKEEPERS --list
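Each topic above gets three partitions, and records with the same key always land on the same partition, which is what preserves per-key ordering. The sketch below illustrates that hash-based mapping; note that Kafka's default partitioner actually uses murmur2 on the key bytes, so crc32 here is purely illustrative and real assignments will differ.

```python
import zlib

def pick_partition(key: bytes, num_partitions: int = 3) -> int:
    """Illustrative key-to-partition mapping (Kafka itself uses murmur2, not crc32)."""
    return zlib.crc32(key) % num_partitions

# The same key always maps to the same partition of topic_201.
assert pick_partition(b"iot_no_1") == pick_partition(b"iot_no_1")
print(pick_partition(b"iot_no_1") in range(3))  # True
```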

KSQL configuration

## Connect to ksql-client
$ kubectl exec -it ksql-client -n akscp01 -- /bin/bash

## Define environment variables
$ export RELEASE_NAME=cp600
$ export KSQLDB=${RELEASE_NAME}-cp-ksql-server:8088

## Connect to ksqlDB
$ ksql http://$KSQLDB

## Verify the topics
ksql> list topics;

 Kafka Topic                   | Partitions | Partition Replicas 
-----------------------------------------------------------------
 cp600-cp-kafka-connect-config | 1          | 3                  
 cp600-cp-kafka-connect-offset | 25         | 3                  
 cp600-cp-kafka-connect-status | 5          | 3                  
 topic_201                     | 3          | 1                  
 topic_202                     | 3          | 1                  
-----------------------------------------------------------------

## Create stream_201 / stream_202
ksql> CREATE STREAM stream_201 (id BIGINT, time VARCHAR, proc VARCHAR, section VARCHAR, iot_num VARCHAR, iot_state VARCHAR, vol_1 DOUBLE, vol_2 DOUBLE) WITH (KAFKA_TOPIC = 'topic_201', VALUE_FORMAT='JSON'); 

ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W'; 

## Verify the streams
ksql> show streams;

 Stream Name | Kafka Topic | Format 
------------------------------------
 STREAM_201  | topic_201   | JSON   
 STREAM_202  | topic_202   | JSON   
------------------------------------
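The stream_202 query keeps only records whose section is E, C, or W and exposes the time field under the alias zztime. As a minimal sketch of the equivalent per-record transformation (field names come from the CREATE STREAM statements above; the sample values are made up):

```python
def stream_202_filter(rec):
    """Mimic the stream_202 query: keep sections E/C/W and rename time -> zztime."""
    if rec.get("section") not in ("E", "C", "W"):
        return None  # dropped; never written to topic_202
    out = {k: v for k, v in rec.items() if k != "time"}
    out["zztime"] = rec["time"]
    return out

sample = {"id": 1, "time": "2021-08-01 10:00:00", "proc": "111",
          "section": "E", "iot_num": "no_1", "iot_state": "OK",
          "vol_1": 1.23, "vol_2": 4.56}
print(stream_202_filter(sample)["zztime"])            # 2021-08-01 10:00:00
print(stream_202_filter({**sample, "section": "N"}))  # None (filtered out)
```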

Connector configuration

Because the container image from ACR is used, confirm that the RabbitMQ/InfluxDB connector plugins are installed.

$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600-cp-kafka-connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ Press CTRL+C to stop

Open another terminal window and confirm that the RabbitMQSourceConnector and InfluxDBSinkConnector plugins are present.

$ curl http://localhost:8083/connector-plugins | jq 

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   793  100   793    0     0   6007      0 --:--:-- --:--:-- --:--:--  5962
[
  {
    "class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "type": "source",
    "version": "0.0.0.0"
  },
  {
    "class": "io.confluent.influxdb.InfluxDBSinkConnector",
    "type": "sink",
    "version": "unknown"
  },
    :
   (omitted)
    :
]
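Rather than eyeballing the curl output, this check can be scripted. The sketch below takes the parsed JSON list from /connector-plugins and reports which required connector classes are missing (the helper name and hard-coded class set mirror the output above; they are not part of the Connect REST API itself).

```python
REQUIRED = {
    "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "io.confluent.influxdb.InfluxDBSinkConnector",
}

def missing_plugins(plugins):
    """Return the required connector classes absent from a /connector-plugins response."""
    installed = {p["class"] for p in plugins}
    return REQUIRED - installed

# Example using the (abridged) response shown above:
plugins = [
    {"class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
     "type": "source", "version": "0.0.0.0"},
    {"class": "io.confluent.influxdb.InfluxDBSinkConnector",
     "type": "sink", "version": "unknown"},
]
print(missing_plugins(plugins))  # set() -> both plugins are installed
```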

Creating the RabbitMQSourceConnector

The RabbitMQSourceConnector definition file is shown below.

{
    "name" : "RabbitMQSourceConnector_1",
    "config" : {
      "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
      "tasks.max" : "1",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "rabbitmq.host": "cp600mq-rabbitmq",
      "rabbitmq.username": "guest",
      "rabbitmq.password": "guest",
      "rabbitmq.port": "5672",
      "kafka.topic": "topic_201",
      "rabbitmq.queue": "IoTHub"
    }
}
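A typo in one of these keys is only reported by Connect at runtime, so a small pre-flight check before POSTing can save a round trip. This is a sketch only: the required-key set is an assumption based on the fields used above, not the connector's official validation.

```python
REQUIRED_KEYS = {"connector.class", "rabbitmq.host", "rabbitmq.queue", "kafka.topic"}

def preflight(definition):
    """Raise ValueError if the definition lacks a name or an expected config key."""
    if "name" not in definition:
        raise ValueError("connector definition needs a 'name'")
    missing = REQUIRED_KEYS - definition.get("config", {}).keys()
    if missing:
        raise ValueError(f"missing config keys: {sorted(missing)}")

definition = {
    "name": "RabbitMQSourceConnector_1",
    "config": {
        "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "rabbitmq.host": "cp600mq-rabbitmq",
        "rabbitmq.queue": "IoTHub",
        "kafka.topic": "topic_201",
    },
}
preflight(definition)  # passes silently; a misspelled key would raise
```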

Create a new RabbitMQSourceConnector:

$ curl -s -X POST -H 'Content-Type: application/json' --data @RabbitMQSourceConnector.json http://localhost:8083/connectors

{"name":"RabbitMQSourceConnector_1","config":{"connector.class":"io.confluent.connect.rabbitmq.RabbitMQSourceConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter","rabbitmq.host":"cp600mq-rabbitmq","rabbitmq.username":"guest","rabbitmq.password":"guest","rabbitmq.port":"5672","kafka.topic":"topic_201","rabbitmq.queue":"IoTHub","name":"RabbitMQSourceConnector_1"},"tasks":[],"type":"source"}

Creating the InfluxDBSinkConnector

The InfluxDBSinkConnector definition file is shown below.

{
    "name": "InfluxDBSinkConnector_1",
    "config" : {
        "value.converter.schemas.enable": "false",
        "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
        "tasks.max": "1",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "topics": "topic_201, topic_202",
        "influxdb.url": "http://cp600db-influxdb:8086",
        "influxdb.db": "IoTSample",
        "measurement.name.format": "${topic}"
    }
}
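With measurement.name.format set to ${topic}, the sink writes topic_201 and topic_202 into InfluxDB measurements of the same names. A sketch of that substitution (a simplification of the connector's actual format handling):

```python
def measurement_name(fmt, topic):
    """Expand the ${topic} placeholder used by measurement.name.format above."""
    return fmt.replace("${topic}", topic)

for t in ("topic_201", "topic_202"):
    print(measurement_name("${topic}", t))  # topic_201, then topic_202
```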

Create a new InfluxDBSinkConnector:

$ curl -s -X POST -H 'Content-Type: application/json' --data @InfluxDBSinkConnector.json http://localhost:8083/connectors

{"name":"InfluxDBSinkConnector_1","config":{"value.converter.schemas.enable":"false","connector.class":"io.confluent.influxdb.InfluxDBSinkConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.json.JsonConverter","topics":"topic_201, topic_202","influxdb.url":"http://cp600db-influxdb:8086","influxdb.db":"IoTSample","measurement.name.format":"${topic}","name":"InfluxDBSinkConnector_1"},"tasks":[],"type":"sink"}

Verifying the connectors

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
          jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
          column -s : -t| sed 's/\"//g'| sort

sink    |  InfluxDBSinkConnector_1    |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector
source  |  RabbitMQSourceConnector_1  |  RUNNING  |  RUNNING  |  io.confluent.connect.rabbitmq.RabbitMQSourceConnector

InfluxDB configuration

## Connect to influxdb
$ kubectl exec -it svc/cp600db-influxdb -n akscp01 -- influx                                       

Connected to http://localhost:8086 version 1.8.5
InfluxDB shell version: 1.8.5
>

## Create and verify the database "IoTSample"
> create database IoTSample
> show databases
name: databases
name
----
_internal
IoTSample
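For reference, a record arriving in the IoTSample database can be pictured in InfluxDB 1.x line protocol. The sketch below builds such a line from one record's values (field names come from the KSQL schema; the tag/field split is an assumption for illustration, and the sink connector's actual mapping may differ).

```python
def to_line_protocol(measurement, tags, fields, ts_ns=None):
    """Build an InfluxDB 1.x line-protocol string: measurement,tags fields [timestamp]."""
    tag_str = ",".join(f"{k}={v}" for k, v in tags.items())
    field_str = ",".join(f"{k}={v}" for k, v in fields.items())
    line = f"{measurement},{tag_str} {field_str}"
    return f"{line} {ts_ns}" if ts_ns else line

print(to_line_protocol("topic_201", {"section": "E"}, {"vol_1": 1.23, "vol_2": 4.56}))
# topic_201,section=E vol_1=1.23,vol_2=4.56
```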

Grafana configuration

To configure Grafana through its web UI, first set up port forwarding to the Grafana GUI.

$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
Forwarding from 127.0.0.1:3000 -> 3000
Forwarding from [::1]:3000 -> 3000

※ Press CTRL+C to stop

Perform the "connecting Grafana to InfluxDB" and "creating a Grafana dashboard" steps from this article.
However, change the URL setting from http://influxdb:8086 to http://cp600db-influxdb:8086.

RabbitMQ configuration

Because the container image from ACR is used, there is nothing to configure here.


Verifying the application

Setting up port forwarding

Because the container applications are not exposed externally, set up port forwarding for rabbitmq and grafana, the containers that serve as the external interfaces.

$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000 &
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672 &

Running the application

As in "real-time visualization with a Grafana dashboard", the IoT data generation program can be run from a local terminal. Here, 100 records are generated, one per second.

$ python IoTSampleData-v5.py --mode mq --count 100 --wait 1
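The generator pushes JSON messages into the RabbitMQ queue IoTHub, where the source connector picks them up. A hedged sketch of what one generated record could look like is shown below; the field names match the stream_201 schema above, but IoTSampleData-v5.py itself is not listed in this article, so the value choices here are fabricated for illustration only.

```python
import datetime
import json
import random

SECTIONS = ["E", "C", "W", "N", "S"]  # assumed value set; stream_202 keeps only E/C/W

def make_record(i):
    """One fabricated IoT record shaped like the stream_201 schema."""
    return {
        "id": i,
        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "proc": "iot",
        "section": random.choice(SECTIONS),
        "iot_num": f"no_{i % 5}",
        "iot_state": random.choice(["OK", "NG"]),
        "vol_1": round(random.uniform(0, 10), 2),
        "vol_2": round(random.uniform(0, 10), 2),
    }

# Each record would then be published to the 'IoTHub' queue, e.g. with pika:
# channel.basic_publish(exchange="", routing_key="IoTHub",
#                       body=json.dumps(make_record(i)))
print(json.dumps(make_record(1)))
```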

Cleaning up afterwards

Stopping the port forwarding

## Find the relevant processes
$ ps -axl | grep kubectl                                                                      
  501 35260  8527     4006   0  31  5  5052880  34332 -      SN                  0 ttys000    0:00.23 kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
  501 35261  8527     4006   0  31  5  5053160  33008 -      SN                  0 ttys000    0:00.20 kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672
  501 35276 18433     4006   0  31  0  4399480    828 -      S+                  0 ttys002    0:00.00 grep kubectl

## Kill the processes
$ kill 35260                                                                                  
$ kill 35261

Uninstalling the Helm releases

$ helm delete cp600gf -n akscp01
$ helm delete cp600db -n akscp01
$ helm delete cp600mq -n akscp01
$ helm delete cp600 -n akscp01

Deleting the namespace

$ kubectl delete namespace akscp01

Summary

I implemented the container application in a rather crude, brute-force way, but I would like to take the time to pursue a more polished implementation using ConfigMaps, loading the various configuration files, and so on.


Articles in this series

Step 1: Build the Confluent Platform environment on AKS
Step 2: Run the container application on AKS

References

Thank you for the following information, which I used as references. Helm v3 is recommended.
Helm is the de facto standard deployment tool for Kubernetes.
