启动流程
dispatcher.start();
调用start后回调onStart
1
2
3
public final void start() {
rpcServer.start();
}
StandaloneDispatcher继承自Dispatcher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class StandaloneDispatcher extends Dispatcher {
public StandaloneDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices)
throws Exception {
super(
rpcService,
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
dispatcherBootstrapFactory,
dispatcherServices);
}
}
Dispatcher.onStart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void onStart() throws Exception {
try {
startDispatcherServices();
} catch (Throwable t) {
final DispatcherException exception =
new DispatcherException(
String.format("Could not start the Dispatcher %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
// startRecoveredJobs
startRecoveredJobs();
//dispatcher.start
//ApplicationDispatcherGatewayServiceFactory.create()
this.dispatcherBootstrap =
this.dispatcherBootstrapFactory.create(
getSelfGateway(DispatcherGateway.class),
this.getRpcService().getScheduledExecutor(),
this::onFatalError);
}
//Dispatcher
private void startRecoveredJobs() {
for (JobGraph recoveredJob : recoveredJobs) {
runRecoveredJob(recoveredJob);
}
recoveredJobs.clear();
}
private void runRecoveredJob(final JobGraph recoveredJob) {
checkNotNull(recoveredJob);
FutureUtils.assertNoException(
runJob(recoveredJob).handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
}
//
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
//创建JobManagerRunner
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
createJobManagerRunner(jobGraph);
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
//startJobManagerRunner内部执行 jobManagerRunner.start();启动
return jobManagerRunnerFuture
.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
.thenApply(FunctionUtils.nullFn())
.whenCompleteAsync(
(ignored, throwable) -> {
if (throwable != null) {
jobManagerRunnerFutures.remove(jobGraph.getJobID());
}
},
getMainThreadExecutor());
}
createJobManagerRunner
jobManagerRunnerFactory是JobMasterServiceLeadershipRunnerFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
private JobManagerRunner createJobMasterRunner(JobGraph jobGraph) throws Exception {
Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID()));
return jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
getRpcService(),
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler,
System.currentTimeMillis());
}
前面讲dispatcher的时候提到 DispatcherServices.from的参数其中之一是JobMasterServiceLeadershipRunnerFactory.INSTANCE
也就是这里的jobManagerRunnerFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public enum SessionDispatcherFactory implements DispatcherFactory {
INSTANCE;
@Override
public StandaloneDispatcher createDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
PartialDispatcherServicesWithJobPersistenceComponents
partialDispatcherServicesWithJobPersistenceComponents)
throws Exception {
// create the default dispatcher
return new StandaloneDispatcher(
rpcService,
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
dispatcherBootstrapFactory,
DispatcherServices.from(
partialDispatcherServicesWithJobPersistenceComponents,
JobMasterServiceLeadershipRunnerFactory.INSTANCE,
CheckpointResourcesCleanupRunnerFactory.INSTANCE));
}
}
JobMasterServiceLeadershipRunnerFactory.createJobMasterRunner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp)
throws Exception {
checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
final JobMasterConfiguration jobMasterConfiguration =
JobMasterConfiguration.fromConfiguration(configuration);
final JobResultStore jobResultStore = highAvailabilityServices.getJobResultStore();
final LeaderElectionService jobManagerLeaderElectionService =
highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
configuration, jobGraph.getJobType());
if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
== SchedulerExecutionMode.REACTIVE) {
Preconditions.checkState(
slotPoolServiceSchedulerFactory.getSchedulerType()
== JobManagerOptions.SchedulerType.Adaptive,
"Adaptive Scheduler is required for reactive mode");
}
final LibraryCacheManager.ClassLoaderLease classLoaderLease =
jobManagerServices
.getLibraryCacheManager()
.registerClassLoaderLease(jobGraph.getJobID());
final ClassLoader userCodeClassLoader =
classLoaderLease
.getOrResolveClassLoader(
jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths())
.asClassLoader();
final DefaultJobMasterServiceFactory jobMasterServiceFactory =
new DefaultJobMasterServiceFactory(
jobManagerServices.getIoExecutor(),
rpcService,
jobMasterConfiguration,
jobGraph,
highAvailabilityServices,
slotPoolServiceSchedulerFactory,
jobManagerServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
userCodeClassLoader,
initializationTimestamp);
final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory =
new DefaultJobMasterServiceProcessFactory(
jobGraph.getJobID(),
jobGraph.getName(),
jobGraph.getCheckpointingSettings(),
initializationTimestamp,
jobMasterServiceFactory);
return new JobMasterServiceLeadershipRunner(
jobMasterServiceProcessFactory,
jobManagerLeaderElectionService,
jobResultStore,
classLoaderLease,
fatalErrorHandler);
}
createJobManagerRunner方法有点长,主要是生成一个JobMasterServiceLeadershipRunner
JobMasterServiceLeadershipRunner实现了选举接口LeaderContender,看选举接口回调方法grantLeadership
1
2
3
4
5
public void grantLeadership(UUID leaderSessionID) {
runIfStateRunning(
() -> startJobMasterServiceProcessAsync(leaderSessionID),
"starting a new JobMasterServiceProcess");
}
实现 LeaderContender 接口,选举成功回调方法 void grantLeadership(UUID leaderSessionID);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//JobManagerRunnerImpl
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.debug(
"JobManagerRunner cannot be granted leadership because it is already shut down.");
return;
}
leadershipOperation =
leadershipOperation.thenCompose(
(ignored) -> {
synchronized (lock) {
//startJobMaster
return verifyJobSchedulingStatusAndStartJobManager(
leaderSessionID);
}
});
handleException(leadershipOperation, "Could not start the job manager.");
}
}
verifyJobSchedulingStatusAndStartJobManager
jobMasterServiceProcessFactory来创建jobMasterServiceProcess
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
throws FlinkException {
try {
if (jobResultStore.hasJobResultEntry(getJobID())) {
jobAlreadyDone();
} else {
createNewJobMasterServiceProcess(leaderSessionId);
}
} catch (IOException e) {
throw new FlinkException(
String.format(
"Could not retrieve the job scheduling status for job %s.", getJobID()),
e);
}
}
private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());
LOG.debug(
"Create new JobMasterServiceProcess because we were granted leadership under {}.",
leaderSessionId);
jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);
forwardIfValidLeader(
leaderSessionId,
jobMasterServiceProcess.getJobMasterGatewayFuture(),
jobMasterGatewayFuture,
"JobMasterGatewayFuture from JobMasterServiceProcess");
forwardResultFuture(leaderSessionId, jobMasterServiceProcess.getResultFuture());
confirmLeadership(leaderSessionId, jobMasterServiceProcess.getLeaderAddressFuture());
}
DefaultJobMasterServiceProcessFactory
1
2
3
4
5
6
7
public JobMasterServiceProcess create(UUID leaderSessionId) {
return new DefaultJobMasterServiceProcess(
jobId,
leaderSessionId,
jobMasterServiceFactory,
cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));
}
DefaultJobMasterServiceProcess
jobMasterServiceFactory来创建JobMaster
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public DefaultJobMasterServiceProcess(
JobID jobId,
UUID leaderSessionId,
JobMasterServiceFactory jobMasterServiceFactory,
Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
this.jobId = jobId;
this.leaderSessionId = leaderSessionId;
this.jobMasterServiceFuture =
jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);
jobMasterServiceFuture.whenComplete(
(jobMasterService, throwable) -> {
if (throwable != null) {
final JobInitializationException jobInitializationException =
new JobInitializationException(
jobId, "Could not start the JobMaster.", throwable);
LOG.debug(
"Initialization of the JobMasterService for job {} under leader id {} failed.",
jobId,
leaderSessionId,
jobInitializationException);
resultFuture.complete(
JobManagerRunnerResult.forInitializationFailure(
new ExecutionGraphInfo(
failedArchivedExecutionGraphFactory.apply(
jobInitializationException)),
jobInitializationException));
} else {
registerJobMasterServiceFutures(jobMasterService);
}
});
}
DefaultJobMasterServiceFactory.createJobMasterService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public CompletableFuture<JobMasterService> createJobMasterService(
UUID leaderSessionId, OnCompletionActions onCompletionActions) {
return CompletableFuture.supplyAsync(
FunctionUtils.uncheckedSupplier(
() -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
executor);
}
private JobMasterService internalCreateJobMasterService(
UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {
final JobMaster jobMaster =
new JobMaster(
rpcService,
JobMasterId.fromUuidOrNull(leaderSessionId),
jobMasterConfiguration,
ResourceID.generate(),
jobGraph,
haServices,
slotPoolServiceSchedulerFactory,
jobManagerSharedServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
onCompletionActions,
fatalErrorHandler,
userCodeClassloader,
shuffleMaster,
lookup ->
new JobMasterPartitionTrackerImpl(
jobGraph.getJobID(), shuffleMaster, lookup),
new DefaultExecutionDeploymentTracker(),
DefaultExecutionDeploymentReconciler::new,
initializationTimestamp);
jobMaster.start();
return jobMaster;
}
}
JobMaster
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
// make sure we receive RPC and async calls
start();
return callAsyncWithoutFencing(
() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
validateRunsInMainThread();
checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
if (Objects.equals(getFencingToken(), newJobMasterId)) {
log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
return Acknowledge.get();
}
setNewFencingToken(newJobMasterId);
//slotPool.start
//scheduler.start
//reconnectToResourceManager
startJobMasterServices();
log.info(
"Starting execution of job {} ({}) under job master id {}.",
jobGraph.getName(),
jobGraph.getJobID(),
newJobMasterId);
//
resetAndStartScheduler();
return Acknowledge.get();
}
JobMaster.startJobMasterServices
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void startJobMasterServices() throws Exception {
try {
//task 是HeartbeatManagerSenderImpl 主动发送心跳请求
this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
//HeartbeatManagerImpl,实现monitorTarget方法供对端调用
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices);
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
} catch (Exception e) {
handleStartJobMasterServicesError(e);
}
}
对RM来说,需要创建Sender同时给JM,TM发送心跳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//ResourceManager
private void startHeartbeatServices() {
taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
对JM来说需要创建Sender对TM主动发送心跳,反向就是HeartbeatReceiver,
tm内部有ResourceManagerHeartbeatReceiver ,JobManagerHeartbeatReceiver
JM内部也有一个ResourceManagerHeartbeatReceiver
1
2
3
4
5
6
//JobMaster
private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport>
createTaskManagerHeartbeatManager(HeartbeatServices heartbeatServices) {
return heartbeatServices.createHeartbeatManagerSender(
resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);
1.心跳管理taskManagerHeartbeatManagerm,resourceManagerHeartbeatManager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {
//具体实现类是HeartbeatManagerSenderImpl
return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
resourceId,
heartbeatListener,
mainThreadExecutor,
log);
}
HeartbeatManagerSenderImpl(
long heartbeatPeriod,
long heartbeatTimeout,
int failedRpcRequestsUntilUnreachable,
ResourceID ownResourceID,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log,
HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
super(
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
ownResourceID,
heartbeatListener,
mainThreadExecutor,
log,
heartbeatMonitorFactory);
this.heartbeatPeriod = heartbeatPeriod;
//交给线程池mainThreadExecutor定时调度,是HeartbeatManagerSenderImpl本身的run方法
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
//交给线程池mainThreadExecutor定时调度,是HeartbeatManagerSenderImpl本身的run方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void run() {
if (!stopped) {
//
log.debug("Trigger heartbeat request.");
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
requestHeartbeat(heartbeatMonitor);
}
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
//requestHeartbeat由JM,RM对应的GateWa
heartbeatTarget
.requestHeartbeat(getOwnResourceID(), payload)
.whenCompleteAsync(
handleHeartbeatRpc(heartbeatMonitor.getHeartbeatTargetId()),
getMainThreadExecutor());
}
- ResourceManagerLeaderListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
private class ResourceManagerLeaderListener implements LeaderRetrievalListener { @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { runAsync( () -> notifyOfNewResourceManagerLeader( leaderAddress, ResourceManagerId.fromUuidOrNull(leaderSessionID))); } @Override public void handleError(final Exception exception) { handleJobMasterError( new Exception("Fatal error in the ResourceManager leader service", exception)); } } //连接RM private void notifyOfNewResourceManagerLeader( final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) { resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId); reconnectToResourceManager( new FlinkException( String.format( "ResourceManager leader changed to new address %s", resourceManagerAddress))); }
向ResourceManager发起连接,也就是注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
private void reconnectToResourceManager(Exception cause) { closeResourceManagerConnection(cause); tryConnectToResourceManager(); } private void tryConnectToResourceManager() { if (resourceManagerAddress != null) { connectToResourceManager(); } } private void connectToResourceManager() { assert (resourceManagerAddress != null); assert (resourceManagerConnection == null); assert (establishedResourceManagerConnection == null); log.info("Connecting to ResourceManager {}", resourceManagerAddress); resourceManagerConnection = new ResourceManagerConnection( log, jobGraph.getJobID(), resourceId, getAddress(), getFencingToken(), resourceManagerAddress.getAddress(), resourceManagerAddress.getResourceManagerId(), futureExecutor); resourceManagerConnection.start(); }
注册以及成功的回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
protected RetryingRegistration< ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess, RegistrationResponse.Rejection> generateRegistration() { return new RetryingRegistration< ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess, RegistrationResponse.Rejection>( log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, getTargetAddress(), getTargetLeaderId(), jobMasterConfiguration.getRetryingRegistrationConfiguration()) { @Override protected CompletableFuture<RegistrationResponse> invokeRegistration( ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) { Time timeout = Time.milliseconds(timeoutMillis); //向RM注册JM return gateway.registerJobMaster( jobMasterId, jobManagerResourceID, jobManagerRpcAddress, jobID, timeout); } }; } @Override //注册成功的回调 protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) { runAsync( () -> { // filter out outdated connections //noinspection ObjectEquality if (this == resourceManagerConnection) { establishResourceManagerConnection(success); } }); }
CompletableFuture场景,返回cf,等reg注册返回whenCompleteAsync再回调在createNewRegistration内
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
public void start() { checkState(!closed, "The RPC connection is already closed"); checkState( !isConnected() && pendingRegistration == null, "The RPC connection is already started"); //这里返回一个CF,等后面reg成功在回调onRegistrationSuccess/onRegistrationFailure final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration(); //注册 if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) { newRegistration.startRegistration(); } else { // concurrent start operation newRegistration.cancel(); } }