当前位置: 首页>移动开发>正文

RocketMQ学习笔记,一文弄懂相关知识与概念

RocketMQ是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ的特点是纯JAVA实现集群和HA实现相对简单在发生宕机和其它故障时消息丢失率更低

RocketMQ学习笔记,一文弄懂相关知识与概念,第1张

RocketMQ 架构图

RocketMQ学习笔记,一文弄懂相关知识与概念,第2张

RocketMQ 部署结构图

一、NameServer

NameServer可以看做是RocketMQ的注册中心【为什么不用开源注册于发现中心,例如zk,nacos等】;它管理两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息。

Producer/Consumer :通过查询接口获取Topic对应的Broker的地址信息

Broker : 注册配置信息到NameServer, 实时更新Topic信息到NameServer

NameServer 的特点

1、Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

2、每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

3、Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。

4、Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。

这里面最核心的是每个Broker与Name Server集群中的所有节点建立长连接这样做好处多多。

1、这样可以使Name Server之间可以没有任何关联,因为它们绑定的Broker是一致的。

2、作为Producer或者Consumer可以绑定任何一个Name Server 因为它们都是一样的。

二、Broker

Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer。Consumer可选择从Master或者Slave读取数据。多个主/从组成Broker集群,集群内的Master节点之间不做数据交互。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer。

1、Broker与Name Server关系

1)连接 单个Broker和所有Name Server保持长连接。

2)心跳

心跳间隔:每隔30秒向所有NameServer发送心跳,心跳包含了自身的Topic配置信息。

心跳超时:NameServer每隔10秒,扫描所有还存活的Broker连接,若某个连接2分钟内没有发送心跳数据,则断开连接。

3)断开:当Broker挂掉;NameServer会根据心跳超时主动关闭连接,一旦连接断开,会更新Topic与队列的对应关系,但不会通知生产者和消费者。

2、 负载均衡

一个Topic分布在多个Broker上,一个Broker可以配置多个Topic,它们是多对多的关系。

如果某个Topic消息量很大,应该给它多配置几个Queue,并且尽量多分布在不同Broker上,减轻某个Broker的压力。

3 、可用性

由于消息分布在各个Broker上,一旦某个Broker宕机,则该Broker上的消息读写都会受到影响。

所以RocketMQ提供了Master/Slave的结构,Salve定时从Master同步数据,如果Master宕机,则Slave提供消费服务,但是不能写入消息,此过程对应用透明,由RocketMQ内部解决。

有两个关键点:

思考1一旦某个broker master宕机,生产者和消费者多久才能发现?

受限于Rocketmq的网络连接机制,默认情况下最多需要30秒,因为消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。

思考2 master恢复恢复后,消息能否恢复。

消费者得到Master宕机通知后,转向Slave消费,但是Slave不能保证Master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦Master恢复,未同步过去的消息会被消费掉。

三、Topic

Topic用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息。Topic跟发送方和消费方都没有强关联关系,发送方可以同时往多个Topic投放消息,消费方也可以订阅多个Topic的消息。在RocketMQ中,Topic是一个上逻辑概念。消息存储不会按Topic分开

1、Topic

Topic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。

2、Queue

Topic和Queue是多对多的关系一个Topic下可以包含多个Queue,主要用于负载均衡。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue的消息。

3、Offset

RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数

4、Tag

标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息

RocketMQ学习笔记,一文弄懂相关知识与概念,第3张

从上图我们可以得出以下几条结论

1、消费者发送的Message会在Broker中的Queue队列中记录。

2、一个Topic的数据可能会存在多个Broker中。

3、一个Broker存在多个Queue。

4、单个的Queue也可能存储多个Topic的消息。

也就是说每个Topic在Broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。

Queue不是真正存储Message的地方,真正存储Message的地方是在CommitLog。

RocketMQ学习笔记,一文弄懂相关知识与概念,第4张

左边的是CommitLog。这个是真正存储消息的地方。RocketMQ所有生产者的消息都是往这一个地方存的。

右边是ConsumeQueue。这是一个逻辑队列。和上文中Topic下的Queue是一一对应的。消费者是直接和ConsumeQueue打交道。ConsumeQueue记录了消费位点,这个消费位点关联了commitlog的位置。所以即使ConsumeQueue出问题,只要commitlog还在,消息就没丢,可以恢复出来。还可以通过修改消费位点来重放或跳过一些消息。

四、Consumer

消息消费者,位于用户进程内。Consumer通过NameServer获取所有broker的路由信息后,向Broker发送Pull请求来获取消息数据。Consumer可以以两种模式启动,广播(Broadcast)和集群(Cluster)广播模式下,一条消息会发送给所有Consumer,集群模式下消息只会发送给一个Consumer

消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组。

1 、Consumer与Name Server关系

1)连接 : 单个Consumer和一台NameServer保持长连接,如果该NameServer挂掉,消费者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连。

2)心跳: 与NameServer没有心跳

3)轮询时间 : 默认情况下,消费者每隔30秒从NameServer获取所有Topic的最新队列情况,这意味着某个Broker如果宕机,客户端最多要30秒才能感知。

2、 Consumer与Broker关系

1)连接 :单个消费者和该消费者关联的所有broker保持长连接。

3、负载均衡

集群消费模式下,一个消费者集群多台机器共同消费一个Topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

4、消息传送模式

拉模式:消费端一定的频率主动发起拉消息请求,获得服务端消息相应,再按照相同的频率发起拉消息请求。现在官方已经不推荐使用这种模式了。这个时候一定处理好拉取频率的设定,避免下面两种情况的出现。

消息堆积,服务端每分钟接受10万消息,消费端每分钟只能消息5万条消息,服务端忙,而消费者处理不过来。

消息空缺,服务端每小时接受到一条消息,消费者每次都去拉取消失,只能获取空消息,浪费计算机和网络资源。

推模式:消息到达消息服务器后,推送消息给消费者。 RocketMQ消息推模式并不是真正意义上的服务端将消息推送给消费端。本质实现为消费端消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息,再提交到消息消费线程池,然后由消费端消费线程异步的从消费线程池获取消息进行消息消费,解耦了消息拉取和消息消费过程。如果未拉取到消息,则延迟一下又继续拉取。

5、顺序消息

顺序消息是指消息消费的顺序和生产者发送消息的顺序一样的。

分区有序:分区有序是指这个Topic下这个队列下的消息是有顺序的,生产者发送消息的时候,将严格按照消息的顺序,将消息们发送到一个Topic下的一个队列,从而保证了生产者分区消息有序,消费者进行消费时,进行单线程单队列消费,保证了消费有序。

适用场景:性能要求高,以 sharding key 作为分区字段,在同一个队列中严格的按照 FIFO 原则进行消息发布和消费的场景。sharding key 比如订单Id,一个订单的创建、付款、完成有序的,根据算法将这个订单的所有事件发送到同一个队列中去。

全局有序:全局有序是指某个Topic下的所有消息都要保证顺序,可以通过一个Topic只有一个消息队列,保证了全局有序,实际上市分区有序的变种。

适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。

6、延时消息

producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1

mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列

根据消费偏移量offset从commitLog中解析出对应消息

从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递

若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递

五、Producer

消息生产者,位于用户的进程内,Producer通过NameServer获取所有Broker的路由信息,根据负载均衡策略选择将消息发到哪个Broker,然后调用Broker接口提交消息。

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

1、 Producer与Name Server关系

1)连接单个Producer和一台NameServer保持长连接,如果该NameServer挂掉,生产者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连。

2)轮询时间默认情况下,生产者每隔30秒从NameServer获取所有Topic的最新队列情况,这意味着某个Broker如果宕机,生产者最多要30秒才能感知,在此期间,

发往该broker的消息发送失败。

3)心跳与nameserver没有心跳

2、与broker关系

连接单个生产者和该生产者关联的所有broker保持长连接。

六、消息发送

RocketMQ学习笔记,一文弄懂相关知识与概念,第5张

RocketMQ 流程图

RocketMQ支持3种消息发送方式: 同步(sync)、异步(async)、单向(oneway)。

同步: 发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。

异步: 发送者向MQ执行发送消息API时,指定消息发送成功后的回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。

单向: 消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。

批量消息

//判断延时级别,如果大于0抛出异常,原因为:批量消息发送不支持延时

throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");

//判断topic是否以 **"%RETRY%"** 开头,如果是, 则抛出异常,原因为:批量发送消息不支持消息重试

throw new UnsupportedOperationException("Retry Group is not supported for batching");

//判断集合中的每个Message的topic与批量发送topic是否一致,如果不一致则抛出异常,原因为:批量消息中的每个消息实体的Topic要和批量消息整体的topic保持一致。

throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");

//判断批量消息的首个Message与其他的每个Message实体的等待消息存储状态是否相同,如果不同则报错,原因为:批量消息中每个消息的waitStoreMsgOK状态均应该相同。

throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");

参考地址:

RocketMQ消息发送:https://www.jianshu.com/p/4075c91cad66

RocketMQ批量消息机制:https://www.jianshu.com/p/8c0fa3cc0796

七、消息存储

当前业界几款主流的MQ消息队列采用的存储方式主要有以下三种方式:

(1)分布式KV存储:这类MQ一般会采用诸如levelDB、RocksDB和Redis来作为消息持久化的方式,由于分布式缓存的读写能力要优于DB,所以在对消息的读写能力要求都不是比较高的情况下,采用这种方式倒也不失为一种可以替代的设计方案。消息存储于分布式KV需要解决的问题在于如何保证MQ整体的可靠性?

(2)文件系统:目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。小编认为,消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

(3)关系型数据库DB:Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。因此,如果要选型或者自研一款性能强劲、吞吐量大、消息堆积能力突出的MQ消息队列,那么小编并不推荐采用关系型数据库作为消息持久化的方案。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障;

因此,综合上所述从存储效率来说, 文件系统>分布式KV存储>关系型数据库DB,直接操作文件系统肯定是最快和最高效的,而关系型数据库TPS一般相比于分布式KV系统会更低一些(简略地说,关系型数据库本身也是一个需要读写文件server,这时MQ作为client与其建立连接并发送待持久化的消息数据,同时又需要依赖DB的事务等,这一系列操作都比较消耗性能),所以如果追求高效的IO读写,那么选择操作文件系统会更加合适一些。但是如果从易于实现和快速集成来看,关系型数据库DB>分布式KV存储>文件系统,但是性能会下降很多。

另外,从消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,所以小编个人的理解是采用文件系统作为消息存储的方式,更贴近消息中间件本身的定义。

RokcetMQ存储设计架构

RocketMQ学习笔记,一文弄懂相关知识与概念,第6张

1、RocketMQ消息存储结构类型及缺点

上图即为RocketMQ的消息存储整体架构,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。而Kafka采用的是独立型的存储结构,每个队列一个文件。RocketMQ采用混合型存储结构的缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖ConsumeQueue,构建该逻辑消费队列需要一定开销。

(1)优点:a、ConsumeQueue消息逻辑队列较为轻量级;

b、对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高;

(2)缺点:a、对于CommitLog来说写入消息虽然是顺序写,但是读却变成了完全的随机读;

b、Consumer端订阅消费一条消息,需要先读ConsumeQueue,再读Commit Log,一定程度上增加了开销;

2、RocketMQ消息存储架构深入分析

从上面的整体架构图中可见,RocketMQ的混合型存储结构针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息,至于消费的时间可以稍微滞后一些也没有太大的关系。退一步地讲,即使Consumer端第一次没法拉取到待消费的消息,Broker服务端也能够通过长轮询机制等待一定时间延迟后再次发起拉取消息的请求。

这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。然后,Consumer即可根据ConsumerQueue来查找待消费的消息了。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。而IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法(ps:这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程)。

3、PageCache与Mmap内存映射

这里有必要先稍微简单地介绍下page cache的概念。系统的所有文件I/O请求,操作系统都是通过page cache机制实现的。对于操作系统来说,磁盘文件都是由一系列的数据块顺序组成,数据块的大小由操作系统本身而决定,x86的linux中一个标准页面大小是4KB。

操作系统内核在处理文件I/O请求时,首先到page cache中查找(page cache中的每一个数据块都设置了文件以及偏移量地址信息),如果未命中,则启动磁盘I/O,将磁盘文件中的数据块加载到page cache中的一个空闲块,然后再copy到用户缓冲区中。

page cache本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。因此,想要提高page cache的命中率(尽量让访问的页在物理内存中),从硬件的角度来说肯定是物理内存越大越好。从操作系统层面来说,访问page cache时,即使只访问1k的消息,系统也会提前预读取更多的数据,在下次读取消息时, 就很可能可以命中内存。

在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Noop”(此时块存储采用SSD的话),随机读的性能也会有所提升。

另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了)。

4、RocketMQ文件存储模型层次结构

RocketMQ学习笔记,一文弄懂相关知识与概念,第7张

RocketMQ文件存储模型结构

RocketMQ文件存储模型层次结构如上图所示,根据类别和作用从概念模型上大致可以划分为5层,下面将从各个层次分别进行分析和阐述:

(1)RocketMQ业务处理器层:Broker端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操作(根据解析RemotingCommand中的RequestCode来区分具体的业务操作类型,进而执行不同的业务处理流程),比如前置的检查和校验步骤、构造MessageExtBrokerInner对象、decode反序列化、构造Response返回对象等;

(2)RocketMQ数据存储组件层;该层主要是RocketMQ的存储核心类—DefaultMessageStore,其为RocketMQ消息数据文件的访问入口,通过该类的“putMessage()”和“getMessage()”方法完成对CommitLog消息存储的日志数据文件进行读写操作(具体的读写访问操作还是依赖下一层中CommitLog对象模型提供的方法);另外,在该组件初始化时候,还会启动很多存储相关的后台服务线程,包括AllocateMappedFileService(MappedFile预分配服务线程)、ReputMessageService(回放存储消息服务线程)、HAService(Broker主从同步高可用服务线程)、StoreStatsService(消息存储统计服务线程)、IndexService(索引文件服务线程)等;

(3)RocketMQ存储逻辑对象层:该层主要包含了RocketMQ数据文件存储直接相关的三个模型类IndexFile、ConsumerQueue和CommitLog。IndexFile为索引数据文件提供访问服务,ConsumerQueue为逻辑消息队列提供访问服务,CommitLog则为消息存储的日志数据文件提供访问服务。这三个模型类也是构成了RocketMQ存储层的整体结构(对于这三个模型类的深入分析将放在后续篇幅中);

(4)封装的文件内存映射层:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel两种方式完成数据文件的读写。其中,采用MappedByteBuffer这种内存映射磁盘文件的方式完成对大文件的读写,在RocketMQ中将该类封装成MappedFile类。这里限制的问题在上面已经讲过;对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个IndexFile文件大小约为400M、单个ConsumerQueue文件大小约5.72M、单个CommitLog文件大小为1G),其中每个分隔文件的文件名为前面所有文件的字节大小数+1,即为文件的起始偏移量,从而实现了整个大文件的串联。这里,每一种类的单个文件均由MappedFile类提供读写操作服务(其中,MappedFile类提供了顺序写/随机读、内存数据刷盘、内存清理等和文件相关的服务);

(5)磁盘存储层:主要指的是部署RocketMQ服务器所用的磁盘。这里,需要考虑不同磁盘类型(如SSD或者普通的HDD)特性以及磁盘的性能参数(如IOPS、吞吐量和访问时延等指标)对顺序写/随机读操作带来的

5、RocketMQ存储相关的模型与封装类简析

(1)CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:消息消费的逻辑队列,其中包含了这个MessageQueue在CommitLog中的起始物理位置偏移量offset,消息实体内容的大小和Message Tag的哈希值。从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下面的文件。单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件;

(3)IndexFile:用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引;

(4)MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该offset所在MappedFile(具体物理存储位置的抽象)、创建、删除MappedFile等操作;

(5)MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入PageCache缓存区(commit),或者原子性地将消息持久化的刷盘(flush);

6、RocketMQ消息刷盘的主要过程

在RocketMQ中消息刷盘主要可以分为同步刷盘和异步刷盘两种。

RocketMQ学习笔记,一文弄懂相关知识与概念,第8张

(1)同步刷盘:如上图所示,只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。RocketMQ同步刷盘的大致做法是,基于生产者消费者模型,主线程创建刷盘请求实例—GroupCommitRequest并在放入刷盘写队列后唤醒同步刷盘线程—GroupCommitService,来执行刷盘动作(其中用了CAS变量和CountDownLatch来保证线程间的同步)。这里,RocketMQ源码中用读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费生成的同步刷盘请求可以不用加锁,提高并发度。

(2)异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程wakeup后,就会继续执行。

7、过期文件删除

https://zhuanlan.zhihu.com/p/345127767

八、主从同步(HA)机制

RocketMQ学习笔记,一文弄懂相关知识与概念,第9张

1、首先启动Master并在指定接口监听

2、Slave启动,主动连接Master,建立TCP连接

3、Slave以5秒的时间间隔向Mater拉取消息,如果是第一次拉取的话,先获取本地commitLog文件中的最大偏移量,以该offset向master拉取

4、master收到请求,并返回请求offset的数据给slave

5、slave收到一批消息后,将消息写入到本地commitLog文件中,然后向master汇报拉取进度,并更新下一次待拉取偏移量

6、重复执行3-5

rocketMq的主从不是传统意义上的主从,他不具备主从切换,也就是说,从永远不会变成主。当主节点宕机后,从不会接管消息发送、消息存储,只提供消息的读取。

也就是说,生产者无法发消息了,消费者还可以继续接收队列中未消费的消息。

1、master挂了,将不能写入消息,producer会发送失败。consumer开始从Slave读消息,此时消息的offset以Slave从Master同步的为准。

2、然后Master恢复了,消息消费将转移到Master,但是此时Master的offset可能是过期的,如何更新呢?此时有两种情况

A:Slave的定时任务从Master同步offset,发现master的offset是过期的,直接丢弃;直到offset不是过期的。

B:由于Consumer保存着最新的offset,consumer向master拉取数据时,master把来自consumer的offset和自己的offset对比,发现自己offset已过期,则直接更新自己的offset;然后返回数据。

PS:

1、如果同时consumer也挂了,offset也丢失了,则只有从过期的offset开始重新消费

2、集群模式下,由于offset是共享在远程的存储在Broker,重启后可以恢复

九、RocketMQ事务消息

RocketMQ事务消息实现原理基于两阶段提交和定时事务状态回查来决定消息最终是否提交还是回滚。

1、应用程序在事务内完成相关业务数据落库后,需要同步调用 RocketMQ消息发送接口,发送状态为 prepare的消息。消息发送成功后, RocketMQ服务器会回调 RocketMQ消息发送者的事件监听程序,记录消息的本地事务状态,该相关标记与本地业务操作同属个事务,确保消息发送与本地事务的原子性。

2、RocketMQ在收到类型为 prepare的消息时,会首先备份消息的原主题与原消息消费队列,然后将消息存储在主题为 RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中。

3、 RocketMQ消息服务器开启一个定时任务,消费 RMQ_SYS_TRANS_HALF_TOPIC的消息,向消息发送端(应用程序)发起消息事务状态回查,应用程序根据保存的事务状态回馈消息服务器事务的状态(提交、回滚、未知),如果提交或回滚,则消息服务器提交或回滚消息,如果是未知,则下一次在回查,超过配置的默认最大回查次数依然是未知状态,则默认回滚


https://www.xamrdz.com/mobile/4t41997585.html

相关文章: