Home Azkaban
Post
Cancel

Azkaban

azkaban-web ProjectManagerServlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
handleMultiformPost(req, resp, params, session);
//
protected void handleMultiformPost(final HttpServletRequest req,
                                   final HttpServletResponse resp, final Map<String, Object> params, final Session session)
throws ServletException, IOException {
    // Looks like a duplicate, but this is a move away from the regular
    // multiform post + redirect
    // to a more ajax like command.
    if (params.containsKey("ajax")) {
        final String action = (String) params.get("ajax");
        final HashMap<String, String> ret = new HashMap<>();
        if (API_UPLOAD.equals(action)) {
            ajaxHandleUpload(req, resp, ret, params, session);
        }
        this.writeJSON(resp, ret);
    } else if (params.containsKey("action")) {
        final String action = (String) params.get("action");
        if (API_UPLOAD.equals(action)) {
            handleUpload(req, resp, params, session);//
        }
    }
}

handleUpload内部比较简单,主要调用ajaxHandleUpload
ajaxHandleUpload主要式读取上传流中的数据在本地生成临时文件上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  private void ajaxHandleUpload(final HttpServletRequest req, final HttpServletResponse resp,
      final Map<String, String> ret, final Map<String, Object> multipart, final Session session)
throws ServletException, IOException {
//生成本地临时文件
    final File archiveFile = new File(tempDir, name);
    out = new BufferedOutputStream(new FileOutputStream(archiveFile));
    IOUtils.copy(item.getInputStream(), out);
    out.close();
    //projectManager内部主要:
    //解压zip文件,验证依赖任务等
    //通过AzkabanProjectLoader生成project flow等信心
    //最后持久化:上传文件,持久化project,flow
    final Map<String, ValidationReport> reports = this.projectManager
    .uploadProject(project, archiveFile, lowercaseExtension, user, props, uploaderIPAddr);

}

AzkabanProjectLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
public Map<String, ValidationReport> uploadProject(final Project project,
                                                   final File archive, final String fileType, final User uploader, final Props additionalProps,
                                                   final String uploaderIPAddr)
throws ProjectManagerException, ExecutorManagerException {
    //两种实现方式DirectoryFlowLoader或者DirectoryYamlFlowLoader
    loader = this.flowLoaderFactory.createFlowLoader(folder);
    //解析以及依赖等创建flow等
    reports.put(DIRECTORY_FLOW_REPORT_KEY, loader.loadProjectFlow(project, folder));
    // Upload the project to DB and storage.
    final File startupDependenciesOrNull = isThinProject ? startupDependencies : null;
    persistProject(project, loader, archive, folder, startupDependenciesOrNull, uploader,
                   uploaderIPAddr);
}

loadProjectFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//DirectoryFlowLoader.java
public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
    this.propsList = new ArrayList<>();
    this.flowPropsList = new ArrayList<>();
    this.jobPropsMap = new HashMap<>();
    this.nodeMap = new HashMap<>();
    this.duplicateJobs = new HashSet<>();
    this.nodeDependencies = new HashMap<>();
    this.rootNodes = new HashSet<>();
    this.flowDependencies = new HashMap<>();

    // Load all the props files and create the Node objects
    loadProjectFromDir(projectDir.getPath(), projectDir, null);

    // Create edges and find missing dependencies
    resolveDependencies();

    // Create the flows.
    buildFlowsFromDependencies();

    // Resolve embedded flows
    resolveEmbeddedFlows();

    FlowLoaderUtils.checkJobProperties(project.getId(), this.props, this.jobPropsMap, this.errors);

    return FlowLoaderUtils.generateFlowLoaderReport(this.errors);

}

persistProject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void persistProject(final Project project, final FlowLoader loader, final File archive,
                            final File projectDir, final File startupDependencies, final User uploader,
                            final String uploaderIPAddr) throws ProjectManagerException {
    synchronized (project) {
        final int newProjectVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
        final Map<String, Flow> flows = loader.getFlowMap();
        for (final Flow flow : flows.values()) {
            flow.setProjectId(project.getId());
            flow.setVersion(newProjectVersion);
        }
        //上传文件到hdfs/db
        this.projectStorageManager.uploadProject(project, newProjectVersion, archive,
                                                 startupDependencies, uploader, uploaderIPAddr);
        //插入flow到表:project_flows
        log.info("Uploading flow to db for project " + archive.getName());
        this.projectLoader.uploadFlows(project, newProjectVersion, flows.values());
        project.setFlows(flows);

        // Set the project version before upload of project files happens so that the files use
        // new version.
        project.setVersion(newProjectVersion);

        if (loader instanceof DirectoryFlowLoader) {
            final DirectoryFlowLoader directoryFlowLoader = (DirectoryFlowLoader) loader;
            log.info("Uploading Job properties");//insert/更新project_properties表
            this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
                directoryFlowLoader.getJobPropsMap().values()));
            log.info("Uploading Props properties");
            this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());

        } else if (loader instanceof DirectoryYamlFlowLoader) {
            //yaml格递归调用上传文件,插入flow等
            uploadFlowFilesRecursively(projectDir, project, newProjectVersion);
        } else {
            throw new ProjectManagerException("Invalid type of flow loader.");
        }

        // CAUTION : Always change the project version as the last item to make
        // sure all the project related files are uploaded.
        log.info("Changing project versions for project " + archive.getName());
        this.projectLoader.changeProjectVersion(project, newProjectVersion,
                                                uploader.getUserId());
        this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
                                     "Uploaded project files zip " + archive.getName());
    }
}

调度
getTriggerManager().start();

加载所有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  public void start() throws TriggerManagerException {

    try {
      // expect loader to return valid triggers
      final List<Trigger> triggers = this.triggerLoader.loadTriggers();
      for (final Trigger t : triggers) {
        this.runnerThread.addTrigger(t);
        triggerIdMap.put(t.getTriggerId(), t);
      }
    } catch (final Exception e) {
      logger.error(e);
      throw new TriggerManagerException(e);
    }

    this.runnerThread.start();
  }

runnerThread是TriggerScannerThread

1
2
3
4
5
public void run() {
    try {
        checkAllTriggers();
    } catch (final Exception e) {}
}

checkAllTriggers
如果是READY状态则执行onTriggerTrigger或onTriggerPause

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void checkAllTriggers() throws TriggerManagerException {
    // sweep through the rest of them
    for (final Trigger t : this.triggers) {
        try {
            TriggerManager.this.scannerStage = "Checking for trigger " + t.getTriggerId();

            if (t.getStatus().equals(TriggerStatus.READY)) {

                /**
             * Prior to this change, expiration condition should never be called though
             * we have some related code here. ExpireCondition used the same BasicTimeChecker
             * as triggerCondition do. As a consequence, we need to figure out a way to distinguish
             * the previous ExpireCondition and this commit's ExpireCondition.
             */
                if (t.getExpireCondition().getExpression().contains("EndTimeChecker") && t
                    .expireConditionMet()) {
                    onTriggerPause(t);
                } else if (t.triggerConditionMet()) {
                    onTriggerTrigger(t);//出发
                }
            }
            if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
                removeTrigger(t);
            } else {
                t.updateNextCheckTime();//更新下一次调度的时间
            }
        } catch (final Throwable th) {
            //skip this trigger, moving on to the next one
            logger.error("Failed to process trigger with id : " + t, th);
        }
    }
}

onTriggerTrigger这里如果是执行任务 action是ExecuteFlowAction,顺便更新triggers表信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void onTriggerTrigger(final Trigger t) throws TriggerManagerException {
    final List<TriggerAction> actions = t.getTriggerActions();
    for (final TriggerAction action : actions) {
        try {
            logger.info("Doing trigger actions " + action.getDescription() + " for " + t);
            action.doAction();//这里是ExecuteFlowAction
        } catch (final ExecutorManagerException e) {
            if (e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
                logger.info("Skipped action [" + action.getDescription() + "] for [" + t +
                            "] because: " + e.getMessage());
            } else {
                logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]",
                             e);
            }
        } catch (final Throwable th) {
            logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]", th);
        }
    }

    if (t.isResetOnTrigger()) {
        t.resetTriggerConditions();
    } else {
        logger.info("NextCheckTime did not change. Setting status to expired for trigger"
                    + t.getTriggerId());
        t.setStatus(TriggerStatus.EXPIRED);
    }
    try {
        //更新triggers信息
        TriggerManager.this.triggerLoader.updateTrigger(t);
    } catch (final TriggerLoaderException e) {
        throw new TriggerManagerException(e);
    }
}
1
2
3
4
5
6
7
8
9
10
11
public void doAction() throws Exception {
    //缓存查找project,flow 用来构造ExecutableFlow
    final Project project = FlowUtils.getProject(projectManager, this.projectId);
    final Flow flow = FlowUtils.getFlow(project, this.flowName);
    //生成ExecutableFlow
    final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);
    //提交给executorManager
    logger.info("Invoking flow " + project.getName() + "." + this.flowName);
    executorManagerAdapter.submitExecutableFlow(exflow, this.submitUser);
    logger.info("Invoked flow " + project.getName() + "." + this.flowName);ExecuteFlowAction.doAction()
}

executorManager直接提交给队列queuedFlows

1
2
3
4
5
6
public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
throws ExecutorManagerException {
    //flow队列queuedFlows
    this.queuedFlows.enqueue(exflow, reference);
}

消费者QueueProcessorThread,其run主要调用processQueuedFlows来处理flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void processQueuedFlows(final long activeExecutorsRefreshWindow,
                                final int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
    final ExecutionReference reference = ExecutorManager.this.runningCandidate.getFirst();
    final ExecutableFlow exflow = ExecutorManager.this.runningCandidate.getSecond();
    exflow.setUpdateTime(currentTime);
    // process flow with current snapshot of activeExecutors
    //选择最佳的一个Executor提交任务,包含host,port等信息
    selectExecutorAndDispatchFlow(reference, exflow);
}
//
private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
                                           final ExecutableFlow exflow)
throws ExecutorManagerException {
    //从可用的Executor选择一个提交
    final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
    //dispatch下发任务到selectedExecutor
    dispatch(reference, exflow, selectedExecutor);
}
//
private void dispatch(final ExecutionReference reference, final ExecutableFlow exflow,
                      final Executor choosenExecutor) throws ExecutorManagerException {
    //ExecutorApiGateway内部负责远程调用,这里提交EXECUTE_ACTION请求
    this.apiGateway.callWithExecutable(exflow, choosenExecutor,
                                       ConnectorParams.EXECUTE_ACTION);
}

通过http方式提交给Executor server

1
2
3
4
5
6
7
8
9
10
11
12
private String callForJsonString(final String host, final int port, final String path,
                                 final DispatchMethod dispatchMethod, final Optional<Integer> httpTimeout,
                                 List<Pair<String, String>> paramList) throws IOException {
    if (paramList == null) {
        paramList = new ArrayList<>();
    }

    @SuppressWarnings("unchecked") final URI uri =
    apiClient.buildExecutorUri(host, port, path, true, dispatchMethod);

    return this.apiClient.doPost(uri, dispatchMethod, httpTimeout, paramList);
}

exec-server
直接看servlet如何处理EXECUTE_ACTION

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
if (action.equals(ConnectorParams.EXECUTE_ACTION)) {
    handleAjaxExecute(req, respMap, execid);
}
//FlowRunnerManager
private void handleAjaxExecute(final HttpServletRequest req,
                               final Map<String, Object> respMap, final int execId) {
    try {
        this.flowRunnerManager.submitFlow(execId);
    } catch (final ExecutorManagerException e) {
        logger.error(e.getMessage(), e);
        respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
    }
}
//FlowRunnerManager
public void submitFlow(final int execId) throws ExecutorManagerException {
    if (isAlreadyRunning(execId)) {
        return;
    }
    //创建FlowRunner
    final FlowRunner runner = createFlowRunner(execId);
    // Check again.
    if (isAlreadyRunning(execId)) {
        return;
    }
    submitFlowRunner(runner);
}
//把FlowRunner提交给线程池TrackingThreadPool处理
private void submitFlowRunner(final FlowRunner runner) throws ExecutorManagerException {
    this.runningFlows.put(runner.getExecutionId(), runner);
    try {
        // The executorService already has a queue.
        // The submit method below actually returns an instance of FutureTask,
        // which implements interface RunnableFuture, which extends both
        // Runnable and Future interfaces
        final Future<?> future = this.executorService.submit(runner);//提交给线程池
        // keep track of this future
        this.submittedFlows.put(future, runner.getExecutionId());
        // update the last submitted time.
        this.lastFlowSubmittedDate = System.currentTimeMillis();
    } catch (final RejectedExecutionException re) {
        this.runningFlows.remove(runner.getExecutionId());
        final StringBuffer errorMsg = new StringBuffer(
            "Azkaban executor can't execute any more flows. ");
        if (this.executorService.isShutdown()) {
            errorMsg.append("The executor is being shut down.");
        }
        throw new ExecutorManagerException(errorMsg.toString(), re);
    }
}

由此看来FlowRunner必然是一个,看起run处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void run() {
    //先做一些初始化工作
    setupFlowExecution();
    this.flow.setStartTime(System.currentTimeMillis());
    this.logger.info("Updating initial flow directory.");
    updateFlow();
    this.logger.info("Fetching job and shared properties.");
    if (!FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
        loadAllProperties();
    }

    this.fireEventListeners(
        Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
    //执行任务的地方
    runFlow();
    //如果一场,修改状态为FAILED
    this.flow.setStatus(Status.FAILED);
    //更新下时间等
    this.flow.setEndTime(System.currentTimeMillis());
    this.logger.info("Setting end time for flow " + this.execId + " to "
                     + System.currentTimeMillis());
    closeLogger();
    updateFlow();


}

看下READY状态的怎么处理,主要看是不是ExecutableFlowBase节点,如果是则找出startNode 依次执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (nextNodeStatus == Status.READY) {
    if (node instanceof ExecutableFlowBase) {
        final ExecutableFlowBase flow = ((ExecutableFlowBase) node);
        this.logger.info("Running flow '" + flow.getNestedId() + "'.");
        flow.setStatus(Status.RUNNING);
        // don't overwrite start time of root flows
        if (flow.getStartTime() <= 0) {
            flow.setStartTime(System.currentTimeMillis());
        }
        prepareJobProperties(flow);
        //依次执行所有node
        for (final String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
            final ExecutableNode startNode = flow.getExecutableNode(startNodeId);
            runReadyJob(startNode);
        }
    } else {
        //执行Node
        runExecutableNode(node);
    }
}

runExecutableNode才是执行节点任务的

1
2
3
4
5
6
7
8
9
10
11
12
13
private void runExecutableNode(final ExecutableNode node) throws IOException {
        //azkaban.execapp.JobRunner
       final JobRunner runner = createJobRunner(node);
    this.logger.info("Submitting job '" + node.getNestedId() + "' to run.");
    try {
      // Job starts to queue
      runner.setTimeInQueue(System.currentTimeMillis());
      this.executorService.submit(runner);//提交JobRunner给线程池
      this.activeJobRunners.add(runner);
    } catch (final RejectedExecutionException e) {
      this.logger.error(e);
    } 
}

跟上边FlowRunner类似直接看run方法

1
2
3
4
5
6
7
8
9
10
public void run() {
    try {
        doRun();
    } catch (final Exception e) {
        serverLogger.error("Unexpected exception", e);
        throw e;
    } finally {
        Thread.currentThread().setContextClassLoader(this.threadClassLoader);
    }
}

doRun的核心是runJob()

1
2
3
4
5
      //生成job
if (prepareJob()) {
    //运行具体的job
    runJob();
}

runJob里执行具体的任务,这里的job具体实现主要有HadoopHiveJob,HadoopSparkJob等

1
2
3
4
5
6
7
8
private Status runJob() {
    try {
      this.job.run();//这里的job具体实现主要有HadoopHiveJob,HadoopSparkJob等
      finalStatus = this.node.getStatus();
    } catch (final Throwable e) {
    //
    }
}
This post is licensed under CC BY 4.0 by the author.