azkaban-web ProjectManagerServlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Call site (excerpt): passes the request, response, parsed params and session on to
// handleMultiformPost.
handleMultiformPost(req, resp, params, session);
//
/**
 * Dispatches a multipart form POST to the matching upload handler.
 *
 * <p>When {@code params} has an "ajax" key, an upload command is forwarded to
 * {@code ajaxHandleUpload} and the result map is always written back as JSON. Otherwise, when
 * {@code params} has an "action" key, an upload command is forwarded to {@code handleUpload}.
 * A request carrying neither key is left untouched.
 *
 * @param req the incoming servlet request
 * @param resp the servlet response
 * @param params the parsed request parameters
 * @param session the session handed through to the handlers
 * @throws ServletException propagated from the handlers
 * @throws IOException propagated from the handlers or from writing the response
 */
protected void handleMultiformPost(final HttpServletRequest req,
    final HttpServletResponse resp, final Map<String, Object> params, final Session session)
    throws ServletException, IOException {
  // The ajax branch looks like a duplicate of the classic one on purpose: it is the newer,
  // ajax-like command style that is replacing the regular multiform post + redirect.
  if (params.containsKey("ajax")) {
    final String ajaxCommand = (String) params.get("ajax");
    final HashMap<String, String> result = new HashMap<>();
    if (API_UPLOAD.equals(ajaxCommand)) {
      ajaxHandleUpload(req, resp, result, params, session);
    }
    this.writeJSON(resp, result);
    return;
  }

  if (!params.containsKey("action")) {
    return;
  }

  final String formCommand = (String) params.get("action");
  if (API_UPLOAD.equals(formCommand)) {
    handleUpload(req, resp, params, session);
  }
}
handleUpload内部比较简单,主要调用ajaxHandleUpload
ajaxHandleUpload主要是读取上传流中的数据,在本地生成临时文件后再上传
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Excerpt of the ajax upload handler: copies the uploaded item into a local temporary archive
 * file and hands that file to the project manager.
 *
 * NOTE(review): this is an abridged excerpt. tempDir, name, out, item, project,
 * lowercaseExtension, user, props and uploaderIPAddr are declared in code that is not shown.
 */
private void ajaxHandleUpload(final HttpServletRequest req, final HttpServletResponse resp,
final Map<String, String> ret, final Map<String, Object> multipart, final Session session)
throws ServletException, IOException {
// Create the local temporary archive file.
final File archiveFile = new File(tempDir, name);
out = new BufferedOutputStream(new FileOutputStream(archiveFile));
IOUtils.copy(item.getInputStream(), out);
// NOTE(review): as excerpted, out is not closed when the copy throws; confirm that the full
// method closes it in a finally block.
out.close();
// Per the author's notes, projectManager.uploadProject mainly:
//  - unzips the archive and validates dependent jobs, etc.
//  - builds the project flow information through AzkabanProjectLoader
//  - finally persists: uploads the file and stores the project and flows
final Map<String, ValidationReport> reports = this.projectManager
.uploadProject(project, archiveFile, lowercaseExtension, user, props, uploaderIPAddr);
}
AzkabanProjectLoader
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Excerpt of the project upload routine: creates a flow loader, loads the project flows into
 * the validation reports, then persists the project.
 *
 * NOTE(review): this is an abridged excerpt. loader, folder, reports, isThinProject and
 * startupDependencies are declared in code that is not shown, and the return statement is
 * omitted.
 */
public Map<String, ValidationReport> uploadProject(final Project project,
final File archive, final String fileType, final User uploader, final Props additionalProps,
final String uploaderIPAddr)
throws ProjectManagerException, ExecutorManagerException {
// Two implementations exist (per the author's note): DirectoryFlowLoader or
// DirectoryYamlFlowLoader.
loader = this.flowLoaderFactory.createFlowLoader(folder);
// Parse the project, resolve dependencies and build the flows.
reports.put(DIRECTORY_FLOW_REPORT_KEY, loader.loadProjectFlow(project, folder));
// Upload the project to DB and storage.
// The startup dependencies file is only passed along for thin projects.
final File startupDependenciesOrNull = isThinProject ? startupDependencies : null;
persistProject(project, loader, archive, folder, startupDependenciesOrNull, uploader,
uploaderIPAddr);
}
loadProjectFlow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// DirectoryFlowLoader.java
/**
 * Loads the flows of a project from its directory and reports the errors collected on the way.
 *
 * <p>All loader collections are replaced with fresh empty instances first, so each call starts
 * from a clean state.
 *
 * @param project the project being loaded; its id is used for the job property check
 * @param projectDir the root directory of the project files
 * @return the report generated from the errors collected by this loader
 */
public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
  // Start from empty state: dependency graph bookkeeping first, then the props holders.
  this.nodeMap = new HashMap<>();
  this.nodeDependencies = new HashMap<>();
  this.flowDependencies = new HashMap<>();
  this.rootNodes = new HashSet<>();
  this.duplicateJobs = new HashSet<>();
  this.jobPropsMap = new HashMap<>();
  this.propsList = new ArrayList<>();
  this.flowPropsList = new ArrayList<>();

  // Step 1: read every props file and create the Node objects.
  final String projectRootPath = projectDir.getPath();
  loadProjectFromDir(projectRootPath, projectDir, null);

  // Step 2: create the edges and detect missing dependencies.
  resolveDependencies();

  // Step 3: build the flows out of the dependency graph.
  buildFlowsFromDependencies();

  // Step 4: resolve flows embedded in other flows.
  resolveEmbeddedFlows();

  FlowLoaderUtils.checkJobProperties(project.getId(), this.props, this.jobPropsMap, this.errors);

  final ValidationReport loaderReport = FlowLoaderUtils.generateFlowLoaderReport(this.errors);
  return loaderReport;
}
persistProject
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Persists a newly uploaded project version: stamps the flows with the new version, uploads
 * the archive, stores the flows and the loader-specific files, and only then switches the
 * project to the new version and posts an UPLOADED event.
 *
 * The whole sequence runs while holding the monitor of {@code project}.
 *
 * @param project the project being uploaded; also used as the lock object
 * @param loader the flow loader that produced the flows; must be a DirectoryFlowLoader or a
 *     DirectoryYamlFlowLoader
 * @param archive the uploaded archive file
 * @param projectDir the project directory, used for the YAML loader path
 * @param startupDependencies passed through to the storage manager upload
 * @param uploader the user performing the upload
 * @param uploaderIPAddr passed through to the storage manager upload
 * @throws ProjectManagerException if the loader is of neither supported type
 */
private void persistProject(final Project project, final FlowLoader loader, final File archive,
final File projectDir, final File startupDependencies, final User uploader,
final String uploaderIPAddr) throws ProjectManagerException {
synchronized (project) {
// The new version is one past the latest version reported by the project loader.
final int newProjectVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
final Map<String, Flow> flows = loader.getFlowMap();
for (final Flow flow : flows.values()) {
flow.setProjectId(project.getId());
flow.setVersion(newProjectVersion);
}
// Upload the archive file to storage (HDFS/DB, per the author's note).
this.projectStorageManager.uploadProject(project, newProjectVersion, archive,
startupDependencies, uploader, uploaderIPAddr);
// Insert the flows into the project_flows table (per the author's note).
log.info("Uploading flow to db for project " + archive.getName());
this.projectLoader.uploadFlows(project, newProjectVersion, flows.values());
project.setFlows(flows);
// Set the project version before upload of project files happens so that the files use
// new version.
project.setVersion(newProjectVersion);
if (loader instanceof DirectoryFlowLoader) {
final DirectoryFlowLoader directoryFlowLoader = (DirectoryFlowLoader) loader;
log.info("Uploading Job properties");// inserts/updates the project_properties table
this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
directoryFlowLoader.getJobPropsMap().values()));
log.info("Uploading Props properties");
this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
} else if (loader instanceof DirectoryYamlFlowLoader) {
// YAML format: recursively upload the flow files and insert the flows (per the author's note).
uploadFlowFilesRecursively(projectDir, project, newProjectVersion);
} else {
throw new ProjectManagerException("Invalid type of flow loader.");
}
// CAUTION : Always change the project version as the last item to make
// sure all the project related files are uploaded.
log.info("Changing project versions for project " + archive.getName());
this.projectLoader.changeProjectVersion(project, newProjectVersion,
uploader.getUserId());
this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
"Uploaded project files zip " + archive.getName());
}
}
调度
// Scheduling entry point (excerpt): starts the trigger manager.
getTriggerManager().start();
加载所有
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Loads the stored triggers, registers each one with the scanner thread and the id map, then
 * starts the scanner thread.
 *
 * @throws TriggerManagerException if loading or registering the triggers fails; the scanner
 *     thread is not started in that case
 */
public void start() throws TriggerManagerException {
  try {
    // expect loader to return valid triggers
    final List<Trigger> triggers = this.triggerLoader.loadTriggers();
    for (final Trigger t : triggers) {
      this.runnerThread.addTrigger(t);
      triggerIdMap.put(t.getTriggerId(), t);
    }
  } catch (final Exception e) {
    // Pass the throwable as a separate argument so the stack trace is logged; logging the
    // exception object alone only records its toString() with Log4j 1.x.
    logger.error("Failed to load and register triggers", e);
    throw new TriggerManagerException(e);
  }
  this.runnerThread.start();
}
runnerThread是TriggerScannerThread
1
2
3
4
5
/**
 * Scanner thread body (excerpt): runs one sweep over all triggers.
 *
 * <p>A failure of the sweep is logged instead of being silently discarded, so a broken scan
 * is visible in the logs; the exception is still not propagated out of the thread.
 */
public void run() {
  try {
    checkAllTriggers();
  } catch (final Exception e) {
    logger.error("Unexpected failure while checking triggers", e);
  }
}
checkAllTriggers
如果是READY状态则执行onTriggerTrigger或onTriggerPause
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Sweeps over every known trigger once.
 *
 * <p>A READY trigger is paused when its expire condition is an "EndTimeChecker" one that is
 * met; otherwise it is fired when its trigger condition is met. Afterwards an EXPIRED trigger
 * whose source is "azkaban" is removed, and any other trigger gets its next check time
 * updated. A failure on one trigger is logged and does not stop the sweep.
 *
 * @throws TriggerManagerException declared for callers; failures of individual triggers are
 *     caught and logged inside the loop
 */
private void checkAllTriggers() throws TriggerManagerException {
  for (final Trigger trigger : this.triggers) {
    try {
      TriggerManager.this.scannerStage = "Checking for trigger " + trigger.getTriggerId();

      final boolean ready = trigger.getStatus().equals(TriggerStatus.READY);
      if (ready) {
        // Prior to this change, the expiration condition should never be called even though
        // related code existed. ExpireCondition used the same BasicTimeChecker as the
        // trigger condition does, so the expression is inspected to tell the previous
        // ExpireCondition apart from the newer, EndTimeChecker-based one.
        final boolean usesEndTimeChecker =
            trigger.getExpireCondition().getExpression().contains("EndTimeChecker");
        if (usesEndTimeChecker && trigger.expireConditionMet()) {
          onTriggerPause(trigger);
        } else if (trigger.triggerConditionMet()) {
          // Fire the trigger.
          onTriggerTrigger(trigger);
        }
      }

      // The status is read again because firing the trigger can mark it EXPIRED.
      final boolean expiredAzkabanTrigger =
          trigger.getStatus().equals(TriggerStatus.EXPIRED)
              && trigger.getSource().equals("azkaban");
      if (!expiredAzkabanTrigger) {
        // Schedule the next time this trigger needs to be checked.
        trigger.updateNextCheckTime();
      } else {
        removeTrigger(trigger);
      }
    } catch (final Throwable th) {
      // Skip this trigger and move on to the next one.
      logger.error("Failed to process trigger with id : " + trigger, th);
    }
  }
}
onTriggerTrigger这里如果是执行任务 action是ExecuteFlowAction,顺便更新triggers表信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Fires a trigger: runs every action attached to it, then either resets its conditions or
 * marks it EXPIRED, and finally stores the updated trigger through the trigger loader.
 *
 * <p>A failing action is logged and does not prevent the remaining actions from running.
 * A skipped execution is logged at info level, any other failure at error level.
 *
 * @param t the trigger whose condition was met
 * @throws TriggerManagerException if the updated trigger cannot be stored
 */
private void onTriggerTrigger(final Trigger t) throws TriggerManagerException {
  final List<TriggerAction> actions = t.getTriggerActions();
  for (final TriggerAction action : actions) {
    try {
      logger.info("Doing trigger actions " + action.getDescription() + " for " + t);
      // For a scheduled flow this is an ExecuteFlowAction (per the author's note).
      action.doAction();
    } catch (final ExecutorManagerException e) {
      if (e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
        logger.info("Skipped action [" + action.getDescription() + "] for [" + t +
            "] because: " + e.getMessage());
      } else {
        logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]",
            e);
      }
    } catch (final Throwable th) {
      logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]", th);
    }
  }
  if (t.isResetOnTrigger()) {
    t.resetTriggerConditions();
  } else {
    // The trailing space keeps the trigger id from running into the word "trigger".
    logger.info("NextCheckTime did not change. Setting status to expired for trigger "
        + t.getTriggerId());
    t.setStatus(TriggerStatus.EXPIRED);
  }
  try {
    // Store the updated trigger information.
    TriggerManager.this.triggerLoader.updateTrigger(t);
  } catch (final TriggerLoaderException e) {
    throw new TriggerManagerException(e);
  }
}
1
2
3
4
5
6
7
8
9
10
11
public void doAction() throws Exception {
//缓存查找project,flow 用来构造ExecutableFlow
final Project project = FlowUtils.getProject(projectManager, this.projectId);
final Flow flow = FlowUtils.getFlow(project, this.flowName);
//生成ExecutableFlow
final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);
//提交给executorManager
logger.info("Invoking flow " + project.getName() + "." + this.flowName);
executorManagerAdapter.submitExecutableFlow(exflow, this.submitUser);
logger.info("Invoked flow " + project.getName() + "." + this.flowName);ExecuteFlowAction.doAction()
}
executorManager直接提交给队列queuedFlows
1
2
3
4
5
6
/**
 * Excerpt of flow submission: the executable flow is placed on the queuedFlows queue together
 * with its reference.
 *
 * NOTE(review): this is an abridged excerpt. reference is created in code that is not shown
 * and the String return statement is omitted.
 */
public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
throws ExecutorManagerException {
// Put the flow on the queuedFlows queue.
this.queuedFlows.enqueue(exflow, reference);
}
消费者QueueProcessorThread,其run主要调用processQueuedFlows来处理flow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Excerpt of the queue consumer: takes the reference and flow of the current running
 * candidate, stamps the flow's update time and dispatches it to an executor.
 *
 * NOTE(review): this is an abridged excerpt. currentTime and the assignment of
 * runningCandidate come from code that is not shown, and neither parameter is used in the
 * lines visible here.
 */
private void processQueuedFlows(final long activeExecutorsRefreshWindow,
final int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
final ExecutionReference reference = ExecutorManager.this.runningCandidate.getFirst();
final ExecutableFlow exflow = ExecutorManager.this.runningCandidate.getSecond();
exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
// Pick the best executor (which carries host, port, etc.) and submit the flow to it.
selectExecutorAndDispatchFlow(reference, exflow);
}
//
/**
 * Excerpt: selects an executor for the flow and dispatches the flow to it.
 *
 * NOTE(review): this is an abridged excerpt. remainingExecutors is declared in code that is
 * not shown.
 */
private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
final ExecutableFlow exflow)
throws ExecutorManagerException {
// Choose one of the available executors.
final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
// Dispatch the flow to the selected executor.
dispatch(reference, exflow, selectedExecutor);
}
//
/**
 * Excerpt: sends the executable flow to the chosen executor through the API gateway using
 * the EXECUTE_ACTION command.
 *
 * NOTE(review): reference is not used in the lines shown here; presumably the full method
 * uses it. Confirm against the complete source.
 */
private void dispatch(final ExecutionReference reference, final ExecutableFlow exflow,
final Executor choosenExecutor) throws ExecutorManagerException {
// ExecutorApiGateway takes care of the remote call (per the author's note); an
// EXECUTE_ACTION request is submitted here.
this.apiGateway.callWithExecutable(exflow, choosenExecutor,
ConnectorParams.EXECUTE_ACTION);
}
通过http方式提交给Executor server
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Posts a request to an executor and returns the response string from the API client.
 *
 * @param host executor host used to build the URI
 * @param port executor port used to build the URI
 * @param path request path used to build the URI
 * @param dispatchMethod passed to both the URI builder and the POST call
 * @param httpTimeout passed through to the POST call
 * @param paramList request parameters; {@code null} is treated as an empty list
 * @return whatever the API client's POST call returns
 * @throws IOException propagated from the API client
 */
private String callForJsonString(final String host, final int port, final String path,
    final DispatchMethod dispatchMethod, final Optional<Integer> httpTimeout,
    List<Pair<String, String>> paramList) throws IOException {
  // Fall back to a fresh empty list when the caller passed no parameters.
  final List<Pair<String, String>> postParams =
      (paramList != null) ? paramList : new ArrayList<Pair<String, String>>();

  @SuppressWarnings("unchecked") final URI executorUri =
      apiClient.buildExecutorUri(host, port, path, true, dispatchMethod);

  return this.apiClient.doPost(executorUri, dispatchMethod, httpTimeout, postParams);
}
exec-server
直接看servlet如何处理EXECUTE_ACTION
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Servlet dispatch (excerpt): an EXECUTE_ACTION request is handed to handleAjaxExecute.
if (action.equals(ConnectorParams.EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
}
//FlowRunnerManager
/**
 * Handles an execute request by submitting the execution to the flow runner manager.
 *
 * <p>A submission failure is logged and its message is reported back to the caller under the
 * response error key instead of being thrown.
 *
 * @param req the servlet request
 * @param respMap the response map that receives the error message on failure
 * @param execId the id of the execution to submit
 */
private void handleAjaxExecute(final HttpServletRequest req,
    final Map<String, Object> respMap, final int execId) {
  try {
    this.flowRunnerManager.submitFlow(execId);
  } catch (final ExecutorManagerException submitFailure) {
    final String reason = submitFailure.getMessage();
    logger.error(reason, submitFailure);
    respMap.put(ConnectorParams.RESPONSE_ERROR, reason);
  }
}
//FlowRunnerManager
/**
 * Creates a flow runner for the execution and submits it, unless the execution is already
 * running.
 *
 * <p>The running check is done twice: once before the runner is created and once more right
 * before it is submitted.
 *
 * @param execId the id of the execution to run
 * @throws ExecutorManagerException propagated from creating or submitting the runner
 */
public void submitFlow(final int execId) throws ExecutorManagerException {
  if (!isAlreadyRunning(execId)) {
    // Build the FlowRunner for this execution.
    final FlowRunner flowRunner = createFlowRunner(execId);
    // Check again before actually submitting.
    if (!isAlreadyRunning(execId)) {
      submitFlowRunner(flowRunner);
    }
  }
}
//把FlowRunner提交给线程池TrackingThreadPool处理
/**
 * Registers the flow runner as running and submits it to the executor service.
 *
 * <p>When the executor service rejects the runner, the registration is rolled back and the
 * rejection is rethrown as an {@code ExecutorManagerException}.
 *
 * @param runner the flow runner to execute
 * @throws ExecutorManagerException if the executor service rejects the runner
 */
private void submitFlowRunner(final FlowRunner runner) throws ExecutorManagerException {
  this.runningFlows.put(runner.getExecutionId(), runner);
  try {
    // The executor service has its own queue. submit() hands back a FutureTask, which is a
    // RunnableFuture and therefore both a Runnable and a Future.
    final Future<?> pendingRun = this.executorService.submit(runner);
    // Remember which execution the future belongs to.
    this.submittedFlows.put(pendingRun, runner.getExecutionId());
    // Record when the most recent flow was submitted.
    this.lastFlowSubmittedDate = System.currentTimeMillis();
  } catch (final RejectedExecutionException rejected) {
    this.runningFlows.remove(runner.getExecutionId());
    String reason = "Azkaban executor can't execute any more flows. ";
    if (this.executorService.isShutdown()) {
      reason += "The executor is being shut down.";
    }
    throw new ExecutorManagerException(reason, rejected);
  }
}
由此看来FlowRunner必然是一个Runnable,看其run方法的处理逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Excerpt of the flow runner body: sets up the execution, records the start time, loads
 * properties for flows that are not Azkaban flow version 2.0, fires FLOW_STARTED, runs the
 * flow, then records the end time and stores the flow.
 *
 * NOTE(review): this is an abridged excerpt; exception handling around these statements is
 * not shown.
 */
public void run() {
// Do some initialization work first.
setupFlowExecution();
this.flow.setStartTime(System.currentTimeMillis());
this.logger.info("Updating initial flow directory.");
updateFlow();
this.logger.info("Fetching job and shared properties.");
if (!FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) {
loadAllProperties();
}
this.fireEventListeners(
Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
// This is where the flow is actually executed.
runFlow();
// On an exception, the status is changed to FAILED.
// NOTE(review): as excerpted this assignment is unconditional; presumably the full method
// performs it inside a catch block. Confirm against the complete source.
this.flow.setStatus(Status.FAILED);
// Update the end time, etc.
this.flow.setEndTime(System.currentTimeMillis());
this.logger.info("Setting end time for flow " + this.execId + " to "
+ System.currentTimeMillis());
closeLogger();
updateFlow();
}
看下READY状态的怎么处理,主要看是不是ExecutableFlowBase节点,如果是则找出startNode 依次执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Handling of a node whose next status is READY (excerpt): a nested flow is marked RUNNING
// and each of its start nodes is run; any other node is run directly.
if (nextNodeStatus == Status.READY) {
if (node instanceof ExecutableFlowBase) {
final ExecutableFlowBase flow = ((ExecutableFlowBase) node);
this.logger.info("Running flow '" + flow.getNestedId() + "'.");
flow.setStatus(Status.RUNNING);
// don't overwrite start time of root flows
if (flow.getStartTime() <= 0) {
flow.setStartTime(System.currentTimeMillis());
}
prepareJobProperties(flow);
// Run each start node of the nested flow in turn.
for (final String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
final ExecutableNode startNode = flow.getExecutableNode(startNodeId);
runReadyJob(startNode);
}
} else {
// Run the node itself.
runExecutableNode(node);
}
}
runExecutableNode才是执行节点任务的
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Creates a job runner for the node and submits it to the executor service.
 *
 * <p>A rejected submission is logged and otherwise ignored; the runner is only added to the
 * active job runners after a successful submit.
 *
 * @param node the node to execute
 * @throws IOException declared for callers of this method
 */
private void runExecutableNode(final ExecutableNode node) throws IOException {
  // The runner type is azkaban.execapp.JobRunner.
  final JobRunner jobRunner = createJobRunner(node);
  this.logger.info("Submitting job '" + node.getNestedId() + "' to run.");
  try {
    // The job starts queueing now.
    final long queuedAt = System.currentTimeMillis();
    jobRunner.setTimeInQueue(queuedAt);
    // Hand the JobRunner to the thread pool.
    this.executorService.submit(jobRunner);
    this.activeJobRunners.add(jobRunner);
  } catch (final RejectedExecutionException rejected) {
    this.logger.error(rejected);
  }
}
跟上边FlowRunner类似直接看run方法
1
2
3
4
5
6
7
8
9
10
/**
 * Job runner body: delegates to {@code doRun()}, logs and rethrows any exception, and always
 * sets the current thread's context class loader to {@code threadClassLoader} afterwards.
 */
public void run() {
  try {
    doRun();
  } catch (final Exception unexpected) {
    serverLogger.error("Unexpected exception", unexpected);
    throw unexpected;
  } finally {
    // Runs on both the success and the failure path.
    final Thread self = Thread.currentThread();
    self.setContextClassLoader(this.threadClassLoader);
  }
}
doRun的核心是runJob()
1
2
3
4
5
// Core of doRun (excerpt): prepare (create) the job; only when that succeeds...
if (prepareJob()) {
// ...run the concrete job.
runJob();
}
runJob里执行具体的任务,这里的job具体实现主要有HadoopHiveJob,HadoopSparkJob等
1
2
3
4
5
6
7
8
/**
 * Excerpt: runs the concrete job and then reads the node's status.
 *
 * NOTE(review): this is an abridged excerpt. finalStatus is declared in code that is not
 * shown, and the error handling and the return statement are omitted.
 */
private Status runJob() {
try {
// The concrete job implementations are mainly HadoopHiveJob, HadoopSparkJob, etc. (per the
// author's note).
this.job.run();
finalStatus = this.node.getStatus();
} catch (final Throwable e) {
// Error handling omitted in this excerpt.
}
}