专栏原创出处:github-源笔记文件 github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。

# 1. 什么是分区

RDD 是一个分布式的数据集,会存放很大量的数据,一个 RDD 是由若干个分区组成的,对 RDD 进行的各种操作,实际上就是对 RDD 中的分区并行的操作。因此,合理的控制分区数,可以更有效的利用集群的计算资源,减少网络传输上的开销,提升整体性能。

# 2. 分区方式

数据的分区方式只作用于 <key,value> 形式的 RDD。因此,当对一个 RDD 使用 shuffle 类型的算子的时候,这时就会用到数据分区器。

spark 默认提供了两种分区器,一种是 HashPartitioner,另一种是 RangePartitioner;还可以自定义分区器,通过继承 Partitioner 抽象类来实现。

# 1.HashPartitioner

采用哈希的方式对 <key,value> 形式的 RDD 进行分区,首先根据 key 值计算出 hashCode,然后再除以分区数取余,最后的到的结果就是这个 key 对应的分区 Id。

HashPartitioner 分区的方式有可能会造成数据倾斜,如果有大量的数据取余后得到的分区 Id 都相同,那么这些数据会放到一个分区中,因此生成的 Task 在运行的时候也会比其他的 Task 耗时要长很多。

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions
  // 获取分区 Id 的方法
  // 如果 key 是 null 的话得到的分区 id 是 0
  // key 不为 null 得到的 id = key.hashCode % numPartitions
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

# 2.RangePartitioner

RangePartitioner 可以解决 HashPartitioner 有可能会产生的数据倾斜问题,尽量保证分配到每个分区的数据是均匀的,它会将一个范围内的数据分配到一个分区中。

RangePartitioner 工作原理:

  • 首先根据子 RDD 分区的数量计算所需采样的总数据量:每个分区采样 20 条数据,所有分区最多采集 1000000 条数据。

  • 然后计算对父 RDD 每个分区需要采样的平均据量:将要采样的总数据量扩容 3 倍 (防止采样的时候产生数据倾斜),然后再除以父 RDD 的分区数,采样的方式是使用水库采样。

  • 采样完成后对每个分区采样的数据量进行判断,如果大于先前计算好的平均数据量 (已经进行了 3 倍扩容),说明这个分区采样的数据不均衡,需要使用 sample 算子对这些分区重新采样。

  • 首次采样数据分布均匀的以及分布不均匀进行二次采样的分区,最终都会得到每个分区中所有采样的 key,和这个 key 所占的权重。

  • 通过 determineBounds() 方法,求出这些 key 的平均权重,然后将这些 key 排序,按顺序依次相加每一个 key 的权重,每当达到平均权重的 1 倍、2 倍、3 倍 ··· 的时候,将最后相加的权重对应的 key 值放入一个数组 rangeBounds 中,作为划分每个分区边界的标记。

  • 获取分区 Id 时,如果 rangeBounds 数组长度大于 128 则采用二分法找到 RDD 中的 key 对应的分区 Id,否则按顺序查找对应的分区 Id。

由于代码量比较多,此处不粘贴源码,相关源码可以参考 RangePartitioner 类。

# 3. 如何设置合理的分区数

  1. 分区数越多越好吗?

不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。

  1. 分区数太少会有什么影响?

分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。

  1. 合理的分区数是多少?如何设置?

总核数=executor-cores * num-executor

一般合理的分区数设置为总核数的 2~3 倍

最后修改时间: 2/17/2020, 4:43:04 AM