基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer处理Broker的拉取消息响应源码。
此前我们学习了Consumer如何发起的拉取消息请求,以及Broker如何处理拉取消息请求,现在我们来学习Consumer如何处理Broker的拉取消息响应的源码。
1 客户端异步请求回调
此前我们讲过在consumer发起拉取消息请求的时候,通过ASYNC模式异步的进行拉取,并且InvokeCallback#operationComplete方法将会在得到结果之后进行回调,内部调用pullCallback的回调方法。
在回调方法中,如果解析到了响应结果,那么调用pullCallback#onSuccess方法处理,否则调用pullCallback#onException方法处理。
/**
* MQClientAPIImpl的方法
* 异步的拉取消息,并且触发回调函数
*
* @param addr broker地址
* @param request 请求命令对象
* @param timeoutMillis 消费者消息拉取超时时间,默认30s
* @param pullCallback 拉取到消息之后调用的回调函数
* @throws RemotingException
* @throws InterruptedException
*/
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
/*
* 基于netty给broker发送异步消息,设置一个InvokeCallback回调对象
*
* InvokeCallback#operationComplete方法将会在得到结果之后进行回调,内部调用pullCallback的回调方法
*/
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
/**
* 异步执行的回调方法
*/
@Override
public void operationComplete(ResponseFuture responseFuture) {
//返回命令对象
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
//解析响应获取结果
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
//如果解析到了结果,那么调用pullCallback#onSuccess方法处理
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
//出现异常则调用pullCallback#onException方法处理异常
pullCallback.onException(e);
}
} else {
//没有结果,都调用onException方法处理异常
if (!responseFuture.isSendRequestOK()) {
//发送失败
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
//超时
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}
1.1.1.1. processPullResponse解析响应
该方法处理response获取PullResult,根据响应的数据创建PullResultExt对象返回,注意此时拉取到的消息还是一个字节数组。
/**
* MQClientAPIImpl的方法
* 处理response获取PullResult
*/
private PullResult processPullResponse(
final RemotingCommand response,
final String addr) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
//设置结果状态码
switch (response.getCode()) {
case ResponseCode.SUCCESS:
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
//解析响应头
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
//根据响应的数据创建PullResultExt对象返回,此时拉取到的消息还是一个字节数组
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
2 PullCallback回调
在processPullResponse处理response之后,会调用此前DefaultMQPushConsumerImpl#pullMessage方法中创建的PullCallback消息拉取的回调函数,执行onSuccess回调方法。如果解析过程中抛出异常,则调用onException方法。
onSuccess回调方法的大概逻辑为:
1、 调用processPullResult方法处理pullResult,进行消息解码、过滤以及设置其他属性的操作,返回pullResult;
2、 如果没有拉取到消息,那么设置下一次拉取的起始offset到PullRequest中,调用executePullRequestImmediately方法立即将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务,PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求,进行下次消息的拉取;
3、 如果拉取到了消息,将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中,等待被异步的消费;
4、 通过consumeMessageService将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息,consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现;
5、 获取配置的消息拉取间隔,默认为0,如果大于0则调用executePullRequestLater方法,等待间隔时间后将拉取请求再次放入pullRequestQueue中,否则立即调用executePullRequestImmediately放入pullRequestQueue中,进行下次消息的拉取;
如果是onException方法,那么延迟3s将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取。
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
/*
* 1 处理pullResult,进行消息解码、过滤以及设置其他属性的操作
*/
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
//拉取的起始offset
long prevRequestOffset = pullRequest.getNextOffset();
//设置下一次拉取的起始offset到PullRequest中
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
//增加拉取耗时
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
//如果没有消息
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
/*
* 立即将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务
* PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求
* 进行下次消息的拉取
*/
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
//获取第一个消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
//增加拉取tps
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
/*
* 2 将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中
*/
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
/*
* 3 通过consumeMessageService将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息
* consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现
*/
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
/*
* 4 获取配置的消息拉取间隔,默认为0,则等待间隔时间后将拉取请求再次放入pullRequestQueue中,否则立即放入pullRequestQueue中
* 进行下次消息的拉取
*/
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
/*
* 将executePullRequestImmediately的执行放入一个PullMessageService的scheduledExecutorService延迟任务线程池中
* 等待给定的延迟时间到了之后再执行executePullRequestImmediately方法
*/
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
/*
* 立即将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
*/
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
//没有匹配到消息
case NO_MATCHED_MSG:
//更新下一次拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
//立即将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
//请求offset不合法,过大或者过小
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
//更新下一次拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
//丢弃拉取请求
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
//更新下次拉取偏移量
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
//持久化offset
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
//移除对应的消费队列,同时将消息队列从负载均衡服务中移除
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
/*
* 出现异常,延迟3s将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
*/
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
2.1 processPullResult处理拉取结果
处理pullResult,进行消息解码、过滤以及设置其他属性的操作。
1、 更新下次拉取建议的brokerId,下次拉取消息时从pullFromWhichNodeTable中直接取出;
2、 对消息二进制字节数组进行解码转换为java的List消息集合;
3. 如果存在tag,并且不是classFilterMode,那么按照tag过滤消息,这就是客户端的消息过滤。这采用String#equals方法过滤,而broker端则是比较的tagHash值,即hashCode。
4、 如果有消息过滤钩子,那么执行钩子方法,这里可以扩展自定义的消息过滤的逻辑;
5、 遍历过滤通过的消息,设置属性例如事务id,最大、最小偏移量、brokerName;
6、 将过滤后的消息存入msgFoundList集合;
7、 因为消息已经被解析了,那么设置消息的字节数组为null,释放内存;
/**
* PullAPIWrapper的方法
* 处理pullResult,进行消息解码、过滤以及设置其他属性的操作
*
* @param mq 消息队列
* @param pullResult 拉取结果
* @param subscriptionData 获取topic对应的SubscriptionData订阅关系
* @return 处理后的PullResult
*/
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
/*
* 1 更新下次拉取建议的brokerId,下次拉取消息时从pullFromWhichNodeTable中直接取出
*/
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
/*
* 2 对二进制字节数组进行解码转换为java的List<MessageExt>消息集合
*/
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
/*
* 3 如果存在tag,并且不是classFilterMode,那么按照tag过滤消息,这就是客户端的消息过滤
*/
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
//这采用String#equals方法过滤,而broker端则是比较的tagHash值,即hashCode
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
/*
* 4 如果有消息过滤钩子,那么执行钩子方法,这里可以扩展自定义的消息过滤的逻辑
*/
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
}
/*
* 5 遍历过滤通过的消息,设置属性
*/
for (MessageExt msg : msgListFilterAgain) {
//事务消息标识
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
//如果是事务消息,则设置事务id
if (Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
//将响应中的最小和最大偏移量存入msg
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
//设置brokerName到msg
msg.setBrokerName(mq.getBrokerName());
}
//将过滤后的消息存入msgFoundList集合
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
//6 因为消息已经被解析了,那么设置消息的字节数组为null,释放内存
pullResultExt.setMessageBinary(null);
return pullResult;
}
/**
* PullAPIWrapper的方法
* 更新下次拉取建议的brokerId
*
* @param mq 消息队列
* @param brokerId 建议的brokerId
*/
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
//存入pullFromWhichNodeTable集合
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (null == suggest) {
this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
} else {
suggest.set(brokerId);
}
}
2.2 executePullRequestImmediately再次拉取消息
将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务。PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求,进行下次消息的拉取。
/**
* DefaultMQPushConsumerImpl的方法
* 下一次消息拉取
*
* @param pullRequest 拉取请求
*/
public void executePullRequestImmediately(final PullRequest pullRequest) {
//调用PullMessageService#executePullRequestImmediately方法
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
/**
* PullMessageService的方法
* 下一次消息拉取
*
* @param pullRequest 拉取请求
*/
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
//存入pullRequestQueue集合,等待下次拉取
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
2.3 putMessage存放消息
该方法将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中。
返回是否需要分发消费dispatchToConsume,当当前processQueue的内部的msgTreeMap中有消息并且consuming=false,即还没有开始消费时,将会返回true。
dispatchToConsume对并发消费无影响,只对顺序消费有影响。
/**
* ProcessQueue的方法
* 消息存入msgTreeMap这个红黑树map集合中
*
* @param msgs 一批消息
* @return 是否需要分发消费,当当前processQueue的内部的msgTreeMap中有消息并且consuming=false,即还没有开始消费时,将会返回true
*/
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
//尝试加写锁防止并发
this.treeMapLock.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
//当该消息的偏移量以及该消息存入msgTreeMap
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
//如果集合没有这个offset的消息,那么增加统计数据
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
//消息计数
msgCount.addAndGet(validMsgCnt);
//当前processQueue的内部的msgTreeMap中有消息并且consuming=false,即还没有开始消费时,dispatchToConsume = true,consuming = true
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;
this.consuming = true;
}
//计算broker累计消息数量
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
return dispatchToConsume;
}
2.4 消息的两次过滤
通过前面的源码可以看到,消息实际上经过了两次过滤,一次是在broekr中,一次是拉取到consumer之后,为什么经过两次过滤呢?因为broker中的过滤是比较的hashCode值,而hashCode存在哈希碰撞的可能,因此hashCode对比相等之后,还需要在consumer端进行equals的比较,再过滤一次。
为什么服务端不直接进行equals过滤呢?因为tag的长度是不固定的,而通过hash算法可以生成固定长度的hashCode值,这样才能保证每个consumequeue索引条目的长度一致。而tag的真正值保存在commitLog的消息体中,虽然broker最终会回去到commitLog中的消息并返回,但是获取的获取一段消息字节数组,并没有进行反序列化为Message对象,因此无法获取真实值,而在consumer端一定会做反序列化操作的,因此tag的equals比较放在了consumer端。
3 总结
本次我们来学习Consumer如何处理Broker的拉取消息响应的源码。入口就是MQClientAPIImpl#pullMessageAsync方法内部的回调函数InvokeCallback#operationComplete方法。
1、 在这个方法中,首先进行消息的解码以及第二次过滤,然后将消息存入对应的processQueue处理队列内部的msgTreeMap中;
2. 然后通过consumeMessageService#submitConsumeRequest方法将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池的消费消息。
1、 consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现;
3、 最后是再次发起消息拉取请求;
下一篇文章,我们将会去看看ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现如何消费消息。