使用事件驱动的方式对Kafka Streams进行自动扩展【KEDA入门】

“KEDA是什么意思?”

Kubernetes集群上运行的开源事件驱动自动缩放器(基于Kubernetes的事件驱动自动缩放器 = KEDA),通过阈值监视外部系统中的特定指标,并自动调整要扩展的应用程序(例如Kubernetes Deployment)的Pod数量。

在KEDA本身中进行度量监视,然而,应用程序Pod数量的自动调整是由Kubernetes的标准资源——水平Pod自动缩放器(以下简称HPA)来实现的。

此外,监视对象的连接信息、指标阈值、Pod数量调整范围等设置项可以通过称为ScaledObject的KEDA特有资源进行定义。

各种资源和系统之间的关系可以总结如下。

应用程序的缩放目标

本次我们将使用KEDA对预先创建的KafkaStreams应用程序进行自动扩缩容。

应用程序的处理内容将是以下的ETL处理。

    1. 从Apache Kafka中获取要处理的数据。(提取: 抽出)

 

    1. 仅选择数据中需要的信息,并将其转换为可插入到数据库中的格式。(转换: 変換)

 

    将经过转换的数据插入到Mongo DB中。(加载: 格納)

请参考源代码以获取有关应用程序的详细信息。

通过KEDA实现自动扩缩容

那么,实际上我们来运行KEDA并观察应用程序自动缩放的情况。

※可真正运行的源代码全部存储在这个代码库中。

动作环境和前提条件 zuò hé tí

or

行动环境与前提条件 yǔ tí

为了使用Shell脚本,请假设目标运行环境是Mac或Linux(以及WSL)。

另外,还需要以下工具。

    • Git

 

    • Docker

 

    • Kubernetes (クラスタ1つとkubectlコマンド)

 

    Helm (バージョン3)

1. 相关系统的预先设置

事先设置好在Kafka Streams应用程序中使用的Apache Kafka和Mongo DB。

if [ ! -d kafka-streams-pipeline/ ]; then
    git clone https://github.com/ogi-iii/kafka-streams-pipeline.git
fi
cd kafka-streams-pipeline/
git pull

docker compose -f ./demo/docker-compose.yml up -d
./demo/scripts/create-topic.sh

cd ..

2. 预先部署可扩展的应用程序

我会提前部署KEDA将扩展的Kafka Streams应用程序。

if [ ! -d keda-auto-scalable-pipeline/ ]; then
    git clone https://github.com/ogi-iii/keda-auto-scalable-pipeline.git
fi
cd keda-auto-scalable-pipeline/
git pull

docker image build -t streams-pipeline-app .
kubectl create namespace pipeline
kubectl apply -f kubernetes/pipeline-deployment.yaml

cd ..

3. KEDA设置

使用Helm来安装KEDA。

helm repo add kedacore https://kedacore.github.io/charts
helm repo update

创建一个专门为KEDA使用的namespace,并将KEDA安装到Kubernetes集群上。

kubectl create namespace keda
helm install keda kedacore/keda -n keda

4. 部署 ScaledObject

KEDA以如下方式将其独有的ScaledObject资源定义为yaml文件。

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: streams-pipeline-app-scaler
  namespace: pipeline
spec:
  scaleTargetRef:
    apiVersion: apps/v1                # Optional. Default: apps/v1
    kind:       Deployment             # Optional. Default: Deployment
    name:       streams-pipeline-app   # Mandatory. Must be in the same namespace as the ScaledObject
  pollingInterval:  5                  # Optional. Default: 30 seconds
  cooldownPeriod:   10                 # Optional. Default: 300 seconds
  idleReplicaCount: 0                  # Optional. Must be less than minReplicaCount
  minReplicaCount:  1                  # Optional. Default: 1
  maxReplicaCount:  5                  # Optional. Default: 100 (up to the number of partitions in Kafka Brokers)
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: host.docker.internal:29092
      consumerGroup: streams-pipeline-app
      topic: pizzaOrders
      lagThreshold: '10'
      offsetResetPolicy: latest
      allowIdleConsumers: "false"
      version: 1.0.0

将创建的yaml文件部署如下。

kubectl apply -f scaled-object.yaml

如果无法成功连接到Apache Kafka作为Metrics监视的主体,请尝试删除一次ScaledObject资源,然后重新部署。

# リソースの削除
kubectl delete -f ./kubernetes/scaled-object.yaml

# リソースの再デプロイ
kubectl apply -f scaled-object.yaml

确认自动扩展的运作

预先检查 Kafka Streams 应用程序的 Pod 数量。

kubectl get pods -n pipeline
# 【出力結果の例】
# NAME                                    READY   STATUS    RESTARTS   AGE
# streams-pipeline-app-aaaaaaa-aaaaaaaa   1/1     Running   0          115s

将验证用的源数据传输到Apache Kafka。

if [ ! -d kafka-streams-pipeline/ ]; then
    git clone https://github.com/ogi-iii/kafka-streams-pipeline.git
fi
cd kafka-streams-pipeline/
git pull

./demo/scripts/create-datagen.sh

cd ..

在源数据流入后,过一段时间再次显示Kafka Streams应用程序的Pod数量,可以确认Pod数量增加并进行了扩展。

kubectl get pods -n pipeline
# 【出力結果の例】
# NAME                                    READY   STATUS    RESTARTS   AGE
# streams-pipeline-app-ddddddd-dddddddd   1/1     Running   0          40s
# streams-pipeline-app-ccccccc-cccccccc   1/1     Running   0          40s
# streams-pipeline-app-bbbbbbb-bbbbbbbb   1/1     Running   0          40s
# streams-pipeline-app-aaaaaaa-aaaaaaaa   1/1     Running   0          6m23s

请参考以下链接

    • Prometheusメトリクスに基づいて Kubernetes Pod をオートスケーリングさせてみた – QG Tech Blog

 

    • ContainerSolutions/keda-kafka

 

    • ogi-iii/keda-auto-scalable-pipeline

 

    ogi-iii/kafka-streams-pipeline
bannerAds