大数据面试2小时前冲刺必备:大厂高频大数据面经【中】(大数据开发篇-多张原理图)
Hadoop 的核心组件有哪些?各自的作用是什么?(脉脉、微信、银联)
Hadoop 是一个开源的分布式计算框架,旨在解决海量数据的存储与计算问题。其核心组件主要包括 HDFS(Hadoop Distributed File System)、YARN(Yet Another Resource Negotiator)、MapReduce 和 Hadoop Common。
HDFS: 是 Hadoop 的分布式文件系统,主要用于大规模数据的可靠存储。
- NameNode: 管理元数据,如文件目录结构、文件与块的映射关系、块的副本信息等。
- DataNode: 实际存储数据块,周期性向 NameNode 汇报状态。
- SecondaryNameNode: 辅助 NameNode 合并 fsimage 和 editlog,降低启动时加载成本。
YARN: 是资源管理与任务调度框架。
- ResourceManager: 管理集群资源,负责全局资源分配和调度。
- NodeManager: 管理节点级别资源和任务执行,负责容器(Container)的生命周期。
- ApplicationMaster: 针对每个应用管理其生命周期,包括资源请求、任务调度与容错。
MapReduce: 是一种编程模型,用于大规模数据的并行处理。包括 Map 阶段和 Reduce 阶段,分别用于数据的过滤、转换与聚合。
- JobTracker(1.x)/ApplicationMaster(YARN): 负责作业的协调与调度。
- TaskTracker(1.x)/Container(YARN): 执行具体任务。
Hadoop Common: 提供支撑 Hadoop 各组件运行的公共工具包和 Java 库,如序列化、日志、配置解析等。
Hadoop 1.x、2.x、3.x 的区别是什么?(米哈游)
Hadoop 自发布以来,经历了多个重要版本演进。每代版本在架构、性能、功能等方面均有显著变化,尤其是从 1.x 到 2.x 是一次关键性重构。
1.x |
2.x |
3.x |
|
资源管理 |
JobTracker/TaskTracker |
YARN(ResourceManager + NodeManager) |
YARN(增强的调度与容器特性) |
支持计算框架 |
仅 MapReduce |
多种,如 Tez、Spark、Flink 等 |
同 2.x,兼容更多版本组件 |
HDFS 高可用 |
不支持 |
支持 NameNode HA |
更成熟,支持 ObserverNode |
Federation(联邦) |
不支持 |
支持多个 NameNode |
支持更多 HDFS 联邦实例 |
HDFS 块大小 |
默认 64MB |
默认 128MB |
支持配置为 256MB 或更高 |
容器支持 |
不支持 |
基于 YARN |
支持 Docker、GPU、GPU isolation |
erasure coding |
不支持 |
不支持 |
支持,减少存储冗余 |
HDFS 精简存储 |
不支持 |
不支持 |
引入 EC 纠删码方案,节省存储 |
HDFS 的读写流程是什么?(脉脉、携程、tcl)
HDFS 写流程:
- Client 请求 NameNode 创建文件。
- NameNode 返回是否可写并返回第一个 block 的候选 DataNode 列表(考虑副本策略、机架感知等)。
- Client 启动 pipeline 写入流程,将数据切分为 packet 后按 DataNode 顺序传输。
- DataNode 之间形成管道逐层传递,写入磁盘同时返回 ack。
- Client 接收全部 ack 后认为写入成功。
- 所有 block 写入后通知 NameNode 更新元数据。
HDFS 读流程:
- Client 请求 NameNode 获取文件 block 列表及其副本位置。
- Client 根据副本就近原则选择一个 DataNode。
- Client 与目标 DataNode 建立连接并开始读取。
- DataNode 将数据流式传输给 Client,Client 解码并缓存/写入本地。
HDFS 的默认块大小是多少?为什么设置这个大小?(tcl)
在不同版本中,HDFS 默认块大小有所变化:
- Hadoop 1.x 默认 64MB
- Hadoop 2.x/3.x 默认 128MB(可配置)
设置块大小的原因主要包括:
- 减少元数据开销: NameNode 内存中维护所有块的元数据,块越多开销越大,适当增大块大小能显著减少 NameNode 压力。
- 优化磁盘与网络吞吐: 大块顺序读写效率更高,尤其在高带宽网络下。
- 提升 MapReduce 任务性能: 每个块通常对应一个 Map 任务,块大可减少任务数,降低调度开销。
- 容错与副本效率权衡: 块太大则丢失代价大,块太小副本太多,浪费资源。
HDFS 小文件过多的原因及解决方法是什么?(tcl、富途证券、携程)
在 HDFS 中,小文件问题是指大量远小于 HDFS 块大小(默认 128MB)的文件存在,这会导致 NameNode 内存资源的严重消耗,从而影响整个集群的稳定性和可扩展性。
小文件过多的原因
- 数据采集粒度过细:某些采集系统将每分钟甚至每秒的日志或业务数据存储为一个文件,导致文件数量激增。
- 应用未做合并逻辑:业务应用(如 Flume、Sqoop)未对小文件进行预处理直接写入 HDFS。
- 频繁写入新文件:某些流式任务频繁创建新文件而不是对已有文件追加。
- 历史数据未归档:长期未做冷数据归档与压缩,造成大量低频访问的小文件保留。
小文件过多带来的问题
- NameNode 元数据压力增大,因为每个文件、Block 都需要记录在内存中,消耗资源。
- 文件系统操作(如 List、Get)效率下降。
- Job 提交时需要加载大量文件,带来调度和网络压力。
解决小文件过多的办法
使用 HDFS 文件合并工具(如 CombineFileInputFormat) |
MapReduce 作业使用 CombineFileInputFormat 类来处理多个小文件,提高处理效率。 |
使用 SequenceFile、Avro、ORC、Parquet 等格式 |
将多个小文件合并为一个压缩格式的大文件,提高存储效率与压缩比。 |
通过 Spark/Flink 等定期批处理归并 |
编写 ETL 任务周期性地对小文件进行重写合并。 |
调整 Flume/Sqoop 参数 |
控制生成文件的最小大小和最大时间间隔,减少文件生成频率。 |
Archive + HAR 格式 |
对不常访问的文件使用 Hadoop Archive(HAR) 格式归档。 |
使用 HBase 替代 |
对于小记录型数据,可考虑使用 HBase 存储,避免文件碎片。 |
示例:Spark 合并小文件
val inputRDD = spark.read.textFile("hdfs://path/to/smallfiles/*") inputRDD.coalesce(1).write.text("hdfs://path/to/mergedfile")
通过上述手段,可以有效缓解小文件问题,提升 HDFS 的可用性与系统性能。
NameNode 的作用是什么?是否存储数据?(4399大数据面经)
NameNode 是 HDFS 的核心组件之一,负责整个文件系统的命名空间管理以及客户端对文件的访问控制。
NameNode 的作用
- 元数据管理:维护文件目录结构、文件与数据块(Block)的映射、每个块所在的 DataNode 等信息。
- 客户端请求处理:当客户端请求读写文件时,NameNode 负责返回块位置及 DataNode 地址。
- 元数据持久化:NameNode 的元数据以 FsImage(镜像文件)和 EditLog(操作日志)形式保存在本地磁盘中,并支持 Checkpoint。
- 心跳机制监控 DataNode:定期接收来自 DataNode 的心跳与 Block 报告,了解 DataNode 的健康状态。
NameNode 是否存储数据
NameNode 本身 不存储用户的实际数据内容(即 HDFS 文件的 Block 内容),而是存储 元数据。
- 实际的数据块分布在多个 DataNode 上。
- NameNode 只知道每个数据块在哪些 DataNode 上,便于客户端访问时定位。
数据读写流程中的角色
- 写流程:客户端向 NameNode 请求分配块位置,得到 DataNode 列表后直接向 DataNode 写入数据。
- 读流程:客户端向 NameNode 请求文件的块列表,随后从对应的 DataNode 拉取数据。
NameNode 的重要性
NameNode 是 HDFS 的单点组件,一旦宕机,整个文件系统将不可用,因此在生产中需配置高可用机制(如 HA、ZKFC、QJM)。
Hadoop 的高可用性(HA)如何实现?(4399、昆仑万维)
Hadoop 的高可用性(High Availability, HA)机制旨在解决早期版本 NameNode 单点故障(SPOF)问题。
高可用性的架构组成
- Active NameNode:当前工作的主 NameNode,处理所有客户端请求。
- Standby NameNode:备用 NameNode,实时同步 Active 的元数据状态,随时可切换。
- JournalNode 集群(推荐 3~5 个):用于记录 EditLog 的中间节点,确保两台 NameNode 的元数据同步。
- Zookeeper + ZKFC(Zookeeper Failover Controller):用于监控 NameNode 状态并自动完成主备切换。
HA 实现机制
- 元数据同步:Active NameNode 将每次 EditLog 写入多个 JournalNode。Standby NameNode 从 JournalNode 获取 EditLog 并应用,保持元数据同步。
- 故障转移机制:ZKFC 监控 NameNode 心跳,若发现 Active 异常,会通知 Standby 切换为 Active。切换过程自动完成,不需要人工干预。
- 共享存储(老方案):使用 NFS 或 Quorum Journal Manager(QJM)保存元数据。已被 QJM 模式替代。
注意事项
- HDFS 客户端需配置 HA URI,例如:
<property> <name>fs.defaultFS</name> <value>hdfs://mycluster</value> </property>
- 客户端通过逻辑名称(如 mycluster)访问,不感知后端 NameNode 切换。
高可用性保证了 Hadoop 集群在 NameNode 故障时仍能自动恢复,增强系统稳定性和业务连续性。
MapReduce 的工作原理及流程是什么?(脉脉、新浪微博)
MapReduce 是一种分布式计算框架,基于 "Map" 与 "Reduce" 两个阶段的编程模型设计,广泛应用于批处理任务如日志分析、文本挖掘、数据聚合等。
MapReduce 的基本思想
- Map(映射):对输入数据进行处理和转换,生成键值对(Key-Value)。
- Shuffle(洗牌):对 Map 输出按 Key 进行分组、排序,传输到相应的 Reduce 节点。
- Reduce(归约):对相同 Key 的数据进行聚合处理,输出最终结果。
MapReduce 执行流程
- 输入分片 InputSplit:作业提交后,InputFormat 根据文件大小将数据切分为多个逻辑片段(InputSplit)。每个 Split 由一个 Mapper 处理。
- Map 阶段:Mapper 读取 Split 数据,调用用户自定义的 map() 方法处理,输出 KV 对。输出暂存于内存缓冲区,达阈值后溢写磁盘(Spill 文件),并进行本地合并排序(Combiner)。
- Shuffle 阶段:Reducer 拉取多个 Mapper 输出(KV 对),按 Key 排序、归并。Shuffle 是 MapReduce 中最复杂也最容易成为性能瓶颈的阶段。
- Reduce 阶段:调用用户定义的 reduce() 方法,对相同 Key 的所有值进行计算。输出的结果通过 OutputFormat 写入到 HDFS。
示例:WordCount
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { for (String word : value.toString().split(" ")) { context.write(new Text(word), new IntWritable(1)); } } } public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) sum += val.get(); context.write(key, new IntWritable(sum)); } }
MapReduce 的优缺点
优点 |
缺点 |
编程模型简单,适合大批量数据 |
批处理延迟高,实时性差 |
自动处理容错、数据本地化 |
Shuffle 阶段复杂,易瓶颈 |
易于扩展,适配大规模集群 |
编写调试繁琐,不适合复杂逻辑 |
目前 MapReduce 已逐渐被 Spark、Flink 等计算框架替代,但在传统离线数仓和大数据平台中仍有应用价值。
Hadoop 的高可用性(HA)如何实现?(4399、昆仑万维)
Hadoop 的高可用性(High Availability, HA)机制旨在解决早期版本 NameNode 单点故障(SPOF)问题。
高可用性的架构组成
- Active NameNode:当前工作的主 NameNode,处理所有客户端请求。
- Standby NameNode:备用 NameNode,实时同步 Active 的元数据状态,随时可切换。
- JournalNode 集群(推荐 3~5 个):用于记录 EditLog 的中间节点,确保两台 NameNode 的元数据同步。
- Zookeeper + ZKFC(Zookeeper Failover Controller):用于监控 NameNode 状态并自动完成主备切换。
HA 实现机制
- 元数据同步:Active NameNode 将每次 EditLog 写入多个 JournalNode。Standby NameNode 从 JournalNode 获取 EditLog 并应用,保持元数据同步。
- 故障转移机制:ZKFC 监控 NameNode 心跳,若发现 Active 异常,会通知 Standby 切换为 Active。切换过程自动完成,不需要人工干预。
- 共享存储(老方案):使用 NFS 或 Quorum Journal Manager(QJM)保存元数据。已被 QJM 模式替代。
注意事项
- HDFS 客户端需配置 HA URI,例如:
<property> <name>fs.defaultFS</name> <value>hdfs://mycluster</value> </property>
- 客户端通过逻辑名称(如 mycluster)访问,不感知后端 NameNode 切换。
高可用性保证了 Hadoop 集群在 NameNode 故障时仍能自动恢复,增强系统稳定性和业务连续性。
SecondaryNameNode 的作用是什么?(米哈游)
SecondaryNameNode 是 Hadoop HDFS 架构中的一个辅助角色,常被误解为 "NameNode 的热备份",实际上其主要职责是协助主 NameNode 管理元数据的合并与清理,而不是作为 HA 的备用节点。
SecondaryNameNode 的主要作用
- FsImage 与 EditLog 的合并(Checkpoint):NameNode 会将文件系统的操作记录在 EditLog 中,这个文件会随着操作增多而不断增长。SecondaryNameNode 会定期从 NameNode 获取 EditLog 和 FsImage,对其进行合并生成新的 FsImage。合并后的 FsImage 会传回 NameNode 保存,并用于下次重启时快速加载元数据。
- 缓解 NameNode 重启时间过长的问题:如果没有合并操作,NameNode 在重启时需要加载庞大的 FsImage 并回放 EditLog,时间非常长。通过周期性合并,可以显著加快重启速度。
SecondaryNameNode 不具备的能力
- 不具备与 NameNode 实时同步状态的能力。
- NameNode 宕机后,SecondaryNameNode 不能直接接管主控角色。
- 与 HA 中的 Standby NameNode 机制完全不同。
启动机制与资源消耗
- SecondaryNameNode 是单独运行的进程,通常部署在与 NameNode 不同的机器上,以避免资源竞争。
- 合并操作较为耗 CPU 与磁盘 IO,需部署在资源充足的节点上。
在实际应用中,SecondaryNameNode 在 Hadoop 2.x 之后被 CheckpointNode 与 HA 机制部分取代,但仍在一些小型集群中发挥作用。
MapReduce 的工作原理及流程是什么?(脉脉、新浪微博)
MapReduce 是一种分布式计算模型,其核心思想是将大规模数据集拆分成多个小数据块,分别进行 map 操作(映射),之后进行 reduce 操作(归约),实现并行处理。MapReduce 的基本执行流程如下:
- 输入数据的分片(Input Splits):输入数据被分割为若干个逻辑切片(split),每个 split 对应一个 Map 任务,通常由 InputFormat 类实现分片逻辑。
- Map 阶段:每个 Map 任务对对应的 split 进行处理。Map 函数接收 <key, value> 对作为输入,输出一个中间的 <key, value> 对集合。这些中间结果暂存在内存中。
- Partition 分区:Map 端会根据 key 计算 hash 值分发到不同的 reduce task 对应的分区,通常使用默认的 HashPartitioner。
- Shuffle 阶段:中间结果经过排序、合并(可选的 Combiner)、分区之后,通过网络传输到对应的 Reduce 任务节点。这个阶段是 MapReduce 最关键的性能瓶颈之一。
- Reduce 阶段:Reduce 任务根据 key 聚合对应的所有 value,执行归约操作。用户可以定义 reduce 函数处理聚合后的数据,并输出最终结果。
- 输出结果:最终由 OutputFormat 类将 Reduce 输出写入 HDFS 或其他存储介质。
整个流程高度解耦,具备良好的容错性和可扩展性,适合处理大规模批量数据分析任务。
MapReduce 的 Shuffle 过程是什么?(脉脉)
Shuffle 是连接 Map 阶段和 Reduce 阶段的桥梁,主要指 Map 输出结果从产生到被 Reduce 消费的全过程,包含排序、缓存、溢写、分区、网络传输等多个子阶段。详细过程如下:
- Map 端排序(Map-Side Sort):Map 任务输出中间结果后,先在内存中进行缓存,缓存区大小由参数 io.sort.mb 决定。
- 环形缓冲区:当缓冲区达到阈值时,进行溢写操作,数据写入磁盘,溢写文件会被排序和合并,最终形成一个或多个 spill 文件。
- 合并溢写文件(Merge):Map 任务完成后会将多个 spill 文件进行归并,形成一个大的排序文件,每个 key 的数据按 reducer 分区编号组织好。
- 分区传输:Map 输出文件会按分区划分,每个分区对应一个 Reduce 任务,Reduce 端通过 HTTP 请求将对应分区的数据拉取到本地。
- Reduce 端排序与合并(Reduce-Side Merge):从多个 Map 节点拉取的数据需要进行归并排序,确保同一个 key 的 value 能连续排列。
Shuffle 是整个 MapReduce 中最复杂也是最影响性能的阶段,对 IO、网络和 CPU 有较高要求。
MapReduce 中 Combine 的作用是什么?(大厂八股文)
Combine 是一个可选的局部聚合器,设计用于在 Map 阶段本地对中间数据进行预聚合,从而减少 Shuffle 阶段传输的数据量,提高整体性能。
适用场景是 Map 输出中某些 key 对应大量 value,例如求和、计数等聚合型操作。Combine 会在每个 Map 输出中进行一次局部 reduce,将同一个 key 的多个 value 合并为一个结果。例如:
- 原始输出:<a,1> <a,1> <a,1> <b,1>
- Combine 处理后:<a,3> <b,1>
Combine 函数通常与 Reduce 函数相同或相似,但需要具备可交换性与结合性。
注意事项:
- Combine 不保证执行次数和执行是否发生,因此不适合有副作用的逻辑。
- Hadoop 的 Combine 默认在 Map 端执行一次,不影响最终结果准确性,但能显著降低网络传输和 Reduce 压力。
MapReduce 为什么需要环形缓冲区?(大厂八股文)
Map 任务在输出中间结果时,需要一个高效的缓冲区来暂存 key-value 对,Hadoop 使用环形缓冲区(Circular Buffer)来优化内存使用和写磁盘的性能。
环形缓冲区的主要作用如下:
- 提升写入效率:Map 的输出结果写入内存缓冲区后,达到阈值即溢写到磁盘。环形结构便于连续写入与读取,减少内存碎片。
- 支持多线程并发:数据写入和溢写可以通过 Producer/Consumer 线程并行处理,提高任务吞吐量。
- 控制内存使用:避免一次性写入大量数据造成内存溢出。
- 便于排序与合并:缓冲区中数据可在溢写前按 key 排序,便于后续 Reduce 操作。
参数 io.sort.mb
控制缓冲区大小,过小会导致频繁溢写,过大则可能引发 GC 压力。
Map 和 Reduce 的数量由什么决定?(脉脉)
Map 和 Reduce 的数量并不是随意配置的,具体受以下因素影响:
Map Task 的数量:
- 决定于输入文件的切片数量(Input Splits),由 InputFormat 决定。
- 一般来说,一个 HDFS block 会生成一个 split,HDFS 默认 block 大小为 128MB 或 256MB。
Reduce Task 的数量:
- 由用户在 Job 配置中设置(job.setNumReduceTasks(n)),需要综合考虑以下因素:集群的 Reduce slot 数量数据量和 key 分布网络带宽与磁盘 IO
设置建议:
- Reduce 数量太少:每个 reduce 处理的数据量过大,容易造成瓶颈;
- Reduce 数量太多:会导致启动开销大、资源浪费。
YARN 环境下,为了提升资源利用率,常按如下经验设定:
numReduceTasks = 总输入数据量 / 每个 reduce 理想处理量(128~256MB)
YARN 的架构及工作原理是什么?(科大讯飞、携程)
YARN(Yet Another Resource Negotiator)是 Hadoop 的资源管理与调度框架,支持多种计算框架(如 MapReduce、Spark、Tez)的统一资源调度。
YARN 架构由以下核心组件构成:
- ResourceManager(RM):全局资源调度器,负责资源统一分配与管理,包括调度策略实现。
- NodeManager(NM):运行在每个节点上的资源代理,负责节点资源的报告、容器(Container)生命周期管理。
- ApplicationMaster(AM):每个应用独立的调度和协调器,负责该应用任务的调度、失败重试等工作。
- Container:最小的资源单位,由 NodeManager 创建并运行任务进程。
工作流程:
- 客户端向 ResourceManager 提交应用(如 MapReduce 作业)。
- RM 分配 Container 启动 ApplicationMaster。
- AM 向 RM 申请资源并分配任务。
- RM 分配 Container 给 AM,AM 通知 NM 启动任务。
- 任务完成后,AM 向 RM 报告完成,释放资源。
YARN 实现资源隔离(CPU、内存)、任务调度公平性和集群利用率提升,是 Hadoop 2.x 后的核心改进。
YARN 的调度器有哪些?(tcl)
YARN 的调度器(Scheduler)负责在多个作业之间分配集群资源,是 ResourceManager 的核心组件之一。常见调度器包括以下几种:
FIFO Scheduler |
先进先出,默认调度器,任务按提交时间顺序依次执行 |
作业数量少、无并发需求场景 |
Capacity Scheduler |
按照容量划分资源池,不同队列配置不同容量,支持多租户 |
多组织/部门共用集群 |
Fair Scheduler |
每个作业最终能获得公平资源占比,动态调整资源 |
多用户同时提交作业、对响应时间要求高 |
Dominant Resource Fairness(DRF) |
提供资源公平性,综合考虑 CPU、内存等多种资源维度 |
混合资源调度需求高的场景 |
在实际部署中,Capacity 和 Fair Scheduler 被使用最广泛。
用户可通过 yarn-site.xml
配置调度策略和各队列资源参数,如最小资源单位、最大任务数、优先级等。
Hadoop 的 Checkpoint 流程是什么?(昆仑万维)
Hadoop 中的 Checkpoint 主要是指 SecondaryNameNode 对 HDFS 元数据(fsimage 和 edits)的合并流程,并不是高可用机制。其主要作用是防止 edits 文件过大,导致 NameNode 启动缓慢甚至失败。
Checkpoint 流程如下:
- 拉取元数据:SecondaryNameNode 从 NameNode 拉取 fsimage(镜像文件)和 edits(日志文件)。
- 合并操作:将 edits 应用到 fsimage 中,生成新的 fsimage。
- 上传回 NameNode:合并后的新 fsimage 上传至 NameNode,同时清空 edits 日志。
- NameNode 切换状态:NameNode 将新 fsimage 设为当前元数据,并开始记录新的 edits 文件。
该流程一般定时触发,可通过 fs.checkpoint.period
和 fs.checkpoint.size
控制执行间隔与日志大小阈值。
注意:Checkpoint 本身不具备 HA 能力,NameNode 发生宕机后无法自动切换,Hadoop 2.x 通过 QJM + HA 实现了真正的高可用。
Hadoop 的压缩算法有哪些?(科大讯飞大数据面经)
Hadoop 支持多种压缩算法,可在数据读写过程中提升 IO 性能,常见压缩算法包括:
压缩算法 |
是否支持切片 |
压缩率 |
压缩/解压速度 |
特点 |
Gzip |
否 |
高 |
中 |
常用于最终存储,压缩率高但不可切片 |
Bzip2 |
否 |
非常高 |
慢 |
适合归档,压缩比最高,但速度慢 |
LZO |
是 |
中等 |
非常快 |
适合实时读取或中间存储 |
Snappy |
是 |
中等偏低 |
快 |
平衡压缩率和速度,HBase 默认使用 |
LZ4 |
是 |
中等 |
快 |
适合大规模数据快速处理 |
使用压缩需配置 mapreduce.output.compress
、mapreduce.output.compression.codec
等参数。
Hive 有哪些表类型?内部表与外部表的区别是什么?(4399、tcl、联通)
Hive 提供多种表类型,常见包括:
- 内部表(Managed Table):Hive 管理表数据的生命周期,包括数据的创建与删除。
- 外部表(External Table):数据存储在外部(如 HDFS 的某目录),Hive 仅维护元数据。
- 临时表(Temporary Table):作用域限定在当前 session,断开连接后自动销毁。
- 分区表(Partitioned Table):对数据按逻辑列(如日期)进行划分,提升查询效率。
- 桶表(Bucketed Table):将数据散列到固定桶中,有利于 map join 和抽样查询。
内部表 vs 外部表区别如下:
比较项 |
内部表 |
外部表 |
数据存储位置 |
Hive 默认仓库 |
用户指定目录 |
DROP 表行为 |
删除元数据 + 数据 |
仅删除元数据 |
数据可重用性 |
一般不重用 |
多系统共享 |
应用场景 |
数据完全由 Hive 管理 |
数据由外部产生或共享使用 |
什么时候用 Hive 内部表,什么时候用外部表?(4399)
- 使用内部表场景:数据完全由 Hive 创建和管理临时分析数据或中间结果不需保留脚本统一导入导出数据,不需要手动清理
- 使用外部表场景:数据由其他程序(如 Flume/Sqoop)产出多个应用共享同一份数据,如 Hive + Spark数据不可随 Hive 表生命周期删除元数据需要灵活重建或多环境复用
一般建议生产环境中采用外部表,便于数据独立管理和容错恢复。
Hive 的存储格式有哪些?(shein)
Hive 支持多种存储格式,不同格式影响查询性能、压缩比与兼容性,主要包括:
存储格式 |
是否列式 |
是否压缩 |
特点 |
适用场景 |
TextFile |
否 |
否 |
默认格式,兼容性强,效率低 |
数据量小或临时使用 |
SequenceFile |
否 |
是 |
可压缩二进制,适合中间数据 |
MapReduce 输出中间结果 |
ORC |
是 |
是 |
高压缩率,高效读取,支持索引 |
大量读操作,如数仓聚合查询 |
Parquet |
是 |
是 |
跨平台支持好,支持嵌套类型 |
Hive、Spark、Impala 共用数据 |
RCFile |
是 |
是 |
历史格式,已被 ORC 替代 |
兼容老系统数据 |
推荐在数仓中采用 ORC 或 Parquet,提高压缩率与查询性能,可结合 Zlib、Snappy、LZO 等压缩算法使用。
Hive 数据倾斜的原因及解决方案是什么?(4399、度小满、联通)
Hive 数据倾斜是指某些 key 分布不均,导致某些 reduce 或 task 处理的数据量远大于其他,进而引起作业运行时间拉长甚至失败。常见的原因如下:
- key 分布极度不均衡:如某个热点 key(如 null、默认值、某热门 ID)出现频率远高于其他 key。
- join 时一张小表广播失败:原应使用 map join 却转为 reduce join。
- group by、distinct、count 等聚合操作集中在单个 key 上。
- 倾斜字段未预处理:如日志中的 user_id、ip 等存在天然倾斜。
常见解决方案如下:
group by 倾斜 |
使用
或 map-side 聚合、预聚合,或加随机前缀拆分 key |
join 倾斜 |
尽可能使用 map join;或对大表添加 salting(加随机前缀)+ 后期合并 |
数据预处理 |
过滤掉明显倾斜 key(如 null),或进行数据抽样、分桶等 |
参数优化 |
设置
、
,让 Hive 自动处理倾斜 |
此外,还可以用 Hive UDF 对热点 key 特殊处理,将其拆分成多个 fake key,再聚合还原。
Hive 的三种自定义函数(UDF、UDAF、UDTF)的区别及实现步骤是什么?(4399、富途证券)
Hive 支持三种类型的用户自定义函数,用于扩展其 SQL 功能。
类型 |
全称 |
输入 |
输出 |
使用场景 |
UDF |
User Defined Function |
一行 -> 一行 |
一行 |
转换函数,如字符串处理 |
UDAF |
User Defined Aggregate Function |
多行 -> 一行 |
一行 |
聚合函数,如自定义 sum、avg |
UDTF |
User Defined Table Function |
一行 -> 多行 |
多行 |
拆分函数,如 explode |
UDF 开发步骤:
- 继承 org.apache.hadoop.hive.ql.exec.UDF 类。
- 重写 evaluate(...) 方法。
- 打包 JAR 上传 Hive。
- ADD JAR + CREATE TEMPORARY FUNCTION 注册。
UDAF 开发步骤(需实现 AggregationBuffer):
- 继承 org.apache.hadoop.hive.ql.exec.UDAF 或 GenericUDAFResolver。
- 实现初始化、聚合、合并、输出逻辑。
- 注册函数。
UDTF 开发步骤:
- 继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF。
- 实现 initialize()、process()、close() 方法。
示例:
public class MyUDF extends UDF { public Text evaluate(Text input) { return new Text("hello, " + input.toString()); } }
Hive 中 order by、sort by、distribute by、cluster by 的区别是什么?(4399、富途证券)
这四个关键字都与排序和分发有关,但使用语义和执行机制不同:
关键字 |
含义 |
排序范围 |
数据分发 |
应用场景 |
order by |
全局排序 |
整体数据 |
单个 reduce |
小数据集,严格排序 |
sort by |
每个 reduce 内排序 |
局部 |
可多个 reduce |
排序但不全局,常与 distribute by 配合 |
distribute by |
控制分区 |
无排序 |
指定分区字段 |
控制 reduce 输入,防止数据倾斜 |
cluster by |
distribute by + sort by |
每个 reduce 内排序 |
指定字段分区 |
常用于分区排序输出 |
示例:
select * from user order by age; -- 全局排序 select * from user distribute by age sort by age; -- age 分区且每区内部有序 select * from user cluster by age; -- 效果与上面语句相同
注意:order by 会触发单个 reduce,极易成为瓶颈,慎用于大数据量排序。
Hive 分区和分桶的区别及优化方式是什么?(4399、富途证券)
项目 |
分区(Partition) |
分桶(Bucket) |
原理 |
按目录划分数据 |
按字段哈希划分桶文件 |
存储结构 |
每个分区一个目录 |
每个桶一个文件 |
查询优化 |
支持分区裁剪 |
支持 map join、抽样 |
创建方式 |
|
|
使用限制 |
无限制 |
需指定桶数,且插入必须开启
|
优化方式:
- 分区优化:控制分区数量,避免过多小文件(如日期+小时分区)启用分区裁剪(如 where dt='2024-01-01')
- 分桶优化:配合 map join 提升 join 效率启用并行写入,提高加载性能
注意:分桶需配合 sort by
保证桶内有序,否则可能失去优化效果。
Hive 的 join 操作原理是什么?left join、right join、inner join、outer join 的异同?(度小满、大厂八股文)
Hive Join 操作底层基于 MapReduce 执行,主要分为:
- Common Join(Reduce Join):将所有关联表通过 key shuffle 到 reducer,再进行 join。
- Map Join(广播 join):将小表加载到内存,在 map 阶段与大表 join,提升性能。
- SMB Join:两表都按 join key 排序 + 分桶,适用于大表 join。
Join 类型比较:
类型 |
匹配条件 |
左表无匹配 |
右表无匹配 |
特点 |
inner join |
两边都匹配才保留 |
丢弃 |
丢弃 |
最常见 join 类型 |
left join |
保留左表所有 |
保留 + null |
丢弃 |
主表保留 |
right join |
保留右表所有 |
丢弃 |
保留 + null |
从表保留 |
full outer join |
所有记录保留 |
保留 + null |
保留 + null |
最全信息汇总 |
示例:
select * from A a inner join B b on a.id = b.id; select * from A a left join B b on a.id = b.id;
优化建议:优先使用 map join 或广播 join,并开启 hive.auto.convert.join=true
,让 Hive 自动判断是否可用 map join。
Hive 如何优化 join 操作?(大厂八股文、科大讯飞)
在 Hive 中,Join 操作是性能瓶颈的主要来源之一。Hive 基于 MapReduce 或 Tez 执行 SQL,因此优化 Join 操作,关键在于减少数据的 Shuffle 和存储开销,并避免过大的中间数据。以下是主要的优化方式:
1. 使用 MapJoin(又称广播 Join) 适用于一个表非常小(建议小于 25MB),另一个表很大时,将小表广播到每个 Mapper 端进行内存 Join,可以避免 Reduce 端 Shuffle,大大提升性能。
SELECT /*+ MAPJOIN(small_table) */ big.a, small.b FROM big_table big JOIN small_table small ON big.id = small.id;
2. 控制 Join 顺序 Hive 默认会选择最小表放在左边,但在某些情况下优化器判断不准,可以手动调整 Join 顺序或使用 hint
明确指定。
3. 使用 Bucket Map Join 或 Sort Merge Join 适用于两个大表 Join:
- Bucket Map Join 要求两个表按照相同字段进行 bucketing,并且 bucket 数目一致,可以避免 Shuffle。
- Sort Merge Join 要求 Join 字段有序,可以在 Reducer 端有序 Merge,适合大表 Join,Hive 会自动判断是否可用。
SET hive.optimize.bucketmapjoin=true; SET hive.auto.convert.sortmerge.join=true;
4. 合理配置 Hive 参数
- hive.mapjoin.smalltable.filesize:决定广播 Join 小表的阈值。
- hive.auto.convert.join:启用自动 MapJoin 判断。
- hive.optimize.skewjoin:开启后可处理数据倾斜 Join。
5. 规避数据倾斜
- 对热点 Key 使用 SALTING 技术打散,如 concat(id, rand())。
- 使用 GROUP BY 前加 DISTRIBUTE BY 均衡 Reduce 负载。
- 使用 hint 手动指定 skew join,如:
SELECT /*+ SKEWJOIN */ a.key, b.value FROM a JOIN b ON (a.key = b.key);
6. 减少 Join 次数和字段数量 能提前过滤或只提取需要字段的,可以通过子查询优化 Join 前数据规模。
7. Join 转为 Semi Join 或 Exists 在某些场景下可以改写为 EXISTS
或 IN
,Hive 也能优化执行计划。
Hive 的窗口函数有哪些?举 examples 说明。(4399、三七互娱大数据面经)
Hive 支持 ANSI SQL 标准中的大部分窗口函数,窗口函数在保留行数不变的前提下进行聚合、排序、排名等计算。
常用窗口函数分类如下:
排名类 |
row_number(), rank(), dense_rank() |
给每行排序排名 |
聚合类 |
sum(), avg(), max(), min(), count() |
按窗口聚合,不丢行 |
分布类 |
percent_rank(), cume_dist(), ntile(n) |
计算分位排名等 |
导航类 |
lag(), lead(), first_value(), last_value() |
取上下行的值 |
示例 1:计算每个部门工资最高的 3 人
SELECT * FROM ( SELECT name, dept, salary, row_number() OVER(PARTITION BY dept ORDER BY salary DESC) AS rk FROM employee ) tmp WHERE rk <= 3;
示例 2:计算每人上一期的工资
SELECT name, salary, lag(salary, 1, 0) OVER(PARTITION BY name ORDER BY year) AS prev_salary FROM salary_table;
示例 3:计算员工累计工资和
SELECT name, year, salary, sum(salary) OVER(PARTITION BY name ORDER BY year ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_salary FROM salary_table;
这些窗口函数在数据统计、横向计算、趋势分析中非常重要,配合 PARTITION BY
和 ORDER BY
使用,可以实现灵活的窗口分析能力。
row_number、rank、dense_rank 的区别是什么?(4399、富途证券)
三者都用于排序并进行排名,但在处理并列值(即排序字段相同)时,排名规则不同。
函数名 |
作用 |
是否跳跃排名 |
是否并列排名 |
row_number() |
所有行按顺序编号 |
否 |
否(即使值相同也不重复) |
rank() |
同值排名相同,下一个排名跳过 |
是 |
是 |
dense_rank() |
同值排名相同,下一个排名不跳过 |
否 |
是 |
示例: 假设工资排名如下:
name |
salary |
A |
10000 |
B |
9000 |
C |
9000 |
D |
8000 |
SELECT name, salary, row_number() OVER (ORDER BY salary DESC) AS rn, rank() OVER (ORDER BY salary DESC) AS rnk, dense_rank() OVER (ORDER BY salary DESC) AS drnk FROM employee;
结果:
name |
salary |
rn |
rnk |
drnk |
A |
10000 |
1 |
1 |
1 |
B |
9000 |
2 |
2 |
2 |
C |
9000 |
3 |
2 |
2 |
D |
8000 |
4 |
4 |
3 |
这些函数广泛用于分页查询、TopN计算、统计排名等业务中。
Hive 的 count(distinct) 有其他写法吗?(soul)
count(distinct)
在大数据量场景下计算开销非常大,因为其底层需要 MapReduce Shuffle 进行去重统计。Hive 提供多种方式优化或替代:
1. group by 多字段结合 count 计算
SELECT COUNT(*) FROM ( SELECT col1, col2 FROM table GROUP BY col1, col2 ) t;
适用于多个字段去重的 count(distinct col1, col2),先 group by 再 count。
2. 使用 approximate(近似)函数 从 Hive 2.x 开始支持 approx_distinct
函数,使用 HyperLogLog 算法。
SELECT approx_distinct(userid) FROM big_table;
这种方式牺牲一定精度,换来计算性能大幅提升。
3. 拆分多列 distinct 成多个子查询(Hive 不能 count(distinct col1, col2)
)
SELECT COUNT(DISTINCT col1), COUNT(DISTINCT col2) FROM table;
4. 利用 bitmap 技术 Hive on Druid 或其他引擎支持 bitmap_count 方法高效去重。
5. 使用 Hive UDAF 实现高效去重计数 可以自己实现 HyperLogLog 等自定义聚合函数。
优化建议:
- 避免多个字段组合去重,尽量预聚合;
- 数据量非常大时,推荐近似算法;
- T+1 数据建议离线聚合后写入宽表中。
Hive SQL 转化为 MapReduce 的过程是什么?(中国移动、南方电网面经)
Hive SQL 在提交执行时并不会直接运行,而是经过多步转换生成 MapReduce 作业,具体流程如下:
1. SQL 解析(Parsing) Hive 的解析器将 SQL 语句解析为 AST(抽象语法树),检查语法合法性。
2. 语义分析(Semantic Analysis) 分析 SQL 各字段、表是否合法,构建逻辑执行计划,绑定字段与表的元数据。
3. 优化器重写逻辑计划(Logical Plan Optimization) 如谓词下推(Predicate Push Down)、列裁剪、Join 重写、MapJoin 优化等。
4. 生成物理执行计划(Physical Plan) 将逻辑计划转换为若干个 Stage 的 MapReduce Job,每个 Stage 定义了 Mapper 和 Reducer 的处理逻辑。
5. 编译为 MapReduce 任务(Task Compilation) 将计划中的操作转换为具体的 Mapper 类和 Reducer 类,生成执行 DAG(有向无环图)。
6. 提交 Job 执行 Hive 将 MapReduce Job 提交给 YARN 执行,每个 Job 中 Mapper 和 Reducer 分别处理对应数据分片,最终将结果写入 HDFS 或临时目录中。
7. 返回结果到客户端 如果是查询类语句(如 SELECT),Hive 会将结果写入临时目录并返回;如果是建表、插入等,则更新元数据后结束。
该过程可通过如下命令查看执行计划:
EXPLAIN EXTENDED SELECT * FROM table WHERE id = 1;
这个流程对于理解 Hive 的性能瓶颈(如 Shuffle、Skew、临时文件开销)和调优手段有关键作用。
Hive 的元数据存储方式有哪些?各有什么特点?(金域大数据)
Hive 的元数据主要存储在 Metastore 中,Metastore 是一个集中式的服务,用于存储 Hive 表、分区、列、SerDe 以及存储格式等信息。元数据的存储方式主要有以下几种:
内嵌式 Derby |
默认方式,轻量级,仅适用于测试或单用户模式,不支持并发操作。 |
本地 MySQL/PostgreSQL |
Metastore 以独立进程运行,Hive 与 Metastore 共享本地数据库,适合单节点测试环境。 |
远程 Metastore 服务 |
HiveClient 与 Metastore 通过 Thrift 通信,元数据存储在如 MySQL、PostgreSQL、Oracle 中。适合生产集群部署。 |
Hive Catalog(如 Hive on Spark) |
使用 Spark catalog 接管元数据管理,适合 Spark SQL 与 Hive 混合架构。 |
特点比较:
- 嵌入式 Derby:配置简单但只能单进程访问,多个用户并发连接容易造成元数据锁定或异常。
- 本地 MySQL/PostgreSQL:可用于小规模开发测试,但不具备远程服务能力。
- 远程 Metastore(生产推荐):支持多 HiveServer 并发访问,便于集群部署,具有可扩展性和稳定性。
- HA 支持:MySQL/PostgreSQL 配合主从复制和 keepalived 可实现元数据高可用。
元数据的可靠性直接影响 Hive SQL 的稳定性,因此生产环境下通常配置远程 Metastore + 高可用数据库,并对元数据表做定期备份。
Hive 的 union 和 union all 的区别是什么?(携程、前程无忧)
Hive 中的 union
与 union all
都用于将多个查询结果合并为一个结果集,但存在重要差异:
- union:对合并的结果集进行去重操作。
- union all:直接合并结果集,不去重,效率更高。
示例说明:
-- UNION:去重 SELECT name FROM student_a UNION SELECT name FROM student_b; -- UNION ALL:不去重 SELECT name FROM student_a UNION ALL SELECT name FROM student_b;
主要区别:
是否去重 |
是 |
否 |
执行效率 |
较慢(涉及去重) |
更快(直接合并) |
数据重复 |
不包含重复数据 |
保留重复数据 |
使用建议:
- 如果不需要去重,应尽量使用 union all,避免排序去重带来的性能损耗;
- union 使用时 Hive 会将所有子查询结果做全局去重,易产生大量内存开销;
- Hive 的 union 实际上转换为多个子查询加 insert into ... select 再进行 merge,通常消耗较大资源。
如何调整 Hive 中 Mapper 和 Reducer 的数目?(美的)
调整 Mapper 和 Reducer 数量,是 Hive 作业调优的重要手段,涉及 MapReduce 执行框架的资源分配与性能优化。
Mapper 数量控制:
- Mapper 的数量由输入数据的 切分(split)方式决定。
- 一个输入 split 对应一个 Mapper,默认每 128MB 划分一个 split(可配置)。
相关参数:
-- 控制 InputSplit 大小 mapreduce.input.fileinputformat.split.maxsize mapreduce.input.fileinputformat.split.minsize
Reducer 数量控制:
Reducer 数量可以显式控制或由 Hive 自动估算:
-- 手动指定 Reducer 数量 set mapreduce.job.reduces = 10; -- 自动调整 reducer 数量阈值(默认256MB) set hive.exec.reducers.bytes.per.reducer=256000000; -- 最小 Reducer 个数 set hive.exec.reducers.min = 2; -- 最大 Reducer 个数 set hive.exec.reducers.max = 1009;
最佳实践:
- Mapper 通常不显式控制,主要通过控制输入文件大小与 split 策略间接影响;
- Reducer 可根据输出数据量和资源情况动态控制,推荐以每个 Reducer 处理 256MB 左右数据为参考;
- 当出现 only 1 reducer 问题时,应排查是否启用了 ORDER BY、GROUP BY 等全局聚合语句。
Hive 的小文件问题如何解决?(度小满、富途证券)
Hive 小文件过多会严重影响性能,特别是在启动 Mapper 阶段。主要表现为:
- 启动大量 Mapper 导致 YARN 启动开销陡增;
- Job 整体运行时间增加;
- Namenode 元数据压力上升,甚至触发 GC 或失败。
解决方案:
- 使用合并文件机制(合并小文件为大文件):
-- 合并输入文件(适用于 select 查询时触发) set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
- 使用 INSERT OVERWRITE 合并输出:
INSERT OVERWRITE TABLE target_table SELECT * FROM source_table;
- 使用 FileMergeTask(MR 作业合并):
使用 MapReduce 手动合并多个小文件为大文件,例如写一个 MR Job 来聚合同一分区内的小文件。
- 动态分区小文件控制:
-- 控制每个分区 reducer 个数 set hive.exec.reducers.max=1000; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict;
- 压缩输出:
set hive.exec.compress.output=true; set mapreduce.output.fileoutputformat.compress=true;
- ETL 层控制文件生成逻辑:
避免过度分区、低频更新导致每个分区对应极少数据,推荐合并更新机制或者合并中间结果再落表。
笛卡尔积会产生什么问题?会导致数据倾斜吗?(360面经、度小满)
在 Hive 中执行不加限制的 Join(无 ON 条件或条件恒为真),容易导致 笛卡尔积(Cross Join),即两个表的所有记录两两配对。
影响与问题:
- 笛卡尔积会造成结果数量爆炸:如果一个表有 N 条记录,另一个表有 M 条记录,结果将是 N*M;
- 会产生大量中间数据,严重影响性能;
- 极易导致内存溢出(OOM)、磁盘写爆、Job 卡死;
- 是数据倾斜的极端情况:所有数据被调度到同一个或极少数 reducer 处理。
是否造成数据倾斜:
是的,严重的数据倾斜。Cross Join 往往没有 shuffle key,Hive 默认只能将所有任务落到一个 Reducer 上,产生极端倾斜。
如何避免笛卡尔积:
- 使用 Join 时必须指定合理的 Join 条件;
- 确保 Join 条件中字段存在,并是等值连接;
- Hive 4.0 以后,默认禁止无条件 Join 操作,执行前会抛出异常(如开启安全模式);
- 若确实需要 Cross Join,可显式开启:
sqlCopyEditset hive.strict.checks.cartesian.product=false; set hive.mapred.mode=nonstrict;
但必须控制参与数据量在可控范围内,并合理规划业务逻辑,避免滥用。
Spark 的核心特点是什么?与 Hadoop 的区别是什么?(tcl、银联)
Apache Spark 是一个基于内存的快速通用计算引擎,其核心优势主要体现在以下几个方面:
高性能:Spark 的最大特点是将中间计算结果保存在内存中(默认),而不是像 Hadoop MapReduce 那样依赖磁盘 I/O。这种基于内存的计算模型大大提升了迭代计算场景下的效率,例如机器学习、图计算等场景的表现远优于 MapReduce。
通用性:Spark 不仅支持批处理(Spark Core),还支持流处理(Spark Streaming)、交互式查询(Spark SQL)、图计算(GraphX)和机器学习(MLlib),提供一站式的数据处理能力。
容错机制:Spark 通过 RDD 的血缘机制(lineage)提供容错能力。当某个分区数据丢失时,可以基于血缘关系自动重新计算,避免频繁的磁盘 checkpoint。
易用性:Spark 提供了丰富的 API,支持 Java、Scala、Python 和 R,并集成了 SQL 查询能力,门槛相对较低,开发效率高。
可扩展性强:Spark 能运行在多种集群管理系统上,包括 YARN、Mesos 和 Kubernetes,并支持水平扩展,适用于从笔记本到大规模集群的不同规模。
与 Hadoop MapReduce 的核心区别如下表:
Spark |
Hadoop |
|
执行模型 |
基于磁盘的两阶段 Map-Reduce |
基于内存的 DAG 计算模型 |
性能 |
中间结果写磁盘,IO成本高 |
中间结果可缓存,性能优异 |
容错机制 |
基于 checkpoint 与 job 重跑 |
基于 RDD 的 lineage 自动恢复 |
编程复杂度 |
Map/Reduce 编程模型相对复杂 |
提供丰富 API,开发效率高 |
支持场景 |
适合批处理 |
同时支持批处理、流处理、SQL、图计算、机器学习 |
综上,Spark 在大多数场景下都表现出优于 Hadoop 的能力,尤其适合需要快速响应和高并发的数据处理业务。
Spark 的架构包含哪些组件?(科大讯飞)
Spark 的整体架构由 Driver、Cluster Manager 和 Executor 三部分组成,在逻辑层面又可细分为若干核心组件,各司其职,实现对不同类型数据处理任务的支持。
1. Spark Core: Spark Core 是 Spark 的基础组件,提供了内存计算能力、任务调度、容错机制和存储管理,是其他所有功能的基础。
2. Spark SQL: 用于结构化数据处理,支持 SQL 查询、DataFrame 和 Dataset API。Spark SQL 引入了 Catalyst 优化器和 Tungsten 执行引擎,实现了高效的执行和代码生成。
3. Spark Streaming(已被 Structured Streaming 替代): 支持微批处理流数据。最新推荐使用 Structured Streaming,它将流处理抽象为一个不断更新的表,统一批处理和流处理模型。
4. MLlib: Spark 的机器学习库,支持分类、回归、聚类、协同过滤等算法,提供了 pipeline API 来组织机器学习流程。
5. GraphX: 提供图计算能力,支持图遍历、PageRank 等算法,底层将图建模为 RDD 的拓展。
6. Catalyst 和 Tungsten 引擎: Catalyst 是 Spark SQL 的查询优化器,支持逻辑优化、物理优化等。 Tungsten 引擎用于物理执行优化,提升 CPU 和内存使用效率。
7. Cluster Manager(集群管理器): Spark 支持多种集群管理器,如 YARN、Standalone、Kubernetes 和 Mesos,负责资源分配和容器调度。
8. Driver 和 Executor: Driver 负责构建 DAG、调度任务,Executor 负责实际执行任务并存储计算结果。
Spark 架构的层次清晰,功能模块之间解耦,支持灵活组合和横向扩展,满足从开发到生产的多样化需求。
Spark 的运行流程是什么?(金山云、大厂八股文)
Spark 的运行流程包括作业提交、任务划分、资源调度和任务执行几个阶段,其核心机制是通过构建 DAG(有向无环图)来优化任务执行路径。
1. 作业提交阶段: 用户在 Driver 节点编写并提交 Spark 应用程序,SparkContext 会初始化运行环境,连接集群管理器。
2. 构建 DAG(DAG Scheduler): 用户代码中的转换操作(如 map、filter)会被记录为逻辑执行计划。遇到行动操作(如 collect、count)时,DAG Scheduler 会将逻辑操作链切分为若干 Stage。
3. 划分 Stage 与 Task: 每个 Stage 被划分成多个 Task,通常以分区为单位。窄依赖的操作会被合并到同一个 Stage,宽依赖的操作(如 shuffle)会触发 Stage 间的边界。
4. 资源调度(TaskScheduler): TaskScheduler 接收到 DAG Scheduler 划分的 TaskSet 后,将其分发到集群中的可用 Executor 上执行。调度策略可配置为 FIFO、公平调度器等。
5. 任务执行(Executor): Executor 负责运行分配的 Task,并将中间结果保存在内存或磁盘上。结果通过 Shuffle 服务进行数据交换。
6. 结果回收: 最终结果返回给 Driver 程序,用户可进行进一步处理或保存。
整个流程中 Spark 利用了内存管理、Shuffle 优化和缓存机制,提高了任务执行效率和容错能力。
RDD 的概念及弹性体现在哪里?(水滴、tcl)
RDD(Resilient Distributed Dataset)是 Spark 最基本的数据抽象,表示一个不可变、可并行计算的分布式对象集合。
RDD 的核心特性包括:
- 不可变性:一旦创建不可修改,所有转换都会生成新的 RDD。
- 分区性:RDD 被划分为多个分区,可并行处理。
- 计算性:RDD 是惰性计算的,只有遇到 action 操作时才真正触发计算。
- 容错性:每个 RDD 都记录了血缘信息(Lineage),用于在数据丢失时进行重算。
RDD 的弹性体现在以下方面:
1. 容错恢复能力强: 当某个节点或分区数据丢失时,Spark 会利用该 RDD 的 lineage(血缘依赖)信息,从上游 RDD 重新计算缺失部分,而非依赖磁盘持久化备份。
2. 并行计算能力强: RDD 的每个分区可由集群中的不同 Executor 并发处理,充分利用分布式资源。
3. 数据本地性支持: Spark 尽量将计算调度到数据所在节点,减少数据移动,提升效率。
4. 可组合性: RDD 支持多种 transformation 操作(如 map、flatMap、groupByKey),可组合成复杂的计算逻辑。
5. 低延迟缓存机制: RDD 提供 persist 和 cache 方法可将中间结果缓存到内存,提高迭代计算效率。
RDD 的设计使得 Spark 在面对节点失败、大规模数据并发计算时仍具有强健的容错与调度能力,是 Spark 的计算基石。
RDD 的宽依赖和窄依赖分别对应哪些算子?(携程)
RDD 的依赖关系分为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency),两者在计算调度和 shuffle 行为上有本质区别。
窄依赖(Narrow Dependency): 一个父 RDD 的每个分区最多被一个子 RDD 的分区所使用,因此无需 shuffle,可并行执行。窄依赖具有较好的本地性和容错性能。
常见窄依赖算子包括:
- map
- filter
- flatMap
- union
- sample
- glom
- mapPartitions
宽依赖(Wide Dependency): 一个子 RDD 的分区依赖多个父 RDD 的分区,通常会引发 shuffle,需要跨节点拉取数据。
常见宽依赖算子包括:
- groupByKey
- reduceByKey
- join
- cogroup
- distinct
- repartition
- sortByKey
区别总结如下:
依赖类型 |
含义 |
是否引起 Shuffle |
是否可并行恢复 |
典型算子 |
窄依赖 |
每个父分区最多被一个子分区使用 |
否 |
是 |
map、filter、flatMap |
宽依赖 |
一个子分区依赖多个父分区 |
是 |
否(需重算整个父 RDD) |
groupByKey、join |
理解依赖类型对于调优 Spark 任务执行计划至关重要,避免不必要的 shuffle 是提升性能的关键策略之一。
什么情况下会产生 Spark Shuffle?(新浪微博)
Shuffle 是 Spark 中一种代价昂贵但必不可少的数据重分布机制,主要在以下几类操作中产生:
1. 聚合类操作(Aggregation):如 reduceByKey、aggregateByKey、combineByKey,会按 key 对数据进行分组,需要在不同分区间打散数据。
2. join 操作:两个 RDD 之间的 join 操作可能涉及 key 的重新分布,特别是两个大表 join 会产生大量 shuffle。
3. 重分区操作:如 repartition、coalesce、repartitionByRange,这些操作显式要求重新分区,会引起数据移动。
4. 排序操作:如 sortBy、sortByKey,会按顺序重新组织数据,也会触发 shuffle。
5. groupByKey:将数据按 key 分组,也是典型的触发 shuffle 的操作。
判断是否引发 shuffle 的关键是:是否需要跨分区拉取数据。如果需要对数据按 key 聚合、对齐或排序,基本都会引起 shuffle。
合理规避不必要的 shuffle,是提升 Spark 应用性能的关键优化点。
Spark Shuffle 的过程及优缺点是什么?(shein大数据面经)
Shuffle 过程是 Spark 将数据从一个或多个 Executor 重新分布到其他 Executor 的过程,包含 map 端写数据和 reduce 端拉数据两大阶段。
Shuffle 过程细节如下:
1. map 阶段(Shuffle Write):
- 每个 task 将数据按照 key 的 hash 分发到不同的 bucket(分区)。
- 数据先写入内存 buffer,超过阈值后写入本地磁盘临时文件。
- 每个分区会生成一个索引文件和若干数据文件。
2. reduce 阶段(Shuffle Read):
- reduce task 根据索引文件从不同节点拉取自己对应的 bucket 数据。
- 支持 push-based shuffle(新)或传统 pull-based shuffle。
- 拉取后数据可能在本地排序或聚合。
优点:
- 保证了跨分区数据对齐,是 join、聚合、排序等操作的基础。
- 支持灵活的调度与并行度调整。
缺点:
- 产生大量磁盘 IO 和网络 IO。
- 数据量大时容易引发 GC 压力或 OOM。
- shuffle 文件多,导致存储压力和任务失败概率增加。
优化建议:
- 尽量减少 shuffle 操作。
- 使用 map-side combine 减少数据量(如使用 reduceByKey 而非 groupByKey)。
- 合理调整分区数、内存比例和并发数。
reduceByKey 和 groupByKey 的区别是什么?(tcl、富途证券)
reduceByKey 和 groupByKey 都是基于 key 的聚合操作,但两者在性能、Shuffle 行为以及使用场景上存在明显区别。
是否预聚合 |
是(map 端预聚合) |
否(直接 Shuffle 后再聚合) |
是否触发 Shuffle |
是(但 Shuffle 数据量小) |
是(Shuffle 数据量大) |
性能 |
较优,网络传输数据较少 |
较差,网络传输数据多 |
内存占用 |
较低 |
较高(可能内存溢出) |
适用场景 |
大数据量聚合,倾向 reduceByKey |
少量数据收集所有值,使用 groupByKey |
reduceByKey 示例:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2))) rdd.reduceByKey(_ + _).collect() // 输出:Array((a,3), (b,1))
groupByKey 示例:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2))) rdd.groupByKey().mapValues(_.sum).collect() // 输出:Array((a,3), (b,1))
尽管结果一样,但 groupByKey 会先将所有相同 key 的数据全部传输到一个节点,再执行聚合,因此数据量大时容易 OOM。reduceByKey 会先在 map 端聚合,相同 key 的中间结果更少,效率更高。
Spark 数据倾斜的定位及解决方案是什么?(shein、新浪微博、滴滴)
Spark 数据倾斜是指在进行 Shuffle 操作(如 groupByKey、reduceByKey、join 等)时,由于某些 key 对应的数据量远大于其他 key,导致部分 task 数据处理时间远超其他 task,从而拉长整体作业运行时间,甚至导致作业失败。
定位数据倾斜的方法:
- 查看 Spark UI:进入 Spark Web UI 的 Stage 页面,查看每个 task 的运行时间、Shuffle Read Size 等指标。如果某些 task 耗时远高于平均值,可能存在倾斜。
- 通过 SQL 加 profile 工具:在 SQL 模式下使用 spark.sql.adaptive.skewJoin.enabled=true 并结合 Spark UI 的物理计划查看是否存在 skewed partition。
- 查看 key 的分布:使用 Hive 或 Spark SQL 分析相关 key 的分布,比如 select key, count(*) from table group by key order by count(*) desc limit 100。
解决数据倾斜的常用策略:
源头治理 |
数据预处理,去除或过滤超大 key 的数据,如使用 where 条件进行剪枝 |
增加并行度 |
调整
或
的值 |
加盐打散 |
对 key 进行加随机前缀(salting),如
,使热点 key 被打散处理 |
广播小表 |
使用 broadcast join,将小表广播至 executor,避免 shuffle 大表 |
Map端预聚合 |
使用
替代
,先在 map 阶段局部聚合减少传输数据量 |
倾斜 key 特处理 |
针对倾斜 key 单独处理,如拆分 job 或用专门逻辑处理热点 key,其余数据常规处理 |
代码示例(加盐打散):
val saltedRDD = rawRDD.map { case (key, value) => val salt = Random.nextInt(10) ((key + "_" + salt), value) }.reduceByKey(_ + _) val unsaltedRDD = saltedRDD.map { case (saltedKey, value) => val key = saltedKey.split("_")(0) (key, value) }.reduceByKey(_ + _)
该方法适用于热点 key 占据多数流量时,可显著缓解 executor 被压死的问题。
Spark 的 Stage 如何划分?(新浪微博大数据面经)
Spark 的 Stage 是 Job 执行过程中的
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
17年+码农经历了很多次面试,多次作为面试官面试别人,多次大数据面试和面试别人,深知哪些面试题是会被经常问到。 在多家企业从0到1开发过离线数仓实时数仓等多个大型项目,详细介绍项目架构等企业内部秘不外传的资料,介绍踩过的坑和开发干货,分享多个拿来即用的大数据ETL工具,让小白用户快速入门并精通,指导如何入职后快速上手。 计划更新内容100篇以上,包括一些企业内部秘不外宣的干货,欢迎订阅!