在Kubernetes中使用Apache Flink的方法

阿里巴巴高级开发工程师唐云(查干)撰写,Flink社区志愿者张庄庄编著。

本文是由阿里巴巴资深研发工程师唐云(查干)根据Apache Flink系列广播的基础进行编辑而成。本文的主要话题如下:

    • コンテナ管理システムの進化

 

    • Flink on Kubernetesの紹介

 

    • Flink on Kubernetesの実践

 

    hostPathの使用方法のデモ

本文的前半部分介绍了容器管理系统的发展。第二部分介绍了Flink on Kubernetes,包括部署模式和集群调度原理等。第三部分介绍了我们在过去一年中在Kubernetes上使用Flink时遇到的问题和所学到的经验等实践经验。最后一部分将演示集群部署和任务投入。

集装箱管理系统的发展

image.png

首先,让我们从非内核的Kubernetes开发人员的角度探讨Kubernetes和YARN之间的关系。正如您所知,Apache Hadoop YARN可能是中国使用最广泛的调度系统。主要原因是Hadoop HDFS在中国或者整个大数据行业中被广泛使用的存储系统。因此,Apache Hadoop YARN成为了自然而然地被广泛使用的调度系统,包括早期的Hadoop MapReduce。通过YARN 2.0之后的Framework的开放,现在可以在YARN上调度Spark on YARN和Flink on YARN等项目。

当然,YARN本身也有一定的限制。

    • 例えば、YARNはJavaをベースに開発されているため、ほとんどのリソースの分離には制限があります。

 

    YARN 3.0では、GPUのスケジューリングと管理をある程度サポートしています。しかし、それ以前のバージョンのYARNは、GPUをあまりサポートしていません。

除了Apache软件基金会以外,Cloud Native Computing Foundation(CNCF)也在开发以原生云计算为基础的Kubernetes调度技术。

作为开发者,我认为Kubernetes更接近于拥有更多功能的操作系统。YARN主要用于资源调度,它在整个操作系统中相对较小。它也是大数据生态系统的先驱。接下来,我将聚焦于Kubernetes,并解释从YARN容器到Kubernetes容器(或Pod)的演化过程中获取的经验和教训。

在Kubernetes上介绍Flink。

集群部署

image.png

前述的图表展示了在Kubernetes上的Flink独立会话集群中的调度流程。蓝色虚线框表示在Kubernetes集群中运行的组件,灰色框表示Kubernetes原生提供的命令和组件,如kubectl和Kubernetes Master。左侧列出了Flink官方文档提供的五个yaml文件。使用这些文件,可以在Kubernetes上部署最简单的Flink独立会话集群。

使用以下kubectl命令来启动集群。

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
    • 最初のコマンドは、Flink ConfigMapを作成するためのKubernetes Masterに適用されます。ConfigMapはflink-conf.yamlやlog4j.propertiesなど、Flinkクラスターを実行するために必要な設定を提供します。

 

    • 2番目のコマンドでは、TaskManagerをJobManagerに接続するためのFlink JobManagerサービスを作成します。

 

    • 3つ目のコマンドは、JobMasterを起動するためのFlink JobManager Deploymentを作成します。このデプロイメントには、DispatcherとResource managerが含まれます。

 

    最後のコマンドでは、TaskManagerを起動するためにFlink TaskManager Deploymentを作成します。公式のFlink taskmanager-deployment.yamlインスタンスには2つのレプリカが指定されているため、図には2つのTaskManagerノードが表示されています。

此外, 您还可以创建JobManager REST服务,并通过该服务提交作业。

上述的图表展示了Flink独立会话集群的概念。

职务的投入

下图展示了使用Flink客户端将作业提交到独立会话集群的过程。

image.png

在Flink客户端中,执行以下命令来提交作业。

./bin/flink run -m : ./examples/streaming/WordCount.jar

-m参数需要的参数public-node-IP和node-port是通过jobmanager-service.yaml公开的REST服务的IP地址和端口。执行此命令可以将Streaming WordCount作业提交到集群。该过程不依赖于运行Flink独立会话集群的环境。无论集群在Kubernetes上运行还是运行在物理机上,提交作业的过程都是相同的。

在Kubernetes上使用独立会话的优点和缺点如下:

    • メリット:クラスターを起動する前にいくつかのyamlファイルを定義するだけでよく、Flinkのソースコードを変更する必要がないこと。クラスタ間の通信はKubernetes Masterに依存しません。

 

    デメリット:リソースを事前に要求する必要があり、動的に調整することができません。しかし、Flink on YARNでは、ジョブの投入時に、クラスターが必要とするJMやTMのリソースを宣言することができます。

在Flink 1.10的开发过程中,阿里巴巴的工程师贡献了对Flink on Kubernetes的本地计算模式进行调度的功能。

image.png

最显著的区别是通过Flink客户端提交作业时,整个集群的JobMaster会通过Kubernetes ResourceManager动态申请资源并创建运行TaskManager的Pod。然后,TaskManager和JobMaster之间进行通信。有关原生Kubernetes的详细信息,请参阅Wang Yang分享的在Kubernetes上原生运行Flink的文章。

总的来说,这句话的意思是希望像使用YARN一样使用Kubernetes,并尽可能地将相关配置项与YARN保持一致。为了简化解释,我们将使用Standalone Session集群进行说明。在下面的部分中,会解释一些功能,其中有些功能在Flink 1.10中没有实现,但在Flink 1.11中将会实现。

Kubernetes上的Flink实践

日志的收集

在使用Kubernetes上运行Flink作业时,避免功能性问题的日志是不可避免的。当将该作业运行在YARN上时,YARN会处理这一切。例如,当容器执行完成后,YARN会收集日志并将其上传到HDFS中,以后可以进行查看。然而,在Kubernetes上,日志的收集和保存没有自动处理。有很多方法可以收集和展示日志。由于日志会在作业异常导致Pod终止时丢失,因此故障排除变得非常困难。

如果在YARN上执行此作业,您可以使用”yarn logs -applicationId”命令来查看相关日志。但是,如果在Kubernetes上执行此作业呢?

目前,使用fluentd来收集日志是很常见的做法,并且在一些用户的生产环境中也被使用。

image.png

Fluentd也是CNCF的一个项目。通过设置一些规则,如正则表达式匹配,可以定期将.log、.out、*.gc等日志上传到分布式存储文件系统(如HDFS),以进行日志收集。也就是说,除了TM和JM之外,还需要在Pod中启动一个单独的容器(sidecar)来运行fluentd进程。

还有其他的方法。比如,您可以使用logback-elasticsearch-appender,将日志发送到Elasticsearch而无需添加容器。实现的原则是通过Elasticsearch REST API支持的socket流,直接将日志写入Elasticsearch。

与Fluentd相比,不需要添加其他容器来收集日志。然而,除了log4j日志外,无法收集System.out和System.err等其他日志。特别是在作业内发生核心转储或崩溃转储时,相关日志将直接写入System.out或System.err。从这个角度来看,使用logback-elasticsearch-appender将日志写入Elasticsearch并不是完美的解决方案。相反,Fluentd可以通过设置各种策略来收集所需的日志。

度量衡

日志在特定问题出现时,有助于观察作业的执行状态。可以回溯发生的场景,进行故障排查和分析。度量和监控是常见但重要的问题。在该行业中,有很多监控系统解决方案,例如广泛使用的阿里巴巴的Druid,开源的InfluxDB,商业集群版的InfluxDB,CNCF的Prometheus,以及Uber的开源M3等。

接下来我们以 Prometheus 为例来说明。Prometheus 和 Kubernetes 都是 CNCF 项目,它们在指标收集领域有各自独特的优点。在某种程度上,Prometheus 是 Kubernetes 中标准的监控和收集系统。Prometheus 不仅可以监控警报,还可以定期进行基于设定规则的多精度管理。

image.png

但是,实际上,我们可以发现 Prometheus 并不是被设计成可以在水平方向上很好地扩展的。正如上面的图所示,Prometheus 的联邦分布式体系结构实际上是一个多层结构。上层节点负责路由和转发,查询下层节点的结果。显然,无论我们部署多少层,上层节点都容易成为性能瓶颈,难以部署整个集群。如果用户规模不大,一个 Prometheus 节点就可以满足监控需求。然而,当用户规模变大时,例如在拥有几百个节点的 Flink 集群中,一个 Prometheus 节点将成为性能的瓶颈,无法满足监控需求。

怎样才能解决这个问题呢?

image.png

我们正在实现对不同Flink作业的指标的一致哈希。当然,并不是将一个作业的指标发送到一个Prometheus实例中。相反,我们将作业不同范围的指标发送到多个Prometheus实例中。Flink的指标强度有大到小不等。

    • JobManager/TaskManager メトリクス

 

    • ジョブ・メトリクス(チェックポイント数、サイズ、失敗回数)

 

    • タスク・メトリクス

 

    オペレーター・メトリクス(1秒間に処理されるレコード数、受信バイト数)

现在,根据范围实施一致的哈希来生成指标,并将哈希结果发送到不同的Prometheus实例,最后与Thanos进行协同。Thanos是在《复仇者联盟3》中出现的反派角色的名字。Thanos是一个扩展组件,用于支持Prometheus指标的分布式查询。因此,在Prometheus的架构中,可以将Thanos的伴侣部件放置在一个包含单个Prometheus实例的容器中。

在整个架构中存在一些限制,并且需要创建一个一致的哈希。如果将Thanos与Prometheus一起部署,那么由于某种原因,某些指标数据将同时存在于Prometheus A和Prometheus B中。因此,Thanos的查询必须遵循一定的规则来丢弃数据,即删除其中一个将导致另一个数据被优先选择的规则。结果是,UI上的指标图表线条会变得断断续续,带来不友好的体验。因此,需要实现Thanos的一致哈希和分布式查询。

然而,在整个解决方案的应用过程中,可能会出现一些性能问题。虽然Prometheus在许多服务级别的指标中表现良好,但为什么在Flink或作业级别上表现不佳呢?这是因为作业指标的剧烈变化。与HDFS和Hbase的监控相比,这些组件的指标数量有限且维度较低。为了解释维度的概念,让我们使用一个查询场景来说明。例如,您需要查询作业内所有taskmanager_job_task_buffers_outPoolUsage关于主机的任务。换句话说,您需要使用标签来过滤查询。结果发现,Flink的taskAttemptId是一个不友好的标签。它是一个UUID,在每次作业失败时都会更改。

如果作业持续失败并且需要将新标签持久化到Prometheus,并且连接到Prometheus的数据库需要创建标签索引,则需要创建大量索引。例如,在InfluxDB上承受高负载时,可能会导致内存和CPU无法使用。这是意外的情况。因此,需要向社区求助,要求他们过滤报告中的高维标签。如果您有兴趣,请关注FLINK-15110。

表演

网络性能

首先,我会介绍一下网络性能。即使使用容器网络接口(Container Network Interface,CNI)或Kubernetes的网络插件,也无法避免网络性能下降。在一般的Flannel网络中,一些测试项目中出现了约30%的性能损失。它并不太稳定。在作业中经常会出现“PartitionNotFoundException Partition xx@host not found”的错误,这意味着下游无法获取上游分区。

image.png

可以通过Flink层来提高网络的容错性。例如,可以将taskmanager.network.request-backoff.max从默认的10秒设置为300秒,并将Akka的超时值设定为更大的值。

还有一个需要动脑筋的问题。

image.png

当作业正在运行时,经常会收到对等方发出的连接重置的报告。这是由于Flink的设计对网络的稳定性要求很高。为了确保准确只发生一次,如果数据发送失败,整个任务将会失败并重新启动。因此,会频繁收到对等方发出的连接重置报告。

我们有几个解决方案可供选择。

    • 異種ネットワーク環境を避ける(クロスIDCアクセスをしない)。

 

    • クラウドサービス事業者のマシンにマルチキューNICを設定する(インスタンス内のネットワーク遮断を異なるCPUに分散して処理することでパフォーマンスを向上させる)。

 

    • Alibaba Cloud Terwayなどのクラウドサービスプロバイダーの高性能ネットワークプラグインを選択する。

 

    Kubernetesの仮想化ネットワークを避けてホストネットワークを選択する(一定の開発が必要)。

首先,需要确保集群在不同网络环境中运行时不会出现问题。如果将Kubernetes主机部署在不同的数据中心,访问IDC时可能会导致网络抖动问题。接下来,需要在来自云服务提供商的机器上设置多队列网络接口卡(NIC)。ECS虚拟机使用一定的CPU资源进行网络虚拟化。如果没有设置多队列NIC,则在虚拟化中可能只使用一个或两个核心,而不是使用两个核心。这种情况下会发生数据包丢失,并报告”Connection Reset by peer”错误。

另一个解决方案是选择由云服务提供商提供的高性能网络插件。例如,阿里云提供的Terway插件可以支持与主机网络相当的性能,不会出现类似Flannel的性能下降。

最后,如果无法使用Terway,可以使用主机网络来绕过Kubernetes的虚拟化网络。但是,这个解决方案需要比Flink更多的开发工作。在使用Kubernetes的情况下,使用主机网络在某种程度上感觉不自然。这与Kubernetes的风格不符合。此外,也有一些机器无法使用Terway,遇到了相同的问题。我们还准备了相应的项目,使用主机网络代替覆盖网络Flannel。

磁盘性能

接下来,我们将对硬盘性能进行说明。正如前面所述,所有虚拟化的环境都会导致某种程度的性能下降。如果 RocksDB 需要读写本地硬盘,超越叠加的文件系统会导致约30%的性能下降。

image.png

那么应该怎么办呢?

我们决定使用hostPath。简单来说,容器可以访问主机的物理磁盘。请参考前述图中右侧的hostPath定义。当然,Flink镜像的用户需要先确认他们具有访问主机目录的权限。因此,最好将目录权限更改为777或775。

如果想使用此功能,请参考提供Pod模板的Flink-15656。您可以自行进行调整。Kubernetes提供了各种复杂的功能,而我们已经意识到对每个功能都进行Flink调整是不可行的。如果在模板中定义了hostPath,编写的Pod将能够根据模板的hostPath来访问目录。

虚拟内存耗尽

OOM killed是一个令人疲惫的问题。在容器环境中部署服务时,需要预先设置Pod所需的内存和CPU资源。然后,Kubernetes将指定用于调度相关节点(主机)的配置。除了指定请求参数外,还需要设置限制参数以限制所需的内存和CPU资源。

例如,假设某个节点的物理内存为64GB,并且需要8个Pod,每个Pod具有8GB的内存。看起来没有问题。但是,如果没有限制8个Pod的情况下会怎样呢?由于每个Pod可能使用10GB的内存,它们将争夺资源。结果是,某个Pod将正常工作,而另一个Pod可能会突然被终止。因此,需要设置内存限制。由于内存限制的原因,Pod可能会意外终止。通过查看Kubernetes的事件,可以发现Pod由于OOM(内存不足)而被终止。如果有使用过Kubernetes的人,应该遇到过这个问题。

请问应该如何解决呢?

image.png

第一个解决方案是启用JVM的本地内存跟踪,并定期检查内存。使用这种方法可以仅检查JVM请求的本地内存(包括Metaspace),而无法检查JVM不需要的内存。另一个解决方案是使用Jemalloc和jeprof定期转储用于分析的内存。

坦率地说,几乎没有使用第二个解决方案的必要。以前我们在YARN上应用了这个解决方案,但发现一些作业占用了巨大的内存。由于JVM限制了最大内存,所以应该存在一些与本机内存相关的问题。因此,我们通过使用Jemalloc和jeprof来分析内存,并找出准确的本机内存。例如,某些用户会自行解析设置文件。他们在每次解析文件之前都会解压文件,最终导致内存用尽。

这是导致内存溢出的情景。如果使用用于节省本机内存的状态后端RocksDB,RocksDB可能会导致内存溢出的可能性增加。因此,我们在Flink 1.10中向社区提供了一个功能。该功能可以管理RocksDB的内存,并由state.backend.rockdb.memory.managed参数进行控制。该功能默认情况下是启用的。

这张图是关于什么的?

image.png

RocksDB没有内存控制。它有四个状态,分别是value、list、map和window。在顶部行中,显示了RocksDB当前使用的总内存大小,包括块缓存使用量和RocksDB写入缓冲区的大小,可以看出四种状态的总内存使用量超过400MB。

这是因为Flink RocksDB目前没有限制状态的数量。状态是指独占写缓冲和块缓存的列族。默认情况下,列族最多可以占用2个64MB的写缓冲和1个8MB的块缓存。一个状态使用136MB,四个状态使用544MB。

如果启用了state.backend.rockdb.memory.managed,则这四个状态基本上都会使用相同的块缓存。

image.png

为什么会这样呢?那是因为使用了缓存共享功能。换句话说,LRU缓存创建了一个状态,无论处于什么样的情况,都会将内存分散和调度,并释放最近使用的内存。因此,通过启用Flink 1.10及以上版本的state.backend.rockdb.memory.managed,可以解决大部分问题。

image.png

然而,在开发过程中发现RocksDB的缓存共享设计并不是很好。这其中存在一些实现上的问题,导致无法实施严格的缓存。启用RocksDB的缓存共享可能会出现奇怪的NPE问题。换句话说,RocksDB的缓存共享在特定场景下可能无法正常工作。在这种情况下,可能需要增加taskmanager.memory.task.off-heap.size来获取额外的缓冲区空间。

当然,首先需要了解正在使用的内存情况。在先前的内存监控图中,需要将参数 state.backend.rockdb.metrics.block-cache-usage 设置为 true。这样,就可以在指标监控图中获取相关指标,并观察内存使用过多的情况。例如,默认情况下,1GB状态TM的管理器使用了294MB。

经理有时会占用300MB或310MB的空间。在这种情况下,可以通过调整taskmanager.memory.task.off-heap.size参数(默认值为0)来增加一些内存,例如64MB的内存。这将为Flink留出额外的空间,以增加RocksDB缓冲区,避免因OOM而被杀掉。目前这是可用的解决方案。然而,为了根本解决问题,需要与RocksDB社区合作。

如果你遇到类似的问题,请与我们联系,我们将乐意跟进相关问题。

示威活动 (Dǐ mó shì

最后,我们来演示一下hostPath的用法。大多数yaml文件与社区实例相同。只需将任务管理器的yaml文件更改如下。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: reg.docker.alibaba-inc.com/chagan/flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: state-volume
          mountPath: /dump/1/state
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
      - name: state-volume
        hostPath:
          path: /dump/1/state
          type: DirectoryOrCreate

常见的问题

1、Flink如何使用Kubernetes的Pod与HDFS进行交互?

Flink与HDFS的互动很简单。只需将相关依赖复制到镜像中。将flink-shaded-hadoop-2-uber-{hadoopVersion}-{flinkVersion}.jar放置在flink-home/lib目录下,并将一些Hadoop配置文件(如hdfs-site.xml和core-site.xml)放置在可访问的目录中。然后,Flink就可以访问HDFS了。这个过程与从不具备HDFS的集群节点访问HDFS相同。

2、在Kubernetes上,Flink是如何实现高可用性的?

Flink集群的高可用性与Flink在Kubernetes上运行无关。Flink社区的高可用性需要依赖ZooKeeper的支持。在高可用性中,ZooKeeper需要实现检查点ID计数器、检查点停止和流图停止。因此,在Kubernetes集群上为Flink提供ZooKeeper服务是实现高可用性的核心。ZooKeeper集群可以部署在Kubernetes上,也可以部署在物理主机上。另一方面,社区也在尝试使用etcd为Kubernetes提供高可用性解决方案。目前,只有ZooKeeper能够提供工业级的高可用性。

3、Flink在Kubernetes上和在YARN上哪个更好?如何进行选择?

目前来说,Flink on YARN是一个相对成熟的系统,但并不适用于云原生环境。在将服务迁移到云端的趋势中,Flink on Kubernetes有着光明的未来。虽然Flink on YARN是一个成熟的系统,但可能无法满足新的需求和挑战。例如,相较于YARN,Kubernetes在GPU调度和流水线创建方面有更高的性能。

如果只是执行工作,相对成熟的Flink on YARN也可以稳定运行。另一方面,Flink on Kubernetes是新的且受欢迎的,易于迭代。然而,使用Flink on Kubernetes需要陡峭的学习曲线,并需要一个健全的Kubernetes O&M团队的支持。而在Kubernetes虚拟化中,不可避免地会出现一定的磁盘和网络性能损失,这可以说是一些缺点。当然,虚拟化(容器化)也带来了更明显的优势。

4、你如何配置/etc/hosts文件?为了与HDFS进行互动,需要将HDFS节点的IP地址和主机映射到/etc/hosts文件中。

您既可以通过Volume将ConfigMap内容挂载到/etc/hosts,并进行映射,也可以依靠CoDNS而无需修改/etc/hosts。

5、如何有效地从容解决在Kubernetes上使用Flink时遇到的问题?

首先,從故障排除的角度來看,我們需要了解Flink在Kubernetes和Flink在YARN上的區別。也許Kubernetes本身存在問題,這讓人有點困擾。可以將Kubernetes視為一個具有多個複雜組件的操作系統。YARN是一個基於Java的資源調度器。集群的大多數異常都是由主機故障引起的。在我看來,相對於YARN,Kubernetes的故障排除更困難。Kubernetes有很多組件。如果DNS解析無法正常工作,需要檢查CoDNS的日誌。對於網絡或磁盤錯誤,需要檢查kube的事件。如果Pod異常結束,需要了解為什麼事件Pod會結束。老實說,支持,尤其是O&M支持是必要的。

在处理Flink的故障排除方面,无论是在Kubernetes还是YARN上,故障排除的方法是相同的。

    • 例外が発生していないかログを確認する。

 

    • パフォーマンスの問題については、jstackを使ってCPUとコールスタックに例外がないかチェックする。

 

    OOMリスクが常に存在したり、フルGCが発生しやすかったりする場合は、jmapを使ってメモリを占有するブロックを確認したり、メモリリークがないか分析する。

这些故障排除方法是与平台无关的,适用于所有场景。需要注意的是,Pod镜像可能缺少部分调试工具。建议在构建Flink在Kubernetes集群时创建私有镜像,并安装相应的调试工具。

此博客为英文版翻译而成,原文位于此处可供参考。部分使用机器翻译,请如有翻译错误之处,敬请指正。

阿里巴巴云拥有日本的两个数据中心,并拥有超过60个可用区域,是亚太地区排名第一的云基础设施服务提供商(2019年Gartner报告)。欲了解更多阿里巴巴云的详细信息,请访问阿里巴巴云日本官方网页。

bannerAds