Home Flink1.15 InputChannel源码分析
Post
Cancel

Flink 1.15 InputChannel 源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Builds the SubtaskCheckpointCoordinatorImpl for this task and stores it in the
// subtaskCheckpointCoordinator field. Two flags are read from the task configuration:
// whether unaligned checkpoints are enabled, and the
// ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH option (cast to Boolean).
// The last argument is a method reference to prepareInputSnapshot on this task;
// presumably the coordinator invokes it when it needs the inputs to prepare their
// snapshot -- confirm against SubtaskCheckpointCoordinatorImpl.
this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl(checkpointStorageAccess, this.getName(), actionExecutor, this.getCancelables(), this.getAsyncOperationsThreadPool(), environment, this, this.configuration.isUnalignedCheckpointsEnabled(), (Boolean)this.configuration.getConfiguration().get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH), this::prepareInputSnapshot);

/**
 * Delegates snapshot preparation for a checkpoint to the task's input processor.
 *
 * @param channelStateWriter writer passed through unchanged to the input processor
 * @param checkpointId id of the checkpoint passed through unchanged to the input processor
 * @return the future produced by the input processor, or the future from
 *     {@code FutureUtils.completedVoidFuture()} when no input processor is set
 * @throws CheckpointException declared by the signature; this method does not throw it directly
 */
private CompletableFuture<Void> prepareInputSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
    // Without an input processor there is nothing to delegate to.
    if (this.inputProcessor == null) {
        return FutureUtils.completedVoidFuture();
    }
    return this.inputProcessor.prepareSnapshot(channelStateWriter, checkpointId);
}
//StreamOneInputProcessor
/**
 * Forwards the snapshot request to the single wrapped input.
 *
 * @param channelStateWriter writer handed to the input unchanged
 * @param checkpointId id of the checkpoint handed to the input unchanged
 * @return the future returned by the input's own {@code prepareSnapshot}
 * @throws IOException declared by the signature; this method does not throw it directly
 */
public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
    final CompletableFuture<Void> inputSnapshotFuture =
        input.prepareSnapshot(channelStateWriter, checkpointId);
    return inputSnapshotFuture;
}
//StreamTwoInputProcessor
/**
 * Requests a snapshot from both wrapped processors and combines the two results.
 *
 * @param channelStateWriter writer handed to both processors unchanged
 * @param checkpointId id of the checkpoint handed to both processors unchanged
 * @return a future built with {@link CompletableFuture#allOf} over the two processors' futures
 * @throws IOException declared by the signature; this method does not throw it directly
 */
public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
    // processor1 is asked before processor2.
    final CompletableFuture<?> firstInputFuture =
        this.processor1.prepareSnapshot(channelStateWriter, checkpointId);
    final CompletableFuture<?> secondInputFuture =
        this.processor2.prepareSnapshot(channelStateWriter, checkpointId);
    return CompletableFuture.allOf(firstInputFuture, secondInputFuture);
}
//StreamMultipleInputProcessor
/**
 * Requests a snapshot from every input processor, in array order, and combines the results.
 *
 * @param channelStateWriter writer handed to each processor unchanged
 * @param checkpointId id of the checkpoint handed to each processor unchanged
 * @return a future built with {@link CompletableFuture#allOf} over one future per input
 *     processor
 * @throws IOException declared by the signature; this method does not throw it directly
 */
public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
    final int processorCount = this.inputProcessors.length;
    final CompletableFuture<?>[] pendingSnapshots = new CompletableFuture<?>[processorCount];

    // A plain loop is kept because prepareSnapshot is called inside a method that declares
    // a checked exception.
    int slot = 0;
    while (slot < processorCount) {
        pendingSnapshots[slot] =
            this.inputProcessors[slot].prepareSnapshot(channelStateWriter, checkpointId);
        slot++;
    }

    return CompletableFuture.allOf(pendingSnapshots);
}

在 Task 中创建 SingleInputGate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Consumed intermediate result partitions: obtain the gates from the shuffle environment,
// then store each one wrapped in an InputGateWithMetrics that also receives the
// bytes-in counter of the IO metric group.
final IndexedInputGate[] gates =
        shuffleEnvironment
                .createInputGates(taskShuffleContext, this, inputGateDeploymentDescriptors)
                .toArray(new IndexedInputGate[0]);

this.inputGates = new IndexedInputGate[gates.length];
int counter = 0;
while (counter < gates.length) {
    final IndexedInputGate gate = gates[counter];
    // The counter is looked up again for every gate.
    inputGates[counter] =
            new InputGateWithMetrics(gate, metrics.getIOMetricGroup().getNumBytesInCounter());
    counter++;
}
//NettyShuffleEnvironment
/**
 * Creates one {@code SingleInputGate} per deployment descriptor, registers every gate in
 * {@code inputGatesById}, and registers input metrics for the whole set.
 *
 * <p>The entire operation runs while holding {@code lock}. It begins with a
 * {@code Preconditions.checkState} guard on {@code isClosed}, so it is rejected once the
 * environment has been shut down.
 *
 * <p>Each gate is keyed by an {@code InputGateID} built from the descriptor's consumed result
 * id and the owner's execution attempt id. The entry is removed again when the gate's close
 * future completes.
 *
 * @param ownerContext supplies the owner name, execution attempt id and metric groups
 * @param partitionProducerStateProvider passed through to the gate factory
 * @param inputGateDeploymentDescriptors one descriptor per gate; the list index becomes the
 *     gate index
 * @return a fixed-size list view over the created gates, in descriptor order
 */
public List<SingleInputGate> createInputGates(
    ShuffleIOOwnerContext ownerContext,
    PartitionProducerStateProvider partitionProducerStateProvider,
    List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
    synchronized (lock) {
        Preconditions.checkState(
            !isClosed, "The NettyShuffleEnvironment has already been shut down.");

        final MetricGroup inputGroup = ownerContext.getInputGroup();
        @SuppressWarnings("deprecation")
        final InputChannelMetrics channelMetrics =
            new InputChannelMetrics(inputGroup, ownerContext.getParentGroup());

        final int gateCount = inputGateDeploymentDescriptors.size();
        final SingleInputGate[] createdGates = new SingleInputGate[gateCount];

        for (int gateIndex = 0; gateIndex < gateCount; gateIndex++) {
            final InputGateDeploymentDescriptor descriptor =
                inputGateDeploymentDescriptors.get(gateIndex);

            final SingleInputGate gate =
                singleInputGateFactory.create(
                    ownerContext.getOwnerName(),
                    gateIndex,
                    descriptor,
                    partitionProducerStateProvider,
                    channelMetrics);

            // Track the gate by id until it is closed.
            final InputGateID gateId =
                new InputGateID(
                    descriptor.getConsumedResultId(), ownerContext.getExecutionAttemptID());
            inputGatesById.put(gateId, gate);
            gate.getCloseFuture().thenRun(() -> inputGatesById.remove(gateId));

            createdGates[gateIndex] = gate;
        }

        registerInputMetrics(config.isNetworkDetailedMetrics(), inputGroup, createdGates);
        return Arrays.asList(createdGates);
    }
}
This post is licensed under CC BY 4.0 by the author.