
Flink 1.15 CLI Submission: Source Code Analysis

The entry point is org.apache.flink.client.cli.CliFrontend#main, which dispatches through parseAndRun to run / runApplication:

public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    int retCode = 31;
    try {
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // run parseAndRun inside the installed security context
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    } finally {
        System.exit(retCode);
    }
}
public int parseAndRun(String[] args) {

    // check for action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // get action
    String action = args[0];

    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    try {
        // do action
        switch (action) {
            case ACTION_RUN:
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            // ... remaining actions (info, list, cancel, stop, savepoint, ...) omitted here
            default:
                System.out.printf("\"%s\" is not a valid action.\n", action);
                return 1;
        }
    } catch (Exception e) {
        // full exception handling omitted in this excerpt
        return handleError(e);
    }
}
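
To make the dispatch concrete, here is how a typical invocation is split into action and params (the command line, class name, and paths below are made-up examples, not from the Flink source):

import java.util.Arrays;

// Hypothetical: `./bin/flink run -c com.example.WordCount ./job.jar --input /tmp/in`
// arrives in parseAndRun as the args array below.
public class ParseAndRunExample {
    public static void main(String[] ignored) {
        String[] args = {"run", "-c", "com.example.WordCount", "./job.jar", "--input", "/tmp/in"};
        String action = args[0]; // "run" -> ACTION_RUN -> CliFrontend#run
        String[] params = Arrays.copyOfRange(args, 1, args.length); // options + user arguments
        System.out.println(action + " " + Arrays.toString(params));
    }
}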

Non-application (per-job/session) mode: run(params)

protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}
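
getPackagedProgram wraps the user jar and entry class. As a rough sketch, building a PackagedProgram by hand looks like this (the jar path and class name are hypothetical; builder methods as in Flink 1.15):

import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;

// Sketch of what getPackagedProgram produces; jar path and entry class are made up.
public class PackagedProgramSketch {
    public static void main(String[] args) throws Exception {
        PackagedProgram program =
                PackagedProgram.newBuilder()
                        .setJarFile(new File("/path/to/job.jar"))
                        .setEntryPointClassName("com.example.WordCount")
                        .setArguments("--input", "/tmp/in")
                        .setConfiguration(new Configuration())
                        .build();
        System.out.println(program.getMainClassName());
    }
}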

executeProgram

protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}

ClientUtils.executeProgram

public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
            "Starting program (detached: {})",
            !configuration.getBoolean(DeploymentOptions.ATTACHED));

        ContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);

        StreamContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);

        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
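
The detached flag logged above comes from execution.attached, which the CLI derives from the -d/--detached option. A minimal sketch of that plumbing, using only the public Configuration API:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

// The -d flag effectively flips DeploymentOptions.ATTACHED (execution.attached) to false.
public class AttachedFlagSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(DeploymentOptions.ATTACHED, false); // what `flink run -d ...` results in
        System.out.println("detached: " + !conf.getBoolean(DeploymentOptions.ATTACHED));
    }
}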

org.apache.flink.client.program.PackagedProgram#invokeInteractiveModeForExecution

public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        callMainMethod(mainClass, args);
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}

callMainMethod then reaches mainMethod.invoke(null, (Object) args), which starts running the user's program. The user code ultimately calls env.execute() to run the job, e.g. StreamExecutionEnvironment.execute:

public JobExecutionResult execute() throws Exception {
    return execute(getStreamGraph());
}
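
For orientation, a minimal user program of the kind invoked here could look like the following (hypothetical job, not part of the Flink source):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical user job: callMainMethod() invokes this main() reflectively. Because
// StreamContextEnvironment.setAsContext() ran first, getExecutionEnvironment() returns
// an environment whose execute() hands the pipeline to the CLI-selected executor.
public class ExampleJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello", "flink").print();
        env.execute("example-job"); // builds the StreamGraph and submits it
    }
}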

StreamGraph

public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}

JobGraph — when the job is submitted, the executor translates the StreamGraph (a Pipeline) into a JobGraph:

final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
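
The same StreamGraph -> JobGraph translation can be sketched outside the executor using the internal StreamingJobGraphGenerator (not public API; class locations as in Flink 1.15):

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;

// Sketch: recorded Transformations -> StreamGraph -> JobGraph, without submitting anything.
public class GraphTranslationSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        System.out.println(jobGraph.getJobID());
    }
}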

Application mode: runApplication(params)

protected void runApplication(String[] args) throws Exception {
    LOG.info("Running 'run-application' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRunApplication(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));
    // create the ApplicationClusterDeployer
    final ApplicationDeployer deployer =
            new ApplicationClusterDeployer(clusterClientServiceLoader);

    final ProgramOptions programOptions;
    final Configuration effectiveConfiguration;

    // No need to set a jarFile path for Pyflink job.
    if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
        programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
        effectiveConfiguration =
                getEffectiveConfiguration(
                        activeCommandLine,
                        commandLine,
                        programOptions,
                        Collections.emptyList());
    } else {
        programOptions = new ProgramOptions(commandLine);
        programOptions.validate();
        final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
        effectiveConfiguration =
                getEffectiveConfiguration(
                        activeCommandLine,
                        commandLine,
                        programOptions,
                        Collections.singletonList(uri.toString()));
    }

    final ApplicationConfiguration applicationConfiguration =
            new ApplicationConfiguration(
                    programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
    deployer.run(effectiveConfiguration, applicationConfiguration);
}

ApplicationClusterDeployer.run

public <ClusterID> void run(
        final Configuration configuration,
        final ApplicationConfiguration applicationConfiguration)
        throws Exception {
    checkNotNull(configuration);
    checkNotNull(applicationConfiguration);

    LOG.info("Submitting application in 'Application Mode'.");

    final ClusterClientFactory<ClusterID> clientFactory =
            clientServiceLoader.getClusterClientFactory(configuration);
    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clientFactory.createClusterDescriptor(configuration)) {
        final ClusterSpecification clusterSpecification =
                clientFactory.getClusterSpecification(configuration);

        clusterDescriptor.deployApplicationCluster(
            clusterSpecification, applicationConfiguration);
    }
}
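
Which ClusterClientFactory getClusterClientFactory returns is decided via the ServiceLoader mechanism from execution.target. A minimal sketch (assuming flink-yarn is on the classpath so the YARN factory can be discovered):

import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

// `flink run-application -t yarn-application` sets execution.target=yarn-application,
// which makes the service loader pick the YARN factory (and hence YarnClusterDescriptor).
public class FactorySelectionSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(DeploymentOptions.TARGET, "yarn-application");
        ClusterClientFactory<?> factory =
                new DefaultClusterClientServiceLoader().getClusterClientFactory(conf);
        System.out.println(factory.getClass().getName());
    }
}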

clusterDescriptor has two implementations, KubernetesClusterDescriptor and YarnClusterDescriptor; the YARN path is followed here.
YarnClusterDescriptor.deployApplicationCluster

public ClusterClientProvider<ApplicationId> deployApplicationCluster(
        final ClusterSpecification clusterSpecification,
        final ApplicationConfiguration applicationConfiguration)
        throws ClusterDeploymentException {
    checkNotNull(clusterSpecification);
    checkNotNull(applicationConfiguration);

    final YarnDeploymentTarget deploymentTarget =
            YarnDeploymentTarget.fromConfig(flinkConfiguration);
    if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
        throw new ClusterDeploymentException(
                "Couldn't deploy Yarn Application Cluster."
                        + " Expected deployment.target="
                        + YarnDeploymentTarget.APPLICATION.getName()
                        + " but actual one was \""
                        + deploymentTarget.getName()
                        + "\"");
    }

    applicationConfiguration.applyToConfiguration(flinkConfiguration);

    // No need to do pipelineJars validation if it is a PyFlink job.
    if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
            || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
        final List<String> pipelineJars =
                flinkConfiguration
                        .getOptional(PipelineOptions.JARS)
                        .orElse(Collections.emptyList());
        Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
    }

    try {
        // note: YarnApplicationClusterEntryPoint.class.getName() here is the main class
        // that the YARN ApplicationMaster (AM) will start
        return deployInternal(
            clusterSpecification,
            "Flink Application Cluster",
            YarnApplicationClusterEntryPoint.class.getName(),
            null,
            false);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
    }
}

YarnClusterDescriptor.deployInternal
creates a YarnClientApplication (whose response reports the cluster's maximum resource capability, used to validate the requested resources), then submits and starts the AM on the YARN cluster via startAppMaster:

private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached)
        throws Exception {

    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
        boolean useTicketCache =
                flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

        if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
            throw new RuntimeException(
                "Hadoop security with Kerberos is enabled but the login user "
                + "does not have Kerberos credentials or delegation tokens!");
        }

        final boolean fetchToken =
                flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
        final boolean yarnAccessFSEnabled =
                !CollectionUtil.isNullOrEmpty(
                        flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS));
        if (!fetchToken && yarnAccessFSEnabled) {
            throw new IllegalConfigurationException(
                String.format(
                    "When %s is disabled, %s must be disabled as well.",
                    SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
                    YarnConfigOptions.YARN_ACCESS.key()));
        }
    }

    isReadyForDeployment(clusterSpecification);

    // ------------------ Check if the specified queue exists --------------------

    checkYarnQueues(yarnClient);

    // ------------------ Check if the YARN ClusterClient has the requested resources --------------

    // create a YarnClientApplication; the response carries the MaximumResourceCapability,
    // which is validated against the requested resources below
    // Create application via yarnClient
    final YarnClientApplication yarnApplication = yarnClient.createApplication();
    final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

    Resource maxRes = appResponse.getMaximumResourceCapability();

    final ClusterResourceDescription freeClusterMem;
    try {
        freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    } catch (YarnException | IOException e) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException(
            "Could not retrieve information about free cluster resources.", e);
    }

    final int yarnMinAllocationMB =
            yarnConfiguration.getInt(
                    YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                    YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    if (yarnMinAllocationMB <= 0) {
        throw new YarnDeploymentException(
            "The minimum allocation memory "
            + "("
            + yarnMinAllocationMB
            + " MB) configured via '"
            + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
            + "' should be greater than 0.");
    }

    final ClusterSpecification validClusterSpecification;
    try {
        validClusterSpecification =
                validateClusterResources(
                        clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
    } catch (YarnDeploymentException yde) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw yde;
    }

    LOG.info("Cluster specification: {}", validClusterSpecification);

    final ClusterEntrypoint.ExecutionMode executionMode =
            detached
                    ? ClusterEntrypoint.ExecutionMode.DETACHED
                    : ClusterEntrypoint.ExecutionMode.NORMAL;

    flinkConfiguration.setString(
            ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());

    ApplicationReport report =
            startAppMaster(
                    flinkConfiguration,
                    applicationName,
                    yarnClusterEntrypoint,
                    jobGraph,
                    yarnClient,
                    yarnApplication,
                    validClusterSpecification);

    // print the application id for user to cancel themselves.
    if (detached) {
        final ApplicationId yarnApplicationId = report.getApplicationId();
        logDetachedClusterInformation(yarnApplicationId, LOG);
    }

    setClusterEntrypointInfoToConfig(report);

    return () -> {
        try {
            return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
        } catch (Exception e) {
            throw new RuntimeException("Error while creating RestClusterClient.", e);
        }
    };
}
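
The returned ClusterClientProvider is lazy: each getClusterClient() call builds a RestClusterClient against the deployed cluster. A hypothetical usage sketch:

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;

// Hypothetical consumer of the provider that deployInternal returns.
public class ProviderUsageSketch {
    static void printWebUi(ClusterClientProvider<ApplicationId> provider) throws Exception {
        try (ClusterClient<ApplicationId> client = provider.getClusterClient()) {
            System.out.println("Web UI: " + client.getWebInterfaceURL());
        }
    }
}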

startAppMaster is fairly long; its core job is to build a ContainerLaunchContext, wrap it in an ApplicationSubmissionContext, and submit that to YARN:

// obtain the ApplicationSubmissionContext
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
// build the ContainerLaunchContext (AM launch command, local resources, environment)
final ContainerLaunchContext amContainer =
        setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
// wrap amContainer into the appContext
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
// submit the application
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);

LOG.info("Waiting for the cluster to be allocated");


On the YARN side, the ResourceManager wires up the AM launching service during initialization:
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices.serviceInit

ResourceManager.this.masterService = ResourceManager.this.createApplicationMasterService();
// create the ApplicationMasterLauncher
this.applicationMasterLauncher = ResourceManager.this.createAMLauncher();

ApplicationMasterLauncher implements the EventHandler interface; its handle method launches or cleans up an application attempt:

public synchronized void handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = (AMLauncherEventType) appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    switch (event) {
        case LAUNCH:
            this.launch(application);
            break;
        case CLEANUP:
            this.cleanup(application);
    }
}

A LAUNCH event makes the launcher start the AM container, whose main class is the YarnApplicationClusterEntryPoint passed in earlier. That entry point boots the JobManager-side components and runs the user program inside the cluster, which is what makes this "application mode".
