Deploying Apache Kafka

All right, let's get Kafka deployed.
Besides Kafka itself a few other pieces are needed, so let's go through them one by one.

Kafka

First, the essential part: Kafka itself.
We will use the utilitywarehouse/uw-kafka image that my company publishes.
A pre-built image is available from quay.io/utilitywarehouse/uw-kafka.

apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: qiita
spec:
  ports:
  - port: 9092
    protocol: TCP
    targetPort: 9092
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka
  namespace: qiita
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: qiita
spec:
  podManagementPolicy: Parallel
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  serviceName: broker
  template:
    metadata:
      labels:
        app: kafka
    spec:
      terminationGracePeriodSeconds: 300
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - kafka
              topologyKey: kubernetes.io/hostname
            weight: 100
      containers:
      - name: broker
        image: quay.io/utilitywarehouse/uw-kafka:v2.0.1
        imagePullPolicy: Always
        command:
        - sh
        - -ecx
        - export JMX_PORT=9090 && exec ./kafka-server-start.sh ../config/server.properties --override broker.id=$(hostname | awk -F'-' '{print $2}')
        env:
        - name: KAFKA_HEAP_OPTS
          value: -Xmx2G -Xms2G
        ports:
        - containerPort: 9092
          protocol: TCP
        volumeMounts:
        - mountPath: /opt/kafka/data
          name: datadir
        - mountPath: /opt/kafka/config
          name: kafka-configmap
        readinessProbe:
          failureThreshold: 10
          initialDelaySeconds: 60
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 15
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"
        livenessProbe:
          failureThreshold: 10
          initialDelaySeconds: 60
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 15
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"
      - name: jmx-exporter
        image: quay.io/utilitywarehouse/jmx_exporter:0.11.0
        imagePullPolicy: Always
        env:
        - name: PORT
          value: "8080"
        ports:
        - containerPort: 8080
          name: web
          protocol: TCP
        volumeMounts:
        - mountPath: /app/config
          name: jmx-exporter-configmap
      volumes:
      - configMap:
          defaultMode: 420
          name: kafka-configmap
        name: kafka-configmap
      - configMap:
          defaultMode: 420
          name: jmx-exporter-configmap
        name: jmx-exporter-configmap
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 500Gi
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-configmap
  namespace: qiita
data:
  server.properties: |-
     ...
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: jmx-exporter-configmap
  namespace: qiita
data:
  config.yml: |-
    ---
    hostPort: localhost:9090
    rules:
    - pattern: ".*"
---

The StatefulSet spins up a cluster of three brokers.
The contents of configmap/kafka-configmap are the Kafka configuration, so they are omitted here; please see the GitHub repository.
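
One small detail: the start command overrides broker.id with the pod's StatefulSet ordinal, pulled out of the hostname, so each broker keeps a stable, unique id across restarts. A quick sanity check of that awk expression (the pod hostnames here are kafka-0, kafka-1 and kafka-2):

$ echo kafka-2 | awk -F'-' '{print $2}'
2
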
We are not yet sure whether Prometheus will make it into this Advent Calendar, but jmx-exporter is already loaded as a sidecar to expose the broker's metrics in Prometheus format.
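
If Prometheus does show up later, the sidecar can already be poked at by hand. A minimal sketch, assuming the port 8080 configured above and a broker pod that is actually running:

$ kubectl -n qiita port-forward kafka-0 8080:8080 &
$ curl -s localhost:8080/metrics | head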

zetcd

To run Kafka you need an application called ZooKeeper, which provides common distributed-systems plumbing such as leader election. We could deploy ZooKeeper itself, but this time I want to use etcd-io/zetcd.

etcd provides the same kind of functionality as ZooKeeper, just with a different interface, and zetcd is a proxy layer that exposes a ZooKeeper interface on top of etcd. That lets us conveniently swap ZooKeeper out for etcd, which is easier to deploy.

Also, I left it out earlier, but configmap/kafka-configmap contains the following setting:

...
zookeeper.connect=zetcd:2181/kafka
...
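
The /kafka suffix is a chroot, so all of Kafka's znodes end up under /kafka on the ZooKeeper side, which here is zetcd backed by etcd. Once everything is up, that path can be inspected with the ZooKeeper shell bundled with Kafka; a sketch, assuming the broker image's working directory is Kafka's bin/ directory, as the start command above suggests:

$ kubectl -n qiita exec kafka-0 -c broker -- ./zookeeper-shell.sh zetcd:2181 ls /kafka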

Here are the manifests for zetcd and for the etcd cluster (kafka-etcd) that Kafka uses through it.

apiVersion: v1
kind: Service
metadata:
  labels:
    app: zetcd
  name: zetcd
  namespace: qiita
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
    targetPort: 2181
  selector:
    app: zetcd
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zetcd
  namespace: qiita
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      app: zetcd
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zetcd
  name: zetcd
  namespace: qiita
spec:
  replicas: 2
  selector:
    matchLabels:
      app: zetcd
  template:
    metadata:
      labels:
        app: zetcd
      namespace: qiita
    spec:
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - zetcd
              topologyKey: kubernetes.io/hostname
            weight: 100
      containers:
      - name: zetcd
        image: quay.io/coreos/zetcd:v0.0.4
        imagePullPolicy: Always
        command:
        - zetcd
        - --zkaddr
        - 0.0.0.0:2181
        - --endpoints
        - kafka-etcd:2379
        ports:
        - containerPort: 2181
          protocol: TCP
        resources:
          requests:
            memory: 80Mi
          limits:
            memory: 200Mi
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/path: /metrics
    prometheus.io/port: "2379"
    prometheus.io/scrape: "true"
    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
  labels:
    role: etcd
  name: kafka-etcd
  namespace: qiita
spec:
  clusterIP: None
  ports:
  - name: client
    port: 2379
    protocol: TCP
    targetPort: 2379
  - name: peer
    port: 2380
    protocol: TCP
    targetPort: 2380
  selector:
    app: kafka-etcd
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-etcd
  namespace: qiita
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      app: kafka-etcd
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-etcd
    role: etcd
  name: kafka-etcd
  namespace: qiita
spec:
  podManagementPolicy: Parallel
  replicas: 3
  selector:
    matchLabels:
      app: kafka-etcd
  serviceName: kafka-etcd
  template:
    metadata:
      labels:
        app: kafka-etcd
      name: kafka-etcd
    spec:
      containers:
      - name: etcd
        image: quay.io/coreos/etcd:v3.2.9
        imagePullPolicy: IfNotPresent
        command:
        - /bin/sh
        - -ecx
        - |
          PEERS=""
          for i in $(seq 0 $((${CLUSTER_SIZE} - 1))); do
              PEERS="${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}:2380"
          done
          # start etcd. If cluster is already initialized the `--initial-*` options will be ignored.
          exec etcd --name ${HOSTNAME} \
            --listen-peer-urls http://0.0.0.0:2380 \
            --listen-client-urls http://0.0.0.0:2379 \
            --advertise-client-urls http://${HOSTNAME}.${SET_NAME}:2379 \
            --initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \
            --initial-cluster-token qiita-${SET_NAME} \
            --initial-cluster ${PEERS} \
            --initial-cluster-state new \
            --data-dir /var/run/etcd/default.etcd
        env:
        - name: CLUSTER_SIZE
          value: "3"
        - name: SET_NAME
          value: kafka-etcd
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /health
            port: 2379
            scheme: HTTP
          initialDelaySeconds: 15
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 10
        ports:
        - containerPort: 2379
          name: client
          protocol: TCP
        - containerPort: 2380
          name: peer
          protocol: TCP
        volumeMounts:
        - mountPath: /var/run/etcd
          name: kafka-etcd-data
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
  - metadata:
      name: kafka-etcd-data
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 10Gi
---

Deploying

Let's try applying these manifests.

$ kubectl -n qiita apply -f kubernetes/kafka-etcd.yaml
service/kafka-etcd created
poddisruptionbudget.policy/kafka-etcd created
statefulset.apps/kafka-etcd created

$ kubectl -n qiita apply -f kubernetes/zetcd.yaml
service/zetcd created
poddisruptionbudget.policy/zetcd created
deployment.apps/zetcd created

$ kubectl -n qiita apply -f kubernetes/kafka.yaml 
service/kafka created
poddisruptionbudget.policy/kafka created
statefulset.apps/kafka created
configmap/kafka-configmap created
configmap/jmx-exporter-configmap created

$ kubectl -n qiita get pods
NAME                                          READY   STATUS      RESTARTS   AGE
...
kafka-0                                       1/2     Running     1          3m37s
kafka-1                                       1/2     Running     1          3m37s
kafka-2                                       1/2     Running     1          3m37s
kafka-etcd-0                                  1/1     Running     0          5m50s
kafka-etcd-1                                  1/1     Running     0          5m49s
kafka-etcd-2                                  1/1     Running     1          5m49s
...
zetcd-5cb8c844d6-7xt9t                        1/1     Running     0          4m43s
zetcd-5cb8c844d6-qtwpx                        1/1     Running     0          4m43s
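
The kafka-etcd pods at least look healthy. To confirm that the etcd cluster really formed, the etcdctl bundled in the image can be asked directly; a sketch (the v3.2 image defaults to the v2 etcdctl API):

$ kubectl -n qiita exec kafka-etcd-0 -- etcdctl --endpoints http://localhost:2379 cluster-health
$ kubectl -n qiita exec kafka-etcd-0 -- etcdctl member list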

Things are not going smoothly, though... it looks like my local machine has hit its resource limits.
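
A few commands I'll reach for to dig into why the broker containers never become ready; a sketch (kubectl top needs metrics-server in the cluster):

$ kubectl -n qiita describe pod kafka-0
$ kubectl -n qiita logs kafka-0 -c broker --previous
$ kubectl -n qiita top nodes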


What day is it today? Day 14? Phew, haha. Tomorrow, job number one is getting this deployment finished...