Spark考核要点
Spark 3.x AQE 与 DPP
题目一:AQE 的三大能力与适用场景
场景:你们的离线数仓从 Spark 2.x 升级到 Spark 3.x 之后,发现同一份 TPC-DS 报表整体性能提升了不少,但有些 SQL 在数据倾斜场景下表现差异更明显。Leader 让你梳理并落地 Spark 3.x 的 AQE(Adaptive Query Execution,自适应查询执行)能力,用来统一优化 SQL。
要求:
- 说明 Spark 3.x AQE 相比 Spark 2.x 的核心目标是什么,解决了 Spark 2.x 哪些「静态优化」的痛点?
- 结合实际查询场景,分别举例说明 AQE 的 三大能力: 动态合并 Shuffle 分区(Coalesce Shuffle Partitions)动态调整 Join 策略(Demote SortMergeJoin to BroadcastHashJoin 等)动态倾斜 Join 优化(Skew Join)
- 对于每一项能力,分别回答: 需要开启或依赖的 关键配置 是什么?触发原理:在运行时依赖哪些统计信息?是在什么阶段做决策?带来的 收益 和 可能的风险/限制 是什么?
参考答案要点(题目一)
1. AQE 的核心目标与痛点
- 核心目标:在 运行时 基于真实数据分布动态调整执行计划,弥补纯规则优化(RBO)和静态代价优化(CBO)的不足。
- <font style="color:#DF2A3F;">解决的痛点</font><font style="color:#DF2A3F;">(Spark 2.x):</font> <font style="color:#DF2A3F;">计划一旦确定就无法调整</font><font style="color:#DF2A3F;">:CBO 仅依赖元数据统计,无法根据实际中间结果修正计划。</font><font style="color:#DF2A3F;">Shuffle 分区数固定</font><font style="color:#DF2A3F;">:</font><font style="color:#DF2A3F;">spark.sql.shuffle.partitions</font><font style="color:#DF2A3F;"> 是静态常数,数据量变动或分布不均时要么小 Task 过多、要么单 Task 过大。</font><font style="color:#DF2A3F;">Join 策略不够灵活</font><font style="color:#DF2A3F;">:在编译时决定 Broadcast Join / SortMergeJoin 等策略,一旦执行中某一侧「意外变小」也不会自动改用 Broadcast。</font><font style="color:#DF2A3F;">倾斜 Join 需要手工优化</font><font style="color:#DF2A3F;">:2.x 对 skew join 几乎都是「手工加盐、拆表」等方案,运维/开发成本高。</font>
2. AQE 的三大能力与场景
能力一:动态合并 Shuffle 分区
- 用途:解决小分区/小文件过多的问题,减少 Task 数量和调度开销,同时减轻 NameNode 小文件压力。
- 典型场景: 动态过滤、where 条件筛掉了大量数据,实际 Shuffle 输出远小于预估。业务高低峰明显,淡季仍使用旺季设定的较大 spark.sql.shuffle.partitions。
- 关键配置(常用): spark.sql.adaptive.enabled=truespark.sql.adaptive.coalescePartitions.enabled=truespark.sql.adaptive.advisoryPartitionSizeInBytes=64MB(目标分区大小)
- 触发原理: AQE 将物理计划拆成多个 QueryStage。每个 Shuffle Map 阶段结束后,收集 MapOutputStatistics(每个 reduce 分区的数据量、空分区数量等)。对明显偏小的分区进行合并,使得合并后分区大小尽量接近 advisoryPartitionSizeInBytes。
- 收益: 减少小 Task、小文件;降低调度开销、NameNode 压力。更合理利用资源,避免「过度并行」带来的额外开销。
- 风险/限制: 若数据本身分布不均,部分大分区仍然会存在,无法完全解决倾斜问题。调得过大可能导致某些分区过大,引起单 Task OOM,需要结合业务数据量调参。
能力二:动态调整 Join 策略
- 用途:在运行时根据某侧表的实际大小调整 Join 策略,例如将原本计划的 SortMergeJoin 降级/升级为 BroadcastHashJoin,从而减少 Shuffle 和排序。
- 典型场景: 编译时估算小表略大于 autoBroadcastJoinThreshold,CBO 选择 SortMergeJoin。实际运行中,经过 filter/group 等算子后,小表变得很小,完全可以广播。
- 关键配置(示例): spark.sql.adaptive.enabled=truespark.sql.autoBroadcastJoinThreshold(广播阈值,默认值10M)与 AQE join 策略相关的内部规则(如 DemoteBroadcastHashJoin 等)。
- 触发原理: 在对应 QueryStage 的 Map 阶段完成之后,AQE 拿到该侧 Shuffle 输出的 真实大小。若某侧的中间结果大小 < autoBroadcastJoinThreshold,且满足空分区比例等附加条件,则将 Join 物理算子从 SortMergeJoin 改写为 BroadcastHashJoin。
- 收益: 避免不必要的双侧 shuffle + sort,Query 延迟显著降低(特别是典型星型模型中,小维表 Join 大事实表的场景)。减少网络 IO 和磁盘 IO。
- 风险/限制: 广播表仍然会占用 executor 内存,若阈值配置过大可能引起 OOM。仅在包含 Shuffle 的查询,且开启 AQE 时生效。
能力三:动态倾斜 Join 优化(Skew Join)
- 用途:针对少数 超大 reduce 分区 做自动拆分,降低单 Task 处理时间,缓解数据倾斜。
- 典型场景: 事实表与维表 Join,某些热点 key(如大 V 用户、主类目)数据量远高于其他 key,导致某些 reduce 分区特别大。
- 关键配置(示例): spark.sql.adaptive.enabled=truespark.sql.adaptive.skewJoin.enabled=true与 skew 检测阈值相关的配置(如「分区大小 > 若干倍中位数」等)。
- 触发原理: Shuffle Map 阶段结束后,根据 reduce 分区的大小分布,检测「异常大」的分区。对这些 skewed 分区进行拆分(如 1 个大分区拆成多个子分区),并在 Join 时与对侧表的对应分区分别做多次局部 Join,最后再 union 结果。
- 收益: 自动化处理数据倾斜,对业务逻辑透明,减少「手动加盐、拆 key」的工作。提升整体 job 的尾部延迟(p95/p99),避免某个倾斜 Task 拖慢整个 Stage。
- 风险/限制: 需要额外的 shuffle 和任务调度开销,极端情况下可能收益有限。阈值设置不合理,可能误判或者漏判倾斜。
题目二:DPP(动态分区裁剪)在分区表 Join 中的作用
场景:数仓中有一张按 dt 分区的大事实表 fact_orders,以及一张维度表 dim_users。某个报表 SQL 大致如下:
SELECT o.order_id, o.dt, u.province FROM fact_orders o JOIN dim_users u ON o.user_id = u.user_id WHERE u.province = 'Zhejiang' AND o.dt BETWEEN '2024-01-01' AND '2024-01-31';
业务方反馈:在开启 Spark 3.x 的 DPP 后,这个 SQL 的执行时间显著下降。请你向团队同学讲清楚 DPP 的价值和原理。
要求:
- 说明 静态分区裁剪 与 动态分区裁剪(DPP) 的区别,各自适用于什么场景。
- 以上述 SQL 为例,画出/描述: 没有 DPP 时,fact_orders 在执行时大致会扫描哪些分区?开启 DPP 后,Spark 如何利用 dim_users 这一侧的过滤条件,进一步减少 fact_orders 的扫描分区数?
- 说明 DPP 的 触发条件 和 限制,至少包括: 被裁剪的一侧表在物理上需要满足什么条件?哪些 Join 类型支持 DPP?DPP 何时会因为「收益不够大或成本过高」而被放弃?
参考答案要点(题目二)
1. 静态分区裁剪 vs 动态分区裁剪
- 静态分区裁剪(Static Partition Pruning): 在 编译阶段,直接从 SQL 语句中解析出对分区列(如 dt)的过滤条件。例:WHERE o.dt BETWEEN '2024-01-01' AND '2024-01-31',编译时即可确定只需扫描 1 月份分区。适用场景:分区列上的常量过滤、表达式在编译期可求值。
- 动态分区裁剪(Dynamic Partition Pruning, DPP): 在 运行阶段,基于 Join 另一侧的实际过滤结果,动态推导出需要扫描的分区。例如:o.dt 与另一侧 t2.dt Join,而 t2 上有 WHERE t2.id < 5 等条件,则只有满足该条件的 t2.dt 才有意义,从而动态裁剪 t1 的分区。适用场景:分区列值取决于另外一张表的过滤结果,编译时无法得出具体分区集合。
2. 示例 SQL 中 DPP 的行为
无 DPP 时
- 静态裁剪可以利用
o.dt BETWEEN '2024-01-01' AND '2024-01-31': fact_orders 只扫描 2024-01-01 ~ 2024-01-31 这些 dt 分区。对于每个 dt 分区,都会参与与 dim_users 的 Join。 - 但
u.province = 'Zhejiang'只在dim_users这一侧生效,与fact_orders的分区列dt没有直接静态关系,无法再进一步裁剪。
开启 DPP 后
- 优化思想(简化理解): 将原查询在逻辑上重写为:o.dt IN (SELECT DISTINCT dt FROM fact_orders 与 dim_users 以 province 过滤后的 Join 结果) 或等价的半连接逻辑。更常见的形式是:利用 dim_users 的过滤结果生成一个针对 fact_orders 分区键的「动态过滤器」。
- 运行过程(抽象): 对 dim_users 应用 province='Zhejiang' 的过滤,得到相关用户集合。这些用户在一定条件下可以与 fact_orders 的分区键(如 dt 或 (dt, user_id))建立映射。Spark 在真正扫描 fact_orders 对应分区之前,根据这批用户/键,裁剪掉不会命中 Join 结果的 dt 分区。
- 结果: 实际扫描的 fact_orders 分区数 ≤ 静态裁剪后的分区数;在很多 TPC-DS 场景中,能从几十个分区缩减到个位数,大幅降低 IO。
说明:真实实现中,Spark 会根据 Join 方向、Join 类型、是否广播、收益估计等采用不同的实现策略(如基于子查询或 Reused Broadcast Exchange 等),但核心思想都是「用运行时 Join 一侧的结果去裁剪另一侧的分区」。
3. DPP 的触发条件与限制
- 触发条件(部分): 被裁剪的一侧表必须是 按 Join key 分区 的分区表(例如 PARTITIONED BY (dt) 且 Join key 中包含 dt)。Join 类型通常为: INNER JOINLEFT SEMI(被裁剪表在左侧)LEFT / RIGHT OUTER(有特定要求,被裁剪方需与 Join 方向匹配)(实现细节)对于 Broadcast Hash Join,DPP 会尝试复用广播结果;在非广播场景下可能通过子查询形式实现。
- 限制与放弃场景: 若评估运行动态子查询的代价 大于 裁剪分区带来的收益(例如很少分区可被裁剪),优化器可能放弃 DPP。若 Join key 与分区列并不匹配(例如按 dt 分区,却在 city 上 Join),DPP 无法生效。某些复杂 Join(多表 Join、嵌套子查询)中,DPP 可能因为规划复杂度或不满足启发式规则而不触发。
以上两道题覆盖了 Spark 3.x 中 AQE 和 DPP 的核心概念、使用场景和原理,既可以考察候选人对新版本特性的理解深度,也能引导其思考如何在真实数仓/报表场景中落地这些能力。
Spark Join 倾斜
题目一:Join 倾斜产生原因
场景:大表 A(如用户行为日志)与大表 B(如用户维度表)按 userId 做 Join。线上发现某个 Task 运行时间远超其他 Task,甚至 OOM,而其他 Task 很快完成。
要求:
- 说明 Spark Join 数据倾斜 产生的原因。
- 为什么 Sort-Merge Join 和 Hash Join 都会出现倾斜?
- 如何快速定位哪个 key 发生了倾斜?
参考答案要点
1. Join 倾斜产生的原因
- Shuffle 分区规则:Spark 的 Join 一般需要 shuffle,按
hash(key) % numPartitions将相同 key 的数据发往同一分区。(关于numPartitions的值,Join 一般需要 shuffle 时,若写的是 SQL 或 DataFrame,分区数就来自 spark.sql.shuffle.partitions,默认值200;若写的是 RDD 的 join,则来自你传入的分区数或 spark.default.parallelism,如果是本地运行默认值是local的CPU核数,集群模式是分配的executor的总核数和2比较的大者。) - 热点 key 存在:若某个 key(如某大 V 用户 ID)在大表 A 或 B 中对应的数据量极大,则承载该 key 的分区会收到远超其他分区的数据量。
- 表现:该分区所在的 Task 成为瓶颈,运行时间过长、可能 OOM;整体 Job 时间被拖慢,资源利用率低。
2. 为什么 Sort-Merge Join 和 Hash Join 都会出现倾斜?
Sort-Merge Join | 两表按 key shuffle 后各自排序,再按序归并 | 相同 key 必然进入同一 reduce 分区,热点 key 所在分区数据量过大 |
Hash Join(Shuffle Hash Join) | 一侧构建 Hash 表,另一侧按 key 匹配 | 同样依赖按 key 的 shuffle,热点 key 所在分区数据量大,且 build 端 Hash 表可能 OOM |
Broadcast Join | 小表广播到各 executor,无 shuffle | 不产生倾斜 (每个 Task 本地 join),但只适用于小表 |
只要涉及按 key 的 shuffle,热点 key 就会导致倾斜。
3. 如何快速定位倾斜的 key?
- 看 Spark UI:在 Stages 页面查看各 Task 的输入数据量、处理时间,找出明显偏大的 Task,对应的是倾斜分区。
- 采样统计:对 Join key 做
groupByKey或countByKey,找出 count 最大的 top N 个 key。 - 自定义指标:在算子中统计每个 key 的数量,写入监控或日志,识别热点 key。
题目二:Join 倾斜的常见解决方案
要求:列举并说明 3~5 种 Spark Join 倾斜的解决思路,并说明各自适用场景。
参考答案要点
1. 广播 Join(Broadcast Join)
- 做法:把小表 broadcast 到各 executor,与大表做 map 侧 join,无 shuffle。
- 适用:小表足够小(通常 < 10MB~100MB,视 executor 内存而定)。
- 优点:彻底避免 shuffle,无倾斜;性能最好。
- 局限:大表无法 broadcast,只适用于小表。
2. 加盐打散(Salting / 两阶段 Join)
- 做法: 对倾斜 key 加随机前缀或者后缀,如 hotKey → hotKey_0、hotKey_1、…、hotKey_(N-1)大表:倾斜 key 加 N 个随机前缀,非倾斜 key 加 1 个固定前缀(如 _0)小表:数据膨胀 N 倍,每条记录复制 N 份并分别加上 _0~_N-1 后缀按「带前缀或者后缀的 key」做 Join,得到局部结果再去掉前缀或者后缀,按原始 key 做一次聚合(如 groupBy + sum)得到最终结果
- 适用:已知或可识别的热点 key,且小表数据量可接受膨胀。
- 优点:有效分散热点 key 的压力。
- 局限:小表膨胀会增加内存和网络开销;需要事先识别倾斜 key。
3. 拆分热点 key 单独处理
- 做法:将倾斜 key 过滤出来,单独做 Join(可用 broadcast join 或单机处理),非倾斜 key 做普通 Join,最后 union 结果。
- 适用:倾斜 key 数量较少,且可单独处理。
- 优点:思路清晰,避免影响正常 key 的 Join 性能。
- 局限:需要两次 Join + union,代码复杂度增加。
4. 增加分区数
- 做法:通过
spark.sql.shuffle.partitions或repartition增加 Join 的分区数。 - 适用:轻度倾斜,或配合其他方案使用。
- 局限:若某个 key 的数据量本身就远大于其他 key,增加分区无法让该 key 分散到多个分区,治标不治本。
5. 使用 Spark 3.x 的 AQE skew join 优化
- 做法:开启
spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled,AQE 会自动检测倾斜分区并拆分为多个子分区做 join。 - 适用:Spark 3.x,无需手动识别热点 key。
- 优点:自动化,对业务透明。
- 局限:依赖 AQE,某些极端倾斜场景可能仍需手动优化。
题目三:加盐打散 Join 的详细流程
要求:以「大表 A Join 小表 B,key hotUser 倾斜」为例,说明加盐打散 Join 的完整步骤,并解释为什么小表需要膨胀。
参考答案要点
流程概览
1 | 倾斜 key
加随机前缀 →
~
;非倾斜 key 加
| 每条记录复制 N 份,分别加
~
后缀 | 保证 Join 时 key 能一一对应 |
2 | 按
Join | 同上 | 热点 key 被分散到 10 个分区 |
3 | 去掉前缀,按原始 key 聚合 | - | 同一逻辑 key 的多个前缀桶结果合并 |
为什么小表需要膨胀?
- 大表中
hotUser被拆成hotUser_0~hotUser_9,会发往 10 个不同分区。 - 若小表中
hotUser对应的记录只有 1 条且不加后缀,Join 时该条记录只会进入 1 个分区,无法与另外 9 个分区中的hotUser_1~hotUser_9匹配。 - 因此需要将小表中
hotUser对应的记录复制 N 份,每份加上_0~_N-1后缀,这样每个分区都能拿到对应前缀的小表数据,Join 结果才能正确。
伪代码示例(Scala)
val numSalt = 10
val skewKeys = Set("hotUser")
// 大表加盐
val saltedA = bigTable.map { row =>
val key = row.getString(0)
val newKey = if (skewKeys.contains(key)) {
key + "_" + (Random.nextInt(numSalt))
} else {
key + "_0"
}
(newKey, row)
}
// 小表膨胀
val saltedB = smallTable.flatMap { row =>
val key = row.getString(0)
(0 until numSalt).map { i =>
(key + "_" + i, row)
}
}
// Join 后去盐聚合
saltedA.join(saltedB)
.map { case (saltedKey, (a, b)) => (saltedKey.dropRight(2), (a, b)) } // 去掉 _i
.groupByKey()
.mapValues(_.head) // 或按业务做聚合
题目四:Broadcast Join 与 Shuffle Join 的选择
要求:
- Spark 何时自动选择 Broadcast Join?
- 什么情况下 Broadcast Join 反而会变慢或 OOM?
- 如何手动指定 Broadcast Join?
参考答案要点
1. 自动选择 Broadcast Join 的条件
- 小表大小小于
spark.sql.autoBroadcastJoinThreshold(默认 10MB)。 - 等值 Join。
- 非 full outer join(Broadcast 不支持 full outer)。
2. Broadcast Join 变慢或 OOM 的原因
- 小表过大:超过 executor 内存或 broadcast 阈值,导致广播耗时、OOM。
- 大表分区过多:每个 Task 都要接收一份小表副本,网络和内存压力大。
- 倾斜在 build 端:若 broadcast 的是大表(不常见),会导致 driver 或 executor OOM。
3. 手动指定 Broadcast Join
-- Spark SQL SELECT /*+ BROADCAST(small_table) */ * FROM big_table JOIN small_table ON big_table.id = small_table.id
// DataFrame API import org.apache.spark.sql.functions.broadcast bigDF.join(broadcast(smallDF), "id")
题目五:综合场景题
场景:表 A 1 亿行,表 B 1000 万行,按 user_id Join。发现 user_id = 12345 在表 A 中有 5000 万行,其他 user_id 分布均匀。请问你会如何优化?
参考答案要点
- 识别问题:
user_id = 12345是热点 key,导致按user_idshuffle 时该分区严重倾斜。 - 方案选择: 若表 B 可 broadcast(< 百兆级):优先考虑 Broadcast Join,可规避 shuffle 倾斜。若表 B 无法 broadcast:采用拆分热点 key 或加盐打散: 将 user_id = 12345 从两表分别过滤出来,单独做 Join(可用 broadcast 或小规模处理)其余数据做普通 Join最后 union 两部分结果或对 user_id = 12345 加盐打散,小表相应膨胀,按「盐化 key」Join 后再聚合。
- Spark 3.x:可开启 AQE skew join,先观察是否自动缓解,若不足再叠加上述方案。
Spark Catalyst 与 Tungsten
题目
1. Catalyst 和 Tungsten 分别解决什么问题?一句话概括各自职责,并说明它们分别作用于 Spark 的哪个阶段(逻辑计划 / 物理计划 / 执行)。
2. Catalyst 的优化流程是怎样的?请说出至少 4 种常见的逻辑优化规则,并各举一个简单例子说明「优化前 / 优化后」的差异。
3. 什么是谓词下推(Predicate Pushdown)?结合「读 Parquet + WHERE + SELECT 部分列」的场景,说明 Catalyst 如何减少 I/O 和计算量。
4. Tungsten 的三大改进是什么?分别解决 JVM/传统执行引擎的什么问题?
5. 什么是 Whole-Stage Codegen(全阶段代码生成)?它如何提升 CPU 执行效率?和 RDD 的逐算子执行有什么本质区别?
6. 为什么 DataFrame/SQL 通常比等价的 RDD 实现更快?请从 Catalyst 和 Tungsten 两方面各举一点说明。
参考答案要点
1. Catalyst vs Tungsten:职责与作用阶段
Catalyst | 对 逻辑计划 做等价变换,少读少算、优化 join/聚合顺序等 | 逻辑计划 → 物理计划 (优化「要算什么」) |
Tungsten | 在 物理执行 时用二进制存储和代码生成,提升 CPU/内存效率 | 物理计划执行 (优化「怎么算」) |
- Catalyst:决定「算哪些数据、用哪种算子、顺序如何」。
- Tungsten:决定「数据在内存里怎么存、循环怎么跑、如何减少 GC 和虚调用」。
2. Catalyst 优化流程与常见规则
优化流程(简述):
- 解析:SQL / DataFrame API → 逻辑计划(Logical Plan)树。
- 逻辑优化:对逻辑计划应用一系列规则(Rule),如谓词下推、列剪裁等,得到优化后的逻辑计划。
- 物理计划:将逻辑计划转换为物理计划(Physical Plan),选择具体算子(如 BroadcastHashJoin、SortMergeJoin)。
- 成本模型 / 规则:在多个物理计划中选一个(或按规则选),生成可执行计划。
常见逻辑优化规则与示例:
谓词下推 | 把过滤条件尽量推到数据源或 join 前 |
变为
,数据源只读满足条件的行/块 |
列剪裁 | 只保留后续用到的列,不读无用列 |
时,Parquet 只读
、
列,不读其它列 |
常量折叠 | 在编译期计算常量表达式 |
→ 只保留
|
谓词/投影合并 | 合并多个 Filter、多个 Project |
合并为一次
|
3. 谓词下推 + 列剪裁示例
场景:SELECT name, age FROM users_parquet WHERE age > 18 AND city = 'Beijing'
- 不优化:全表扫描 Parquet,读所有列,再在 Spark 里做
age > 18和city = 'Beijing'。 - Catalyst 优化后: 谓词下推:把 age > 18、city = 'Beijing' 推到 Parquet Reader,利用 row group 统计信息跳过不满足的块,或只扫描满足条件的 row group。列剪裁:只读 name、age、city 三列(甚至只读过滤需要的列 + 最终要的列),不读其它列。
结果:I/O 和内存中的计算量都显著减少,逻辑等价。
4. Tungsten 三大改进
Unsafe Row / 二进制存储 | JVM 对象多、GC 压力大、缓存不友好 | 将多列压成连续二进制(类似 C struct),减少小对象和指针,可 off-heap |
Whole-Stage Codegen | 虚函数调用多、分支多、CPU 流水线利用率低 | 把多算子融合成一段手写风格的紧凑循环,减少调用与分支 |
Cache-friendly 聚合/Join | 传统 Hash 表随机访问多、cache miss 高 | 用连续内存/数组做 hash 表,列式访问,提高 cache 命中率 |
5. Whole-Stage Codegen 与 RDD 的区别
Whole-Stage Codegen:
- 把多个算子(如 Filter + Project + HashAggregate)在物理计划中合并为一个「代码生成节点」。
- 运行时动态生成一段类似手写的 Java 字节码(或 JVM 可执行的代码),在一个大循环里完成:读二进制 → 过滤 → 投影 → 聚合,中间少用 Row 对象和虚调用。
- 效果:CPU 流水线更满、分支更少、cache 更友好,吞吐提升。
与 RDD 的本质区别:
- RDD:每个算子(map、filter、reduceByKey)是独立的 Stage/迭代,数据以「对象」形式在算子间传递,每层都有虚调用和可能的新对象。
- Tungsten + Codegen:多个算子在同一段生成代码里完成,数据以二进制在寄存器/缓存中流动,几乎没有中间对象和跨算子调用。
<font style="color:#DF2A3F;">6. 为什么 DataFrame/SQL 比等价 RDD 更快?</font>
Catalyst 方面:
- 有 schema 和逻辑计划,可以做谓词下推、列剪裁等,少读少算;RDD 是「黑盒」元素,引擎无法做这类优化。
- 可以优化 join 顺序、选择 broadcast join 等,RDD 需要手写。
Tungsten 方面:
- DataFrame/SQL 的物理执行走 Unsafe Row + Codegen,内存和 CPU 效率高;RDD 默认是 JVM 对象 + 逐算子执行,GC 和虚调用开销大。
- 聚合/join 使用 Tungsten 的列式、cache-friendly 实现,而 RDD 的 groupByKey/reduceByKey 是通用对象流。
总结:DataFrame/SQL 在「逻辑层」被 Catalyst 少算少读,在「执行层」被 Tungsten 算得更快,所以同样业务逻辑下通常比 RDD 更快。
reduceByKey 数据倾斜
题目
场景:有一份用户行为 RDD,每条记录是 (userId, 行为次数),需要按 userId 做 reduceByKey 汇总每个用户的总次数。线上发现某个热点用户(如 "hotUser")的数据量极大,导致 reduceByKey 时该 key 所在分区处理时间远高于其他分区,出现明显数据倾斜。
要求:
- 说明 reduceByKey 数据倾斜 产生的原因(为什么某个 key 会集中到少数分区、带来什么问题)。
- 给出一种经典的解决思路:给倾斜的 key 加随机前缀打散 → 第一次 reduceByKey → 去掉前缀 → 第二次 reduceByKey,并说明每一步的作用。
- 为什么去掉前缀后不能只做 mapToPair,而必须再做一次 reduceByKey?请用简单例子说明。
- 除「加前缀打散」外,还能举出 1~2 种常见的数据倾斜处理思路吗?
参考答案要点
1. reduceByKey 数据倾斜产生的原因
- 分区方式:reduceByKey 按 key 的 hash(key) % numPartitions 决定该 key 去哪个分区。同一个 key 一定落在同一个分区。
- 倾斜表现:若某个 key(如
"hotUser")对应的 (key, value) 条数或数据量远大于其他 key,则承载该 key 的分区会收到远多于其他分区的数据,该分区成为瓶颈。 - 带来的问题:该分区所在 Task 运行时间过长、可能 OOM;整体作业时间被拖慢,资源利用不均。
2. 经典解法:加前缀打散 + 两次 reduceByKey
思路:让「热点 key」不再以单一 key 出现,而是拆成多个「带随机前缀的 key」,把压力分散到多个分区;先对带前缀的 key 做一次聚合,再去掉前缀做第二次聚合,得到与原始 reduceByKey 等价的结果。
1. 打散 | 对已知的倾斜 key,在 map 阶段加随机前缀,如
→
、
、…、
| 同一逻辑 key 被拆成 N 个不同 key,分散到不同分区,减轻单分区压力 |
2. 第一次 reduceByKey | 对「带前缀的 key」做聚合 | 在各分区内先做局部聚合,数据量已大幅下降 |
3. 去掉前缀 | mapToPair:把
还原为
,其他 key 不变 | 恢复「逻辑 key」,为最终汇总做准备 |
4. 第二次 reduceByKey | 对「原始 key」再做一次聚合 | 把同一逻辑 key 的多个前缀桶的部分结果合并成最终一个值,等价于原始语义 |
伪代码示例:
// 假设 pairRdd 是 JavaPairRDD<String, Integer>,skewKey = "hotUser",numSalt = 10
JavaPairRDD<String, Integer> salted = pairRdd.mapToPair(kv -> {
if (skewKey.equals(kv._1)) {
int prefix = new Random().nextInt(numSalt);
return new Tuple2<>(kv._1 + "_" + prefix, kv._2);
}
return kv;
});
JavaPairRDD<String, Integer> firstAgg = salted.reduceByKey(Integer::sum);
JavaPairRDD<String, Integer> finalResult = firstAgg
.mapToPair(kv -> {
if (kv._1.startsWith(skewKey + "_"))
return new Tuple2<>(skewKey, kv._2);
return kv;
})
.reduceByKey(Integer::sum);
3. 为什么去掉前缀后必须再做一次 reduceByKey?
- 第一次 reduceByKey 之后,同一个逻辑 key(如
hotUser)会对应多条记录:(hotUser_0, sum0)、(hotUser_1, sum1)、…。 - 去掉前缀后变成:
(hotUser, sum0)、(hotUser, sum1)、…,即同一个 key 对应多个 value,还没有聚合成「一个总结果」。 - 若只做 mapToPair 去掉后缀而不做第二次 reduceByKey,结果集中会有多个
(hotUser, 部分和),不符合「按 key 聚合成一个值」的语义。 - 因此必须对「去掉前缀后的 RDD」再做一次 reduceByKey,把同一 key 的多个部分和合并成最终的一个和,才与「不做打散时的 reduceByKey」结果等价。
4. 其他常见的数据倾斜处理思路
- 过滤或单独处理热点 key:若业务允许,可先把倾斜 key 过滤出来单独算(如单机或小 RDD),再与正常 key 的结果合并。
- 增加分区数:适当增加 reduceByKey 的分区数(如
reduceByKey(func, numPartitions)),可减轻单分区数据量,但若倾斜 key 仍远多于其他 key,单 key 仍会集中在一个分区,治标不治本;配合「打散」更有效。 - 两阶段聚合(加前缀打散):即本题中的经典方案,适合已知或可识别的热点 key。
- 使用广播 + map 侧 join:若倾斜发生在 join 上,可把小表广播,避免大表按倾斜 key shuffle;或对倾斜 key 单独用 broadcast join,非倾斜 key 用普通 join 再 union。
mapPartitions
5.1. mapPartitions 用法总结
1. 是什么
- mapPartitions 是 Spark RDD 的 Transformation 算子,对 RDD 的每个分区施加一个函数,输入是分区内元素的迭代器
Iterator[T],输出是新的迭代器Iterator[U]。 - 与 map 的区别:map 是逐元素处理(一个输入 → 一个输出);mapPartitions 是按分区处理(一个分区 → 一个迭代器),在分区级别做批量逻辑。
2. 为什么用
减少调用次数 | 分区内 N 条只调用 1 次函数,而不是 N 次,降低序列化/反序列化和任务调度开销。 |
分区级共享 | 可在分区内建连接池、打开文件、初始化资源,整分区复用,避免每条数据都创建/关闭连接。 |
批量 I/O | 适合按分区批量写库、批量写文件,比逐条写更高效。 |
与分区数对应 | 分区数 = 并行度;若 1 个分区对应 1 个数据源分片(如 1 个 dt),则 mapPartitions 自然实现“按分片并行”。 |
3. 怎么用(Java 示例)
// 签名:mapPartitions(f: Iterator[T] => Iterator[U])
JavaRDD<String> result = rdd.mapPartitions((Iterator<Row> partition) -> {
List<String> out = new ArrayList<>();
while (partition.hasNext()) {
Row row = partition.next();
out.add(process(row)); // 或做批量 DB 写入等
}
return out.iterator();
});
- 入参:当前分区内元素的
Iterator<T>。 - 返回值:新元素的
Iterator<U>(可为空迭代器)。 - 注意:不要在处理中缓存整个分区到内存(如
iter.toList()再慢慢处理),数据量大时易 OOM;应流式处理迭代器。
4. 与分区数的关系
- mapPartitions 的并行度 = RDD 的分区数:有几个分区,该算子就会起几个 Task,每个 Task 处理一个分区。
- 分区数由上游决定(如
sc.parallelize(..., 4)、df.repartition(4)、JDBC 读时的numPartitions、按 dt 分次读再 union 等),不是由 mapPartitions 自己决定。 - 若希望“4 个 dt 分区 → 4 个 Spark 分区并行”,需要在读数据时保证 1 个 dt 对应 1 个 RDD 分区(例如按 dt 分次读再 union),mapPartitions 就会在 4 个分区上并行。
5. 典型用法小结
- 逐行转换:分区内遍历迭代器,每条转成新类型,返回新迭代器(等价于 map,但调用次数少)。
- 分区内建连接:在迭代器处理前创建 DB 连接/连接池,分区内每条复用,处理完关闭,避免每条数据都建连。
- 按分区批量写:分区内攒一批再写 HBase/MySQL/文件,减少 I/O 次数。
- 与 repartition/coalesce 配合:先调整分区数再 mapPartitions,控制并行度与数据倾斜。
5.2. 大数据面试题:mapPartitions
题目:
在 Spark 中,从一张按天分区的 Hive 表(分区字段为
dt)里读取最近 7 天的数据,要求:
- 按天(dt)并行读取,即 7 个分区并行;
- 使用 mapPartitions 对每个分区的数据做处理(例如清洗、过滤或聚合);
- 最终每个分区只输出前 100 条记录。
请简要说明实现思路(读数据、分区设计、mapPartitions 里做什么),并指出若改用 map 实现“每个分区只输出前 100 条”会有什么问题。
参考答案要点:
- 读数据与分区设计先拿到最近 7 天的 dt 列表(如通过 Hive 元数据或一次小查询)。按 每个 dt 分别读(例如每个 dt 一个 spark.read.table(...).filter($"dt" === dt) 或 JDBC 按 dt 分次查),再 union 成一个大 DataFrame/RDD,这样自然得到 7 个 RDD 分区,与 7 个 dt 一一对应,实现“按天并行”。
- mapPartitions 里做什么输入:当前分区内该 dt 的所有记录(可能很多)。逻辑:在分区内只取前 100 条(例如用计数器,或转成列表后 take(100)),再做业务处理(清洗/过滤等),返回这最多 100 条结果的迭代器。这样每个分区最多输出 100 条,总数据量可控,且 7 个分区并行执行。
- 若用 map 实现“每个分区只输出前 100 条”会有什么问题map 是逐元素无状态的,单条记录无法知道“自己是不是分区内前 100 条”,无法在 map 内做“分区内前 100”的截断。要实现“每分区前 100 条”,要么在 map 之后用 take(100) 等行动算子(但那是全分区或全局的语义,不是“每个分区各 100 条”),要么在 mapPartitions 里按分区维度处理,在分区内计数/截断。因此“每个分区只输出前 N 条”这类分区级逻辑,用 mapPartitions 更合适;用 map 难以正确且高效地实现。
加分点:
- 能区分 RDD 分区数与业务上的“天(dt)分区”,并说明如何通过读数据方式让两者一致。
- 能提到 mapPartitions 的注意点:分区内不要一次性把迭代器全拉入内存,否则大分区易 OOM;应流式处理或限制条数(如前 100 条)。
reduceByKey 与 groupByKey
题目
场景:有一份日志 RDD,每条记录是 (userId, 点击次数),例如:
(user1, 3), (user2, 1), (user1, 2), (user3, 5), (user2, 4), ...
请用 Spark RDD 实现:按 userId 汇总每个用户的总点击次数。
要求:
- 分别用 groupByKey 和 reduceByKey 两种方式实现。
- 从 Shuffle 数据量、内存、适用场景 三方面比较这两种写法,并说明在大数据场景下更推荐哪一种及原因。
- 若坚持用
groupByKey做「按 key 求和」,如何改进能减少 Shuffle?(提示:可考虑先做一次「本地聚合」再 groupByKey,或换用其他算子。)
参考答案要点
1. 两种实现
reduceByKey:
pairRDD.reduceByKey(Integer::sum);
- 先在各分区内按 key 做本地聚合(map 端 combine),再 shuffle,最后再按 key 做最终聚合。
groupByKey:
pairRDD.groupByKey().mapValues(iter -> {
int sum = 0;
for (Integer v : iter) sum += v;
return sum;
});
- 先把同一 key 的所有 value 都 shuffle 到同一分区,再在分区内对每个 key 的 value 列表求和。
2. 对比(为什么说 reduceByKey 更优)
Shuffle 数据量 | 分区内先聚合,每个 key 只传一个「部分和」,数据量小 | 同一 key 的每条 (userId, 点击次数) 都参与 shuffle,数据量大 |
内存 | reduce 端只对「部分和」再聚合,单 key 占用小 | reduce 端要先收齐同一 key 的 全部 value 再处理,易 OOM |
适用场景 | 按 key 做聚合(sum、max、count 等) | 必须保留「每个 key 下的全部 value 列表」时(如去重、排序、复杂逻辑) |
结论:做「按 key 求和」这类聚合时,应优先用 reduceByKey(或 aggregateByKey),避免用 groupByKey。
3. 若用 groupByKey 做「按 key 求和」的改进
- 不推荐继续用 groupByKey 做求和;应改为 reduceByKey 或 aggregateByKey,本质上就是「先本地聚合再 shuffle」。
- 若题目坚持 groupByKey:可说明「先 map 端用 HashMap 做一次本地 sum(类似 combine),再按 key 发出去」,但这等价于 reduceByKey 的 combiner 思路,标准答案仍是:这类聚合场景用 reduceByKey,避免 groupByKey。
一句话总结
按 key 做聚合(如求和、求最大)时,用 reduceByKey 会先做本地聚合再 shuffle,Shuffle 量和内存都更小;groupByKey 会把同一 key 的所有 value 都 shuffle 过去,不做合并,只适合「必须保留全量 value」的场景。
// 数据: (部门, 员工名)
JavaPairRDD<String, String> deptEmployee = jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("技术", "张三"),
new Tuple2<>("产品", "李四"),
new Tuple2<>("技术", "王五"),
new Tuple2<>("技术", "赵六")
));
// groupByKey -> 每个 key 对应一个 Iterable<value>
JavaPairRDD<String, Iterable<String>> byDept = deptEmployee.groupByKey();
byDept.collect().forEach(t -> {
System.out.print(t._1 + ": ");
t._2.forEach(name -> System.out.print(name + " "));
System.out.println();
});
// 技术: 张三 王五 赵六
// 产品: 李四
aggregateByKey
题目
场景**:有一份学生成绩 RDD,每条记录是 **(学生姓名, 单科分数)**,例如:**
(张三, 80), (李四, 90), (张三, 70), (王五, 85), (李四, 88), (张三, 90), ...
请用 Spark RDD 实现:按学生姓名汇总,得到每个学生的平均分(结果类型为 **(姓名, 平均分)**)。
要求**:**
- 用 aggregateByKey 实现「按 key 求平均」,并写出
**zeroValue**、**seqOp**、**combOp**** 的含义。** - 为什么这里不能用 reduceByKey 直接求平均?reduceByKey 和 aggregateByKey 在「输入/输出类型」上有什么本质区别?
- 若用 groupByKey 再在 value 上求平均,和 aggregateByKey 相比有什么劣势?
参考答案要点
1. 用 aggregateByKey 实现「按 key 求平均」
思路:先按 key 聚合成 **(总分, 个数)**,再算 **总分/个数**。
Java 示例:
// 假设 scores 是 JavaPairRDD<String, Integer>
JavaPairRDD<String, Tuple2<Integer, Integer>> sumCount = scores.aggregateByKey(
new Tuple2<>(0, 0), // zeroValue: 初始 (总分=0, 个数=0)
(acc, v) -> new Tuple2<>(acc._1 + v, acc._2 + 1), // seqOp: 分区内,累加分数和次数
(a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2) // combOp: 分区间,合并两个 (sum, count)
);
JavaPairRDD<String, Double> avgByKey = sumCount.mapValues(t -> (double) t._1 / t._2);
参数含义:
zeroValue | 每个 key 的初始聚合值,这里用
表示 (总分, 个数),类型可与 value 不同 |
seqOp | **分区内:当前聚合值
遇到一个分数
,更新为 **
|
combOp | **shuffle 后分区间:把两个
和
合并成 **
|
2. 为什么不能用 reduceByKey 直接求平均?类型上的区别?
- reduceByKey**:聚合函数是
**(V, V) => V**,输入和输出类型都是 V。若 value 是分数**Int**,只能不断「两个分数合并成一个分数」(如相加),最终得到的是总分,无法同时维护「个数」来算平均。** - aggregateByKey**:可以指定聚合结果类型 U ≠ V。这里 value 是
**Int**,聚合中间结果是**(Int, Int)**即 (总分, 个数),最后再**mapValues**算平均。所以「按 key 求平均」这类结果类型与 value 不同的聚合,要用 aggregateByKey(或先 map 成 (key, (v,1)) 再用 reduceByKey,但写法更啰嗦)。**
本质区别**:reduceByKey 是「同类型聚合」;aggregateByKey 支持「异类型聚合」和自定义初始值、分区内/分区间不同逻辑。**
3. groupByKey 再求平均 vs aggregateByKey
Shuffle 量 | 分区内先聚成 (sum, count),只 shuffle 少量元组 | 同一 key 的 每条分数 都 shuffle,数据量大 |
内存 | reduce 端只维护 (sum, count) | reduce 端要收齐该 key 的 全部分数列表 ,易 OOM |
适用 | 聚合场景首选 | 需要「全量 value 列表」时才用 |
结论:做「按 key 求平均」应用 aggregateByKey,避免 groupByKey。
一句话总结
aggregateByKey 适合「按 key 聚合且结果类型与 value 不同」的场景(如按 key 求平均、按 key 构造 (sum, count) 或集合);理解 zeroValue、seqOp(分区内)、combOp(分区间)三个参数,并能与 reduceByKey、groupByKey 区分即可。
// 数据: (学生名, 分数)
JavaPairRDD<String, Integer> scores = jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("张三", 80),
new Tuple2<>("李四", 90),
new Tuple2<>("张三", 70),
new Tuple2<>("李四", 85),
new Tuple2<>("张三", 90)
));
// aggregateByKey(初始值)(分区内合并, 分区间合并)
// 初始值: (0, 0) 表示 (总分, 个数)
// seqOp: 分区内,把当前分数合并进 (sum, count)
// combOp: shuffle 后,把两个 (sum, count) 合并
JavaPairRDD<String, Tuple2<Integer, Integer>> sumCount = scores.aggregateByKey(
new Tuple2<>(0, 0), // zeroValue: (sum=0, count=0)
(acc, v) -> new Tuple2<>(acc._1 + v, acc._2 + 1), // seqOp: 分区内
(a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2) // combOp: 分区间
);
// 再算平均
JavaPairRDD<String, Double> avgByKey = sumCount.mapValues(t -> (double) t._1 / t._2);
avgByKey.collect().forEach(t -> System.out.println(t._1 + " -> " + t._2));
// 张三 -> 80.0, 李四 -> 87.5
repartition 与 coalesce
题目
场景:某 Spark 任务从 1000 个分区读取数据,经过 filter 后大量分区变空或数据很少,最后要写出到 HDFS,希望只生成 10 个文件。
要求:
- 应使用 repartition(10) 还是 coalesce(10)?说明理由。
- repartition 和 coalesce 在「是否 shuffle」「分区数增减」「典型用途」上的区别是什么?
- 什么情况下必须用 repartition、什么情况下优先用 coalesce?
参考答案要点
1. 本例应选 coalesce(10)
- 目标:从很多分区减少到 10 个分区,且主要是合并分区、写出少量文件。
- coalesce(10):只做分区合并,默认不触发全量 shuffle,性能更好,适合「只减不增」。
- repartition(10):会触发全量 shuffle,数据会重新均匀分布,本例不需要重分布,用 repartition 会多一次不必要的 shuffle。
结论:减少分区且不要求重新打散数据时,优先用 coalesce(10)。
2. repartition 与 coalesce 对比
是否 shuffle | 会,全量重分布 | 默认不 shuffle,只合并现有分区 |
分区数 | 可增可减 | 一般只减不增(不 shuffle 时无法真正增加) |
典型用途 | 增加分区、均匀重分布、主动 shuffle | 减少分区、合并小分区、写少量文件 |
3. 何时用 repartition / coalesce
- 必须或适合用 repartition: 需要增加分区数(如从 2 个分区扩到 100 个);需要均匀重分布(如 filter 后数据倾斜,想重新打散);宽依赖、需要一次按新分区规则的 shuffle。
- 优先用 coalesce: 只需要减少分区数(如 1000 → 10);filter 后分区过多、过小,想合并成较少分区再计算或写文件;不要求分区内数据绝对均匀,能接受「相邻分区合并」带来的略不均匀。
补充:coalesce(n, shuffle = true) 会触发 shuffle,效果接近 repartition(n),在「减分区但希望更均匀」时可用。
一句话总结
减少分区、且不想全量 shuffle 时用 coalesce;增加分区或需要均匀重分布时用 repartition。
Spark Action 算子
题目一:Transformation 和 Action 的区别
问题**:Transformation 和 Action 的区别是什么?各举 3 个例子,并说明「真正的计算」是在哪一类算子执行时触发的?**
参考答案要点
Transformation | 定义 RDD 的转换关系,返回新 RDD | 否,懒执行 | map、filter、reduceByKey、groupByKey |
Action | 需要得到结果或产生副作用,触发 Job | 是 | collect、count、take、reduce、foreach、saveAsTextFile |
触发计算**:只有遇到 Action 时,Spark 才会根据前面的 Transformation 链生成 DAG、划分 Stage、提交 Job 并执行。Transformation 只是记录依赖,不执行。**
题目二:collect() 与 OOM、替代方案
问题**:**collect()** 为什么在大数据场景下容易导致 Driver OOM?在什么场景下可以用哪些 Action 替代?**
参考答案要点
- collect()** 会把 整个 RDD 的数据拉到 Driver 内存。数据量大时,Driver 内存装不下就会 OOM。**
- 替代思路**:** 只要前几条:用 take(n) 或 first()。要个数:用 count()。要写出去:用 saveAsTextFile / saveAsSequenceFile 等,不要先 collect 再写。要抽样:takeSample 或先 sample 再 take/collect。
- 原则**:大数据量下尽量避免对全量 RDD 做 collect(),优先用 take、count、写文件、抽样等 Action。**
题目三:reduce 与 fold
问题**:**reduce** 和 **fold** 的区别是什么?使用 **fold** 时对 zeroValue 有什么要求?**
参考答案要点
- reduce**:
**(T, T) => T**,无初始值,从第一个元素开始聚合;空 RDD 会报错。** - fold**:有 zeroValue,每个分区先以 zeroValue 为起点聚合,最后分区间合并时还会再用一次 zeroValue。**
- 对 fold 的 zeroValue 要求**:必须是该聚合运算的 单位元(和任何元素运算不改变结果),例如:求和用 0,求积用 1,列表拼接用空列表。否则会多算多次 zeroValue,结果错误。**
一句话总结
Action 才会触发计算;collect 易 OOM,大数据量用 take/count/save 等替代;fold 的 zeroValue 必须是单位元。
coalesce 减少分区为什么不产生 shuffle
题目
问题**:Spark 的 **coalesce** 减少分区时,为什么说不产生 shuffle?相邻的多个分区会合并为一个分区,数据不是从多个父 RDD 传到子 RDD 吗,这样难道不产生 shuffle 吗?**
参考答案要点
1. Spark 里「shuffle」指什么?
- Shuffle** = Map 端按 Partitioner(如
**hash(key) % numPartitions**)把数据重新分区并写出(shuffle write),Reduce 端再按新分区拉取(shuffle read)。** - 特点:每条数据都会按新的分区规则重新决定去哪个分区,通常伴随大量跨节点传输。
2. coalesce 减少分区时在做什么?
- 不引入新的 Partitioner**,不做「按 key 重新分区」。**
- 只是做分区索引的合并:例如父 RDD 的 partition 0、1 → 子 RDD 的 partition 0;partition 2、3 → partition 1……
- 子分区和父分区的对应关系是固定的、按索引连续合并,不是按 key 重分布。
- 实现上:子 RDD 的一个分区对应一个 task,该 task 按顺序读那几个父分区的迭代器(或拉取块),在内存里拼成一个大迭代器,不经过「按 key 写 + 按分区读」的 shuffle 流程。
3. 为什么说「不产生 shuffle」?
是否按新 Partitioner 重算每条记录的去向 | 是 | 否,只按分区索引合并 |
是否有 shuffle write + shuffle read | 有(ShuffleDependency) | 无 |
数据移动方式 | 全量重分布,通常跨节点 | 子分区直接读/拉取那几个父分区;同节点则本地合并 |
- 同一 Executor 上的多个父分区****合并时:直接在本地拼数据,无网络、无 shuffle。
- 跨 Executor** 时:需要拉取远程分区数据,走的是块拉取,不是「先按 Partitioner 写、再 shuffle read」的机制,在 API/依赖类型上仍不算一次 shuffle。**
- 常见场景(如 1000 分区 → 10 分区):很多合并发生在同节点内,因此说「默认不触发全量 shuffle」。
4. 一句话总结
coalesce 减分区是「按索引合并分区 + 本地合并或按块拉取」,不做按 key 的重新分区,不经过 ShuffleDependency,因此不产生(我们平时说的)shuffle;数据确实从多个父分区进到一个子分区,但传的方式不是 shuffle 的写+读两阶段。
Spark 宽依赖与窄依赖
题目
1. 什么是窄依赖、什么是宽依赖?各举 3 个算子例子,并说明两者在「分区对应关系」和「是否 shuffle」上的区别。
2. Spark 为什么要区分宽依赖和窄依赖?Stage 是如何根据依赖类型划分的?
3. 给定一段 RDD 链:**textFile → map → filter → reduceByKey → collect**,请标出宽依赖、窄依赖,并说明会划分成几个 Stage、为什么。
参考答案要点
1. 窄依赖 vs 宽依赖
定义 | 父 RDD 的每个分区最多被 一个 子 RDD 分区使用 | 父 RDD 的一个分区可能被 多个 子 RDD 分区使用 |
分区对应 | 一对一或多对一 | 一对多(需按 key 重分布) |
是否 shuffle | 否 | 是 |
典型算子 | map、filter、flatMap、mapPartitions、union | groupByKey、reduceByKey、repartition、join(按 key 时) |
2. 为什么要区分?Stage 如何划分?
- Stage 划分**:从 Action 往前回溯 DAG,遇到宽依赖就切一刀,形成 Stage 边界。每个 Stage 内部是一串窄依赖,可以合并成一批 Task 流水线执行;宽依赖处必须等前一 Stage 写出 shuffle 结果,再启动下一 Stage。**
- 容错**:窄依赖只需重算丢失的父分区;宽依赖可能要从多个父分区重算并重新 shuffle。**
- 调度与并行**:窄依赖可 pipeline,宽依赖是 shuffle 边界,决定 Task 划分和网络传输。**
3. 示例 RDD 链的依赖与 Stage
textFile → map → filter → reduceByKey → collect
[窄] [窄] [宽] Action
- 窄依赖**:
**textFile → map**、**map → filter**(无 shuffle,分区一一对应或合并)。** - 宽依赖**:
**filter → reduceByKey**(reduceByKey 按 key 重分区,需要 shuffle)。** - Stage 数量**:2 个。 ** **Stage1:textFile → map → filter(一串窄依赖,可 pipeline)。 **Stage2:reduceByKey 之后到 collect(以 reduceByKey 的 shuffle 为边界)。
一句话总结
窄依赖 = 无 shuffle、父分区至多对应一个子分区;宽依赖 = 有 shuffle、一对多。Stage 在宽依赖处切分,窄依赖在 Stage 内流水线执行。
#大数据面试##大数据##spark#
腾讯成长空间 6071人发布