Home RocketMQ 消息存储
Post
Cancel

RocketMQ 消息存储

BrokerController初始化的时候实例化一个DefaultMessageStore,并且负责load文件

1
2
3
4
5
6
7
// Excerpt from BrokerController.initialize(): create the message store, then load it.
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
// Load the store. Because && short-circuits, load() is skipped when result is already false.
result = result && this.messageStore.load();

加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// DefaultMessageStore.load
/**
 * Loads the persisted store state.
 *
 * <p>Steps, in order: the schedule message service (when present), the commit
 * log, then the consume queues. A failing step short-circuits the remaining
 * ones. When all succeed, the store checkpoint is created, the index service
 * is loaded and recovery runs.
 *
 * @return {@code true} when every step succeeded; {@code false} when a step
 *         reported failure or any exception was thrown (in which case the
 *         allocate-mapped-file service is shut down before returning)
 */
public boolean load() {
    boolean loaded;

    try {
        // The presence of the temp file means the previous run did not exit normally.
        final boolean cleanShutdown = !this.isTempFileExist();
        log.info("last shutdown {}", cleanShutdown ? "normally" : "abnormally");

        // Same order and short-circuiting as a chain of "x = x && step()" statements.
        loaded = (null == scheduleMessageService || this.scheduleMessageService.load())
            && this.commitLog.load()          // load Commit Log
            && this.loadConsumeQueue();       // load Consume Queue

        if (loaded) {
            final String checkpointPath =
                StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
            this.storeCheckpoint = new StoreCheckpoint(checkpointPath);

            // Load the index files, then recover according to how the last run ended.
            this.indexService.load(cleanShutdown);
            this.recover(cleanShutdown);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        loaded = false;
    }

    if (!loaded) {
        this.allocateMappedFileService.shutdown();
    }

    return loaded;
}

commitlog加载,主要是mappedFileQueue负责管理的,一个mappedFileQueue持有多个MappedFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// NOTE(review): originally labelled MappedFileQueue.load, but this wrapper delegates to
// this.mappedFileQueue and logs "load commit log", so it appears to be CommitLog.load.
/**
 * Loads the commit log by delegating to the mapped file queue and logs the outcome.
 *
 * @return whatever {@code mappedFileQueue.load()} returned
 */
public boolean load() {
    final boolean loaded = this.mappedFileQueue.load();
    final String outcome = loaded ? "OK" : "Failed";
    log.info("load commit log " + outcome);
    return loaded;
}
// Presumably MappedFileQueue.load: it fills this.mappedFiles from the files under storePath.
/**
 * Builds a {@code MappedFile} for every entry found directly under {@code storePath}.
 *
 * <p>Entries are processed in ascending order. Each one must be exactly
 * {@code mappedFileSize} bytes long; the first mismatch aborts the load.
 * Every loaded file has its wrote, flushed and committed positions set to
 * {@code mappedFileSize}.
 *
 * @return {@code true} when the directory listing is unavailable or every
 *         entry was loaded; {@code false} on a size mismatch or an
 *         {@code IOException} while creating a mapped file
 */
public boolean load() {
    final File[] candidates = new File(this.storePath).listFiles();
    if (candidates == null) {
        // Nothing to list: treated as a successful (empty) load.
        return true;
    }

    // ascending order
    Arrays.sort(candidates);

    for (int i = 0; i < candidates.length; i++) {
        final File candidate = candidates[i];

        if (candidate.length() != this.mappedFileSize) {
            log.warn(candidate + "\t" + candidate.length()
                     + " length not matched message store config value, please check it manually");
            return false;
        }

        try {
            final MappedFile loaded = new MappedFile(candidate.getPath(), mappedFileSize);

            loaded.setWrotePosition(this.mappedFileSize);
            loaded.setFlushedPosition(this.mappedFileSize);
            loaded.setCommittedPosition(this.mappedFileSize);
            this.mappedFiles.add(loaded);
            log.info("load " + candidate.getPath() + " OK");
        } catch (IOException e) {
            log.error("load file " + candidate + " error", e);
            return false;
        }
    }

    return true;
}

mappedFiles这个是一个CopyOnWriteArrayList

1
2
// Mapped files held by this queue, in the order they were added.
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<>();

MappedFile 对应一个具体的文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Creates a mapped file by delegating to {@code init(fileName, fileSize)}.
 *
 * @param fileName path of the backing file
 * @param fileSize number of bytes to map
 * @throws IOException if initialisation fails
 */
public MappedFile(final String fileName, final int fileSize) throws IOException {
    this.init(fileName, fileSize);
}
/**
 * Initialises the mapped file and additionally borrows a write buffer from the given pool.
 *
 * @param fileName           path of the backing file
 * @param fileSize           number of bytes to map
 * @param transientStorePool pool the write buffer is borrowed from; also retained on this instance
 * @throws IOException if the underlying file initialisation fails
 */
public void init(final String fileName, final int fileSize,
                 final TransientStorePool transientStorePool) throws IOException {
    this.init(fileName, fileSize);
    // Buffer borrowed from the pool (described as off-heap memory in the original note).
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}
/**
 * Opens the backing file and memory-maps it via {@code FileChannel.map}.
 *
 * <p>The file name itself is parsed as a {@code long} and stored as the file's
 * starting offset. The parent directory is ensured to exist before the file
 * is opened in "rw" mode. If anything fails after the channel has been opened,
 * the channel is closed before the exception propagates.
 *
 * @param fileName path of the backing file; its last path element must parse as a long
 * @param fileSize number of bytes to map, starting at position 0
 * @throws IOException if the file cannot be created/opened or mapped
 */
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());

    ensureDirOK(this.file.getParent());

    boolean mapped = false;
    try {
        final RandomAccessFile backing = new RandomAccessFile(this.file, "rw");
        this.fileChannel = backing.getChannel();
        // Map the whole file read-write into mappedByteBuffer.
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        mapped = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        // Do not leave an open channel behind when mapping did not complete.
        if (!mapped && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

broker端put大致流程

1
2
3
4
5
6
SendMessageProcessor.processRequest
SendMessageProcessor.asyncProcessRequest
SendMessageProcessor.asyncSendBatchMessage/asyncSendMessage
DefaultMessageStore.asyncPutMessage
CommitLog.asyncPutMessage

CommitLog 的asyncPutMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Abridged excerpt of CommitLog.asyncPutMessage: declarations of result/putMessageResult and the
// return statement are omitted here, so this shows the main steps only.
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Fetch the most recent MappedFile from mappedFileQueue.
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // If there is none, or it is full, ask the queue again via getLastMappedFile(0)
    // (per the original note, this creates a new file).
    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    }
    // Append the message to that MappedFile.
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    // Submit the flush-to-disk request.
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    // Submit the replica request (presumably replication to slaves — not shown in this excerpt).
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);

}

MappedFile
填充最新的message,这里并没有强制刷盘
获取byteBuffer时有两种类型:writeBuffer(堆外内存)和mappedByteBuffer(文件内存映射),用来写入消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Appends a message to this mapped file through the supplied callback.
 *
 * <p>The callback only fills the buffer; nothing in this method forces the
 * data to disk. On success the wrote position advances by the number of
 * bytes the callback reports and the store timestamp is updated.
 *
 * @param messageExt message to append; must be a {@code MessageExtBrokerInner}
 *                   or a {@code MessageExtBatch}
 * @param cb         callback that serialises the message into the buffer
 * @return the callback's result, or a result with {@code UNKNOWN_ERROR} when
 *         the file has no room left or the message type is unsupported
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    final int writeStart = this.wrotePosition.get();

    // The wrote position is already at (or past) the end of the file.
    if (writeStart >= this.fileSize) {
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", writeStart, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    // Prefer the borrowed write buffer when present, otherwise the memory-mapped buffer.
    final ByteBuffer target;
    if (writeBuffer != null) {
        target = writeBuffer.slice();
    } else {
        target = this.mappedByteBuffer.slice();
    }
    target.position(writeStart);

    final int remaining = this.fileSize - writeStart;
    final AppendMessageResult appended;
    if (messageExt instanceof MessageExtBrokerInner) {
        appended = cb.doAppend(this.getFileFromOffset(), target, remaining, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        appended = cb.doAppend(this.getFileFromOffset(), target, remaining, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    this.wrotePosition.addAndGet(appended.getWroteBytes());
    this.storeTimestamp = appended.getStoreTimestamp();
    return appended;
}

堆外内存ByteBuffer:

1
2
 this.writeBuffer = transientStorePool.borrowBuffer();// buffer borrowed from the pool (off-heap memory, per the original note)

物理文件对应的内存映射Buffer

1
2
3
 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // Memory-map the file into mappedByteBuffer (read-write, from position 0 for fileSize bytes).
 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

那么到底何时刷盘呢,有两种方式同步或者异步

1
2
3
4
5
6
7
8
9
/**
 * Strategy used to flush appended messages to disk.
 */
public enum FlushDiskType {
    /** Synchronous flush. */
    SYNC_FLUSH,

    /** Asynchronous flush. */
    ASYNC_FLUSH;
}
// Choose the flush service from the configured flush type:
// SYNC_FLUSH uses GroupCommitService, anything else uses FlushRealTimeService.
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService();
} else {
    this.flushCommitLogService = new FlushRealTimeService();
}

如果是同步刷盘Commitlog将消息appendMessage到MappedFile后会调用submitFlushRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Hands an appended message over to the configured flush mechanism.
 *
 * <p>Asynchronous flush: wakes {@code commitLogService} when the transient
 * store pool is enabled, otherwise {@code flushCommitLogService}, and
 * completes immediately with {@code PUT_OK}.
 *
 * <p>Synchronous flush: when the message waits for the store result, a
 * {@code GroupCommitRequest} covering the end offset of the append is queued
 * and its future is returned; otherwise the group commit service is only
 * woken up and {@code PUT_OK} is returned right away.
 *
 * @param result           outcome of the append; supplies the wrote offset and byte count
 * @param putMessageResult not read by this method
 * @param messageExt       the appended message
 * @return a future carrying the flush status
 */
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
                                                              MessageExt messageExt) {
    final boolean syncFlush =
        FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType();

    // Asynchronous flush
    if (!syncFlush) {
        if (this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            commitLogService.wakeup();
        } else {
            flushCommitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

    // Synchronization flush: delegate to the GroupCommitService
    final GroupCommitService groupCommit = (GroupCommitService) this.flushCommitLogService;
    if (!messageExt.isWaitStoreMsgOK()) {
        groupCommit.wakeup();
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

    final GroupCommitRequest pending = new GroupCommitRequest(
        result.getWroteOffset() + result.getWroteBytes(),
        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    groupCommit.putRequest(pending);
    return pending.future();
}

GroupCommitService先放到队列

1
2
3
4
5
6
7
8
9
10
11
12
13
// NOTE(review): the original label says GroupTransferService while the surrounding text
// describes GroupCommitService — confirm which class this excerpt was copied from.
// Queues a request on requestsWrite (under that list's lock), then wakes the service thread.
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);// enqueue on requestsWrite
    }
    this.wakeup();
}
// Wakes the consuming side. The compare-and-set guarantees that only the caller which flips
// hasNotified from false to true counts down waitPoint.
public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}

消费在run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Service loop: until stopped, wait and then commit; after stopping, pause briefly,
// swap the request lists one last time and commit whatever is left.
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Per the original note, waitForRunning ends by swapping requestsWrite into
            // requestsRead (swapRequests) — that call is not visible in this excerpt.
            this.waitForRunning(10);
            this.doCommit();// process requestsRead and flush as needed
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}
// Exchanges the two list references: what was requestsWrite becomes requestsRead and vice versa.
private void swapRequests() {
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}


doCommit主要读取requestsRead数据,最终调用CommitLog.this.mappedFileQueue.flush(0) 来进行刷盘

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Processes the pending requests in {@code requestsRead}, flushing the mapped file
 * queue until each request's offset is covered, then clears the list.
 *
 * <p>For every request the flushed position is compared with the request's next
 * offset. While it is not yet covered, the queue is flushed, at most twice, and the
 * comparison is repeated <em>after</em> each flush. The request is then completed
 * with {@code PUT_OK}, or with {@code FLUSH_DISK_TIMEOUT} when two flushes were not
 * enough. Checking only before each flush would ignore the effect of the last flush
 * and report a timeout for a request that the second flush had actually satisfied.
 *
 * <p>When there are no requests, a single flush is still performed.
 */
private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                for (int i = 0; i < 2 && !flushOK; i++) {
                    CommitLog.this.mappedFileQueue.flush(0);
                    // Re-evaluate after the flush so the outcome of the last attempt is not lost.
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                }

                req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }

            // Record the latest store timestamp in the checkpoint when one is available.
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}

异步刷盘 分两种情况
正常情况下使用:FlushRealTimeService
如果开启内存缓冲池transientStorePool则会使用CommitRealTimeService

1
2
3
4
5
6
7
// Choose the flush service from the configured flush type.
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService(); // synchronous flush (mmap mode, per the original note)
} else {
    this.flushCommitLogService = new FlushRealTimeService();// asynchronous flush (mmap mode, per the original note)
}

this.commitLogService = new CommitRealTimeService();// always created; per the original note it is used when the memory buffer pool is enabled

如果开启了transientStorePool,在获取最新的MappedFile文件的时候,会通过AllocateMappedFileService来异步创建,
AllocateMappedFileService继承自ServiceThread,其run方法会调用mmapOperation,在mmapOperation中创建mappedFile

1
2
3
4
5
6
7
8
9
10
11
12
13
// Excerpt from AllocateMappedFileService.mmapOperation: how the MappedFile is created.
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// transient store pool (memory buffer pool) enabled
    // Pool-backed path: first try an implementation discovered through ServiceLoader.
    try {
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } catch (RuntimeException e) {
        log.warn("Use default implementation.");
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());// fall back to constructing MappedFile directly with the pool
    }
} else {
    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());// plain mmap path, no pool
}

常规异步刷盘FlushRealTimeService
在run方法周期性的进行刷盘操作,主要逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Main loop of the asynchronous flush service: wait one interval, then flush the mapped file queue.
while (!this.isStopped()) {
    // Wait strategy: true -> Thread.sleep, false -> waitForRunning (which a wakeup() can cut short).
    boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    // Flush interval (per the original note: 500 ms by default).
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    // Minimum number of pages per flush (per the original note: 4 pages by default, 4 KB each).
    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    // Longest allowed gap between thorough flushes (per the original note: 10 s).
    int flushPhysicQueueThoroughInterval =
    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

    boolean printFlushProgress = false;

    // Print flush progress
    long currentTimeMillis = System.currentTimeMillis();
    // Has the thorough interval elapsed since lastFlushTimestamp?
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
        this.lastFlushTimestamp = currentTimeMillis;
        flushPhysicQueueLeastPages = 0;// overdue: drop the minimum-page requirement so everything gets flushed
        printFlushProgress = (printTimes++ % 10) == 0;
    }

    try {
        // Apply the wait strategy chosen above.
        if (flushCommitLogTimed) {
            Thread.sleep(interval);
        } else {
            this.waitForRunning(interval);
        }

        if (printFlushProgress) {
            this.printFlushProgress();
        }

        long begin = System.currentTimeMillis();
        // Perform the flush.
        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        // Log flushes that took longer than 500 ms.
        long past = System.currentTimeMillis() - begin;
        if (past > 500) {
            log.info("Flush data to disk costs {} ms", past);
        }
    } catch (Throwable e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        this.printFlushProgress();
    }
}

CommitRealTimeService
调用方法MappedFileQueue#commit将数据提交到FileChannel,commit本身没有执行force操作强制刷盘
一旦有数据被commit,就会唤醒flushCommitLogService执行刷盘操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Main loop of the commit service: commit buffered data, wake the flush service when
// something was committed, then wait one interval.
while (!this.isStopped()) {
    // Interval between commits to the FileChannel; used when the TransientStorePool is enabled
    // (per the original note: 200 ms by default).
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    // Minimum number of pages per commit (per the original note: 4 by default).
    int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    // Longest allowed gap between thorough commits.
    int commitDataThoroughInterval =
    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

    long begin = System.currentTimeMillis();
    // Overdue: drop the minimum-page requirement so everything pending gets committed.
    if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
        this.lastCommitTimestamp = begin;
        commitDataLeastPages = 0;
    }

    try {
        // commit() returns false when some data was committed.
        boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
        long end = System.currentTimeMillis();
        if (!result) {
            this.lastCommitTimestamp = end; // result = false means some data committed.
            //now wake up flush thread.
            flushCommitLogService.wakeup();// wake flushCommitLogService so it performs the flush
        }

        // Log commits that took longer than 500 ms.
        if (end - begin > 500) {
            log.info("Commit data to file costs {} ms", end - begin);
        }
        this.waitForRunning(interval);
    } catch (Throwable e) {
        CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
    }
}
This post is licensed under CC BY 4.0 by the author.