在Kubernetes上验证Apache Spark
Apache Spark on Kubernetes 是什么
Apache Spark(以下称为Spark)从v2.3.0开始支持原生的 Kubernetes。在此之前,通过事先准备好的 Spark/Hadoop 和相应的计算资源,由管理员进行并行计算。现在通过原生支持的 Kubernetes,管理员只需准备好用于 Spark 的 Pod(容器)即可,无需担心用于 Spark 的计算资源。
大致执行流程如下:Spark 用户执行 Spark 的 CLI(spark-submit 命令),通过 Kubernetes 的 API 服务器,通过调度器指定要执行 Spark 的计算资源(节点)。然后,Spark Driver 的 Pod(负责管理 Executor 的 Pod)和用于计算的 Spark Executor 的 Pod 将部署并进行并行计算。此外,每当 Spark 完成计算时,会删除/停止相关的 Pod,以避免浪费计算资源。换言之,在使用按需计费的公共云(如 AWS 或 GCP)时,由于不会一直存在像守护进程一样启动的 Pod,因此计算资源(CPU、内存)的费用只会按实际使用量计算。
通过这种方式,Spark 用户只需知道要指定的 Kubernetes URL,即可轻松获得使用大量计算资源的并行计算环境。这样一来,当自身拥有的计算资源不足以完成任务时,用户可以方便地通过并行计算环境解决问题,无需花费额外的时间和精力,只需支付相应的费用即可。
处理流程
我们将使用下图按顺序追踪Spark在Kubernetes上的处理流程。

-
- (Step1)ユーザにて’spark-submit’コマンドを使い、計算の実行を指示します
-
- (Step2) KubernetesのNode上にSpark DriverのPod(コンテナ)がデプロイされます
-
- (Step3) SparkDriverがspark-submitで指定された数のExecutorのPodをデプロイし、各Executorにて計算が実行されます
- (Step4) 計算が終了したExecutorのPodは自動で削除されます。全てのExecutorのPodが削除された後、最後にSpark DriverのPodが停止(Terminate)されます。計算結果は、Spark DriverのPodのログに出力されます。
由于Spark Driver的Pod不会自动删除,因此在确认计算结果后,需要使用kubectl命令手动删除它。
在 Spark v2.3.0 中的限制条件
Spark在Kubernetes上仍然是一个处于开发初期的项目。
因此,以下内容被列为未来的工作。
-
- PySpark
-
- R
-
- Dynamic Executor Scaling
-
- Local File Dependency Management
-
- Spark Application Management
- Job Queues and Resource Management
使用Python或者R在Kubernetes上尝试Spark可能还需要一段时间。请参考此处获取更多详细信息。
验证行动
接下来,我们将实际运行和验证Kubernetes上的Spark。
验证环境将采用以下环境。
-
- Sparkのコンパイル/CLI実行環境: Ubuntu 16.04.4 LTS + Docker 18.03.1-ce
-
- Kubernetes環境: Kubernetes v1.10.2
-
- (kubeadmを使いVirtual Box上に構築したMaster, WorkerNode x 2VMの合計3VMのクラスタ環境)
-
- コンテナレポジトリ: Docker Hub
- サンプルアプリ: Sparkに付属のSparkPi (Javaで書かれたπの計算プログラム)
关于操作系统、Docker和Kubernetes本身的设置以及Docker Hub用户的创建,将其省略。
下载并编译
从Spark的下载网站上下载Spark 2.3.0版本。
$ wget http://ftp.jaist.ac.jp/pub/apache/spark/spark-2.3.0/spark-2.3.0.tgz
$ gzip -cd spark-2.3.0.tgz | tar xvf -
$ cd spark-2.3.0/
我們將在spark-2.3.0的目錄下進行操作。然後,我們將編譯下載的Spark源碼(大約需要45分鐘)。
$ build/mvn -DskipTests clean package
<snip>
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [09:56 min]
[INFO] Spark Project Tags ................................. SUCCESS [05:12 min]
[INFO] Spark Project Sketch ............................... SUCCESS [ 9.913 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 58.391 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 12.961 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 32.245 s]
[INFO] Spark Project Launcher ............................. SUCCESS [02:26 min]
[INFO] Spark Project Core ................................. SUCCESS [05:59 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [02:44 min]
[INFO] Spark Project GraphX ............................... SUCCESS [ 25.961 s]
[INFO] Spark Project Streaming ............................ SUCCESS [01:10 min]
[INFO] Spark Project Catalyst ............................. SUCCESS [02:45 min]
[INFO] Spark Project SQL .................................. SUCCESS [05:33 min]
[INFO] Spark Project ML Library ........................... SUCCESS [02:18 min]
[INFO] Spark Project Tools ................................ SUCCESS [ 10.025 s]
[INFO] Spark Project Hive ................................. SUCCESS [02:19 min]
[INFO] Spark Project REPL ................................. SUCCESS [ 6.327 s]
[INFO] Spark Project Assembly ............................. SUCCESS [ 3.664 s]
[INFO] Spark Project External Flume Sink .................. SUCCESS [ 33.437 s]
[INFO] Spark Project External Flume ....................... SUCCESS [ 12.331 s]
[INFO] Spark Project External Flume Assembly .............. SUCCESS [ 3.056 s]
[INFO] Spark Integration for Kafka 0.8 .................... SUCCESS [ 39.349 s]
[INFO] Kafka 0.10 Source for Structured Streaming ......... SUCCESS [ 33.933 s]
[INFO] Spark Project Examples ............................. SUCCESS [ 25.154 s]
[INFO] Spark Project External Kafka Assembly .............. SUCCESS [ 4.582 s]
[INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [ 12.030 s]
[INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [ 4.851 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 45:57 min
[INFO] Finished at: 2018-05-1T23:21:21+09:00
[INFO] Final Memory: 86M/861M
[INFO] ------------------------------------------------------------------------
如果输出BUILD SUCCESS,则表示编译成功。
这次,我从源代码文件编译了,但也可以从Apache Spark的官方网站下载编译好的二进制文件。

Spark容器的创建和仓库注册。
我們將創建一個部署在Kubernetes上的Spark容器映像。我們會使用docker-image-tool.sh進行創建。如果需要,請在kubernetes/dockerfiles/spark目錄下的Dockerfile中添加你自己的Spark程序等。
$ sudo bin/docker-image-tool.sh -r ysakashita -t v2.3.0 build
<snap>
Successfully built f9cd85baa796
Successfully tagged ysakashita/spark:v2.3.0
$ sudo docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ysakashita/spark v2.3.0 f9cd85baa796 2 minutes ago 350MB
接下来,将构建的容器注册到仓库中。
$ sudo docker login
Login with your Docker ID to push and pull images from Docker Hub. If you don't have a Docker ID, head over to https://hub.docker.com to create one.
Username (ysakashita): ysakashita
Password:
$ sudo bin/docker-image-tool.sh -r ysakashita -t v2.3.0 push
The push refers to a repository [docker.io/ysakashita/spark]
<snip>
v2.3.0: digest: sha256:3252c88b5527a97b9743824ae283c40e9d69b8587155ebd8c4d6f1b451d972f8 size: 2626

如果已经注册,那么容器镜像就完成了。
然后,在Kubernetes上创建一个用于执行Spark on Kubernetes的账号。
$ kubectl create serviceaccount spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
在本次验证中,由于使用默认命名空间,因此请授予默认权限。
以上,準備工作已经完成。
接下来,我们将开始执行使用Spark on Kubernetes进行并行计算的步骤。
使用 Kubernetes 上的 Spark 执行并行计算。
使用Kubernetes上的Spark来执行并行计算。
使用spark-submit命令来执行。
$ kubectl cluster-info
Kubernetes master is running at https://192.168.0.23:6443
KubeDNS is running at https://192.168.0.23:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
$ bin/spark-submit \
--master k8s://https://192.168.0.23:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=ysakashita/spark:v2.3.0 \
--class org.apache.spark.examples.SparkPi \
--name spark-pi \
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
关于其他选项,请参考这里。
在spark-submit命令的执行结果中,如果显示Exit code为0(即正常结束),则表示计算已正确结束。
Container name: spark-kubernetes-driver
Container image: ysakashita/spark:v2.3.0
Container state: Terminated
Exit code: 0
使用 kubectl logs 命令查看 Spark Driver Pod 的日志以确认计算结果。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 0/1 Completed 0 29s
$ kubectl logs spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver |less
<snip>
2018-05-03 01:04:22 INFO DAGScheduler:54 - Job 0 finished: reduce at SparkPi.scala:38, took 1.594295 s
Pi is roughly 3.144675723378617
<snip>
你可以通过 SparkPi 的执行结果来确认输出的是 3.144675723378617。
为了让用户能够确认执行结果,Spark Driver 的 Pod 会以 Ready 0/1 的状态停止,但不会被删除。
确认了执行结果后,请删除 Pod。
$ kubectl delete pods spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver
正在计算处理中的Pod部署的验证。
首先,我们来看一下在使用spark-submit命令进行计算处理时,Kubernetes上Pod的部署情况。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 1/1 Running 0 4s
Spark Driver的Pod已经部署。
这是在处理流程章节中提到的(步骤2)的状态。

过几秒后,我会再次确认Pod的状态。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 1/1 Running 0 14s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-1 1/1 Running 0 7s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-2 1/1 Running 0 7s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-3 1/1 Running 0 7s
然后,现在可以确认已经部署了3个Spark Executor的Pod。这是在流程的章节中提到的(第3步)状态。

由于我们在这次使用spark-submit命令时将Executor数目指定为3,所以对应的是3个Pods。如果您想增加并行计算的数量,请尝试增加Spark Executor的数目。
另外,会等待一段时间。
接着,每个Spark Executor会在计算完成后自动删除。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 0/1 Completed 0 19s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-1 0/1 Terminating 0 12s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-2 0/1 Terminating 0 12s
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-exec-3 0/1 Terminating 0 12s
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-pi-9efb8a5e1e3b38f789505ace52dac3fe-driver 0/1 Completed 0 29s
最后,可以确认Spark驱动程序的Pod处于READY 0/1的状态。
这是在处理流程章节中所述的(第4步)状态。

根据Spark并行计算的处理方式,在Kubernetes上部署和删除Pod。
清理
登录Docker Hub后,在”设置”标签下,选择”删除仓库”并点击”删除”按钮,以从存储库中删除已创建的Spark容器镜像。

从本地的Docker中删除容器镜像。
$ sudo docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ysakashita/spark v2.3.0 f9cd85baa796 14 hours ago 350MB
$ sudo docker rmi f9cd85baa796
Untagged: ysakashita/spark:v2.3.0
Untagged: ysakashita/spark@sha256:3252c88b5527a97b9743824ae283c40e9d69b8587155ebd8c4d6f1b451d972f8
Deleted: sha256:f9cd85baa79655ccefe0d7b646b499cbf568acd064c77add7e1773dfe8e14eaa
<snip>
最终,将删除已下载和编译的 Spark。
$ cd ..
$ rm -rf spark-2.3.0
专业知识
在本次验证中,我们遇到了Spark Executor的Pod无法被执行而一直处于挂起状态的问题。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
<snip>
spark-pi-8d89987d72ba32c2a89b99876ca81129-exec-3 0/1 Pending 0 10s
<snip>
因此,我們可以使用kubectl describe命令來檢查失敗的Pod。
$ kubectl describe pods spark-pi-8d89987d72ba32c2a89b99876ca81129-exec-3
<snip>
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning FailedScheduling 12s (x6 over 27s) default-scheduler 0/3 nodes are available: 1 Insufficient memory, 3 Insufficient cpu.
由于CPU和内存不足,导致Kubernetes调度程序无法分配Pod运行的节点。根据Spark 2.3.0的官方文档,我们建议使用3个CPU和4GB的内存来启动一个简单的Spark应用程序,只需要一个执行器。因此,需要至少3个CPU和4GB以上的内存。因此,我增加了VirtualBox虚拟机的CPU和内存,成功地运行了Pod。在以下命令中,我修改了分配给VirtualBox虚拟机的CPU和内存。请确保在操作VirtualBox的主机上执行此命令之前将所有Kubernetes虚拟机关闭。
$ VBoxManage list vms
"k8s" {3c2bf51f-3344-431f-8a3e-5925e065389b}
"k8s-node1" {82226c81-30ac-4292-9987-e401d9f41356}
"k8s-node2" {e82c1953-1d60-4f3f-9a2b-282299249f78}
$ VBoxManage modifyvm k8s --memory 4096 --cpus 3
$ VBoxManage modifyvm k8s-node1 --memory 4096 --cpus 3
$ VBoxManage modifyvm k8s-node2 --memory 4096 --cpus 3

印象
第一次接触Spark on Kubernetes的印象是,它并不是为”Kubernetes”用户而设计的Spark集成工具,而是为”Spark”用户而设计的Kubernetes集成工具。对于已经使用Kubernetes的用户来说,可能会感觉有点不习惯,例如不使用Kubernetes的Replicaset,而是自己控制Executor的数量。
能够熟练使用Spark和Kubernetes的用户可能在全球范围内都很少见。我认为Spark的用户主要是那些拥有类似数据科学家专注于数据分析等技能的人。对于这样的用户来说,可以通过传统的Spark中使用的spark-submit命令来指定并行计算可能会让他们感到高兴。除了–conf参数外,–master参数可以指定为k8s://,其他选项都不变。因此,即使是不了解基础设施或Kubernetes的Spark用户,也觉得使用起来门槛较低。
此外,还需要考虑另一个方面的问题,即Spark用户是否可以创建和注册容器镜像。在这次验证中,由于使用了示例程序,所以Spark应用程序已经包含在容器中,但也可以将其放在外部而不包含在容器中(参见Using Remote Dependencies)。这样一来,每次更新Spark应用程序时,无需重新创建和注册容器镜像。只需由具备管理Kubernetes集群的基础设施技能的管理员创建并注册一次Spark on Kubernetes的容器镜像,Spark用户只需指定spark.kubernetes.container.image参数就可以使用Spark on Kubernetes。
如上所述,我认为Spark on Kubernetes对于”Spark”用户来说是易于使用的。要提到的遗憾之处是,在v2.3.0版本中尚不支持Job Queues。我认为Spark用户更多地使用批处理等作业,而不是交互式计算。关于这一点,它被列为将来的工作,我期待着它的到来。
参考资料
-
- Running Spark on Kubernetes
- apache-spark-on-k8s/spark
添加内容(2018年8月)。
我們在Kubernetes上發布了一篇關於驗證HDFS的文章。
這意味著我們可以在Kubernetes上管理存儲spark.files(或–files)的HDFS數據位置,這在”Using Remote Dependencies”中有提到的spark-submit命令中可以指定。