Home Flink1.15 CKP InputData&OutPutData源码分析
Post
Cancel

Flink1.15 CKP InputData&OutPutData源码分析

AlternatingCollectingBarriers.alignmentTimeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Handles an alignment timeout in the AlternatingCollectingBarriers state: the checkpoint that
 * was being aligned is continued as an unaligned checkpoint.
 *
 * @param controller controller used to initialize and trigger the checkpoint
 * @param checkpointBarrier barrier of the checkpoint whose alignment timed out
 * @return the unaligned collecting-barriers state (alternating = true)
 */
public BarrierHandlerState alignmentTimeout(
    Controller controller, CheckpointBarrier checkpointBarrier)
throws IOException, CheckpointException {
    // NOTE(review): prioritizes all announced barriers; exact semantics not shown in this excerpt.
    state.prioritizeAllAnnouncements();
    // Convert the barrier to its unaligned form.
    CheckpointBarrier unalignedBarrier = checkpointBarrier.asUnaligned();
    // Initialize the input-side checkpoint with the unaligned barrier.
    controller.initInputsCheckpoint(unalignedBarrier);
    // Tell every input that the checkpoint has started.
    for (CheckpointableInput input : state.getInputs()) {
        input.checkpointStarted(unalignedBarrier);
    }
    // Trigger the checkpoint.
    controller.triggerGlobalCheckpoint(unalignedBarrier);
    // Transition to the unaligned "collecting barriers" state.
    return new AlternatingCollectingBarriersUnaligned(true, state);
}

AlternatingWaitingForFirstBarrierUnaligned.barrierReceived

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Handles a barrier received in the AlternatingWaitingForFirstBarrierUnaligned state. It starts
 * an unaligned checkpoint on all inputs and triggers it.
 *
 * @param controller controller used to initialize and trigger the checkpoint
 * @param channelInfo channel on which the barrier arrived
 * @param checkpointBarrier the received barrier
 * @param markChannelBlocked whether the channel should be booked as blocked; only honored when
 *     the barrier is not an unaligned one
 * @return {@code stopCheckpoint()} if all barriers have already been received, otherwise the
 *     unaligned collecting-barriers state
 */
public BarrierHandlerState barrierReceived(
    Controller controller,
    InputChannelInfo channelInfo,
    CheckpointBarrier checkpointBarrier,
    boolean markChannelBlocked)
throws CheckpointException, IOException {

    // we received an out of order aligned barrier, we should book keep this channel as blocked,
    // as it is being blocked by the credit-based network
    if (markChannelBlocked
        && !checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
        channelState.blockChannel(channelInfo);
    }

    CheckpointBarrier unalignedBarrier = checkpointBarrier.asUnaligned();
    // Initialize the input-side checkpoint with the unaligned barrier.
    controller.initInputsCheckpoint(unalignedBarrier);
    for (CheckpointableInput input : channelState.getInputs()) {
        input.checkpointStarted(unalignedBarrier);
    }
    // Trigger the checkpoint.
    controller.triggerGlobalCheckpoint(unalignedBarrier);
    // If every barrier is already in, stop the checkpoint on all inputs and leave this state.
    if (controller.allBarriersReceived()) {
        for (CheckpointableInput input : channelState.getInputs()) {
            input.checkpointStopped(unalignedBarrier.getId());
        }
        return stopCheckpoint();
    }
    // Otherwise keep collecting the remaining barriers in unaligned mode.
    return new AlternatingCollectingBarriersUnaligned(alternating, channelState);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Initializes the input-side checkpoint for a barrier. It forwards the barrier's id and checkpoint
 * options to the subtask checkpoint coordinator.
 *
 * @param checkpointBarrier barrier carrying the checkpoint id and options
 * @throws CheckpointException if the coordinator fails to initialize the checkpoint
 */
public void initInputsCheckpoint(CheckpointBarrier checkpointBarrier)
throws CheckpointException {
    // Fails if the coordinator has not been set on this controller.
    checkState(subTaskCheckpointCoordinator != null);
    subTaskCheckpointCoordinator.initInputsCheckpoint(
        checkpointBarrier.getId(), checkpointBarrier.getCheckpointOptions());
}
//SubtaskCheckpointCoordinatorImpl
/**
 * Prepares the input side of checkpoint {@code id}. Only unaligned checkpoints need work here.
 * The channel-state writer is started first, and then the in-flight input data snapshot is
 * prepared. Aligned checkpoints return immediately.
 *
 * @param id checkpoint id
 * @param checkpointOptions options of the checkpoint being initialized
 * @throws CheckpointException if preparing the in-flight data snapshot fails
 */
public void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions)
throws CheckpointException {
    if (!checkpointOptions.isUnalignedCheckpoint()) {
        return;
    }
    channelStateWriter.start(id, checkpointOptions);
    prepareInflightDataSnapshot(id);
}

prepareInflightDataSnapshot 主要是将上游数据添加到 inputData，见 StreamTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// prepareInputSnapshot is supplied when StreamTask is instantiated, i.e. passed in from its constructor
/**
 * Asks the task to write its in-flight input data for {@code checkpointId} into the channel-state
 * writer. When that asynchronous step completes, the input side of the channel state is either
 * finished or aborted.
 *
 * @param checkpointId id of the checkpoint being prepared
 * @throws CheckpointException if the snapshot preparation fails synchronously
 */
private void prepareInflightDataSnapshot(long checkpointId) throws CheckpointException {
    prepareInputSnapshot
    .apply(channelStateWriter, checkpointId)
    .whenComplete(
        (unused, ex) -> {
            if (ex != null) {
                // Preparation failed: abort this checkpoint's channel-state writing.
                channelStateWriter.abort(
                    checkpointId,
                    ex,
                    false /* result is needed and cleaned by getWriteResult */);
            } else {
                // All input data has been added: mark the input side as finished.
                channelStateWriter.finishInput(checkpointId);
            }
        });
}

StreamTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// StreamTask: instantiate subtaskCheckpointCoordinator
this.subtaskCheckpointCoordinator =
new SubtaskCheckpointCoordinatorImpl(
    checkpointStorageAccess,
    getName(),
    actionExecutor,
    getCancelables(),
    getAsyncOperationsThreadPool(),
    environment,
    this,
    configuration.isUnalignedCheckpointsEnabled(),
    configuration
    .getConfiguration()
    .get(
        ExecutionCheckpointingOptions
        .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
    // Method reference handed to the coordinator as its input-snapshot hook.
    this::prepareInputSnapshot);
//StreamTask
/**
 * Asks the input processor to write its in-flight input data for {@code checkpointId} into the
 * given channel-state writer.
 *
 * @param channelStateWriter writer receiving the input channel state
 * @param checkpointId id of the checkpoint being prepared
 * @return the future returned by the input processor, or an already-completed future when the
 *     task has no input processor
 * @throws CheckpointException if the input processor fails to prepare the snapshot
 */
private CompletableFuture<Void> prepareInputSnapshot(
    ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
    return inputProcessor == null
        ? FutureUtils.completedVoidFuture()
        : inputProcessor.prepareSnapshot(channelStateWriter, checkpointId);
}

StreamMultipleInputProcessor.prepareSnapshot
StreamOneInputProcessor.prepareSnapshot
StreamTaskNetworkInput.prepareSnapshot
channelStateWriter.addInputData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//StreamTaskNetworkInput
/**
 * Adds every channel's unconsumed deserializer bytes to the channel-state writer as input data.
 *
 * @param channelStateWriter writer receiving the input channel state
 * @param checkpointId id of the checkpoint being prepared
 * @return future from the input gate that completes once all barriers of {@code checkpointId}
 *     have been received
 * @throws CheckpointException wrapping any IOException raised while adding input data
 */
public CompletableFuture<Void> prepareSnapshot(
    ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
    for (Map.Entry<
         InputChannelInfo,
         SpillingAdaptiveSpanningRecordDeserializer<
         DeserializationDelegate<StreamElement>>>
         e : recordDeserializers.entrySet()) {

        try {
            // Persist the bytes this channel's deserializer has buffered but not yet consumed.
            channelStateWriter.addInputData(
                checkpointId,
                e.getKey(),
                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                e.getValue().getUnconsumedBuffer());
        } catch (IOException ioException) {
            throw new CheckpointException(CheckpointFailureReason.IO_EXCEPTION, ioException);
        }
    }
    return checkpointedInputGate.getAllBarriersReceivedFuture(checkpointId);
}

向下游广播checkpointEvent
SubtaskCheckpointCoordinatorImpl：这里要把 broadcastEvent 和 channelStateWriter.finishOutput 连起来看。
从注释看，broadcastEvent 的过程中已经把数据写入 output data 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//SubtaskCheckpointCoordinatorImpl
// Step (2): Send the checkpoint barrier downstream
// For unaligned checkpoints the barrier is broadcast as a priority event.
operatorChain.broadcastEvent(
    new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
    options.isUnalignedCheckpoint());
// Step (3): Prepare to spill the in-flight buffers for input and output
if (options.isUnalignedCheckpoint()) {
    // output data already written while broadcasting event
    channelStateWriter.finishOutput(metadata.getCheckpointId());
}
//OperatorChain
/**
 * Broadcasts an event to every stream output of this operator chain.
 *
 * @param event event to broadcast
 * @param isPriorityEvent whether the event should be sent with priority
 * @throws IOException if any output fails to broadcast the event
 */
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    for (RecordWriterOutput<?> output : streamOutputs) {
        output.broadcastEvent(event, isPriorityEvent);
    }
}
//RecordWriterOutput
/**
 * Broadcasts an event through the record writer.
 *
 * <p>If this output does not support unaligned checkpoints, a priority checkpoint barrier is
 * downgraded. It is re-created with options marked as unaligned-unsupported and sent as a
 * non-priority event.
 *
 * @param event event to broadcast
 * @param isPriorityEvent whether the event was requested to be sent with priority
 * @throws IOException if the record writer fails to broadcast the event
 */
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    boolean downgradeBarrier =
        isPriorityEvent && event instanceof CheckpointBarrier && !supportsUnalignedCheckpoints;
    if (!downgradeBarrier) {
        recordWriter.broadcastEvent(event, isPriorityEvent);
        return;
    }
    final CheckpointBarrier originalBarrier = (CheckpointBarrier) event;
    final AbstractEvent downgraded =
        originalBarrier.withOptions(
            originalBarrier.getCheckpointOptions().withUnalignedUnsupported());
    recordWriter.broadcastEvent(downgraded, false);
}
//RecordWriter
/**
 * Broadcasts an event to the target partition and flushes all channels when {@code flushAlways}
 * is set.
 *
 * @param event event to broadcast
 * @param isPriorityEvent whether the event should be sent with priority
 * @throws IOException if broadcasting or flushing fails
 */
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    targetPartition.broadcastEvent(event, isPriorityEvent);
    if (!flushAlways) {
        return;
    }
    flushAll();
}

targetPartition 可以是 pipelined 类型的分区，比如 BufferWritingResultPartition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * Serializes an event once and adds a copy of it to every subpartition (the post shows this for
 * BufferWritingResultPartition).
 *
 * @param event event to broadcast
 * @param isPriorityEvent whether the serialized event is marked as priority
 * @throws IOException if serialization fails
 */
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    checkInProduceState();
    // NOTE(review): presumably finishes any in-progress record buffers so the event follows them.
    finishBroadcastBufferBuilder();
    finishUnicastBufferBuilders();

    try (BufferConsumer eventBufferConsumer =
         EventSerializer.toBufferConsumer(event, isPriorityEvent)) {
        // The event bytes are counted once per subpartition.
        totalWrittenBytes += ((long) eventBufferConsumer.getWrittenBytes() * numSubpartitions);
        for (ResultSubpartition subpartition : subpartitions) {
            // Retain the buffer so that it can be recycled by each channel of targetPartition
            subpartition.add(eventBufferConsumer.copy(), 0);
        }
    }
}
//PipelinedSubpartition
/**
 * Adds a recovered buffer consumer to this subpartition. The call is traced, and the consumer is
 * added with a partial record length of {@code Integer.MIN_VALUE}.
 *
 * @param bufferConsumer recovered buffer to add
 * @throws IOException if the subpartition rejected the buffer (e.g. already finished or released)
 */
public void addRecovered(BufferConsumer bufferConsumer) throws IOException {
    NetworkActionsLogger.traceRecover(
        "PipelinedSubpartition#addRecovered",
        bufferConsumer,
        parent.getOwningTaskName(),
        subpartitionInfo);
    final int addResult = add(bufferConsumer, Integer.MIN_VALUE);
    if (addResult == -1) {
        throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
    }
}
//PipelinedSubpartition
/**
 * Adds a buffer consumer without finishing the subpartition. It delegates to the three-argument
 * overload with {@code finish == false}.
 *
 * @return the result of the three-argument overload (-1 if the buffer was not added)
 */
public int add(BufferConsumer bufferConsumer, int partialRecordLength) {
    final boolean finishSubpartition = false;
    return add(bufferConsumer, partialRecordLength, finishSubpartition);
}
//PipelinedSubpartition
/**
 * Adds a buffer consumer to this subpartition under the {@code buffers} lock, optionally
 * finishing the subpartition. Priority and data-available notifications are sent after the lock
 * is released.
 *
 * @param bufferConsumer buffer to add
 * @param partialRecordLength partial record length stored alongside the buffer
 * @param finish whether this add also finishes the subpartition
 * @return the current buffer size, or -1 if the subpartition is already finished or released
 *     (in which case the consumer is closed)
 */
private int add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
    checkNotNull(bufferConsumer);

    final boolean notifyDataAvailable;
    int prioritySequenceNumber = -1;
    int newBufferSize;
    synchronized (buffers) {
        if (isFinished || isReleased) {
            bufferConsumer.close();
            return -1;
        }
        // Note: priority buffers (unaligned checkpoint barriers) are handled inside addBuffer
        // Add the bufferConsumer and update the stats
        if (addBuffer(bufferConsumer, partialRecordLength)) {
            prioritySequenceNumber = sequenceNumber;
        }
        updateStatistics(bufferConsumer);
        increaseBuffersInBacklog(bufferConsumer);
        notifyDataAvailable = finish || shouldNotifyDataAvailable();

        isFinished |= finish;
        newBufferSize = bufferSize;
    }

    // Notifications are issued outside the synchronized block.
    if (prioritySequenceNumber != -1) {
        notifyPriorityEvent(prioritySequenceNumber);
    }
    if (notifyDataAvailable) {
        notifyDataAvailable();
    }

    return newBufferSize;
}

从注释看，bufferConsumer 是在 addBuffer 里处理的

1
2
3
4
5
6
7
8
9
/**
 * Appends a buffer to {@code buffers}, which must be locked by the caller. Priority buffers (used
 * for unaligned checkpoint barriers) are routed to {@code processPriorityBuffer}.
 *
 * @return {@code true} if a priority notification should be sent, {@code false} for regular
 *     buffers
 */
private boolean addBuffer(BufferConsumer bufferConsumer, int partialRecordLength) {
    assert Thread.holdsLock(buffers);
    if (!bufferConsumer.getDataType().hasPriority()) {
        buffers.add(new BufferConsumerWithPartialRecordLength(bufferConsumer, partialRecordLength));
        return false;
    }
    return processPriorityBuffer(bufferConsumer, partialRecordLength);
}

解析 barrier 事件，收集 buffers 中的数据，并将 inflightBuffers 写入 outputData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * Adds a priority buffer ahead of the regular buffers.
 *
 * <p>If the buffer is an (unaligned) checkpoint barrier, every non-priority data buffer still
 * queued behind the priority elements is copied. The copies are written to the channel-state
 * writer as output data for that checkpoint.
 *
 * @return {@code true} if this is the only priority element and the subpartition is not blocked,
 *     i.e. a priority notification should be sent
 */
private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partialRecordLength) {
    buffers.addPriorityElement(
        new BufferConsumerWithPartialRecordLength(bufferConsumer, partialRecordLength));
    final int numPriorityElements = buffers.getNumPriorityElements();
    // Parse the CheckpointBarrier (null if this priority buffer is not a barrier)
    CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
    if (barrier != null) {
        checkState(
            barrier.getCheckpointOptions().isUnalignedCheckpoint(),
            "Only unaligned checkpoints should be priority events");
        // Skip past the priority elements so only the queued in-flight data is visited.
        final Iterator<BufferConsumerWithPartialRecordLength> iterator = buffers.iterator();
        Iterators.advance(iterator, numPriorityElements);
        List<Buffer> inflightBuffers = new ArrayList<>();
        // Iterate over the remaining buffers (BufferConsumers)
        while (iterator.hasNext()) {
            BufferConsumer buffer = iterator.next().getBufferConsumer();

            if (buffer.isBuffer()) {
                // Data buffers (not events) are copied into inflightBuffers
                try (BufferConsumer bc = buffer.copy()) {
                    inflightBuffers.add(bc.build());
                }
            }
        }
        if (!inflightBuffers.isEmpty()) {
            // Write inflightBuffers as output data of this checkpoint
            channelStateWriter.addOutputData(
                barrier.getId(),
                subpartitionInfo,
                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                inflightBuffers.toArray(new Buffer[0]));
        }
    }
    return numPriorityElements == 1
    && !isBlocked; // if subpartition is blocked then downstream doesn't expect any
    // notifications
}
This post is licensed under CC BY 4.0 by the author.