知其然知其所以然,Flink运行底层原理和源码剖析(字节、阿里大数据面经)
第一章:Flink 是怎么跑起来的?——从入口点开始扒源码
Flink 项目那么大,要看源码从哪儿开始?别慌,带你一点点扒。
一般来说,我们提交一个 Flink 程序,大多是通过 StreamExecutionEnvironment.getExecutionEnvironment().execute()
这句熟得不能再熟的代码。那么我们今天就从这条链开始撸,看看底下藏了多少妖魔鬼怪。
StreamExecutionEnvironment 背后是个啥?
这货是用户入口类,不管你是写批的还是流的,基本都绕不过去它。我们直接看 getExecutionEnvironment()
这个方法:
public static StreamExecutionEnvironment getExecutionEnvironment() { return Utils.resolveFactory(threadContextClassLoader).createExecutionEnvironment(); }
看到没,它其实调了个 Utils.resolveFactory(...).createExecutionEnvironment()
,这一看就是 SPI 风格,动态决定到底是创建本地的,还是集群的环境。
resolveFactory 做了什么?
public static ExecutionEnvironmentFactory resolveFactory(ClassLoader classLoader) { final ServiceLoader<ExecutionEnvironmentFactory> loader = ServiceLoader.load(ExecutionEnvironmentFactory.class, classLoader); for (ExecutionEnvironmentFactory factory : loader) { return factory; } return defaultFactory; }
关键在 ServiceLoader
,它是 Java 标准的 SPI 实现,也就是说只要在 META-INF/services/
目录下注册了实现类,它就能把你捞出来。
在 Flink 中会有哪些实现呢?你猜到了:本地执行的 LocalStreamEnvironmentFactory
,还有远程执行的 RemoteStreamEnvironmentFactory
。源码里默认走的是本地:
return defaultFactory; // new LocalStreamEnvironmentFactory()
createExecutionEnvironment 是谁实现的?
我们看 LocalStreamEnvironmentFactory
:
@Override public StreamExecutionEnvironment createExecutionEnvironment() { return new LocalStreamEnvironment(); }
LocalStreamEnvironment 做了哪些事?
构造函数里干了不少事,最核心的就是给 ExecutionConfig 设置默认参数、启动 EmbeddedJobExecutor 等等。我们重点看 execute()
这个链路是怎么调用的。
execute() -> StreamGraph -> JobGraph
你写的 Flink 作业,一开始就是一个 API DAG,后面它会被转成可调度的 JobGraph,而这一系列的“转化过程”正藏在 StreamExecutionEnvironment#execute()
里:
public JobExecutionResult execute(String jobName) throws Exception { StreamGraph streamGraph = getStreamGraph(jobName); return execute(streamGraph); }
getStreamGraph()
是重点:
public StreamGraph getStreamGraph(String jobName) { this.streamGraph.setJobName(jobName); return this.streamGraph; }
等等,不对啊?这看着像缓存,真相呢?要看下 StreamGraph
到底是啥时候构造出来的。
StreamGraph 是怎么一步步 build 出来的?
看 API,比如你写了:
env.fromElements(1,2,3).map(x -> x * 2).print();
每一步其实都会调用一个 addTransformation()
方法。
例如:
DataStream<T> map(...) { return new SingleOutputStreamOperator<>(...) .transform("Map", TypeInformation, new MapOperator()); }
而 transform(...)
最终会走到:
public <R> SingleOutputStreamOperator<R> transform(...) { transformation = new OneInputTransformation<>(...); environment.addOperator(transformation); return new SingleOutputStreamOperator<>(...); }
注意 addOperator()
会把这个 transformation 放进 transformations
列表里,这些 Transformation 就是构造 StreamGraph 的基础。
接下来,StreamGraphGenerator
会把这些 Transformation 转成节点:
StreamGraphGenerator generator = new StreamGraphGenerator(transformations, config, checkpointCfg); StreamGraph streamGraph = generator.generate();
StreamGraph 到 JobGraph
这步是 Flink 中最核心的“编译”过程。
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
这个方法会做很多事,包括 operator chaining、slot sharing group 设置、checkpoint config 生效等。你可以想象,StreamGraph 更像“语法树”,而 JobGraph 是“可执行计划”。
到这里为止,你写的 API DAG,已经变成了可提交的 JobGraph,下一步就是:提交到 cluster 去跑了。
第二章:JobGraph 是怎么提交给集群的?——LocalEnvironment 的幕后推手
继续沿着 LocalStreamEnvironment#execute(StreamGraph)
看,它最终走到了:
JobClient jobClient = executeAsync(streamGraph); JobExecutionResult result = jobClient.getJobExecutionResult().get();
你没看错,这里走的是异步提交 + 阻塞等待,典型的 submit-and-wait。
来重点看 executeAsync()
:
public JobClient executeAsync(StreamGraph streamGraph) throws Exception { JobGraph jobGraph = streamGraph.getJobGraph(); configureJobGraph(jobGraph); return jobExecutor.execute(jobGraph); }
这就出现了关键角色:jobExecutor
,它其实是个 ExecutorServiceLoader
配出来的 JobExecutor
实现,在 Local 模式下,它就是 EmbeddedExecutor
。
也就是说,在本地模式中,Flink 是在一个嵌套线程里跑了个 mini 集群,JobManager、TaskManager 全部“假装自己在远方”。
EmbeddedExecutor 的 execute 过程
我们看下 EmbeddedExecutor#execute(JobGraph)
:
public CompletableFuture<JobClient> execute(JobGraph jobGraph) { DispatcherGateway gateway = miniCluster.submitJob(jobGraph); return CompletableFuture.completedFuture(new EmbeddedJobClient(gateway, jobGraph)); }
你没看错,真正提交作业的是 MiniCluster#submitJob
。
MiniCluster 是 Flink 本地模式最核心的模拟集群,它内部其实真的维护了一堆组件:Dispatcher、JobManagerRunner、TaskManager。对,就是你集群里那一堆东东,只不过缩小了规模,搬到了 JVM 进程里。
MiniCluster 是怎么玩 Dispatcher 的?
public DispatcherGateway submitJob(JobGraph jobGraph) throws Exception { Dispatcher dispatcher = this.dispatcher; dispatcher.submitJob(jobGraph); return dispatcher.getGateway(); }
我们一步步来追进去看 Dispatcher 的 submitJob
:
public void submitJob(JobGraph jobGraph) throws Exception { jobManagerRunner = createJobManagerRunner(jobGraph); jobManagerRunner.start(); }
JobManagerRunner:核心中的核心
它内部封装了 JobMaster,是负责调度每个 Job 的老大哥。
public void start() { jobMaster.start(); }
而 JobMaster
的启动过程会注册资源管理器、分发 ExecutionGraph、启动 slot 分配逻辑,等等等等,一堆调度初始化逻辑。
第三章:ExecutionGraph 是怎么生出来的?——调度大脑初现端倪
说真的,ExecutionGraph
是 Flink 调度系统的灵魂。如果说 JobGraph 是结构图,那 ExecutionGraph 就是调度图,它定义了:
- 哪些 task 会启动,怎么切分成子任务;
- 每个 subtask 的生命周期,状态转移逻辑;
- 如何重启失败的节点,以及如何协同 checkpoint。
入口点在哪?
还记得上章里 JobMaster#start()
被调了吗?在它启动过程中,有一个特别重要的方法:
jobManagerRunner.createJobMaster().initializeJobMaster();
而这个 initializeJobMaster()
会调用:
ExecutionGraph executionGraph = schedulerNG.getExecutionGraph();
没错,ExecutionGraph 的创建,是由 Scheduler 负责的!
Scheduler 是谁?
在新版 Flink(尤其是 1.15+)中,调度器默认是 DefaultScheduler
,它继承自 SchedulerBase
。而构造 Scheduler 时,会传入一个叫 ExecutionGraphFactory
的东东。
ExecutionGraph executionGraph = executionGraphFactory.createAndRestoreExecutionGraph(...);
好,接下来咱们进入正题——到底 ExecutionGraph 是怎么构造出来的?
ExecutionGraphFactory 做了哪些黑科技?
public ExecutionGraph createAndRestoreExecutionGraph(...) { ExecutionGraph executionGraph = createExecutionGraph(); restoreState(executionGraph); return executionGraph; }
它做了两件大事:
- 构造 ExecutionGraph(基于 JobGraph);
- 如果有保存点(savepoint/checkpoint),就进行状态恢复。
这两步都很关键,我们先关注构造部分:
ExecutionGraph executionGraph = new ExecutionGraph(...); executionGraph.attachJobGraph(jobGraph);
熟悉的构造函数 + attach 方法组合拳来了。
attachJobGraph:组装逻辑集中地
这是整个 ExecutionGraph 生成过程中,最复杂的一环,绝对的“重头戏”。源码就像一锅炖肉,全是料:
for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) { executionVertices = createExec
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
17年+码农经历了很多次面试,多次作为面试官面试别人,多次大数据面试和面试别人,深知哪些面试题是会被经常问到。 在多家企业从0到1开发过离线数仓实时数仓等多个大型项目,详细介绍项目架构等企业内部秘不外传的资料,介绍踩过的坑和开发干货,分享多个拿来即用的大数据ETL工具,让小白用户快速入门并精通,指导如何入职后快速上手。 计划更新内容100篇以上,包括一些企业内部秘不外宣的干货,欢迎订阅!