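Flink JobMaster Analysis
I. JobMaster and JobManager
The previous article focused on how the job graph is imported and propagated. Because Flink evolves from release to release, components implemented in one version are often reworked in later ones. This article therefore covers only JobMaster and does not revisit the legacy JobManager.
Once a JobManagerRunner has been created, the job is handed to JobMaster, which in turn passes the ExecutionGraph on to the Task layer.
As noted above, the propagation and distribution of the job graph rely mainly on JobManagerRunner, which first appears in the Dispatcher: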
/**
 * Base class for the Dispatcher component. It implements the core
 * responsibilities of a dispatcher: accepting asynchronous job submissions
 * and coordinating their execution in the distributed system.
 */
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> {
    ......
    private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
    private final LeaderElectionService leaderElectionService;
    private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
    // Factory object that creates JobManagerRunner instances
    private final JobManagerRunnerFactory jobManagerRunnerFactory;
    ......
}
// The JobManagerRunner is obtained here. Note that ArchivedExecutionGraph and ExecutionGraph implement the common interface AccessExecutionGraph.
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
    final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
    jobManagerRunner.getResultFuture().whenCompleteAsync(
        (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
            // check if we are still the active JobManagerRunner by checking the identity
            //noinspection ObjectEquality
            if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
                if (archivedExecutionGraph != null) {
                    jobReachedGloballyTerminalState(archivedExecutionGraph);
                } else {
                    final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                    ......
                }
            } else {
                log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
            }
        }, getMainThreadExecutor());
    jobManagerRunner.start();
    return jobManagerRunner;
}
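The identity check in startJobManagerRunner can be tried out in isolation. The following is only a minimal sketch built on the JDK: the Runner class, the String job IDs, and the handled list are hypothetical stand-ins and not Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ActiveRunnerCheck {
    /** Hypothetical stand-in for JobManagerRunner: it only carries a result future. */
    static final class Runner {
        final CompletableFuture<String> resultFuture = new CompletableFuture<>();
    }

    final Map<String, CompletableFuture<Runner>> runnerFutures = new ConcurrentHashMap<>();
    final List<String> handled = new ArrayList<>();

    /** Registers a runner for the job and handles its result only while it is still the registered one. */
    Runner start(String jobId) {
        Runner runner = new Runner();
        runnerFutures.put(jobId, CompletableFuture.completedFuture(runner));
        runner.resultFuture.whenComplete((result, error) -> {
            // identity comparison: a replaced runner must not report its result
            if (runner == runnerFutures.get(jobId).getNow(null)) {
                handled.add(result);
            }
        });
        return runner;
    }

    public static void main(String[] args) {
        ActiveRunnerCheck demo = new ActiveRunnerCheck();
        Runner old = demo.start("job-1");
        Runner current = demo.start("job-1"); // replaces the first runner
        old.resultFuture.complete("stale");
        current.resultFuture.complete("fresh");
        System.out.println(demo.handled);     // prints [fresh]
    }
}
```

The result of the replaced runner is dropped because the map already points to the newer instance, which is the situation the log.debug branch above reports.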
The two snippets above show where these two classes are used. Next, let's look at how they are implemented:
public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
    private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
    // ------------------------------------------------------------------------
    /** Lock to ensure that this runner can deal with leader election events and job completion notifications simultaneously. */
    private final Object lock = new Object();
    /** The job graph that needs to run. */
    private final JobGraph jobGraph;
    /** Used to check whether a job needs to be run. */
    private final RunningJobsRegistry runningJobsRegistry;
    /** Leader election for this job. */
    private final LeaderElectionService leaderElectionService;
    private final LibraryCacheManager libraryCacheManager;
    private final Executor executor;
    private final JobMasterService jobMasterService;
    private final FatalErrorHandler fatalErrorHandler;
    private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
    private final CompletableFuture<Void> terminationFuture;
    private CompletableFuture<Void> leadershipOperation;
    /** flag marking the runner as shut down. */
    private volatile boolean shutdown;
    private volatile CompletableFuture<JobMasterGateway> leaderGatewayFuture;
    // ------------------------------------------------------------------------
    /**
     * Exceptions that occur while creating the JobManager or JobManagerRunner are thrown directly
     * and are not reported to the given FatalErrorHandler.
     *
     * @throws Exception Thrown if the runner cannot be set up, because either one of the
     *                   required services could not be started or the job could not be initialized.
     */
    public JobManagerRunner(
            final JobGraph jobGraph,
            final JobMasterServiceFactory jobMasterFactory,
            final HighAvailabilityServices haServices,
            final LibraryCacheManager libraryCacheManager,
            final Executor executor,
            final FatalErrorHandler fatalErrorHandler) throws Exception {
        this.resultFuture = new CompletableFuture<>();
        this.terminationFuture = new CompletableFuture<>();
        this.leadershipOperation = CompletableFuture.completedFuture(null);
        // make sure we cleanly shut down our JobManager services if initialization fails
        try {
            this.jobGraph = checkNotNull(jobGraph);
            this.libraryCacheManager = checkNotNull(libraryCacheManager);
            this.executor = checkNotNull(executor);
            this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
            checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
            // libraries and class loader first
            try {
                libraryCacheManager.registerJob(
                    jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
            } catch (IOException e) {
                throw new Exception("Cannot set up the user's JAR files: " + e.getMessage(), e);
            }
            final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
            if (userCodeLoader == null) {
                throw new Exception("The user code class loader could not be initialized.");
            }
            // high availability services next
            this.runningJobsRegistry = haServices.getRunningJobsRegistry();
            this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
            this.leaderGatewayFuture = new CompletableFuture<>();
            // now start the JobManager
            this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
        }
        catch (Throwable t) {
            terminationFuture.completeExceptionally(t);
            resultFuture.completeExceptionally(t);
            throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t);
        }
    }
......
    public void start() throws Exception {
        try {
            // start this instance by registering it with the leader election service
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }
    ......
    private void setNewLeaderGatewayFuture() {
        final CompletableFuture<JobMasterGateway> oldLeaderGatewayFuture = leaderGatewayFuture;
        leaderGatewayFuture = new CompletableFuture<>();
        if (!oldLeaderGatewayFuture.isDone()) {
            // forward the outcome of the new future to callers still waiting on the old one
            leaderGatewayFuture.whenComplete(
                (JobMasterGateway jobMasterGateway, Throwable throwable) -> {
                    if (throwable != null) {
                        oldLeaderGatewayFuture.completeExceptionally(throwable);
                    } else {
                        oldLeaderGatewayFuture.complete(jobMasterGateway);
                    }
                });
        }
    }
    ......
}
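Note that start() above only registers the runner with the leader election service; nothing is executed until leadership is granted. The sketch below illustrates that callback style with JDK types only. Contender, SingleNodeElectionService, and RunnerSketch are hypothetical, heavily simplified stand-ins, and the assumption that a grant starts the job master while a revoke suspends it is a simplification of what the real runner does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical stand-in for a leader contender callback interface. */
interface Contender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

/** Hypothetical election service with a single contender that always wins. */
final class SingleNodeElectionService {
    private Contender contender;

    /** Registers the contender; leadership is granted later via electLeader(). */
    void start(Contender contender) {
        this.contender = contender;
    }

    UUID electLeader() {
        UUID sessionId = UUID.randomUUID();
        contender.grantLeadership(sessionId);
        return sessionId;
    }

    void loseLeadership() {
        contender.revokeLeadership();
    }
}

final class RunnerSketch implements Contender {
    final List<String> events = new ArrayList<>();

    void start(SingleNodeElectionService service) {
        // start() only registers; the job is not started here
        service.start(this);
        events.add("registered");
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        events.add("job master started");
    }

    @Override
    public void revokeLeadership() {
        events.add("job master suspended");
    }

    public static void main(String[] args) {
        SingleNodeElectionService service = new SingleNodeElectionService();
        RunnerSketch runner = new RunnerSketch();
        runner.start(service);
        service.electLeader();
        service.loseLeadership();
        System.out.println(runner.events);
    }
}
```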
// This JobManagerRunner is created in the org.apache.flink.runtime.dispatcher package
public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
    INSTANCE;
    @Override
    public JobManagerRunner createJobManagerRunner(
            JobGraph jobGraph,
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            JobManagerSharedServices jobManagerServices,
            JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        ......
        return new JobManagerRunner(
            jobGraph,
            jobMasterFactory,
            highAvailabilityServices,
            jobManagerServices.getLibraryCacheManager(),
            jobManagerServices.getScheduledExecutorService(),
            fatalErrorHandler);
    }
}
// The execution graph class that is distributed to the tasks
public class ExecutionGraph implements AccessExecutionGraph {
    ......
    /** Job specific information like the job id, job name, job configuration, etc. */
    private final JobInformation jobInformation;
    /** Serialized job information or a blob key pointing to the offloaded job information. */
    private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
    /** The executor which is used to execute futures. */
    private final ScheduledExecutorService futureExecutor;
    /** The executor which is used to execute blocking I/O operations. */
    private final Executor ioExecutor;
    /** Executor that runs tasks in the job manager's main thread. */
    @Nonnull
    private ComponentMainThreadExecutor jobMasterMainThreadExecutor;
    /** {@code true} if all source tasks are stoppable. */
    private boolean isStoppable = true;
    /** All job vertices that are part of this graph. */
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    /** All vertices, in the order in which they were created. **/
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    /** All intermediate results that are part of this graph. */
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    /** The currently executed tasks, for callbacks. */
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    /** Listeners that receive messages when the entire job changes its status
     * (such as from RUNNING to FINISHED). */
    private final List<JobStatusListener> jobStatusListeners;
    /** Listeners that receive messages whenever a single task execution changes its status. */
    private final List<ExecutionStatusListener> executionListeners;
    /** The implementation that decides how to recover from task failures. */
    private final FailoverStrategy failoverStrategy;
    /** Current status of the job execution. */
    private volatile JobStatus state = JobStatus.CREATED;
    /** A future that completes once the job has reached a terminal state. */
    private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>();
    /**
     * On each global recovery, this version is incremented.
     * The version breaks conflicts between concurrent restart attempts by local failover strategies.
     */
    private volatile long globalModVersion;
    /**
     * The exception that caused the job to fail.
     * This is the first root exception that was not recoverable and triggered the job failure.
     */
    private volatile Throwable failureCause;
    /** The extended failure cause information for the job. It exists in addition to 'failureCause':
     * 'failureCause' is a strong reference to the exception, while this info holds no
     * strong reference to any user-defined classes. */
    private volatile ErrorInfo failureInfo;
    /**
     * Future for an ongoing or completed scheduling action.
     */
    @Nullable
    private volatile CompletableFuture<Void> schedulingFuture;
    // ------ Fields that are relevant to the execution and need to be cleared before archiving ------
    /**
     * The coordinator for checkpoints, if snapshot checkpoints are enabled.
     */
    private CheckpointCoordinator checkpointCoordinator;
    /** Checkpoint stats tracker separate from the coordinator in order to be available after archiving. */
    private CheckpointStatsTracker checkpointStatsTracker;
    // ------ Fields that are only relevant for archived execution graphs ------------
    private String planJson;
    @VisibleForTesting
    public ExecutionGraph(
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor,
            JobID jobId,
            String jobName,
            Configuration jobConfig,
            SerializedValue<ExecutionConfig> serializedConfig,
            Time timeout,
            RestartStrategy restartStrategy,
            SlotProvider slotProvider) throws IOException {
        this(
            new JobInformation(
                jobId,
                jobName,
                serializedConfig,
                jobConfig,
                Collections.emptyList(),
                Collections.emptyList()),
            futureExecutor,
            ioExecutor,
            timeout,
            restartStrategy,
            slotProvider);
    }
    ......
}
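The basic roles of these two classes are now clear. JobManagerRunner carries the job from the Dispatcher down to JobMaster: when the runner is created, it starts the service internally through createJobMasterService. ExecutionGraph is then handed from JobMaster down to the Tasks. JobMaster is therefore a relay, so let's look at its definition.
II. Structure of JobMaster
This is the class that manages a job, and it is fairly large: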
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";
    // ------------------------------------------------------------------------
    private final JobMasterConfiguration jobMasterConfiguration;
    private final ResourceID resourceId;
    private final JobGraph jobGraph;
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobWriter blobWriter;
    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;
    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    private final ScheduledExecutorService scheduledExecutorService;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler fatalErrorHandler;
    private final ClassLoader userCodeLoader;
    private final SlotPool slotPool;
    private final Scheduler scheduler;
    private final RestartStrategy restartStrategy;
    // --------- BackPressure --------
    private final BackPressureStatsTracker backPressureStatsTracker;
    // --------- ResourceManager --------
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    // --------- TaskManagers --------
    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
    // -------- Mutable fields ---------
    private ExecutionGraph executionGraph;
    @Nullable
    private JobManagerJobStatusListener jobStatusListener;
    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;
    @Nullable
    private String lastInternalSavepoint;
    @Nullable
    private ResourceManagerAddress resourceManagerAddress;
    @Nullable
    private ResourceManagerConnection resourceManagerConnection;
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;
    private Map<String, Object> accumulators;
    // ------------------------------------------------------------------------
    public JobMaster(
            RpcService rpcService,
            JobMasterConfiguration jobMasterConfiguration,
            ResourceID resourceId,
            JobGraph jobGraph,
            HighAvailabilityServices highAvailabilityService,
            SlotPoolFactory slotPoolFactory,
            SchedulerFactory schedulerFactory,
            JobManagerSharedServices jobManagerSharedServices,
            HeartbeatServices heartbeatServices,
            JobManagerJobMetricGroupFactory jobMetricGroupFactory,
            OnCompletionActions jobCompletionActions,
            FatalErrorHandler fatalErrorHandler,
            ClassLoader userCodeLoader) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
        final JobMasterGateway selfGateway = getSelfGateway(JobMasterGateway.class);
        this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
        this.resourceId = checkNotNull(resourceId);
        this.jobGraph = checkNotNull(jobGraph);
        this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
        this.highAvailabilityServices = checkNotNull(highAvailabilityService);
        this.blobWriter = jobManagerSharedServices.getBlobWriter();
        this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
        this.jobCompletionActions = checkNotNull(jobCompletionActions);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.userCodeLoader = checkNotNull(userCodeLoader);
        this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
            resourceId,
            new TaskManagerHeartbeatListener(selfGateway),
            rpcService.getScheduledExecutor(),
            log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
            resourceId,
            new ResourceManagerHeartbeatListener(),
            rpcService.getScheduledExecutor(),
            log);
        final String jobName = jobGraph.getName();
        final JobID jid = jobGraph.getJobID();
        log.info("Initializing job {} ({}).", jobName, jid);
        // read the restart strategy from the serialized execution config of the job graph
        final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
            jobGraph.getSerializedExecutionConfig()
                .deserializeValue(userCodeLoader)
                .getRestartStrategy();
        this.restartStrategy = RestartStrategyResolving.resolve(
            restartStrategyConfiguration,
            jobManagerSharedServices.getRestartStrategyFactory(),
            jobGraph.isCheckpointingEnabled());
        log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid);
        resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
        this.scheduler = checkNotNull(schedulerFactory).createScheduler(slotPool);
        this.registeredTaskManagers = new HashMap<>(4);
        this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
        this.lastInternalSavepoint = null;
        this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
        this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
        this.jobStatusListener = null;
        this.resourceManagerConnection = null;
        this.establishedResourceManagerConnection = null;
        this.accumulators = new HashMap<>();
    }
    @Override
    public CompletableFuture<Acknowledge> rescaleOperators(
            Collection<JobVertexID> operators,
            int newParallelism,
            RescalingBehaviour rescalingBehaviour,
            Time timeout) {
        if (newParallelism <= 0) {
            return FutureUtils.completedExceptionally(
                new JobModificationException("The target parallelism of a rescaling operation must be larger than 0."));
        }
        // 1. Check whether we can rescale the job & rescale the respective vertices
        try {
            rescaleJobGraph(operators, newParallelism, rescalingBehaviour);
        } catch (FlinkException e) {
            final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
            log.info(msg, e);
            return FutureUtils.completedExceptionally(new JobModificationException(msg, e));
        }
        final ExecutionGraph currentExecutionGraph = executionGraph;
        // 2. create the rescaled execution graph
        final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
        final ExecutionGraph newExecutionGraph;
        try {
            newExecutionGraph = createExecutionGraph(newJobManagerJobMetricGroup);
        } catch (JobExecutionException | JobException e) {
            return FutureUtils.completedExceptionally(
                new JobModificationException("Could not create rescaled ExecutionGraph.", e));
        }
        // 3. disable the checkpoint coordinator to suppress subsequent checkpoints
        final CheckpointCoordinator checkpointCoordinator = currentExecutionGraph.getCheckpointCoordinator();
        checkpointCoordinator.stopCheckpointScheduler();
        // 4. take a savepoint
        final CompletableFuture<String> savepointFuture = getJobModificationSavepoint(timeout);
        final CompletableFuture<ExecutionGraph> executionGraphFuture = restoreExecutionGraphFromRescalingSavepoint(
            newExecutionGraph,
            savepointFuture)
            .handleAsync(
                (ExecutionGraph executionGraph, Throwable failure) -> {
                    if (failure != null) {
                        // in case we could not take a savepoint or restore from it, restart the
                        // checkpoint coordinator and abort the rescaling operation
                        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
                            checkpointCoordinator.startCheckpointScheduler();
                        }
                        throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
                    } else {
                        return executionGraph;
                    }
                },
                getMainThreadExecutor());
        // 5. suspend the current job
        final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync(
            (ExecutionGraph ignored) -> {
                suspendExecutionGraph(new FlinkException("Job is being rescaled."));
                return currentExecutionGraph.getTerminationFuture();
            },
            getMainThreadExecutor());
        final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept(
            (JobStatus jobStatus) -> {
                if (jobStatus != JobStatus.SUSPENDED) {
                    final String msg = String.format(
                        "Job %s rescaling failed because we could not suspend the execution graph.",
                        jobGraph.getName());
                    log.info(msg);
                    throw new CompletionException(new JobModificationException(msg));
                }
            });
        // 6. resume the new execution graph from the taken savepoint
        final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync(
            executionGraphFuture,
            (Void ignored, ExecutionGraph restoredExecutionGraph) -> {
                // check whether the ExecutionGraph is still the same
                if (executionGraph == currentExecutionGraph) {
                    clearExecutionGraphFields();
                    assignExecutionGraph(restoredExecutionGraph, newJobManagerJobMetricGroup);
                    scheduleExecutionGraph();
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(new JobModificationException(
                        "Detected concurrent modification of ExecutionGraph. Aborting the rescaling."));
                }
            },
            getMainThreadExecutor());
        rescalingFuture.whenCompleteAsync(
            (Acknowledge ignored, Throwable throwable) -> {
                if (throwable != null) {
                    // fail the newly created execution graph
                    newExecutionGraph.failGlobal(
                        new SuppressRestartsException(
                            new FlinkException(
                                String.format("Failed to rescale the job %s.", jobGraph.getJobID()),
                                throwable)));
                }
            }, getMainThreadExecutor());
        return rescalingFuture;
    }
    @Override
    public CompletableFuture<Collection<SlotOffer>> offerSlots(
            final ResourceID taskManagerId,
            final Collection<SlotOffer> slots,
            final Time timeout) {
        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
        if (taskManager == null) {
            return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
        }
        final TaskManagerLocation taskManagerLocation = taskManager.f0;
        final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
        final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
        // hand the offered slots to the slot pool
        return CompletableFuture.completedFuture(
            slotPool.offerSlots(
                taskManagerLocation,
                rpcTaskManagerGateway,
                slots));
    }
    @Override
    public void failSlot(
            final ResourceID taskManagerId,
            final AllocationID allocationId,
            final Exception cause) {
        if (registeredTaskManagers.containsKey(taskManagerId)) {
            internalFailAllocation(allocationId, cause);
        } else {
            log.warn("Cannot fail slot " + allocationId + " because the TaskManager " +
                taskManagerId + " is unknown.");
        }
    }
    @Override
    public CompletableFuture<RegistrationResponse> registerTaskManager(
            final String taskManagerRpcAddress,
            final TaskManagerLocation taskManagerLocation,
            final Time timeout) {
        final ResourceID taskManagerId = taskManagerLocation.getResourceID();
        if (registeredTaskManagers.containsKey(taskManagerId)) {
            // already registered: answer with a success response right away
            final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
            return CompletableFuture.completedFuture(response);
        } else {
            // not yet registered: connect to the TaskManager's RPC address asynchronously
            return getRpcService()
                .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
                .handleAsync(
                    (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                        if (throwable != null) {
                            return new RegistrationResponse.Decline(throwable.getMessage());
                        }
                        slotPool.registerTaskManager(taskManagerId);
                        registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
                        // monitor the task manager as heartbeat target
                        taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
                            @Override
                            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                                // the task manager does not request heartbeats, so this method is currently never called
                            }
                            @Override
                            public void requestHeartbeat(ResourceID resourceID, Void payload) {
                                taskExecutorGateway.heartbeatFromJobManager(resourceID);
                            }
                        });
                        return new JMTMRegistrationSuccess(resourceId);
                    },
                    getMainThreadExecutor());
        }
    }
    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        validateRunsInMainThread();
        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
            return Acknowledge.get();
        }
        setNewFencingToken(newJobMasterId);
        startJobMasterServices();
        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
        resetAndScheduleExecutionGraph();
        return Acknowledge.get();
    }
    // Starts the JobMaster services
    private void startJobMasterServices() throws Exception {
        // start the slot pool and make sure it now accepts messages for this leader
        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
        // start the scheduler
        scheduler.start(getMainThreadExecutor());
        // TODO: remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
        // try to reconnect to the previously known leader
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
        // the job is ready to go, try to establish a connection with the resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }
    private void setNewFencingToken(JobMasterId newJobMasterId) {
        if (getFencingToken() != null) {
            log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(), newJobMasterId);
            // first we have to suspend the current execution
            suspendExecution(new FlinkException("Old job with JobMasterId " + getFencingToken() +
                " is being restarted using the new JobMasterId " + newJobMasterId + "."));
        }
        // set new leader id
        setFencingToken(newJobMasterId);
    }
    private void assignExecutionGraph(
            ExecutionGraph newExecutionGraph,
            JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
        validateRunsInMainThread();
        // the old graph must have reached a terminal state and its metric group must have been cleared
        checkState(executionGraph.getState().isTerminalState());
        checkState(jobManagerJobMetricGroup == null);
        executionGraph = newExecutionGraph;
        jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
    }
    private void scheduleExecutionGraph() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        executionGraph.registerJobStatusListener(jobStatusListener);
        try {
            // let the execution graph schedule its tasks
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            // any unexpected error fails the whole graph
            executionGraph.failGlobal(t);
        }
    }
    private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
        ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);
        final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null) {
            // check whether we find a valid checkpoint
            if (!checkpointCoordinator.restoreLatestCheckpointedState(
                newExecutionGraph.getAllVertices(),
                false,
                false)) {
                // check whether we can restore from a savepoint
                tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
            }
        }
        return newExecutionGraph;
    }
    private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException {
        return ExecutionGraphBuilder.buildGraph(
            null,
            jobGraph,
            jobMasterConfiguration.getConfiguration(),
            scheduledExecutorService,
            scheduledExecutorService,
            scheduler,
            userCodeLoader,
            highAvailabilityServices.getCheckpointRecoveryFactory(),
            rpcTimeout,
            restartStrategy,
            currentJobManagerJobMetricGroup,
            blobWriter,
            jobMasterConfiguration.getSlotRequestTimeout(),
            log);
    }
......
    private CompletableFuture<ExecutionGraph> restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph newExecutionGraph, CompletableFuture<String> savepointFuture) {
        return savepointFuture
            .thenApplyAsync(
                (@Nullable String savepointPath) -> {
                    if (savepointPath != null) {
                        try {
                            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, SavepointRestoreSettings.forPath(savepointPath, false));
                        } catch (Exception e) {
                            final String message = String.format("Could not restore from temporary rescaling savepoint. This might indicate " +
                                "that the savepoint %s got corrupted. Deleting this savepoint as a precaution.",
                                savepointPath);
                            log.info(message);
                            // forget the savepoint in the main thread, then dispose of it asynchronously
                            CompletableFuture
                                .runAsync(
                                    () -> {
                                        if (savepointPath.equals(lastInternalSavepoint)) {
                                            lastInternalSavepoint = null;
                                        }
                                    },
                                    getMainThreadExecutor())
                                .thenRunAsync(
                                    () -> disposeSavepoint(savepointPath),
                                    scheduledExecutorService);
                            throw new CompletionException(new JobModificationException(message, e));
                        }
                    } else {
                        // No rescaling savepoint, restart from the initial savepoint or none
                        try {
                            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
                        } catch (Exception e) {
                            final String message = String.format("Could not restore from initial savepoint. This might indicate " +
                                "that the savepoint %s got corrupted.", jobGraph.getSavepointRestoreSettings().getRestorePath());
                            log.info(message);
                            throw new CompletionException(new JobModificationException(message, e));
                        }
                    }
                    return newExecutionGraph;
                }, scheduledExecutorService);
    }
    private CompletableFuture<String> getJobModificationSavepoint(Time timeout) {
        return triggerSavepoint(
            // ...
            getMainThreadExecutor());
    }
    private void rescaleJobGraph(
            Collection<JobVertexID> operators,
            int newParallelism,
            RescalingBehaviour rescalingBehaviour) throws FlinkException {
        for (JobVertexID jobVertexId : operators) {
            final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
            // update the max parallelism in case it has not been configured
            final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
            if (executionJobVertex != null) {
                jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
            }
            rescalingBehaviour.accept(jobVertex, newParallelism);
        }
    }
    @Override
    public JobMasterGateway getGateway() {
        return getSelfGateway(JobMasterGateway.class);
    }
    private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            runAsync(
                () -> notifyOfNewResourceManagerLeader(
                    leaderAddress,
                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }
        @Override
        public void handleError(final Exception exception) {
            handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception));
        }
    }
    private class ResourceManagerConnection extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> {
        private final JobID jobID;
        ......
    }
    //----------------------------------------------------------------------------------------------
    private class JobManagerJobStatusListener implements JobStatusListener {
        private volatile boolean running = true;
        ......
    }
    private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {
        private final JobMasterGateway jobMasterGateway;
        ......
    }
    private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
        ......
    }
}
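This class is rather long, but its fields and methods show that it has the following main responsibilities:
1. Scheduling, executing, and managing the job graph
2. Resource management (leader, gateway, heartbeats, and so on)
3. Task management
4. Scheduling and allocation
5. Back-pressure control
The focus here is the scheduling and management of the job graph. The JobMaster constructor registers a large number of related services, obtains the ID of the JobGraph, and picks up the leader information. Other basic state and bookkeeping structures are also created according to the configuration; the most typical are createSlotPool and createScheduler (see the constructor code above for details). Note that this is also where the ExecutionGraph mentioned earlier comes in: on close reading, this field runs through almost the entire JobMaster class. Its creation and use are analyzed below.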
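JobMaster essentially does two things: it processes and distributes the JobGraph (via the ExecutionGraph), and it listens for and handles the results and status of the distributed tasks. To do this efficiently it relies on asynchronous communication, so it is worth being clear about how Java's CompletableFuture and CompletionStage are used.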
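As a refresher on those two JDK types, the sketch below chains stages in the same style as rescaleOperators: transform a result, translate a failure, then combine two futures. It is only an illustration; the rescale method, the path string, and the messages are made up and have nothing to do with Flink's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FutureChainDemo {
    /** Hypothetical pipeline: wait for a savepoint path, "restore" from it, then acknowledge. */
    static CompletableFuture<String> rescale(CompletableFuture<String> savepointFuture) {
        CompletableFuture<String> restored = savepointFuture
            .thenApply(path -> "graph restored from " + path)
            .handle((graph, failure) -> {
                if (failure != null) {
                    // re-throw so that all downstream stages also complete exceptionally
                    throw new CompletionException("rescaling aborted", failure);
                }
                return graph;
            });
        // a stage with no result of its own, comparable to "suspend the old graph"
        CompletableFuture<Void> suspended = restored.thenAccept(graph -> { });
        // runs only after both inputs have completed successfully
        return suspended.thenCombine(restored, (ignored, graph) -> graph + ", scheduled");
    }

    public static void main(String[] args) {
        String ok = rescale(CompletableFuture.completedFuture("/tmp/sp-1")).join();
        System.out.println(ok); // prints "graph restored from /tmp/sp-1, scheduled"

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no savepoint"));
        System.out.println(rescale(failed).isCompletedExceptionally()); // prints "true"
    }
}
```

A failure anywhere in the chain skips the remaining success callbacks, which is why the real code can restart the checkpoint scheduler in a single handleAsync stage.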
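The member functions start with the most basic control interfaces: start, suspend, onStop, cancel, and stop. Their work is comparatively simple: start brings up the RPC service and starts job execution asynchronously, while cancel and stop operate on the ExecutionGraph. rescaleJob and rescaleOperators involve JobVertex, which, as mentioned earlier, is made up of several operators. A JobVertex corresponds to an ExecutionJobVertex, which in turn corresponds to ExecutionVertex instances. Put differently, to raise parallelism each JobGraph is expanded into a parallelized ExecutionGraph, the most important data structure in JobMaster, and each ExecutionVertex is one parallel subtask of its ExecutionJobVertex.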
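The relationship between the three vertex types can be modelled in a few lines. The classes below only borrow the names; they are hypothetical toy versions that keep nothing but a name, a parallelism, and a subtask index.

```java
import java.util.ArrayList;
import java.util.List;

final class GraphExpansionDemo {
    /** Toy JobVertex: one logical node with a configured parallelism. */
    static final class JobVertex {
        final String name;
        final int parallelism;
        JobVertex(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Toy ExecutionVertex: one parallel subtask. */
    static final class ExecutionVertex {
        final String taskName;
        final int subtaskIndex;
        ExecutionVertex(String taskName, int subtaskIndex) {
            this.taskName = taskName;
            this.subtaskIndex = subtaskIndex;
        }
    }

    /** Toy ExecutionJobVertex: the parallel counterpart of exactly one JobVertex. */
    static final class ExecutionJobVertex {
        final JobVertex jobVertex;
        final List<ExecutionVertex> subtasks = new ArrayList<>();
        ExecutionJobVertex(JobVertex jobVertex) {
            this.jobVertex = jobVertex;
            for (int i = 0; i < jobVertex.parallelism; i++) {
                subtasks.add(new ExecutionVertex(jobVertex.name, i));
            }
        }
    }

    /** Expands every JobVertex into its ExecutionJobVertex, keeping the original order. */
    static List<ExecutionJobVertex> expand(List<JobVertex> jobGraph) {
        List<ExecutionJobVertex> executionGraph = new ArrayList<>();
        for (JobVertex vertex : jobGraph) {
            executionGraph.add(new ExecutionJobVertex(vertex));
        }
        return executionGraph;
    }

    public static void main(String[] args) {
        List<JobVertex> jobGraph = new ArrayList<>();
        jobGraph.add(new JobVertex("source", 2));
        jobGraph.add(new JobVertex("map", 3));
        for (ExecutionJobVertex ejv : expand(jobGraph)) {
            System.out.println(ejv.jobVertex.name + " -> " + ejv.subtasks.size() + " subtasks");
        }
    }
}
```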
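An important aspect of rescaleOperators is that rescaling causes checkpoints and savepoints to be handled again, which keeps the stream computation safe and timely. updateTaskExecutionState is invoked from the Task side (covered later) to update state, and requestNextInputSplit fetches the next input split for a Task. Inside that function, once the data has been obtained, the Execution for the given attempt is looked up to carry on; note that jobs and tasks are tied together through the ExecutionAttemptID. After that come two checkpoint-related functions, declineCheckpoint and acknowledgeCheckpoint.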
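Why an attempt ID makes a good link between job and task can be shown with a small registry. This is a sketch under assumptions: UUID stands in for ExecutionAttemptID, states are plain strings, and the rule that updates for unknown attempts are rejected is the simplified behaviour assumed here.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class AttemptRegistryDemo {
    /** Toy execution: one attempt of one subtask, identified by a fresh ID. */
    static final class Execution {
        final UUID attemptId = UUID.randomUUID();
        volatile String state = "DEPLOYING";
    }

    private final Map<UUID, Execution> currentExecutions = new ConcurrentHashMap<>();

    Execution deploy() {
        Execution execution = new Execution();
        currentExecutions.put(execution.attemptId, execution);
        return execution;
    }

    /** Replaces a failed attempt with a new one that gets a new ID. */
    Execution restart(Execution failed) {
        currentExecutions.remove(failed.attemptId);
        return deploy();
    }

    /** Returns false when the update refers to an attempt that is no longer tracked. */
    boolean updateTaskExecutionState(UUID attemptId, String newState) {
        Execution execution = currentExecutions.get(attemptId);
        if (execution == null) {
            return false;
        }
        execution.state = newState;
        return true;
    }

    public static void main(String[] args) {
        AttemptRegistryDemo registry = new AttemptRegistryDemo();
        Execution first = registry.deploy();
        Execution second = registry.restart(first);
        System.out.println(registry.updateTaskExecutionState(first.attemptId, "RUNNING"));  // false
        System.out.println(registry.updateTaskExecutionState(second.attemptId, "RUNNING")); // true
    }
}
```

Because every retry gets a new ID, a late message from an old attempt cannot overwrite the state of its replacement.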
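JobMaster also implements the KvStateRegistryGateway interface, which it inherits through JobMasterGateway. As the name suggests, it is essentially a lookup table for objects; after the lookup, a notification is issued. Skipping the slot handling and related operations, look at registerTaskManager: it mainly uses RPC to register the TaskManager with the slot pool. requestJobDetails, in turn, is the function mentioned earlier that provides intermediate status information through live monitoring (only the final outcome counts as the result).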
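The two rules visible in registerTaskManager and offerSlots, namely that a repeated registration is simply acknowledged and that offers from unknown TaskManagers are refused, can be sketched as follows. All names and the String-based IDs are hypothetical, and the RPC connection step is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskManagerRegistryDemo {
    private final Map<String, String> registeredTaskManagers = new HashMap<>(4);
    int slotPoolRegistrations = 0;

    /** Idempotent: a repeated registration is acknowledged without touching the slot pool again. */
    String registerTaskManager(String taskManagerId, String rpcAddress) {
        if (registeredTaskManagers.containsKey(taskManagerId)) {
            return "SUCCESS";
        }
        registeredTaskManagers.put(taskManagerId, rpcAddress);
        slotPoolRegistrations++; // stands in for slotPool.registerTaskManager(...)
        return "SUCCESS";
    }

    /** Slot offers from unknown TaskManagers are rejected. */
    int offerSlots(String taskManagerId, int offeredSlots) {
        if (!registeredTaskManagers.containsKey(taskManagerId)) {
            throw new IllegalStateException("Unknown TaskManager " + taskManagerId);
        }
        return offeredSlots;
    }

    public static void main(String[] args) {
        TaskManagerRegistryDemo jobMaster = new TaskManagerRegistryDemo();
        jobMaster.registerTaskManager("tm-1", "akka.tcp://host-a:6122");
        jobMaster.registerTaskManager("tm-1", "akka.tcp://host-a:6122");
        System.out.println(jobMaster.slotPoolRegistrations); // prints 1
        System.out.println(jobMaster.offerSlots("tm-1", 4)); // prints 4
    }
}
```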
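startCheckpointScheduler triggers checkpoint scheduling. startJobExecution starts the execution of the job internally. startJobMasterServices starts the job-related services. setNewFencingToken binds the fencing token to the new ID. assignExecutionGraph assigns the execution graph, which effectively fixes how the job will be executed. resetAndScheduleExecutionGraph re-plans and reschedules the execution graph; this is very useful, because in practice a job may have to be re-executed repeatedly.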
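The interplay of startJobExecution and setNewFencingToken boils down to a small state machine. The sketch below is a simplified model, with UUID standing in for JobMasterId and a counter standing in for suspending the old execution; the accept method is a hypothetical illustration of what fencing is for.

```java
import java.util.Objects;
import java.util.UUID;

final class FencedEndpointDemo {
    private UUID fencingToken; // null until leadership is granted for the first time
    int suspensions = 0;

    /** Returns false if execution was already started under the same token. */
    boolean startJobExecution(UUID newToken) {
        Objects.requireNonNull(newToken, "The new token must not be null.");
        if (Objects.equals(fencingToken, newToken)) {
            return false;
        }
        if (fencingToken != null) {
            suspensions++; // the execution under the old token has to be suspended first
        }
        fencingToken = newToken;
        return true;
    }

    /** Accepts a message only if it carries the current token. */
    boolean accept(UUID senderToken) {
        return fencingToken != null && fencingToken.equals(senderToken);
    }

    public static void main(String[] args) {
        FencedEndpointDemo endpoint = new FencedEndpointDemo();
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();
        endpoint.startJobExecution(first);
        endpoint.startJobExecution(second);
        System.out.println(endpoint.accept(first));  // prints false: the old token is fenced off
        System.out.println(endpoint.accept(second)); // prints true
    }
}
```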
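scheduleExecutionGraph actually schedules the execution graph so that tasks can use it. createAndRestoreExecutionGraph and createExecutionGraph both manage and schedule the ExecutionGraph. jobStatusChanged, as the name indicates, handles changes of the job status; it is driven by the ExecutionGraph, and the details can be found in the definition of the JobStatus enum.
restoreExecutionGraphFromRescalingSavepoint restores from a savepoint. rescaleJobGraph is called by rescaleOperators to readjust the parallelism of the operators.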
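How a pluggable rescaling behaviour could adjust the parallelism is sketched below. The Vertex class is a toy stand-in, and the STRICT and RELAXED semantics shown (refuse values above the maximum parallelism versus capping them) are assumptions made for this illustration, not something taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

final class RescalingDemo {
    /** Toy vertex that only tracks its parallelism and the upper bound for it. */
    static final class Vertex {
        int parallelism;
        final int maxParallelism;
        Vertex(int parallelism, int maxParallelism) {
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }
    }

    /** Assumed semantics: STRICT refuses values above the maximum, RELAXED caps them. */
    enum Behaviour {
        STRICT {
            @Override
            void accept(Vertex vertex, int newParallelism) {
                if (vertex.maxParallelism < newParallelism) {
                    throw new IllegalArgumentException("Cannot rescale beyond " + vertex.maxParallelism);
                }
                vertex.parallelism = newParallelism;
            }
        },
        RELAXED {
            @Override
            void accept(Vertex vertex, int newParallelism) {
                vertex.parallelism = Math.min(vertex.maxParallelism, newParallelism);
            }
        };

        abstract void accept(Vertex vertex, int newParallelism);
    }

    static void rescale(List<Vertex> vertices, int newParallelism, Behaviour behaviour) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("The target parallelism must be larger than 0.");
        }
        for (Vertex vertex : vertices) {
            behaviour.accept(vertex, newParallelism);
        }
    }

    public static void main(String[] args) {
        List<Vertex> vertices = new ArrayList<>();
        vertices.add(new Vertex(2, 4));
        rescale(vertices, 8, Behaviour.RELAXED);
        System.out.println(vertices.get(0).parallelism); // prints 4
    }
}
```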
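From this analysis, requestNextInputSplit and scheduleExecutionGraph are the two functions that essentially connect the ExecutionGraph to the Tasks, while rescaleOperators provides a very flexible mechanism for adjusting a running job.
III. Summary
Through JobMaster, the job graph is handed over from the job level to the task level. As an aside, Java still poses its difficulties for the learner: its object-oriented style leads to deep inheritance hierarchies that are hard to trace and navigate. At this point the hand-over of the job is complete; the next step is starting the tasks and the processing that follows.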
