在Kubernetes上运行Digdag和Embulk的工作流平台
关于这篇文章
我将写一篇关于在Kubernetes上运行Digdag,并从该工作流中调用Embulk批处理的工作流基础设施的文章。
我将展示一个在Digdag工作流中从Big Query索引到Elasticsearch的Embulk示例。
整体架构

Digdag将API请求处理分为主节点(master)和执行工作流的工作节点(worker),并在各自的Pod中启动。Digdag worker会动态地启动一个Kubernetes作业(Job),用于执行embulk进行实际的处理任务。此外,Digdag worker会根据排队的任务数自动缩放。
问题
要实现这种结构,有两个问题需要解决。
-
- 从digdag worker启动作业
- digdag worker的水平自动伸缩
从Digdag工作程序中启动作业
在执行digdag工作时,希望在容器中进行执行。因为Digdag已经实现了Docker的命令执行器,所以可以很容易地在Docker上执行程序。
然而,由于digdag worker本身在Pod容器上运行,因此会变成类似于Docker in Docker的状态。因此,我们需要从digdag worker中动态地启动作业并在其上运行程序,使其能够执行工作流程。
使用kube-job可以从命令行启动Kubernetes的Job。
然而,在kube-job v0.5.0以下的版本中,由于无法轻易地从Kubernetes内部调用kube-job命令,请允许我们应用补丁。
digdag工作者的配置如下所示。
digdag工作人员的镜像
请将 kube-job 的二进制文件放置在 multi-stage builds 中。
由于版本较旧,如果参考此处,请适时更新。
FROM golang:1.15-buster as builder
ENV CGO_ENABLED=0
RUN apt update && apt install -y --no-install-recommends wget unzip make
WORKDIR /app
RUN wget -q https://github.com/h3poteto/kube-job/archive/refs/tags/v0.5.1.zip \
&& unzip -q v0.5.1.zip
RUN make -C /app/kube-job-0.5.1 build
FROM azul/zulu-openjdk:8
RUN apt-get update -qq && apt-get install -y curl
RUN curl -o /usr/local/bin/digdag --create-dirs -L 'https://dl.digdag.io/digdag-0.10.0' \
&& chmod +x /usr/local/bin/digdag
COPY --from=builder /app/kube-job-0.5.1/bin/kube-job /usr/local/bin/
digdag worker的部署
有关Embulk的卷,将在后文中进行说明。
apiVersion: apps/v1
kind: Deployment
metadata:
name: digdag-worker
labels:
app: digdag-worker
spec:
selector:
matchLabels:
app: digdag-worker
strategy:
type: Recreate
template:
metadata:
labels:
app: digdag-worker
spec:
volumes:
- name: digdag-config-volume
configMap:
name: digdag-config
- name: embulk-config-volume
configMap:
name: embulk-config
containers:
- name: digdag-worker
image: 'digdag_on_k8s_worker:latest'
imagePullPolicy: IfNotPresent
volumeMounts:
- name: digdag-config-volume
mountPath: /etc/config
- name: embulk-config-volume
mountPath: /embulk
command: ['/bin/bash']
args:
- '-cx'
- |
digdag server \
--config /etc/config/digdag.properties \
-X server.bind=0.0.0.0 \
-X server.port=8080 \
-X database.type=postgresql \
-X database.host=db \
-X database.port=5432 \
-X database.user=$POSTGRES_USER \
-X database.database=digdag \
-X database.password=$POSTGRES_PASSWORD \
-X digdag.secret-encryption-key=$SECRET_ENCRYPTION_KEY
env:
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_user
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_password
- name: SECRET_ENCRYPTION_KEY
valueFrom:
secretKeyRef:
name: digdag-secret
key: secret_encryption_key
执行embulk的digdag工作流
在上述的Pod上运行的工作流程。只执行kube-job。
timezone: UTC
+embulk:
sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk'
使用kube-job的Job清单。
这是一个用于生成Job清单的ConfigMap,可以使用kube-job中的–template-file参数指定。
在digdag worker的Deployment中,将此ConfigMap挂载到。
apiVersion: v1
kind: ConfigMap
metadata:
name: embulk-config
namespace: default
data:
embulk.yml: |+
apiVersion: batch/v1
kind: Job
metadata:
name: embulk
namespace: default
labels:
app: embulk
spec:
template:
metadata:
labels:
app: embulk
spec:
containers:
- name: embulk
image: 'digdag_on_k8s_embulk:latest'
imagePullPolicy: Never
args: ['embulk', '--version']
restartPolicy: Never
backoffLimit: 2
为了简化此清单,只需运行 embulk –version。
通过以上步骤,可以从digdag worker启动作业。
kube-job的好处是可以通过–args从外部覆盖Job的args参数,从而可以自由地在工作流中执行命令,而无需修改清单。
timezone: UTC
+embulk:
sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk' --args='embulk --version'
2. Digdag worker的HPA
希望的情况是根据digdag任务队列的积压情况,自动扩展digdag worker的Pod。因此,通过Kubernetes的自定义指标获取队列信息并进行自动扩展。
要将Digdag的任务队列作为自定义指标进行处理,需要进行以下两个设置。
-
- PostgreSQLで管理されるdigdagでキューイングされたタスク数をpostgres_exporterでPrometheusのメトリクスとしてexportする
- Prometheusのメトリクスとして取り出したものをprometheus-adapterでCustom metricsとして扱う
Postgres导出器
首先,使用Helm安装Prometheus并启动postgres_exporter。只安装了必要的运行环境。
repositories:
- name: prometheus-community
url: https://prometheus-community.github.io/helm-charts
releases:
- name: prometheus-operator
chart: prometheus-community/kube-prometheus-stack
values:
- helm-values/prometheus-operator.yml
alertmanager:
enabled: false
grafana:
enabled: false
kubeApiServer:
enabled: false
kubelet:
enabled: false
kubeControllerManager:
enabled: false
coreDns:
enabled: false
kubeDns:
enabled: false
kubeEtcd:
enabled: false
kubeScheduler:
enabled: false
kubeProxy:
enabled: false
kubeStateMetrics:
enabled: false
nodeExporter:
enabled: false
prometheus-node-exporter:
hostRootFsMount: false
prometheus:
prometheusSpec:
serviceMonitorSelectorNilUsesHelmValues: false
serviceMonitorSelector: {}
接下来,我们将启动postgres_exporter的服务。
这次我们不需要default和settings的指标,所以我们会在启动选项中禁用它们。
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres-exporter
spec:
selector:
matchLabels:
app: postgres-exporter
replicas: 1
template:
metadata:
labels:
app: postgres-exporter
name: postgres-exporter
spec:
volumes:
- name: config
configMap:
name: postgres-exporter-config-map
containers:
- name: postgres-exporter
image: quay.io/prometheuscommunity/postgres-exporter
env:
- name: DATA_SOURCE_URI
valueFrom:
secretKeyRef:
name: digdag-secret
key: data_source_uri
- name: DATA_SOURCE_USER
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_user
- name: DATA_SOURCE_PASS
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_password
args:
- '--disable-default-metrics'
- '--disable-settings-metrics'
- '--extend.query-path=/config/queries.yaml'
ports:
- containerPort: 9187
name: exporter
volumeMounts:
- name: config
readOnly: true
mountPath: /config
apiVersion: v1
kind: Service
metadata:
name: postgres-exporter
labels:
app: postgres-exporter
spec:
selector:
app: postgres-exporter
ports:
- port: 9187
name: exporter
请根据此区域的内容将queued_task_locks记录数视为排队的任务数。请查看Prometheus的度量类型来了解用法。
apiVersion: v1
kind: ConfigMap
metadata:
name: postgres-exporter-config-map
data:
queries.yaml: |+
pg_exporter:
query: "SELECT count(*) AS queued_tasks FROM queued_task_locks"
metrics:
- queued_tasks:
usage: "GAUGE"
description: "Number of tasks in queue"
普罗米修斯适配器 (Pǔ luó mǐ xiū sū shì qì)
同样,在Helm中引入prometheus-adapter。
repositories:
- name: prometheus-community
url: https://prometheus-community.github.io/helm-charts
releases:
- name: prometheus-operator
chart: prometheus-community/kube-prometheus-stack
values:
- helm-values/prometheus-operator.yml
- name: prometheus-adapter
chart: prometheus-community/prometheus-adapter
values:
- helm-values/prometheus-adapter.yml
请参考此文档以了解有关 Prometheus 适配器的配置。
rules:
default: false
custom:
- seriesQuery: 'pg_exporter_queued_tasks{namespace!="",service!=""}'
resources:
overrides:
namespace:
resource: namespace
service:
resource: service
name:
as: queued_tasks
metricsQuery: '<<.Series>>{<<.LabelMatchers>>}'
prometheus:
url: http://prometheus-operator-kube-p-prometheus.default
logLevel: 2
完成设置后,您可以使用以下命令来查看指标。
kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1/namespaces/default/services/postgres-exporter/queued_tasks | jq .
只需将这些排队任务数视为自定义指标,然后设置HPA。
惠普
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: digdag-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: digdag-worker
minReplicas: 1
maxReplicas: 2
metrics:
- type: Object
object:
describedObject:
apiVersion: v1
kind: Service
name: postgres-exporter
metric:
name: queued_tasks
target:
type: Value
value: 1
重要的是目标的值为1这一点。
这样做是为了只在排队任务数为0时才进行扩展。
当Digdag运行kill时,如果有正在执行的任务,
它将等待该任务完成,但对于整个工作流来说,任务以后的所有任务都将被取消。
因此,为了确保在workflow运行时不会发生缩容和中断,如果workflow的最大时间相对较短,则可以通过terminationGracePeriodSeconds在终止前等待一段时间来增加目标值的大小,使其大于1。