领域模型
Message(消息)
? ? ? ? ? ? ? ?Message是RocketMQ消息引擎的主体。messageId是全局唯一的。MessageKey是业务系统生成的,可以使用MessageKey作为业务系统的唯一索引
Topic(主题)和tage(标签)
? ? ? ? ? ? ? Topic表示一类消息的集合,Tags是表示在同一Topic中对消息进行分类,subTopics==Message Queue,其实在内存逻辑中,subTopics是对Topics的一个拓展,尤其是在MQTT这种协议下,在Topic底下会有很多subTopics。
Queue(消息物理管理单位)
? ? ? ? ? ? ? Queue是消息物理管理单位,一个Topic下可以有一个Queue或多个Queue。在RocketMq的控制台中可以看到每一个Queue的具体情况(比如·消息的堆积情况,消息的TPS,QPS)
Offset(偏移量)
? ? ? ? ? ? ?每一个Queue都有offset,这个是消费位点,记录消息消费的起始位置
Group(组)
? ? ? ? ? ? 业务场景中,如果有一堆发送者,一堆消费者,所以这里使用Group的概念进行管理。一个组可以订阅多个Topic
消费并发度
? ? ? ? ? ?从上面模型可以看出,要解决消费并发,就是要利用Queue,一个Topic可以分出更多的queue,每一个queue可以存放在不同的硬件上来提高并发。
消费的顺序跟重复
? ? ? ? ? ? 要确保消息的顺序,生产者、队列、消费者最好都是一对一的关系。但是这样设计,并发度就会成为消息系统的瓶颈(并发度不够)
RocketMQ不解决这个矛盾的问题。理由如下:
? ? ? 1、 乱序的应用实际大量存在
? ? ? ?2、 队列无序并不意味着消息无序
? ? ? ? ? ?消息重复,造成消息重复的根本原因是:网络不可达(网络波动)。所以如果消费者收到两条一样的消息,应该是怎么处理?
RocketMQ不保证消息不重复,如果你的业务要严格确保消息不重复,需要在自己的业务端进行去重。
????????1、消费端处理消息的业务逻辑保持幂等性
????????2、 确保每一条消息都有唯一的编号且保证消息处理成功与去重表的日志同时出现
消息存储结构
? ? ? ? ? RocketMQ有高可靠性(宕机不丢失数据),数据要进行持久化存储。所以RocketMQ 采用文件进行存储
存储文件
commitLog:消息存储目录
config:运行期间一些配置信息
consumerqueue:消息消费队列存储目录
index:消息索引文件存储目录
abort:如果存在改文件则Broker非正常关闭
checkpoint:文件检查点存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
消息存储结构
????????????????RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
CommitLog:存储消息的元数据
ConsumerQueue:存储消息在CommitLog的索引
IndexFile:为了消息查询提供了一种通过key或时间区间查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
CommitLog
????????????????CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,一个消息的存储长度是不固定的,RocketMQ采取一些机制,尽量向CommitLog 中顺序写 ,但是随机读。commitlog 文件默认大小为1G ,可通过在 broker 置文件中设置 mappedFileSizeCommitLog属性来改变默认大小
每台Rocket只会往一个commitlog文件中写,写完一个接着写下一个。indexFile和ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个 CommitLog 文件上
ConsumeQueue
? ?????????ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件
????????ConsumeQueue 即为Commitlog 文件的索引文件, 其构建机制是 当消息到达 Commitlog 文件后 由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。
存储机制这样设计有以下几个好处:
1 ) CommitLog 顺序写 ,可以大大提高写入效率。
(实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s ,超过了一般网卡的传输速度,这是磁盘比想象的快的地方 但是磁盘随机写的速度只有大概lOOKB/s,和顺序写的性能相差 6000 倍!)
2 )虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
3 )为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构 ,因为ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog和ConsumeQueue 的一致性, CommitLog 里存储了 Consume Queues 、Message Key、 Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。
IndexFile
? ? ? ? RocketMQ还支持通过MessageID或者MessageKey来查询消息;使用ID查询时,因为ID就是用broker+offset生成的(这里msgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。但是对于用MessageKey来查询消息,RocketMQ则通过构建一个index来提高读取速度。
? ? ? ? index 存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列 RocketMQ 专门为消息订阅构建的索引文件 ,提高根据主题与消息检索消息的速度 ,使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。(这里不做过多解释)
Config
config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。
topics.json : topic 配置属性
subscriptionGroup.json :消息消费组配置信息。
delayOffset.json :延时消息队列拉取进度。
consumerOffset.json ?:集群消费模式消息消进度。
consumerFilter.json :主题消息过滤信息。
其他
abort :如果存在 abort 文件说明 Broker 非正常闭,该文件默认启动时创建,正常退出之前删除
checkpoint :文件检测点,存储 commitlog 文件最后一次刷盘时间戳 consumequeue最后一次刷盘时间、 index 索引文件最后一次刷盘时间戳。
?过期文件删除
? ? ? ? ? ?删除过程分别执行清理消息存储文件(Commitlog )与消息消费队列文件( ConsumeQueue 文件), 消息消费队列文件与消息存储文件共用一套过期文件机制。
? ? ? ? ? RocketMQ 清除过期文件的方法是 :如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除, RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42小时(不同版本的默认值不同,这里以4.4.0为例) ,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s执行一次。
过期判断
? ? ? ? 文件删除主要是由这个配置属性:fileReservedTime:文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件, 可以删除。
另外还有其他两个配置参数:
? ? ? ?(1) deletePhysicFilesInterval:删除物理文件的时间间隔(默认是100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔deletePhysicFilesInterval这个时间再删除另外一个文件,由于删除文件是一个非常耗费IO的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。
? ? ? ?(2) destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳destroyMapedFileIntervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用 小于等于 0为止,即可删除该文件.
删除条件
1)指定删除文件的时间点, RocketMQ 通过 deleteWhen 设置一天的固定时间执行一次。删除过期文件操作, 默认为凌晨4点。
2)磁盘空间是否充足,如果磁盘空间不充足(DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是85%),会触发过期文件删除操作。
另外还有RocketMQ的磁盘配置参数:
1:物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入。
2:物理磁盘使用率小于diskMaxUsedSpaceRatio(默认75%) 表示磁盘使用正常。
零拷贝与MMAP
什么是零拷贝?
? ? ? ?零拷贝(英语: Zero-copy) 技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。
?零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效地提高数据传输效率
?零拷贝技术减少了用户进程地址空间和内核地址空间之间因为上下文切换而带来的开销,可以看出没有说不需要拷贝,只是说减少冗余[不必要]的拷贝。下面这些组件、框架中均使用了零拷贝技术:Kafka、Netty、Rocketmq、Nginx、Apache。
传统数据传送机制
?读取文件,再用socket发送出去,实际经过四次copy。
伪码实现如下:
buffer = File.read()
Socket.send(buffer)
1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy到应用程序的buffer;
3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。
? 传统的数据传送所消耗的成本:4次拷贝,4次上下文切换。4次拷贝,其中两次是DMA copy,两次是CPU copy。
mmap内存映射
? ? ? ? ? 硬盘上文件的位置和应用程序缓冲区(application buffers)进行映射(建立一种一一对应关系),由于mmap()将文件直接映射到用户空间,所以实际文件读取时根据这个映射关系,直接将文件从硬盘拷贝到用户空间,只进行了一次数据拷贝,不再有文件内容从硬盘拷贝到内核空间的一个缓冲区。mmap内存映射将会经历:3次拷贝: 1次cpu copy,2次DMA copy;
? ? ? ? ? mmap()是在?<sys/mman.h>?中定义的一个函数,此函数的作用是创建一个新的?虚拟内存?区域,并将指定的对象映射到此区域。?mmap?其实就是通过?内存映射?的机制来进行文件操作。
RocketMQ中MMAP运用
? ? ? ? ?传统的方式进行数据传送,那肯定性能上不去,所以RocketMQ使用的是MMAP。RocketMQ一个映射文件大概是,commitlog 文件默认大小为1G。采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了。
RocketMQ源码中的MMAP运用
RocketMQ存储整体设计总结
消息生产与消息消费相互分离
? ? ? ? ? Producer端发送消息最终写入的是CommitLog(消息存储的日志数据文件),Consumer端先从ConsumeQueue(消息逻辑队列)读取持久化消息的起始物理位置偏移量offset、大小size和消息Tag的HashCode值,随后再从CommitLog中进行读取待拉取消费消息的真正实体内容部分;
RocketMQ的CommitLog文件采用混合型存储
? ? ? ? ? 所有的Topic下的消息队列共用同一个CommitLog的日志数据文件,并通过建立类似索引文件ConsumeQueue的方式来区分不同Topic下面的不同MessageQueue的消息,同时为消费消息起到一定的缓冲作用(异步服务线生成了ConsumeQueue队列的信息后,Consumer端才能进行消费)。这样,只要消息写入并刷盘至CommitLog文件后,消息就不会丢失,即使ConsumeQueue中的数据丢失,也可以通过CommitLog来恢复。
RocketMQ每次读写文件的时候真的是完全顺序读写吗?
? ? ? ? ? ?发送消息时,生产者端的消息确实是顺序写入CommitLog;订阅消息时,消费者端也是顺序读取ConsumeQueue,然而根据其中的起始物理位置偏移量offset读取消息真实内容却是随机读取CommitLog。所以在RocketMQ集群整体的吞吐量、并发量非常高的情况下,随机读取文件带来的性能开销影响还是比较大的。