RocketMQ 底层原理之存储设计
内容概述
# RocketMQ 底层原理之存储设计
- [RocketMQ 底层原理之存储设计](#RocketMQ 底层原理之存储设计)
# [1] Domain Model
领域模型(Domain Model)是对领域内的概念类或现实世界中对象的可视化表示。又称概念模型、领域对象模型、分析对象模型。它专注于分析问题领域本身,
发掘重要的业务领域概念,并建立业务领域概念之间的关系。

# [1.1] Message
Message 是RocketMQ 消息引擎中的主体。messageId 是全局唯一的。MessageKey 是业务系统(生产者)生成的,所以如果要结合业务,
可以使用MessageKey 作为业务系统的唯一索引。

另外Message 中的equals 方法和hashCode 主要是为了完成消息只处理一次(Exactly-Once)。
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,
该消息在消费端也只被消费一次。
# [1.2] Topic
Tags 是在同一Topic 中对消息进行分类。
subTopics==Message Queue,其实在内存逻辑中,subTopics 是对Topics 的一个拓展,尤其是在MQTT 这种协议下,
在Topic 底下会有很多subTopics。
# [1.3] Queue
Queue 是消息物理管理单位,比如在RocketMQ 的控制台中,就可以看到每一个queue 中的情况。(比如消息的堆积情况、消息的TPS、QPS)
# [1.4] Offset
对于每一个Queue 来说都有Offset,这个是消费位点。
# [1.5] Group
业务场景中,如果有一堆发送者,一堆消费者,所以这里使用Group 的概念进行管理。
# [1.6] 对应关系
Message 与Topic 是多对一的关系,一个Topic 可以有多个Message。
Topic 到Queue 是一对多的关系,这个也是方便横向拓展,也就是消费的时候,这里可以有很多很多的Queue。
一个Queue 只有一个消费位点(Offset),所以Topic 和Offset 也是一对多的关系。
Topic 和Group 也是多对多的关系。
# [1.7] 消费并发度
从上面模型可以看出,要解决消费并发,就是要利用Queue,一个Topic 可以分出更多的queue,每一个queue 可以存放在不同的硬件上来提高并发。
# [1.8] 热点问题(顺序、重复)
前面讲过要确保消息的顺序,生产者、队列、消费者最好都是一对一的关系。但是这样设计,并发度就会成为消息系统的瓶颈(并发度不够)。
RocketMQ 不解决这个矛盾的问题。理由如下:
1. 乱序的应用实际大量存在
2. 队列无序并不意味着消息无序
另外还有消息重复,造成消息重复的根本原因是:网络不可达(网络波动)。所以如果消费者收到两条一样的消息,应该是怎么处理?
RocketMQ 不保证消息不重复,如果你的业务要严格确保消息不重复,需要在自己的业务端进行去重。
1. 消费端处理消息的业务逻辑保持幂等性
2. 确保每一条消息都有唯一的编号且保证消息处理成功与去重表的日志同时出现
# [2] RocketMQ的消息存储结构
RocketMQ 因为有高可靠性的要求(宕机不丢失数据),所以数据要进行持久化存储,因此RocketMQ 采用文件进行存储。
# [2.1] 存储文件

- commitLog: 消息存储目录
- config: 运行期间一些配置信息
- consumerqueue: 消息消费队列存储目录
- index: 消息索引文件存储目录
- abort: 如果存在改文件则Broker 非正常关闭
- checkpoint: 文件检查点,存储CommitLog 文件最后一次刷盘时间戳、consumerqueue 最后一次刷盘时间,index 索引文件最后一次刷盘时间戳。
# [2.2] 消息存储结构

RocketMQ 消息的存储是由ConsumeQueue 和CommitLog 配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue
是消息的逻辑队列,
类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic 下的每个Message Queue 都有一个对应的ConsumeQueue 文件。
- CommitLog: 存储消息的元数据
- ConsumerQueue: 存储消息在CommitLog 的索引
- IndexFile: 为了消息查询提供了一种通过key 或时间区间来查询消息的方法,这种通过IndexFile 来查找消息的方法不影响发送与消费消息的主流程

# [2.3] CommitLog
CommitLog 以物理文件的方式存放,每台Broker 上的CommitLog 被本机器所有ConsumeQueue 共享,
文件地址:${user.home}\store${commitlog}${fileName}。
在CommitLog 中,一个消息的存储长度是不固定的,RocketMQ 采取一些机制,尽量向CommitLog 中顺序写,但是**随机读
**。
commitlog 文件默认大小为lG ,可通过在broker 置文件中设置mappedFileSizeCommitLog 属性来改变默认大小。

Commitlog 文件存储的逻辑视图如下,每条消息的前面4 个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。

每个CommitLog 文件的大小为1G,一般情况下第一个CommitLog 的起始偏移量为0,
第二个CommitLog 的起始偏移量为1073741824 (1G = 1073741824byte)。
每台Rocket 只会往一个commitlog 文件中写,写完一个接着写下一个。
indexFile 和ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个CommitLog 文件上。
# [2.4] ConsumeQueue
ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic 下的每个Message Queue 都有一个对应的
ConsumeQueue 文件,
文件地址:${$storeRoot}\consumequeue${topicName}${queueld}${fileName}。


ConsumeQueue 中存储的是消息条目,为了加速ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个Consumequeue
条目不会存储消息的全量信息,
消息条目如下:

ConsumeQueue 即为Commitlog 文件的索引文件, 其构建机制是当消息到达Commitlog 文件后由专门的线程产生消息转发任务,
从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。
存储机制这样设计有以下几个好处:
1)、CommitLog 顺序写,可以大大提高写入效率。
实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。
目前的高性能磁盘,顺序写速度可以达到600MB/s ,超过了一般网卡的传输速度,这是磁盘比想象的快的地方但是磁盘随机写的速度只有大概lOOKB/s,
和顺序写的性能相差6000 倍!
2)、虽然是随机读,但是利用操作系统的pagecache 机制,可以批量地从磁盘读取,作为cache 存到内存中,加速后续的读取速度。
3)、为了保证完全的顺序写,需要ConsumeQueue 这个中间结构,因为ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中, 大部分的ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。 此外为了保证CommitLog 和ConsumeQueue 的一致性, CommitLog 里存储了Consume Queues 、Message Key、Tag 等所有信息, 即使ConsumeQueue 丢失,也可以通过commitLog 完全恢复出来。
# [2.5] IndexFile
RocketMQ 还支持通过MessageID 或者MessageKey 来查询消息;使用ID 查询时,因为ID 就是用broker+offset 生成的(这里msgId
指的是服务端的),
所以很容易就找到对应的commitLog 文件来读取消息。但是对于用MessageKey 来查询消息,RocketMQ 则通过构建一个index
来提高读取速度。
index 存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列RocketMQ 专门为消息订阅构建的索引文件,提高根据主题与消息检索消息的速度,
使用Hash 索引机制,具体是Hash 槽与Hash 冲突的链表结构。

# [2.6] Config
config 文件夹中存储着Topic 和Consumer 等相关信息。主题和消费者群组相关的信息就存在在此。
- topics.json : topic 配置属性。
- subscriptionGroup.json :消息消费组配置信息。
- delayOffset.json :延时消息队列拉取进度。
- consumerOffset.json :集群消费模式消息消进度。
- consumerFilter.json :主题消息过滤信息。

# [2.7] 其他
abort:如果存在abort 文件说明Broker 非正常闭,该文件默认启动时创建,正常退出之前删除。
checkpoint:文件检测点,存储commitlog 文件最后一次刷盘时间戳、consumequeue 最后一次刷盘时间、
index 索引文件最后一次刷盘时间戳。
# [3] 过期文件删除
由于RocketMQ 操作CommitLog,ConsumeQueue 文件是基于内存映射机制并在启动的时候会加载commitlog,ConsumeQueue 目录下的所有文件,
为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。
删除过程分别执行清理消息存储文件( Commitlog )与消息消费队列文件( ConsumeQueue 文件),
消息消费队列文件与消息存储文件(Commitlog)共用一套过期文件机制。
RocketMQ 清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,
RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为42 小时(不同版本的默认值不同,这里以4.4.0 为例),
通过在Broker 配置文件中设置fileReservedTime 来改变过期时间,单位为小时。
触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s 执行一次。
# [3.1] 过期判断
文件删除主要是由这个配置属性:
- fileReservedTime:文件保留时间。(
也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除。)
另外还有其他两个配置参数:
- deletePhysicFilesInterval: 删除物理文件的时间间隔(默认是100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除, 因此删除一个文件后需要间隔deletePhysicFilesInterval 这个时间再删除另外一个文件,由于删除文件是一个非常耗费IO 的操作, 会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。
- destroyMapedFileIntervalForcibly: 在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当 前时间戳destroyMapedFileIntervalForcibly 这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除, 当超过这个时 间时,会将引用每次减少1000,直到引用小于等于0 为止,即可删除该文件。
# [3.2] 删除条件
1)指定删除文件的时间点, RocketMQ 通过deleteWhen 设置一天的固定时间执行一次。删除过期文件操作, 默认为凌晨4 点。
2)磁盘空间是否充足,如果磁盘空间不充足(DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是85)
,会触发过期文件删除操作。
另外还有RocketMQ 的磁盘配置参数:
- 物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入。
- 物理磁盘使用率小于diskMaxUsedSpaceRatio(默认75%)表示磁盘使用正常。
# [4] 零拷贝与MMAP
# [4.1] 什么是零拷贝?
零拷贝(Zero-copy) 技术是指计算机执行操作时,CPU 不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU
周期和内存带宽。
零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效地提高数据传输效率。
零拷贝技术减少了用户进程地址空间和内核地址空间之间因为上:下文切换而带来的开销。
可以看出没有说不需要拷贝,只是说减少冗余[不必要]的拷贝。
下面这些组件、框架中均使用了零拷贝技术:Kafka、Netty、Rocketmq、Nginx、Apache。
# [4.2] 传统数据传送机制
比如:读取文件,再用socket 发送出去,实际经过四次copy。
伪码实现如下:
buffer = File.read()
Socket.send(buffer)
2
1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy 到应用程序的buffer;
3、第三步:将application 应用程序buffer 中的数据,copy 到socket 网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将socket buffer 的数据,copy 到网卡,由网卡进行网络传输。

分析上述的过程,虽然引入DMA 来接管CPU 的中断请求,但四次copy 是存在“不必要的拷贝”的。实际上并不需要第二个和第三个数据副本。
应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。
显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销(DMA 拷贝速度一般比CPU
拷贝速度快一个数量级),
这也正是零拷贝出现的背景和意义。
举个例子:200M 的数据,读取文件,再用socket 发送出去,实际经过四次copy(2 次cpu 拷贝每次100ms ,2 次DMS 拷贝每次10ms)
传统网络传输的话:合计耗时将有220ms
同时,read 和send 都属于系统调用,每次调用都牵涉到两次上下文切换:

总结下,传统的数据传送所消耗的成本:4 次拷贝,4 次上下文切换。
注意,4 次拷贝,其中两次是DMA copy,两次是CPU copy。
# [4.3] mmap 内存映射
硬盘上文件的位置和应用程序缓冲区(application buffers)进行映射(建立一种一一对应关系),由于mmap()将文件直接映射到用户空间,
所以实际文件读取时根据这个映射关系,直接将文件从硬盘拷贝到用户空间,只进行了一次数据拷贝,不再有文件内容从硬盘拷贝到内核空间的一个缓冲区。
mmap 内存映射将会经历:3 次拷贝: 1 次cpu copy,2 次DMA copy;
举个例子:200M 的数据,读取文件,再用socket 发送出去,如果是使用MMAP 实际经过三次copy(1 次cpu 拷贝每次100ms ,
2 次DMS 拷贝每次10ms)合计只需要120ms。
从数据拷贝的角度上来看,就比传统的网络传输,性能提升了近一倍。 
mmap()是在<sys/mman.h> 中定义的一个函数,此函数的作用是创建一个新的虚拟内存区域,并将指定的对象映射到此区域。
mmap 其实就是通过内存映射的机制来进行文件操作。
Windows 操作系统上也有虚拟机内存,如下图:

# [4.4] 代码演示


# [5] RocketMQ 中MMAP 运用
如果按照传统的方式进行数据传送,那肯定性能上不去,作为MQ 也是这样,尤其是RocketMQ,要满足一个高并发的消息中间件,一定要进行优化。
所以RocketMQ 使用的是MMAP。
RocketMQ 一个映射文件大概是,commitlog 文件默认大小为lG。
这里需要注意的是,采用MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G
的文件至用户态的虚拟内存,
这也是为何RocketMQ 默认设置单个CommitLog 日志数据文件为1G 的原因了。
# [5.1] MMAP 文件对应

# [5.2] RocketMQ 源码中的MMAP 运用
RocketMQ 源码中,使用MappedFile 这个类类进行MMAP 的映射。

# [6] RocketMQ 存储整体设计总结
# [6.1] 消息生产与消息消费相互分离
Producer 端发送消息最终写入的是CommitLog(消息存储的日志数据文件),Consumer 端先从ConsumeQueue(消息逻辑队列)读取持久化消息的 起始物理位置偏移量offset、大小size 和消息Tag 的HashCode 值,随后再从CommitLog 中进行读取待拉取消费消息的真正实体内容部分;
# [6.2] RocketMQ 的CommitLog 文件采用混合型存储
所有的Topic 下的消息队列共用同一个CommitLog 的日志数据文件,并通过建立类似索引文件—ConsumeQueue 的方式来区分不同Topic 下面的不 同MessageQueue 的消息,同时为消费消息起到一定的缓冲作用(异步服务线生成了ConsumeQueue 队列的信息后,Consumer 端才能进行消费)。 这样,只要消息写入并刷盘至CommitLog 文件后,消息就不会丢失,即使ConsumeQueue 中的数据丢失,也可以通过CommitLog 来恢复。
# [6.3] RocketMQ 每次读写文件的时候真的是完全顺序读写吗?
发送消息时,生产者端的消息确实是顺序写入CommitLog;订阅消息时,消费者端也是顺序读取ConsumeQueue,然而根据其中的起始物理位置 偏移量offset 读取消息真实内容却是随机读取CommitLog。所以在RocketMQ 集群整体的吞吐量、并发量非常高的情况下, 随机读取文件带来的性能开销影响还是比较大的,RocketMQ 怎么优化的,源码解读部分进行讲解。