构建消息文件ConsumeQueue和IndexFile。
1、 ConsumeQueue:看作是CommitLog的消息偏移量索引文件,存储了它所属Topic的消息在CommitLog中的偏移量消费者拉取消息的时候,可以从ConsumeQueue中快速的根据偏移量定位消息在CommitLog中的位置;
2、 IndexFile索引文件:看作是CommitLog的消息时间范围索引文件IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法;
ConsumeQueue和IndexFile, 加快了客户端的消费速度或者是查询效率。
1.ReputMessageService消息重放服务
ReputMessageService服务将会在循环中异步的每隔1ms对于写入CommitLog的消息进行重放, 将消息构建成为DispatchRequest对象, 然后将DispatchRequest对象分发给各个CommitLogDispatcher处理, 这些CommitLogDispatcher通常会尝试构建ConsumeQueue索引、IndexFile索引以及SQL92布隆过滤器。
/**
* ReputMessageService的方法
*/
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
/*
* 运行时逻辑
* 如果服务没有停止,则在死循环中执行重放的操作
*/
while (!this.isStopped()) {
try {
//睡眠1ms
Thread.sleep(1);
//执行重放
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
每隔1ms执行一次duReput方法, 执行重放方法。
2.doReput执行重放
所谓的重放就是完成诸如ConsumeQueue索引、IndexFile索引、布隆过滤器、唤醒长轮询线程和被hold住的请求等操作。
1、 如果重放偏移量reputFromOffset小于commitlog的最小物理偏移量,那么设置为commitlog的最小偏移量,如果重放偏移量小于commitlog的最大偏移量,那么循环重放;
2、 调用getData方法根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer,这段内存存储着将要重放的消息;
3、 开始循环读取这段ByteBuffer中的消息,依次重放;
1、 如果存在消息,调用checkMessageAndReturnSize,检查当前消息的属性并且构建一个DispatchRequest对象返回;
2、 调用doDispatch方法分发重放请求;
1. CommitLogDispatcherBuildConsumeQueue: 根据DispatchRequest写ConsumeQueue文件, 构建ConsumeQueue索引。
2. CommitLogDispatcherBuildIndex: 根据DispatchRequest写indexFile文件, 构建indexFile索引。
3. CommitLogDispatcherCalcBitMap: 根据DispatchRequest构建布隆过滤器, 加速SQL92过滤效率, 避免每次都解析sql。
3、 如果broker角色不是SLAVE,且支持长轮询,并且消息送达的监听器不为null,那么通过该监听器的arriving方法触发调用pullRequestHoldService的pullRequestHoldService方法,唤醒挂起的拉取消息请求,表示有新的消息落盘,可以进行拉取了;
4、 如果读取到MappedFile文件尾,那么获取下一个文件的起始索引继续重放;
/**
* DefaultMessageStore的方法
* <p>
* 执行重放
*/
private void doReput() {
//如果重放偏移量reputFromOffset小于commitlog的最小物理偏移量,那么设置为commitlog的最小物理偏移量
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
/*
*
* 如果重放偏移量小于commitlog的最大物理偏移量,那么循环重放
*/
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
//如果消息允许重复复制(默认为 false)并且reputFromOffset大于等于已确定的偏移量confirmOffset,那么结束循环
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
/*
* 根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile
* 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer,这段内存存储着将要重放的消息
*/
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
//将截取的起始物理偏移量设置为重放偏起始移量
this.reputFromOffset = result.getStartOffset();
/*
* 开始读取这段ByteBuffer中的消息,依次进行重放
*/
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//检查消息的属性并且构建一个DispatchRequest对象返回
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
//消息大小,如果是基于Dledger技术的高可用DLedgerCommitLog则取bufferSize
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
//如果大小大于0,表示有消息
if (size > 0) {
/*
* 分发请求
* 1. CommitLogDispatcherBuildConsumeQueue:根据DispatchRequest写ConsumeQueue文件,构建ConsumeQueue索引。
* 2. CommitLogDispatcherBuildIndex:根据DispatchRequest写IndexFile文件,构建IndexFile索引。
* 3. CommitLogDispatcherCalcBitMap:根据DispatchRequest构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。
*/
DefaultMessageStore.this.doDispatch(dispatchRequest);
//如果broker角色不是SLAVE,并且支持长轮询,并且消息送达的监听器不为null
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
//通过该监听器的arriving方法触发调用pullRequestHoldService的pullRequestHoldService方法
//即唤醒挂起的拉取消息请求,表示有新的消息落盘,可以进行拉取了
//这里涉及到RocketMQ的consumer消费push模式的实现,后面会专门讲解consumer消费
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
notifyMessageArrive4MultiQueue(dispatchRequest);
}
//设置重放偏起始移量加上当前消息大小
this.reputFromOffset += size;
//设置读取的大小加上当前消息大小
readSize += size;
//如果是SLAVE角色,那么存储数据的统计信息更新
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.add(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
//如果等于0,表示读取到MappedFile文件尾
//获取下一个文件的起始索引
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
//设置readSize为0,将会结束循环
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
//如果重做完毕,则跳出循环
doNext = false;
}
}
}
2.1 isCommitLogAvailable是否需要重放
用于判断CommitLog是否需要执行重放。
/**
* ReputMessageService的方法
* CommitLog是否需要执行重放
*/
private boolean isCommitLogAvailable() {
//重放偏移量是否小于commitlog的最大物理偏移量
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
2.2 getData获取重放数据
根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile, 然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。
/**
* CommitLog的方法
*
* 获取CommitLog的数据
*/
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
//获取CommitLog文件大小,默认1G
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
//根据指定的offset从mappedFileQueue中对应的CommitLog文件的MappedFile
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
//通过指定物理偏移量,除以文件大小,得到指定的相对偏移量
int pos = (int) (offset % mappedFileSize);
//从指定相对偏移量开始截取一段ByteBuffer,这段内存存储着将要重放的消息。
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
2.2.1 selectMappedBuffer截取一段内存#
从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer, 这段内存存储着将要重放的消息。这段ByteBuffer和原mappedByteBuffer共享同一块内存, 但是拥有自己的指针。
然后根据起始物理索引、截取的ByteBuffer、截取的ByteBuffer大小以及当前CommitLog对象构建一个SelectMappedBufferResult对象返回。
/**
* MappedFile的方法
* @param pos 相对偏移量
*/
public SelectMappedBufferResult selectMappedBuffer(int pos) {
//获取写入位置,即最大偏移量
int readPosition = getReadPosition();
//如果指定相对偏移量小于最大偏移量并且大于等于0,那么截取内存
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
//从mappedByteBuffer截取一段内存
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
//根据起始物理索引、新的ByteBuffer、ByteBuffer大小、当前CommitLog对象构建一个SelectMappedBufferResult对象返回
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
2.3 checkMessageAndReturnSize检查消息并构建请求
检查这段内存中的下一条消息, 读取消息的各种属性即可, 不需要读取消息body, 根据属性构建一个DispatchRequest对象。
/**
* CommitLog的方法
*
* @param byteBuffer 一段内存
* @param checkCRC 是否校验CRC
* @param readBody 是否读取消息体
*/
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
try {
// 1 TOTAL SIZE
//消息条目总长度
int totalSize = byteBuffer.getInt();
// 2 MAGIC CODE
//消息的magicCode属性,魔数,用来判断消息是正常消息还是空消息
int magicCode = byteBuffer.getInt();
switch (magicCode) {
case MESSAGE_MAGIC_CODE:
break;
case BLANK_MAGIC_CODE:
//读取到文件末尾
return new DispatchRequest(0, true /* success */);
default:
log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
return new DispatchRequest(-1, false /* success */);
}
byte[] bytesContent = new byte[totalSize];
//消息体CRC校验码
int bodyCRC = byteBuffer.getInt();
//消息消费队列id
int queueId = byteBuffer.getInt();
//消息flag
int flag = byteBuffer.getInt();
//消息在消息消费队列的偏移量
long queueOffset = byteBuffer.getLong();
//消息在commitlog中的偏移量
long physicOffset = byteBuffer.getLong();
//消息系统flag,例如是否压缩、是否是事务消息
int sysFlag = byteBuffer.getInt();
//消息生产者调用消息发送API的时间戳
long bornTimeStamp = byteBuffer.getLong();
//消息发送者的IP和端口号
ByteBuffer byteBuffer1;
if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
} else {
byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
}
//消息存储时间
long storeTimestamp = byteBuffer.getLong();
//broker的IP和端口号
ByteBuffer byteBuffer2;
if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
} else {
byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
}
//消息重试次数
int reconsumeTimes = byteBuffer.getInt();
//事务消息物理偏移量
long preparedTransactionOffset = byteBuffer.getLong();
//消息体长度
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
//读取消息体
if (readBody) {
byteBuffer.get(bytesContent, 0, bodyLen);
if (checkCRC) {
int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
if (crc != bodyCRC) {
log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
return new DispatchRequest(-1, false/* success */);
}
}
} else {
//不需要读取消息体,那么跳过这段内存
byteBuffer.position(byteBuffer.position() + bodyLen);
}
}
//Topic名称内容大小
byte topicLen = byteBuffer.get();
byteBuffer.get(bytesContent, 0, topicLen);
//topic的值
String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
long tagsCode = 0;
String keys = "";
String uniqKey = null;
//消息属性大小
short propertiesLength = byteBuffer.getShort();
Map<String, String> propertiesMap = null;
if (propertiesLength > 0) {
byteBuffer.get(bytesContent, 0, propertiesLength);
//消息属性
String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
propertiesMap = MessageDecoder.string2messageProperties(properties);
keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
//客户端生成的uniqId,也被称为msgId,从逻辑上代表客户端生成的唯一一条消息
uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
//tag
String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
//普通消息的tagsCode被设置为tag的hashCode
if (tags != null && tags.length() > 0) {
tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
}
/*
* 延迟消息处理
* 对于延迟消息,tagsCode被替换为延迟消息的发送时间,主要用于后续判断消息是否到期
*/
{
//消息属性中获取延迟级别DELAY字段,如果是延迟消息则生产者会在构建消息的时候设置进去
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
//如果topic是SCHEDULE_TOPIC_XXXX,即延迟消息的topic
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}
if (delayLevel > 0) {
//tagsCode被替换为延迟消息的发送时间,即真正投递时间
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
}
}
}
//读取的当前消息的大小
int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
//不相等则记录BUG
if (totalSize != readLength) {
doNothingForDeadCode(reconsumeTimes);
doNothingForDeadCode(flag);
doNothingForDeadCode(bornTimeStamp);
doNothingForDeadCode(byteBuffer1);
doNothingForDeadCode(byteBuffer2);
log.error(
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
totalSize, readLength, bodyLen, topicLen, propertiesLength);
return new DispatchRequest(totalSize, false/* success */);
}
//根据读取的消息属性内容,构建为一个DispatchRequest对象并返回
return new DispatchRequest(
topic,
queueId,
physicOffset,
totalSize,
tagsCode,
storeTimestamp,
queueOffset,
keys,
uniqKey,
sysFlag,
preparedTransactionOffset,
propertiesMap
);
} catch (Exception e) {
}
//读取异常
return new DispatchRequest(-1, false /* success */);
}
2.4 doDispatch分发请求
ReputMessageService服务的核心代码, 循环调用DefaultMessageStore内部的dispatcherList中的CommitLogDispatcher的dispatch方法, 处理这个请求。
1、 CommitLogDispatcherBuildConsumeQueue:根据DispatchRequest写ConsumeQueue文件,构建ConsumeQueue索引;
2、 CommitLogDispatcherBuildIndex:根据DispatchRequest写indexFile文件,构建indexFile索引;
3、 CommitLogDispatcherCalcBitMap:根据DispatchRequest构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql;
/**
* DefaultMessageStore的方法
*
* @param req 分发请求
*/
public void doDispatch(DispatchRequest req) {
//循环调用CommitLogDispatcher#dispatch处理
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}