2022年1月24日,让我们在Windows环境下使用PySpark[修正版]

(2022.1.24修正)
为了使Spark在Windows上运行,对winutils进行了修正。
为了在Windows环境中运行Spark,常常参考cdarlint/winutils和steveloughran/winutils,但是发现这些二进制文件在Windows11环境下无法运行,因此重新构建了winutils,并在GitHub上进行了公开发布。
下面将详细说明该过程。


0. 什么是PySpark?

PySpark是一个利用Apache Spark进行分布式处理等操作的库,用于分析大容量数据。它是在Python界面下可用的。

由於它可以適應多種數據源並進行靈活的數據處理,因此它非常適合在統一數據處理時使用。

用語 内容 Apache Spark 巨大なデータを高速に処理できる PySpark Apache SparkをPythonから使うことができるようにしたライブラリ

接下来,我们将主要讨论在Windows环境下使用PySpark。

1. Apache Spark (阿帕奇·斯巴克)

1.1. 配置 Apache Spark 的环境

请按照winutils的README.md文件来配置环境。

1.2. 確認動作

如果上述操作完成了,我们将尝试从命令行启动spark-shell。

>spark-shell

2. PySpark: 等同于使用 Python 的 Spark。

接下来,我们要使Python环境能够使用Spark。

2.1. 安装pyspark

在您的Python环境中,添加pyspark。

使用pip安装pyspark。

2.2.设置环境变量2

PYSPARK_PYTHON 的本地歸約

将要使用的Python环境设置为环境变量PYSPARK_PYTHON。

配置 PYSPARK_PYTHON 环境变量为 C:\xxxx\bin\python.exe。

2.3.确认操作

让我们启动 PySpark。

> pyspark

如果能够成功启动,我们就来编写程序吧。

试试PySpark

基本上,我们会创建一个SparkContext对象来操作数据。在这里,我们将尝试操作两种数据格式。

试着使用RDD。

RDD是指RDD(Resilient Distributed Dataset),在Apache Spark编程中,基本上是通过RDD来持有和操作数据的。

在PySpark中,我們創建一個SparkContext對象,通過它來進行各種數據處理。

import pyspark
from pyspark import SparkContext

# SparkContextの作成
sc = SparkContext(appName='spark_sample')

# RDDを作成する。
rdd = sc.parallelize([
    (1, 'Foo'),
    (2, 'Bar'),
    (3, 'Baz'),
])

# RDDにかけるFilter関数(2以上の要素を取り出す)
def filter_func(x):
    n,s = x
    return n >= 2

rdd = rdd.filter(filter_func)
# 結果の表示
print(rdd.collect())

由于RDD提供了诸如filter/map等函数式编程中熟悉的方法,因此可以实现灵活的操作。

尝试使用DataFrame

DataFrame是一种类似于数据库表结构的带有列的表格结构。可以通过为RDD提供数据模式来创建DataFrame。这样就可以实现类似SQL的查询操作。

続いてJSONをDataFrame化してデータ処理を行ってみましょう。

样本.json
样本.json
{“姓名”:”爱丽丝”,”年龄”:20}
{“姓名”:”鲍勃”,”年龄”:25}

さきほどはSparkContextをエントリとして使いましたが、__データセットやデータフレームのAPIを使用する際のエントリポイントは、SparkSessionです。_ SparkSessionは、builderを使用して以下のように作成します。

spark = SparkSession.builder \
    .master("local") \
    .appName("AppName") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

SparkSession作成し、データを処理してみます。JSONから取り込んだ場合、自動的にスキーマも推測されていますので、以下では複数行からなるJSONデータを取り込んでデータフレームを作成し、検索を行ってみます。

Python代码

import pyspark
from pyspark.sql import SparkSession

# SparkSessionの作成
spark = SparkSession.builder \
    .master("local") \
    .appName("JSON SQL") \
    .getOrCreate()

# DataFrameの作成
df = spark.read.json('sample.json')

# JSONを読み込んだのでスキーマを確認
df.printSchema()

# SparkSessionにテーブルとして登録
df.registerTempTable('people')

# Spark SQLによる検索
selected = spark.sql('SELECT * FROM people WHERE name==\"Alice\"')
selected.show()

连接各种不同的数据库

さまざまなデータベースに接続する場合、connectorをconfigに与えることで接続できます。たとえば、さまざまなデータベースの接続が、Spark Packageとして提供されています。

关系型数据库管理系统(MySQL)

首先,我们来尝试连接到关系型数据库管理系统(RDBMS)MySQL。您需要准备MySQL专用的JDBC驱动程序的JAR文件。

以下から、JARファイルをダウンロードして、適当な位置にJARファイルを設置します。
https://jar-download.com/artifacts/mysql/mysql-connector-java

需要使用SparkSession应用此JAR文件。

from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars", "...jars\\mysql-connector-java-8.0.27.jar") \
.master("local").appName("PySpark_MySQL_test").getOrCreate()

df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/my_schema") \
.option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "shema_name") \
.option("user", "xxxx").option("password", "xxxx").load()

df.show()

MongoDB(非关系型数据库)

同样, 您也可以通过NoSQL数据库MongoDB进行访问。MongoDB驱动程序由Spark Package提供, 您可以按照以下方式进行访问。

MongoDB可以通过以下方式创建SparkSession。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
.getOrCreate()

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.show()

bannerAds