为什么需要消息队列?
- 异步处理
- 流量控制
- 服务解耦
该如何选择消息队列?
RabbitMQ 适合中小型公司,性能相对差点,但是用的人多。
当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。不适合消息堆积。
RocketMQ 很在意响应时延。RocketMQ 的一个劣势是,作为国产的消息队列,相比国外的比较流行的同类产品,在国际上还没有那么流行,与周边生态系统的集成和兼容程度要略逊一筹。
Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。适合在大数据和流计算领域,和日志系统。
主题和队列有什么区别?
RabbitMQ 的消息模型
rabbitmq 其实是队列模型,消息靠exchange(交换机)路由。 也支持发布订阅模型。(1对多)
RocketMQ 的消息模型
生成和消费确认机制
每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。
producer会往所有队列发消息,但不是“同一条消息每个队列都发一次”,每条消息只会往某个队列里面发送一次。
对于一个消费组,每个队列上只能串行消费,多个队列加一起就是并行消费了,并行度就是队列数量,队列数量越多并行度越大,所以水平扩展可以提升消费性能。
每队列每消费组维护一个消费位置(offset),记录这个消费组在这个队列上消费到哪儿了。
按照订单ID或者用户ID,用一致性哈希算法,计算出队列ID,指定队列ID发送,这样可以保证相同的订单/用户的消息总被发送到同一个队列上,就可以确保严格顺序了。
如何利用事务消息实现分布式事务?
消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。
依然拿我们熟悉的电商来举个例子。一般来说,用户在电商 APP 上购物时,先把商品加到购物车里,然后几件商品一起下单,最后支付,完成购物流程,就可以愉快地等待收货了。
这个过程中有一个需要用到消息队列的步骤,订单系统创建订单后,发消息给购物车系统,将已下单的商品从购物车中删除。因为从购物车删除已下单商品这个步骤,并不是用户下单支付这个主要流程中必需的步骤,使用消息队列来异步清理购物车是更加合理的设计。
对于订单系统来说,它创建订单的过程中实际上执行了 2 个步骤的操作:
- 在订单库中插入一条订单数据,创建订单;
- 发消息给消息队列,消息的内容就是刚刚创建的订单。
购物车系统订阅相应的主题,接收订单创建的消息,然后清理购物车,在购物车中删除订单中的商品。
在分布式系统中,上面提到的这些步骤,任何一个步骤都有可能失败,如果不做任何处理,那就有可能出现订单数据与购物车数据不一致的情况,比如说:
创建了订单,没有清理购物车;
订单没创建成功,购物车里面的商品却被清掉了。
在实际应用中,比较常见的分布式事务实现有 2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try-Confirm-Cancel) 和事务消息。每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。
消息队列是如何实现分布式事务的?
回到订单和购物车这个例子,看下如何用消息队列来实现分布式事务。
首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。
如果你足够细心,可能已经发现了,这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。
Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ 则给出了另外一种解决方案。
RocketMQ 中的分布式事务实现
在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果 Producer 也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。
在我们这个例子中,反查本地事务的逻辑也很简单,我们只要根据消息中的订单 ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。
这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。
如何确保消息不会丢失?
- 消息有3 个阶段:生产阶段、存储阶段、消费阶段
- 生产阶段: 目前生产者使用的是同步模式, 如异步则需要在回调方法中确认。
- 存储阶段: rocketmq broker 参数设置 ,阿里云内部实现
- 消费阶段:
- 如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失
- 不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。(逻辑写错的情况)
- 推荐捕获异常,异常中也进行 ack 操作, 如 go recover 中也要进行 ack 操作,记录错误的消息数据。