大数据开发面试题之介绍下Spark Shuffle
刚刚在牛客的面经分享部分看了下,前几篇面经,Spark Shuffle就被问到了多次
比如:
比如:
还有很多,大家可以在牛客网的讨论区看下其他牛友分享的面经,基本上就知道面试官是怎么去问,我们又该思考如何去回答。
牛客面经中常问的问题:
介绍下Spark Shuffle
问过的一些公司:字节x7,字节(2021.08)-(2021.10),爱奇艺,头条,第四范式,陌陌 x 3,蘑菇街,美团 x 6,美团(2021.08),海康,网易云音乐,阿里,商汤科技,作业帮社招,京东,携程(2021.09),腾讯云(2021.10)
带年份的是后面大数据面试题3.0中总结的
Shuffle,中文的意思就是洗牌。之所以需要Shuffle,是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。以最简单的Word Count为例,其中数据保存在Node1、Node2和Node3;经过处理后,这些数据最终会汇聚到Nodea、Nodeb处理,如下图所示。
这个数据重新打乱然后汇聚到不同节点的过程就是Shuffle。但是实际上,Shuffle过程可能会非常复杂:
1)数据量会很大,比如单位为TB或PB的数据分散到几百甚至数千、数万台机器上。
2)为了将这个数据汇聚到正确的节点,需要将这些数据放入正确的Partition,因为数据大小已经大于节点的内存,因此这个过程中可能会发生多次硬盘续写。
3)为了节省带宽,这个数据可能需要压缩,如何在压缩率和压缩解压时间中间做一个比较好的选择?
4)数据需要通过网络传输,因此数据的序列化和发序列化也变得相对复杂。
一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。除非非常复杂的计算逻辑,否则为了容错而持久化中间的数据是没有太大收益的,毕竟中间某个过程出错了可以从头开始计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果。
一句话总结:Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。
在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在Shuffle Dependency宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage;并且在划分Stage的时候,构建Shuffle Dependency的时候进行shuffle注册,获取后续数据读取所需要的Shuffle Handle,最终每一个job提交后都会生成一个Result Stage和若干个Shuffle Map Stage,其中Result Stage表示生成作业的最终结果所在的Stage。Result Stage与Shuffle Map Stage中的task分别对应着Result Task与Shuffle Map Task。
RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作。窄依赖跟宽依赖的区别是是否发生shuffle(洗牌) 操作。宽依赖会发生shuffle操作。窄依赖是子RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果,宽依赖指子RDD的各个分片会依赖于父RDD的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。
对于下面的内容,先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。
1、Hash Shuffle
1)未优化的HashShuffle
如下图中有3个 Reducer,从Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。
2)优化后的HashShuffle
优化的HashShuffle过程就是启用合并机制,合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。
基于 Hash 的 Shuffle 机制的优缺点
优点:
-
可以省略不必要的排序开销。
-
避免了排序所需的内存开销。
缺点:
-
生产的文件过多,会对文件系统造成压力。
-
大量小文件的随机读写带来一定的磁盘开销。
-
数据块写入时所需的缓存空间也会随之增加,对内存造成压力。
2、Sort Shuffle
1)普通SortShuffle
在该模式下,数据会先写入一个数据结构,reduceByKey写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个Task过程会产生多个临时文件。
最后在每个Task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
2)bypass SortShuffle
bypass运行机制的触发条件如下:
-
shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200
-
不是聚合类的shuffle算子(比如reduceByKey)
此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
3)Tungsten Sort Shuffle 运行机制
基于 Tungsten Sort 的 Shuffle 实现机制主要是借助 Tungsten 项目所做的优化来高效处理 Shuffle。
Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:
对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过SortShuffleManager.canUseSerializedShuffle方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。
因此,当设置了spark.shuffle.manager=tungsten-sort时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。
要实现 Tungsten Sort Shuffle 机制需要满足以下条件:
-
Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。
-
Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。
-
Shuffle 过程中的输出分区个数少于 16777216 个。
实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。
所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。
基于 Sort 的 Shuffle 机制的优缺点
优点:
-
小文件的数量大量减少,Mapper 端的内存占用变少;
-
Spark 不仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。
缺点:
-
如果 Mapper 中 Task 的数量过大,依旧会产生很多小文件,此时在 Shuffle 传数据的过程中到 Reducer 端, Reducer 会需要同时大量地记录进行反序列化,导致大量内存消耗和 GC 负担巨大,造成系统缓慢,甚至崩溃;
-
强制了在 Mapper 端必须要排序,即使数据本身并不需要排序;
-
它要基于记录本身进行排序,这就是 Sort-Based Shuffle 最致命的性能消耗。