Open lihongjie0209 opened 4 years ago
RocketMQ 的具体消息存储结构是怎样的呢?如何尽量保证顺序写的呢?先来看看整体的架构图,如图5-1所示。
RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成的,消息真正的物理存储文件是 CommitLog,ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。文件地址在 {storeRoot}\consumequeue\${topicName}\${queueId}\${fileName}。
图5-1 RocketMQ 的存储结构图
CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:${user.home}\store\${commitlog}\${fileName}。在 CommitLog 中,一个消息的存储长度是不固定的,RocketMQ 采取一些机制,尽量向 CommitLog 中顺序写,但是随机读。ConsumeQueue的内容也会被写到磁盘里作持久存储。
存储机制这样设计有以下几个好处:
1)CommitLog 顺序写,可以大大提高写入效率。
2)虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
3)为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构,因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存储了 Consume Queues、Message Key、Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。
如图5-2所示是一个 Broker 在文件系统中存储的各个文件。我们可以看到 commitlog 文件夹、consumequeue 文件夹,还有在 config 文件夹中 Topic、Consumer 的相关信息。最下面那个文件夹 index 存的是索引文件,这个文件用来加快消息查询的速度。
RocketMQ 分布式集群是通过 Master 和 Slave 的配合达到高可用性的,首先说一下 Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为0表明这个 Broker 是 Master,大于0表明这个 Broker 是 Slave,同时 brokerRole 参数也会说明这个 Broker 是 Master 还是 Slave。Master 角色的 Broker 支持读和写,Slave 角色的 Broker 仅支持读,也就是 Producer 只能和 Master 角色的 Broker 连接写入消息;Consumer 可以连接 Master 角色的 Broker,也可以连接 Slave 角色的 Broker 来读取消息。
在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 程序。这就达到了消费端的高可用性。
如何达到发送端的高可用性呢?在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样当一个 Broker 组的 Master 不可用后,其他组的 Master 仍然可用,Producer 仍然可以发送消息。RocketMQ 目前还不支持把 Slave 自动转成 Master,如果机器资源不足,需要把 Slave 转成 Master,则要手动停止 Slave 角色的 Broker,更改配置文件,用新的配置文件启动 Broker。
RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两种写磁盘方式,下面逐一介绍。
异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
同步复制和异步复制 如果一个 Broker 组有 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步和异步两种复制方式。同步复制方式是等 Master 和 Slave 均写成功后才反馈给客户端写成功状态;异步复制方式是只要 Master 写成功即可反馈给客户端写成功状态。
这两种复制方式各有优劣,在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写入 Slave,有可能会丢失;在同步复制方式下,如果 Master 出故障,Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、SYNC_MASTER、SLAVE 三个值中的一个。
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是 SYNC_FLUSH 方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把 Master 和 Save 配置成 ASYNC_FLUSH的刷盘方式,主从之间配置成 SYNC_MASTER 的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
在 Broker 端进行消息过滤,可以减少无效消息发送到 Consumer,少占用网络带宽从而提高吞吐量。Broker 端有三种方式进行消息过滤。
对一个应用来说,尽可能只用一个 Topic,不同的消息子类型用 Tag 来标识(每条消息只能有一个 Tag),服务器端基于 Tag 进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了 Tag 以后,消费方在订阅消息时,才可以利用 Tag 在 Broker 端做消息过滤。
其次是消息的 Key。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。所以这个 Key 一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker 会创建专门的索引文件,来存储 Key 到消息的映射,由于是哈希索引,应尽量使 Key 唯一,避免潜在的哈希冲突。
Tag 和 Key 的主要差别是使用场景不同,Tag 用在 Consumer 的代码中,用来进行服务端消息过滤,Key 主要用于通过命令行查询消息。
用 Tag 方式进行过滤的方法是传入感兴趣的 Tag 标签,Tag 标签是一个普通字符串,是在创建 Message 的时候添加的,一个 Message 只能有一个 Tag。使用 Tag 方式过滤非常高效,Broker 端可以在 ConsumeQueue 中做这种过滤,只从 CommitLog 里读取过滤后被命中的消息。看一下 ConsumerQueue 的存储格式,如图7-1所示。
使用 Tag 方式过滤虽然高效,但是支持的逻辑比较简单,在构造 Message 的时候,还可以通过 putUserProperty 函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑,如代码清单7-1所示。
代码清单7-1 在消息中增加自定义属性
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
msg.putUserProperty("b", “hello”);
代码中这个消息就有了两个特殊的属性值 a 和 b,我们用类似 SQL 表达式的方式对消息进行过滤,用法如下(目前只支持在 PushConsumer 中实现这种过滤):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // only subsribe messages have property a, also a >=0 and a <= 3 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently()
{ @Override public ConsumeConcurrentlyStatus consumeMessage
(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
类似 SQL 的过滤表达式,支持如下语法:
数字对比,比如>、>=、<、<=、BETWEEN、=;
字符串对比,比如=、<>、IN;
IS NULL or IS NOT NULL;
逻辑符号 AND、OR、NOT。
支持的数据类型:
数字型,比如123、3.1415;
字符型,比如'abc'、注意必须用单引号;
NULL,这个特殊字符;
布尔型,TRUEorFALSE。
SQL 表达式方式的过滤需要 Broker 先读出消息里的属性内容,然后做 SQL 计算,增大磁盘压力,没有 Tag 方式高效。
当 Consumer 的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高 Consumer 的处理能力。
(1)提高消费并行度
在同一个 ConsumerGroup 下(Clustering 方式),可以通过增加 Consumer 实例的数量来提高并行度,通过加机器,或者在已有机器中启动多个 Consumer 进程都可以增加 Consumer 实例数。注意总的 Consumer 数量不要超过 Topic 下 Read Queue 数量,超过的 Consumer 实例接收不到消息。此外,通过提高单个 Consumer 实例中的并行处理的线程数,可以在同一个 Consumer 内增加并行度来提高吞吐量(设置方法是修改 consumeThreadMin 和 consumeThreadMax)。
(2)以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及 update 某个数据库,一次 update10 条的时间会大大小于十次 update1 条数据的时间。这时可以通过批量方式消费来提高消费的吞吐量。实现方法是设置 Consumer 的 consumeMessageBatchMaxSize 这个参数,默认是1,如果设置为 N,在消息多的时候每次收到的是个长度为 N 的消息链表。
(3)检测延时情况,跳过非重要消息
Consumer 在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使 Consumer 尽快追上 Producer 的进度,如代码清单7-4所示。
代码清单7-4 判断消息堆积并处理示例
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long Offset = msgs.get(0).getQueueOffset();
String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); long diff = Long.parseLong(maxOffset) - Offset;
if (diff > 90000) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//正常消费消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
如代码所示,当某个队列的消息数堆积到90000条以上,就直接丢弃,以便快速追上发送消息的进度。
发送一条消息出去要经过三步,一是客户端发送请求到服务器,二是服务器处理该请求,三是服务器向客户端返回应答,一次消息的发送耗时是上述三个步骤的总和。在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用,可以采用 Oneway 方式发送,Oneway 方式只发送请求不等待应答,即将数据写入客户端的 Socket 缓冲区就返回,不等待对方返回结果,用这种方式发送消息的耗时可以缩短到微秒级。
另一种提高发送速度的方法是增加 Producer 的并发量,使用多个 Producer 同时发送,我们不用担心多 Producer 同时写会降低消息写磁盘的效率,RocketMQ 引入了一个并发窗口,在窗口内消息可以并发地写入 DirectMem 中,然后异步地将连续一段无空洞的数据刷入文件系统当中。顺序写 CommitLog 可让 RocketMQ 无论在 HDD 还是 SSD 磁盘情况下都能保持较高的写入性能。目前在阿里内部经过调优的服务器上,写入性能达到90万+的 TPS,我们可以参考这个数据进行系统优化。
在 Linux 操作系统层级进行调优,推荐使用 EXT4 文件系统,IO 调度算法使用 deadline 算法。
如图7-3所示,EXT4 创建/删除文件的性能比 EXT3 及其他文件系统要好,RocketMQ 的 CommitLog 会有频繁的创建/删除动作。
消息存储和发送
分布式队列因为有高可靠性的要求,所以数据要通过磁盘进行持久化存储。用磁盘存储消息,速度会不会很慢呢?能满足实时性和高吞吐量的要求吗?
实际上,磁盘有时候会比你想象的快很多,有时候也会比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
举个例子,Linux 操作系统分为“用户态”和“内核态”,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制,一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read(file,tmp_buf,len);,读取本地文件内容;
2)write(socket,tmp_buf,len);,将读取的内容通过网络发送出去。
tmp_buf 是预先申请的内存,这两个看似简单的操作,实际进行了4次数据复制,分别是:从磁盘复制数据到内核态内存,从内核态内存复制到用户态内存(完成了 read(file,tmp_buf,len));然后从用户态内存复制到网络驱动的内核态内存,最后是从网络驱动的内核态内存复制到网卡中进行传输(完成 write(socket,tmp_buf,len))。
通过使用 mmap 的方式,可以省去向用户态的内存复制,提高速度。这种机制在 Java 中是通过 MappedByteBuffer 实现的,具体可以参考 Java 7的文档:https://docs.oracle.com/javase/7/docs/api/java/nio/MappedByteBuffer.html。RocketMQ 充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。