Home Flink1.15 JobMaster源码分析
Post
Cancel

Flink1.15 JobMaster源码分析

启动流程

启动流程

dispatcher.start();
调用 `start()` 启动 RPC 服务后,框架会回调 `onStart()`:

    /**
     * Starts this endpoint by starting its underlying RPC server; {@code onStart()} is invoked as a
     * callback afterwards.
     */
    public final void start() {
        this.rpcServer.start();
    }

StandaloneDispatcher 继承自 Dispatcher,自身没有额外逻辑,`onStart()` 的实现在父类 Dispatcher 中:

/**
 * Dispatcher created by {@code SessionDispatcherFactory}. It adds no logic of its own: every
 * constructor argument is forwarded unchanged to {@link Dispatcher}, so lifecycle hooks such as
 * {@code onStart()} are inherited from there.
 */
public class StandaloneDispatcher extends Dispatcher {

    /**
     * Creates the dispatcher by delegating every argument to the {@link Dispatcher} constructor.
     *
     * @param rpcService forwarded to {@link Dispatcher}
     * @param fencingToken forwarded to {@link Dispatcher}
     * @param recoveredJobs job graphs recovered at startup, forwarded to {@link Dispatcher}
     * @param recoveredDirtyJobResults recovered job results, forwarded to {@link Dispatcher}
     * @param dispatcherBootstrapFactory factory whose {@code create} is called in {@code onStart()}
     * @param dispatcherServices forwarded to {@link Dispatcher}
     * @throws Exception if the {@link Dispatcher} constructor fails
     */
    public StandaloneDispatcher(
            final RpcService rpcService,
            final DispatcherId fencingToken,
            final Collection<JobGraph> recoveredJobs,
            final Collection<JobResult> recoveredDirtyJobResults,
            final DispatcherBootstrapFactory dispatcherBootstrapFactory,
            final DispatcherServices dispatcherServices)
            throws Exception {
        super(rpcService, fencingToken, recoveredJobs, recoveredDirtyJobResults,
                dispatcherBootstrapFactory, dispatcherServices);
    }
}

Dispatcher.onStart

/**
 * Lifecycle callback invoked after the RPC endpoint has been started.
 *
 * <p>First starts the dispatcher services. Any failure there is reported through
 * {@code onFatalError} and rethrown wrapped in a {@link DispatcherException}. Then it runs all
 * recovered jobs and finally creates the dispatcher bootstrap.
 *
 * @throws Exception a {@link DispatcherException} if the dispatcher services could not be started
 */
public void onStart() throws Exception {
    try {
        startDispatcherServices();
    } catch (Throwable cause) {
        final String message = String.format("Could not start the Dispatcher %s", getAddress());
        final DispatcherException startFailure = new DispatcherException(message, cause);
        onFatalError(startFailure);
        throw startFailure;
    }

    // Submit every job handed over from recovery.
    startRecoveredJobs();

    // NOTE(review): in application mode the bootstrap factory presumably comes from
    // ApplicationDispatcherGatewayServiceFactory.create() — confirm.
    final DispatcherGateway selfGateway = getSelfGateway(DispatcherGateway.class);
    this.dispatcherBootstrap =
            this.dispatcherBootstrapFactory.create(
                    selfGateway, this.getRpcService().getScheduledExecutor(), this::onFatalError);
}
/**
 * (Dispatcher) Runs every job recovered at startup through {@link #runRecoveredJob(JobGraph)} and
 * then empties the recovered-job collection.
 */
private void startRecoveredJobs() {
    recoveredJobs.forEach(this::runRecoveredJob);
    recoveredJobs.clear();
}

/**
 * Runs a single recovered job.
 *
 * <p>The outcome of starting the job is passed to the handler built by
 * {@code handleRecoveredJobStartError} for the job's id. The handled future is then wrapped in
 * {@code FutureUtils.assertNoException}.
 *
 * @param recoveredJob the recovered job graph; must not be null
 */
private void runRecoveredJob(final JobGraph recoveredJob) {
    checkNotNull(recoveredJob);
    final CompletableFuture<Void> runFuture = runJob(recoveredJob);
    FutureUtils.assertNoException(
            runFuture.handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
}
/**
 * Creates a JobManagerRunner for {@code jobGraph}, registers its future under the job id and starts
 * the runner once it is available.
 *
 * <p>If creating or starting the runner fails, the registration is removed again. That removal
 * runs on the main thread executor.
 *
 * @param jobGraph the job to run; no runner future may be registered for its id yet
 * @return a future that completes once the runner has been started
 */
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    // Create the JobManagerRunner and remember its future under the job id.
    final CompletableFuture<JobManagerRunner> runnerFuture = createJobManagerRunner(jobGraph);
    jobManagerRunnerFutures.put(jobGraph.getJobID(), runnerFuture);

    // startJobManagerRunner calls jobManagerRunner.start() internally.
    final CompletableFuture<Void> startedFuture =
            runnerFuture
                    .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
                    .thenApply(FunctionUtils.nullFn());

    return startedFuture.whenCompleteAsync(
            (ignored, failure) -> {
                if (failure != null) {
                    jobManagerRunnerFutures.remove(jobGraph.getJobID());
                }
            },
            getMainThreadExecutor());
}

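createJobManagerRunner
其中的 jobManagerRunnerFactory 是 JobMasterServiceLeadershipRunnerFactory。注意:上面 runJob 片段调用的是返回 CompletableFuture 的 createJobManagerRunner(较早版本的写法),而这里展示的方法名为 createJobMasterRunner,直接返回 JobManagerRunner: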

    /**
     * Creates the JobManagerRunner for {@code jobGraph} through {@code jobManagerRunnerFactory}.
     * For session clusters that factory is {@code JobMasterServiceLeadershipRunnerFactory.INSTANCE}.
     *
     * @param jobGraph the job to create a runner for; it must not be registered yet
     * @return the created runner
     * @throws Exception if the factory fails to create the runner
     */
    private JobManagerRunner createJobMasterRunner(JobGraph jobGraph) throws Exception {
        final boolean alreadyRegistered =
                jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID());
        Preconditions.checkState(!alreadyRegistered);

        final DefaultJobManagerJobMetricGroupFactory metricGroupFactory =
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup);
        final long initializationTimestamp = System.currentTimeMillis();

        return jobManagerRunnerFactory.createJobManagerRunner(
                jobGraph,
                configuration,
                getRpcService(),
                highAvailabilityServices,
                heartbeatServices,
                jobManagerSharedServices,
                metricGroupFactory,
                fatalErrorHandler,
                initializationTimestamp);
    }

前面讲 Dispatcher 的时候提到,DispatcherServices.from 的参数之一是 JobMasterServiceLeadershipRunnerFactory.INSTANCE,
也就是这里的 jobManagerRunnerFactory:

/** Singleton {@link DispatcherFactory} that creates a {@link StandaloneDispatcher}. */
public enum SessionDispatcherFactory implements DispatcherFactory {
    INSTANCE;

    /**
     * Creates a {@link StandaloneDispatcher}.
     *
     * <p>Its {@code DispatcherServices} are built from the given partial services together with
     * {@code JobMasterServiceLeadershipRunnerFactory.INSTANCE} and
     * {@code CheckpointResourcesCleanupRunnerFactory.INSTANCE}.
     */
    @Override
    public StandaloneDispatcher createDispatcher(
            final RpcService rpcService,
            final DispatcherId fencingToken,
            final Collection<JobGraph> recoveredJobs,
            final Collection<JobResult> recoveredDirtyJobResults,
            final DispatcherBootstrapFactory dispatcherBootstrapFactory,
            final PartialDispatcherServicesWithJobPersistenceComponents
                    partialDispatcherServicesWithJobPersistenceComponents)
            throws Exception {
        final DispatcherServices dispatcherServices =
                DispatcherServices.from(
                        partialDispatcherServicesWithJobPersistenceComponents,
                        JobMasterServiceLeadershipRunnerFactory.INSTANCE,
                        CheckpointResourcesCleanupRunnerFactory.INSTANCE);

        // create the default dispatcher
        return new StandaloneDispatcher(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobResults,
                dispatcherBootstrapFactory,
                dispatcherServices);
    }
}

JobMasterServiceLeadershipRunnerFactory.createJobManagerRunner

/**
 * Creates the {@link JobManagerRunner} for {@code jobGraph}, namely a
 * {@code JobMasterServiceLeadershipRunner}.
 *
 * <p>The runner contends for the job's JobManager leadership through the job's
 * {@code LeaderElectionService}. When it is granted leadership, it uses the factories built here
 * to create the JobMaster.
 *
 * <p>Along the way this method:
 *
 * <ul>
 *   <li>validates the job graph;
 *   <li>checks that reactive mode is only used with the adaptive scheduler;
 *   <li>registers a class-loader lease for the job's user code.
 * </ul>
 *
 * <p>The lease is handed to the returned runner. If anything fails after the lease has been
 * registered but before the runner exists, the lease is released again so it does not leak.
 *
 * @param jobGraph the job to run; must contain at least one vertex
 * @param configuration cluster configuration; also the source of the JobMaster configuration
 * @param rpcService RPC service for the JobMaster
 * @param highAvailabilityServices supplies the job result store and leader election service
 * @param heartbeatServices heartbeat services for the JobMaster
 * @param jobManagerServices shared JobManager services (library cache, I/O executor, ...)
 * @param jobManagerJobMetricGroupFactory factory for the job's metric group
 * @param fatalErrorHandler handler for fatal errors
 * @param initializationTimestamp timestamp recorded as the job's initialization time
 * @return the leadership runner for the job
 * @throws IllegalArgumentException if the job graph has no vertices
 * @throws IllegalStateException if reactive mode is configured without the adaptive scheduler
 * @throws Exception if resolving the user-code class loader or creating the runner fails
 */
public JobManagerRunner createJobManagerRunner(
        JobGraph jobGraph,
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        JobManagerSharedServices jobManagerServices,
        JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
        FatalErrorHandler fatalErrorHandler,
        long initializationTimestamp)
        throws Exception {

    checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

    final JobMasterConfiguration jobMasterConfiguration =
            JobMasterConfiguration.fromConfiguration(configuration);

    final JobResultStore jobResultStore = highAvailabilityServices.getJobResultStore();

    final LeaderElectionService jobManagerLeaderElectionService =
            highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

    final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
            DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                    configuration, jobGraph.getJobType());

    // Reactive mode only works with the adaptive scheduler.
    if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
            == SchedulerExecutionMode.REACTIVE) {
        Preconditions.checkState(
                slotPoolServiceSchedulerFactory.getSchedulerType()
                        == JobManagerOptions.SchedulerType.Adaptive,
                "Adaptive Scheduler is required for reactive mode");
    }

    final LibraryCacheManager.ClassLoaderLease classLoaderLease =
            jobManagerServices
                    .getLibraryCacheManager()
                    .registerClassLoaderLease(jobGraph.getJobID());

    try {
        final ClassLoader userCodeClassLoader =
                classLoaderLease
                        .getOrResolveClassLoader(
                                jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths())
                        .asClassLoader();

        final DefaultJobMasterServiceFactory jobMasterServiceFactory =
                new DefaultJobMasterServiceFactory(
                        jobManagerServices.getIoExecutor(),
                        rpcService,
                        jobMasterConfiguration,
                        jobGraph,
                        highAvailabilityServices,
                        slotPoolServiceSchedulerFactory,
                        jobManagerServices,
                        heartbeatServices,
                        jobManagerJobMetricGroupFactory,
                        fatalErrorHandler,
                        userCodeClassLoader,
                        initializationTimestamp);

        final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory =
                new DefaultJobMasterServiceProcessFactory(
                        jobGraph.getJobID(),
                        jobGraph.getName(),
                        jobGraph.getCheckpointingSettings(),
                        initializationTimestamp,
                        jobMasterServiceFactory);

        return new JobMasterServiceLeadershipRunner(
                jobMasterServiceProcessFactory,
                jobManagerLeaderElectionService,
                jobResultStore,
                classLoaderLease,
                fatalErrorHandler);
    } catch (Throwable t) {
        // Nobody owns the lease yet, because the runner was never created. Release it here so that
        // a failure while resolving the user-code class loader or wiring the factories does not
        // leak it. Precise rethrow keeps the declared exception types unchanged.
        classLoaderLease.release();
        throw t;
    }
}

createJobManagerRunner 方法有点长,核心是创建一个 JobMasterServiceLeadershipRunner。
JobMasterServiceLeadershipRunner 实现了选举接口 LeaderContender,下面看选举成功后的回调方法 grantLeadership:

    /**
     * Leader-election callback invoked when this contender is granted leadership.
     *
     * <p>Asynchronously starts a new JobMasterServiceProcess for {@code leaderSessionID}. The start
     * is guarded by {@code runIfStateRunning}, so presumably it only happens while the runner is
     * still running, as the helper's name suggests.
     */
    public void grantLeadership(UUID leaderSessionID) {
        final String actionDescription = "starting a new JobMasterServiceProcess";
        runIfStateRunning(
                () -> startJobMasterServiceProcessAsync(leaderSessionID), actionDescription);
    }

作为对比,下面是旧版本 JobManagerRunnerImpl 中的实现:同样实现 LeaderContender 接口,选举成功后回调 void grantLeadership(UUID leaderSessionID):

// JobManagerRunnerImpl
/**
 * Leader-election callback invoked when this runner is granted leadership.
 *
 * <p>Under {@code lock}: returns immediately if the runner has already been shut down. Otherwise it
 * chains starting the job manager for {@code leaderSessionID} onto {@code leadershipOperation}, so
 * leadership operations run one after another. The resulting future is then passed to
 * {@code handleException}, which presumably logs/handles failures — confirm.
 *
 * @param leaderSessionID the session id under which leadership was granted
 */
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug(
                "JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }

        // Chain onto the previous leadership operation so operations are serialized.
        leadershipOperation =
        leadershipOperation.thenCompose(
            (ignored) -> {
                // The continuation may run after this method has returned (possibly on another
                // thread), so the lock is acquired again here.
                synchronized (lock) {
                    // Verify the job's scheduling status and start the JobMaster.
                    return verifyJobSchedulingStatusAndStartJobManager(
                        leaderSessionID);
                }
            });

        handleException(leadershipOperation, "Could not start the job manager.");
    }
}

verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(旧版本中对应的方法是 verifyJobSchedulingStatusAndStartJobManager)
先检查作业是否已有结果;若没有,则通过 jobMasterServiceProcessFactory 创建 jobMasterServiceProcess:

/**
 * Creates a new JobMasterServiceProcess for {@code leaderSessionId}. If the job result store already
 * holds an entry for this job, {@code jobAlreadyDone()} is called instead.
 *
 * @param leaderSessionId the leader session granted to this runner
 * @throws FlinkException if the job result store could not be queried, or if creating the process
 *     fails
 */
private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
        throws FlinkException {
    try {
        if (!jobResultStore.hasJobResultEntry(getJobID())) {
            createNewJobMasterServiceProcess(leaderSessionId);
        } else {
            jobAlreadyDone();
        }
    } catch (IOException ioException) {
        final String message =
                String.format(
                        "Could not retrieve the job scheduling status for job %s.", getJobID());
        throw new FlinkException(message, ioException);
    }
}
/**
 * Replaces the current JobMasterServiceProcess with a new one created for {@code leaderSessionId}.
 *
 * <p>The new process is wired into this runner: its gateway future is forwarded into
 * {@code jobMasterGatewayFuture}, its result future is forwarded, and leadership is confirmed with
 * its leader address future.
 *
 * @param leaderSessionId the leader session granted to this runner
 * @throws FlinkException as declared; presumably propagated from the helpers — confirm
 */
private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
    // The previous process's close future must already be complete.
    final boolean previousProcessClosed = jobMasterServiceProcess.closeAsync().isDone();
    Preconditions.checkState(previousProcessClosed);

    LOG.debug(
            "Create new JobMasterServiceProcess because we were granted leadership under {}.",
            leaderSessionId);

    this.jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);

    forwardIfValidLeader(
            leaderSessionId,
            this.jobMasterServiceProcess.getJobMasterGatewayFuture(),
            jobMasterGatewayFuture,
            "JobMasterGatewayFuture from JobMasterServiceProcess");
    forwardResultFuture(leaderSessionId, this.jobMasterServiceProcess.getResultFuture());
    confirmLeadership(leaderSessionId, this.jobMasterServiceProcess.getLeaderAddressFuture());
}

DefaultJobMasterServiceProcessFactory

/**
 * Creates a {@link DefaultJobMasterServiceProcess} for the given leader session.
 *
 * <p>The supplied factory turns a failure cause into an archived execution graph with status
 * {@code FAILED}.
 *
 * @param leaderSessionId the leader session the process runs under
 * @return the new process
 */
public JobMasterServiceProcess create(UUID leaderSessionId) {
    final DefaultJobMasterServiceProcess process =
            new DefaultJobMasterServiceProcess(
                    jobId,
                    leaderSessionId,
                    jobMasterServiceFactory,
                    failureCause -> createArchivedExecutionGraph(JobStatus.FAILED, failureCause));
    return process;
}

DefaultJobMasterServiceProcess
在构造函数中通过 jobMasterServiceFactory 异步创建 JobMaster:

/**
 * Creates the process and immediately asks {@code jobMasterServiceFactory} to create the
 * JobMasterService, passing {@code this} as the {@code OnCompletionActions} callback.
 *
 * <p>If creation succeeds, the service's futures are registered via
 * {@code registerJobMasterServiceFutures}.
 *
 * <p>If creation fails, the failure is wrapped in a {@code JobInitializationException} and logged
 * at debug level. {@code resultFuture} is then completed with an initialization-failure result. The
 * execution graph for that result is produced by {@code failedArchivedExecutionGraphFactory}.
 *
 * @param jobId id of the job
 * @param leaderSessionId leader session this process runs under
 * @param jobMasterServiceFactory factory creating the JobMasterService
 * @param failedArchivedExecutionGraphFactory maps an initialization failure to an archived graph
 */
public DefaultJobMasterServiceProcess(
    JobID jobId,
    UUID leaderSessionId,
    JobMasterServiceFactory jobMasterServiceFactory,
    Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
    this.jobId = jobId;
    this.leaderSessionId = leaderSessionId;

    // NOTE(review): `this` escapes the constructor here as the OnCompletionActions callback.
    this.jobMasterServiceFuture =
            jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);

    jobMasterServiceFuture.whenComplete(
            (jobMasterService, failure) -> {
                if (failure == null) {
                    registerJobMasterServiceFutures(jobMasterService);
                    return;
                }

                final JobInitializationException initializationFailure =
                        new JobInitializationException(
                                jobId, "Could not start the JobMaster.", failure);

                LOG.debug(
                        "Initialization of the JobMasterService for job {} under leader id {} failed.",
                        jobId,
                        leaderSessionId,
                        initializationFailure);

                final ArchivedExecutionGraph failedGraph =
                        failedArchivedExecutionGraphFactory.apply(initializationFailure);
                resultFuture.complete(
                        JobManagerRunnerResult.forInitializationFailure(
                                new ExecutionGraphInfo(failedGraph), initializationFailure));
            });
}

DefaultJobMasterServiceFactory.createJobMasterService

/**
 * Asynchronously creates and starts a JobMaster on {@code executor}.
 *
 * <p>Creation runs inside {@code CompletableFuture.supplyAsync}. Any exception thrown while creating
 * the JobMaster therefore completes the returned future exceptionally.
 *
 * @param leaderSessionId leader session the JobMaster is created for
 * @param onCompletionActions callback handed to the JobMaster
 * @return future of the started JobMaster
 */
public CompletableFuture<JobMasterService> createJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) {
    final CompletableFuture<JobMasterService> creationFuture =
            CompletableFuture.supplyAsync(
                    FunctionUtils.uncheckedSupplier(
                            () ->
                                    internalCreateJobMasterService(
                                            leaderSessionId, onCompletionActions)),
                    executor);
    return creationFuture;
}

/**
 * Builds a new {@link JobMaster} for {@code leaderSessionId} from this factory's fields, starts it
 * and returns it.
 *
 * @param leaderSessionId leader session, converted to the JobMaster's {@code JobMasterId}
 * @param onCompletionActions callback handed to the JobMaster
 * @return the started JobMaster
 * @throws Exception if constructing or starting the JobMaster fails
 */
private JobMasterService internalCreateJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {

    final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderSessionId);
    final ResourceID jobMasterResourceId = ResourceID.generate();

    final JobMaster createdJobMaster =
            new JobMaster(
                    rpcService,
                    jobMasterId,
                    jobMasterConfiguration,
                    jobMasterResourceId,
                    jobGraph,
                    haServices,
                    slotPoolServiceSchedulerFactory,
                    jobManagerSharedServices,
                    heartbeatServices,
                    jobManagerJobMetricGroupFactory,
                    onCompletionActions,
                    fatalErrorHandler,
                    userCodeClassloader,
                    shuffleMaster,
                    lookup ->
                            new JobMasterPartitionTrackerImpl(
                                    jobGraph.getJobID(), shuffleMaster, lookup),
                    new DefaultExecutionDeploymentTracker(),
                    DefaultExecutionDeploymentReconciler::new,
                    initializationTimestamp);

    createdJobMaster.start();
    return createdJobMaster;
}
}

JobMaster

/**
 * Starts the RPC endpoint so that RPC and async calls are received. Then it invokes
 * {@link #startJobExecution(JobMasterId)} via {@code callAsyncWithoutFencing} with an infinite
 * timeout.
 *
 * @param newJobMasterId the JobMasterId to start the job execution under
 * @return future acknowledging that the job execution has been started
 * @throws Exception if starting the endpoint fails
 */
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // Make sure we receive RPC and async calls.
    start();

    final CompletableFuture<Acknowledge> startFuture =
            callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    return startFuture;
}

/**
 * Starts executing the job under {@code newJobMasterId}. Must be called from the main thread.
 *
 * <p>If execution was already started with the same id, this is a no-op. Otherwise the method:
 *
 * <ul>
 *   <li>updates the fencing token;
 *   <li>starts the JobMaster services (heartbeat managers, slot pool, resource manager leader
 *       retrieval);
 *   <li>resets and starts the scheduler.
 * </ul>
 *
 * @param newJobMasterId the new JobMasterId; must not be null
 * @return {@link Acknowledge}
 * @throws Exception if starting the services or the scheduler fails
 */
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    validateRunsInMainThread();
    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

    final boolean alreadyStarted = Objects.equals(getFencingToken(), newJobMasterId);
    if (alreadyStarted) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        return Acknowledge.get();
    }

    setNewFencingToken(newJobMasterId);

    // Heartbeat managers, slot pool, and resource manager leader retrieval.
    startJobMasterServices();

    log.info(
            "Starting execution of job {} ({}) under job master id {}.",
            jobGraph.getName(),
            jobGraph.getJobID(),
            newJobMasterId);

    resetAndStartScheduler();
    return Acknowledge.get();
}

JobMaster.startJobMasterServices

/**
 * Starts the services the JobMaster needs before scheduling:
 *
 * <ul>
 *   <li>the heartbeat managers towards the task managers and the resource manager;
 *   <li>the slot pool;
 *   <li>leader retrieval for the resource manager.
 * </ul>
 *
 * <p>Any exception is handed to {@code handleStartJobMasterServicesError}.
 */
private void startJobMasterServices() throws Exception {
    try {
        // Built via createHeartbeatManagerSender, i.e. a HeartbeatManagerSenderImpl that actively
        // sends heartbeat requests to the task managers.
        this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
        // NOTE(review): presumably a receiving-side HeartbeatManagerImpl whose monitorTarget is
        // driven by the resource manager — confirm in createResourceManagerHeartbeatManager.
        this.resourceManagerHeartbeatManager =
                createResourceManagerHeartbeatManager(heartbeatServices);

        // Start the slot pool so that it accepts messages for this leader.
        slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        // The job is ready to go: activate leader retrieval for the resource manager. When the
        // leader is announced, the connection is established and the slot pool starts
        // requesting slots.
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    } catch (Exception startFailure) {
        handleStartJobMasterServicesError(startFailure);
    }
}

对 RM 来说,需要创建两个 Sender,分别主动向 JM 和 TM 发送心跳请求:

/**
 * (ResourceManager) Creates the resource manager's two heartbeat senders: one for the task
 * managers and one for the job managers. Both use this endpoint's {@code resourceId}, its main
 * thread executor and its logger.
 */
private void startHeartbeatServices() {
    final TaskManagerHeartbeatListener taskManagerListener = new TaskManagerHeartbeatListener();
    this.taskManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId, taskManagerListener, getMainThreadExecutor(), log);

    final JobManagerHeartbeatListener jobManagerListener = new JobManagerHeartbeatListener();
    this.jobManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId, jobManagerListener, getMainThreadExecutor(), log);
}

对 JM 来说,需要创建一个 Sender 主动向 TM 发送心跳;反方向则由 HeartbeatReceiver 负责:
TM 内部有 ResourceManagerHeartbeatReceiver 和 JobManagerHeartbeatReceiver,
JM 内部也有一个 ResourceManagerHeartbeatReceiver。

//JobMaster
private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport>
        createTaskManagerHeartbeatManager(HeartbeatServices heartbeatServices) {
    return heartbeatServices.createHeartbeatManagerSender(
            resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);

1. 心跳管理:taskManagerHeartbeatManager、resourceManagerHeartbeatManager

/**
 * Creates a heartbeat manager that actively sends heartbeat requests to its targets.
 *
 * <p>The concrete implementation is {@code HeartbeatManagerSenderImpl}. It is configured with this
 * service's {@code heartbeatInterval}, {@code heartbeatTimeout} and
 * {@code failedRpcRequestsUntilUnreachable}.
 *
 * @param resourceId resource id of the owner of the heartbeat manager
 * @param heartbeatListener listener for heartbeat events and payloads
 * @param mainThreadExecutor executor the sender runs on
 * @param log logger to use
 * @param <I> type of incoming heartbeat payloads
 * @param <O> type of outgoing heartbeat payloads
 * @return the heartbeat sender
 */
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
        ResourceID resourceId,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log) {
    final HeartbeatManager<I, O> sender =
            new HeartbeatManagerSenderImpl<>(
                    heartbeatInterval,
                    heartbeatTimeout,
                    failedRpcRequestsUntilUnreachable,
                    resourceId,
                    heartbeatListener,
                    mainThreadExecutor,
                    log);
    return sender;
}
/**
 * Creates a heartbeat manager that actively sends heartbeat requests, and schedules its first
 * heartbeat round right away.
 *
 * @param heartbeatPeriod delay in milliseconds between two heartbeat rounds; {@code run()}
 *     reschedules itself with this period
 * @param heartbeatTimeout forwarded to the super class
 * @param failedRpcRequestsUntilUnreachable forwarded to the super class
 * @param ownResourceID forwarded to the super class
 * @param heartbeatListener forwarded to the super class
 * @param mainThreadExecutor executor this sender is scheduled on; also forwarded to the super class
 * @param log forwarded to the super class
 * @param heartbeatMonitorFactory forwarded to the super class
 */
HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        int failedRpcRequestsUntilUnreachable,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log,
        HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
    super(
            heartbeatTimeout,
            failedRpcRequestsUntilUnreachable,
            ownResourceID,
            heartbeatListener,
            mainThreadExecutor,
            log,
            heartbeatMonitorFactory);

    this.heartbeatPeriod = heartbeatPeriod;
    // Hand this instance to mainThreadExecutor with zero delay. The scheduled task is this class's
    // own run() method, which keeps rescheduling itself every heartbeatPeriod ms until stopped.
    // NOTE(review): `this` is published to the executor from within the constructor.
    mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}


构造函数中把自身交给线程池 mainThreadExecutor 调度,实际执行的是 HeartbeatManagerSenderImpl 自身的 run 方法;run 执行完后会按 heartbeatPeriod 再次调度自己:

/**
 * Runs one heartbeat round. It sends a heartbeat request to every registered target and then
 * reschedules itself on the main thread executor after {@code heartbeatPeriod} milliseconds. Does
 * nothing once the manager is stopped.
 */
public void run() {
    if (stopped) {
        return;
    }

    log.debug("Trigger heartbeat request.");
    for (HeartbeatMonitor<O> monitor : getHeartbeatTargets().values()) {
        requestHeartbeat(monitor);
    }

    getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
/**
 * Sends a heartbeat request to the target monitored by {@code heartbeatMonitor}.
 *
 * <p>The request carries the payload retrieved from the heartbeat listener for that target. The
 * RPC outcome is processed by {@code handleHeartbeatRpc} on the main thread executor.
 */
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
    final O heartbeatPayload =
            getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
    final HeartbeatTarget<O> target = heartbeatMonitor.getHeartbeatTarget();

    // NOTE(review): requestHeartbeat is presumably implemented by the JM/RM gateway behind the
    // target — confirm.
    target.requestHeartbeat(getOwnResourceID(), heartbeatPayload)
            .whenCompleteAsync(
                    handleHeartbeatRpc(heartbeatMonitor.getHeartbeatTargetId()),
                    getMainThreadExecutor());
}
  2. ResourceManagerLeaderListener:监听 ResourceManager leader 的变化
    
    /**
     * Listens for changes of the resource manager leader.
     *
     * <p>A newly announced leader is handled asynchronously via {@code runAsync}, which calls
     * {@code notifyOfNewResourceManagerLeader}. Errors of the leader retrieval service are reported
     * through {@code handleJobMasterError}.
     */
    private class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            runAsync(
                    () -> {
                        final ResourceManagerId resourceManagerId =
                                ResourceManagerId.fromUuidOrNull(leaderSessionID);
                        notifyOfNewResourceManagerLeader(leaderAddress, resourceManagerId);
                    });
        }

        @Override
        public void handleError(final Exception exception) {
            final Exception wrapped =
                    new Exception("Fatal error in the ResourceManager leader service", exception);
            handleJobMasterError(wrapped);
        }
    }
    /**
     * Remembers the new resource manager leader address and reconnects to it.
     *
     * <p>Any existing connection is closed with a {@code FlinkException} that names the new
     * address.
     *
     * @param newResourceManagerAddress RPC address of the new leader
     * @param resourceManagerId leader id of the new leader
     */
    private void notifyOfNewResourceManagerLeader(
            final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
        this.resourceManagerAddress =
                createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);

        final String reason =
                String.format(
                        "ResourceManager leader changed to new address %s",
                        resourceManagerAddress);
        reconnectToResourceManager(new FlinkException(reason));
    }
    

    向 ResourceManager 发起连接,也就是向 RM 注册当前 JobMaster:

    
     /**
      * Closes the current resource manager connection with {@code cause}, then tries to connect to
      * the currently known resource manager address.
      *
      * @param cause reason passed to {@code closeResourceManagerConnection}
      */
     private void reconnectToResourceManager(Exception cause) {
         this.closeResourceManagerConnection(cause);
         this.tryConnectToResourceManager();
     }
    
     /** Connects to the resource manager, but only once its address is known. */
     private void tryConnectToResourceManager() {
         if (resourceManagerAddress == null) {
             return;
         }
         connectToResourceManager();
     }
    
     /**
      * Creates a {@code ResourceManagerConnection} to the known resource manager address and starts
      * it. Starting the connection begins this JobMaster's registration at the resource manager.
      *
      * <p>Expects an address to be set and no pending or established connection. These
      * preconditions are only checked with {@code assert}, so they apply only when assertions are
      * enabled.
      */
     private void connectToResourceManager() {
         assert resourceManagerAddress != null;
         assert resourceManagerConnection == null;
         assert establishedResourceManagerConnection == null;

         log.info("Connecting to ResourceManager {}", resourceManagerAddress);

         final ResourceManagerConnection connection =
                 new ResourceManagerConnection(
                         log,
                         jobGraph.getJobID(),
                         resourceId,
                         getAddress(),
                         getFencingToken(),
                         resourceManagerAddress.getAddress(),
                         resourceManagerAddress.getResourceManagerId(),
                         futureExecutor);
         resourceManagerConnection = connection;

         connection.start();
     }
    

    注册逻辑以及注册成功后的回调:

    
     /**
      * Creates the retrying registration of this JobMaster at the ResourceManager.
      *
      * <p>The returned anonymous {@link RetryingRegistration} targets a
      * {@code ResourceManagerGateway} at {@code getTargetAddress()} with leader id
      * {@code getTargetLeaderId()}. It retries according to the JobMaster's retrying-registration
      * configuration. Each attempt calls {@code registerJobMaster} on the gateway.
      */
     protected RetryingRegistration<
                     ResourceManagerId,
                     ResourceManagerGateway,
                     JobMasterRegistrationSuccess,
                     RegistrationResponse.Rejection>
             generateRegistration() {
         return new RetryingRegistration<
                 ResourceManagerId,
                 ResourceManagerGateway,
                 JobMasterRegistrationSuccess,
                 RegistrationResponse.Rejection>(
                 log,
                 getRpcService(),
                 "ResourceManager",
                 ResourceManagerGateway.class,
                 getTargetAddress(),
                 getTargetLeaderId(),
                 jobMasterConfiguration.getRetryingRegistrationConfiguration()) {
    
             /**
              * A single registration attempt with the given timeout (in milliseconds). Note that
              * {@code fencingToken} is not used here; the request carries {@code jobMasterId}.
              */
             @Override
             protected CompletableFuture<RegistrationResponse> invokeRegistration(
                     ResourceManagerGateway gateway,
                     ResourceManagerId fencingToken,
                     long timeoutMillis) {
                 Time timeout = Time.milliseconds(timeoutMillis);
                 // Register this JobMaster (JM) at the ResourceManager (RM).
                 return gateway.registerJobMaster(
                         jobMasterId,
                         jobManagerResourceID,
                         jobManagerRpcAddress,
                         jobID,
                         timeout);
             }
         };
     }
    
     /**
      * Callback invoked when the registration at the resource manager succeeds.
      *
      * <p>Handled asynchronously via {@code runAsync}. The connection is only established if this is
      * still the current {@code resourceManagerConnection}; outdated connections are ignored.
      *
      * @param success the registration response
      */
     @Override
     protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
         runAsync(
                 () -> {
                     // filter out outdated connections
                     //noinspection ObjectEquality
                     if (this != resourceManagerConnection) {
                         return;
                     }
                     establishResourceManagerConnection(success);
                 });
     }
    

    这里是典型的 CompletableFuture 用法:start() 只负责发起注册;createNewRegistration 内部通过 whenCompleteAsync 在注册结果上挂好回调,注册返回后再回调 onRegistrationSuccess / onRegistrationFailure:

    
    /**
     * Starts registering at the target.
     *
     * <p>A new {@code RetryingRegistration} is created and published through
     * {@code REGISTRATION_UPDATER.compareAndSet} (presumably onto {@code pendingRegistration}). If
     * the publish succeeds, the registration is started. If it fails, a concurrent start won the
     * race and the new registration is cancelled.
     *
     * @throws IllegalStateException if the connection is closed or already started
     */
    public void start() {
        checkState(!closed, "The RPC connection is already closed");
        checkState(
                !isConnected() && pendingRegistration == null,
                "The RPC connection is already started");

        // NOTE(review): the completion callbacks (onRegistrationSuccess / onRegistrationFailure)
        // are presumably attached inside createNewRegistration — confirm.
        final RetryingRegistration<F, G, S, R> registration = createNewRegistration();

        final boolean published = REGISTRATION_UPDATER.compareAndSet(this, null, registration);
        if (!published) {
            // concurrent start operation
            registration.cancel();
            return;
        }
        registration.startRegistration();
    }
    
This post is licensed under CC BY 4.0 by the author.