用 Spark × Keras × Docker 技术实现了可扩展的深度学习
用Spark × Keras × Docker的组合尝试将深度学习变得可扩展。
2017年2月,Yahoo!发布了一个允许使用Spark进行分布式处理的TensorFlow库。
http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep
我个人把这个东西放在Docker上玩了一下,然后就想试试Keras怎么样?所以这次打算用Docker来运行Dist-Keras,将Keras在Spark上进行分布式处理。
我們的目標是將深度學習函式庫以可擴展的方式應用。通過運用GPGPU,深度學習在擴展性方面正在努力。
然而,如果将处理加速性的需求分别放在计算设备性能的规模化(Scale-up)和计算设备数量的规模化(Scale-out)上相结合,就可以实现更快的处理速度。
此外,个人觉得没有GPU的穷人应该尽量尝试通过规模化来解决(这个更加切实(笑))。
因此,我尝试将深度学习库Keras装载到Docker容器中,并启动多个容器以实现通过规模化进行负载均衡的尝试。
由于普通的Keras无法进行规模化,这次我们将使用一个能在Spark上运行Keras的库。
Spark上的Keras
据说在Spark中有两个库可以将Keras进行分布式处理。
第一个是Elephas,据说这个项目大概有一年的历史了。
http://maxpumperla.github.io/elephas/
另一个选择是Dist-Keras,这是CERN(欧洲核子研究组织)开发的。
http://joerihermans.com/work/distributed-keras/
https://db-blog.web.cern.ch/blog/joeri-hermans/2017-01-distributed-deep-learning-apache-spark-and-keras
当谈到CERN时,它是STEINS;GATE中SERN的灵感来源。CERN也似乎了解岡部仓太郎。
这次要做的事情
我打算使用Docker来运行CERN的Dist-Keras,并尝试执行MNIST任务。我选择Dist-Keras的原因是它最近有更新,并且它确实运作良好。
由于我曾经试过触摸大象,所以也许不久将来我会再次与它玩耍。
在Docker中运行Dist-Keras的原因是,容器比虚拟机更容易准备环境。
虽然也可以建立多个EC2实例,但是费用会比较高,所以我选择在一台虚拟机中启动了3个Docker容器。
其中一台是Spark主节点兼工作节点(从节点),剩下的两台是工作节点。
这样就构建了这样一种架构。

分布式Keras
请阅读CERN对Dist-Keras的热烈讨论。
我会对其进行一次意译。
Distributed Keras是一个使用Apache Spark和Keras构建的分布式深度学习框架。其目标是显著改善机器学习的训练时间,实现分析超过内存容量的数据集。该项目于2016年8月与CMS合作开始。
建筑学
一般来说,学习过程通过Lambda函数传递给Spark工作器。然而,例如当传递多个参数,如参数服务器端口号时,我们会将它们包装在一个对象中,并将其转换为学习函数,以便Spark可以使用所需的参数。为了全面了解,我们使用下图进行解释。首先,学习对象在Spark驱动程序上启动参数服务器。然后启动工作器进程,其中包含了训练Keras模型所需的所有参数和进程。此外,为了准备所需数量的并行处理工作器,将数据集按设定的容量进行分割。然而,处理大数据时,增加并行处理要素是可取的。这样,当一个工作器处于空闲状态时,可以避免其他较低能力的工作器没有完成批处理的情况(也称为滞后问题)。在这种情况下,建议按照Spark文档的设置将分布式元素设置为3。
然而,在某些情况下,可能需要考虑使用更大的分散因素。基本上,分散处理与分区数(即分割数)成比例。举个例子,假设将20个工作者分配给某个任务,并将分散因素设置为3。Spark将将数据集分割成60个分片,并且在工作者开始处理分区之前,首先需要加载完成执行该任务所需的所有Python库,还需要将Keras模型反序列化并编译。这个过程会产生很大的开销。因此,该技术只在需要长时间预热开销的非均质系统(即每个工作者加载不同的硬件或值的情况下)并且处理大型数据集时才有效。

Dist-Keras 安装
Dist-Keras 在 Github 上公开了。
https://github.com/cerndb/dist-keras
在安装Dist-Keras之前,需要先安装Spark和Keras。
以下是这些安装的方法链接:
https://spark.apache.org/docs/latest/index.html
https://keras.io/ja/
Dist-Keras的安装方法如下命令所示。
## if you need git, run "yum -y install git" beforehand
git clone https://github.com/JoeriHermans/dist-keras
cd dist-keras
pip install -e .
我在虚拟机上的CentOS7.3和Docker容器(CentOS7.3)上进行了尝试,没有遇到任何问题,成功安装了。
重新确认一下,这次要做的事情。
好的,話長了些,本次要做的是將Dist-Keras在Docker容器中運行,將Keras模型的訓練在容器之間進行分散。
改寫後的中文句子如下:
好的,說得比較冗長,這次要做的是在Docker容器中運行Dist-Keras,將Keras模型的訓練在容器之間進行分散。

Docker上的Dist-Keras
Dist-Keras缺乏Dockerfile和Docker镜像,所以我自己创建了一个。
https://github.com/shibuiwilliam/distkeras-docker
当您使用git clone命令时,将会下载Dockerfile和spark_run.sh文件。
git clone https://github.com/shibuiwilliam/distkeras-docker.git
请进入distkeras-docker目录并进行docker build操作。
cd distkeras-docker
docker build -t distkeras .
虽然会花些时间,但让我们慢慢等待吧。
Docker build主要做的事情如下。
-
- 必要なツールのインストール
-
- PythonやJupyter Notebookのインストール、構成
-
- Standalone Sparkのインストール、構成
-
- Dist kerasのインストール
- SparkMaster & Jupyter Notebook起動用スクリプトspark_master.sh、spark_slave.shを追加
基础容器是CentOS。Spark版本为2.1.0,Keras版本为2.0.2,都使用的是最新版。
docker build完成后,我们立即尝试运行。
这次为了扩展规模,我们将启动3个Docker容器。
# docker dist-keras for spark master and slave
docker run -it -p 18080:8080 -p 17077:7077 -p 18888:8888 -p 18081:8081 -p 14040:4040 -p 17001:7001 -p 17002:7002 \
-p 17003:7003 -p 17004:7004 -p 17005:7005 -p 17006:7006 --name spmaster -h spmaster distkeras /bin/bash
# docker dist-keras for spark slave1
docker run -it --link spmaster:master -p 28080:8080 -p 27077:7077 -p 28888:8888 -p 28081:8081 -p 24040:4040 -p 27001:7001 \
-p 27002:7002 -p 27003:7003 -p 27004:7004 -p 27005:7005 -p 27006:7006 --name spslave1 -h spslave1 distkeras /bin/bash
# docker dist-keras for spark slave2
docker run -it --link spmaster:master -p 38080:8080 -p 37077:7077 -p 38888:8888 -p 38081:8081 -p 34040:4040 -p 37001:7001 \
-p 37002:7002 -p 37003:7003 -p 37004:7004 -p 37005:7005 -p 37006:7006 --name spslave2 -h spslave2 distkeras /bin/bash
Spark集群由Spark主节点(spmaster)和两个工作节点(spslave1和spslave2)组成。
在spmaster上运行Spark主节点和工作节点,并将spslave1和spslave2作为工作节点加入Spark集群。
Spark主节点和工作节点的启动脚本已经准备好,分别为 /opt/spark_master.sh 和 /opt/spark_slave.sh。
每个容器在启动时会切换到 /opt/ 目录下,所以在那里执行命令即可启动主节点和工作节点。
# Spark masterで実行
# マスターとワーカーが起動し、ワーカーがクラスターに参加
sh spark_master.sh
# Spark spslave1, 2で実行
# ワーカーが起動し、マスターのクラスターに参加
sh spark_slave.sh
在Spark集群中,现已有1台主节点和3台工作节点参与。可以通过Spark主节点的控制台进行确认。网址为http://<spark主节点>:18080。

我想用Docker Compose或Swarm一次性启动多台机器,但这也是以后的作业。
用Spark进行Keras上的MNIST
我要用Dist-Keras来做MNIST。
MNIST的示例代码可以在Dist-Keras中找到。
该示例数据和程序位于/opt/dist-keras/examples目录下。
[root@spm examples]# tree
.
|-- cifar-10-preprocessing.ipynb
|-- data
| |-- atlas_higgs.csv
| |-- mnist.csv
| |-- mnist.zip
| |-- mnist_test.csv
| `-- mnist_train.csv
|-- example_0_data_preprocessing.ipynb
|-- example_1_analysis.ipynb
|-- kafka_producer.py
|-- kafka_spark_high_throughput_ml_pipeline.ipynb
|-- mnist.ipynb
|-- mnist.py
|-- mnist_analysis.ipynb
|-- mnist_preprocessing.ipynb
|-- spark-warehouse
`-- workflow.ipynb
我們將運行mnist.py,但需要根據環境進行一些程式碼的更改。
复制原文件进行备份,并应用以下更改。
cp mnist.py mnist.py.bk
修改处1: 导入SparkSession
我会在下方添加冒号。
from pyspark.sql import SparkSession
修改部分2: 参数设置
将Spark的参数调整到适应这次环境。
调整的意图如下。
-
- Spark2を使うこと
-
- ローカル環境を使うこと
-
- ローカル環境でマスターurlを定義
- ワーカー数を1から3に変更
# Modify these variables according to your needs.
application_name = "Distributed Keras MNIST"
using_spark_2 = True # False→True
local = True # False→True
path_train = "data/mnist_train.csv"
path_test = "data/mnist_test.csv"
if local:
# Tell master to use local resources.
# master = "local[*]" comment out
master = "spark://spm:7077" # add
num_processes = 1
num_executors = 3 # 1→3
else:
# Tell master to use YARN.
master = "yarn-client"
num_executors = 20
num_processes = 1
变更点3:工人的内存
将工作内存从4G降低到2G。
这只是根据自己的环境进行的简单适配,并不是必要的更改。
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_processes`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.executor.memory", "2g") # 4G→2G
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
我们已经准备好了。让我们运行MNIST。
python mnist.py
可以通过Spark主控台检查执行情况。
http://<spark主控节点>:18080

当然可以打开Jobs或Executer等详细信息。


三台工作机加载了数据并进行了MNIST的训练。
顺便说一下,MNIST模型如下所示。
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
conv2d_1 (Conv2D) (None, 26, 26, 32) 320
_________________________________________________________________
activation_1 (Activation) (None, 26, 26, 32) 0
_________________________________________________________________
conv2d_2 (Conv2D) (None, 24, 24, 32) 9248
_________________________________________________________________
activation_2 (Activation) (None, 24, 24, 32) 0
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 12, 12, 32) 0
_________________________________________________________________
flatten_1 (Flatten) (None, 4608) 0
_________________________________________________________________
dense_1 (Dense) (None, 225) 1037025
_________________________________________________________________
activation_3 (Activation) (None, 225) 0
_________________________________________________________________
dense_2 (Dense) (None, 10) 2260
_________________________________________________________________
activation_4 (Activation) (None, 10) 0
=================================================================
Total params: 1,048,853.0
Trainable params: 1,048,853.0
Non-trainable params: 0.0
_________________________________________________________________
程序大约在30分钟左右完成。
MNIST的结果
我们用3台机器工作的结果如下。
Training time: 1497.86584091
Accuracy: 0.9897
Number of parameter server updates: 3751
在不到25分钟的学习时间内,达到了0.9897的准确率。
算是不错的成绩。
顺便说一下,试着只使用一台工人进行尝试的结果如下。
Training time: 1572.04011703
Accuracy: 0.9878
Number of parameter server updates: 3751
无论是三个工人还是一个工人,差别只有大约一分钟而已(-_-;)

未来的展望
我們這次在同一個伺服器上啟動了Docker容器,但我認為真正的負載均衡應該跨伺服器實現。
未來,我們希望在多個伺服器或ECS上啟動Dist-Keras容器,構建Spark叢集並進行負載均衡。
【2017/04/16 追記】
我们验证了性能改进的方法。
http://qiita.com/cvusk/items/f54ce15f8c76a396aeb1
【2017年05月26日 更新】
我们选择了Kubernetes作为我们的集群管理工具。
链接:http://qiita.com/cvusk/items/42a5ffd4e3228963234d