知其然知其所以然,Flink运行底层原理和源码剖析(字节、阿里大数据面经)

第一章:Flink 是怎么跑起来的?——从入口点开始扒源码

Flink 项目那么大,要看源码从哪儿开始?别慌,带你一点点扒。

一般来说,我们提交一个 Flink 程序,大多是通过 StreamExecutionEnvironment.getExecutionEnvironment().execute() 这句熟得不能再熟的代码。那么我们今天就从这条链开始撸,看看底下藏了多少妖魔鬼怪。

StreamExecutionEnvironment 背后是个啥?

这货是用户入口类,不管你是写批的还是流的,基本都绕不过去它。我们直接看 getExecutionEnvironment() 这个方法:

public static StreamExecutionEnvironment getExecutionEnvironment() {
    return Utils.resolveFactory(threadContextClassLoader).createExecutionEnvironment();
}

看到没,它其实调了个 Utils.resolveFactory(...).createExecutionEnvironment(),这一看就是 SPI 风格,动态决定到底是创建本地的,还是集群的环境。

resolveFactory 做了什么?

public static ExecutionEnvironmentFactory resolveFactory(ClassLoader classLoader) {
    final ServiceLoader<ExecutionEnvironmentFactory> loader = ServiceLoader.load(ExecutionEnvironmentFactory.class, classLoader);
    for (ExecutionEnvironmentFactory factory : loader) {
        return factory;
    }
    return defaultFactory;
}

关键在 ServiceLoader,它是 Java 标准的 SPI 实现,也就是说只要在 META-INF/services/ 目录下注册了实现类,它就能把你捞出来。

在 Flink 中会有哪些实现呢?你猜到了:本地执行的 LocalStreamEnvironmentFactory,还有远程执行的 RemoteStreamEnvironmentFactory。源码里默认走的是本地:

return defaultFactory; // new LocalStreamEnvironmentFactory()

createExecutionEnvironment 是谁实现的?

我们看 LocalStreamEnvironmentFactory

@Override
public StreamExecutionEnvironment createExecutionEnvironment() {
    return new LocalStreamEnvironment();
}

LocalStreamEnvironment 做了哪些事?

构造函数里干了不少事,最核心的就是给 ExecutionConfig 设置默认参数、启动 EmbeddedJobExecutor 等等。我们重点看 execute() 这个链路是怎么调用的。

execute() -> StreamGraph -> JobGraph

你写的 Flink 作业,一开始就是一个 API DAG,后面它会被转成可调度的 JobGraph,而这一系列的“转化过程”正藏在 StreamExecutionEnvironment#execute() 里:

public JobExecutionResult execute(String jobName) throws Exception {
    StreamGraph streamGraph = getStreamGraph(jobName);
    return execute(streamGraph);
}

getStreamGraph() 是重点:

public StreamGraph getStreamGraph(String jobName) {
    this.streamGraph.setJobName(jobName);
    return this.streamGraph;
}

等等,不对啊?这看着像缓存,真相呢?要看下 StreamGraph 到底是啥时候构造出来的。

StreamGraph 是怎么一步步 build 出来的?

看 API,比如你写了:

env.fromElements(1,2,3).map(x -> x * 2).print();

每一步其实都会调用一个 addTransformation() 方法。

例如:

DataStream<T> map(...) {
    return new SingleOutputStreamOperator<>(...)
        .transform("Map", TypeInformation, new MapOperator());
}

transform(...) 最终会走到:

public <R> SingleOutputStreamOperator<R> transform(...) {
    transformation = new OneInputTransformation<>(...);
    environment.addOperator(transformation);
    return new SingleOutputStreamOperator<>(...);
}

注意 addOperator() 会把这个 transformation 放进 transformations 列表里,这些 Transformation 就是构造 StreamGraph 的基础。

接下来,StreamGraphGenerator 会把这些 Transformation 转成节点:

StreamGraphGenerator generator = new StreamGraphGenerator(transformations, config, checkpointCfg);
StreamGraph streamGraph = generator.generate();

StreamGraph 到 JobGraph

这步是 Flink 中最核心的“编译”过程。

JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

这个方法会做很多事,包括 operator chaining、slot sharing group 设置、checkpoint config 生效等。你可以想象,StreamGraph 更像“语法树”,而 JobGraph 是“可执行计划”。

到这里为止,你写的 API DAG,已经变成了可提交的 JobGraph,下一步就是:提交到 cluster 去跑了。

第二章:JobGraph 是怎么提交给集群的?——LocalEnvironment 的幕后推手

继续沿着 LocalStreamEnvironment#execute(StreamGraph) 看,它最终走到了:

JobClient jobClient = executeAsync(streamGraph);
JobExecutionResult result = jobClient.getJobExecutionResult().get();

你没看错,这里走的是异步提交 + 阻塞等待,典型的 submit-and-wait。

来重点看 executeAsync()

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    JobGraph jobGraph = streamGraph.getJobGraph();
    configureJobGraph(jobGraph);
    return jobExecutor.execute(jobGraph);
}

这就出现了关键角色:jobExecutor,它其实是个 ExecutorServiceLoader 配出来的 JobExecutor 实现,在 Local 模式下,它就是 EmbeddedExecutor

也就是说,在本地模式中,Flink 是在一个嵌套线程里跑了个 mini 集群,JobManager、TaskManager 全部“假装自己在远方”。

EmbeddedExecutor 的 execute 过程

我们看下 EmbeddedExecutor#execute(JobGraph)

public CompletableFuture<JobClient> execute(JobGraph jobGraph) {
    DispatcherGateway gateway = miniCluster.submitJob(jobGraph);
    return CompletableFuture.completedFuture(new EmbeddedJobClient(gateway, jobGraph));
}

你没看错,真正提交作业的是 MiniCluster#submitJob

MiniCluster 是 Flink 本地模式最核心的模拟集群,它内部其实真的维护了一堆组件:Dispatcher、JobManagerRunner、TaskManager。对,就是你集群里那一堆东东,只不过缩小了规模,搬到了 JVM 进程里。

MiniCluster 是怎么玩 Dispatcher 的?

public DispatcherGateway submitJob(JobGraph jobGraph) throws Exception {
    Dispatcher dispatcher = this.dispatcher;
    dispatcher.submitJob(jobGraph);
    return dispatcher.getGateway();
}

我们一步步来追进去看 Dispatcher 的 submitJob

public void submitJob(JobGraph jobGraph) throws Exception {
    jobManagerRunner = createJobManagerRunner(jobGraph);
    jobManagerRunner.start();
}

JobManagerRunner:核心中的核心

它内部封装了 JobMaster,是负责调度每个 Job 的老大哥。

public void start() {
    jobMaster.start();
}

JobMaster 的启动过程会注册资源管理器、分发 ExecutionGraph、启动 slot 分配逻辑,等等等等,一堆调度初始化逻辑。

第三章:ExecutionGraph 是怎么生出来的?——调度大脑初现端倪

说真的,ExecutionGraph 是 Flink 调度系统的灵魂。如果说 JobGraph 是结构图,那 ExecutionGraph 就是调度图,它定义了:

  • 哪些 task 会启动,怎么切分成子任务;
  • 每个 subtask 的生命周期,状态转移逻辑;
  • 如何重启失败的节点,以及如何协同 checkpoint。

入口点在哪?

还记得上章里 JobMaster#start() 被调了吗?在它启动过程中,有一个特别重要的方法:

jobManagerRunner.createJobMaster().initializeJobMaster();

而这个 initializeJobMaster() 会调用:

ExecutionGraph executionGraph = schedulerNG.getExecutionGraph();

没错,ExecutionGraph 的创建,是由 Scheduler 负责的!

Scheduler 是谁?

在新版 Flink(尤其是 1.15+)中,调度器默认是 DefaultScheduler,它继承自 SchedulerBase。而构造 Scheduler 时,会传入一个叫 ExecutionGraphFactory 的东东。

ExecutionGraph executionGraph = executionGraphFactory.createAndRestoreExecutionGraph(...);

好,接下来咱们进入正题——到底 ExecutionGraph 是怎么构造出来的?

ExecutionGraphFactory 做了哪些黑科技?

public ExecutionGraph createAndRestoreExecutionGraph(...) {
    ExecutionGraph executionGraph = createExecutionGraph();
    restoreState(executionGraph);
    return executionGraph;
}

它做了两件大事:

  • 构造 ExecutionGraph(基于 JobGraph);
  • 如果有保存点(savepoint/checkpoint),就进行状态恢复。

这两步都很关键,我们先关注构造部分:

ExecutionGraph executionGraph = new ExecutionGraph(...);
executionGraph.attachJobGraph(jobGraph);

熟悉的构造函数 + attach 方法组合拳来了。

attachJobGraph:组装逻辑集中地

这是整个 ExecutionGraph 生成过程中,最复杂的一环,绝对的“重头戏”。源码就像一锅炖肉,全是料:

for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
    executionVertices = createExec

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

17年+码农经历了很多次面试,多次作为面试官面试别人,多次大数据面试和面试别人,深知哪些面试题是会被经常问到。 在多家企业从0到1开发过离线数仓实时数仓等多个大型项目,详细介绍项目架构等企业内部秘不外传的资料,介绍踩过的坑和开发干货,分享多个拿来即用的大数据ETL工具,让小白用户快速入门并精通,指导如何入职后快速上手。 计划更新内容100篇以上,包括一些企业内部秘不外宣的干货,欢迎订阅!

全部评论

相关推荐

不愿透露姓名的神秘牛友
07-05 15:22
已编辑
字节跳动 后端开发实习生 0 本科其他
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务