Home RocketMQ 消费消息
Post
Cancel

RocketMQ 消费消息

1
2
3
4
5
6
7
8
9
//PullMessageProcessor.java
// Abridged excerpt of the pull-request handler: only the call that fetches messages from the
// message store is shown. `requestHeader` and `messageFilter` are not declared in this excerpt,
// and the method's return statement is omitted.
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
    // Fetch the messages from the message store (DefaultMessageStore, per the article),
    // passing the consumer group, topic, queue id, queue offset, max message count and filter.
    final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                                                       requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
}

从DefaultMessageStore获取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Abridged excerpt of the store-side message lookup. Several names used below (`offsetPy`,
// `sizePy`, `getResult`, `status`, `nextPhyFileStartOffset`) are not declared in this excerpt,
// the `continue` has no visible enclosing loop, and the return statement is omitted —
// presumably the surrounding loop and declarations were cut by the article; confirm against
// the full source.
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
                                   final int maxMsgNums,
                                   final MessageFilter messageFilter) {
    // Locate the ConsumeQueue for this topic and queue id.
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    // Read the message bytes from the commit log at (offsetPy, sizePy).
    // NOTE(review): the original note here is garbled; it appears to say messages are read
    // from the commit log and accumulated into a batch — confirm.
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    if (null == selectResult) {
        // Nothing could be read at this position. If no bytes have been collected so far,
        // report the status as MESSAGE_WAS_REMOVING.
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
        }

        // Record the offset returned by rollNextFile for this physical offset, then skip
        // this entry.
        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
        continue;
    }
    // Append the selected buffer to getResult.
    getResult.addMessage(selectResult);


}

getMessageResult获取到数据后响应给consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Fragment from the pull-request handler: sends the fetched messages back to the consumer.
// transferMsgByHeap (the article states it defaults to true): copy the messages into a heap
// ByteBuffer first and attach the bytes as the response body.
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
    // Time the heap copy so its latency can be reported per group/topic/queue.
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                                                                     requestHeader.getTopic(), requestHeader.getQueueId(),
                                                                     (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
    response.setBody(r);
} else {
    try {// zero copy (per the article): the data comes straight from the mmap-mapped file and is sent directly
        // Wrap the encoded response header and the message result in a FileRegion and write it
        // to the channel.
        FileRegion fileRegion =
        new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Release the message result once the write has completed, whether or not it
                // succeeded; log the cause on failure.
                getMessageResult.release();
                if (!future.isSuccess()) {
                    log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                }
            }
        });
    } catch (Throwable e) {
        // Any failure while building or submitting the transfer is logged, and the message
        // result is released here.
        log.error("transfer many message by pagecache exception", e);
        getMessageResult.release();
    }

    // The response has already been written to the channel above.
    // NOTE(review): presumably null tells the caller not to send a response again — confirm.
    response = null;
}

堆内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Copies every message buffer in {@code getMessageResult} into a single heap-backed byte array
 * and records how far the store timestamp of the last copied message lags behind the store's
 * current time.
 *
 * <p>{@code getMessageResult} is always released before this method returns, including when
 * the copy fails part-way.
 *
 * @param getMessageResult the selected message buffers to copy; released by this method
 * @param group            consumer group used as a statistics key
 * @param topic            topic used as a statistics key
 * @param queueId          queue id used as a statistics key
 * @return the backing array of the heap buffer that received all message bytes
 */
private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic,
                                    final int queueId) {
    // One heap buffer sized to hold every selected message back to back.
    final ByteBuffer heapCopy = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());

    // Combined width of the fixed-size fields that precede BORNHOST in a stored message.
    final int bornHostPos = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCODE
        + 4 // 3 BODYCRC
        + 4 // 4 QUEUEID
        + 4 // 5 FLAG
        + 8 // 6 QUEUEOFFSET
        + 8 // 7 PHYSICALOFFSET
        + 4 // 8 SYSFLAG
        + 8; // 9 BORNTIMESTAMP

    // Stays 0 when there are no buffers; otherwise holds the value read from the last buffer.
    long lastStoreTimestamp = 0;
    try {
        for (ByteBuffer messageBuffer : getMessageResult.getMessageBufferList()) {
            heapCopy.put(messageBuffer);

            // The reads below use absolute indices, so they are unaffected by the put above.
            final int sysFlag = messageBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);

            // BORNHOST is IPv4 when the BORNHOST_V6_FLAG bit is clear:
            // IPv4 host = ip(4 byte) + port(4 byte); IPv6 host = ip(16 byte) + port(4 byte).
            final int bornHostLength;
            if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
                bornHostLength = 8;
            } else {
                bornHostLength = 20;
            }

            // The store timestamp sits immediately after BORNHOST (field 10).
            lastStoreTimestamp = messageBuffer.getLong(bornHostPos + bornHostLength);
        }
    } finally {
        getMessageResult.release();
    }

    this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(
        group,
        topic,
        queueId,
        this.brokerController.getMessageStore().now() - lastStoreTimestamp);
    return heapCopy.array();
}
This post is licensed under CC BY 4.0 by the author.