hankviv / blog_issue

issue
2 stars 0 forks source link

MQ相关 #41

Open hankviv opened 4 years ago

hankviv commented 4 years ago

消息队列最常被使用的场景: 1.异步处理 同步业务改为异步处理,增加处理响应。 2.流量控制 削峰填谷,在秒杀的时候控制流量 3.服务解耦 下游业务订阅上游服务,下游业务的增减不会影响到上游业务。 还包括:作为发布 / 订阅系统实现一个微服务级系统间的观察者模式,连接流计算任务和数据,用于将消息广播给大量接收者。

hankviv commented 4 years ago

选择消息队列产品的基本标准: 1.必须开源,在遇到相关bug可以直接修改源代码。 2.活跃度比较高,bug修复就比较及时,而且周边生态系统会有一个比较好的集成和兼容。

相关产品:

  1. RabbitMQ 特点:轻量级、迅捷,非常容易部署和使用,世界上使用最广泛的开源消息队列,支持语言比较多 缺点:RabbitMQ 对消息堆积的支持并不好,消息堆积导致性能下降,性能比较差。Erlang语言冷门,不易学。

  2. RocketMQ 特点:性能比较兼容,稳定性和可靠性都不错,活跃的中文社区。java开发。对在线业务的响应时延做了很多的优化 缺点:不是很流行,生态一般。

  3. Kafka 特点:周边生态系统的兼容性是最好的没有之一。设计上大量使用了批量和异步的思想,性能最高。 缺点:响应时延比较高,不太适合在线业务场景。

对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,建议使用 RabbitMQ。 如果你的系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性是你需要的。 如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合你的消息队列。

hankviv commented 4 years ago

消息队列基础概念: 最初的消息队列,就是一个严格意义上的队列。在计算机领域,“队列(Queue)”是一种先进先出 FIFO,数据结构早期的消息队列,就是按照“队列”的数据结构来设计的。 image

多个生产者往同一个队列里发送消息。队列里面就是所有消息的合集,消息顺序就是按照生产的顺序。 多个消费者消费同一个队列的话,一个消息只能被消费一次,每个消息只能被一个消费者所消费。 如果需要一份消息发送给多人消费者的话,就需要每个消费者都生产一个队列。这就造成了 下游服务和上游的依赖。

为了解决这个问题,演化出了另外一种消息模型:“发布 - 订阅模型(Publish-Subscribe Pattern)”。 image 在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber), 服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,

我们仔细对比一下这两种模型,生产者就是发布者,消费者就是订阅者,队列就是主题,并没有本质的区别。它们最大的区别其实就是,一份消息数据能不能被消费多次的问题。 现代的消息队列产品使用的消息模型大多是这种发布 - 订阅模型

RabbitMQ,它是少数依然坚持使用队列模型的产品之一。 image 在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。

RocketMQ 也有队列(Queue)这个概念,并且队列在 RocketMQ 中是一个非常重要的概念 几乎所有的消息队列产品都使用一种非常朴素的“请求 - 确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。 具体的做法也非常简单。在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。 如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。

这个确认机制很好地保证了消息传递过程中的可靠性,但是为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则如果上一个消费失败的话,就会导致消费顺序出现问题。

每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ 在主题下面增加了队列的概念。每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。 在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。

消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。 每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。 当然这也导致了 如果一个topic有多个queue的话,单个queue消费顺序是保证的,但是topic消息顺序是无法保证的。 如果需要入topic的顺序就保证的话,就只能一个topic里面只有一个queue。 image

Kafka 的消息模型 Kafka 的消息模型和 RocketMQ 是完全一样,在 Kafka 中,队列这个概念的名称不一样,Kafka 中对应的名称是“分区(Partition)”。

hankviv commented 4 years ago

如何利用事务消息实现分布式事务: 消息队列引入了业务异步实现,这就导致了一些事务相关的问题。 消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。 实现严格的分布式事务是更加不可能完成的任务。比较常见的分布式事务实现有 2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try-Confirm-Cancel) 和事务消息。每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。 image 订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,半消息的意思是 再未提交之前,这条消息对消费者是不可见的。 半消息发送成功后,如果本地事务创建订单成功,那就提交事务消息,消息就生产到队列。如果订单创建失败,那就回滚事务消息, 消息就不会发送到队列。这样就基本保证了事务的原子性。 这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。 Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ 则给出了另外一种解决方案。

RocketMQ 中的分布式事务实现 在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。 如果我们在提交或者回滚半消息的时候出现问题(网络异常),没有成功通知到RocketMQ,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个事务反查机制,我们需要提供一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。 image

hankviv commented 4 years ago

如何防止消息丢失 检测消息丢失的方法 我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。 如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息。 大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性。 这个解决方案在分布式队列系统中可能存在的问题: 像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

确保消息可靠传递 image

  1. 存储阶段: 如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。 如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。 对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应。 如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

  2. 消费阶段 消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。 不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

hankviv commented 4 years ago

如何处理消费过程中的重复消息

消息重复的情况必然存在: 在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能会产生重复的消息。 在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

用幂等性解决重复消息问题: 从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。 从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。

几种常用的设计幂等操作的方法:

  1. 利用数据库的唯一约束实现幂等,比如记录事件唯一操作流水号。流水号已经存在了,肯定不能再次执行了。
  2. 为更新的数据设置前置条件,比如操作的时候加一个状态,比如status代表是否操作过,如果已经操作过则不执行。
  3. 记录并检查操作,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过。
hankviv commented 4 years ago

消息积压了该如何处理

优化性能来避免消息积压:

  1. 发送端性能优化: Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互。它包括了下面这些步骤的耗时:
    • 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
    • 发送消息和返回响应在网络传输中的耗时;
    • Broker 处理消息的时延。 如果是单线程发送,每次只发送 1 条消息,那么每秒只能发送 1000ms / 1ms * 1 条 /ms = 1000 条 消息,这种情况下并不能发挥出消息队列的全部实力。 无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。选择批量发送还是增加并发,主要取决于发送端程序的业务性质。
  2. 消费端性能优化: 一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。 消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。 在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

消息积压了该如何处理? 能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。 如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。 还有一种就是可能是消费失败导致一直在重复消费。这样也会拖慢消费速度。可以配置消费失败重试次数来解决。