如果你是一位数据工程师,那么你一定想了解在Kubernetes上可以做些什么
首先
各位,大家是否了解Kubernetes呢?
在WEB服务器和游戏API等方面,它已被广泛采用,
实际上,它与批处理非常相配,我想向大家介绍一下。
顺便提一下,在某个项目中实际上已经将其作为批处理的一部分整合到了大数据分析基础设施当中,并且目前正在积极运行作为大数据分析基础设施的一部分。
Kubernetes和Docker
在了解Kubernetes之前,让我们先了解Docker。
基本上,我打算以Google Kubernetes Engine(GKE)为基础来解释Kubernetes环境。
Docker – 使用Docker
Docker 是一种软件容器技术,可以将应用程序打包成环境特定的技术之一。
近年来,尽管通过软件包管理器可以很好地管理软件包,
但是在处理诸如 Python 2.x 和 3.x 之类的不同运行环境时,仍然存在一些批处理需求,
例如将本地和批处理服务器的执行环境区分开来,可能导致数据损坏等问题的发生。
此外,由于其他项目批处理服务器的执行环境版本差异等原因,
可能还有人需要根据不同项目切换执行环境。
在这种情况下,Docker 是非常方便的,可以将执行环境封装并共享。

在这个批处理中,Docker所覆盖的范围示意图。
这个图中的中间件指的是Python3和Google Cloud SDK等。
应用程序作为批处理的核心。
Kubernetes = Kubernetes
这是一个管理Docker运行的工具。除了可以简单地运行外,使用Kubernetes可以在Web服务器负载较大的情况下将虚拟机从3台增加到10台,或者在负载减少时自动减少到3台等,可以进行设置。此外,该工具还广泛采用了容器技术以及基础设施即代码和不变基础设施等概念。
通过Docker和Kubernetes的组合,您可以方便地将应用程序环境打包,并随意运行Docker的图像,以便以整体方式运行应用程序。
对于数据工程师来说,Kubernetes的功能非常方便。
工作
我认为在Kubernetes上运行服务器时,大多数情况下会在Pod模式下运行。
Job是用于管理Pod的资源,与Pod不同,Job被管理为默认结束。
在Kubernetes中执行Job时,可以在Job的清单中设置以下项目。
-
- 成功数
-
- 並列数
-
- 再実行の設定
- 再実行回数
可以进行设定。
此外,作业的执行历史记录由Kubernetes管理,其中包含了正常完成的信息。不能重复注册相同名称的作业(需要先删除再重新注册)。我们可以通过执行历史记录来调查曾经失败的作业日志。

成功数: spec.completions
成功的数量: spec.completions
完成数量: spec.completions
完成次数: spec.completions
您可以直接设置成功次数。
将其设置为1,Kubernetes将一直执行直到成功。
此外,您可以设置为2或更高的数字,以便执行多次直到成功。
并行数:spec.parallelism
并列数是指设置作业并行运行的数量。
如果设置为大于2的数字,则可以同时运行相同的批处理作业。
例如,在使用机器学习进行决策处理时,以3个并行运行并取多数投票可能会很有用。
需要注意的是,并行化不会超过完成次数。
例如,如果并行数为3且成功次数为3,只会并行化到2个。
再执行设置:spec.template.spec.restartPolicy
可以设置常常再执行(Always)、从不再执行(Never)和仅在失败时重新执行(OnFailure)。
在OnFailure模式下,如果批处理作业异常结束,将重新从头开始执行作业。
这简直太方便了。
重试次数:spec.backoffLimit
这是关于失败时重复执行次数的设置。如果设置为5,作业即使失败也会重复执行最多5次。
自动缩放
我认为学习Kubernetes的动力之一是自动伸缩功能。如果在作业中使用,只需在节点池中设置自动伸缩,它可以自动调整根据处理需求来使用的虚拟机数量。例如,在平时使用一台虚拟机执行批处理的集群中,如果要一次性执行1000个用于传输历史数据的批处理作业,只需将节点池设置为自动伸缩,临时将虚拟机数量增加到200台也是可能的。在GKE中,可以手动设置超过自动伸缩上限的虚拟机数量,也可以利用这一点使作业消失时自动适应自动伸缩的范围。
定时任务
CronJob是定期执行工作的方式。类似于Unix命令crontab,通过设置spec.schedule字段为0 1 * * *,可以在指定时间自动执行该工作。另外,可以通过设置spec.concurrencyPolicy字段来决定是否允许同时执行,以及当前处理未完成时的行为。例如,可以跳过本次执行,或者停止前一个工作并开始新的执行等。CronJob可以管理工作的执行。

等待
可以使用kubectl wait命令来检测作业的完成和失败等情况。
通过这种方式,可以与另一个作业结合起来实现批处理作业的并行执行和串行执行。

假设图4是为了将Parent Job中的Child Job注册到Kubernetes上。如果直接进行注册,步骤2的Job会继续执行步骤1-1和1-2。但是,可以通过插入wait命令来等待完成后再进行注册。
在将数据加载到BigQuery并执行SQL等操作时非常方便。
使用Kubernetes执行批处理作业的注意事项。
使程序在錯誤時異常終止。
这主要是关于批处理程序的设计问题,
我们应该在失败时返回0作为正常结束状态,返回非0作为异常结束状态。
如果不这样做,Kubernetes将无法识别到异常结束,导致不会重新执行并被视为成功处理。
这其实是一件理所当然的事情,但在平时自行管理状态的情况下容易被忽视。
冪等性指的是操作可以多次执行而不会产生不同的结果。原子性指的是操作要么全部执行成功,要么全部不执行。
在写数据基础设施和批处理程序时,或许不用再次强调一遍,但是我们需要在容器级别确保批处理具有幂等性和原子性。幂等性意味着使用相同的参数进行执行将得到相同的结果。原子性表示必须要么全部成功,要么全部失败,就像事务一样。考虑到Kubernetes可能会自动重新执行任务,我认为确保无论执行多少次都能得到相同结果几乎是必需的。至于原子性,我认为也可以考虑添加恢复功能。
突然死亡
在执行容器时最令人担忧的是由于Kubernetes本身出现某种障碍而无法检测到失败,导致容器停止运行而没有任何错误提示。(虽然罕见)
在需要严格一致性的情况下。
-
- 成功した場合に成功したことをコンテナ外に通知・書き込む仕組み
- 外部での自動整合性チェック処理の実装
我建议您考虑将容器中的数据设计和构建为基本挥发性。
我经常制作的模式
工作的设定
通常情况下,我们经常使用以下配置来执行一次性的批处理处理。
-
- 並列数:1
-
- 成功数:1
-
- 再実行回数:5
- 再実行の設定:OnFailure(失敗時のみ再実行)
使用这个设置去运行作业的话,它会持续重复执行直到成功,而且在资源有空闲的情况下会被调度执行。这种机制使得即使有大量的一次性批处理任务产生,也可以通过Kubernetes进行管理,非常方便。只需注册数千个作业,便可让虚拟机自动增加并执行直到完成。
批处理
关于批处理程序的构建,不仅仅限于Kubernetes的处理方式,
我们经常准备了一个批处理程序,该程序以日期作为参数接收,如果没有参数,则作为昨天的日期运行。
获取日期并将其作为前一天的数据运行在幂等性方面有点微妙,但建议在CronJob和Job中使用相同的脚本。
此外,原子性的实现通常会使其行为接近原子性,虽然不完全,例如通过文件覆盖或表替换。
(在这种情况下,如果失败,可能会残留一些不完整的文件。)
总结
由于能够保证成功次数并执行的机制,批处理的管理变得非常容易。
无需构建和运营像AirFlow这样的批处理管理服务器,就可以轻松执行一次性任务,
此外,不再需要处理由外部因素导致的任务失败,也不需要提取失败的任务并重新执行。
我认为特别方便的是,如果要并行执行1000个批处理任务,使用批处理服务器的话,
可能会发生各种问题,如CPU资源、网络资源以及与其他人的协调等。
通过在Kubernetes上执行任务,可以专注于批处理任务的执行,
这对于数据分析基础设施开发等任务容易出现任务过载的项目非常有用。
希望这篇文章能够激发某人的动力,让他们能在Kubernetes上找到幸福。
请参考以下资料。
-
- 青山 真也(2018)『Kubernetes完全ガイド』(https://amzn.to/36k6qAZ)
-
- Kubernetes Documentation:
-
- https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
Kubectl Reference Docs – Kubernetes:https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#wait