摘要
1、可靠消息最终一致性,概念特点、问题、针对问题的方案(本地消息表、Rocketmq)
2、本地消息表流程图
3、Rocketmq流程、实战
4、最大努力通知,流程、
可靠消息最终一致性
当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能 够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。
特点
- 适合执行周期长且实时性要求不高的场景
- 引入消息机制后,同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。
网络通信的不确定性会导致分布式事务问题:
- 事务发起方(消息生产方)将消息发给消息中间件
- 事务参与方从消息中间件接收消息
可靠消息最终一致性的问题
1、本地事务与消息发送的原子性问题
- 发送消息成功,数据库操作失败
1
2
3
4begin transaction;
//1.发送MQ
//2.数据库操作
commit transation; - 如果发送MQ消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数 据库回滚,但MQ其实已经正常发送了,同样会导致不一致。
1
2
3
4begin transaction;
//1.数据库操作
//2.发送MQ
commit transation;
2、事务参与方接收消息的可靠性,需要如果接收消息失败可以重复接收消息
3、消息重复消费的问题,要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
本地消息表方案
- 交互流程规范
- 用户服务在本地事务新增用户和增加 ”积分消息日志“。(用户表和消息表通过本地事务保证一致)
- 定时任务扫描日志–>保证将消息发送给消息队列。启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。
- MQ的ack(即消息确认)机制–>幂等,收到ack,MQ将不再向消费者推送消息,否则消费者会不断重 试向消费者来发送消息。
RocketMQ事务消息方案
- 交互流程实现
- Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
- MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
- Producer 端执行业务代码逻辑,通过本地数据库事务控制。
- 若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息。若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除消息。
- 如果执行Producer端本地事务过程中,执行端挂掉或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
- 以上主干流程已由RocketMQ实现,对用户侧来说,用户需要:
- 实现本地事务执行
- 本地事务回查方法,因此关注本地事务的执行状态
RocketMq事务
RocketMQ主要解决了两个功能:
1、本地事务与消息发送的原子性问题。
2、事务参与方接收消息的可靠性。
1 | //回调 |
实践:
- 在application-local.propertis中配置rocketMQ nameServer地址及生产组
- service
1
2
3
4
5
6
7
8
9
10//从event构建消息体
public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent)
//发送消息并收到回应后执行本地事务
public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent)
//实现执行本地事务和事务回查两个方法
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener- controller 略
- B账户的MQ 监听类
1
2
3
4//需要实现幂等
@RocketMQMessageListener(topic = "topic_txmsg",consumerGroup = "consumer_txmsg_group_bank2")
public class TxmsgConsumer implements RocketMQListener<String>
最大努力通知
- 流程
本文作者:
yuqing wang
本文链接: https://satyrswang.github.io/2021/02/04/分布式事务-下/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
本文链接: https://satyrswang.github.io/2021/02/04/分布式事务-下/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!