Checkpoint流程
CheckpointedInputGate.pollNext(1.10)
1,拉取数据的时候先从BufferStorage拉去,如果由则消费,否则从inputGate拉去
2, 对齐模式,如果正在对齐barrier,则当前channel会阻塞,先将消息加入CachedBufferStorage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public Optional<BufferOrEvent> pollNext() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
//首先从CachedBufferStorage取数据,没有则从inputGate获取next
Optional<BufferOrEvent> next;
if (bufferStorage.isEmpty()) {
next = inputGate.pollNext();
}
else {
// TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking
next = bufferStorage.pollNext();
if (!next.isPresent()) {
return pollNext();
}
}
if (!next.isPresent()) {
return handleEmptyBuffer();
}
BufferOrEvent bufferOrEvent = next.get();
//barrier对齐,当前channel阻塞,将bufferOrEvent加入CachedBufferStorage
if (barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
// if the channel is blocked, we just store the BufferOrEvent
//channel阻塞后,先放入bufferStorage
bufferStorage.add(bufferOrEvent);
//如果bufferStorage满了,对齐模式抛出CheckpointException,清空bufferStorage
if (bufferStorage.isFull()) {
barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
bufferStorage.rollOver();
}
}
else if (bufferOrEvent.isBuffer()) {
return next;
}
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
if (!endOfInputGate) {
// process barriers only if there is a chance of the checkpoint completing
//处理Barrier
if (barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()), bufferStorage.getPendingBytes())) {
bufferStorage.rollOver();
}
}
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
//处理CancelCheckpointMarker
if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
bufferStorage.rollOver();
}
}
else {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
//处理EndOfPartitionEvent
if (barrierHandler.processEndOfPartition()) {
bufferStorage.rollOver();
}
}
return next;
}
}
}
BufferStorage类型,EXACTLY_ONCE模式是CachedBufferStorage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static BufferStorage createBufferStorage(
CheckpointingMode checkpointMode,
IOManager ioManager,
int pageSize,
Configuration taskManagerConfig,
String taskName) {
switch (checkpointMode) {
case EXACTLY_ONCE: {
long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+ " must be positive or -1 (infinite)");
}
//EXACTLY_ONCE 模式CachedBufferStorage
return new CachedBufferStorage(pageSize, maxAlign, taskName);
}
case AT_LEAST_ONCE:
return new EmptyBufferStorage();
default:
throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
}
}
}
CheckpointedInputGate(>1.10)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public Optional<BufferOrEvent> pollNext() throws Exception {
while (true) {
Optional<BufferOrEvent> next = inputGate.pollNext();
if (!next.isPresent()) {
return handleEmptyBuffer();
}
BufferOrEvent bufferOrEvent = next.get();
//对齐模式,这里可能是blocked,会阻塞Channel继续消费数据,老版本用bufferStore缓存数据
checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
//Buffer 数据
if (bufferOrEvent.isBuffer()) {
return next;
} else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {//处理CheckpointBarrier
CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo());
return next;
} else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {//处理CancelCheckpointMarker
barrierHandler.processCancellationBarrier(
(CancelCheckpointMarker) bufferOrEvent.getEvent());
} else {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {//处理EndOfPartitionEvent
barrierHandler.processEndOfPartition();
}
return next;
}
}
}
对齐CheckpointBarrierAligner
CheckpointBarrierAligner.processBarrier中:
- 当前算子的上游只有一个inputchannel,更新currentCheckpointId,直接触发notifyCheckpoint
- 如果上游是多个通道channel
2.1 如果当前正在进行checkpoint
2.1.1 如果barrierId > currentCheckpointId 说明是新来barrier, 直接skip之前的ckp,开始新一轮对齐
2.1.2 如果barrierId == currentCheckpointId,说明是当前正在进行ckp的 barrier,直接设置当前channel为 block,阻塞消费
2.1.3 如果barrierId < currentCheckpointId,说明是是老的ckp,无视这个barrier, 让通道继续消费通道
2.2 当前本轮checkpoint的第一个barrier,新来的barrier的Id(barrierId )与当前正在进行currentCheckpointId大,更新currentCheckpointId, 设置通道为block,不能继续消费消息
2.3 如果新来的barrierId <currentCheckpointId 或者numBarriersReceived==0(被取消) 则让通道重新继续消费
- 最后判断如果收到的Barrier数量+已关闭的通道数量=总通道数,说明已经收到所有的Barrier了,开始执行CheckpointBarrierHandler.notifyCheckpoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo)
throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel cases
if (totalNumberOfInputChannels == 1) {//单个通道
resumeConsumption(channelInfo);//通知通道继续消费消息
if (barrierId > currentCheckpointId) {//如果是一个新的checkpoint,则执行notifyCheckpoint
// new checkpoint
currentCheckpointId = barrierId;//更新currentCheckpointId
notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
}
return;
}
// -- general code path for multiple input channels --
//正在进行chp,numBarriersReceived > 0,多个inputchannel
if (isCheckpointPending()) {
// this is only true if some alignment is already progress and was not canceled
//2.1.2 当前checkpointId,设置当前channel为block,
if (barrierId == currentCheckpointId) {
// regular case
onBarrier(channelInfo);
} else if (barrierId > currentCheckpointId) {
// 2.1.1 大于当前currentCheckpointId,说明是一个新的,直接skip当前ckp,因为之前的ckp还没完成
// we did not complete the current checkpoint, another started before
LOG.warn(
"{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. "
+ "Skipping current checkpoint.",
taskName,
barrierId,
currentCheckpointId);
//broadcast事件CancelCheckpointMarker
// let the task know we are not completing this
notifyAbort(
currentCheckpointId,
new CheckpointException(
"Barrier id: " + barrierId,
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
// abort the current checkpoint
releaseBlocksAndResetBarriers();
//开启一个新的ckp
// begin a new checkpoint
beginNewAlignment(barrierId, channelInfo, receivedBarrier.getTimestamp());
} else {
// 2.1.3 比当前ckpId小,说明是之前的ckpId,直接唤醒重新消费
// ignore trailing barrier from an earlier checkpoint (obsolete now)
resumeConsumption(channelInfo);
}
} else if (barrierId > currentCheckpointId) {
// first barrier of a new checkpoint
//2.2 收到的barrierId比当前检查点id(currentCheckpointId)大,新的ckp
beginNewAlignment(barrierId, channelInfo, receivedBarrier.getTimestamp());
} else {
// either the current checkpoint was canceled (numBarriers == 0) or
// this barrier is from an old subsumed checkpoint
// 2.3 当前批次的ckp被取消了或者是一个跟早的ckp,直接换新重复消费
resumeConsumption(channelInfo);//
}
//已经收到Barriers总数+已经关闭的通道数=总通道数
// check if we have all barriers - since canceled checkpoints always have zero barriers
// this can only happen on a non canceled checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug(
"{}: Received all barriers, triggering checkpoint {} at {}.",
taskName,
receivedBarrier.getId(),
receivedBarrier.getTimestamp());
}
releaseBlocksAndResetBarriers();
notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
}
}