08、分布式事务 实战 - 分布式事务解决方案之可靠消息最终一致性

1. 什么是可靠消息最终一致性?

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能
够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

这里面有2个重点:

1、 消息发送方本地事物执行成功之后,消息一定会投递成功;
2、 消息消费者最终也一定能够消费此消息,最终使分布式事务最终达成一致性;

2. 业务场景:下单送积分

电商中有这样的一个场景:商品下单之后,需给用户送积分,订单表和积分表分别在不同的db中,涉及到分布式事务的问题。

我们通过可靠消息来解决这个问题:

1、 商品下单成功之后送积分的操作,我们使用mq来实现;
2、 商品下单成功之后,投递一条消息到mq,积分系统消费消息,给用户增加积分;

我们主要讨论一下,商品下单及投递消息到mq的操作,如何实现?每种方式优缺点?

3. 投递消息过程:方式一

3.1. 过程

  • step1:开启本地事务
  • step2:生成购物订单
  • step3:投递消息到mq
  • step4:提交本地事务

这种方式是将发送消息放在了事务提交之前。

3.2. 可能存在的问题

  • step3发生异常:导致step4失败,商品下单失败,直接影响到商品下单业务
  • step4发生异常,其他step成功:商品下单失败,消息投递成功,给用户增加了积分

4. 投递消息过程:方式二

下面我们换种方式,我们将发送消息放到事务之后进行。

4.1. 过程

  • step1:开启本地事务
  • step2:生成购物订单
  • step3:提交本地事务
  • step4:投递消息到mq

4.2. 可能会出现的问题

step4发生异常,其他step成功:导致商品下单成功,投递消息失败,用户未增加积分

上面两种是比较常见的做法,也是最容易出错的。

5. 投递消息过程:方式三

  • step1:开启本地事务
  • step2:生成购物订单
  • step3:本地库中插入一条需要发送消息的记录t_msg_record
  • step3:提交本地事务
  • step5:新增一个定时器,轮询t_msg_record,将待发送的记录投递到mq中

这种方式借助了数据库的事务,业务和消息记录作为了一个原子操作,业务成功之后,消息日志必定是存在的。解决了前两种方式遇到的问题。如果我们的业务系统比较单一,可以采用这种方式。

对于微服务化的情况,上面这种方式不是太好,每个服务都需要上面的操作;也不利于扩展。

6. 投递消息过程:方式四

增加一个消息服务消息库,负责消息的落库、将消息发送投递到mq。

  • step1:开启本地事务
  • step2:生成购物订单
  • step3:当前事务库插入一条日志:生成一个唯一的业务id(bus_id),将bus_id和订单关联起来保存到当前事务所在的库中
  • step4:调用消息服务:携带bus_id,将消息先落地入库,此时消息的状态为待发送状态,返回消息id(msg_id)
  • step5:提交本地事务
  • step6:如果上面都成功,调用消息服务,将消息投递到mq中;如果上面有失败的情况,则调用消息服务取消消息的发送

能想到上面这种方式,已经算是有很大进步了,我们继续分析一下可能存在的问题:

1、 系统中增加了一个消息服务,商品下单操作依赖于该服务,业务对该服务依赖性比较高,当消息服务不可用时,整个业务将不可用;
2、 若step6失败,消息将处于待发送状态,此时业务方需要提供一个回查接口(通过bus_id查询),验证业务是否执行成功;消息服务需新增一个定时任务,对于状态为待发送状态的消息做补偿处理,检查一下业务是否处理成功;从而确定消息是投递还是取消发送;
3、 step4依赖于消息服务,如果消息服务性能不佳,会导致当前业务的事务提交时间延长,容易产生死锁,并导致并发性能降低我们通常是比较忌讳在事务中做远程调用处理的,远程调用的性能和时间往往不可控,会导致当前事务变为一个大事务,从而引发其他故障;

7. 投递消息过程:方式五

在以上方式中,我们继续改进,进而出现了更好的一种方式:

  • step1:生成一个全局唯一业务消息id(bus_msg_id),调用消息服务,携带bus_msg_id,将消息先落地入库,此时消息的状态为待发送状态,返回消息id(msg_id)
  • step2:开启本地事务
  • step3:生成购物订单
  • step4:当前事务库插入一条日志(将step3中的业务和bus_msg_id关联起来)
  • step5:提交本地事务
  • step6:分2种情况:如果上面都成功,调用消息服务,将消息投递到mq中;如果上面有失败的情况,则调用消息服务取消消息的发送

若step6失败,消息将处于待发送状态,此时业务方需要提供一个回查接口(通过bus_msg_id查询),验证业务是否执行成功;

消息服务需新增一个定时任务,对于状态为待发送状态的消息做补偿处理,检查一下业务是否处理成功;从而确定消息是投递还是取消发送。

方式五和方式四对比,比较好的一个地方:将调用消息服务,消息落地操作,放在了事务之外进行,这点小的改进其实算是一个非常好的优化,减少了本地事务的执行时间,从而可以提升并发量,阿里有个消息中间件RocketMQ就支持方式5这种,大家可以去用用。

 

8. 关于消息消费的一些问题

如何解决重复消费的问题?

消费者轮询从mq server中拉取消息,然后进行消费。

消息消费者消费消息的过程

  • step1:从mq中拉取消息
  • step2:执行本地业务,比如增加积分操作
  • step3:消费完毕之后,将消息从mq中删掉

当step2成功,step3失败之后,这个消息会再次从mq中拉取出来,会出现重复消费的问题,所以我们需要考虑消费的幂等性,同一条消息多次消费和一次消费产生的结果应该是一致的,关于幂等性是另外一个课题,下次会详说。