根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
所有的答案都为这个经常被问到的问题增添了一些伟大的知识。
所以根据这个问题的传统时间轴,这里是我的2美分。
我发现在非常具体的情况下,重新分区比合并更快。
在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。
这就是我的意思
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。
当然,这个数字(20)将取决于工作人员的数量和数据量。
希望这能有所帮助。
其他回答
贾斯汀的回答很棒,这个回答更有深度。
重分区算法进行完全洗牌,并使用均匀分布的数据创建新分区。让我们用1到12的数字创建一个DataFrame。
val x = (1 to 12).toList
val numbersDf = x.toDF("number")
numbersDf在我的机器上包含4个分区。
numbersDf.rdd.partitions.size // => 4
下面是数据在分区上的划分方式:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
让我们使用重分区方法进行一次完全洗牌,并在两个节点上获得这些数据。
val numbersDfR = numbersDf.repartition(2)
下面是如何在我的机器上划分numbersDfR数据:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
重分区方法创建新分区,并在新分区中均匀分布数据(对于较大的数据集,数据分布更均匀)。
合并和重新划分的区别
Coalesce使用现有分区来最小化打乱的数据量。重新分区创建新分区并进行完全洗牌。合并的结果是产生具有不同数据量的分区(有时分区的大小相差很大),而重新分区的结果是产生大小大致相同的分区。
合并和重新分区哪个更快?
联合可能比重新分区运行得快,但大小不等的分区通常比大小相等的分区运行得慢。在过滤了一个大型数据集之后,通常需要对数据集重新分区。我发现重新分区总体上更快,因为Spark是为处理相同大小的分区而构建的。
注意:我很好奇地发现重新分区会增加磁盘上数据的大小。在对大型数据集使用重分区/合并时,请确保运行测试。
如果你想了解更多细节,请阅读这篇博客文章。
当你在实践中使用合并和重分区
See this question on how to use coalesce & repartition to write out a DataFrame to a single file It's critical to repartition after running filtering queries. The number of partitions does not change after filtering, so if you don't repartition, you'll have way too many memory partitions (the more the filter reduces the dataset size, the bigger the problem). Watch out for the empty partition problem. partitionBy is used to write out data in partitions on disk. You'll need to use repartition / coalesce to partition your data in memory properly before using partitionBy.
从代码和代码文档中可以看出,coalesce(n)与coalesce(n, shuffle = false)相同,而repartition(n)与coalesce(n, shuffle = true)相同。
因此,合并和重新分区都可以用来增加分区的数量
使用shuffle = true,实际上可以合并为更大的数字 的分区。如果你有少量的分区,这很有用, 比如100,可能有几个分区异常大。
另一个需要强调的重要注意事项是,如果您大幅减少分区数量,则应该考虑使用合并的打乱版本(在这种情况下与重新分区相同)。这将允许您的计算在父分区上并行执行(多个任务)。
然而,如果你正在做一个激烈的合并,例如numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,numPartitions = 1的情况下只有一个节点)。为了避免这种情况,你可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
相关答案也请参考此处
合并比重新分区执行得更好。合并总是减少分区。假设你在yarn中启用动态分配,你有四个分区和执行器。如果过滤器应用于它,超过可能的一个或多个执行程序是空的,没有数据。这个问题可以通过合并而不是重新划分来解决。
用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区
REPARTITION:-用于增加和减少分区的数量,但会发生洗牌
例子:-
val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)
两者都很好
但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。
这里需要注意的一点是,Spark RDD的基本原则是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数量。如果用例要求将RDD持久化在缓存中,则必须对新创建的RDD进行同样的操作。
scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
scala> res16.partitions.length
res17: Int = 10
scala> pairMrkt.partitions.length
res20: Int = 2