专栏原创出处:github-源笔记文件 (opens new window) ,github-源码 (opens new window),欢迎 Star,转载请附上原文出处链接和本声明。
# 1. 什么是分区
RDD 是一个分布式的数据集,会存放很大量的数据,一个 RDD 是由若干个分区组成的,对 RDD 进行的各种操作,实际上就是对 RDD 中的分区并行的操作。因此,合理的控制分区数,可以更有效的利用集群的计算资源,减少网络传输上的开销,提升整体性能。
# 2. 分区方式
数据的分区方式只作用于 <key,value> 形式的 RDD。因此,当对一个 RDD 使用 shuffle 类型的算子的时候,这时就会用到数据分区器。
spark 默认提供了两种分区器,一种是 HashPartitioner,另一种是 RangePartitioner;还可以自定义分区器,通过继承 Partitioner 抽象类来实现。
# 1.HashPartitioner
采用哈希的方式对 <key,value> 形式的 RDD 进行分区,首先根据 key 值计算出 hashCode,然后再除以分区数取余,最后的到的结果就是这个 key 对应的分区 Id。
HashPartitioner 分区的方式有可能会造成数据倾斜,如果有大量的数据取余后得到的分区 Id 都相同,那么这些数据会放到一个分区中,因此生成的 Task 在运行的时候也会比其他的 Task 耗时要长很多。
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
// 获取分区 Id 的方法
// 如果 key 是 null 的话得到的分区 id 是 0
// key 不为 null 得到的 id = key.hashCode % numPartitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
# 2.RangePartitioner
RangePartitioner 可以解决 HashPartitioner 有可能会产生的数据倾斜问题,尽量保证分配到每个分区的数据是均匀的,它会将一个范围内的数据分配到一个分区中。
RangePartitioner 工作原理:
首先根据子 RDD 分区的数量计算所需采样的总数据量:每个分区采样 20 条数据,所有分区最多采集 1000000 条数据。
然后计算对父 RDD 每个分区需要采样的平均据量:将要采样的总数据量扩容 3 倍 (防止采样的时候产生数据倾斜),然后再除以父 RDD 的分区数,采样的方式是使用水库采样。
采样完成后对每个分区采样的数据量进行判断,如果大于先前计算好的平均数据量 (已经进行了 3 倍扩容),说明这个分区采样的数据不均衡,需要使用 sample 算子对这些分区重新采样。
首次采样数据分布均匀的以及分布不均匀进行二次采样的分区,最终都会得到每个分区中所有采样的 key,和这个 key 所占的权重。
通过 determineBounds() 方法,求出这些 key 的平均权重,然后将这些 key 排序,按顺序依次相加每一个 key 的权重,每当达到平均权重的 1 倍、2 倍、3 倍 ··· 的时候,将最后相加的权重对应的 key 值放入一个数组 rangeBounds 中,作为划分每个分区边界的标记。
获取分区 Id 时,如果 rangeBounds 数组长度大于 128 则采用二分法找到 RDD 中的 key 对应的分区 Id,否则按顺序查找对应的分区 Id。
由于代码量比较多,此处不粘贴源码,相关源码可以参考 RangePartitioner 类。
# 3. 如何设置合理的分区数
- 分区数越多越好吗?
不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。
- 分区数太少会有什么影响?
分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。
- 合理的分区数是多少?如何设置?
总核数=executor-cores * num-executor
一般合理的分区数设置为总核数的 2~3 倍
← Shuffle 机制解析 共享变量 →