RocketMQ - 消息存储机制
消息存储是 RocketMQ 中最为复杂和最为重要的一部分,本节将分别从 RocketMQ 的消息存储整体架构、PageCache 与 Mmap 内存映射以及 RocketMQ 中两种不同的刷盘方式三方面来分别展开叙述。
# 消息存储整体架构
消息存储架构图中主要有下面三个跟消息存储相关的⽂件构成。
# CommitLog
消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G = 1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
# ConsumeQueue
消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 Topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 Commitlog 文件中根据 Topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue (逻辑消费队列) 作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。ConsumeQueue 文件可以看成是基于 Topic 的 Commitlog 索引文件,故 ConsumeQueue 文件夹的组织方式如下: topic/queue/file 三层组织结构,具体存储路径为: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}
。同样 ConsumeQueue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 Commitlog 物理偏移量、4 字节的消息长度、8 字节 Tag HashCode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M。
# IndexFile
IndexFile (索引文件) 提供了一种可以通过 Key 或时间区间来查询消息的方法 Index 文件的存储位置是: $HOME\store\index${fileName}
,文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构故 RocketMQ 的索引文件其底层实现为 Hash 索引。
在上面的 RocketMQ 的消息存储整体架构图中可以看出,RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件 (即为 CommitLog) 来存储。RocketMQ 的混合型存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog 中)针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因为如此,Consumer 也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker 允许等待 30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里 RocketMQ 的具体做法是,使用 Broker 端的后台服务线程 ReputMessageService 不停地分发请求并异步构建 ConsumeQueue (逻辑消费队列) 和 IndexFile (索引文件)数据。
# 页缓存与内存映射
页缓存 (PageCache)是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
在 RocketMQ 中,ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取在 PageCache 机制的预读取作用下,ConsumeQueue 文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于 CommitLog 消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统 IO 调度算法,比如设置度算法为 Deadline
(此时块存储采用 SSD 的话),随机读的性能也会有所提升。
另外,RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 Filechannel 模型将磁盘上的物理文件直接映射到用户态的内存地址中 (这种 Mmap 的方式减少了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销) ,将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率 (正因为需要使用内存映射机制,故 RocketMQ 的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)
# 消息刷盘
# 同步刷盘
如上图所示,只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
# 异步刷盘
能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟提高了 MQ的性能和吞吐量。