Home Flink Yarn Session的启动
Post
Cancel

Flink Yarn Session的启动

1
bin/yarn-session.sh -n 3 -s 4 -jm 4096m -tm 4096m -nm flink-1.6.0 -d

脚本yarn-session.sh的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Launcher for a Flink YARN session: resolves the script directory, loads the
# shared Flink shell configuration, and starts FlinkYarnSessionCli in a JVM.
# Resolve the directory that contains this script to an absolute path.
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

# Fall back to the current user when no identity string is set; the value is
# only used below as part of the log file name.
if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

# Append a 512 MB maximum-heap option to whatever JVM_ARGS already contains.
JVM_ARGS="$JVM_ARGS -Xmx512m"

# Classpath for the client JVM: output of constructFlinkClassPath joined with
# INTERNAL_HADOOP_CLASSPATHS and passed through manglePathList. Neither helper
# is defined in this script -- presumably both come from the sourced config.sh.
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

# Log file name is built from the identity string and the host name.
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
# JVM system properties for the log file location and the log4j / logback
# configuration files found in FLINK_CONF_DIR.
# NOTE(review): this string is expanded unquoted on the java command line below,
# so it is word-split there; a path containing whitespace would be broken up.
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

# Make the configuration directory visible to the child Java process.
export FLINK_CONF_DIR

# Start the CLI main class; -j points at the flink-dist jar in FLINK_LIB_DIR and
# "$@" forwards every argument given to this script unchanged.
$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

执行的主类是 FlinkYarnSessionCli,其 run 方法如下(节选):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public int run(String[] args) throws CliArgsException, FlinkException {
  //
  //	Command Line Options
  //
  final CommandLine cmd = parseCommandLineOptions(args, true);

  if (cmd.hasOption(help.getOpt())) {
    printUsage();
    return 0;
  }
  //initialize and start a YarnClient
  final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);

  try {
    // Query cluster for metrics
    if (cmd.hasOption(query.getOpt())) {
      final String description = yarnClusterDescriptor.getClusterDescription();
      System.out.println(description);
      return 0;
    } else {
      final ClusterClient<ApplicationId> clusterClient;
      final ApplicationId yarnApplicationId;

      if (cmd.hasOption(applicationId.getOpt())) {
        yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));

        clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
      } else {
        final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
        //create an application, and get its application id. 
        clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);

        //------------------ ClusterClient deployed, handle connection details
        yarnApplicationId = clusterClient.getClusterId();

        try {
          final LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();

          System.out.println("Flink JobManager is now running on " + connectionInfo.getHostname() +
            ':' + connectionInfo.getPort() + " with leader id " + connectionInfo.getLeaderSessionID() + '.');
          System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());

          writeYarnPropertiesFile(
            yarnApplicationId,
            clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
            yarnClusterDescriptor.getDynamicPropertiesEncoded());
        } catch (Exception e) {
          try {
            clusterClient.shutdown();
          } catch (Exception ex) {
            LOG.info("Could not properly shutdown cluster client.", ex);
          }

          try {
            yarnClusterDescriptor.killCluster(yarnApplicationId);
          } catch (FlinkException fe) {
            LOG.info("Could not properly terminate the Flink cluster.", fe);
          }

          throw new FlinkException("Could not write the Yarn connection information.", e);
        }

具体步骤可参考 Hadoop 官方文档《Writing YARN Applications》。

1, 创建、初始化并启动 YarnClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//FlinkYarnSessionCli.java  initialize and start a YarnClient
final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);
/**
 * Creates the YARN cluster descriptor for the given command line.
 *
 * <p>Derives a {@code Configuration} from the command-line options, then delegates to
 * {@code createDescriptor}, passing along {@code yarnConfiguration} and
 * {@code configurationDirectory} (both defined outside this excerpt).
 *
 * @param commandLine the parsed command-line options
 * @return the descriptor produced by {@code createDescriptor}
 * @throws FlinkException declared by the signature; this excerpt does not show which call raises it
 */
public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException {
  // Configuration that results from applying the command-line options.
  final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);

  return createDescriptor(
    effectiveConfiguration,
    yarnConfiguration,
    configurationDirectory,
    commandLine);
}
//FlinkYarnSessionCli.java 
private AbstractYarnClusterDescriptor createDescriptor(
			Configuration configuration,
			YarnConfiguration yarnConfiguration,
			String configurationDirectory,
			CommandLine cmd) {

		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(
			configuration,
			yarnConfiguration,
			configurationDirectory);
//FlinkYarnSessionCli.java 
private AbstractYarnClusterDescriptor getClusterDescriptor(
    Configuration configuration,
    YarnConfiguration yarnConfiguration,
    String configurationDirectory) {
  final YarnClient yarnClient = YarnClient.createYarnClient();
  yarnClient.init(yarnConfiguration);
  yarnClient.start();

2, 有了这个 YarnClient,就可以创建 application 并获取它的 application id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
/**
 * Deploys a Flink session cluster on YARN by delegating to {@code deployInternal}.
 *
 * @param clusterSpecification passed through unchanged to {@code deployInternal}
 * @return the cluster client returned by {@code deployInternal}
 * @throws ClusterDeploymentException wrapping any exception thrown during the deployment
 */
public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
  try {// blocks until the YarnApplicationMasterRunner has started
    // The meaning of the trailing null / false arguments is not visible in this excerpt.
    return deployInternal(
      clusterSpecification,
      "Flink session cluster",
      getYarnSessionClusterEntrypoint(),
      null,
      false);
  } catch (Exception e) {
    // Any failure is rethrown as a ClusterDeploymentException with the original cause attached.
    throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
  }
}
//org.apache.flink.yarn.AbstractYarnClusterDescriptor.java
// Excerpt of deployInternal: only the application-creation step is shown. The "..."
// lines stand for code that the article omits; the parameter list is omitted as well.
protected ClusterClient<ApplicationId> deployInternal(){
   ...
    // Create application via yarnClient
  final YarnClientApplication yarnApplication = yarnClient.createApplication();
  // Response object for the newly created application; the commented-out call below
  // shows where the application id can be read from it.
  final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
  //appResponse.getApplicationId();
  ...
}

3, 两个主要的 context:ApplicationSubmissionContext 与 ContainerLaunchContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//org.apache.flink.yarn.AbstractYarnClusterDescriptor.java: partial code
// Excerpt of startAppMaster: builds the ContainerLaunchContext for the ApplicationMaster
// and fills in the ApplicationSubmissionContext. The parameter list, the definitions of
// the locals referenced here, and the return statement are omitted by the article.
public ApplicationReport startAppMaster(){
		// Submission context obtained from the YarnClientApplication.
		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
    
    // Launch context for the ApplicationMaster container, built from the entrypoint,
    // the logging / Kerberos flags and the master memory size of the cluster specification.
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
			yarnClusterEntrypoint,
			hasLogback,
			hasLog4j,
			hasKrb5,
			clusterSpecification.getMasterMemoryMB());
      
    // Attach the local resources and the environment to the AM container.
    amContainer.setLocalResources(localResources);
    amContainer.setEnvironment(appMasterEnv);
    
    // Describe the application itself: name, type, AM container spec and resource capability.
    appContext.setApplicationName(customApplicationName);
		appContext.setApplicationType("Apache Flink");
		appContext.setAMContainerSpec(amContainer);
		appContext.setResource(capability);

}

4, 有了 YarnClient 与 appContext,就可以提交 application

1
2
yarnClient.submitApplication(appContext);

在此之后,YARN 会启动一个 AM container 来运行 ApplicationMaster;在 Flink 里,AM 启动的是 YarnApplicationMasterRunner

比如在 YARN 里,可以通过 default_container_executor.sh 等方式启动一个 container

5, Get application report

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Excerpt from inside a polling loop (the loop header and its label are not shown).
// Fetch the current application report; an IOException aborts the deployment.
try {
				report = yarnClient.getApplicationReport(appId);
			} catch (IOException e) {
				throw new YarnDeploymentException("Failed to deploy the cluster.", e);
			}
			YarnApplicationState appState = report.getYarnApplicationState();
			LOG.debug("Application State: {}", appState);
			// Decide how to proceed based on the application's current YARN state.
			switch(appState) {
				// These three states all fall through to the same failure: the exception
				// carries YARN's diagnostics and the command for fetching the logs.
				case FAILED:
				case FINISHED:
				case KILLED:
					throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
						+ appState + " during deployment. \n" +
						"Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
						"If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
						"yarn logs -applicationId " + appId);
					//break ..
				case RUNNING:
					LOG.info("YARN application has been deployed successfully.");
					// Leaves the enclosing statement labelled "loop" (label defined outside this excerpt).
					break loop;
				default:
					// Any other state: log only when the state differs from lastAppState
					// (maintained outside this excerpt).
					if (appState != lastAppState) {
						LOG.info("Deploying cluster, current state " + appState);
					}
					// More than 60 000 ms since startTime: log a hint about resource availability.
					if (System.currentTimeMillis() - startTime > 60000) {
						LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
					}

			}
This post is licensed under CC BY 4.0 by the author.