根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
重分区算法对数据进行完全洗牌,并创建大小相等的数据分区。Coalesce结合现有分区以避免完全洗牌。
Coalesce可以很好地使用一个具有大量分区的RDD,并将单个工作节点上的分区组合在一起,以生成一个具有较少分区的最终RDD。
重新分区将重新洗牌RDD中的数据,以产生您请求的最终分区数量。 DataFrames的分区看起来像是一个应该由框架管理的低级实现细节,但事实并非如此。当将大的dataframe过滤成小的dataframe时,你应该总是对数据进行重新分区。 你可能会经常把大的数据帧过滤成小的数据帧,所以要习惯重新分区。
如果你想了解更多细节,请阅读这篇博客文章。
其他回答
贾斯汀的回答很棒,这个回答更有深度。
重分区算法进行完全洗牌,并使用均匀分布的数据创建新分区。让我们用1到12的数字创建一个DataFrame。
val x = (1 to 12).toList
val numbersDf = x.toDF("number")
numbersDf在我的机器上包含4个分区。
numbersDf.rdd.partitions.size // => 4
下面是数据在分区上的划分方式:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
让我们使用重分区方法进行一次完全洗牌,并在两个节点上获得这些数据。
val numbersDfR = numbersDf.repartition(2)
下面是如何在我的机器上划分numbersDfR数据:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
重分区方法创建新分区,并在新分区中均匀分布数据(对于较大的数据集,数据分布更均匀)。
合并和重新划分的区别
Coalesce使用现有分区来最小化打乱的数据量。重新分区创建新分区并进行完全洗牌。合并的结果是产生具有不同数据量的分区(有时分区的大小相差很大),而重新分区的结果是产生大小大致相同的分区。
合并和重新分区哪个更快?
联合可能比重新分区运行得快,但大小不等的分区通常比大小相等的分区运行得慢。在过滤了一个大型数据集之后,通常需要对数据集重新分区。我发现重新分区总体上更快,因为Spark是为处理相同大小的分区而构建的。
注意:我很好奇地发现重新分区会增加磁盘上数据的大小。在对大型数据集使用重分区/合并时,请确保运行测试。
如果你想了解更多细节,请阅读这篇博客文章。
当你在实践中使用合并和重分区
See this question on how to use coalesce & repartition to write out a DataFrame to a single file It's critical to repartition after running filtering queries. The number of partitions does not change after filtering, so if you don't repartition, you'll have way too many memory partitions (the more the filter reduces the dataset size, the bigger the problem). Watch out for the empty partition problem. partitionBy is used to write out data in partitions on disk. You'll need to use repartition / coalesce to partition your data in memory properly before using partitionBy.
我想在贾斯汀和鲍尔的回答中补充一点——
重新分区将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以使用分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。
Coalesce将使用现有分区并对其中的一个子集进行洗牌。它不能像重新分区那样修复数据倾斜。因此,即使它更便宜,它也可能不是你需要的东西。
另一个不同之处是考虑到存在倾斜连接的情况,您必须在其之上进行合并。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。
另一种情况是,假设你在一个数据帧中保存了一个中等/大量的数据,你必须批量生成到Kafka。在某些情况下,在生成到Kafka之前,重新分区有助于collectasList。但是,当容量非常大时,重新分区可能会导致严重的性能影响。在这种情况下,直接从dataframe生成Kafka会有所帮助。
附注:Coalesce并不像在工作人员之间进行完整的数据移动那样避免数据移动。但它确实减少了洗牌的次数。我想这就是那本书的意思。
Coalesce使用现有分区来最小化数据量 被打乱。重新分区将创建新的分区并执行满分区 洗牌。 合并会产生具有不同数据量的分区 (有时分区有许多不同的大小)和 重新分区会产生大小大致相同的分区。 合并可以减少分区,但修复可以用来增加或减少分区。
用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区
REPARTITION:-用于增加和减少分区的数量,但会发生洗牌
例子:-
val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)
两者都很好
但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。