根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
合并比重新分区执行得更好。合并总是减少分区。假设你在yarn中启用动态分配,你有四个分区和执行器。如果过滤器应用于它,超过可能的一个或多个执行程序是空的,没有数据。这个问题可以通过合并而不是重新划分来解决。
其他回答
以下是代码级别的一些额外细节/差异:
在这里只添加函数定义,完整的代码实现检查spark的github页面。
下面是在数据帧上重新分区的不同方法: 点击这里查看完整实现。
def repartition(numPartitions: Int): Dataset[T]
每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。
def repartition(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。
def repartitionByRange(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。
但是对于合并,我们只有以下方法在数据框架上:
def coalesce(numPartitions: Int): Dataset[T]
上述方法将返回一个新的数据集,该数据集恰好有numPartitions分区
下面是RDD上可用于重分区和合并的方法: 点击这里查看完整实现。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
基本上,重分区方法通过将shuffle值传递为true来调用合并方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区!
我想在贾斯汀和鲍尔的回答中补充一点——
重新分区将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以使用分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。
Coalesce将使用现有分区并对其中的一个子集进行洗牌。它不能像重新分区那样修复数据倾斜。因此,即使它更便宜,它也可能不是你需要的东西。
对于所有这些伟大的答案,我想补充的是,重新分区是利用数据并行化的最佳选择之一。而coalesce提供了一个廉价的选择来减少分区,并且在将数据写入HDFS或其他接收器以利用大写入时非常有用。
我发现这在以拼花格式写数据时很有用,可以充分利用它。
它避免了完全洗牌。如果已知分区数量正在减少,则执行器可以安全地将数据保存在最小分区数量上,只将数据从额外的节点移到我们保留的节点上。
所以,它会是这样的:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
然后合并到2个分区:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
注意,节点1和节点3不需要移动其原始数据。
重新分区-建议在增加分区数量的同时使用它,因为它涉及到所有数据的洗牌。
Coalesce—建议在使用它的同时减少分区的数量。例如,如果你有3个分区,你想把它减少到2个,coalesce将把第3个分区的数据移动到分区1和分区2。分区1和分区2将保留在同一个容器中。 另一方面,重新分区将打乱所有分区中的数据,因此执行程序之间的网络使用将很高,这将影响性能。
在减少分区数量的同时,Coalesce比重分区的性能更好。