Home Flink1.15 SLOT源码分析
Post
Cancel

Flink1.15 SLOT源码分析

Flink Slot:JobMaster继承了RpcEndpoint,启动(start)后会回调onStart

1
2
3
4
5
6
7
8
9
10
/**
 * RPC endpoint start callback: starts executing the job.
 *
 * <p>Any exception raised while starting is wrapped in a {@link JobMasterException}, reported
 * through {@code handleJobMasterError}, and then rethrown to the caller.
 */
protected void onStart() throws JobMasterException {
    try {
        startJobExecution();
    } catch (Exception cause) {
        final JobMasterException startFailure =
                new JobMasterException("Could not start the JobMaster.", cause);
        handleJobMasterError(startFailure);
        throw startFailure;
    }
}

JobMaster.startJobExecution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Starts executing the job; must run in the JobMaster's main thread.
 *
 * <p>Sequence: register the job with the shuffle master, start the JobMaster services
 * (heartbeat managers, slot pool, ResourceManager leader retrieval), log, then start
 * scheduling.
 */
private void startJobExecution() throws Exception {
    validateRunsInMainThread();
    // Register this job with the shuffle master (NettyShuffleMaster in the walkthrough;
    // NOTE(review): the concrete implementation depends on configuration).
    JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);
    shuffleMaster.registerJob(context);
    // Start the JobMaster services: TaskManager/ResourceManager heartbeat managers, the slot
    // pool service and ResourceManager leader retrieval.
    startJobMasterServices();

    log.info(
        "Starting execution of job '{}' ({}) under job master id {}.",
        jobGraph.getName(),
        jobGraph.getJobID(),
        getFencingToken());

    // Hand over to the scheduler.
    startScheduling();
}

JobMaster.startJobMasterServices启动JobMaster服务
taskManagerHeartbeatManager:与TM相关的心跳管理等
resourceManagerHeartbeatManager与RM相关的心跳管理等
启动slotPoolService服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Starts the services the JobMaster needs before scheduling.
 *
 * <p>Creates the TaskManager and ResourceManager heartbeat managers, starts the slot pool
 * service with this leader's fencing token, address and main-thread executor, and then starts
 * ResourceManager leader retrieval. Any exception is passed to
 * {@code handleStartJobMasterServicesError}.
 */
private void startJobMasterServices() throws Exception {
    try {
        // Heartbeat manager for the TaskManagers.
        this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
        // Heartbeat manager for the ResourceManager.
        this.resourceManagerHeartbeatManager =
        createResourceManagerHeartbeatManager(heartbeatServices);

        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        // ResourceManager leader retrieval; the listener reacts to leader changes.
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    } catch (Exception e) {
        handleStartJobMasterServicesError(e);
    }
}

ResourceManagerLeaderListener回调notifyLeaderAddress

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** Listens for ResourceManager leader changes on behalf of the JobMaster. */
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {

    /**
     * Called with the new leader's address and session id; the update is scheduled with
     * {@code runAsync} so that {@code notifyOfNewResourceManagerLeader} runs asynchronously.
     */
    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
                () -> {
                    final ResourceManagerId newLeaderId =
                            ResourceManagerId.fromUuidOrNull(leaderSessionID);
                    notifyOfNewResourceManagerLeader(leaderAddress, newLeaderId);
                });
    }

    /** Reports leader retrieval failures to {@code handleJobMasterError}. */
    @Override
    public void handleError(final Exception exception) {
        final Exception fatalError =
                new Exception("Fatal error in the ResourceManager leader service", exception);
        handleJobMasterError(fatalError);
    }
}

JobMaster.notifyOfNewResourceManagerLeader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Records the new ResourceManager leader and reconnects to it.
 *
 * @param newResourceManagerAddress RPC address of the new leader
 * @param resourceManagerId leader session id of the new leader
 */
private void notifyOfNewResourceManagerLeader(
        final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
    // Must be updated first: the reconnect below reads resourceManagerAddress.
    resourceManagerAddress =
            createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);

    final String reason =
            String.format(
                    "ResourceManager leader changed to new address %s", resourceManagerAddress);
    reconnectToResourceManager(new FlinkException(reason));
}
/**
 * Closes the current ResourceManager connection with {@code cause}, then tries to connect to
 * the currently known ResourceManager address.
 */
private void reconnectToResourceManager(Exception cause) {
    // Close first: connectToResourceManager asserts there is no existing connection
    // (presumably closeResourceManagerConnection clears it — NOTE(review): confirm).
    closeResourceManagerConnection(cause);
    tryConnectToResourceManager();
}

/** Connects to the ResourceManager if a leader address is known; otherwise does nothing. */
private void tryConnectToResourceManager() {
    if (resourceManagerAddress == null) {
        // No leader known yet: nothing to connect to.
        return;
    }
    connectToResourceManager();
}

/**
 * Creates a {@code ResourceManagerConnection} to the known ResourceManager leader and starts it.
 *
 * <p>Preconditions, checked with {@code assert} (only enforced when assertions are enabled): a
 * ResourceManager address is known, and there is neither a pending nor an established
 * connection.
 */
private void connectToResourceManager() {
    assert (resourceManagerAddress != null);
    assert (resourceManagerConnection == null);
    assert (establishedResourceManagerConnection == null);

    log.info("Connecting to ResourceManager {}", resourceManagerAddress);
    
    resourceManagerConnection =
    new ResourceManagerConnection(
        log,
        jobGraph.getJobID(),
        resourceId,
        getAddress(),
        getFencingToken(),
        resourceManagerAddress.getAddress(),
        resourceManagerAddress.getResourceManagerId(),
        futureExecutor);

    resourceManagerConnection.start();
}

JobMaster.startScheduling

1
2
3
/** Delegates to {@code schedulerNG} to start scheduling the job. */
private void startScheduling() {
    schedulerNG.startScheduling();
}

schedulerNG.startScheduling的实现在SchedulerBase中
SchedulerBase.startScheduling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Starts scheduling the job; must be called from the main thread.
 *
 * <p>Registers the job metrics, starts all operator coordinators, and then delegates to the
 * scheduler-specific {@code startSchedulingInternal()}.
 */
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics(
        jobManagerJobMetricGroup,
        executionGraph,
        this::getNumberOfRestarts,
        deploymentStateTimeMetrics,
        executionGraph::registerJobStatusListener,
        executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
        jobStatusMetricsSettings);
    operatorCoordinatorHandler.startAllOperatorCoordinators();
    // Subclass hook; in this walkthrough the implementation is DefaultScheduler.
    startSchedulingInternal();
}

DefaultScheduler.startSchedulingInternal();

1
2
3
4
5
6
7
8
/**
 * Logs the scheduling strategy in use, moves the job to running, and lets the strategy start
 * scheduling.
 */
protected void startSchedulingInternal() {
    final String strategyName = schedulingStrategy.getClass().getName();
    log.info("Starting scheduling with scheduling strategy [{}]", strategyName);

    transitionToRunning();

    // In this walkthrough the strategy is PipelinedRegionSchedulingStrategy.
    schedulingStrategy.startScheduling();
}

PipelinedRegionSchedulingStrategy.startScheduling

1
2
3
4
5
6
7
/**
 * Starts scheduling by collecting all source pipelined regions of the topology and passing
 * them to {@code maybeScheduleRegions}.
 */
public void startScheduling() {
    maybeScheduleRegions(
            IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                    .filter(region -> isSourceRegion(region))
                    .collect(Collectors.toSet()));
}

PipelinedRegionSchedulingStrategy.maybeScheduleRegions / maybeScheduleRegion

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Tries to schedule the given regions in topological order, sharing one consumable-status
 * cache across all of them.
 */
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    // NOTE(review): schedulingTopology is presumably the DefaultExecutionTopology.
    final List<SchedulingPipelinedRegion> orderedRegions =
            SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                    schedulingTopology, regions);

    final Map<ConsumedPartitionGroup, Boolean> statusCache = new HashMap<>();
    orderedRegions.forEach(region -> maybeScheduleRegion(region, statusCache));
}
/**
 * Schedules a single pipelined region, but only if all of its inputs are consumable.
 *
 * <p>The region's vertices must all still be in CREATED state; otherwise {@code checkState}
 * fails.
 */
private void maybeScheduleRegion(
        final SchedulingPipelinedRegion region,
        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
    if (areRegionInputsAllConsumable(region, consumableStatusCache)) {
        checkState(
                areRegionVerticesAllInCreatedState(region),
                "BUG: trying to schedule a region which is not in CREATED state");

        // schedulerOperations is the DefaultScheduler in this walkthrough.
        schedulerOperations.allocateSlotsAndDeploy(
                SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                        regionVerticesSorted.get(region), id -> deploymentOption));
    }
}

DefaultScheduler.allocateSlotsAndDeploy
大致可以分为两块:分配slots与向TM提交task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Allocates slots for the given vertices and deploys them.
 *
 * <p>The vertex versions are recorded and the vertices transitioned to scheduled before any
 * slot is requested. Deployment handles then pair each vertex with its slot assignment, and
 * {@code waitForAllSlotsAndDeploy} performs the deployment.
 */
public void allocateSlotsAndDeploy(
    final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    validateDeploymentOptions(executionVertexDeploymentOptions);

    final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
    groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);

    final List<ExecutionVertexID> verticesToDeploy =
    executionVertexDeploymentOptions.stream()
    .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
    .collect(Collectors.toList());

    // Versions recorded here are passed to the deployment handles below, presumably so that
    // outdated deployments can be detected later (NOTE(review): confirm).
    final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
    executionVertexVersioner.recordVertexModifications(verticesToDeploy);

    transitionToScheduled(verticesToDeploy);
    // Request slots for all vertices.
    final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
    allocateSlots(executionVertexDeploymentOptions);

    final List<DeploymentHandle> deploymentHandles =
    createDeploymentHandles(
        requiredVersionByVertex,
        deploymentOptionsByVertex,
        slotExecutionVertexAssignments);
    // Wait for all slots, then deploy (which ends up submitting tasks via submitTask).
    waitForAllSlotsAndDeploy(deploymentHandles);
}

DefaultScheduler.allocateSlots

1
2
3
4
5
6
7
/**
 * Requests slots for the vertices of the given deployment options.
 *
 * <p>The vertex ids are passed to the slot allocator in the same order as the options.
 */
private List<SlotExecutionVertexAssignment> allocateSlots(
        final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    return executionSlotAllocator.allocateSlotsFor(
            executionVertexDeploymentOptions.stream()
                    .map(option -> option.getExecutionVertexId())
                    .collect(Collectors.toList()));
}

SlotSharingExecutionSlotAllocator.allocateSlotsFor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Allocates a logical slot for every given execution vertex.
 *
 * <p>Vertices are grouped by their {@code ExecutionSlotSharingGroup}. Each group gets (or
 * reuses) one {@code SharedSlot}, from which logical slots are allocated per vertex. A timeout
 * check is scheduled for the resulting physical slot request bulk. The returned list follows
 * the order of {@code executionVertexIds}.
 */
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
    List<ExecutionVertexID> executionVertexIds) {

    SharedSlotProfileRetriever sharedSlotProfileRetriever =
    sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet<>(executionVertexIds));
    // Group the vertices by slot sharing group.
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
    executionVertexIds.stream()
    .collect(
        Collectors.groupingBy(
            slotSharingStrategy::getExecutionSlotSharingGroup));
    // One shared slot per group, existing or newly requested.
    Map<ExecutionSlotSharingGroup, SharedSlot> slots =
    executionsByGroup.keySet().stream()
    //SlotSharingExecutionSlotAllocator.getOrAllocateSharedSlot
    .map(group -> getOrAllocateSharedSlot(group, sharedSlotProfileRetriever))
    .collect(
        Collectors.toMap(
            SharedSlot::getExecutionSlotSharingGroup,
            Function.identity()));
    Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments =
    allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);

    // we need to pass the slots map to the createBulk method instead of using the allocator's
    // 'sharedSlots'
    // because if any physical slots have already failed, their shared slots have been removed
    // from the allocator's 'sharedSlots' by failed logical slots.
    SharingPhysicalSlotRequestBulk bulk = createBulk(slots, executionsByGroup);
    bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout);

    // Preserve the caller's vertex order in the result.
    return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
}

SlotSharingExecutionSlotAllocator.getOrAllocateSharedSlot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Returns the {@code SharedSlot} already registered for the sharing group, or creates and
 * registers a new one.
 *
 * <p>A new shared slot builds a {@code PhysicalSlotRequest} from the group's physical slot
 * resource profile and slot profile, and asks {@code slotProvider} for a physical slot. The
 * {@code SharedSlot} wraps the resulting future and is given {@code this::releaseSharedSlot}
 * as its release callback.
 */
private SharedSlot getOrAllocateSharedSlot(
    ExecutionSlotSharingGroup executionSlotSharingGroup,
    SharedSlotProfileRetriever sharedSlotProfileRetriever) {
    // computeIfAbsent: only request a physical slot the first time a group is seen.
    return sharedSlots.computeIfAbsent(
        executionSlotSharingGroup,
        group -> {
            SlotRequestId physicalSlotRequestId = new SlotRequestId();
            ResourceProfile physicalSlotResourceProfile =
            getPhysicalSlotResourceProfile(group);
            SlotProfile slotProfile =
            sharedSlotProfileRetriever.getSlotProfile(
                group, physicalSlotResourceProfile);
            PhysicalSlotRequest physicalSlotRequest =
            new PhysicalSlotRequest(
                physicalSlotRequestId,
                slotProfile,
                slotWillBeOccupiedIndefinitely);
            CompletableFuture<PhysicalSlot> physicalSlotFuture =
            slotProvider
            .allocatePhysicalSlot(physicalSlotRequest)
            .thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
            return new SharedSlot(
                physicalSlotRequestId,
                physicalSlotResourceProfile,
                group,
                physicalSlotFuture,
                slotWillBeOccupiedIndefinitely,
                this::releaseSharedSlot);
        });
}

PhysicalSlotProviderImpl.allocatePhysicalSlot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Serves a physical slot request.
 *
 * <p>It first tries {@code tryAllocateFromAvailable}. Only when that yields nothing does it
 * request a new slot via {@code requestNewSlot}. The slot is wrapped in a
 * {@code PhysicalSlotRequest.Result} carrying the request id.
 */
public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(
        PhysicalSlotRequest physicalSlotRequest) {
    final SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
    final SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
    final ResourceProfile resourceProfile = slotProfile.getPhysicalSlotResourceProfile();

    LOG.debug(
            "Received slot request [{}] with resource requirements: {}",
            slotRequestId,
            resourceProfile);

    final Optional<PhysicalSlot> availablePhysicalSlot =
            tryAllocateFromAvailable(slotRequestId, slotProfile);

    final CompletableFuture<PhysicalSlot> slotFuture;
    if (availablePhysicalSlot.isPresent()) {
        slotFuture = CompletableFuture.completedFuture(availablePhysicalSlot.get());
    } else {
        // Nothing suitable available: request a brand-new slot.
        slotFuture =
                requestNewSlot(
                        slotRequestId,
                        resourceProfile,
                        slotProfile.getPreferredAllocations(),
                        physicalSlotRequest.willSlotBeOccupiedIndefinitely());
    }

    return slotFuture.thenApply(
            physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
}

PhysicalSlotProviderImpl.requestNewSlot

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Requests a new slot from the slot pool.
 *
 * <p>Slots that will be occupied indefinitely use {@code requestNewAllocatedSlot} (passing a
 * {@code null} timeout). All others use {@code requestNewAllocatedBatchSlot}.
 */
private CompletableFuture<PhysicalSlot> requestNewSlot(
        SlotRequestId slotRequestId,
        ResourceProfile resourceProfile,
        Collection<AllocationID> preferredAllocations,
        boolean willSlotBeOccupiedIndefinitely) {
    return willSlotBeOccupiedIndefinitely
            ? slotPool.requestNewAllocatedSlot(
                    slotRequestId, resourceProfile, preferredAllocations, null)
            : slotPool.requestNewAllocatedBatchSlot(
                    slotRequestId, resourceProfile, preferredAllocations);
}

DeclarativeSlotPoolBridge.requestNewAllocatedSlot
DeclarativeSlotPoolBridge.requestNewAllocatedBatchSlot
看requestNewAllocatedSlot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Requests a new slot with the given profile; must be called from the main thread.
 *
 * <p>It creates a normal {@code PendingRequest} and delegates to
 * {@code internalRequestNewSlot}. A {@code null} timeout means the returned future has no
 * timeout applied.
 */
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
        @Nonnull SlotRequestId slotRequestId,
        @Nonnull ResourceProfile resourceProfile,
        @Nonnull Collection<AllocationID> preferredAllocations,
        @Nullable Time timeout) {
    assertRunningInMainThread();

    log.debug(
            "Request new allocated slot with slot request id {} and resource profile {}",
            slotRequestId,
            resourceProfile);

    return internalRequestNewSlot(
            PendingRequest.createNormalRequest(
                    slotRequestId, resourceProfile, preferredAllocations),
            timeout);
}
/**
 * Registers the pending request and returns its slot future.
 *
 * <p>Without a timeout the future is returned as-is. With a timeout, the future completes
 * exceptionally on timeout (via {@code FutureUtils.orTimeout} on the main-thread executor). In
 * that case the pending request is also timed out via {@code timeoutPendingSlotRequest}.
 */
private CompletableFuture<PhysicalSlot> internalRequestNewSlot(
        PendingRequest pendingRequest, @Nullable Time timeout) {
    // Records the pending request and raises the declared resource requirements.
    internalRequestNewAllocatedSlot(pendingRequest);

    final CompletableFuture<PhysicalSlot> slotFuture = pendingRequest.getSlotFuture();
    if (timeout == null) {
        return slotFuture;
    }

    return FutureUtils.orTimeout(
                    slotFuture,
                    timeout.toMilliseconds(),
                    TimeUnit.MILLISECONDS,
                    componentMainThreadExecutor)
            .whenComplete(
                    (physicalSlot, throwable) -> {
                        if (throwable instanceof TimeoutException) {
                            timeoutPendingSlotRequest(pendingRequest.getSlotRequestId());
                        }
                    });
}

DeclarativeSlotPoolBridge.internalRequestNewAllocatedSlot

1
2
3
4
5
6
7
/**
 * Records {@code pendingRequest} under its slot request id and raises the declarative slot
 * pool's requirements by one slot of the request's resource profile.
 */
private void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
    pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);

    // The declarative slot pool here is DefaultDeclarativeSlotPool.
    final ResourceCounter oneMoreSlot =
            ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1);
    getDeclarativeSlotPool().increaseResourceRequirementsBy(oneMoreSlot);
}

DefaultDeclarativeSlotPool.increaseResourceRequirementsBy
DefaultDeclarativeSlotPool.declareResourceRequirements

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Adds {@code increment} to the total resource requirements and declares the new totals.
 *
 * <p>An empty increment is ignored, and nothing is declared in that case.
 */
public void increaseResourceRequirementsBy(ResourceCounter increment) {
    if (!increment.isEmpty()) {
        totalResourceRequirements = totalResourceRequirements.add(increment);
        declareResourceRequirements();
    }
}
/**
 * Logs the current requirements (and the acquired resources), then passes the requirements to
 * the {@code notifyNewResourceRequirements} callback.
 */
private void declareResourceRequirements() {
    final Collection<ResourceRequirement> requirements = getResourceRequirements();

    LOG.debug(
            "Declare new resource requirements for job {}.{}\trequired resources: {}{}\tacquired resources: {}",
            jobId,
            System.lineSeparator(),
            requirements,
            System.lineSeparator(),
            fulfilledResourceRequirements);

    // Key step: this callback is how the requirements leave the slot pool.
    notifyNewResourceRequirements.accept(requirements);
}

notifyNewResourceRequirements是在DeclarativeSlotPoolService实例化的时候传入的(即this::declareResourceRequirements)
declarativeSlotPoolFactory是DefaultDeclarativeSlotPoolFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Creates the slot pool service for {@code jobId}.
 *
 * <p>The declarative slot pool is created through {@code declarativeSlotPoolFactory}, which is
 * handed {@code this::declareResourceRequirements} as the callback for new resource
 * requirements.
 */
public DeclarativeSlotPoolService(
    JobID jobId,
    DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
    Clock clock,
    Time idleSlotTimeout,
    Time rpcTimeout) {
    this.jobId = jobId;
    this.clock = clock;
    this.rpcTimeout = rpcTimeout;
    this.registeredTaskManagers = new HashSet<>();

    // The method reference is what the slot pool calls when requirements change.
    this.declarativeSlotPool =
    declarativeSlotPoolFactory.create(
        jobId, this::declareResourceRequirements, idleSlotTimeout, rpcTimeout);
}

declareResourceRequirements方法内部其实是对resourceRequirementServiceConnectionManager的封装

1
2
3
4
5
6
/**
 * Wraps the requirements with this job's id and JobManager address and hands them to the
 * resource requirement connection manager. The service must have been started.
 */
private void declareResourceRequirements(Collection<ResourceRequirement> resourceRequirements) {
    assertHasBeenStarted();

    final ResourceRequirements requirementsForJob =
            ResourceRequirements.create(jobId, jobManagerAddress, resourceRequirements);
    resourceRequirementServiceConnectionManager.declareResourceRequirements(requirementsForJob);
}

DefaultDeclareResourceRequirementServiceConnectionManager.declareResourceRequirements
DefaultDeclareResourceRequirementServiceConnectionManager.triggerResourceRequirementsSubmission

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Records the latest requirements and triggers their submission, but only while connected.
 *
 * <p>When not connected, the call is a no-op apart from the closed check.
 */
public void declareResourceRequirements(ResourceRequirements resourceRequirements) {
    synchronized (lock) {
        checkNotClosed();
        if (!isConnected()) {
            return;
        }

        currentResourceRequirements = resourceRequirements;
        // Retry backoff starts at 1 ms and is capped at 10 s.
        triggerResourceRequirementsSubmission(
                Duration.ofMillis(1L), Duration.ofMillis(10000L), currentResourceRequirements);
    }
}

/**
 * Sends {@code resourceRequirementsToSend}, retrying with exponential backoff between
 * {@code sleepOnError} and {@code maxSleepOnError}.
 *
 * <p>The retry count is {@code Integer.MAX_VALUE}. A {@code CancellationException} is rejected
 * by the retry predicate, which stops the retries.
 */
private void triggerResourceRequirementsSubmission(
        Duration sleepOnError,
        Duration maxSleepOnError,
        ResourceRequirements resourceRequirementsToSend) {
    final ExponentialBackoffRetryStrategy backoff =
            new ExponentialBackoffRetryStrategy(Integer.MAX_VALUE, sleepOnError, maxSleepOnError);

    FutureUtils.retryWithDelay(
            () -> sendResourceRequirements(resourceRequirementsToSend),
            backoff,
            throwable -> !(throwable instanceof CancellationException),
            scheduledExecutor);
}

DefaultDeclareResourceRequirementServiceConnectionManager.sendResourceRequirements

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Sends the requirements through the connected service.
 *
 * <p>It returns a future failed with {@code CancellationException} if the manager is no longer
 * connected. It does the same if newer requirements have been recorded in the meantime, which
 * ends the retry loop.
 */
private CompletableFuture<Acknowledge> sendResourceRequirements(
        ResourceRequirements resourceRequirementsToSend) {
    synchronized (lock) {
        if (!isConnected()) {
            LOG.debug(
                    "Stop sending resource requirements to ResourceManager because it is not connected.");
            return FutureUtils.completedExceptionally(new CancellationException());
        }

        // Reference comparison on purpose: only the most recently recorded object is sent.
        if (resourceRequirementsToSend != currentResourceRequirements) {
            LOG.debug("Newer resource requirements found. Stop sending old requirements.");
            return FutureUtils.completedExceptionally(new CancellationException());
        }

        return service.declareResourceRequirements(resourceRequirementsToSend);
    }
}

这里需要注意service是怎么来的:需要看下resourceRequirementServiceConnectionManager.connect,也就是说service是通过connect方法注册到resourceRequirementServiceConnectionManager中的
DeclarativeSlotPoolService.connectToResourceManager

1
2
3
4
5
6
7
8
9
10
/**
 * Connects the slot pool service to a ResourceManager and declares the current requirements.
 *
 * <p>Once connected, declared requirements are forwarded to
 * {@code resourceManagerGateway.declareRequiredResources} using this JobMaster's id and the
 * RPC timeout.
 */
public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
    assertHasBeenStarted();
    // Register the service first: it forwards to resourceManagerGateway.declareRequiredResources.
    resourceRequirementServiceConnectionManager.connect(
        resourceRequirements ->
        resourceManagerGateway.declareRequiredResources(
            jobMasterId, resourceRequirements, rpcTimeout));

    // Then immediately declare the slot pool's current requirements over the new connection.
    declareResourceRequirements(declarativeSlotPool.getResourceRequirements());
}

resourceRequirementServiceConnectionManager是在DeclarativeSlotPoolService的start方法创建的
resourceRequirementServiceConnectionManager是DefaultDeclareResourceRequirementServiceConnectionManager
DefaultDeclareResourceRequirementServiceConnectionManager继承自AbstractServiceConnectionManager实现了connect方法
DeclarativeSlotPoolService.start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Starts the service for the given JobMaster leader; may only be called once.
 *
 * <p>Fails the {@code Preconditions} checks if the service is not in CREATED state, or if
 * {@code jobMasterId} or {@code address} is null.
 */
public final void start(
    JobMasterId jobMasterId, String address, ComponentMainThreadExecutor mainThreadExecutor)
throws Exception {
    Preconditions.checkState(
        state == State.CREATED, "The DeclarativeSlotPoolService can only be started once.");

    this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
    this.jobManagerAddress = Preconditions.checkNotNull(address);
    // Create the resourceRequirementServiceConnectionManager (not yet connected to any service).
    this.resourceRequirementServiceConnectionManager =
    DefaultDeclareResourceRequirementServiceConnectionManager.create(
        mainThreadExecutor);

    onStart(mainThreadExecutor);

    state = State.STARTED;
}

AbstractServiceConnectionManager.connect

1
2
3
4
5
6
/**
 * Registers the service that subsequent calls are forwarded to.
 *
 * <p>Runs under {@code lock}, after checking via {@code checkNotClosed()} that the manager has
 * not been closed.
 */
public final void connect(S service) {
    synchronized (lock) {
        checkNotClosed();
        this.service = service;
    }
}

所以这里调用的是resourceManagerGateway.declareRequiredResources,即ResourceManager的declareRequiredResources,
最终交给slotManager处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Accepts a job's declared resource requirements and passes them to the slot manager.
 *
 * <p>The returned future fails with a {@code ResourceManagerException} if the job has no
 * registered JobManager, or if {@code jobMasterId} does not match the registered one.
 * Otherwise it is completed with {@code Acknowledge} right after the slot manager has processed
 * the requirements.
 */
public CompletableFuture<Acknowledge> declareRequiredResources(
        JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) {
    final JobID jobId = resourceRequirements.getJobId();
    final JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

    if (jobManagerRegistration == null) {
        return FutureUtils.completedExceptionally(
                new ResourceManagerException(
                        "Could not find registered job manager for job " + jobId + '.'));
    }

    if (!Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
        return FutureUtils.completedExceptionally(
                new ResourceManagerException(
                        "The job leader's id "
                                + jobManagerRegistration.getJobMasterId()
                                + " does not match the received id "
                                + jobMasterId
                                + '.'));
    }

    slotManager.processResourceRequirements(resourceRequirements);
    return CompletableFuture.completedFuture(Acknowledge.get());
}

DeclarativeSlotManager.processResourceRequirements

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Processes the resource requirements declared by a job.
 *
 * <p>It returns early if the new requirements are empty and the tracker already has none for
 * the job. Otherwise it:
 * <ul>
 *   <li>logs the change;</li>
 *   <li>records the JobMaster's target address (only for non-empty requirements);</li>
 *   <li>updates the resource tracker;</li>
 *   <li>tries to fulfill missing resources via {@code checkResourceRequirements()}.</li>
 * </ul>
 */
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
    checkInit();
    if (resourceRequirements.getResourceRequirements().isEmpty()
        && resourceTracker.isRequirementEmpty(resourceRequirements.getJobId())) {
        return;
    } else if (resourceRequirements.getResourceRequirements().isEmpty()) {
        LOG.info("Clearing resource requirements of job {}", resourceRequirements.getJobId());
    } else {
        LOG.info(
            "Received resource requirements from job {}: {}",
            resourceRequirements.getJobId(),
            resourceRequirements.getResourceRequirements());
    }

    if (!resourceRequirements.getResourceRequirements().isEmpty()) {
        jobMasterTargetAddresses.put(
            resourceRequirements.getJobId(), resourceRequirements.getTargetAddress());
    }
    resourceTracker.notifyResourceRequirements(
        resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements());
    // Main logic: try to fulfill whatever is still missing.
    checkResourceRequirements();
}

DeclarativeSlotManager.checkResourceRequirements

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Tries to fulfill every job's missing resources.
 *
 * <p>Step 1 tries to allocate slots for each job via {@code tryAllocateSlotsForJob}. Whatever
 * is still unfulfilled goes to step 2, which matches it against the pending TaskManager slots
 * (counted per resource profile). Where needed, step 2 also starts new TaskExecutors
 * (NOTE(review): done inside {@code tryFulfillRequirementsWithPendingSlots}, not shown here).
 */
private void checkResourceRequirements() {
    final Map<JobID, Collection<ResourceRequirement>> missingResources =
    resourceTracker.getMissingResources();
    if (missingResources.isEmpty()) {
        return;
    }

    final Map<JobID, ResourceCounter> unfulfilledRequirements = new LinkedHashMap<>();
    for (Map.Entry<JobID, Collection<ResourceRequirement>> resourceRequirements :
         missingResources.entrySet()) {
        final JobID jobId = resourceRequirements.getKey();
        // 1. Try to allocate from free slots.
        final ResourceCounter unfulfilledJobRequirements =
        tryAllocateSlotsForJob(jobId, resourceRequirements.getValue());
        if (!unfulfilledJobRequirements.isEmpty()) {
            unfulfilledRequirements.put(jobId, unfulfilledJobRequirements);
        }
    }
    if (unfulfilledRequirements.isEmpty()) {
        return;
    }

    // Count the pending TaskManager slots per resource profile.
    ResourceCounter pendingSlots =
    ResourceCounter.withResources(
        taskExecutorManager.getPendingTaskManagerSlots().stream()
        .collect(
            Collectors.groupingBy(
                PendingTaskManagerSlot::getResourceProfile,
                Collectors.summingInt(x -> 1))));
    // 2. Try to fulfill the rest from pending slots, starting new TaskExecutors if needed.
    //    The remaining pending slots are threaded from one job to the next.
    for (Map.Entry<JobID, ResourceCounter> unfulfilledRequirement :
         unfulfilledRequirements.entrySet()) {
        pendingSlots =
        tryFulfillRequirementsWithPendingSlots(
            unfulfilledRequirement.getKey(),
            unfulfilledRequirement.getValue().getResourcesWithCount(),
            pendingSlots);
    }
}

直接分配资源
DeclarativeSlotManager.tryAllocateSlotsForJob
DeclarativeSlotManager.internalTryAllocateSlots
DeclarativeSlotManager.allocateSlot
TaskExecutorGateway.requestSlot
TaskExecutor.requestSlot
TaskExecutor.allocateSlotForJob
TaskExecutor.offerSlotsToJobManager
TaskExecutor.internalOfferSlotsToJobManager
jobMasterGateway.offerSlots
JobMaster.offerSlots
DeclarativeSlotPoolService.offerSlots
DefaultDeclarativeSlotPool.offerSlots
启动taskExecutor流程
DeclarativeSlotManager.tryFulfillRequirementsWithPendingSlots
DeclarativeSlotManager.tryAllocateWorkerAndReserveSlot
taskExecutorManager.allocateWorker
resourceActions.allocateResource
ResourceManager.startNewWorker

1
2
3
4
5
6
7
// Application mode: the cluster entry point is YarnApplicationClusterEntryPoint.
// It passes YarnResourceManagerFactory, which extends ActiveResourceManagerFactory:
//public class YarnResourceManagerFactory extends ActiveResourceManagerFactory<YarnWorkerNode> {
/** Creates the YARN application-mode entry point using the YARN resource manager factory. */
private YarnApplicationClusterEntryPoint(
    final Configuration configuration, final PackagedProgram program) {
    super(configuration, program, YarnResourceManagerFactory.getInstance());
}

ActiveResourceManager.requestNewWorker
resourceManagerDriver.requestResource
YarnResourceManagerDriver.requestResource
resourceManagerClient.addContainerRequest
AMRMClientAsync.CallbackHandler.onContainersAllocated
YarnResourceManagerDriver.onContainersOfPriorityAllocated
YarnResourceManagerDriver.startTaskExecutorInContainerAsync
YarnResourceManagerDriver.createTaskExecutorLaunchContext
nodeManagerClient.startContainerAsync
这里应该是交给yarn来调度了,而createTaskExecutorLaunchContext内部会封装启动命令
这里的mainClass是YarnTaskExecutorRunner

1
2
3
4
5
6
7
8
9
// Excerpt from YarnResourceManagerDriver.createTaskExecutorLaunchContext: builds the container
// launch context for a TaskExecutor. YarnTaskExecutorRunner.class is passed as the class
// whose main method the container runs.
Utils.createTaskExecutorContext(
    flinkConfig,
    yarnConfig,
    configuration,
    taskManagerParameters,
    taskManagerDynamicProperties,
    currDir,
    YarnTaskExecutorRunner.class,
    log);

YarnTaskExecutorRunner的main入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Entry point of the TaskExecutor process started in a YARN container.
 *
 * <p>It logs environment information, registers the signal handler, installs the JVM shutdown
 * safeguard, and then runs the TaskManager.
 */
public static void main(String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    runTaskManagerSecurely(args);
}
/**
 * Loads the TaskManager configuration, adapts it to the YARN container environment, and runs
 * the TaskManager process via {@code TaskManagerRunner.runTaskManagerProcessSecurely}.
 *
 * <p>Any failure while preparing the configuration is logged and terminates the JVM with
 * {@code INIT_ERROR_EXIT_CODE}.
 */
private static void runTaskManagerSecurely(String[] args) {
    Configuration configuration = null;

    try {
        LOG.debug("All environment variables: {}", ENV);

        // The container's working directory, taken from the PWD environment variable.
        final String currDir = ENV.get(Environment.PWD.key());
        LOG.info("Current working Directory: {}", currDir);

        configuration = TaskManagerRunner.loadConfiguration(args);
        setupAndModifyConfiguration(configuration, currDir, ENV);
    } catch (Throwable t) {
        LOG.error("YARN TaskManager initialization failed.", t);
        System.exit(INIT_ERROR_EXIT_CODE);
    }

    TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));
}

TaskManagerRunner.runTaskManagerProcessSecurely
TaskManagerRunner.runTaskManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Excerpt from runTaskManagerProcessSecurely: runs runTaskManager inside the installed security
// context; its return value becomes the process exit code.
exitCode =
SecurityUtils.getInstalledContext()
.runSecured(() -> runTaskManager(configuration, pluginManager));
/**
 * Creates and starts a {@code TaskManagerRunner}, blocks until it terminates, and returns its
 * exit code.
 *
 * @throws FlinkException if the runner cannot be started, or if waiting for its termination
 *     fails
 */
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
throws Exception {
    final TaskManagerRunner taskManagerRunner;

    try {
        // Create and start the TaskManagerRunner.
        // Note the TaskManagerRunner::createTaskExecutorService factory passed here; read it
        // together with the later taskExecutorServiceFactory.createTaskExecutor call.
        taskManagerRunner =
        new TaskManagerRunner(
            configuration,
            pluginManager,
            TaskManagerRunner::createTaskExecutorService);
        taskManagerRunner.start();
    } catch (Exception exception) {
        throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
    }

    try {
        // Blocks until the runner's termination future completes.
        return taskManagerRunner.getTerminationFuture().get().getExitCode();
    } catch (Throwable t) {
        throw new FlinkException(
            "Unexpected failure during runtime of TaskManagerRunner.",
            ExceptionUtils.stripExecutionException(t));
    }
}

TaskManagerRunner::createTaskExecutorService创建TaskExecutor,包装成TaskExecutorToServiceAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Factory used by {@code TaskManagerRunner}: starts a {@code TaskExecutor} via
 * {@code startTaskManager} and wraps it in a {@code TaskExecutorToServiceAdapter}.
 */
public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        WorkingDirectory workingDirectory,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    return TaskExecutorToServiceAdapter.createFor(
            startTaskManager(
                    configuration,
                    resourceID,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    localCommunicationOnly,
                    externalResourceInfoProvider,
                    workingDirectory,
                    fatalErrorHandler));
}

TaskManagerRunner的start方法
taskExecutorService就是上边的TaskExecutorToServiceAdapter,就是包装的TaskExecutor

1
2
3
4
5
6
7
/**
 * Starts the TaskManagerRunner services, then the TaskExecutor service, both under
 * {@code lock}.
 */
public void start() throws Exception {
    synchronized (lock) {
        startTaskManagerRunnerServices();
        // taskExecutorService is the TaskExecutorToServiceAdapter wrapping the TaskExecutor.
        taskExecutorService.start();
    }
}

TaskManagerRunner.startTaskManagerRunnerServices方法比较长,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * Creates all runner-level services the TaskExecutor depends on and finally creates
 * {@code taskExecutorService}. Everything runs while holding {@code lock}.
 *
 * <p>The order of the steps matters: later steps use objects created by earlier ones (e.g. the
 * RPC service needs the HA services, the resource id is derived from the RPC address/port, the
 * working directory is derived from the resource id).
 *
 * @throws Exception if any of the services cannot be created or started
 */
private void startTaskManagerRunnerServices() throws Exception {
    synchronized (lock) {
        // RPC system implementation, loaded from the configuration.
        rpcSystem = RpcSystem.load(configuration);

        // Scheduled thread pool sized to the number of CPU cores (threads named
        // "taskmanager-future"); passed to the HA services below.
        this.executor =
        Executors.newScheduledThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("taskmanager-future"));

        // High-availability services; the runner itself is passed as the last argument
        // (NOTE(review): presumably as fatal error handler — confirm against the signature).
        highAvailabilityServices =
        HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            executor,
            AddressResolution.NO_ADDRESS_RESOLUTION,
            rpcSystem,
            this);

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);

        // The TaskManager's resource id is derived from the RPC service's address and port.
        this.resourceId =
        getTaskManagerResourceID(
            configuration, rpcService.getAddress(), rpcService.getPort());

        this.workingDirectory =
        ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
            configuration, resourceId);

        LOG.info("Using working directory: {}", workingDirectory);

        HeartbeatServices heartbeatServices =
        HeartbeatServices.fromConfiguration(configuration);

        metricRegistry =
        new MetricRegistryImpl(
            MetricRegistryConfiguration.fromConfiguration(
                configuration,
                rpcSystem.getMaximumMessageSizeInBytes(configuration)),
            ReporterSetup.fromConfiguration(configuration, pluginManager));

        // Separate RPC service for the metric query service, started on the same address.
        final RpcService metricQueryServiceRpcService =
        MetricUtils.startRemoteMetricsRpcService(
            configuration,
            rpcService.getAddress(),
            configuration.getString(TaskManagerOptions.BIND_HOST),
            rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());

        // Blob cache stored in the working directory's blob storage directory
        // (Reference.borrowed: presumably the cache does not take ownership of the directory).
        blobCacheService =
        BlobUtils.createBlobCacheService(
            configuration,
            Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
            highAvailabilityServices.createBlobStore(),
            null);

        final ExternalResourceInfoProvider externalResourceInfoProvider =
        ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
            configuration, pluginManager);
        
        // Create the TaskExecutorService from everything built above. The literal `false`
        // presumably maps to localCommunicationOnly and `this` to the fatal error handler,
        // matching createTaskExecutorService's parameter order.
        taskExecutorService =
        taskExecutorServiceFactory.createTaskExecutor(
            this.configuration,
            this.resourceId.unwrap(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            false,
            externalResourceInfoProvider,
            workingDirectory.unwrap(),
            this);

        // Hook for an unexpected termination of the TaskExecutorService (helper not shown here).
        handleUnexpectedTaskExecutorServiceTermination();

        // Memory logger (only if configured), given a future derived from terminationFuture.
        MemoryLogger.startIfConfigured(
            LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
    }
}

taskExecutorService.start();
TaskExecutor 也是一个 RpcEndpoint，调用 start() 之后会回调它的 onStart() 方法，所以直接看 TaskExecutor 的 onStart() 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * RpcEndpoint start callback of the TaskExecutor.
 *
 * <p>Starts the TaskExecutor's services. If that fails, the failure is wrapped in a
 * TaskManagerException, reported via onFatalError and rethrown. On success the registration
 * timeout is started.
 */
public void onStart() throws Exception {
    try {
        startTaskExecutorServices();
    } catch (Throwable cause) {
        final String message =
            String.format("Could not start the TaskExecutor %s", getAddress());
        final TaskManagerException startupFailure = new TaskManagerException(message, cause);
        onFatalError(startupFailure);
        throw startupFailure;
    }

    // Services are up: start the timeout for registering at the ResourceManager.
    startRegistrationTimeout();
}

TaskExecutor.onStart() -> TaskExecutor.startTaskExecutorServices()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Starts the services owned by the TaskExecutor: ResourceManager leader retrieval, the task
 * slot table and the job leader service. It also creates the file cache.
 *
 * <p>Any {@code Exception} thrown while doing so is passed to
 * {@code handleStartTaskExecutorServicesException}.
 */
private void startTaskExecutorServices() throws Exception {
    try {
        // start by connecting to the ResourceManager
        // (ResourceManagerLeaderListener is notified about ResourceManager leader addresses)
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

        // tell the task slot table who's responsible for the task slot actions
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

        // start the job leader service
        jobLeaderService.start(
            getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

        // File cache over the configured tmp directories, backed by the permanent blob service.
        fileCache =
        new FileCache(
            taskManagerConfiguration.getTmpDirectories(),
            blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}

ResourceManagerLeaderListener 收到 RM 的 leader 地址后，会通过 runAsync 回调 notifyOfNewResourceManagerLeader 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
            () ->
            notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }

    @Override
    public void handleError(Exception exception) {
        onFatalError(exception);
    }
}

之后的调用链为：reconnectToResourceManager() -> tryConnectToResourceManager() -> connectToResourceManager()，
最终创建并启动一个 resourceManagerConnection（TaskExecutorToResourceManagerConnection）。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Starts a new connection from this TaskExecutor to the current ResourceManager.
 *
 * <p>Expects a known ResourceManager address and no existing (pending or established)
 * connection. It builds the TaskExecutorRegistration describing this TaskExecutor, wraps it in
 * a TaskExecutorToResourceManagerConnection and starts that connection, which begins the
 * registration.
 */
private void connectToResourceManager() {
    // Preconditions; only checked when assertions are enabled.
    assert resourceManagerAddress != null;
    assert establishedResourceManagerConnection == null;
    assert resourceManagerConnection == null;

    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

    // What the ResourceManager learns about us: RPC address, resource id, data port,
    // hardware description and the default-slot / total resource profiles.
    final TaskExecutorRegistration registration =
        new TaskExecutorRegistration(
            getAddress(),
            getResourceID(),
            unresolvedTaskManagerLocation.getDataPort(),
            hardwareDescription,
            taskManagerConfiguration.getDefaultSlotResourceProfile(),
            taskManagerConfiguration.getTotalResourceProfile());

    resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),
            registration);

    // Kick off the retrying registration at the ResourceManager.
    resourceManagerConnection.start();
}

resourceManagerConnection 启动后会创建 RetryingRegistration，并向 RM 发起注册：

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Starts this RPC connection by creating a new RetryingRegistration and starting it.
 *
 * <p>The registration is installed as the pending registration with a compare-and-set. If a
 * concurrent start() already installed one, the freshly created registration is cancelled.
 *
 * @throws IllegalStateException if the connection is closed or already started
 */
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(
        !isConnected() && pendingRegistration == null,
        "The RPC connection is already started");

    final RetryingRegistration<F, G, S, R> registration = createNewRegistration();

    final boolean installed = REGISTRATION_UPDATER.compareAndSet(this, null, registration);
    if (!installed) {
        // concurrent start operation
        registration.cancel();
        return;
    }

    // We own the pending slot: start registering at the target (e.g. the RM).
    registration.startRegistration();
}

通过 RPC 调用 RM 注册 TaskExecutor，调用链为：
RetryingRegistration.startRegistration() -> RetryingRegistration.register() -> ResourceManagerRegistration.invokeRegistration()

1
2
3
4
5
6
7
8
9
10
/**
 * Registers this TaskExecutor at the given ResourceManager via RPC.
 *
 * @param resourceManager gateway of the target ResourceManager
 * @param fencingToken leader id of the target ResourceManager (not used by this method)
 * @param timeoutMillis RPC timeout in milliseconds
 * @return future completed with the ResourceManager's registration response
 */
protected CompletableFuture<RegistrationResponse> invokeRegistration(
    ResourceManagerGateway resourceManager,
    ResourceManagerId fencingToken,
    long timeoutMillis)
throws Exception {
    return resourceManager.registerTaskExecutor(
        taskExecutorRegistration, Time.milliseconds(timeoutMillis));
}

RetryingRegistration 是如何生成的？

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Creates a fresh RetryingRegistration via generateRegistration() and attaches the completion
 * handling to its result future.
 *
 * <p>generateRegistration() only builds the registration object. The important part is the
 * callback chained onto its future below (run on {@code executor}), especially the success path
 * that stores the target gateway and calls onRegistrationSuccess.
 *
 * @return the new registration, not yet started
 */
private RetryingRegistration<F, G, S, R> createNewRegistration() {
    final RetryingRegistration<F, G, S, R> registration = checkNotNull(generateRegistration());

    registration
        .getFuture()
        .whenCompleteAsync(
            (RetryingRegistration.RetryingRegistrationResult<G, S, R> outcome,
             Throwable error) -> {
                if (error != null) {
                    if (error instanceof CancellationException) {
                        // Cancellation originates from cancelling the RetryingRegistration
                        // itself, so it is expected and only logged.
                        log.debug(
                            "Retrying registration towards {} was cancelled.",
                            targetAddress);
                    } else {
                        // The future should only fail because of a bug; a declined
                        // registration arrives as a rejection result instead.
                        onRegistrationFailure(error);
                    }
                    return;
                }

                if (outcome.isSuccess()) {
                    targetGateway = outcome.getGateway();
                    // e.g. the JobMaster's ResourceManagerConnection
                    onRegistrationSuccess(outcome.getSuccess());
                    return;
                }

                if (outcome.isRejection()) {
                    onRegistrationRejection(outcome.getRejection());
                    return;
                }

                throw new IllegalArgumentException(
                    String.format(
                        "Unknown retrying registration response: %s.", outcome));
            },
            executor);

    return registration;
}

JobMaster.ResourceManagerConnection
JobMaster 向 RM 注册的流程与 TaskExecutor 相同，同样通过 invokeRegistration 发起注册，只是这里调用的是 registerJobManager。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Creates the retrying registration that registers this JobMaster at the ResourceManager.
 * It is the JobMaster-side counterpart of the TaskExecutor registration: the overridden
 * invokeRegistration issues a registerJobManager RPC.
 */
protected RetryingRegistration<
ResourceManagerId,
ResourceManagerGateway,
JobMasterRegistrationSuccess,
RegistrationResponse.Rejection>
generateRegistration() {
    final RetryingRegistration<
            ResourceManagerId,
            ResourceManagerGateway,
            JobMasterRegistrationSuccess,
            RegistrationResponse.Rejection>
        registration =
            new RetryingRegistration<
                ResourceManagerId,
                ResourceManagerGateway,
                JobMasterRegistrationSuccess,
                RegistrationResponse.Rejection>(
                log,
                getRpcService(),
                "ResourceManager",
                ResourceManagerGateway.class,
                getTargetAddress(),
                getTargetLeaderId(),
                jobMasterConfiguration.getRetryingRegistrationConfiguration()) {

                @Override
                protected CompletableFuture<RegistrationResponse> invokeRegistration(
                    ResourceManagerGateway gateway,
                    ResourceManagerId fencingToken,
                    long timeoutMillis) {
                    // RPC: register this JobMaster (master id, resource id, RPC address,
                    // job id) at the ResourceManager.
                    return gateway.registerJobManager(
                        jobMasterId,
                        jobManagerResourceID,
                        jobManagerRpcAddress,
                        jobID,
                        Time.milliseconds(timeoutMillis));
                }
            };
    return registration;
}
/**
 * Called once the JobMaster has successfully registered at the ResourceManager.
 *
 * <p>Defers the work via runAsync. Only the connection that is still the current
 * {@code resourceManagerConnection} may establish the connection; results from outdated
 * connections are dropped.
 *
 * @param success the ResourceManager's registration response
 */
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(
        () -> {
            //noinspection ObjectEquality
            final boolean isCurrentConnection = this == resourceManagerConnection;
            if (isCurrentConnection) {
                establishResourceManagerConnection(success);
            }
        });
}

JobMaster 向 RM 注册成功后建立与 RM 的连接，并把 RM 的 gateway 交给 slot pool，由 slot pool 向 RM 申请 slot。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Finalizes the JobMaster's connection to the ResourceManager after a successful registration.
 *
 * <p>The response is only accepted if it matches the current {@code resourceManagerConnection}
 * (same target leader id). Otherwise it is ignored as a duplicated or outdated response. When
 * accepted, this method:
 * <ul>
 *   <li>records the established connection,
 *   <li>hands the ResourceManager gateway to the slot pool service so that it can start
 *       requesting slots,
 *   <li>starts heartbeat monitoring of the ResourceManager.
 * </ul>
 *
 * @param success registration response carrying the ResourceManager's leader id and resource id
 */
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    final ResourceManagerId resourceManagerId = success.getResourceManagerId();

    // verify the response with current connection
    if (resourceManagerConnection != null
        && Objects.equals(
            resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {

        log.info(
            "JobManager successfully registered at ResourceManager, leader id: {}.",
            resourceManagerId);

        final ResourceManagerGateway resourceManagerGateway =
        resourceManagerConnection.getTargetGateway();

        final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();

        establishedResourceManagerConnection =
        new EstablishedResourceManagerConnection(
            resourceManagerGateway, resourceManagerResourceId);

        // Connect the slot pool service to the ResourceManager; from now on it can request
        // slots from the RM. (The JobMaster's field is `slotPoolService`, the same one started
        // in startJobMasterServices; there is no `slotPool` field in Flink 1.15.)
        slotPoolService.connectToResourceManager(resourceManagerGateway);

        // Monitor the ResourceManager via heartbeats; incoming heartbeats are answered by
        // forwarding them to the RM gateway.
        resourceManagerHeartbeatManager.monitorTarget(
            resourceManagerResourceId,
            new HeartbeatTarget<Void>() {
                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    resourceManagerGateway.heartbeatFromJobManager(resourceID);
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                    // request heartbeat will never be called on the job manager side
                }
            });
    } else {
        log.debug(
            "Ignoring resource manager connection to {} because it's duplicated or outdated.",
            resourceManagerId);
    }
}
This post is licensed under CC BY 4.0 by the author.