大数据开发面试题之介绍下Spark Shuffle

刚刚在牛客的面经分享部分看了下,前几篇面经,Spark Shuffle就被问到了多次

比如:

比如:

还有很多,大家可以在牛客网的讨论区看下其他牛友分享的面经,基本上就知道面试官是怎么去问,我们又该思考如何去回答

牛客面经中常问的问题:

介绍下Spark Shuffle

问过的一些公司:字节x7,字节(2021.08)-(2021.10),爱奇艺,头条,第四范式,陌陌 x 3,蘑菇街,美团 x 6,美团(2021.08),海康,网易云音乐,阿里,商汤科技,作业帮社招,京东,携程(2021.09),腾讯云(2021.10)

带年份的是后面大数据面试题3.0中总结的

Shuffle,中文的意思就是洗牌。之所以需要Shuffle,是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。以最简单的Word Count为例,其中数据保存在Node1、Node2和Node3;经过处理后,这些数据最终会汇聚到Nodea、Nodeb处理,如下图所示。


这个数据重新打乱然后汇聚到不同节点的过程就是Shuffle。但是实际上,Shuffle过程可能会非常复杂:
1)数据量会很大,比如单位为TB或PB的数据分散到几百甚至数千、数万台机器上。
2)为了将这个数据汇聚到正确的节点,需要将这些数据放入正确的Partition,因为数据大小已经大于节点的内存,因此这个过程中可能会发生多次硬盘续写。
3)为了节省带宽,这个数据可能需要压缩,如何在压缩率和压缩解压时间中间做一个比较好的选择?
4)数据需要通过网络传输,因此数据的序列化和发序列化也变得相对复杂。
一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。除非非常复杂的计算逻辑,否则为了容错而持久化中间的数据是没有太大收益的,毕竟中间某个过程出错了可以从头开始计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果。

一句话总结:Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂

在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在Shuffle Dependency宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage;并且在划分Stage的时候,构建Shuffle Dependency的时候进行shuffle注册,获取后续数据读取所需要的Shuffle Handle,最终每一个job提交后都会生成一个Result Stage和若干个Shuffle Map Stage,其中Result Stage表示生成作业的最终结果所在的Stage。Result Stage与Shuffle Map Stage中的task分别对应着Result Task与Shuffle Map Task。

RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作。窄依赖跟宽依赖的区别是是否发生shuffle(洗牌) 操作。宽依赖会发生shuffle操作。窄依赖是子RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果,宽依赖指子RDD的各个分片会依赖于父RDD的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。

Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。

对于下面的内容,先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

1、Hash Shuffle

1)未优化的HashShuffle

如下图中有3个 Reducer,从Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。

2)优化后的HashShuffle

优化的HashShuffle过程就是启用合并机制,合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

这里还是有4个Tasks,数据类别还是分成3种类型,因为Hash算***根据你的 Key 进行分类,在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个Mapper Tasks,所以总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。

基于 Hash 的 Shuffle 机制的优缺点

优点

  • 可以省略不必要的排序开销。

  • 避免了排序所需的内存开销。

缺点

  • 生产的文件过多,会对文件系统造成压力。

  • 大量小文件的随机读写带来一定的磁盘开销。

  • 数据块写入时所需的缓存空间也会随之增加,对内存造成压力。

2、Sort Shuffle

1)普通SortShuffle

在该模式下,数据会先写入一个数据结构,reduceByKey写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。

在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个Task过程会产生多个临时文件。

最后在每个Task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。


2)bypass SortShuffle

bypass运行机制的触发条件如下:

  • shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200

  • 不是聚合类的shuffle算子(比如reduceByKey)

此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

3)Tungsten Sort Shuffle 运行机制

基于 Tungsten Sort 的 Shuffle 实现机制主要是借助 Tungsten 项目所做的优化来高效处理 Shuffle。

Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:

对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过SortShuffleManager.canUseSerializedShuffle方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。

因此,当设置了spark.shuffle.manager=tungsten-sort时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。

要实现 Tungsten Sort Shuffle 机制需要满足以下条件:

  • Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。

  • Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。

  • Shuffle 过程中的输出分区个数少于 16777216 个。

实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。

所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。

基于 Sort 的 Shuffle 机制的优缺点

优点

  • 小文件的数量大量减少,Mapper 端的内存占用变少;

  • Spark 不仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。

缺点

  • 如果 Mapper 中 Task 的数量过大,依旧会产生很多小文件,此时在 Shuffle 传数据的过程中到 Reducer 端, Reducer 会需要同时大量地记录进行反序列化,导致大量内存消耗和 GC 负担巨大,造成系统缓慢,甚至崩溃;

  • 强制了在 Mapper 端必须要排序,即使数据本身并不需要排序;

  • 它要基于记录本身进行排序,这就是 Sort-Based Shuffle 最致命的性能消耗。


**********
#面试题##大数据开发工程师##面试题目#
全部评论
你好,向请教一下普通的SortShuffle是按key对应的分区值进行排序,分区内的key不需要排序?还是和MR一样,先按key对应的分区值进行排序,分区内的数据有序?两个版本的答案我都见过
1 回复 分享
发布于 2022-08-18 21:31 浙江
很强,视频课这里讲的太简略了
点赞 回复 分享
发布于 2022-08-31 18:24 重庆
点赞 回复 分享
发布于 2022-05-15 15:07
有无面试题pdf分享一下
点赞 回复 分享
发布于 2022-04-22 14:32
点赞 回复 分享
发布于 2022-04-21 21:39
强!
点赞 回复 分享
发布于 2022-04-19 20:03
6
点赞 回复 分享
发布于 2022-04-19 09:28

相关推荐

04-19 11:59
门头沟学院 Java
卷不动辣24314:挂,看来不该投这个部门的
点赞 评论 收藏
分享
05-07 17:58
门头沟学院 Java
wuwuwuoow:1.简历字体有些怪怪的,用啥写的? 2.Redis 一主二从为什么能解决双写一致性? 3.乐观锁指的是 SQL 层面的库存判断?比如 stock > 0。个人认为这种不算乐观锁,更像是乐观锁的思想,写 SQL 避免不了悲观锁的 4.奖项证书如果不是 ACM,说实话没什么必要写 5.逻辑过期时间为什么能解决缓存击穿问题?逻辑过期指的是什么 其实也没什么多大要改的。海投吧
点赞 评论 收藏
分享
评论
23
140
分享

创作者周榜

更多
牛客网
牛客企业服务