一、基本原理
可靠消息最终一致性的基本原理是事务发起方(消息发送者)执行本地事务成功后发出一条消息,事务参与方(消息消费者)接收到事务发起方发送过来的消息,并成功执行本地事务。事务发起方和事务参与方最终的数据能够达到一致的状态
这里主要强调如下两点:
1、 事务发起方一定能够将消息成功发送出去;
2、 事务参与方一定能够成功接收到消息;
可以利用消息中间件实现可靠消息最终一致性分布式事务方案,如下图所示:
事务发起方将消息发送给消息中间件,事务参与方从消息中间件中订阅(接收)消息。事务发起方会通过网络将消息发送给消息中间件,而事务参与方也需要哦通过网络接收消息中间件的消息。网络的不确定性可能会造成事务发起方发送消息失败,也可能会造成事务参与方接收消息失败,即造成分布式事务的问题
在使用可靠消息最终一致性方案解决分布式事务的问题时,需要确保消息发送和消息消费的一致性,从而确保消息的可靠性
二、本地消息表
为了防止在使用消息一致性方案处理分布式事务的过程中出现消息丢失的情况,使用本地事务保证数据业务操作和消息的一致性,也就是通过本地事务,将业务数据和消息数据分别保存到本地数据库的业务数据表和本地消息表中,然后通过定时任务读取本地消息表中的消息数据,将消息发送到消息中间件,等到消息消费者成功接收到消息后,再将本地消息表中的消息删除。这种方式实现的分布式事务就是基于本地消息表的可靠消息最终一致性分布式事务
1.实现原理
基于本地消息表实现的可靠消息最终一致性方案的核心思想是将需要通过分布式系统处理的任务,比如同步数据等操作,通过消息或者日志的形式异步执行,这些消息或者日志可以存储到本地文件中,也可以存储到本地数据库的数据表中,还可以存储到消息中间件中,然后通过一定的业务规则进行重试。这种方案要求各个服务的接口具有幂等性,原理如下图所示:
存放消息的本地消息表和存放数据的业务数据表位于同一个数据库中,这种设计能够保证使用本地事务达到消息和业务数据的一致性,并且引入消息中间件实现多个分支事务之间的最终一致性,整体流程如下:
1、 事务发起方向业务数据表成功写入数据后,会向本地消息表发送一条消息数据,因为写业务数据和写消息数据在同一个本地事务中,所以本地事务会保证这条消息数据一定能够正确地写入本地消息表;
2、 使用专门的定时任务将本地消息表中的消息写入消息中间件,如果写入成功,会将消息从本地消息表中删除否则,继续根据一定的规则进行重试操作;
3、 如果消息根据一定的规则写入消息中间件仍然失败,可以将失败的消息数据转储到“死信”队列数据表中,后续进行人工干预,以达到事务最终一致性的目的;
4、 事务参与方,也就是消息消费者会订阅消息中间件的消息,当接收到消息中间件的消息时,完成本地的业务逻辑;
5、 事务参与方的本地事务执行成功,则整个分布式事务执行成功否则,会根据一定的规则进行重试如果仍然不能成功执行本地事务,则会给事务发起方发送一条事务执行失败的消息,以此来通知事务发起方进行事务回滚;
2.优缺点
可靠消息最终一致性分布式事务解决方案是处理分布式事务的典型方案,也是业界使用比较多的一种方案。基于本地消息表实现的可靠消息方案是其中一种具体实现方式,这种实现方式有如下明显的优点:
1、 使用消息中间件在多个服务之前传递消息数据,在一定程度上避免了分布式事务的问题;
2、 作为业界使用比较多的一种方案,相对比较成熟;
也有比较明显的缺点,如下所示:
1、 无法保证各个服务节点之间数据的强一致性;
2、 某个时刻可能会查不到提交的最新数据;
3、 消息表会耦合到业务库中,需要额外手动处理很多发送消息的逻辑,不利于消息数据的扩展如果消息表中存储了大量的消息数据,会对操作业务数据的性能造成一定的影响;
4、 消息发送失败时需要重试,事务参与方需要保证消息的幂等;
5、 如果消息重试后仍然失败,则需要引入人工干预机制;
6、 消息服务与业务服务耦合,不利于消息服务的扩展和维护;
7、 消息服务不能共用,每次需要实现分布式事务时,都需要单独开发消息服务逻辑,增加了开发和维护的成本;
三、独立消息服务
顾名思义,独立消息服务就是将消息处理部分独立部署成单独的服务,以便消息服务能够单独开发和维护,这样就实现了消息服务和业务服务的解耦、消息数据和业务数据的解耦,方便对消息服务进行扩展
1.实现原理
独立消息服务是在本地消息表的基础上进一步优化,将消息服务独立出来,并将消息数据从本地消息表独立成单独消息数据库,引入消息确认服务和消息恢复服务,如下图所示:
独立消息服务实现的分布式事务中有几个核心服务,分别为可靠消息服务、消息确认服务、消息恢复服务和消息中间件,具体流程如下:
1、 事务发起方向可靠消息服务成功发送消息后,执行本地事务;
2、 可靠消息服务接收到事务发起方发送的消息后,将消息存储到消息库中,并将消息记录的状态标记为“待发送”,并不会马上向消息中间件发送消息同时,向事务发起方响应消息发送已就绪的状态;
3、 当事务发起方的事务执行成功时,事务发起方会向可靠消息服务发送确认消息,否则,发送取消消息;
4、 当可靠消息服务接收到事务发起方发送过来的确认消息时,会直接将消息库中保存的当前消息删除或标记为“已删除”;
5、 消息中间件接收到可靠消息服务发送过来的消息时,会将消息投递给业务参与方,业务参与方接收到消息后,执行本地事务,并将执行结果作为确认消息发送到消息中间件;
6、 消息中间件将确认结果投递到可靠消息服务,可靠消息服务接收到确认消息后,根据结果状态将消息库中的当前消息记录标记为“已完成”;
7、 如果事务发起方向可靠消息服务发送消息失败,会触发消息重试机制如果重试后仍然失败,则会由消息确认服务定时校对事务发起方的事务状态和消息数据库中当前消息的状态,发现状态不一致时,采用一定的校对规则进行校对;
8、 如果可靠消息服务向消息中间件发送消息失败,会触发消息重试机制如果重试后仍然失败,则会由消息恢复服务根据一定的规则定时恢复消息库中的消息数据;
2.优缺点
使用独立消息服务实现分布式事务的优点如下:
1、 消息服务能偶独立部署、独立开发和维护;
2、 消息服务与业务服务解耦,具有更好的扩展性和伸缩性;
3、 消息表从本地数据库解耦出来,使用独立的数据库存储,具有更好的扩展性和伸缩性;
4、 消息服务可以被多个服务共用,降低了重复开发消息服务的成本;
5、 消息数据的可靠性不依赖于消息中间件,弱化了对于消息中间件的依赖性;
缺点如下:
1、 发送一次消息需要请求两次接口;
2、 事务发起方需要开发比较多的事务查询接口,在一定程度上增加了开发成本;
四、RocketMQ 事务消息
RocketMQ 是阿里巴巴开源的一款支持事务消息的消息中间件,于 2012 年正式开源,2017 年成为 Apache 基金会的顶级项目。RocketMQ 的高可用机制以及可靠消息设计能够在系统发生异常时保证事务达到最终一致性
1.实现原理
RocketMQ 主要由 Producer 端和 Broker 端组成。RocketMQ 的事务消息主要是为了让 Producer 端的本地事务与消息发送逻辑形成一个完整的原子操作,即 Producer 端的本地事务和消息发送逻辑要么全部执行成功,要么全部不执行。在 RocketMQ 内部,Producer 端和 Broker 端具有双向通信能力,使得 Broker 端具备事务协调者的功能。RocketMQ 提供的消息存储机制本身就能够对消息进行持久化操作,这些可靠的设计能够保证在系统出现异常时,事务依然能够达到一致性
RocketMQ 4.3 版本之后引入了完整的事务消息机制,其内部实现了完整的本地消息表逻辑,使用 RocketMQ 实现可靠消息分布式事务就不用用户再实现本地消息表的逻辑了,极大地减轻开发工作量
使用RocketMQ 实现可靠消息分布式事务解决方案的基本原理如下图所示:
整体流程如下:
1、 事务发起方向RocketMQ发送Half消息;
2、 RocketMQ向事务发起方响应Half消息发送成功;
3、 事务发起方执行本地事务,向本地数据库中插入、更新、删除数据;
4、 事务发起方向RocketMQ发送提交事务或者回滚事务的消息;
5、 如果事务参与方未收到消息或者执行事务失败,且RocketMQ未删除保存的消息数据,则RocketMQ会回查事务发起方的接口,查询事务状态,以此确认是再次提交事务还是回滚事务;
6、 事务发起方查询本地数据库,确认事务是否执行成功的状态;
7、 事务发起方根据查询到的事务状态,向RocketMQ发送提交事务或者回滚事务的消息;
8、 如果第七步中,事务发起方向RocketMQ发送的是提交事务的消息,则RocketMQ会向事务参与方投递消息;
如果第七步中,事务发起方向 RocketMQ 发送的是回滚事务的消息,则 RocketMQ 不会向事务参与方投递消息,并且会删除内部存储的消息数据
9、 如果RocketMQ向事务参与方投递的是执行本地事务的消息,则事务参与方会执行本地事务,向本地数据库中插入、更新、删除数据;
10、 如果RocketMQ向事务参与方投递的是查询本地事务状态的消息,则事务参与方会查询本地数据库中事务的执行状态;
在使用RocketMQ 实现分布式事务时,上述流程中的主要部分都由 RocketMQ 自动实现了,开发人员只需要实现本地事务的执行逻辑和本地事务的回查方法,重点关注事务的执行状态即可
2.RocketMQ 本地事务监听接口
RocketMQ 内部提供了本地事务的监听接口 RocketMQLocalTransactionListener。Rocket-MQLocalTransactionListener 接口中主要有 executeLocalTransaction(Message, Object) 和 check-LocalTransaction(Message) 两个方法,源码如下:
public interface RocketMQLocalTransactionListener{
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
当事务发起方成功向 RocketMQ 发送准备执行事务的消息后,RocketMQ 会回调 RocketMQLocalTransactionListener 接口中的 executeLocalTransaction(Message, Object) 方法。executeLocalTransaction(Message, Object) 方法中主要接收两个参数:一个是 Message 类型参数,表示回传的消息;另一个是 Object 类型参数,是事务发起方调用 RocketMQ 的 send() 方法时传递的参数。此方法会返回事务的状态,当返回 COMMIT 时,表示事务提交,当返回 ROLLBACK 时,表示事务回滚,当返回 UNKNOW 时,表示事务回调
当需要回查本地事务状态时,调用 checkLocalTransaction(Message) 方法。checkLocal-Transaction(Message) 方法中接收一个 Message 类型参数,表示要回查的事务消息。此方法返回事务的状态,同 executeLocalTransaction(Message, Object) 方法返回的事务状态,这里不再赘述
使用RocketMQ 实现分布式事务时,事务发起方向 RocketMQ 发送事务消息比较简单,代码片段如下所示:
//创建一个事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
//设置 Producer 端的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动 Producer 端
producer.start();
//设置 TransactionListtener 实现
//transactionListener 表示发送准备消息成功后执行的回调接口
producer.setTransactionListener(transactionListener);
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
五、消息发送的一致性
消息发送一致性指的是事务发起方执行本地事务与产生消息数据和发送消息的整体一致性。换句话说,就是事务发起方执行事务操作成功,则一定能够将其产生的消息成功发送出去。这里一般会将消息发送到消息中间件中,例如 Kafka、RocketMQ、RabbitMQ 等。消息发送的一致性包括消息发送与确认机制、消息发送的不可靠性、保证发送消息的一致性
1.消息发送与确认机制
消息发送的一致性设计消息的发送与确认机制。常规消息中间件的消息发送与确认机制如下所示:
1、 消息生产者生成消息并将消息发送给消息中间件这里可以通过同步和异步的方式发送;
2、 消息中间件接收到消息后,将消息数据持久化存储到磁盘这里可以根据配置调整存储策略;
3、 消息中间件向消息生产者返回消息的发送结果这里返回的可以是消息发送的状态,也可以是异常信息;
4、 消息消费者监听消息中间件并消费指定主题中的数据;
5、 消息消费者获取消息中间件中的数据后,执行本地的业务逻辑;
6、 消息消费者对已经成功消费的消息向消息中间件进行确认,消息中间件收到收费者反馈的确认消息后,将确认后的消息从消息中间件中删除;
一般情况下,常规的消息中间件对消息的处理流程无法实现消息发送的一致性,因此,直接使用现成的消息中间件无法完成实现消息发送的一致性。在实现分布式事务时,需要手动开发消息发送与确认机制以满足消息发送的一致性
2.消息发送的不一致性
如果不做处理,消息的发送是不可靠的,无法满足消息发送的一致性。这里通过几个具体的案例来说明因消息发送的不可靠性导致的不一致问题
先操作数据库,再发送消息,代码片段如下所示:
public void saveDataAndSendMessage(){
//保存交易流水信息
payService.save(payInfo);
//发送消息
messageService.sendMessage(message);
}
这种情况无法保证消息发送的一致性,可能虽然数据保存成功了,但是消息发送失败了。事务参与方未能收到消息,无法执行事务参与方的业务逻辑,最终导致事务的不一致
先发送消息,再操作数据库,代码片段如下所示:
public void saveDataAndSendMessage(){
//发送消息
messageService.sendMessage(message);
//保存交易流水信息
payService.save(payInfo);
}
这种情况无法保证消息发送的一致性,可能虽然消息发送成功了,但是保存数据失败了。事务参与方收到消息并成功地执行了本地事务操作,而事务发起方保存数据失败了,最终导致事务的不一致
在同一事务中,先发送消息,后操作数据库,代码片段如下所示:
@Transaction
public void saveDataAndSendMessage(){
//发送消息
messageService.sendMessage(message);
//保存交易流水信息
payService.save(payInfo);
}
这种情况下,虽然使用了 Spring 的 @Transactional 注解,使发送消息和保存数据的操作在同一事务中,但是仍然无法保证消息发送的一致性。可能消息发送成功,数据库操作失败,消息发送成功后无法进行回滚操作。事务发起方执行事务失败,事务参与方接收到消息后执行事务成功,最终导致事务的不一致
在同一事务中,先操作数据库,后发送消息,代码片段如下所示:
@Transactional
public void saveDataAndSendMessage(){
//保存交易流水信息
payService.save(payInfo);
//发送消息
messageService.sendMessage(message);
}
这种情况下,如果保存数据成功,而发送消息失败,则抛出异常,对保存数据的操作进行回滚,最终,保存数据的操作和发送消息的操作都执行失败,看上去发送消息满足一致性了,实际上,这种情况仍然无法满足消息发送的一致性
如果数据保存成功,消息发送成功,由于网络出现故障等异常情况,导致发送消息的响应超时,则抛出异常,回滚保存数据的操作。但是事务参与者可能已经成功接收到消息,并成功执行了事务操作,最终导致事务的不一致
3.如何保证消息发送的一致性
要保证消息发送的一致性,就要实现消息的发送与确认机制。事务发起方向消息中间件成功发送消息后,消息中间件向事务发起方返回消息发送成功的状态。当事务参与方接收到消息并处理完事务操作后,需要向消息中间件发送确认消息,整体流程如下图所示:
主体流程如下所示:
1、 事务发起方向消息中间件发送待确认消息;
2、 消息中间件接收到事务发起方发送过来的消息,将消息存储到本地数据库,此时并不会向事务参与方投递消息;
3、 消息中间件向事务发起方返回消息存储结果,事务发起方根据返回的结果确定执行的业务逻辑,必要时,还会向上层抛出异常信息;
4、 事务发起方完成业务处理后,把业务处理的结果发送给消息中间件;
5、 消息中间件收到事务发起方发送过来的结果数据后,根据结果确定后续的处理逻辑如果事务发起方发送过来的结果为“成功”,消息中间件会更新本地数据库中的消息状态为“待发送”否则,将本地数据库中的消息状态标记为“已删除”,或者直接删除数据库中相应的消息记录;
6、 事务参与方会监听消息中间件,并接收状态为“待发送”的消息,当收到消息中间件的消息后,会执行对应的业务逻辑,消息中间件对应的记录变更为“已发送”;
7、 事务参与方的业务操作完成后,会向消息中间件发送确认消息,表示事务参与方已经收到消息并且执行完对应的业务逻辑,消息中间件会将消息从本地数据库中删除;
8、 为了保证事务发起方一定能能够将消息发送出去,在事务发起方的应用服务中需要暴露一个回调查询接口消息服务在后台开启一个线程,定时扫描消息服务中状态为“待发送”的消息,回调事务发起方提供的回调查询接口,根据消息服务中的业务参数回查事务发起方本地事务的执行状态如果消息服务查询到事务发起方的事务状态为“执行成功”同时,当前消息中间件中对应的消息状态为“待发送”,则将对应的消息投递出去,并且将对应的消息记录更新为“已发送”如果消息服务查询到事务发起方的执行状态为“执行失败”,则消息服务会删除消息中间件中对应的消息,不在投递;
9、 消息中间件也会根据状态向事务发起方投递事务参与方的执行状态,事务发起方会根据状态执行对应的操作,比如事务回滚等;
经过上述的流程,事务发起方就能够保证消息发送的一致性了
六、消息接收的一致性
消息发送的一致性主要由事务发起方保证,消息服务进行辅助。消息接收的一致性需要由事务参与方保证,消息服务进行辅助
1.消息接收与确认机制
消息接收的一致性在一定程度上需要满足消息的接收与确认机制,具体过程如下所示:
1、 消息中间件向消息消费方投递信息;
2、 消息消费方接收到消息中间件投递过来的消息,执行本地业务逻辑,执行完成后,将执行结果发送到消息中间件;
3、 消息中间件接收到消费者发送过来的结果状态,如果状态为“执行成功”,则删除对应的消息记录,或者将其状态设置为“已删除”;
4、 如果消息中间件向消息消费方投递消息失败,会根据一定的规则进行重试如果重试多次后,仍然无法投递到消息消费方,则会将对应的消息存储到死信队列中,后续进行人工干预;
5、 如果消息消费方执行完业务逻辑后,无法成功将结果返回给消息中间件,则同样需要引入重试机制,在消息消费方单独开启一个线程,定时扫描本地数据库中状态为执行完成但向消息中间件发送消息失败的记录,并定时向消息中间件发送状态结果;
这里,有两点需要注意:
1、 消息消费方接收消息中间件消息的接口需要满足幂等性;
2、 消息消费方向消息中间件发送结果状态时,如果需要重试,则应限制最大的重试次数,否则,消息发送操作可能会变成死循环;
常规消息中间件无法做到上述流程,在实现分布式事务时,需要手动实现
2.消息接收的不一致性
如果对消息的接收逻辑不做任何限制和处理,消息的接收是不可靠的,会导致每次接收到消息后,对事务的处理得出不同的结果,最终导致消息接收的不一致性,具体表现在如下几个方面:
1、 事务参与方接收消息的方法没有实现幂等,消息中间件向事务参与方多次重试投递消息时,事务参与方得出不同的业务处理结果,导致事务参与方与事务发起方的事务结果不一致;
2、 事务参与方可能无法收到消息中间件投递的消息,但是消息中间件未实现消息重试投递机制,事务参与方无法执行分支事务,导致事务参与方与事务发起方的事务结果不一致;
3、 事务参与方执行完本地业务逻辑后,无法正确地将执行结果反馈给消息中间件,消息中间件无法正确删除已处理过的消息,会再次向事务发起方重试投递消息,可能会导致事务参与方与事务发起方的事务结果不一致;
4、 事务参与方无法保证完整收到消息中间件投递过来的消息,导致事务参与方与事务发起方的事务结果不一致;
事务参与方如果需要保证消息接受的一致性,需要对消息的接收逻辑进行相应的限制和处理,并且消息中间件需要支持重试消息投递的逻辑
3.如何保证消息接收的一致性
如果需要实现消息接收的一致性,则需要解决如下几个问题:
1、 限制消息中间件重复投递消息的最大次数;
2、 事务参与方接收消息的接口满足幂等性;
3、 实现事务参与方与消息中间件之间的确认机制;
4、 消息中间件中的消息多次重试投递失败后,放入死信队列,后续引入人工干预机制;
具体处理流程如下图所示:
整体流程说明如下所示:
1、 消息中间件向事务参与方投递消息时,如果投递失败,则会按照一定的重试规则重新投递未确认的消息,也就是会按照一定的规则,扫描发送失败并且状态为“待发送”的消息,将其投递给事务参与方;
2、 如果重试次数达到最大重试次数,仍然无法成功将消息投递出去,则将对应的消息存入死信队列,后续通过人工干预投递;
3、 如果消息正确投递出去,则会将数据中存储的对应的消息记录状态更新为“已发送”;
4、 事务参与方接收到消息中间件投递的消息,执行业务逻辑后,将执行的结果发送给消息中间件;
5、 消息中间件接收到事务参与方发送的确认消息后,根据确认消息更新数据库中对应消息的记录状态还会根据确认消息执行是否向事务发起方投递消息的逻辑,事务发起方根据接收的消息执行相应的逻辑处理,比如事务回滚等;
总之,上述流程需要满足消息中间件重试消息投递时,有最大重试次数。事务接收方的接口需要满足幂等性。事务接收方与消息中间件之间需要实现消息确认机制。消息中间件向事务参与方多次投递消息失败后,达到最大重试次数,需要将消息放入死信队列,并引入人工干预机制
经过上述流程,消息的接收就能够保证一致性了
七、消息的可靠性
在实现可靠消息最终一致性分布式事务时,需要满足消息的可靠性,这里的可靠性包括消息发送的可靠性、消息存储的可靠性和消息消费的可靠性
1.消息发送的可靠性
消息发送的可靠性除了要满足消息发送的一致性,还需要保证事务发起方的可靠性,最简单的实现方式就是多副本机制。也就是说,将事务发起方部署多份,形成集群模式
另外,还需要保证消息生产和发送的可靠性。引入回调确认机制,在事务发起方提供回调接口,在消息发送异常时,消息服务也能通过一定的机制回调事务发起方提供的回调接口,获取事务发起方的事务执行状态和消息数据,确保消息一定被消息服务成功接收。消息服务收到消息后,会返回一个确认信息,表示消息服务已经成功收到事务发起方发送的消息。如果事务发起方在一定时间内未收到消息服务返回的确认消息,就会触发消息重试机制,按照一定的规则重新发送消息
例如,事务发起方消息发送成功,但是由于网络异常,未能收到消息服务返回的确认消息,此时,事务发起方就会按照一定的规则重新发送消息,消息服务就有可能收到多条相同的消息数据。因此消息服务也需要实现幂等
消息的重试机制需要实现响应时长判断逻辑(例如,超出 1 分钟,未收到返回的确认消息,就认为本次消息需要重新发送),也需要对重试的次数进行限制
2.消息存储的可靠性
消息存储的可靠性就是确保消息能够进行持久化,不会因为消息堆积、服务崩溃、服务器宕机、网络故障等因素,造成消息丢失
实现存储的可靠性最简单的方式就是消息存储的多副本机制,将原本只存储一份的消息,冗余存储成多份。目前,大多数消息中间件都实现了消息的冗余副本机制,这里不再赘述
3.消息消费的可靠性
消息消费的可靠性除了要满足消息接收的一致性外,还要确保消息被成功消费,避免由于消息丢失,事务参与方崩溃或者服务器宕机造成消息消费不成功。此时,就需要将事务参与方冗余成多个副本,部署成集群模式
除了事务参与方的多副本机制,还要实现事务参与方的重试机制与幂等机制,按照一定的规则获取消息中间件中的数据,以确保事务发起方成功收到消息并成功消费
例如,使用 RocketMQ 消息中间件实现的分布式事务中,事务参与方从 RocketMQ 消息中间件中拉取消息,如果成功获取消息并执行完本地事务操作,则会向 RocketMQ 返回确认消息。如果事务参与方从 RocketMQ 拉取消息失败,或者消费消息失败,就需要重新获取消息进行消费,如果达到一定的重试次数仍然失败,则会将该消息发送到 RocketMQ 的重试队列中
如果事务参与者崩溃或者服务器宕机,RocketMQ 会认为该消息没有被事务参与方成功消费,会被其他的事务参与方重新消费。此时,就有可能造成事务参与方重复消费的情况,因此需要事务参与方实现消息的幂等操作