Alice52 / java-ocean

java-tutorial .We intend to explain Java knowledge in this repository.
MIT License
0 stars 0 forks source link

[mq] mq knowledge #122

Open Alice52 opened 4 years ago

Alice52 commented 4 years ago

核心

  1. 消息队列技术选型
  2. 高可靠、高可用和高性能
  3. 消息不重复、不丢失
  4. 性能
  5. 扩展行: 水平
  6. 社区

分布式系统

  1. 最基本需求: 通信
  2. 特点: 3v-3h
    • 3v
      • 海量
      • 实时
      • 多样
    • 3h
      • 高并发
      • 高可靠
      • 高性能
  3. 底层技术面: 高性能通信、海量数据存储、高并发等.
  4. 消息队列:
    • 功能简洁
    • 结构清晰
    • 入门简单
    • 深度足够

knowledge list

mq-knowledge-list

  1. 应用

    • 日志
    • 监控
    • 微服务
    • 流计算
    • ETL
    • IoT
    • other
  2. 实现技术

    • 网络通信
    • 序列化反序列化
    • 一致性协议
    • 分布式事务
    • 异步编程
    • 数据压缩
    • 内存管理
    • 文件与高性能 IO
    • 高可用分布式系统

哪些问题适合使用消息队列来解决

  1. 异步处理: 秒杀

mq-seckill

  1. 流量控制:

    • 自身能力范围内尽可能多地处理请求, 拒绝处理不了的请求并且保证自身运行正常
    • 使用消息队列隔离网关和后端服务, 以达到流量控制和保护后端服务的目的

    mq-seckill-access-control

    • 能预估出秒杀服务的处理能力, 就可以用消息队列实现一个令牌桶
      • 单位时间内只发放固定数量的令牌到令牌桶中, 规定服务在处理请求之前必须先从令牌桶中拿出一个令牌,
      • 如果令牌桶中没有令牌, 则拒绝请求. 这样就保证单位时间内, 能处理的请求不超过发放令牌的数量, 起到了流量控制的作用.

    mq-seckill-token-bucket

Selection_076

  1. 服务解耦
  2. 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式
  3. 连接流计算任务和数据
  4. 用于将消息广播给大量接收者

issue

  1. 消息堆积时不适合使用 RabbitMQ, 考虑使用 RocketMQ、Kafka 和 Pulsar
Alice52 commented 4 years ago

其他应用

  1. 用消息队列来做异构数据库之间的数据同步: 顺序问题
Alice52 commented 4 years ago

消息中间件的选型

  1. 要求

    • 开源
    • 消息的可靠传递: 确保不丢消息
    • Cluster: 支持集群, 确保不会因为某个节点宕机导致服务不可用
    • 性能: 具备足够好的性能, 能满足绝大多数场景的性能要求
  2. rabbit MQ: 队列模型

    • 优点:
      • AMQP 协议
      • 轻量快捷
      • exchange 模式的路由
      • 社区活跃
    • 缺点
      • 消息堆积
      • 性能相对不好: 每秒钟可以处理几万到十几万条消息
      • 不适合扩展和二次开发
  3. RocketMQ: 发布 - 订阅模型

    • 优点
      • 低时延
      • 性能: 每秒钟大概能处理几十万条消息
      • 每个主题包含多个队列, 通过多个队列来实现多实例并行生产和消费 进而水平扩展
      • RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的
    • 缺点
      • 周边生态系统的集成和兼容程度要略逊一筹
  4. Kafka: 发布 - 订阅模型

    • 优点
      • 周边生态系统的集成和兼容程度最好, 大数据和流计算领域
      • 设计上大量使用了批量和异步的思想
      • 性能: Kafka 的极限处理能力可以超过每秒 2000 万条消息
    • 缺点
      • 同步收发消息的响应时延比较高
      • 先攒一波再一起处理: Kafka 不太适合在线业务场景
  5. activeMQ

  6. Pulsar: 采用存储和计算分离的设计


different between MQ

  1. rabbitmq 通过 EXCHANGE 将 同一份消息 发送到多个 QUEUE

    rabbitmq-topic

  2. 其他 MQ 产品是将消息发送到 TOPIC, 订阅者逻辑接受

    mq-topic


conclusion

  1. 消息队列不是系统核心, 对*消息队列功能和性能都没有很高的要求, 只需要一个开箱即用易于维护的产品: RabbitMQ
  2. 处理在线业务, 比如在交易系统中用消息队列传递订单, 低延迟和金融级的稳定性, RocketMQ
  3. 处理海量的消息, 像收集日志、监控信息或是前端的埋点这类数据, 或是应用场景大量使用了大数据、流计算相关的开源产品, Kafka
Alice52 commented 4 years ago

issue

  1. 消息堆积时不适合使用 RabbitMQ, 考虑使用 RocketMQ、Kafka 和 Pulsar
  2. 为了确保消息的 由于网络或服务器故障丢失"请求 - 确认" 机制
    • 生产者发送消息到 broker, broker 需要回复确认, 否则生产者会重发
    • 消费者正确消费 broker 的消息之后, 需要回复确认, 否则会给消费者重发这条消息
    • issue: 带来了消息的顺序消费的问题 消息空洞 有序性
      • 每个主题在任意时刻, 至多只能有一个消费者实例在进行消费, 那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能.为了解决这个问题, RocketMQ 在主题下面增加了队列的概念
Alice52 commented 4 years ago

分布式事务的实现

rocketMQ

rocketmq-transaction


reference

  1. rocket-mq
  2. rabbit-mq
Alice52 commented 4 years ago

丢消息

检测消息丢失的方法

  1. 使用分布式链路追踪系统
  2. 或者给每个消息都来一个一次递增的序号: 消息的有序性
    • Topic 不会严格顺序的消费, 只是分区顺序消费
    • Producer 是多实例的话, 则算出投放到指定分区, 且 每个 producer 维护各自序号
    • Consumer 实例的数量最好和分区数量一致, 做到 Consumer 和分区一一对应

确保消息可靠传递: Producer -- Broker -- Consumer

  1. 生产阶段

    • 请求确认机制,来保证消息的可靠传递
    • 编码: 正确处理返回值或者捕获异常
      • 同步发送时, 只要注意捕获异常即可
        try {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(" 消息发送成功。");
        } catch (Throwable e) {
        System.out.println(" 消息发送失败!");
        System.out.println(e);
        }
      • 异步发送时, 要回调方法里进行检查
        producer.send(record, (metadata, exception) -> {
        if (metadata != null) {
            System.out.println(" 消息发送成功。");
        } else {
            System.out.println(" 消息发送失败!");
            System.out.println(exception);
        }
        });
  2. 存储阶段

    • 只要 Broker 在正常运行,就不会出现丢失消息的问题
    • 如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息
    • 单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给Producer 返回确认响应
    • 集群,需要将 Broker 集群配置成:至少将消息发送到2 个以上的节点,再给客户端回复发送确认响应
  3. 消费阶段

    • 成功消费后, 才会给 Broker 发送消费确认响应
    • 编码: 不要在收到消息后就立即发送消费确认, 而是应该在执行完所有消费业务逻辑之后, 再发送消费确认
    def callback(ch, method, properties, body):
        print(" [x] 收到消息 %r" % body)
        # 在这儿处理收到的消息
        database.save(body)
        print(" [x] 消费完成 ")
        # 完成消费业务逻辑后发送消费确认响应
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(queue='hello', on_message_callback=callback)
Alice52 commented 4 years ago

MQTT

  1. At most once: 至多一次
  2. At least once: 至少一次
  3. Exactly once: 恰好一次

重复消费: ACK 丢失

  1. 用幂等性解决重复消息问题: At least once + 幂等消费 = Exactly once
    • 幂等(Idempotence): 如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性
    • 幂等: 其任意多次执行所产生的影响均与一次执行的影响相同

常用的设计幂等操作

  1. 利用数据库的唯一约束实现幂等

    • 转账: 转账流水表[转账单 ID、账户 ID 和变更金额], 账单 ID 和账户 ID 这两个字段联合 起来创建一个唯一约束, 只能插入一条记录, 之后异步异步操作更新用户余额
    • redis SETNX, INSERT IF NOT EXIST”语义的存储类系统
  2. 为更新的数据设置前置条件

    • 设置前置条件, 执行一次后改变, 之后就不会执行
    • 转账: 如果账户 X 当前的余额为 500 元,将余额加100 元
    • 给你的数据增加一个版本号属性
  3. 记录并检查操作

    • 在执行数据更新操作之前,先检查一下是否执行过这个更新操作
    • 在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费
    • 检查消费状态,然后更新数据并且设置消费状态
Alice52 commented 4 years ago

消息积压

  1. 发送端性能优化: 增加批量或者是增加并发

    • 发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的
      • 发送网络请求之前: 发送端准备数据、序列化消息、构造请求等逻辑的时间
      • 发送消息和返回响应在网络传输中的耗时;
      • Broker 处理消息的时延
  2. 消费端性能优化: 增加并行的消费者需要同步扩容分区数量

    • 在扩容 Consumer 的实例数量的同时, 必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的
      • 在每个分区上实际上只能支持单线程消费 Selection_075
      • 上图消息丢失由于内存内的消息没有被及时消费
  3. 排查

    • 发送消息的速度还是消费消息的速度和原来都没什么变化: 考虑是不是消费失败导致的
Alice52 commented 4 years ago

use dimensions

1. 环境

  1. 社区
  2. 安装-docker
  3. 性能
  4. 集群

2. 消息

  1. 结构
  2. 相关的概念
  3. 消息丢失
  4. 消息重复
  5. 消息积压
  6. 消息持久化
  7. 消息 retry: 生产者, 消费者
  8. 事务

3. 生产者

  1. 定时投递
  2. 异步投递
  3. 延时投递
  4. 批量投递

4. 消费者

  1. 消费幂等
  2. 顺序消费
  3. 延时消费
  4. ack
  5. 批量消费
  6. 并发消费
  7. 消费速度
    • 增加消费者
    • 提高Prefetch count
    • 多线程处理
    • 批量Ack
    • ...