使用Python连接SensorTag、Kafka和Spark Streaming进行流处理 – 第5部分:通过Apache Toree在Jupyter上连接Spark

我准备配置一个Spark集群,并编写一些示例代码。我认为很多人都将Jupyter作为Python数据分析和机器学习的执行环境。使用Apache Toree,我们可以在Jupyter中以交互的方式编写Spark应用程序。您甚至可以使用Scala的REPL(交互式编程环境)直接在浏览器上运行,并在Jupyter中使用它。

闪耀

在Docker Compose中建立Spark集群。Docker Hub和GitHub上已经发布了许多用于Spark独立集群的镜像和docker-compose.yml文件。

    • semantive/spark

 

    • produktion/jupyter-pyspark

 

    gettyimages/docker-spark

我尝试了几个选项,但是semantive/spark给我留下了简单易用的印象。

Docker Compose:

关于semantive/spark 이미지的使用方法,请查阅Docker Images For Apache Spark。Docker Hub在这里,GitHub在这里。

从存储库中的docker-compose.yml文件中进行了一些更改。主要更改包括明确指定镜像标签以使Spark版本匹配,并指定SPARK_PUBLIC_DNS和SPARK_MASTER_HOST环境变量为云上虚拟机的公共IP地址。

version: '2'
services:
  master:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
      SPARK_MASTER_HOST: <仮想マシンのパブリックIPアドレス>
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
    volumes:
      - spark_data:/tmp/data

  worker1:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker1
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
    depends_on:
      - master
    ports:
      - 8081:8081
    volumes:
      - spark_data:/tmp/data

  worker2:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker2
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8882
      SPARK_WORKER_WEBUI_PORT: 8082
      SPARK_PUBLIC_DNS: <仮想マシンのパブリックIPアドレス>
    depends_on:
      - master
    ports:
      - 8082:8082
    volumes:
      - spark_data:/tmp/data

volumes:
  spark_data:
    driver: local

启动Spark独立集群。

$ docker-compose up -d

打开Spark Master UI,查看集群的状态。

http://<仮想マシンのパブリックIPアドレス>:8080

执行Master容器中的spark-shell,检查Scala和Spark的版本。Spark的开发速度非常快,需要经常检查Scala版本,以免遇到意想不到的错误。

    • Scala: 2.11.8

 

    Spark: 2.1.1
$ docker-compose exec master spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

朱庇特

我们将使用官方的jupyter/all-spark-notebook Docker镜像。这是一个包含Scala和Spark的一站式镜像。

阿帕奇托丽

Apache Toree是连接Jupyter到Spark集群的工具。除了PySpark外,还提供了Scala、SparkR和SQL的内核。

当查看Dockerfile时,发现安装了Apache Toree。

# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix

docker-compose.yml文件

将Jupyter服务添加到Spark Standalone集群的docker-compose.yml文件中。

  jupyter:
    image: jupyter/all-spark-notebook:c1b0cf6bf4d6
    depends_on:
      - master
    ports:
      - 8888:8888
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./ivy2:/home/jovyan/.ivy2
    env_file:
      - ./.env
    environment:
      TINI_SUBREAPER: 'true'
      SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
    command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000

关于Jupyter服务的选项

在Spark Standalone集群中,由于没有使用Hadoop,因此我们添加了配置以使用Amazon S3作为分布式文件系统。这对于保存示例数据和Parquet文件非常方便。

图片 (tú

Jupyter / all-spark-notebook图像经常更新。 使用Apache Toree的Spark和Spark集群的版本出错并停止启动。 由于本次Spark集群的版本是2.1.1,因此将指定相同版本的图像标签。 对于jupyter / all-spark-notebook图像,只知道ID而不知道标签是不方便的。

由于Spark版本已经升级到了2.2.0,所以我们会使用2.1.1的标签。
我们将拉取标签为Docker镜像并在spark-shell中进行确认。

$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
  jupyter/all-spark-notebook:c1b0cf6bf4d6 \
  /usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell

确认了Spark集群、Spark和Scala的版本相同。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

我也会检查Jupyter的版本。

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0

TINI_SUBREAPER 和 SPARK_OPTS

连接到远程Spark所需的两个必要配置是使用Apache Toree从Jupyter中。TINI_SUBREAPER环境变量将在init中使用Tini。

如果在Spark中不使用额外的Jar文件,只需在SPARK_OPTS环境变量中指定以下内容即可连接到远程的Spark Standalone集群。这与通常的spark-submit选项相同。

--master spark://master:7077 --deploy-mode client

如果有额外的Jar文件,则需要添加–packages标志。这是连接到亚马逊S3所需的包。

--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3

— 笔记本应用程序的iopub数据速率限制

使用Bokeh等可视化工具处理大型图像时,需要在Jupyter的启动脚本中指定选项。

    IOPub data rate exceeded when viewing image in Jupyter notebook

–笔记本应用程序密码

Jupyter的身份验证方法默认是令牌。如果频繁地启动和丢弃像Docker容器一样的对象,每次都要输入不同的令牌是很麻烦的,因此我们将其更改为密码验证。我们使用ipython来获取密码的哈希值。

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.

生成密码的方式如下。将生成的哈希值作为Jupyter启动选项的指定值。

In [1]: from notebook.auth import passwd
In [2]: passwd()

Enter password:
Verify password:
Out[2]: 'sha1:xxx'

数量

/home/jovyan是运行Jupyter容器的用户的主目录。 备忘录和下载的Jar文件将被挂载到Docker主机上。

环境文件

将环境变量写入.env文件并传递给容器。请指定用于连接亚马逊S3的访问密钥和秘密密钥。

AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx

不要忘记向.gitignore文件中添加,以避免提交到Git中。

.env

使用 Jupyter 来操作 Spark 和 Amazon S3。

我打算用Scala和Python在Jupyter中编写一个使用Spark和Amazon S3的示例。我们将使用文章《使用Apache APIs监控实时Uber数据,第一部分:Spark机器学习》中使用的Uber接送数据作为示例。这里只是简单地从S3读取CSV文件并显示。

启动docker-compose.yml中定义的所有服务。

$ docker-compose up -d

使用浏览器打开Jupyter并使用刚刚创建的密码登录。

http://<仮想マシンのパブリックIPアドレス>:8888

数据准备

在克隆存储库后,使用s3cmd将uber.csv文件放入任意的存储桶中。

$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<バケット名>/uber-csv/

Scala (斯卡拉) is a programming language.

您可以将以下代码划分为单元格,并以交互方式执行以确认。如果您要编写Scala笔记本,请从右上方的新建按钮中选择Apache Toree – Scala。

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    getOrCreate()

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("dt", TimestampType, true) ::
    StructField("lat", DoubleType, true) ::
    StructField("lon", DoubleType, true) ::
    StructField("base", StringType, true) :: Nil
)

val df = 
    spark.read.
    option("header", false).
    schema(schema).
    csv("s3a://<バケット名>/uber-csv/uber.csv")

df.printSchema

df.cache
df.show(false)

在Scala中,可以这样写StructType的模式。

val schema = (new StructType).
    add("dt", "timestamp", true).
    add("lat", "double", true).
    add("lon", "double", true).
    add("base", "string", true)

这是最后的df.show(false)的输出结果。

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows

Python 是一种高级程序设计语言,它被广泛用于软件开发、数据分析和人工智能等领域。

如果要编写Python 3的Notebook,请从右上角的”New”按钮中选择Python 3。在一个合适的位置将以下代码分割成单元格并执行。与Scala不同的是,额外的Jar文件应该在PYSPARK_SUBMIT_ARGS环境变量中指定。

以下是使用Python编写Spark应用程序的方法,几乎与Scala相同。

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .getOrCreate()
)

sc = spark.sparkContext

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
    StructField("dt", TimestampType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("base", StringType(), True)
])

df = (
    spark.read
    .option("header", False)
    .schema(schema)
    .csv("s3a://<バケット名>/uber-csv/uber.csv")
)

df.printSchema()

df.cache()
df.show(truncate=False)

最后一行df.show(truncate=False)的输出结果与之前的Scala代码相同。

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
bannerAds