RocketMQ 处理分布式事务

Created at 2019-02-17 Updated at 2020-07-29 Category 分布式 Tag RocketMQ / 分布式事务

在 RocketMQ 中生产者有三种角色 NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务)。

RocketMQ 是一种最终一致性的分布式事务。

分布式事务中,如果使用消息中间件来实现最终一致性的分布式事务,是先更新数据库再发消息还是先发消息再更新数据库,这两个操作不是原子的,无论谁先谁后都是有问题的。为了解决该问题,RocketMQ 提出了“事务消息的概念”。

最终一致性

发送消息时涉及两个操作:

  1. 上游服务处理本地事务
  2. 发送消息到消息队列
    当这两个操作有一个失败的而情况下就会出现数据不一致。所以可靠消息要将消息的状态存进数据库,后台会有一个定时运行的线程不停的检查各个消息的状态。如果一直是“待确认”状态,就可以回调上游服务提供的一个接口来查询上游服务是否执行成功。

为什么不能把业务处理和发送消息到 MQ 放到一个本地事务?

如果本地事务处理成功,消息存储成功,MQ 处理超时,ACK 确认失败,会导致本地事务回滚。然而消息却已经发出去,下游就会消费这条消息,就会导致数据不一致。

RocketMQ 的事务消息,就实现了可靠消息服务的所有功能,核心思想跟上面类似。

Half Message 半消息

指暂不能被 Consumer 消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer

对消息的二次确认后,Consumer才能去消费它。

为什么要先发送 Half Message

  1. 可以先确认 Brock 服务器是否正常,如果半消息都发送失败了,说明 Brock 挂了。
  2. 可以通过半消息来回查事务,如果半消息发送成功后一直没有被二次确认,那么就会回查事务状态。

消息回查

Brock 服务器会定时扫描长期处于半消息的消息,会主动询问 Producer 端 该消息的最终状态(Commit 或者 Rollback),该消息即为消息回查。

什么情况会会查

  1. 执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit 或者 rollback)导致最终返回 UNKNOW,那么就会回查。
  2. 本地事务执行成功后,返回 Commit 进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在 brock 端,它还是个 Half Message(半消息),这也会回查。

分布式事务交互流程

整体交互流程图

  1. A 服务先发送个 Half Message 给 Brock 端,消息中携带 B 服务 即将要 +100 元的信息。
  2. 当 A 服务知道 Half Message 发送成功后,那么开始第 3 步执行本地事务。
  3. 执行本地事务(会有三种情况 1、执行成功。2、执行失败。3、网络等原因导致没有响应)
  4. 如果本地事务成功,那么 Product 像 Brock 服务器发送 Commit,这样 B 服务就可以消费该 message。
  5. 如果本地事务失败,那么 Product 像 Brock 服务器发送 Rollback,那么就会直接删除上面这条半消息。
  6. 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行 RocketMQ 的回调接口,来进行事务的回查。

只有 A 服务本地事务执行成功,B 服务才能消费该 message。

常见问题

为什么不在业务成功之后再发送消息?

如果业务成功,再去发消息,还没来得及发送消息,业务系统就宕机了,根本没有记录之前是否发送过消息,这样就会导致业务执行成功,消息最终没发出去的情况。

如果 consumer 消费失败,是否需要 producer 做回滚呢?

这里的事务消息,producer 不会因为 consumer 消费失败而做回滚,采用事务消息的应用,其所追求的是高可用和最终一致性,消息消费失败的话,MQ 自己会负责重推消息,直到消费成功。因此,事务消息是针对生产端而言的,而消费端的一致性是通过MQ的重试机制来完成的。

如何保证下游服务的可靠接收

  1. 在可靠消息服务里开发一个后台线程,不断的检查消息状态。
  2. 如果消息状态一直是“已发送”,始终没有变成“已完成”,那么就说明下游服务始终没有处理成功。
  3. 此时可靠消息服务就可以再次尝试重新投递消息到 MQ,让下游服务来再次处理。
  4. 只要下游服务的接口逻辑实现幂等性,保证多次处理一个消息,不会插入重复数据即可。

下游服务如果处理完成后但是通知 Brocker 的过程中挂掉了怎么办

可靠消息服务会启动相应的后台线程,轮询一直处于“已发送”状态的消息,判断状态持续时间是否超过了规定时间,如果超时,可靠消息服务就会再次向 MQ 服务投递此消息,从而确保消息能被再次消费处理。(注意,也可能出现下游服务处理成功,但是通知消息发送失败的情况,所以为了确保幂等,下游服务也需要在业务逻辑上做好相应的防重处理)。

当 MQ 故障时怎么办

使用基于 KV 存储的队列支持高可用降级方案。

  1. 封装 MQ 客户端组件与故障感知,连续 n 次重试尝试投递消息到 MQ 都报错,说明 MQ 故障,此时可以自动感知记忆自动触发降级开关。
  2. 通过 zookeeper 触发降级开关。
  3. 当 MQ 挂掉之后,使用 redis 的队列来代替。
  4. redis 中要根据实际场景划分 n 个队列,然后通过 hash 算法,均匀写入固定好 n 个 key 对应的 kv 存储队列中。(因为只往一个 key 中写入消息会导致负载过大)
  5. 降级开关打开后,需要开启一个线程,每隔一段时间尝试给 MQ 投递一个消息看是否恢复了。
  6. 如果恢复了,zk 就可以关闭降级开关,继续往 MQ 投递消息,下游服务在确认 kv 存储的各个队列中已经没有数据之后,就可以重新切换为从 MQ 消费消息。

如何保证不被重复消费

强校验:将 id + 业务场景 的唯一标识写入数据库中,将这个操作和业务操作放到同一个事务里,先用唯一标识去数据库查这条消息有没有被消费,如果没有被消费,就执行事务,被消费了就直接返回。
弱校验:将唯一标识作为 Redis 的 key 写入 Redis 里,就算 kv 丢了可能这样的场景也没关系,比如短信通知。

如何保证消费的可靠性传输

  1. 生产者丢数据
  2. 消息队列丢数据:开启持久化磁盘配置
  3. 消费者丢数据

如何保证顺序消费

场景:数据库主从同步,数据量过大,写入消息队列。而增删改的操作一定要是顺序的,比如增改删,变成了改删增,这样本应该被删掉的数据就还在。
RocketMQ 中的实现:一个 topic 下面有多个队列,为了保证发送有序,RocketMQ 提供了 MessageQueueSelector 队列选择机制。RocketMQ 的 topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。RocketMQ仅保证顺序发送,顺序消费由消费者业务保证。

参考文章

Site by Cellophane using Hexo & Random

Hide