Flink Slot管理
JobMaster继承了RpcEndpoint,调用start()之后会回调onStart()方法
1
2
3
4
5
6
7
8
9
10
/**
 * Start callback of the JobMaster endpoint: kicks off the job execution through
 * {@code startJobExecution()}.
 *
 * @throws JobMasterException if starting the job execution fails; the original exception is
 *     attached as the cause
 */
protected void onStart() throws JobMasterException {
try {
startJobExecution();
} catch (Exception e) {
// Wrap the failure, report it through handleJobMasterError, then rethrow it to the caller.
final JobMasterException jobMasterException =
new JobMasterException("Could not start the JobMaster.", e);
handleJobMasterError(jobMasterException);
throw jobMasterException;
}
}
JobMaster.startJobExecution
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Starts executing the job: registers the job with the shuffle master, starts the JobMaster
 * services and finally starts scheduling.
 *
 * @throws Exception if one of the start-up steps fails
 */
private void startJobExecution() throws Exception {
validateRunsInMainThread();
// Register the job with the shuffle master (a NettyShuffleMaster according to the original note).
JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);
shuffleMaster.registerJob(context);
// Start the JobMaster services, which manage the TaskManager and ResourceManager heartbeats.
startJobMasterServices();
log.info(
"Starting execution of job '{}' ({}) under job master id {}.",
jobGraph.getName(),
jobGraph.getJobID(),
getFencingToken());
startScheduling();
}
JobMaster.startJobMasterServices启动JobMaster服务
taskManagerHeartbeatManager:与TM相关的心跳管理等
resourceManagerHeartbeatManager与RM相关的心跳管理等
启动slotPoolService服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Starts the services the JobMaster depends on: both heartbeat managers, the slot pool service
 * and the leader retrieval for the ResourceManager.
 *
 * @throws Exception declared by the signature; failures inside the try block are passed to
 *     {@code handleStartJobMasterServicesError}
 */
private void startJobMasterServices() throws Exception {
try {
// Heartbeat manager for the TaskManagers (TM).
this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
// Heartbeat manager for the ResourceManager (RM).
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices);
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
// Leader retrieval service; the listener is called back with the RM leader address.
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
} catch (Exception e) {
handleStartJobMasterServicesError(e);
}
}
ResourceManagerLeaderListener回调notifyLeaderAddress
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Listener registered with the ResourceManager leader retriever. It forwards leader changes
 * and retrieval errors to the enclosing JobMaster.
 */
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
// The callback schedules notifyOfNewResourceManagerLeader through runAsync.
runAsync(
() ->
notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
public void handleError(final Exception exception) {
// Errors of the leader service are reported as JobMaster errors.
handleJobMasterError(
new Exception("Fatal error in the ResourceManager leader service", exception));
}
}
JobMaster.notifyOfNewResourceManagerLeader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Records the address of the new ResourceManager leader and reconnects to it.
 *
 * @param newResourceManagerAddress address of the new leader
 * @param resourceManagerId id of the new leader
 */
private void notifyOfNewResourceManagerLeader(
final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
resourceManagerAddress =
createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
// The exception only carries the reason that is passed on when closing the old connection.
reconnectToResourceManager(
new FlinkException(
String.format(
"ResourceManager leader changed to new address %s",
resourceManagerAddress)));
}
/**
 * Closes the current ResourceManager connection and then tries to connect again.
 *
 * @param cause reason handed to {@code closeResourceManagerConnection}
 */
private void reconnectToResourceManager(Exception cause) {
closeResourceManagerConnection(cause);
tryConnectToResourceManager();
}
/** Connects to the ResourceManager only if a ResourceManager address is currently known. */
private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
connectToResourceManager();
}
}
/**
 * Creates a new {@code ResourceManagerConnection} for the known ResourceManager address and
 * starts it. Expects a known address and no pending or established connection (checked with
 * assertions only).
 */
private void connectToResourceManager() {
assert (resourceManagerAddress != null);
assert (resourceManagerConnection == null);
assert (establishedResourceManagerConnection == null);
log.info("Connecting to ResourceManager {}", resourceManagerAddress);
resourceManagerConnection =
new ResourceManagerConnection(
log,
jobGraph.getJobID(),
resourceId,
getAddress(),
getFencingToken(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
futureExecutor);
resourceManagerConnection.start();
}
JobMaster.startScheduling
1
2
3
/** Starts scheduling by delegating to {@code schedulerNG}. */
private void startScheduling() {
schedulerNG.startScheduling();
}
schedulerNG的startScheduling实现在SchedulerBase中
SchedulerBase.startScheduling
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Registers the job metrics, starts all operator coordinators and then hands over to
 * {@code startSchedulingInternal()}.
 */
public final void startScheduling() {
mainThreadExecutor.assertRunningInMainThread();
registerJobMetrics(
jobManagerJobMetricGroup,
executionGraph,
this::getNumberOfRestarts,
deploymentStateTimeMetrics,
executionGraph::registerJobStatusListener,
executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
jobStatusMetricsSettings);
operatorCoordinatorHandler.startAllOperatorCoordinators();
// The walkthrough follows the DefaultScheduler implementation of this method.
startSchedulingInternal();
}
DefaultScheduler.startSchedulingInternal();
1
2
3
4
5
6
7
8
/**
 * Logs the scheduling strategy in use, transitions to running and lets the scheduling
 * strategy start scheduling.
 */
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
transitionToRunning();
// Hand over to the scheduling strategy.
schedulingStrategy.startScheduling();
}
PipelinedRegionSchedulingStrategy.startScheduling
1
2
3
4
5
6
7
/**
 * Collects every pipelined region of the topology that {@code isSourceRegion} accepts and
 * passes the resulting set to {@code maybeScheduleRegions}.
 */
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(this::isSourceRegion)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
PipelinedRegionSchedulingStrategy.maybeScheduleRegions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Sorts the given regions topologically and tries to schedule them one by one.
 *
 * @param regions regions that are candidates for scheduling
 */
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
// Original note: the scheduling topology here is a DefaultExecutionTopology.
final List<SchedulingPipelinedRegion> regionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
schedulingTopology, regions);
// One cache instance is shared by all regions handled in this call.
final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
for (SchedulingPipelinedRegion region : regionsSorted) {
maybeScheduleRegion(region, consumableStatusCache);
}
}
/**
 * Schedules a single {@code SchedulingPipelinedRegion}.
 *
 * @param region region to schedule
 * @param consumableStatusCache cache handed to {@code areRegionInputsAllConsumable}
 */
private void maybeScheduleRegion(
final SchedulingPipelinedRegion region,
final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
// Nothing to do while not all inputs of the region are consumable.
if (!areRegionInputsAllConsumable(region, consumableStatusCache)) {
return;
}
checkState(
areRegionVerticesAllInCreatedState(region),
"BUG: trying to schedule a region which is not in CREATED state");
final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
// Original note: schedulerOperations is the DefaultScheduler.
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
DefaultScheduler.allocateSlotsAndDeploy
大致可以分为两块:分配slots,以及向TM提交task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Allocates slots for the given vertices and deploys them once the slots are available.
 *
 * @param executionVertexDeploymentOptions deployment options of the vertices to deploy
 */
public void allocateSlotsAndDeploy(
final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
validateDeploymentOptions(executionVertexDeploymentOptions);
final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
final List<ExecutionVertexID> verticesToDeploy =
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList());
final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
executionVertexVersioner.recordVertexModifications(verticesToDeploy);
transitionToScheduled(verticesToDeploy);
// Allocate the slots.
final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
allocateSlots(executionVertexDeploymentOptions);
final List<DeploymentHandle> deploymentHandles =
createDeploymentHandles(
requiredVersionByVertex,
deploymentOptionsByVertex,
slotExecutionVertexAssignments);
// Original note: this is the path that ends up calling submitTask to submit the tasks.
waitForAllSlotsAndDeploy(deploymentHandles);
}
DefaultScheduler.allocateSlots
1
2
3
4
5
6
7
/**
 * Extracts the execution vertex ids from the deployment options and asks the
 * {@code executionSlotAllocator} for slots.
 *
 * @param executionVertexDeploymentOptions deployment options of the vertices that need slots
 * @return the slot assignments returned by the allocator
 */
private List<SlotExecutionVertexAssignment> allocateSlots(
final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
return executionSlotAllocator.allocateSlotsFor(
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList()));
}
SlotSharingExecutionSlotAllocator.allocateSlotsFor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Allocates slots for the given execution vertices: groups them by execution slot sharing
 * group, gets or allocates one shared slot per group and derives the per-vertex assignments
 * from the shared slots.
 *
 * @param executionVertexIds vertices that need a slot
 * @return one assignment per requested vertex, in the order of {@code executionVertexIds}
 */
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
List<ExecutionVertexID> executionVertexIds) {
SharedSlotProfileRetriever sharedSlotProfileRetriever =
sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet<>(executionVertexIds));
// Group the vertices by their execution slot sharing group.
Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
executionVertexIds.stream()
.collect(
Collectors.groupingBy(
slotSharingStrategy::getExecutionSlotSharingGroup));
Map<ExecutionSlotSharingGroup, SharedSlot> slots =
executionsByGroup.keySet().stream()
// See SlotSharingExecutionSlotAllocator.getOrAllocateSharedSlot.
.map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
.collect(
Collectors.toMap(
SharedSlot::getExecutionSlotSharingGroup,
Function.identity()));
Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments =
allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);
// we need to pass the slots map to the createBulk method instead of using the allocator's
// 'sharedSlots'
// because if any physical slots have already failed, their shared slots have been removed
// from the allocator's 'sharedSlots' by failed logical slots.
SharingPhysicalSlotRequestBulk bulk = createBulk(slots, executionsByGroup);
bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout);
return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
}
SlotSharingExecutionSlotAllocator.getOrAllocateSharedSlot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Returns the shared slot registered for the given group, or creates one backed by a new
 * physical slot request if the group has none yet.
 *
 * @param executionSlotSharingGroup group that needs a shared slot
 * @param sharedSlotProfileRetriever source of the slot profile for a new request
 * @return the existing or newly created shared slot
 */
private SharedSlot getOrAllocateSharedSlot(
ExecutionSlotSharingGroup executionSlotSharingGroup,
SharedSlotProfileRetriever sharedSlotProfileRetriever) {
// The lambda only runs when sharedSlots has no entry for the group.
return sharedSlots.computeIfAbsent(
executionSlotSharingGroup,
group -> {
SlotRequestId physicalSlotRequestId = new SlotRequestId();
ResourceProfile physicalSlotResourceProfile =
getPhysicalSlotResourceProfile(group);
SlotProfile slotProfile =
sharedSlotProfileRetriever.getSlotProfile(
group, physicalSlotResourceProfile);
PhysicalSlotRequest physicalSlotRequest =
new PhysicalSlotRequest(
physicalSlotRequestId,
slotProfile,
slotWillBeOccupiedIndefinitely);
// Request the physical slot from the slot provider; the result arrives as a future.
CompletableFuture<PhysicalSlot> physicalSlotFuture =
slotProvider
.allocatePhysicalSlot(physicalSlotRequest)
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
return new SharedSlot(
physicalSlotRequestId,
physicalSlotResourceProfile,
group,
physicalSlotFuture,
slotWillBeOccupiedIndefinitely,
this::releaseSharedSlot);
});
}
PhysicalSlotProviderImpl.allocatePhysicalSlot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Serves a physical slot request, preferring an already available slot and requesting a new
 * one otherwise.
 *
 * @param physicalSlotRequest request carrying the slot request id and the slot profile
 * @return future holding the request id together with the physical slot
 */
public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
PhysicalSlotRequest physicalSlotRequest) {
SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
ResourceProfile resourceProfile = slotProfile.getPhysicalSlotResourceProfile();
LOG.debug(
"Received slot request [{}] with resource requirements: {}",
slotRequestId,
resourceProfile);
// Try to allocate from the slots that are already available.
Optional<PhysicalSlot> availablePhysicalSlot =
tryAllocateFromAvailable(slotRequestId, slotProfile);
// Request a new slot if no available slot was found.
CompletableFuture<PhysicalSlot> slotFuture;
slotFuture =
availablePhysicalSlot
.map(CompletableFuture::completedFuture)
.orElseGet(
() ->
requestNewSlot(
slotRequestId,
resourceProfile,
slotProfile.getPreferredAllocations(),
physicalSlotRequest
.willSlotBeOccupiedIndefinitely()));
return slotFuture.thenApply(
physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
}
PhysicalSlotProviderImpl.requestNewSlot
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Requests a new slot from the slot pool.
 *
 * @param slotRequestId id of the slot request
 * @param resourceProfile resources the slot has to provide
 * @param preferredAllocations allocation ids passed through to the slot pool
 * @param willSlotBeOccupiedIndefinitely selects the normal request ({@code true}) or the
 *     batch request ({@code false})
 * @return future of the requested physical slot
 */
private CompletableFuture<PhysicalSlot> requestNewSlot(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
Collection<AllocationID> preferredAllocations,
boolean willSlotBeOccupiedIndefinitely) {
if (willSlotBeOccupiedIndefinitely) {
// The trailing null is the timeout argument, i.e. no timeout is passed.
return slotPool.requestNewAllocatedSlot(
slotRequestId, resourceProfile, preferredAllocations, null);
} else {
return slotPool.requestNewAllocatedBatchSlot(
slotRequestId, resourceProfile, preferredAllocations);
}
}
DeclarativeSlotPoolBridge.requestNewAllocatedSlot
DeclarativeSlotPoolBridge.requestNewAllocatedBatchSlot
看requestNewAllocatedSlot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Creates a normal pending request for the given slot request and submits it.
 *
 * @param slotRequestId id of the slot request
 * @param resourceProfile resources the slot has to provide
 * @param preferredAllocations allocation ids stored in the pending request
 * @param timeout optional timeout for the request; may be {@code null}
 * @return future of the requested physical slot
 */
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nonnull Collection<AllocationID> preferredAllocations,
@Nullable Time timeout) {
assertRunningInMainThread();
log.debug(
"Request new allocated slot with slot request id {} and resource profile {}",
slotRequestId,
resourceProfile);
final PendingRequest pendingRequest =
PendingRequest.createNormalRequest(
slotRequestId, resourceProfile, preferredAllocations);
// Delegate to internalRequestNewSlot.
return internalRequestNewSlot(pendingRequest, timeout);
}
/**
 * Registers the pending request and returns its slot future, wrapped with a timeout when one
 * is given.
 *
 * @param pendingRequest request to register
 * @param timeout optional timeout; {@code null} means the future is returned as is
 * @return the slot future of the pending request
 */
private CompletableFuture<PhysicalSlot> internalRequestNewSlot(
PendingRequest pendingRequest, @Nullable Time timeout) {
// Process the pending request; its slot future is what gets returned below.
internalRequestNewAllocatedSlot(pendingRequest);
// Without a configured timeout the future is returned directly.
if (timeout == null) {
return pendingRequest.getSlotFuture();
} else {
return FutureUtils.orTimeout(
pendingRequest.getSlotFuture(),
timeout.toMilliseconds(),
TimeUnit.MILLISECONDS,
componentMainThreadExecutor)
.whenComplete(
(physicalSlot, throwable) -> {
// Only a timeout triggers timeoutPendingSlotRequest for this request.
if (throwable instanceof TimeoutException) {
timeoutPendingSlotRequest(pendingRequest.getSlotRequestId());
}
});
}
}
DeclarativeSlotPoolBridge.internalRequestNewAllocatedSlot
1
2
3
4
5
6
7
/**
 * Stores the pending request under its slot request id and raises the resource requirements
 * of the declarative slot pool by one slot of the requested profile.
 *
 * @param pendingRequest request to register
 */
private void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
// Original note: the declarative slot pool is a DefaultDeclarativeSlotPool.
getDeclarativeSlotPool()
.increaseResourceRequirementsBy(
ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
}
DefaultDeclarativeSlotPool.increaseResourceRequirementsBy
DefaultDeclarativeSlotPool.declareResourceRequirements
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Adds the increment to the total resource requirements and declares the new requirements.
 * An empty increment is ignored.
 *
 * @param increment resources to add to the requirements
 */
public void increaseResourceRequirementsBy(ResourceCounter increment) {
if (increment.isEmpty()) {
return;
}
totalResourceRequirements = totalResourceRequirements.add(increment);
declareResourceRequirements();
}
/**
 * Logs the current resource requirements and passes them to the
 * {@code notifyNewResourceRequirements} callback.
 */
private void declareResourceRequirements() {
final Collection<ResourceRequirement> resourceRequirements = getResourceRequirements();
LOG.debug(
"Declare new resource requirements for job {}.{}\trequired resources: {}{}\tacquired resources: {}",
jobId,
System.lineSeparator(),
resourceRequirements,
System.lineSeparator(),
fulfilledResourceRequirements);
// The key step is the notifyNewResourceRequirements callback.
notifyNewResourceRequirements.accept(resourceRequirements);
}
notifyNewResourceRequirements是在DeclarativeSlotPoolService实例化的时候创建的
declarativeSlotPoolFactory是DefaultDeclarativeSlotPoolFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Creates the service and its declarative slot pool.
 *
 * @param jobId id of the job the slot pool belongs to
 * @param declarativeSlotPoolFactory factory used to create the declarative slot pool
 * @param clock clock stored by the service
 * @param idleSlotTimeout timeout passed on to the slot pool factory
 * @param rpcTimeout timeout stored by the service and passed on to the slot pool factory
 */
public DeclarativeSlotPoolService(
JobID jobId,
DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
Clock clock,
Time idleSlotTimeout,
Time rpcTimeout) {
this.jobId = jobId;
this.clock = clock;
this.rpcTimeout = rpcTimeout;
this.registeredTaskManagers = new HashSet<>();
// this::declareResourceRequirements is handed to the factory as the requirements callback.
this.declarativeSlotPool =
declarativeSlotPoolFactory.create(
jobId, this::declareResourceRequirements, idleSlotTimeout, rpcTimeout);
}
declareResourceRequirements方法,内部其实是对resourceRequirementServiceConnectionManager的封装
1
2
3
4
5
6
/**
 * Wraps the requirements together with the job id and JobManager address and forwards them to
 * the {@code resourceRequirementServiceConnectionManager}.
 *
 * @param resourceRequirements requirements to declare
 */
private void declareResourceRequirements(Collection<ResourceRequirement> resourceRequirements) {
assertHasBeenStarted();
resourceRequirementServiceConnectionManager.declareResourceRequirements(
ResourceRequirements.create(jobId, jobManagerAddress, resourceRequirements));
}
DefaultDeclareResourceRequirementServiceConnectionManager.declareResourceRequirements
DefaultDeclareResourceRequirementServiceConnectionManager.triggerResourceRequirementsSubmission
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Remembers the given requirements as the current ones and triggers their submission. Nothing
 * is stored or sent while the manager is not connected.
 *
 * @param resourceRequirements requirements to declare
 */
public void declareResourceRequirements(ResourceRequirements resourceRequirements) {
synchronized (lock) {
checkNotClosed();
if (isConnected()) {
currentResourceRequirements = resourceRequirements;
// Retry sleep on error starts at 1 ms and is capped at 10000 ms.
triggerResourceRequirementsSubmission(
Duration.ofMillis(1L),
Duration.ofMillis(10000L),
currentResourceRequirements);
}
}
}
/**
 * Submits the requirements through {@code FutureUtils.retryWithDelay}, using an exponential
 * backoff retry strategy.
 *
 * @param sleepOnError first argument of the backoff strategy after the attempt count
 * @param maxSleepOnError second argument of the backoff strategy after the attempt count
 * @param resourceRequirementsToSend requirements to send
 */
private void triggerResourceRequirementsSubmission(
Duration sleepOnError,
Duration maxSleepOnError,
ResourceRequirements resourceRequirementsToSend) {
FutureUtils.retryWithDelay(
() -> sendResourceRequirements(resourceRequirementsToSend),
new ExponentialBackoffRetryStrategy(
Integer.MAX_VALUE, sleepOnError, maxSleepOnError),
// NOTE(review): presumably the retry predicate, so a CancellationException stops retrying - confirm.
throwable -> !(throwable instanceof CancellationException),
scheduledExecutor);
}
DefaultDeclareResourceRequirementServiceConnectionManager.sendResourceRequirements
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Sends the requirements to the connected service if they are still the current ones.
 *
 * @param resourceRequirementsToSend requirements to send
 * @return the acknowledgement future of the service, or a future completed exceptionally with
 *     a {@code CancellationException} when the requirements are outdated or the manager is not
 *     connected
 */
private CompletableFuture<Acknowledge> sendResourceRequirements(
ResourceRequirements resourceRequirementsToSend) {
synchronized (lock) {
if (isConnected()) {
// Reference comparison: only the instance stored as current is sent.
if (resourceRequirementsToSend == currentResourceRequirements) {
return service.declareResourceRequirements(resourceRequirementsToSend);
} else {
LOG.debug("Newer resource requirements found. Stop sending old requirements.");
return FutureUtils.completedExceptionally(new CancellationException());
}
} else {
LOG.debug(
"Stop sending resource requirements to ResourceManager because it is not connected.");
return FutureUtils.completedExceptionally(new CancellationException());
}
}
}
注意这里的service是怎么来的:需要看resourceRequirementServiceConnectionManager.connect,service就是通过connect方法注册进来的
DeclarativeSlotPoolService.connectToResourceManager
1
2
3
4
5
6
7
8
9
10
/**
 * Connects the slot pool service to the ResourceManager and declares the current resource
 * requirements of the declarative slot pool.
 *
 * @param resourceManagerGateway gateway of the ResourceManager to connect to
 */
public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
assertHasBeenStarted();
// Register the service first, i.e. resourceManagerGateway.declareRequiredResources.
resourceRequirementServiceConnectionManager.connect(
resourceRequirements ->
resourceManagerGateway.declareRequiredResources(
jobMasterId, resourceRequirements, rpcTimeout));
declareResourceRequirements(declarativeSlotPool.getResourceRequirements());
}
resourceRequirementServiceConnectionManager是在DeclarativeSlotPoolService的start方法创建的
resourceRequirementServiceConnectionManager是DefaultDeclareResourceRequirementServiceConnectionManager
DefaultDeclareResourceRequirementServiceConnectionManager继承自AbstractServiceConnectionManager,connect方法由父类AbstractServiceConnectionManager实现
DeclarativeSlotPoolService.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Starts the service; it can only be started while in state {@code CREATED}.
 *
 * @param jobMasterId id of the JobMaster, must not be null
 * @param address address stored as the JobManager address, must not be null
 * @param mainThreadExecutor executor passed to the connection manager and to {@code onStart}
 * @throws Exception declared by the signature
 */
public final void start(
JobMasterId jobMasterId, String address, ComponentMainThreadExecutor mainThreadExecutor)
throws Exception {
Preconditions.checkState(
state == State.CREATED, "The DeclarativeSlotPoolService can only be started once.");
this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
this.jobManagerAddress = Preconditions.checkNotNull(address);
// Create the resourceRequirementServiceConnectionManager instance.
this.resourceRequirementServiceConnectionManager =
DefaultDeclareResourceRequirementServiceConnectionManager.create(
mainThreadExecutor);
onStart(mainThreadExecutor);
state = State.STARTED;
}
AbstractServiceConnectionManager.connect
1
2
3
4
5
6
/**
 * Stores the given service as the connected service, after checking that the manager is not
 * closed.
 *
 * @param service service to connect to
 */
public final void connect(S service) {
synchronized (lock) {
checkNotClosed();
this.service = service;
}
}
所以这里调用的是resourceManagerGateway.declareRequiredResources,即ResourceManager.declareRequiredResources,
其内部交给slotManager处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Accepts the resource requirements of a job and passes them to the slot manager, provided the
 * job has a registered JobManager whose JobMaster id matches.
 *
 * @param jobMasterId id of the calling JobMaster
 * @param resourceRequirements requirements declared by the job
 * @param timeout not used in this method body
 * @return an acknowledgement, or a future completed exceptionally with a
 *     {@code ResourceManagerException} if the job is unknown or the ids do not match
 */
public CompletableFuture<Acknowledge> declareRequiredResources(
JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) {
final JobID jobId = resourceRequirements.getJobId();
final JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
// Hand the requirements over to the slot manager.
slotManager.processResourceRequirements(resourceRequirements);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(
"The job leader's id "
+ jobManagerRegistration.getJobMasterId()
+ " does not match the received id "
+ jobMasterId
+ '.'));
}
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(
"Could not find registered job manager for job " + jobId + '.'));
}
}
DeclarativeSlotManager.processResourceRequirements
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Records the resource requirements of a job in the resource tracker and then checks the
 * resource requirements.
 *
 * @param resourceRequirements requirements declared by a job
 */
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
checkInit();
// Empty requirements for a job that has none tracked: nothing to do.
if (resourceRequirements.getResourceRequirements().isEmpty()
&& resourceTracker.isRequirementEmpty(resourceRequirements.getJobId())) {
return;
} else if (resourceRequirements.getResourceRequirements().isEmpty()) {
LOG.info("Clearing resource requirements of job {}", resourceRequirements.getJobId());
} else {
LOG.info(
"Received resource requirements from job {}: {}",
resourceRequirements.getJobId(),
resourceRequirements.getResourceRequirements());
}
// Remember the target address of the job whenever it declares non-empty requirements.
if (!resourceRequirements.getResourceRequirements().isEmpty()) {
jobMasterTargetAddresses.put(
resourceRequirements.getJobId(), resourceRequirements.getTargetAddress());
}
resourceTracker.notifyResourceRequirements(
resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements());
// Main logic.
checkResourceRequirements();
}
DeclarativeSlotManager.checkResourceRequirements
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Tries to satisfy the missing resources reported by the resource tracker, first per job via
 * {@code tryAllocateSlotsForJob}, then via pending task manager slots for whatever is left.
 */
private void checkResourceRequirements() {
final Map<JobID, Collection<ResourceRequirement>> missingResources =
resourceTracker.getMissingResources();
if (missingResources.isEmpty()) {
return;
}
final Map<JobID, ResourceCounter> unfulfilledRequirements = new LinkedHashMap<>();
for (Map.Entry<JobID, Collection<ResourceRequirement>> resourceRequirements :
missingResources.entrySet()) {
final JobID jobId = resourceRequirements.getKey();
// 1. Try to allocate from the free slots.
final ResourceCounter unfulfilledJobRequirements =
tryAllocateSlotsForJob(jobId, resourceRequirements.getValue());
if (!unfulfilledJobRequirements.isEmpty()) {
unfulfilledRequirements.put(jobId, unfulfilledJobRequirements);
}
}
if (unfulfilledRequirements.isEmpty()) {
return;
}
// Count the pending task manager slots per resource profile.
ResourceCounter pendingSlots =
ResourceCounter.withResources(
taskExecutorManager.getPendingTaskManagerSlots().stream()
.collect(
Collectors.groupingBy(
PendingTaskManagerSlot::getResourceProfile,
Collectors.summingInt(x -> 1))));
// 2. Try to fulfill from the pending slots; per the original note a new TaskExecutor is
// started if needed.
for (Map.Entry<JobID, ResourceCounter> unfulfilledRequirement :
unfulfilledRequirements.entrySet()) {
pendingSlots =
tryFulfillRequirementsWithPendingSlots(
unfulfilledRequirement.getKey(),
unfulfilledRequirement.getValue().getResourcesWithCount(),
pendingSlots);
}
}
直接分配资源
DeclarativeSlotManager.tryAllocateSlotsForJob
DeclarativeSlotManager.internalTryAllocateSlots
DeclarativeSlotManager.allocateSlot
TaskExecutorGateway.requestSlot
TaskExecutor.requestSlot
TaskExecutor.allocateSlotForJob
TaskExecutor.offerSlotsToJobManager
TaskExecutor.internalOfferSlotsToJobManager
jobMasterGateway.offerSlots
JobMaster.offerSlots
DeclarativeSlotPoolService.offerSlots
DefaultDeclarativeSlotPool.offerSlots
启动taskExecutor流程
DeclarativeSlotManager.tryFulfillRequirementsWithPendingSlots
DeclarativeSlotManager.tryAllocateWorkerAndReserveSlot
taskExecutorManager.allocateWorker
resourceActions.allocateResource
ResourceManager.startNewWorker
1
2
3
4
5
6
7
// In application mode this lives in YarnApplicationClusterEntryPoint.
// YarnResourceManagerFactory extends ActiveResourceManagerFactory:
//public class YarnResourceManagerFactory extends ActiveResourceManagerFactory<YarnWorkerNode> {
/**
 * Creates the entry point, passing the YarnResourceManagerFactory singleton to the super
 * constructor.
 */
private YarnApplicationClusterEntryPoint(
final Configuration configuration, final PackagedProgram program) {
super(configuration, program, YarnResourceManagerFactory.getInstance());
}
ActiveResourceManager.requestNewWorker
resourceManagerDriver.requestResource
YarnResourceManagerDriver.requestResource
resourceManagerClient.addContainerRequest
AMRMClientAsync.CallbackHandler.onContainersAllocated
YarnResourceManagerDriver.onContainersOfPriorityAllocated
YarnResourceManagerDriver.startTaskExecutorInContainerAsync
YarnResourceManagerDriver.createTaskExecutorLaunchContext
nodeManagerClient.startContainerAsync
这里应该是交给yarn来调度了,而createTaskExecutorLaunchContext内部会封装启动命令
这里的mainClass是YarnTaskExecutorRunner
1
2
3
4
5
6
7
8
9
// Excerpt from createTaskExecutorLaunchContext: YarnTaskExecutorRunner.class is the argument
// that the walkthrough identifies as the main class of the launched container.
Utils.createTaskExecutorContext(
flinkConfig,
yarnConfig,
configuration,
taskManagerParameters,
taskManagerDynamicProperties,
currDir,
YarnTaskExecutorRunner.class,
log);
YarnTaskExecutorRunner的main入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Entry point of the YARN TaskExecutor runner: logs the environment, registers the signal
 * handler, installs the JVM shutdown safeguard and runs the TaskManager.
 *
 * @param args command line arguments
 */
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
runTaskManagerSecurely(args);
}
/**
 * Loads and adjusts the configuration, then runs the TaskManager process. Any failure while
 * preparing the configuration is logged and exits the JVM with {@code INIT_ERROR_EXIT_CODE}.
 *
 * @param args command line arguments used to load the configuration
 */
private static void runTaskManagerSecurely(String[] args) {
Configuration configuration = null;
try {
LOG.debug("All environment variables: {}", ENV);
final String currDir = ENV.get(Environment.PWD.key());
LOG.info("Current working Directory: {}", currDir);
configuration = TaskManagerRunner.loadConfiguration(args);
setupAndModifyConfiguration(configuration, currDir, ENV);
} catch (Throwable t) {
LOG.error("YARN TaskManager initialization failed.", t);
System.exit(INIT_ERROR_EXIT_CODE);
}
TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));
}
TaskManagerRunner.runTaskManagerProcessSecurely
TaskManagerRunner.runTaskManager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Excerpt: runTaskManager is executed through the installed security context and its result
// becomes the exit code.
exitCode =
SecurityUtils.getInstalledContext()
.runSecured(() -> runTaskManager(configuration, pluginManager));
/**
 * Creates and starts a {@code TaskManagerRunner} and blocks until it terminates.
 *
 * @param configuration configuration of the TaskManager
 * @param pluginManager plugin manager passed to the runner
 * @return the exit code taken from the termination future of the runner
 * @throws Exception a {@code FlinkException} if the runner cannot be started or fails at runtime
 */
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
throws Exception {
final TaskManagerRunner taskManagerRunner;
try {
// Create and start the taskManagerRunner.
// Note the TaskManagerRunner::createTaskExecutorService reference here;
// read it together with taskExecutorServiceFactory.createTaskExecutor further below.
taskManagerRunner =
new TaskManagerRunner(
configuration,
pluginManager,
TaskManagerRunner::createTaskExecutorService);
taskManagerRunner.start();
} catch (Exception exception) {
throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
}
try {
return taskManagerRunner.getTerminationFuture().get().getExitCode();
} catch (Throwable t) {
throw new FlinkException(
"Unexpected failure during runtime of TaskManagerRunner.",
ExceptionUtils.stripExecutionException(t));
}
}
TaskManagerRunner::createTaskExecutorService创建TaskExecutor,包装成TaskExecutorToServiceAdapter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Creates a {@code TaskExecutor} through {@code startTaskManager}, forwarding all arguments
 * unchanged, and wraps it in a {@code TaskExecutorToServiceAdapter}.
 *
 * @return the TaskExecutor wrapped as a {@code TaskExecutorService}
 * @throws Exception declared by the signature
 */
public static TaskExecutorService createTaskExecutorService(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
BlobCacheService blobCacheService,
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
WorkingDirectory workingDirectory,
FatalErrorHandler fatalErrorHandler)
throws Exception {
final TaskExecutor taskExecutor =
startTaskManager(
configuration,
resourceID,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
localCommunicationOnly,
externalResourceInfoProvider,
workingDirectory,
fatalErrorHandler);
return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
TaskManagerRunner的start方法
taskExecutorService就是上边的TaskExecutorToServiceAdapter,就是包装的TaskExecutor
1
2
3
4
5
6
7
/**
 * Starts the runner services and then the TaskExecutor service, both under the lock.
 *
 * @throws Exception declared by the signature
 */
public void start() throws Exception {
synchronized (lock) {
startTaskManagerRunnerServices();
// Start the TaskExecutor service.
taskExecutorService.start();
}
}
TaskManagerRunner.startTaskManagerRunnerServices方法比较长,主要是创建各种基础服务,最后通过taskExecutorServiceFactory.createTaskExecutor创建taskExecutorService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * Creates the services the TaskManager runner needs (RPC system and service, executor, high
 * availability services, metric registry, blob cache and more) and finally the TaskExecutor
 * service, all under the lock.
 *
 * @throws Exception declared by the signature
 */
private void startTaskManagerRunnerServices() throws Exception {
synchronized (lock) {
rpcSystem = RpcSystem.load(configuration);
// Scheduled thread pool sized by the number of CPU cores.
this.executor =
Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("taskmanager-future"));
highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
AddressResolution.NO_ADDRESS_RESOLUTION,
rpcSystem,
this);
JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);
// The resource id is obtained from the configuration and the RPC address and port.
this.resourceId =
getTaskManagerResourceID(
configuration, rpcService.getAddress(), rpcService.getPort());
this.workingDirectory =
ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
configuration, resourceId);
LOG.info("Using working directory: {}", workingDirectory);
HeartbeatServices heartbeatServices =
HeartbeatServices.fromConfiguration(configuration);
metricRegistry =
new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(
configuration,
rpcSystem.getMaximumMessageSizeInBytes(configuration)),
ReporterSetup.fromConfiguration(configuration, pluginManager));
// A separate RPC service is started for the metric query service.
final RpcService metricQueryServiceRpcService =
MetricUtils.startRemoteMetricsRpcService(
configuration,
rpcService.getAddress(),
configuration.getString(TaskManagerOptions.BIND_HOST),
rpcSystem);
metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());
blobCacheService =
BlobUtils.createBlobCacheService(
configuration,
Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
highAvailabilityServices.createBlobStore(),
null);
final ExternalResourceInfoProvider externalResourceInfoProvider =
ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
configuration, pluginManager);
// The factory is the one handed to the runner's constructor; this runner itself is passed
// as the last argument.
taskExecutorService =
taskExecutorServiceFactory.createTaskExecutor(
this.configuration,
this.resourceId.unwrap(),
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
externalResourceInfoProvider,
workingDirectory.unwrap(),
this);
handleUnexpectedTaskExecutorServiceTermination();
MemoryLogger.startIfConfigured(
LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
}
}
taskExecutorService.start();
TaskExecutor也是一个RpcEndpoint,start()之后会回调它的onStart()方法;直接看TaskExecutor的onStart就好了
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Start callback of the TaskExecutor endpoint: starts the TaskExecutor services and then the
 * registration timeout.
 *
 * @throws Exception a {@code TaskManagerException} wrapping the failure if the services cannot
 *     be started
 */
public void onStart() throws Exception {
try {
startTaskExecutorServices();
} catch (Throwable t) {
// Report the failure as fatal before rethrowing it.
final TaskManagerException exception =
new TaskManagerException(
String.format("Could not start the TaskExecutor %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
startRegistrationTimeout();
}
TaskExecutor.onStart()–>TaskExecutor.startTaskExecutorServices
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Starts the ResourceManager leader retrieval, the task slot table and the job leader service,
 * and creates the file cache.
 *
 * @throws Exception declared by the signature; failures inside the try block are passed to
 *     {@code handleStartTaskExecutorServicesException}
 */
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(
getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache =
new FileCache(
taskManagerConfiguration.getTmpDirectories(),
blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
ResourceManagerLeaderListener回调方法notifyOfNewResourceManagerLeader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Listener registered with the ResourceManager leader retriever of the TaskExecutor. It
 * forwards leader changes and retrieval errors to the enclosing TaskExecutor.
 */
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
// Schedules notifyOfNewResourceManagerLeader through runAsync.
runAsync(
() ->
notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
public void handleError(Exception exception) {
// Errors of the leader service are treated as fatal.
onFatalError(exception);
}
}
reconnectToResourceManager()
tryConnectToResourceManager()
connectToResourceManager()
启动一个resourceManagerConnection(TaskExecutorToResourceManagerConnection)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Creates a {@link TaskExecutorToResourceManagerConnection} for the currently known
 * ResourceManager address and starts it.
 *
 * <p>Asserts that a ResourceManager address is known and that neither an established nor a
 * pending ResourceManager connection exists yet.
 */
private void connectToResourceManager() {
    assert resourceManagerAddress != null;
    assert establishedResourceManagerConnection == null;
    assert resourceManagerConnection == null;

    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

    // Describes this TaskExecutor: address, resource id, data port, hardware and
    // the default/total resource profiles.
    final TaskExecutorRegistration registration =
            new TaskExecutorRegistration(
                    getAddress(),
                    getResourceID(),
                    unresolvedTaskManagerLocation.getDataPort(),
                    hardwareDescription,
                    taskManagerConfiguration.getDefaultSlotResourceProfile(),
                    taskManagerConfiguration.getTotalResourceProfile());

    resourceManagerConnection =
            new TaskExecutorToResourceManagerConnection(
                    log,
                    getRpcService(),
                    taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                    resourceManagerAddress.getAddress(),
                    resourceManagerAddress.getResourceManagerId(),
                    getMainThreadExecutor(),
                    new ResourceManagerRegistrationListener(),
                    registration);

    // Start the TaskExecutorToResourceManagerConnection.
    resourceManagerConnection.start();
}
resourceManagerConnection启动后向RM注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Starts this RPC connection by creating a new registration and triggering it.
 *
 * <p>Fails with the {@code checkState} messages below if the connection is already closed,
 * already connected, or already has a pending registration. If the registration cannot be
 * installed because another one was set concurrently, the freshly created one is cancelled.
 */
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(
            !isConnected() && pendingRegistration == null,
            "The RPC connection is already started");

    final RetryingRegistration<F, G, S, R> candidate = createNewRegistration();

    if (!REGISTRATION_UPDATER.compareAndSet(this, null, candidate)) {
        // concurrent start operation: discard the registration created here
        candidate.cancel();
        return;
    }

    // The candidate is now installed: start registering with the target.
    candidate.startRegistration();
}
rpc调用RM注册TaskExecutor
RetryingRegistration.startRegistration()
RetryingRegistration.register()
ResourceManagerRegistration.invokeRegistration()
1
2
3
4
5
6
7
8
9
10
/**
 * Issues the RPC that registers the TaskExecutor at the ResourceManager.
 *
 * @param resourceManager gateway of the ResourceManager to register at
 * @param fencingToken fencing token of the ResourceManager (not used by this method)
 * @param timeoutMillis RPC timeout in milliseconds
 * @return future carrying the ResourceManager's registration response
 * @throws Exception declared by the signature
 */
protected CompletableFuture<RegistrationResponse> invokeRegistration(
        ResourceManagerGateway resourceManager,
        ResourceManagerId fencingToken,
        long timeoutMillis)
        throws Exception {
    // RPC call: register this TaskExecutor at the ResourceManager.
    return resourceManager.registerTaskExecutor(
            taskExecutorRegistration, Time.milliseconds(timeoutMillis));
}
如何生成RetryingRegistration呢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Creates a new registration via {@code generateRegistration()} and attaches the completion
 * handling to its future.
 *
 * <p>The handler runs on {@code executor} and dispatches as follows:
 *
 * <ul>
 *   <li>failure that is a {@link CancellationException}: logged at debug level only;
 *   <li>any other failure: {@code onRegistrationFailure};
 *   <li>successful result: stores the gateway and calls {@code onRegistrationSuccess};
 *   <li>rejection: {@code onRegistrationRejection};
 *   <li>anything else: throws {@link IllegalArgumentException}.
 * </ul>
 *
 * @return the newly created (not yet started) registration
 */
private RetryingRegistration<F, G, S, R> createNewRegistration() {
    final RetryingRegistration<F, G, S, R> registration = checkNotNull(generateRegistration());

    // generateRegistration() only builds the object; the interesting part is how the
    // registration's future is handled below, in particular onRegistrationSuccess.
    final CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> outcome =
            registration.getFuture();

    outcome.whenCompleteAsync(
            (RetryingRegistration.RetryingRegistrationResult<G, S, R> result,
                    Throwable failure) -> {
                if (failure != null) {
                    if (failure instanceof CancellationException) {
                        // we ignore cancellation exceptions because they originate from
                        // cancelling the RetryingRegistration
                        log.debug(
                                "Retrying registration towards {} was cancelled.",
                                targetAddress);
                    } else {
                        // this future should only ever fail if there is a bug, not if the
                        // registration is declined
                        onRegistrationFailure(failure);
                    }
                    return;
                }

                if (result.isSuccess()) {
                    targetGateway = result.getGateway();
                    // Subclass hook, e.g. the JobMaster's ResourceManagerConnection.
                    onRegistrationSuccess(result.getSuccess());
                } else if (result.isRejection()) {
                    onRegistrationRejection(result.getRejection());
                } else {
                    throw new IllegalArgumentException(
                            String.format(
                                    "Unknown retrying registration response: %s.", result));
                }
            },
            executor);

    return registration;
}
JobMaster.ResourceManagerConnection
跟TaskExecutor注册一样,通过invokeRegistration发起注册(这里调用的是registerJobManager)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Builds the {@link RetryingRegistration} used to register the JobManager at the
 * ResourceManager.
 *
 * <p>The returned anonymous subclass overrides {@code invokeRegistration} to call
 * {@code registerJobManager} on the ResourceManager gateway.
 *
 * @return a new registration targeting {@code getTargetAddress()} / {@code getTargetLeaderId()}
 */
protected RetryingRegistration<
                ResourceManagerId,
                ResourceManagerGateway,
                JobMasterRegistrationSuccess,
                RegistrationResponse.Rejection>
        generateRegistration() {
    return new RetryingRegistration<
            ResourceManagerId,
            ResourceManagerGateway,
            JobMasterRegistrationSuccess,
            RegistrationResponse.Rejection>(
            log,
            getRpcService(),
            "ResourceManager",
            ResourceManagerGateway.class,
            getTargetAddress(),
            getTargetLeaderId(),
            jobMasterConfiguration.getRetryingRegistrationConfiguration()) {

        @Override
        protected CompletableFuture<RegistrationResponse> invokeRegistration(
                ResourceManagerGateway gateway,
                ResourceManagerId fencingToken,
                long timeoutMillis) {
            // Register the JobManager; the millisecond timeout is converted in place.
            return gateway.registerJobManager(
                    jobMasterId,
                    jobManagerResourceID,
                    jobManagerRpcAddress,
                    jobID,
                    Time.milliseconds(timeoutMillis));
        }
    };
}
/**
 * Callback invoked when the registration succeeded.
 *
 * <p>Schedules, via {@code runAsync}, the establishment of the ResourceManager connection.
 * Nothing happens if this connection is no longer the current
 * {@code resourceManagerConnection}.
 *
 * @param success the successful registration response
 */
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(
            () -> {
                // filter out outdated connections (identity comparison is intended)
                //noinspection ObjectEquality
                if (this != resourceManagerConnection) {
                    return;
                }
                establishResourceManagerConnection(success);
            });
}
jobMaster通过slotPool向RM申请slot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Finalises the connection to the ResourceManager after a successful registration.
 *
 * <p>If the response's ResourceManager id matches the target leader id of the current
 * {@code resourceManagerConnection}, this stores the established connection, connects the
 * slot pool to the ResourceManager gateway and starts heartbeat monitoring of the
 * ResourceManager. Otherwise the response is ignored and logged at debug level.
 *
 * @param success the successful registration response from the ResourceManager
 */
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    final ResourceManagerId resourceManagerId = success.getResourceManagerId();

    // verify the response with current connection
    final boolean matchesCurrentConnection =
            resourceManagerConnection != null
                    && Objects.equals(
                            resourceManagerConnection.getTargetLeaderId(), resourceManagerId);

    if (!matchesCurrentConnection) {
        log.debug(
                "Ignoring resource manager connection to {} because it's duplicated or outdated.",
                resourceManagerId);
        return;
    }

    log.info(
            "JobManager successfully registered at ResourceManager, leader id: {}.",
            resourceManagerId);

    final ResourceManagerGateway gateway = resourceManagerConnection.getTargetGateway();
    final ResourceID rmResourceId = success.getResourceManagerResourceId();

    establishedResourceManagerConnection =
            new EstablishedResourceManagerConnection(gateway, rmResourceId);

    // Hand the gateway to the slot pool.
    // NOTE(review): presumably this is what lets the slot pool request slots from the
    // ResourceManager - confirm in the slot pool implementation.
    slotPool.connectToResourceManager(gateway);

    resourceManagerHeartbeatManager.monitorTarget(
            rmResourceId,
            new HeartbeatTarget<Void>() {
                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    // Forward the heartbeat to the ResourceManager.
                    gateway.heartbeatFromJobManager(resourceID);
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                    // request heartbeat will never be called on the job manager side
                }
            });
}