在Kubernetes上运行Digdag和Embulk的工作流平台

关于这篇文章

我将写一篇关于在Kubernetes上运行Digdag,并从该工作流中调用Embulk批处理的工作流基础设施的文章。
我将展示一个在Digdag工作流中从Big Query索引到Elasticsearch的Embulk示例。

整体架构

Kubernetes digdag embulk.png

Digdag将API请求处理分为主节点(master)和执行工作流的工作节点(worker),并在各自的Pod中启动。Digdag worker会动态地启动一个Kubernetes作业(Job),用于执行embulk进行实际的处理任务。此外,Digdag worker会根据排队的任务数自动缩放。

问题

要实现这种结构,有两个问题需要解决。

    1. 从digdag worker启动作业

 

    digdag worker的水平自动伸缩

从Digdag工作程序中启动作业

在执行digdag工作时,希望在容器中进行执行。因为Digdag已经实现了Docker的命令执行器,所以可以很容易地在Docker上执行程序。

 

然而,由于digdag worker本身在Pod容器上运行,因此会变成类似于Docker in Docker的状态。因此,我们需要从digdag worker中动态地启动作业并在其上运行程序,使其能够执行工作流程。

使用kube-job可以从命令行启动Kubernetes的Job。
然而,在kube-job v0.5.0以下的版本中,由于无法轻易地从Kubernetes内部调用kube-job命令,请允许我们应用补丁。

 

digdag工作者的配置如下所示。

digdag工作人员的镜像

请将 kube-job 的二进制文件放置在 multi-stage builds 中。
由于版本较旧,如果参考此处,请适时更新。

FROM golang:1.15-buster as builder

ENV CGO_ENABLED=0

RUN apt update && apt install -y --no-install-recommends wget unzip make

WORKDIR /app
RUN wget -q https://github.com/h3poteto/kube-job/archive/refs/tags/v0.5.1.zip \
    && unzip -q v0.5.1.zip
RUN make -C /app/kube-job-0.5.1 build

FROM azul/zulu-openjdk:8

RUN apt-get update -qq && apt-get install -y curl

RUN curl -o /usr/local/bin/digdag --create-dirs -L 'https://dl.digdag.io/digdag-0.10.0' \
    && chmod +x /usr/local/bin/digdag

COPY --from=builder /app/kube-job-0.5.1/bin/kube-job /usr/local/bin/

digdag worker的部署

有关Embulk的卷,将在后文中进行说明。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: digdag-worker
  labels:
    app: digdag-worker
spec:
  selector:
    matchLabels:
      app: digdag-worker
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: digdag-worker
    spec:
      volumes:
      - name: digdag-config-volume
        configMap:
          name: digdag-config
      - name: embulk-config-volume
        configMap:
          name: embulk-config
      containers:
      - name: digdag-worker
        image: 'digdag_on_k8s_worker:latest'
        imagePullPolicy: IfNotPresent
        volumeMounts:
        - name: digdag-config-volume
          mountPath: /etc/config
        - name: embulk-config-volume
          mountPath: /embulk
        command: ['/bin/bash']
        args:
        - '-cx'
        - |
          digdag server \
          --config /etc/config/digdag.properties \
          -X server.bind=0.0.0.0 \
          -X server.port=8080 \
          -X database.type=postgresql \
          -X database.host=db \
          -X database.port=5432 \
          -X database.user=$POSTGRES_USER \
          -X database.database=digdag \
          -X database.password=$POSTGRES_PASSWORD \
          -X digdag.secret-encryption-key=$SECRET_ENCRYPTION_KEY
        env:
        - name: POSTGRES_USER
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_user
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_password
        - name: SECRET_ENCRYPTION_KEY
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: secret_encryption_key

执行embulk的digdag工作流

在上述的Pod上运行的工作流程。只执行kube-job。

timezone: UTC

+embulk:
  sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk'

使用kube-job的Job清单。

这是一个用于生成Job清单的ConfigMap,可以使用kube-job中的–template-file参数指定。
在digdag worker的Deployment中,将此ConfigMap挂载到。

apiVersion: v1
kind: ConfigMap
metadata:
  name: embulk-config
  namespace: default
data:
  embulk.yml: |+
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: embulk
      namespace: default
      labels:
        app: embulk
    spec:
      template:
        metadata:
          labels:
            app: embulk
        spec:
          containers:
          - name: embulk
            image: 'digdag_on_k8s_embulk:latest'
            imagePullPolicy: Never
            args: ['embulk', '--version']
          restartPolicy: Never
      backoffLimit: 2

为了简化此清单,只需运行 embulk –version。
通过以上步骤,可以从digdag worker启动作业。

kube-job的好处是可以通过–args从外部覆盖Job的args参数,从而可以自由地在工作流中执行命令,而无需修改清单。

timezone: UTC

+embulk:
  sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk' --args='embulk --version'

2. Digdag worker的HPA

希望的情况是根据digdag任务队列的积压情况,自动扩展digdag worker的Pod。因此,通过Kubernetes的自定义指标获取队列信息并进行自动扩展。

要将Digdag的任务队列作为自定义指标进行处理,需要进行以下两个设置。

    • PostgreSQLで管理されるdigdagでキューイングされたタスク数をpostgres_exporterでPrometheusのメトリクスとしてexportする

 

    Prometheusのメトリクスとして取り出したものをprometheus-adapterでCustom metricsとして扱う

Postgres导出器

首先,使用Helm安装Prometheus并启动postgres_exporter。只安装了必要的运行环境。

repositories:
- name: prometheus-community
  url: https://prometheus-community.github.io/helm-charts
releases:
  - name: prometheus-operator
    chart: prometheus-community/kube-prometheus-stack
    values:
      - helm-values/prometheus-operator.yml
alertmanager:
  enabled: false
grafana:
  enabled: false
kubeApiServer:
  enabled: false
kubelet:
  enabled: false
kubeControllerManager:
  enabled: false
coreDns:
  enabled: false
kubeDns:
  enabled: false
kubeEtcd:
  enabled: false
kubeScheduler:
  enabled: false
kubeProxy:
  enabled: false
kubeStateMetrics:
  enabled: false
nodeExporter:
  enabled: false
prometheus-node-exporter:
  hostRootFsMount: false
prometheus:
  prometheusSpec:
    serviceMonitorSelectorNilUsesHelmValues: false
    serviceMonitorSelector: {}

接下来,我们将启动postgres_exporter的服务。
这次我们不需要default和settings的指标,所以我们会在启动选项中禁用它们。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres-exporter
spec:
  selector:
    matchLabels:
      app: postgres-exporter
  replicas: 1
  template:
    metadata:
      labels:
        app: postgres-exporter
      name: postgres-exporter
    spec:
      volumes:
      - name: config
        configMap:
          name: postgres-exporter-config-map
      containers:
      - name: postgres-exporter
        image: quay.io/prometheuscommunity/postgres-exporter
        env:
        - name: DATA_SOURCE_URI
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: data_source_uri
        - name: DATA_SOURCE_USER
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_user
        - name: DATA_SOURCE_PASS
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_password
        args:
        - '--disable-default-metrics'
        - '--disable-settings-metrics'
        - '--extend.query-path=/config/queries.yaml'
        ports:
        - containerPort: 9187
          name: exporter
        volumeMounts:
        - name: config
          readOnly: true
          mountPath: /config
apiVersion: v1
kind: Service
metadata:
  name: postgres-exporter
  labels:
    app: postgres-exporter
spec:
  selector:
    app: postgres-exporter
  ports:
  - port: 9187
    name: exporter

 

请根据此区域的内容将queued_task_locks记录数视为排队的任务数。请查看Prometheus的度量类型来了解用法。

apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-exporter-config-map
data:
  queries.yaml: |+
    pg_exporter:
      query: "SELECT count(*) AS queued_tasks FROM queued_task_locks"
      metrics:
        - queued_tasks:
            usage: "GAUGE"
            description: "Number of tasks in queue"

普罗米修斯适配器 (Pǔ luó mǐ xiū sū shì qì)

同样,在Helm中引入prometheus-adapter。

repositories:
- name: prometheus-community
  url: https://prometheus-community.github.io/helm-charts
releases:
  - name: prometheus-operator
    chart: prometheus-community/kube-prometheus-stack
    values:
      - helm-values/prometheus-operator.yml
  - name: prometheus-adapter
    chart: prometheus-community/prometheus-adapter
    values:
      - helm-values/prometheus-adapter.yml

请参考此文档以了解有关 Prometheus 适配器的配置。

rules:
  default: false
  custom:
  - seriesQuery: 'pg_exporter_queued_tasks{namespace!="",service!=""}'
    resources:
      overrides:
        namespace:
          resource: namespace
        service:
          resource: service
    name:
      as: queued_tasks
    metricsQuery: '<<.Series>>{<<.LabelMatchers>>}'
prometheus:
  url: http://prometheus-operator-kube-p-prometheus.default
logLevel: 2

完成设置后,您可以使用以下命令来查看指标。

kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1/namespaces/default/services/postgres-exporter/queued_tasks | jq .

只需将这些排队任务数视为自定义指标,然后设置HPA。

惠普

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: digdag-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: digdag-worker
  minReplicas: 1
  maxReplicas: 2
  metrics:
  - type: Object
    object:
      describedObject:
        apiVersion: v1
        kind: Service
        name: postgres-exporter
      metric:
        name: queued_tasks
      target:
        type: Value
        value: 1

重要的是目标的值为1这一点。
这样做是为了只在排队任务数为0时才进行扩展。
当Digdag运行kill时,如果有正在执行的任务,
它将等待该任务完成,但对于整个工作流来说,任务以后的所有任务都将被取消。

 

因此,为了确保在workflow运行时不会发生缩容和中断,如果workflow的最大时间相对较短,则可以通过terminationGracePeriodSeconds在终止前等待一段时间来增加目标值的大小,使其大于1。

bannerAds