RocketMQ 处理分布式事务
Created at 2019-02-17 Updated at 2020-07-29 Category 分布式 views
在 RocketMQ 中生产者有三种角色 NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务)。
RocketMQ 是一种最终一致性的分布式事务。
分布式事务中,如果使用消息中间件来实现最终一致性的分布式事务,是先更新数据库再发消息还是先发消息再更新数据库,这两个操作不是原子的,无论谁先谁后都是有问题的。为了解决该问题,RocketMQ 提出了“事务消息的概念”。
最终一致性
发送消息时涉及两个操作:
- 上游服务处理本地事务
- 发送消息到消息队列
当这两个操作有一个失败的而情况下就会出现数据不一致。所以可靠消息要将消息的状态存进数据库,后台会有一个定时运行的线程不停的检查各个消息的状态。如果一直是“待确认”状态,就可以回调上游服务提供的一个接口来查询上游服务是否执行成功。
为什么不能把业务处理和发送消息到 MQ 放到一个本地事务?
如果本地事务处理成功,消息存储成功,MQ 处理超时,ACK 确认失败,会导致本地事务回滚。然而消息却已经发出去,下游就会消费这条消息,就会导致数据不一致。
RocketMQ 的事务消息,就实现了可靠消息服务的所有功能,核心思想跟上面类似。
Half Message 半消息
指暂不能被 Consumer 消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer
对消息的二次确认后,Consumer才能去消费它。
为什么要先发送 Half Message
- 可以先确认 Brock 服务器是否正常,如果半消息都发送失败了,说明 Brock 挂了。
- 可以通过半消息来回查事务,如果半消息发送成功后一直没有被二次确认,那么就会回查事务状态。
消息回查
Brock 服务器会定时扫描长期处于半消息的消息,会主动询问 Producer 端 该消息的最终状态(Commit 或者 Rollback),该消息即为消息回查。
什么情况会会查
- 执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit 或者 rollback)导致最终返回 UNKNOW,那么就会回查。
- 本地事务执行成功后,返回 Commit 进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在 brock 端,它还是个 Half Message(半消息),这也会回查。
分布式事务交互流程

- A 服务先发送个 Half Message 给 Brock 端,消息中携带 B 服务 即将要 +100 元的信息。
- 当 A 服务知道 Half Message 发送成功后,那么开始第 3 步执行本地事务。
- 执行本地事务(会有三种情况 1、执行成功。2、执行失败。3、网络等原因导致没有响应)
- 如果本地事务成功,那么 Product 像 Brock 服务器发送 Commit,这样 B 服务就可以消费该 message。
- 如果本地事务失败,那么 Product 像 Brock 服务器发送 Rollback,那么就会直接删除上面这条半消息。
- 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行 RocketMQ 的回调接口,来进行事务的回查。
只有 A 服务本地事务执行成功,B 服务才能消费该 message。
常见问题
为什么不在业务成功之后再发送消息?
如果业务成功,再去发消息,还没来得及发送消息,业务系统就宕机了,根本没有记录之前是否发送过消息,这样就会导致业务执行成功,消息最终没发出去的情况。
如果 consumer 消费失败,是否需要 producer 做回滚呢?
这里的事务消息,producer 不会因为 consumer 消费失败而做回滚,采用事务消息的应用,其所追求的是高可用和最终一致性,消息消费失败的话,MQ 自己会负责重推消息,直到消费成功。因此,事务消息是针对生产端而言的,而消费端的一致性是通过MQ的重试机制来完成的。
如何保证下游服务的可靠接收
- 在可靠消息服务里开发一个后台线程,不断的检查消息状态。
- 如果消息状态一直是“已发送”,始终没有变成“已完成”,那么就说明下游服务始终没有处理成功。
- 此时可靠消息服务就可以再次尝试重新投递消息到 MQ,让下游服务来再次处理。
- 只要下游服务的接口逻辑实现幂等性,保证多次处理一个消息,不会插入重复数据即可。
下游服务如果处理完成后但是通知 Brocker 的过程中挂掉了怎么办
可靠消息服务会启动相应的后台线程,轮询一直处于“已发送”状态的消息,判断状态持续时间是否超过了规定时间,如果超时,可靠消息服务就会再次向 MQ 服务投递此消息,从而确保消息能被再次消费处理。(注意,也可能出现下游服务处理成功,但是通知消息发送失败的情况,所以为了确保幂等,下游服务也需要在业务逻辑上做好相应的防重处理)。
当 MQ 故障时怎么办
使用基于 KV 存储的队列支持高可用降级方案。
- 封装 MQ 客户端组件与故障感知,连续 n 次重试尝试投递消息到 MQ 都报错,说明 MQ 故障,此时可以自动感知记忆自动触发降级开关。
- 通过 zookeeper 触发降级开关。
- 当 MQ 挂掉之后,使用 redis 的队列来代替。
- redis 中要根据实际场景划分 n 个队列,然后通过 hash 算法,均匀写入固定好 n 个 key 对应的 kv 存储队列中。(因为只往一个 key 中写入消息会导致负载过大)
- 降级开关打开后,需要开启一个线程,每隔一段时间尝试给 MQ 投递一个消息看是否恢复了。
- 如果恢复了,zk 就可以关闭降级开关,继续往 MQ 投递消息,下游服务在确认 kv 存储的各个队列中已经没有数据之后,就可以重新切换为从 MQ 消费消息。
如何保证不被重复消费
强校验:将 id + 业务场景 的唯一标识写入数据库中,将这个操作和业务操作放到同一个事务里,先用唯一标识去数据库查这条消息有没有被消费,如果没有被消费,就执行事务,被消费了就直接返回。
弱校验:将唯一标识作为 Redis 的 key 写入 Redis 里,就算 kv 丢了可能这样的场景也没关系,比如短信通知。
如何保证消费的可靠性传输
- 生产者丢数据
- 消息队列丢数据:开启持久化磁盘配置
- 消费者丢数据
如何保证顺序消费
场景:数据库主从同步,数据量过大,写入消息队列。而增删改的操作一定要是顺序的,比如增改删,变成了改删增,这样本应该被删掉的数据就还在。
RocketMQ 中的实现:一个 topic 下面有多个队列,为了保证发送有序,RocketMQ 提供了 MessageQueueSelector 队列选择机制。RocketMQ 的 topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。RocketMQ仅保证顺序发送,顺序消费由消费者业务保证。