2022年1月24日,让我们在Windows环境下使用PySpark[修正版]
(2022.1.24修正)
为了使Spark在Windows上运行,对winutils进行了修正。
为了在Windows环境中运行Spark,常常参考cdarlint/winutils和steveloughran/winutils,但是发现这些二进制文件在Windows11环境下无法运行,因此重新构建了winutils,并在GitHub上进行了公开发布。
下面将详细说明该过程。
0. 什么是PySpark?
PySpark是一个利用Apache Spark进行分布式处理等操作的库,用于分析大容量数据。它是在Python界面下可用的。
由於它可以適應多種數據源並進行靈活的數據處理,因此它非常適合在統一數據處理時使用。
接下来,我们将主要讨论在Windows环境下使用PySpark。
1. Apache Spark (阿帕奇·斯巴克)
1.1. 配置 Apache Spark 的环境
请按照winutils的README.md文件来配置环境。
1.2. 確認動作
如果上述操作完成了,我们将尝试从命令行启动spark-shell。
>spark-shell
2. PySpark: 等同于使用 Python 的 Spark。
接下来,我们要使Python环境能够使用Spark。
2.1. 安装pyspark
在您的Python环境中,添加pyspark。
使用pip安装pyspark。
2.2.设置环境变量2
PYSPARK_PYTHON 的本地歸約
将要使用的Python环境设置为环境变量PYSPARK_PYTHON。
配置 PYSPARK_PYTHON 环境变量为 C:\xxxx\bin\python.exe。
2.3.确认操作
让我们启动 PySpark。
> pyspark
如果能够成功启动,我们就来编写程序吧。
试试PySpark
基本上,我们会创建一个SparkContext对象来操作数据。在这里,我们将尝试操作两种数据格式。
试着使用RDD。
RDD是指RDD(Resilient Distributed Dataset),在Apache Spark编程中,基本上是通过RDD来持有和操作数据的。
在PySpark中,我們創建一個SparkContext對象,通過它來進行各種數據處理。
import pyspark
from pyspark import SparkContext
# SparkContextの作成
sc = SparkContext(appName='spark_sample')
# RDDを作成する。
rdd = sc.parallelize([
(1, 'Foo'),
(2, 'Bar'),
(3, 'Baz'),
])
# RDDにかけるFilter関数(2以上の要素を取り出す)
def filter_func(x):
n,s = x
return n >= 2
rdd = rdd.filter(filter_func)
# 結果の表示
print(rdd.collect())
由于RDD提供了诸如filter/map等函数式编程中熟悉的方法,因此可以实现灵活的操作。
尝试使用DataFrame
DataFrame是一种类似于数据库表结构的带有列的表格结构。可以通过为RDD提供数据模式来创建DataFrame。这样就可以实现类似SQL的查询操作。
続いてJSONをDataFrame化してデータ処理を行ってみましょう。
样本.json
样本.json
{“姓名”:”爱丽丝”,”年龄”:20}
{“姓名”:”鲍勃”,”年龄”:25}
さきほどはSparkContextをエントリとして使いましたが、__データセットやデータフレームのAPIを使用する際のエントリポイントは、SparkSessionです。_ SparkSessionは、builderを使用して以下のように作成します。
spark = SparkSession.builder \
.master("local") \
.appName("AppName") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
SparkSession作成し、データを処理してみます。JSONから取り込んだ場合、自動的にスキーマも推測されていますので、以下では複数行からなるJSONデータを取り込んでデータフレームを作成し、検索を行ってみます。
Python代码
import pyspark
from pyspark.sql import SparkSession
# SparkSessionの作成
spark = SparkSession.builder \
.master("local") \
.appName("JSON SQL") \
.getOrCreate()
# DataFrameの作成
df = spark.read.json('sample.json')
# JSONを読み込んだのでスキーマを確認
df.printSchema()
# SparkSessionにテーブルとして登録
df.registerTempTable('people')
# Spark SQLによる検索
selected = spark.sql('SELECT * FROM people WHERE name==\"Alice\"')
selected.show()
连接各种不同的数据库
さまざまなデータベースに接続する場合、connectorをconfigに与えることで接続できます。たとえば、さまざまなデータベースの接続が、Spark Packageとして提供されています。
关系型数据库管理系统(MySQL)
首先,我们来尝试连接到关系型数据库管理系统(RDBMS)MySQL。您需要准备MySQL专用的JDBC驱动程序的JAR文件。
以下から、JARファイルをダウンロードして、適当な位置にJARファイルを設置します。
https://jar-download.com/artifacts/mysql/mysql-connector-java
需要使用SparkSession应用此JAR文件。
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars", "...jars\\mysql-connector-java-8.0.27.jar") \
.master("local").appName("PySpark_MySQL_test").getOrCreate()
df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/my_schema") \
.option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "shema_name") \
.option("user", "xxxx").option("password", "xxxx").load()
df.show()
MongoDB(非关系型数据库)
同样, 您也可以通过NoSQL数据库MongoDB进行访问。MongoDB驱动程序由Spark Package提供, 您可以按照以下方式进行访问。
MongoDB可以通过以下方式创建SparkSession。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.show()