使用Python连接SensorTag、Kafka和Spark Streaming进行流处理 – 第5部分:通过Apache Toree在Jupyter上连接Spark
我准备配置一个Spark集群,并编写一些示例代码。我认为很多人都将Jupyter作为Python数据分析和机器学习的执行环境。使用Apache Toree,我们可以在Jupyter中以交互的方式编写Spark应用程序。您甚至可以使用Scala的REPL(交互式编程环境)直接在浏览器上运行,并在Jupyter中使用它。
闪耀
在Docker Compose中建立Spark集群。Docker Hub和GitHub上已经发布了许多用于Spark独立集群的镜像和docker-compose.yml文件。
-
- semantive/spark
-
- produktion/jupyter-pyspark
- gettyimages/docker-spark
我尝试了几个选项,但是semantive/spark给我留下了简单易用的印象。
Docker Compose:
关于semantive/spark 이미지的使用方法,请查阅Docker Images For Apache Spark。Docker Hub在这里,GitHub在这里。
从存储库中的docker-compose.yml文件中进行了一些更改。主要更改包括明确指定镜像标签以使Spark版本匹配,并指定SPARK_PUBLIC_DNS和SPARK_MASTER_HOST环境变量为云上虚拟机的公共IP地址。
version: '2'
services:
master:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.master.Master -h master
hostname: master
environment:
MASTER: spark://master:7077
SPARK_CONF_DIR: /conf
SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
SPARK_MASTER_HOST: <仮想マシンのパブリックIPアドレス>
ports:
- 4040:4040
- 6066:6066
- 7077:7077
- 8080:8080
volumes:
- spark_data:/tmp/data
worker1:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker1
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8881
SPARK_WORKER_WEBUI_PORT: 8081
SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
depends_on:
- master
ports:
- 8081:8081
volumes:
- spark_data:/tmp/data
worker2:
image: semantive/spark:spark-2.1.1-hadoop-2.7.3
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
hostname: worker2
environment:
SPARK_CONF_DIR: /conf
SPARK_WORKER_CORES: 4
SPARK_WORKER_MEMORY: 2g
SPARK_WORKER_PORT: 8882
SPARK_WORKER_WEBUI_PORT: 8082
SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
depends_on:
- master
ports:
- 8082:8082
volumes:
- spark_data:/tmp/data
volumes:
spark_data:
driver: local
启动Spark独立集群。
$ docker-compose up -d
打开Spark Master UI,查看集群的状态。
http://<仮想マシンのパブリックIPアドレス>:8080
执行Master容器中的spark-shell,检查Scala和Spark的版本。Spark的开发速度非常快,需要经常检查Scala版本,以免遇到意想不到的错误。
-
- Scala: 2.11.8
- Spark: 2.1.1
$ docker-compose exec master spark-shell
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
朱庇特
我们将使用官方的jupyter/all-spark-notebook Docker镜像。这是一个包含Scala和Spark的一站式镜像。
阿帕奇托丽
Apache Toree是连接Jupyter到Spark集群的工具。除了PySpark外,还提供了Scala、SparkR和SQL的内核。
当查看Dockerfile时,发现安装了Apache Toree。
# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix
docker-compose.yml文件
将Jupyter服务添加到Spark Standalone集群的docker-compose.yml文件中。
jupyter:
image: jupyter/all-spark-notebook:c1b0cf6bf4d6
depends_on:
- master
ports:
- 8888:8888
volumes:
- ./notebooks:/home/jovyan/work
- ./ivy2:/home/jovyan/.ivy2
env_file:
- ./.env
environment:
TINI_SUBREAPER: 'true'
SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000
关于Jupyter服务的选项
在Spark Standalone集群中,由于没有使用Hadoop,因此我们添加了配置以使用Amazon S3作为分布式文件系统。这对于保存示例数据和Parquet文件非常方便。
图片 (tú
Jupyter / all-spark-notebook图像经常更新。 使用Apache Toree的Spark和Spark集群的版本出错并停止启动。 由于本次Spark集群的版本是2.1.1,因此将指定相同版本的图像标签。 对于jupyter / all-spark-notebook图像,只知道ID而不知道标签是不方便的。
由于Spark版本已经升级到了2.2.0,所以我们会使用2.1.1的标签。
我们将拉取标签为Docker镜像并在spark-shell中进行确认。
$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
jupyter/all-spark-notebook:c1b0cf6bf4d6 \
/usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell
确认了Spark集群、Spark和Scala的版本相同。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
我也会检查Jupyter的版本。
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0
TINI_SUBREAPER 和 SPARK_OPTS
连接到远程Spark所需的两个必要配置是使用Apache Toree从Jupyter中。TINI_SUBREAPER环境变量将在init中使用Tini。
如果在Spark中不使用额外的Jar文件,只需在SPARK_OPTS环境变量中指定以下内容即可连接到远程的Spark Standalone集群。这与通常的spark-submit选项相同。
--master spark://master:7077 --deploy-mode client
如果有额外的Jar文件,则需要添加–packages标志。这是连接到亚马逊S3所需的包。
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
— 笔记本应用程序的iopub数据速率限制
使用Bokeh等可视化工具处理大型图像时,需要在Jupyter的启动脚本中指定选项。
- IOPub data rate exceeded when viewing image in Jupyter notebook
–笔记本应用程序密码
Jupyter的身份验证方法默认是令牌。如果频繁地启动和丢弃像Docker容器一样的对象,每次都要输入不同的令牌是很麻烦的,因此我们将其更改为密码验证。我们使用ipython来获取密码的哈希值。
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
生成密码的方式如下。将生成的哈希值作为Jupyter启动选项的指定值。
In [1]: from notebook.auth import passwd
In [2]: passwd()
Enter password:
Verify password:
Out[2]: 'sha1:xxx'
数量
/home/jovyan是运行Jupyter容器的用户的主目录。 备忘录和下载的Jar文件将被挂载到Docker主机上。
环境文件
将环境变量写入.env文件并传递给容器。请指定用于连接亚马逊S3的访问密钥和秘密密钥。
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
不要忘记向.gitignore文件中添加,以避免提交到Git中。
.env
使用 Jupyter 来操作 Spark 和 Amazon S3。
我打算用Scala和Python在Jupyter中编写一个使用Spark和Amazon S3的示例。我们将使用文章《使用Apache APIs监控实时Uber数据,第一部分:Spark机器学习》中使用的Uber接送数据作为示例。这里只是简单地从S3读取CSV文件并显示。
启动docker-compose.yml中定义的所有服务。
$ docker-compose up -d
使用浏览器打开Jupyter并使用刚刚创建的密码登录。
http://<仮想マシンのパブリックIPアドレス>:8888
数据准备
在克隆存储库后,使用s3cmd将uber.csv文件放入任意的存储桶中。
$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<バケット名>/uber-csv/
Scala (斯卡拉) is a programming language.
您可以将以下代码划分为单元格,并以交互方式执行以确认。如果您要编写Scala笔记本,请从右上方的新建按钮中选择Apache Toree – Scala。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
getOrCreate()
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val schema = StructType(
StructField("dt", TimestampType, true) ::
StructField("lat", DoubleType, true) ::
StructField("lon", DoubleType, true) ::
StructField("base", StringType, true) :: Nil
)
val df =
spark.read.
option("header", false).
schema(schema).
csv("s3a://<バケット名>/uber-csv/uber.csv")
df.printSchema
df.cache
df.show(false)
在Scala中,可以这样写StructType的模式。
val schema = (new StructType).
add("dt", "timestamp", true).
add("lat", "double", true).
add("lon", "double", true).
add("base", "string", true)
这是最后的df.show(false)的输出结果。
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
Python 是一种高级程序设计语言,它被广泛用于软件开发、数据分析和人工智能等领域。
如果要编写Python 3的Notebook,请从右上角的”New”按钮中选择Python 3。在一个合适的位置将以下代码分割成单元格并执行。与Scala不同的是,额外的Jar文件应该在PYSPARK_SUBMIT_ARGS环境变量中指定。
以下是使用Python编写Spark应用程序的方法,几乎与Scala相同。
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.getOrCreate()
)
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")
from pyspark.sql.types import *
from pyspark.sql.functions import *
schema = StructType([
StructField("dt", TimestampType(), True),
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True),
StructField("base", StringType(), True)
])
df = (
spark.read
.option("header", False)
.schema(schema)
.csv("s3a://<バケット名>/uber-csv/uber.csv")
)
df.printSchema()
df.cache()
df.show(truncate=False)
最后一行df.show(truncate=False)的输出结果与之前的Scala代码相同。
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows