专栏原创出处:github-源笔记文件 (opens new window)github-源码 (opens new window),欢迎 Star,转载请附上原文出处链接和本声明。

# 1. 什么是 Shuffle

当一个父 RDD 分区的数据分散到了多个子 RDD 的分区中时,这时会产生 Shuffle,即宽依赖之间会有 Shuffle。

  • Reduce Task 去拉取 Map Task 数据的时候会产生大量的网络、磁盘 IO、内存的消耗,Shuffle 性能的高低对整体任务的性能影响很大。

  • Shuffle 通常分为两个阶段,Map 阶段数据的准备及划分,Reduce 阶段数据的拉取。Map 端的 Shuffle 通常被称作 Shuffle Write,Reduce 端的 Shuffle 通常被称作 Shuffle Read。

# 2. Shuffle 管理器的发展史

  • Spark 1.2 之前 Shuffle 使用的计算引擎是 HashShuffleManager,这种方式虽然快速,但是会产生大量的文件,如果有 M 个 Mapper,N 个 Reducer 就会产生 M * N 个文件,如果 Mapper 和 Reducer 的数量很庞大将会带来性能上的影响。

  • Spark 1.2 之后引入了 SortShuffleManager,这种方式 Reduce 端需要读取的文件更少,因为 Map 端的每个 Task 最后会将临时文件合并成一个文件,并且会对 Map 端的数据进行排序,并生成记录数据位置的索引文件,Reducer 可以通过索引找到自己要拉取的数据,它也是 Spark 默认使用的 Shuffle 管理器。

  • Spark 2.0 之后移除了 HashShuffleManager,目前 ShuffleManager 只有 SortShuffleManager 这一个实现类。

# 3. SortShuffleManager 解析

SortShuffleManager 有两种运行机制,一种是普通机制,另一种是 bypass 机制。

# 3.1.普通机制解析

  • Shuffle Write 阶段会先将数据写入内存数据结构中,如果是聚合类型的算子 (reduceByKey),采用 Map 数据结构,先用 Map 进行预聚合处理,再写入内存中;如果是普通的 shuffle 算子的话 (join),采用 Array 数据结构,直接写入内存。

  • 当内存达到阈值的时候,会将这些内存中的数据进行排序,然后分批次写入磁盘文件 (默认 1W/批),这里并不会直接写入磁盘,会先写入内存缓冲流中,当缓冲流满溢之后,写入磁盘文件。

  • 最后会将每个 Task 写出的文件进行合并,最终生成一份数据文件和一份索引文件,索引文件记录了 Shuffle Read 阶段每个 Task 要读取的数据在文件中对应的开始和结束位置。

普通机制

# 3.2.bypass 机制解析

  • Shuffle Write 阶段会对每个 Task 数据的 key 进行 hash,相同 hash 的 key 会被写入同一个内存缓冲区,缓冲区满溢后会写到磁盘文件中。

  • 最后会将每个 Task 写出的磁盘文件进行合并,并创建索引文件 (与普通机制下的索引文件作用相同)。

bypass 机制

# 3.3.bypass 机制开启条件

  • shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数设置的阈值的时候,默认是 200。

  • 触发 shuffle 的算子不能是聚合类算子,比如 reduceByKey(在 Map 端对每一个 Task 的 key 会先进行一次预聚合处理)。

# 4. SortShuffleManager 两种机制的区别

  • 在将数据写入内存缓冲区的时候,普通机制要先将数据写入 Map 或者 Array 的内存数据结构中,而 bypass 机制是根据 key 的 hash 值直接写入内存缓冲区中。

  • bypass 机制在写入内存缓冲区之前没有对数据的排序操作,因此在 reduce task 比较少的情况下,开启 bypass 机制,不需要对数据排序,节省运算性能。

最后修改时间: 2/17/2020, 4:43:04 AM