Home Flink1.15 Dispatcher源码分析
Post
Cancel

Flink1.15 Dispatcher源码分析

AM启动类YarnApplicationClusterEntryPoint
YarnApplicationClusterEntryPoint.main
ClusterEntrypoint.runClusterEntrypoint
ClusterEntrypoint.startCluster
ClusterEntrypoint.runCluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
        //创建dispatcherResourceManagerComponentFactory
        final DispatcherResourceManagerComponentFactory
        dispatcherResourceManagerComponentFactory =
        createDispatcherResourceManagerComponentFactory(configuration);
        //创建DispatcherResourceManagerComponent
        clusterComponent =
        dispatcherResourceManagerComponentFactory.create(
            configuration,
            resourceId.unwrap(),
            ioExecutor,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            executionGraphInfoStore,
            new RpcMetricQueryServiceRetriever(
                metricRegistry.getMetricQueryServiceRpcService()),
            this);

        clusterComponent
        .getShutDownFuture()
        .whenComplete(
            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                if (throwable != null) {
                    shutDownAsync(
                        ApplicationStatus.UNKNOWN,
                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                        ExceptionUtils.stringifyException(throwable),
                        false);
                } else {
                    // This is the general shutdown path. If a separate more
                    // specific shutdown was
                    // already triggered, this will do nothing
                    shutDownAsync(
                        applicationStatus,
                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                        null,
                        true);
                }
            });
    }
}

createDispatcherResourceManagerComponentFactory这里用的是DefaultDispatcherResourceManagerComponentFactory
另外注意下这里的参数SessionDispatcherFactory.INSTANCE

1
2
3
4
5
6
7
8
9
10
//注意下SessionDispatcherFactory
protected DispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(final Configuration configuration) {
    return new DefaultDispatcherResourceManagerComponentFactory(
        new DefaultDispatcherRunnerFactory(
            ApplicationDispatcherLeaderProcessFactoryFactory.create(
                configuration, SessionDispatcherFactory.INSTANCE, program)),
        resourceManagerFactory,
        JobRestEndpointFactory.INSTANCE);
}

SessionDispatcherFactory后边生成dispatcher是StandaloneDispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public enum SessionDispatcherFactory implements DispatcherFactory {
    INSTANCE;

    @Override
    public StandaloneDispatcher createDispatcher(
        RpcService rpcService,
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        Collection<JobResult> recoveredDirtyJobResults,
        DispatcherBootstrapFactory dispatcherBootstrapFactory,
        PartialDispatcherServicesWithJobPersistenceComponents
        partialDispatcherServicesWithJobPersistenceComponents)
    throws Exception {
        // create the default dispatcher
        return new StandaloneDispatcher(
            rpcService,
            fencingToken,
            recoveredJobs,
            recoveredDirtyJobResults,
            dispatcherBootstrapFactory,
            DispatcherServices.from(
                partialDispatcherServicesWithJobPersistenceComponents,
                JobMasterServiceLeadershipRunnerFactory.INSTANCE,
                CheckpointResourcesCleanupRunnerFactory.INSTANCE));
    }
}

DefaultDispatcherResourceManagerComponentFactory.create
启动dispatcher
启动RM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
log.debug("Starting Dispatcher.");
dispatcherRunner =
dispatcherRunnerFactory.createDispatcherRunner(
    highAvailabilityServices.getDispatcherLeaderElectionService(),
    fatalErrorHandler,
    new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
    ioExecutor,
    rpcService,
    partialDispatcherServices);

log.debug("Starting ResourceManagerService.");
resourceManagerService.start();

resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

return new DispatcherResourceManagerComponent(
    dispatcherRunner,
    resourceManagerService,
    dispatcherLeaderRetrievalService,
    resourceManagerRetrievalService,
    webMonitorEndpoint,
    fatalErrorHandler,
    dispatcherOperationCaches);

DefaultDispatcherRunnerFactory.createDispatcherRunner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class DefaultDispatcherRunnerFactory implements DispatcherRunnerFactory {
    private final DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory;

    public DefaultDispatcherRunnerFactory(
        DispatcherLeaderProcessFactoryFactory dispatcherLeaderProcessFactoryFactory) {
        this.dispatcherLeaderProcessFactoryFactory = dispatcherLeaderProcessFactoryFactory;
    }
    //application
    @Override
    public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobPersistenceComponentFactory jobPersistenceComponentFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices)
    throws Exception {
        //SessionDispatcherLeaderProcessFactory
        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
        dispatcherLeaderProcessFactoryFactory.createFactory(
            jobPersistenceComponentFactory,
            ioExecutor,
            rpcService,
            partialDispatcherServices,
            fatalErrorHandler);

        return DefaultDispatcherRunner.create(
            leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
    }
    //session 
    public static DefaultDispatcherRunnerFactory createSessionRunner(
        DispatcherFactory dispatcherFactory) {
        return new DefaultDispatcherRunnerFactory(
            SessionDispatcherLeaderProcessFactoryFactory.create(dispatcherFactory));
    }
    //job
    public static DefaultDispatcherRunnerFactory createJobRunner(
        JobGraphRetriever jobGraphRetriever) {
        return new DefaultDispatcherRunnerFactory(
            JobDispatcherLeaderProcessFactoryFactory.create(jobGraphRetriever));
    }
}

application模式
ApplicationDispatcherLeaderProcessFactoryFactory.createDispatcherRunner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public DispatcherRunner createDispatcherRunner(
    LeaderElectionService leaderElectionService,
    FatalErrorHandler fatalErrorHandler,
    JobPersistenceComponentFactory jobPersistenceComponentFactory,
    Executor ioExecutor,
    RpcService rpcService,
    PartialDispatcherServices partialDispatcherServices)
throws Exception {

    final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
    dispatcherLeaderProcessFactoryFactory.createFactory(
        jobPersistenceComponentFactory,
        ioExecutor,
        rpcService,
        partialDispatcherServices,
        fatalErrorHandler);

    return DefaultDispatcherRunner.create(
        leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
}

DefaultDispatcherRunner实现LeaderContender选举回调grantLeadership

1
2
3
4
5
6
7
8
9
10
11
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(
        () -> {
            LOG.info(
                "{} was granted leadership with leader id {}. Creating new {}.",
                getClass().getSimpleName(),
                leaderSessionID,
                DispatcherLeaderProcess.class.getSimpleName());
            startNewDispatcherLeaderProcess(leaderSessionID);
        });
}

DefaultDispatcherRunner.startNewDispatcherLeaderProcess

1
2
3
4
5
6
7
8
9
10
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();

    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    FutureUtils.assertNoException(
        previousDispatcherLeaderProcessTerminationFuture.thenRun(
            newDispatcherLeaderProcess::start));
}

AbstractDispatcherLeaderProcess.start
AbstractDispatcherLeaderProcess.startInternal

1
2
3
4
5
6
7
8
9
public final void start() {
    runIfStateIs(State.CREATED, this::startInternal);
}

private void startInternal() {
    log.info("Start {}.", getClass().getSimpleName());
    state = State.RUNNING;
    onStart();
}

SessionDispatcherLeaderProcess.onStart()

1
2
3
4
5
6
protected void onStart() {
    startServices();

    onGoingRecoveryOperation =
    createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults();
}

createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    private CompletableFuture<Void>
            createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults() {
        final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
                CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);

        return dirtyJobsFuture
                .thenApplyAsync(
                        dirtyJobs ->
                                this.recoverJobsIfRunning(
                                        dirtyJobs.stream()
                                                .map(JobResult::getJobId)
                                                .collect(Collectors.toSet())),
                        ioExecutor)
                .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
                .handle(this::onErrorIfRunning);
    }

SessionDispatcherLeaderProcess.createDispatcherIfRunning
SessionDispatcherLeaderProcess.createDispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void createDispatcherIfRunning(
    Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
    runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, recoveredDirtyJobResults));
}

private void createDispatcher(
    Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {

    final DispatcherGatewayService dispatcherService =
    dispatcherGatewayServiceFactory.create(
        DispatcherId.fromUuid(getLeaderSessionId()),
        jobGraphs,
        recoveredDirtyJobResults,
        jobGraphStore,
        jobResultStore);

    completeDispatcherSetup(dispatcherService);
}

ApplicationDispatcherGatewayServiceFactory.create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
    DispatcherId fencingToken,
    Collection<JobGraph> recoveredJobs,
    Collection<JobResult> recoveredDirtyJobResults,
    JobGraphWriter jobGraphWriter,
    JobResultStore jobResultStore) {

    final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

    final Dispatcher dispatcher;
    try {
        dispatcher =
        dispatcherFactory.createDispatcher(
            rpcService,
            fencingToken,
            recoveredJobs,
            recoveredDirtyJobResults,
            (dispatcherGateway, scheduledExecutor, errorHandler) ->
            new ApplicationDispatcherBootstrap(
                application,
                recoveredJobIds,
                configuration,
                dispatcherGateway,
                scheduledExecutor,
                errorHandler),
            PartialDispatcherServicesWithJobPersistenceComponents.from(
                partialDispatcherServices, jobGraphWriter, jobResultStore));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}

这里SessionDispatcherFactory生成StandaloneDispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public enum SessionDispatcherFactory implements DispatcherFactory {
    INSTANCE;

    @Override
    public StandaloneDispatcher createDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            PartialDispatcherServicesWithJobPersistenceComponents
                    partialDispatcherServicesWithJobPersistenceComponents)
            throws Exception {
        // create the default dispatcher
        return new StandaloneDispatcher(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobResults,
                dispatcherBootstrapFactory,
                DispatcherServices.from(
                        partialDispatcherServicesWithJobPersistenceComponents,
                        JobMasterServiceLeadershipRunnerFactory.INSTANCE,
                        CheckpointResourcesCleanupRunnerFactory.INSTANCE));
    }
}

StandaloneDispatcher是Dispatcher的子类,本身什么也没干,逻辑都在Dispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class StandaloneDispatcher extends Dispatcher {
    public StandaloneDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            DispatcherServices dispatcherServices)
            throws Exception {
        super(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobResults,
                dispatcherBootstrapFactory,
                dispatcherServices);
    }
}

Dispatcher继承了RpcEndpoint,start方法

1
2
3
public final void start() {
    rpcServer.start();
}

启动RPC后会调用onStart

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void onStart() throws Exception {
    try {
        startDispatcherServices();
    } catch (Throwable t) {
        final DispatcherException exception =
                new DispatcherException(
                        String.format("Could not start the Dispatcher %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }

    startCleanupRetries();
    //
    startRecoveredJobs();

    this.dispatcherBootstrap =
            this.dispatcherBootstrapFactory.create(
                    getSelfGateway(DispatcherGateway.class),
                    this.getRpcService().getScheduledExecutor(),
                    this::onFatalError);
}

主要看startRecoveredJobs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void startRecoveredJobs() {
    for (JobGraph recoveredJob : recoveredJobs) {
        runRecoveredJob(recoveredJob);
    }
    recoveredJobs.clear();
}

private void runRecoveredJob(final JobGraph recoveredJob) {
    checkNotNull(recoveredJob);
    try {
        runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY);
    } catch (Throwable throwable) {
        onFatalError(
                new DispatcherException(
                        String.format(
                                "Could not start recovered job %s.", recoveredJob.getJobID()),
                        throwable));
    }
}
This post is licensed under CC BY 4.0 by the author.