版本
基于rocketmq-all-4.3.1
版本;
ConsumeQueue
1、 每个ConsumeQueue
都有一个queueId,queueId的值为0到TopicConfig配置的队列数量比如某个Topic的消费队列数量为4,那么四个ConsumeQueue
的queueId就分别为0、1、2、3;
2、 ConsumerQueue
相当于CommitLog
的索引文件,消费者消费时会先从ConsumerQueue
中查找消息在CommitLog
中的offset,再去CommitLog中找原始消息数据如果某个消息只在CommitLog
中有数据,没在ConsumerQueue
中,则消费者无法消费;
3、 Consumequeue
类对应的是每个topic和queuId下面的所有文件.默认存储路径是**$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由30w条数据组成,单个文件的大小是30wx20Byte**;
4、 每一个ConsumeQueue存储的格式为commitLogOffset(8B)+size(4B)+tagHashCode(8B)
,总共20B存tag是为了在Consumer取到消息offset后先根据tag做一次过滤,剩下的才需要到CommitLog
中取消息详情;
5、 ConsumeQueue核心属性;
public class ConsumeQueue {
private final DefaultMessageStore defaultMessageStore;
//映射文件队列,管理ConsumeQueue
private final MappedFileQueue mappedFileQueue;
// 消息topic
private final String topic;
// 消息队列Id
private final int queueId;
//指定大小的缓冲,记录的大小是20byte的固定大小
private final ByteBuffer byteBufferIndex;
//保存的路径
private final String storePath;
//映射文件的大小
private final int mappedFileSize;
//最后一个消息对应的物理偏移量 也就是在CommitLog中的偏移量
private long maxPhysicOffset = -1;
//最小的逻辑偏移量 在ConsumeQueue中的最小偏移量
private volatile long minLogicOffset = 0;
//ConsumeQueue的扩展文件
private ConsumeQueueExt consumeQueueExt = null;
}
6、 构造方法;
public ConsumeQueue(
final String topic,
final int queueId,
final String storePath,
final int mappedFileSize,
final DefaultMessageStore defaultMessageStore) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.defaultMessageStore = defaultMessageStore;
this.topic = topic;
this.queueId = queueId;
//存储路径${this.storePath}/{topic}/{queueId}/{fileName}
String queueDir = this.storePath
+ File.separator + topic
+ File.separator + queueId;
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
//分配一个存储单元大小(20B)的缓冲区
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
//是否启用消息队列的扩展存储
if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
this.consumeQueueExt = new ConsumeQueueExt(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
);
}
}
加载(load)
1、 Broker启动时,调用load加载ConsumeQueue加载直接委托mappedFileQueue进行加载;
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
//扩展存储存在则加载
if (isExtReadEnable()) {
result &= this.consumeQueueExt.load();
}
return result;
}
恢复(recover)
1、 Broker启动时会尝试恢复ConsumeQueue文件;
- 如果文件个数大于3个就从倒数第三个文件开始恢复,否则从第一个开始
- 循环遍历文件中的的所有数据,按照20个字节读取。知道全部读取完成
- 删除有效offset之后的文件
2、 源码;
public void recover() {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
//如果文件列表大于3就从倒数第3个开始,否则从第一个开始
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
//获取consumeQueue单个文件的大小
int mappedFileSizeLogics = this.mappedFileSize;
//获取index对应的映射文件
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//映射文件的起始偏移量(也是文件名)
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) {
//遍历文件中的所有数据(20个字节一次遍历)
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
//顺序解析,每个数据单元隔20个字节,如果offset跟size大于0则表示有效
if (offset >= 0 && size > 0) {
//正常数据的大小
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
//设置最大的物理偏移量
this.maxPhysicOffset = offset;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
//文件加载完毕
if (mappedFileOffset == mappedFileSizeLogics) {
index++;
// 完成加载跳出循环
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
//下一个文件,继续循环读取
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
// 最后一个文件的起始偏移量+正常数据的长度
processOffset += mappedFileOffset;
//设置flush和commit偏移量位置
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
//删除有效的 offset 之后的文件(后面的是无效的,需要删除掉)
this.mappedFileQueue.truncateDirtyFiles(processOffset);
//如果有扩展文件,则恢复扩展文件
if (isExtReadEnable()) {
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
追加分发的消息(putMessagePositionInfoWrapper)
1、 分发的消息通过putMessagePositionInfoWrapper方法追加;
- 判断消息队列是否可写,是否开启写ConsumeQueue扩展文件默认false
- 将消息写入缓存区MappedFile中
- 如果追加成功,则更新checkpoint的时间戳
2、 putMessagePositionInfoWrapper
源码;
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
//判断ConsumeQueue是否可写
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
//是否开启写ConsumeQueue扩展文件,默认false
//bloom过滤器先记录消息的bitMap,这样consumer来读取消息时先通过bloom过滤器判断是否有符合过滤条件的消息
if (isExtWriteEnable()) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
//写入缓冲区
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
//如果更新成功,则更新checkpoint文件
if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
//maxPhysicOffset记录了上一次ConsumeQueue更新的消息在CommitLog中的偏移量
// 如果消息已经被处理,则直接返回true
if (offset <= this.maxPhysicOffset) {
return true;
}
this.byteBufferIndex.flip();
//20byte
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);//写入消息偏移量
this.byteBufferIndex.putInt(size);//写入消息长度
this.byteBufferIndex.putLong(tagsCode);//写入tag hashcode
//cqOffset为ConsumerQueue中记录了偏移量总数
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
//获取ConsumeQueue当前对应的MappedFile,ConsumeQueue本身也是通过MappedFileQueue来管理的
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
//对于新建的文件,填充0来预热PageCache
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
//说明已经处理过
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
//更新物理偏移量,追加到MappedFile。如果appendMessage追加失败了,等下次继续追加,所以这里可以直接给maxPhysicOffset赋值,不用关心是否追加成功
this.maxPhysicOffset = offset;
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
byteBuffer.putLong(0L);
byteBuffer.putInt(Integer.MAX_VALUE);
byteBuffer.putLong(0L);
int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
mappedFile.appendMessage(byteBuffer.array());
}
}
ReputMessageService
1、 ReputMessageService是一个服务线程,用于ConsumeQueue持久化,它是DefaultMessageStore的内部类此服务线程主要运行doReput方法每间隔1ms执行一次;
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
2、 doReput大致步骤;
- 获取CommitLog在reputFromOffset处存储的可以被处理的消息(可能在不同的MappedFile中)
- 循环遍历这些消息,并将每个消息相关数据转换为DispatchRequest
- 分发DispatchRequest到CommitLogDispatcher中执行
3、 doReput流程图;
CommitLogDispatcher
1、 CommitLogDispatcher是CommitLog日志消息分发器,主要用来生成ConsumerQueue和IndexFile此接口有三个实现;
- CommitLogDispatcherBuildConsumeQueue 处理ConsumeQueue的生成
- CommitLogDispatcherBuildIndex处理IndexFile的生成
- CommitLogDispatcherCalcBitMap 处理计算bit Map
2、 CommitLogDispatcherBuildConsumeQueue源码只处理TRANSACTION_NOT_TYPE和TRANSACTION_COMMIT_TYPE这两种消息这两种消息一种是非事务消息,即普通消息一种是事务已经确切提交的消息;
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
//根据主题与队列获取对应的ConsumeQueue
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}