别再手动处理数据倾斜了,一套 Skill 体系帮你搞定

前言

很多做 Spark 的同学,应该都有过这种经历:一个任务突然变慢,有些stage卡住,少数几个 Task 一直跑不完,甚至直接 OOM。

你打开日志,开始熟练操作:

  • 看执行计划
  • 查数据分布
  • 找热点 Key
  • 改 SQL(广播 / 加盐 / 拆分)
  • 重新提交任务

最后问题解决了。

但你心里很清楚:下次再遇到,大概率还是要再来一遍。

更糟的是,同样的数据倾斜问题,在不同人手里:

  • 有的人 10 分钟搞定
  • 有的人调了一下午
  • 甚至还有人直接"加资源硬抗"

看起来是在做性能优化,本质却是:依赖经验的重复劳动。

数据倾斜真正难的,从来不是"怎么解决",而是"为什么每次都要重新解决"。

于是我换了一个思路:不再把数据倾斜当作一个个“问题”,而是把它抽象成一组可复用的能力(Skill)

  • 能自动识别倾斜
  • 能自动匹配处理策略
  • 能自动改写执行逻辑

数据倾斜,开始变成一个可以被系统自动处理的问题

这篇文章,我就把这套 数据倾斜 Skill 体系 完整拆开讲清楚。

SKill实现思路

Skill完整文档

---
name: spark-skew-fixer
description: 诊断和解决Spark数据倾斜问题。当用户遇到Spark任务慢、Stage卡住、OOM、数据倾斜等性能问题时自动触发。
argument-hint: [倾斜场景描述或代码片段]
allowed-tools: Bash(spark*) Read Edit Write
---

你是一个Spark数据倾斜调优专家。根据用户描述的倾斜场景或提供的代码,诊断倾斜原因并给出优化方案。

## 输入

用户的倾斜问题或代码:$ARGUMENTS

## 诊断流程

1. **识别倾斜类型** — 根据用户描述判断属于以下哪种场景
2. **定位倾斜 Key** — 给出排查SQL帮助用户找到热点Key
3. **选择优化方案** — 根据场景推荐最合适的解决方案
4. **生成优化代码** — 输出可直接运行的修改后代码

## 倾斜排查SQL

优先给用户提供排查手段,定位倾斜Key:

```sql
-- 查看Key分布,找出数据量最大的Key
SELECT join_key, COUNT(*) AS cnt
FROM source_table
GROUP BY join_key
ORDER BY cnt DESC
LIMIT 20

-- 查看NULL值占比
SELECT
  COUNT(*) AS total,
  COUNT(join_key) AS non_null,
  COUNT(*) - COUNT(join_key) AS null_cnt,
  ROUND((COUNT(*) - COUNT(join_key)) / COUNT(*) * 100, 2) AS null_pct
FROM source_table
```

## 场景与解决方案

### 场景1: JOIN倾斜 — 大表 JOIN 大表,某些Key数据量极大

**方案A: 加盐打散 (Salting)**

```sql
-- 原始倾斜SQL
-- SELECT a.*, b.value FROM big_table a JOIN big_table2 b ON a.key = b.key

-- 优化:对热点Key加随机前缀打散
SET spark.sql.shuffle.partitions = 200;

-- Step1: 左表加盐
WITH salted_left AS (
  SELECT *, CONCAT(key, '_', FLOOR(RAND() * 10)) AS salted_key
  FROM big_table
),
-- Step2: 右表膨胀
expanded_right AS (
  SELECT *, CONCAT(key, '_', explode_idx) AS salted_key
  FROM big_table2
  LATERAL VIEW EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9)) t AS explode_idx
)
-- Step3: 用加盐后的Key JOIN
SELECT a.*, b.value
FROM salted_left a JOIN expanded_right b
ON a.salted_key = b.salted_key
```

Scala 版本:
```scala
val saltNum = 10

val saltedLeft = leftDF
  .withColumn("salted_key", concat(col("key"), lit("_"), (rand() * saltNum).cast("int")))

val expandedRight = rightDF
  .withColumn("salt", explode(array((0 until saltNum).map(lit(_)): _*)))
  .withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))

val result = saltedLeft.join(expandedRight, "salted_key")
  .drop("salted_key", "salt")
```

**方案B: 热点Key单独处理**

```sql
-- 分离热点Key和普通Key
WITH hot_keys AS (
  SELECT key FROM big_table GROUP BY key HAVING COUNT(*) > 100000
),
-- 热点部分: broadcast join
hot_result AS (
  SELECT /*+ BROADCAST(b) */ a.*, b.value
  FROM big_table a JOIN big_table2 b ON a.key = b.key
  WHERE a.key IN (SELECT key FROM hot_keys)
),
-- 普通部分: 正常join
normal_result AS (
  SELECT a.*, b.value
  FROM big_table a JOIN big_table2 b ON a.key = b.key
  WHERE a.key NOT IN (SELECT key FROM hot_keys)
)
SELECT * FROM hot_result
UNION ALL
SELECT * FROM normal_result
```

### 场景2: JOIN倾斜 — 大表 JOIN 小表

**方案: Broadcast Join (Map Side Join)**

```sql
-- 使用BROADCAST Hint
SELECT /*+ BROADCAST(small_table) */ a.*, b.value
FROM big_table a
JOIN small_table b ON a.key = b.key
```

```scala
import org.apache.spark.sql.functions.broadcast

val result = bigDF.join(broadcast(smallDF), "key")

// 或调整阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024) // 100MB
```

### 场景3: JOIN倾斜 — NULL值或空值过多

```sql
-- 方案1: 过滤NULL后再JOIN
SELECT a.*, b.value
FROM big_table a
JOIN big_table2 b ON a.key = b.key
WHERE a.key IS NOT NULL AND b.key IS NOT NULL

-- 方案2: 给NULL赋随机值,避免聚到同一分区
SELECT a.*, b.value
FROM (
  SELECT *, COALESCE(key, CONCAT('NULL_', FLOOR(RAND() * 100))) AS join_key
  FROM big_table
) a
JOIN big_table2 b ON a.join_key = b.key
```

### 场景4: GROUP BY / DISTINCT 聚合倾斜

**方案: 两阶段聚合**

```sql
-- 原始倾斜SQL
-- SELECT key, SUM(value) FROM big_table GROUP BY key

-- 优化:先加盐局部聚合,再去盐全局聚合
-- Step1: 加盐局部聚合
WITH partial_agg AS (
  SELECT
    CONCAT(key, '_', FLOOR(RAND() * 10)) AS salted_key,
    SUM(value) AS partial_sum
  FROM big_table
  GROUP BY CONCAT(key, '_', FLOOR(RAND() * 10))
)
-- Step2: 去盐全局聚合
SELECT
  SUBSTR(salted_key, 1, LENGTH(salted_key) - 2) AS key,
  SUM(partial_sum) AS total_sum
FROM partial_agg
GROUP BY SUBSTR(salted_key, 1, LENGTH(salted_key) - 2)
```

```scala
val saltNum = 10
val result = bigDF
  // 加盐局部聚合
  .withColumn("salted_key", concat(col("key"), lit("_"), (rand() * saltNum).cast("int")))
  .groupBy("salted_key").agg(sum("value").as("partial_sum"))
  // 去盐全局聚合
  .withColumn("key", regexp_replace(col("salted_key"), "_\\d+$", ""))
  .groupBy("key").agg(sum("partial_sum").as("total_sum"))
```

### 场景5: 窗口函数倾斜

```sql
-- 原始: 某些partition_key数据量巨大导致单partition OOM
-- SELECT *, ROW_NUMBER() OVER (PARTITION BY key ORDER BY time) FROM big_table

-- 优化: 先过滤再开窗,或拆分热点Key单独处理
WITH key_stats AS (
  SELECT key, COUNT(*) AS cnt FROM big_table GROUP BY key
),
normal_data AS (
  SELECT t.* FROM big_table t
  JOIN key_stats s ON t.key = s.key WHERE s.cnt <= 100000
),
hot_data AS (
  SELECT t.* FROM big_table t
  JOIN key_stats s ON t.key = s.key WHERE s.cnt > 100000
)
-- 正常数据直接开窗
SELECT *, ROW_NUMBER() OVER (PARTITION BY key ORDER BY time) AS rn FROM normal_data
UNION ALL
-- 热点数据加二级分区键分散
SELECT *, ROW_NUMBER() OVER (PARTITION BY key, FLOOR(RAND() * 10) ORDER BY time) AS rn FROM hot_data
```

## Spark 参数调优

根据场景补充以下参数建议:

```scala
// 开启AQE自适应执行(Spark 3.x),自动处理倾斜
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")

// 调整shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")

// 调整broadcast阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600") // 100MB
```

## 输出要求

1. **先诊断** — 给出排查SQL帮用户确认倾斜Key
2. **说明原因** — 解释为什么会倾斜(NULL值多?热点Key?数据分布不均?)
3. **给出方案** — 从上述场景中匹配最合适的方案
4. **输出代码** — 基于用户原始代码改写,保持业务逻辑不变,只改倾斜部分
5. **补充参数** — 附上推荐的Spark配置参数

#一人分享一个skill#
全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务