27、RocketMQ 实战 - RocketMQ 消费幂等

为了防⽌消息重复消费导致业务处理异常,消息队列 RocketMQ 版的消费者在接收到消息后,有必要根据业务上的唯⼀ Key 对消息做幂等处理。

什么是消息幂等

如果有⼀个操作,多次执⾏与⼀次执⾏所产⽣的影响是相同的,我们就称这个操作是幂等的。
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费⼀次的结果是相同的,并且多次消费并未对业务系统产⽣任何负⾯影响,那么这整个过程就可实现消息幂等。

适⽤场景

在互联⽹应⽤中,尤其在⽹络不稳定的情况下,消息队列 RocketMQ 版的消息有可能会出现重复。如果消息重复会影响您的业务处理,请对消息做幂等处理。

消息重复的场景如下:

  • 发送时消息重复
     

当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时⽣产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复
     

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断。为了保证消息⾄少被消费⼀次,消息队列 RocketMQ 版的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID也相同的消息。

  • 负载均衡时消息重复(包括但不限于⽹络抖动、Broker 重启以及消费者应⽤重启)

当消息队列 RocketMQ 版的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

实现消息幂等

定义消息幂等的两要素:

幂等令牌

幂等令牌是⽣产者和消费者两者中的既定协议,在业务中通常是具备唯⼀业务标识的字符串,如:
订单号、流⽔号等。且⼀般由⽣产者端⽣成并传递给消费者端。

处理唯⼀性的确保

缓存唯⼀索引
可以使用Redis缓存

RocketMQ如何处理消息幂等

RocketMQ能够保证消息不丢失但不保证消息不重复。

如果在RocketMQ中实现消息去重实际也是可以的,但是考虑到⾼可⽤以及⾼性能的需求,如果做了服务端的消息去重,RocketMQ就需要对消息做额外的rehash、排序等操作,这会花费较⼤的时间和空间等资源代价,收益并不明显。
RocketMQ考虑到正常情况下出现重复消息的概率其实是很⼩的,因此RocketMQ将消息幂等操作交给了业务⽅处理。

因为Message ID 有可能出现冲突(重复)的情况,因此不建议通过MessageID作为处理依据,⽽最好的⽅式是以业务唯⼀标识作为幂等处理的关键依据如:订单号、流⽔号等作为幂等处理的关键依据。⽽业务的唯⼀标识可以通过消息 Key 设置。

以⽀付场景为例,可以将消息的 Key 设置为订单号,作为幂等处理的依据。具体代码示例如下

Message message = new Message();
message.setKeys("ORDERID_100");
SendResult sendResult = producer.send(message);

消费者收到消息时可以根据消息的 Key,即订单号来实现消息幂等:

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext
context) {
   
     
for(MessageExt msg:msgs){
   
     
String key = msg.getKeys();
// 根据业务唯⼀标识的 Key 做幂等处理
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

消费端常⻅的幂等操作

1、 业务操作之前进⾏状态查询;

消费端开始执⾏业务操作时,通过幂等id⾸先进⾏业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不需要再进⾏处理。那么我们只需要在消费逻辑执⾏之前通过订单号进⾏订单状态查询,⼀旦获取到确定的订单状态则对消息进⾏提交,通知broker消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。

2、 唯⼀性约束保证最后⼀道防线;

上述第⼆点操作并不能保证⼀定不出现重复的数据,如:并发插⼊的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插⼊操作,因此我们务必要对幂等id添加唯⼀性索引,这样就能够保证在并发场景下也能保证数据的唯⼀性。

3、 引⼊锁机制;

上述的第⼀点中,如果是并发更新的情况,没有使⽤悲观锁、乐观锁、分布式锁等机制的前提下,进⾏更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。
⾼并发下,建议通过状态机的⽅式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提⾼因此不建议使⽤。