使用事件驱动的方式对Kafka Streams进行自动扩展【KEDA入门】
“KEDA是什么意思?”
Kubernetes集群上运行的开源事件驱动自动缩放器(基于Kubernetes的事件驱动自动缩放器 = KEDA),通过阈值监视外部系统中的特定指标,并自动调整要扩展的应用程序(例如Kubernetes Deployment)的Pod数量。
在KEDA本身中进行度量监视,然而,应用程序Pod数量的自动调整是由Kubernetes的标准资源——水平Pod自动缩放器(以下简称HPA)来实现的。
此外,监视对象的连接信息、指标阈值、Pod数量调整范围等设置项可以通过称为ScaledObject的KEDA特有资源进行定义。
各种资源和系统之间的关系可以总结如下。
应用程序的缩放目标
本次我们将使用KEDA对预先创建的KafkaStreams应用程序进行自动扩缩容。
应用程序的处理内容将是以下的ETL处理。
-
- 从Apache Kafka中获取要处理的数据。(提取: 抽出)
-
- 仅选择数据中需要的信息,并将其转换为可插入到数据库中的格式。(转换: 変換)
- 将经过转换的数据插入到Mongo DB中。(加载: 格納)
请参考源代码以获取有关应用程序的详细信息。
通过KEDA实现自动扩缩容
那么,实际上我们来运行KEDA并观察应用程序自动缩放的情况。
※可真正运行的源代码全部存储在这个代码库中。
动作环境和前提条件 zuò hé tí
or
行动环境与前提条件 yǔ tí
为了使用Shell脚本,请假设目标运行环境是Mac或Linux(以及WSL)。
另外,还需要以下工具。
-
- Git
-
- Docker
-
- Kubernetes (クラスタ1つとkubectlコマンド)
- Helm (バージョン3)
1. 相关系统的预先设置
事先设置好在Kafka Streams应用程序中使用的Apache Kafka和Mongo DB。
if [ ! -d kafka-streams-pipeline/ ]; then
git clone https://github.com/ogi-iii/kafka-streams-pipeline.git
fi
cd kafka-streams-pipeline/
git pull
docker compose -f ./demo/docker-compose.yml up -d
./demo/scripts/create-topic.sh
cd ..
2. 预先部署可扩展的应用程序
我会提前部署KEDA将扩展的Kafka Streams应用程序。
if [ ! -d keda-auto-scalable-pipeline/ ]; then
git clone https://github.com/ogi-iii/keda-auto-scalable-pipeline.git
fi
cd keda-auto-scalable-pipeline/
git pull
docker image build -t streams-pipeline-app .
kubectl create namespace pipeline
kubectl apply -f kubernetes/pipeline-deployment.yaml
cd ..
3. KEDA设置
使用Helm来安装KEDA。
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
创建一个专门为KEDA使用的namespace,并将KEDA安装到Kubernetes集群上。
kubectl create namespace keda
helm install keda kedacore/keda -n keda
4. 部署 ScaledObject
KEDA以如下方式将其独有的ScaledObject资源定义为yaml文件。
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: streams-pipeline-app-scaler
namespace: pipeline
spec:
scaleTargetRef:
apiVersion: apps/v1 # Optional. Default: apps/v1
kind: Deployment # Optional. Default: Deployment
name: streams-pipeline-app # Mandatory. Must be in the same namespace as the ScaledObject
pollingInterval: 5 # Optional. Default: 30 seconds
cooldownPeriod: 10 # Optional. Default: 300 seconds
idleReplicaCount: 0 # Optional. Must be less than minReplicaCount
minReplicaCount: 1 # Optional. Default: 1
maxReplicaCount: 5 # Optional. Default: 100 (up to the number of partitions in Kafka Brokers)
triggers:
- type: kafka
metadata:
bootstrapServers: host.docker.internal:29092
consumerGroup: streams-pipeline-app
topic: pizzaOrders
lagThreshold: '10'
offsetResetPolicy: latest
allowIdleConsumers: "false"
version: 1.0.0
将创建的yaml文件部署如下。
kubectl apply -f scaled-object.yaml
如果无法成功连接到Apache Kafka作为Metrics监视的主体,请尝试删除一次ScaledObject资源,然后重新部署。
# リソースの削除
kubectl delete -f ./kubernetes/scaled-object.yaml
# リソースの再デプロイ
kubectl apply -f scaled-object.yaml
确认自动扩展的运作
预先检查 Kafka Streams 应用程序的 Pod 数量。
kubectl get pods -n pipeline
# 【出力結果の例】
# NAME READY STATUS RESTARTS AGE
# streams-pipeline-app-aaaaaaa-aaaaaaaa 1/1 Running 0 115s
将验证用的源数据传输到Apache Kafka。
if [ ! -d kafka-streams-pipeline/ ]; then
git clone https://github.com/ogi-iii/kafka-streams-pipeline.git
fi
cd kafka-streams-pipeline/
git pull
./demo/scripts/create-datagen.sh
cd ..
在源数据流入后,过一段时间再次显示Kafka Streams应用程序的Pod数量,可以确认Pod数量增加并进行了扩展。
kubectl get pods -n pipeline
# 【出力結果の例】
# NAME READY STATUS RESTARTS AGE
# streams-pipeline-app-ddddddd-dddddddd 1/1 Running 0 40s
# streams-pipeline-app-ccccccc-cccccccc 1/1 Running 0 40s
# streams-pipeline-app-bbbbbbb-bbbbbbbb 1/1 Running 0 40s
# streams-pipeline-app-aaaaaaa-aaaaaaaa 1/1 Running 0 6m23s
请参考以下链接
-
- Prometheusメトリクスに基づいて Kubernetes Pod をオートスケーリングさせてみた – QG Tech Blog
-
- ContainerSolutions/keda-kafka
-
- ogi-iii/keda-auto-scalable-pipeline
- ogi-iii/kafka-streams-pipeline