Spark的分区

Spark分区 – 博客 | luminousmen翻译。

這本書是摘譯版本,無法保證其準確性。請參考原文以獲得正確的內容。

Spark是一个用于在集群中并行处理数据的引擎。通过Apache Spark的并行性,开发者可以在数百台机器的集群上并行且独立地执行任务。这全部都是基于Apache Spark的RDD基本概念实现的。

在内部,这些RDD以不同的集群节点分区的形式存储。分区基本上是大规模分布式数据集的逻辑块。这使得工作在集群之间分配,并将任务分解为更小的部分,从而减少每个节点的内存需求。分区是Apache Spark中并行性的主要单位。

我们来谈谈关于分区的事情。

spark-partitions-2.jpeg

即使在意识不到分区的边界时仍会使用分区,但通过操作它们可以加快数据处理速度。

让我们来看一下内部发生了什么。

import pandas as pd
import numpy as np

length = 100
names = np.random.choice(['Bob', 'James', 'Marek', 'Johannes', None], length)
amounts = np.random.randint(0, 1000000, length)
country = np.random.choice(
	['United Kingdom', 'Poland', 'USA', 'Germany', None], 
	length
)
df = pd.DataFrame({'name': names, 'amount': amounts, 'country': country})

transactions = spark.createDataFrame(df)
print('Number of partitions: {}'.format(transactions.rdd.getNumPartitions()))
print('Partitioner: {}'.format(transactions.rdd.partitioner))
print('Partitions structure: {}'.format(transactions.rdd.glom().collect()))

当查看分区结构时,可以看到我们的数据被分成了4个分区(因为我的笔记本有4个核心,并且Spark以独立模式创建了4个执行器)。当我们对该数据帧应用转换处理时,对于每个分区的处理将在单独的线程中进行(在我的情况下,这将是每个处理器核心)。

然而,为什么要考虑这件事呢?

最重要的原因是性能。通过拥有在单一节点上进行计算所需的所有数据,可以减少洗牌(需要序列化和网络流量)的开销。

第二个原因是成本削减。通过提高集群利用率,有助于减少闲置资源。

为了有效解决这些问题,需要建立一个管理分区的机制。

重新划分分区

管理分区的第一种方法是重新分区操作。

再分区是一个操作,用于增加或减少在集群中被分割的数据的分区数。这个过程包括全局洗牌。结果是,再分区是一个高成本的过程。在典型场景中,大部分数据应该被序列化、移动和反序列化。

例如,

repartitioned = transactions.repartition(8)
print('Number of partitions: {}'.format(repartitidoned.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(repartitioned.rdd.glom().collect()))

可以看到分区数量增加到了8个,并且数据已根据每个分区重新调整的。

除了可以指定直接分区的数量,还可以指定要对数据进行分区的列名。

比如说,

repartitioned = transactions.repartition('country')
print('Number of partitions: {}'.format(repartitidoned.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(repartitioned.rdd.glom().collect()))

有200个分区,其中许多分区都完全为空。本书将讨论这一点。

只有在明确理解如何加速使用您的Spark作业,并且这个操作在大多数情况下并不那么有用时,才有意义利用这种方法。

融合

管理分区的第二种方法是使用 coalesce。

这个操作可以减少分区的数量,避免进行完全洗牌。执行者可以安全地将数据移动到最少数量的分区,只从冗余节点移动数据。因此,在需要减少分区数量的情况下,应该使用coalesce而不是repartition。

然而,我们应该理解可以大大减少数据处理的并行性。在许多情况下,coalesce会在转换链中进一步推动,并可能变成比预期少的节点。为了避免这种情况,您可以指定shuffle = true。这会添加一步洗牌,但如果可能,重新洗牌的分区将使用所有的集群资源。

coalesced = transactions.coalesce(2)
print('Number of partitions: {}'.format(coalesced.rdd.getNumPartitions()))
print('Partitions structure: {}'.format(coalesced.rdd.glom().collect()))

在我们的示例中,我们可以遍历剩余的分区,确认哪些数据被移动,哪些数据仍然留在原位置上。

分区的层次

我想強調Spark分區的不同數據處理層級的主題。這篇文章中,我只想討論這些層級中的一個,但下一篇文章將跟進具體的提示。

在物理层面上,存在着三个关键阶段, 即输入、洗牌和输出,这些阶段中分区的数量是很重要的。

这些都可以用不同的方法来调整和管理。例如,在输入和输出方面,您可以通过控制分区的大小来控制 输出方面,您还可以通过执行coalesce或repartition来控制文件数量和任务数量。在洗牌方面,您可以控制移动数据的网络数据量。

在输入阶段的分区

让我们从根据输入数据集的大小来确定分区数量开始吧。

如果可以将数据分割,Spark可以非常有效地处理输入数据。在HDFS或Cassandra等数据存储中,数据的布局方式以及Spark在读取时如何分割数据具有明显的相似之处。

想象一下,假设我们有一个由未压缩的文本文件组成的输入数据,存储在分布式文件系统HDFS中的十个节点上,大约为30GB(约30000MB)。

Spark将从HDFS中读取数据,并为单个输入分区创建一个分区。输入分区是通过Hadoop的InputFormat来设置用于读取文件的。存在一个30GB未压缩文本文件存储在HDFS中,根据默认的HDFS块大小设置(128MB)和默认的spark.files.maxPartitionBytes(128MB),该文件将被存储为240个块。因此,从该文件读取的数据框将有240个分区。

这与Spark的默认并行度(spark.default.parallelism)的值相匹配。

如果无法分割您的数据,则Spark将使用默认的分区数。在作业启动时,分区的数量将等于所有执行节点的总核心数。

洗牌分割

spark-partitions-3.jpeg

在Spark管道中,最令人痛苦的地方是需要其他分区的信息并触发shuffle的宽转换处理。不幸的是,虽然可以消除这种转换处理,但可以减轻对性能的shuffle影响。

Shuffle partition是在广泛的转换处理中用于数据洗牌的一种分区方式。然而,在广泛的转换处理中,shuffle partition的数量被设置为200。无论您的数据是大还是小,您的集群配置是20个executor与否,并不是问题,仍然是200。是的,是的,这是在repartition部分看到的,并且这是一个神秘的数字。

因此,在Shuffle中控制并行性的参数是spark.sql.shuffle.partitions。默认值是200的原因是,通过现实世界的经验发现,这是一个非常好的默认值。然而,实际上,这个值并不总是好的。

spark-partitions-4.jpeg

当一个非常小的分区中存在大量数据时,执行器处理的任务数量会减少,每个执行器的负荷会增加,从而可能导致内存错误。另外,如果将分区大小设置为超过执行器可用内存的大小,会导致溢出(spill)到磁盘。溢出通常是最慢的操作之一。基本上,在执行磁盘溢出期间,如果Spark操作无法放入内存,则会将部分数据写入磁盘,以使Spark作业能够处理任意大小的数据。虽然这不会破坏您的流水线,但由于磁盘I/O和垃圾回收的额外开销,效率将非常低下。

因此,spark.sql.shuffle.partitions成为在使用Spark时最常设置的参数之一。

功率分配

通过以适当选择的条件对数据进行分区并保存,可以极大地加快后续处理流程中所需数据的读取和获取速度。

最初,在一些情况下,可以在数据源的分区搜索之后使用分区剪切来限制Spark在查询时读取的文件和分区的数量。有时候甚至可以避免不必要的分区搜索(例如AWS S3)。在Spark 3.0中,动态分区剪切的概念也是很重要的。然而,有时这些优化措施可能会导致情况变得更糟。例如,为了获取理解分区所需的元数据,可能需要对文件系统进行递归扫描,这可能会花费很长时间(特别是当分区数量很多时)。此外,所有表的元数据都必须在驱动程序进程的内存中实例化,这将导致内存使用量非常大。

首先,保存数据框到磁盘时,请特别注意分区的大小。在写入时,Spark会为每个任务生成一个文件(每个分区一个文件),在读取时,它会读取至少一个文件的任务。问题是,保存的数据框在配置了更多内存的集群中可以处理大分区大小而没有任何问题,但当尝试在小型集群中加载保存的数据框时,可能会出现问题。

假设有一个用于大规模前处理的集群,并且还有一个用于小规模且更高效的服务的集群。在这种情况下,解决方案是在写入数据帧之前,将其重新分区到更多的分区,以确保后续的集群不会过载。

总结

    • パーティションの数を増やす際には、repartition()(フルシャッフルを実行します)を使います。

 

    パーティションの数を減らす際には、coalesce()(シャッフルを最小化します)を使います。

今天,我們談論了物理層面的分區。下一篇文章將深入介紹更高層次的分區調整方法。

推荐的书籍

    • Spark: The Definitive Guide

 

    Learning Spark: Lightning-Fast Data Analytics
广告
将在 10 秒后关闭
bannerAds