Home Flink1.15 CheckPoint源码分析
Post
Cancel

Flink1.15 CheckPoint源码分析

Checkpoint流程

Checkpoint CheckpointedInputGate.pollNext(1.10)
1,拉取数据的时候先从BufferStorage拉去,如果由则消费,否则从inputGate拉去
2, 对齐模式,如果正在对齐barrier,则当前channel会阻塞,先将消息加入CachedBufferStorage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public Optional<BufferOrEvent> pollNext() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        //首先从CachedBufferStorage取数据,没有则从inputGate获取next
        Optional<BufferOrEvent> next;
        if (bufferStorage.isEmpty()) {
            next = inputGate.pollNext();
        }
        else {
            // TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking
            next = bufferStorage.pollNext();
            if (!next.isPresent()) {
                return pollNext();
            }
        }

        if (!next.isPresent()) {
            return handleEmptyBuffer();
        }

        BufferOrEvent bufferOrEvent = next.get();
        //barrier对齐,当前channel阻塞,将bufferOrEvent加入CachedBufferStorage
        if (barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
            // if the channel is blocked, we just store the BufferOrEvent
            //channel阻塞后,先放入bufferStorage
            bufferStorage.add(bufferOrEvent);
            //如果bufferStorage满了,对齐模式抛出CheckpointException,清空bufferStorage
            if (bufferStorage.isFull()) {
                barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
                bufferStorage.rollOver();
            }
        }
        else if (bufferOrEvent.isBuffer()) {
            return next;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
            if (!endOfInputGate) {
                // process barriers only if there is a chance of the checkpoint completing
                //处理Barrier
                if (barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()), bufferStorage.getPendingBytes())) {
                    bufferStorage.rollOver();
                }
            }
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            //处理CancelCheckpointMarker
            if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
                bufferStorage.rollOver();
            }
        }
        else {
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                //处理EndOfPartitionEvent
                if (barrierHandler.processEndOfPartition()) {
                    bufferStorage.rollOver();
                }
            }
            return next;
        }
    }
}

BufferStorage类型,EXACTLY_ONCE模式是CachedBufferStorage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static BufferStorage createBufferStorage(
    CheckpointingMode checkpointMode,
    IOManager ioManager,
    int pageSize,
    Configuration taskManagerConfig,
    String taskName) {
    switch (checkpointMode) {
        case EXACTLY_ONCE: {
            long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
            if (!(maxAlign == -1 || maxAlign > 0)) {
                throw new IllegalConfigurationException(
                    TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
                    + " must be positive or -1 (infinite)");
            }
            //EXACTLY_ONCE 模式CachedBufferStorage
            return new CachedBufferStorage(pageSize, maxAlign, taskName);
        }
    case AT_LEAST_ONCE:
        return new EmptyBufferStorage();
        default:
        throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }
}
}

CheckpointedInputGate(>1.10)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public Optional<BufferOrEvent> pollNext() throws Exception {
    while (true) {
        Optional<BufferOrEvent> next = inputGate.pollNext();

        if (!next.isPresent()) {
            return handleEmptyBuffer();
        }

        BufferOrEvent bufferOrEvent = next.get();
        //对齐模式,这里可能是blocked,会阻塞Channel继续消费数据,老版本用bufferStore缓存数据
        checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
        //Buffer 数据
        if (bufferOrEvent.isBuffer()) {
            return next;
        } else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {//处理CheckpointBarrier
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
            barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo());
            return next;
        } else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {//处理CancelCheckpointMarker
            barrierHandler.processCancellationBarrier(
                (CancelCheckpointMarker) bufferOrEvent.getEvent());
        } else {
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {//处理EndOfPartitionEvent
                barrierHandler.processEndOfPartition();
            }
            return next;
        }
    }
}

对齐CheckpointBarrierAligner
CheckpointBarrierAligner.processBarrier中:

  1. 当前算子的上游只有一个inputchannel,更新currentCheckpointId,直接触发notifyCheckpoint
  2. 如果上游是多个通道channel

2.1 如果当前正在进行checkpoint
2.1.1 如果barrierId > currentCheckpointId 说明是新来barrier, 直接skip之前的ckp,开始新一轮对齐
2.1.2 如果barrierId == currentCheckpointId,说明是当前正在进行ckp的 barrier,直接设置当前channel为 block,阻塞消费
2.1.3 如果barrierId < currentCheckpointId,说明是是老的ckp,无视这个barrier, 让通道继续消费通道
2.2 当前本轮checkpoint的第一个barrier,新来的barrier的Id(barrierId )与当前正在进行currentCheckpointId大,更新currentCheckpointId, 设置通道为block,不能继续消费消息
2.3 如果新来的barrierId <currentCheckpointId 或者numBarriersReceived==0(被取消) 则让通道重新继续消费

  1. 最后判断如果收到的Barrier数量+已关闭的通道数量=总通道数,说明已经收到所有的Barrier了,开始执行CheckpointBarrierHandler.notifyCheckpoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo)
throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases
    if (totalNumberOfInputChannels == 1) {//单个通道
        resumeConsumption(channelInfo);//通知通道继续消费消息
        if (barrierId > currentCheckpointId) {//如果是一个新的checkpoint,则执行notifyCheckpoint
            // new checkpoint
            currentCheckpointId = barrierId;//更新currentCheckpointId
            notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
        }
        return;
    }

    // -- general code path for multiple input channels --
    //正在进行chp,numBarriersReceived > 0,多个inputchannel
    if (isCheckpointPending()) {
        // this is only true if some alignment is already progress and was not canceled
        //2.1.2 当前checkpointId,设置当前channel为block,
        if (barrierId == currentCheckpointId) {
            // regular case
            onBarrier(channelInfo);
        } else if (barrierId > currentCheckpointId) {
            //  2.1.1 大于当前currentCheckpointId,说明是一个新的,直接skip当前ckp,因为之前的ckp还没完成
            // we did not complete the current checkpoint, another started before
            LOG.warn(
                "{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. "
                + "Skipping current checkpoint.",
                taskName,
                barrierId,
                currentCheckpointId);
            //broadcast事件CancelCheckpointMarker
            // let the task know we are not completing this
            notifyAbort(
                currentCheckpointId,
                new CheckpointException(
                    "Barrier id: " + barrierId,
                    CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));

            // abort the current checkpoint
            releaseBlocksAndResetBarriers();
            //开启一个新的ckp
            // begin a new checkpoint
            beginNewAlignment(barrierId, channelInfo, receivedBarrier.getTimestamp());
        } else {
            //  2.1.3 比当前ckpId小,说明是之前的ckpId,直接唤醒重新消费
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            resumeConsumption(channelInfo);
        }
    } else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint
        //2.2 收到的barrierId比当前检查点id(currentCheckpointId)大,新的ckp
        beginNewAlignment(barrierId, channelInfo, receivedBarrier.getTimestamp());
    } else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        // 2.3 当前批次的ckp被取消了或者是一个跟早的ckp,直接换新重复消费
        resumeConsumption(channelInfo);//
    }
    //已经收到Barriers总数+已经关闭的通道数=总通道数
    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                "{}: Received all barriers, triggering checkpoint {} at {}.",
                taskName,
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }

        releaseBlocksAndResetBarriers();
        notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
    }
}
This post is licensed under CC BY 4.0 by the author.