1.MQ的作用、优缺点及对比
MQ的作用主要有以下三个方面:
- 异步
作用:异步能提高系统的响应速度、吞吐量。 - 解耦
作用:
1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。
2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。 - 削峰
作用:以稳定的系统资源应对突发的流量冲击。
引入MQ的缺点:
- 系统可用性降低
系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑如何保证MQ的高可用。 - 系统复杂度提高
引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题。 - 消息一致性问题
A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。
2.RocketMQ的结构
RocketMQ由以下这几个组件组成
- NameServer : 提供轻量级的Broker路由服务。
- Broker:实际处理消息存储、转发等服务的核心组件。
- Producer:消息生产者集群。通常是业务系统中的一个功能模块。
- Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。
3.生产者
- 消息发送者的固定步骤
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
消息生产者分别通过三种方式发送消息,同步发送、异步发送以及单向发送。
- 1、同步发送消息Producer
- 2、异步发送AsyncProducer
- 3、单向发送消息producer.sendOneWay
3.1 生产者负载均衡
Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。
同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。
4.消费者
- 消息消费者的固定步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。
- 拉模式:DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。
- 推模式:实际上RocketMQ的推模式也是由拉模式封装出来的。
4.1 消费者负载均衡
Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。
1、集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。
- AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。
这个策略可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般也就用简单的平均分配策略或者轮询分配策略。 - AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
- AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。
- AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。
- AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。
- AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。
2、广播模式
广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。
广播模式实现的关键是将消费者的消费偏移量不再保存到broker当中,而是保存到客户端当中,由客户端自行维护自己的消费偏移量。
4.2 消息重试
首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。而对于普通的消息,当消费者消费消息失败后,你可以通过设置返回状态达到消息重试的结果。
如何让消息进行重试
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:
- 返回Action.ReconsumeLater-推荐
- 返回null
- 抛出异常
重试消息如何处理
重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中。
然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下
重试次数:
如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。
另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
然后关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。
关于MessageId:
在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的
但是在4.9.1版本中,每次重试MessageId都会重建。
配置覆盖:
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。
4.3 死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
RocketMQ默认的重试次数是16次。见源码org.apache.rocketmq.common.subscription.SubscriptionGroupConfig中的retryMaxTimes属性。
这个重试次数可以在消费者端进行配置。 例如 DefaultMQPushConsumer实例中有个setMaxReconsumeTimes方法指定重试次数。
死信队列的名称是%DLQ%+ConsumGroup
死信队列的特征:
- 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
- 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
4.4 消息幂等
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
- 发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 - 投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 - 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
5.broker
5.1 读队列与写队列
在RocketMQ的管理控制台创建Topic时,可以看到要单独设置读队列和写队列。通常在运行时,都需要设置读队列=写队列。
perm字段表示Topic的权限。有三个可选项。 2:禁写禁订阅,4:可订阅,不能写,6:可写可订阅
这其中,写队列会真实的创建对应的存储文件,负责消息写入。而读队列会记录Consumer的Offset,负责消息读取。这其实是一种读写分离的思想。RocketMQ在配置MessageQueue的路由策略时,就可以通过指向不同的队列来实现读写分离。
5.2 消息持久化
RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。
存储文件主要分为三个部分:
- CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
- ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
另外,还有几个辅助的存储文件:
- checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。
- config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
- abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。
整体的消息存储结构如下图:
1、CommitLog文件存储所有消息实体。所有生产者发过来的消息,都会无差别的依次存储到Commitlog文件当中。这样的好处是可以减少查找目标文件的时间,让消息以最快的速度落盘。对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入,当Topic比较多时,这样的Partition寻址就会浪费比较多的时间,所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式在多Topic场景下,优势就比较明显。
**文件结构:**CommitLog的文件大小是固定的,但是其中存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog
正因为消息的记录大小不固定,所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件。文件名为当前消息的偏移量。
2、ConsumeQueue文件主要是加速消费者的消息索引。他的每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中。
文件结构:每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成,数据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。
在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小。
3、IndexFile文件主要是辅助消息检索。消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了,但是如果要按照MeessageId或者MessageKey来检索文件,比如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。
文件结构:他的文件结构由 indexHeader(固定40byte)+ slot(固定500W个,每个固定20byte) + index(最多500W*4个,每个固定20byte) 三个部分组成。
indexFile的详细结构有大厂之前面试过,可以参考一下我的博文: https://blog.csdn.net/roykingw/article/details/120086520
5.3 过期文件删除
消息既然要持久化,就必须有对应的删除机制。RocketMQ内置了一套过期文件的删除机制。
首先:如何判断过期文件:
RocketMQ中,CommitLog文件和ConsumeQueue文件都是以偏移量命名,对于非当前写的文件,如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除。这个保留时间就是在broker.conf中配置的fileReservedTime属性。
注意,RocketMQ判断文件是否过期的唯一标准就是非当前写文件的保留时间,而并不关心文件当中的消息是否被消费过。所以,RocketMQ的消息堆积也是有时间限度的。
然后:何时删除过期文件:
RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作。用户可以指定文件删除操作的执行时间。在broker.conf中deleteWhen属性指定。默认是凌晨四点。
另外,RocketMQ还会检查服务器的磁盘空间是否足够,如果磁盘空间的使用率达到一定的阈值,也会触发过期文件删除。所以RocketMQ官方就特别建议,broker的磁盘空间不要少于4G。
5.4 高效文件写
5.4.1 零拷贝技术加速文件读写
mmap
以一次文件的读写操作为例,应用程序对磁盘文件的读与写,都需要经过内核态与用户态之间的状态切换,每次状态切换的过程中,就需要有大量的数据复制。
在这个过程中,总共需要进行四次数据拷贝。而磁盘与内核态之间的数据拷贝,在操作系统层面已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态之间的拷贝依然是CPU拷贝。所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的这两次拷贝。
而mmap文件映射的方式,就是在用户态不再保存文件的内容,而只保存文件的映射,包括文件的内存起始地址,文件大小等。真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制。
mmap的映射机制由于还是需要用户态保存文件的映射信息,数据复制的过程也需要用户态的参与,这其中的变数还是非常多的。所以,mmap机制适合操作小文件,如果文件太大,映射信息也会过大,容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G 。而RocketMQ做大的CommitLog文件保持在1G固定大小,也是为了方便文件映射。
sendfile
早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。但是,在后期的不断改进过程中,sendfile优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实的数据内容,会交由DMA控制器,从页缓存中打包异步发送到socket中。
sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的。sendfile机制非常适合大数据的复制转移。
5.4.2 顺序写加速文件写入磁盘
通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。
Kafka官方详细分析过顺序写的性能提升问题。Kafka官方曾说明,顺序写的性能基本能够达到内存级别。而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存。而RocketMQ很大程度上借鉴了Kafka的这种思想。
例如可以看下org.apache.rocketmq.store.CommitLog#DefaultAppendMessageCallback中的doAppend方法。在这个方法中,会以追加的方式将消息先写入到一个堆外内存byteBuffer中,然后再通过fileChannel写入到磁盘。
5.4.3 刷盘
在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容。这些写入到PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以正常修改、复制等等。但是,本质上,PageCache依然是内存状态,所以一断电就会丢失。因此,需要将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失。这个过程就称为刷盘。
-
同步刷盘:
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。RocketMQ是有个定时任务,10ms刷一次盘。并不是完全的同步。
-
异步刷盘:
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
-
配置方式:
刷盘方式是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
同步刷盘机制会更频繁的调用fsync,所以吞吐量相比异步刷盘会降低,但是数据的安全性会得到提高。
5.5 消息主从复制
果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制和异步复制。
- 同步复制:
同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。
在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。
- 异步复制:
异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点。
在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失。
- 配置方式:
消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
5.6 Dledger集群
Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。他基于Raft算法进行构建,在RocketMQ的主从集群基础上,增加了自动选举的功能。当master节点挂了之后,会在集群内自动选举出一个新的master节点。虽然Dledger机制目前还在不断验证改进的阶段,但是作为基础的Raft算法,已经是目前互联网行业非常认可的一种高可用算法了。Kafka目前也在基于Raft算法,构建摆脱Zookeeper的集群化方案。
RocketMQ中的Dledger集群主要包含两个功能:1、从集群中选举产生master节点。2、优化master节点往slave节点的消息同步机制。
先来看第一个功能:Dledger是使用Raft算法来进行节点选举的。
首先:每个节点有三个状态,Leader,follower和candidate(候选人)。正常运行的情况下,集群中会有一个leader,其他都是follower,follower只响应Leader和Candidate的请求,而客户端的请求全部由Leader处理,即使有客户端请求到了一个follower,也会将请求转发到leader。
集群刚启动时,每个节点都是follower状态,之后集群内部会发送一个timeout信号,所有follower就转成candidate去拉取选票,获得大多数选票的节点选为leader,其他候选人转为follower。如果一个timeout信号发出时,没有选出leader,将会重新开始一次新的选举。而Leader节点会往其他节点发送心跳信号,确认他的leader状态。然后会启动定时器,如果在指定时间内没有收到Leader的心跳,就会转为Candidate状态,然后向其他成员发起投票请求,如果收到半数以上成员的投票,则Candidate会晋升为Leader。然后leader也有可能会退化成follower。
然后,在Raft协议中,会将时间分为一些任意时间长度的时间片段,叫做term。term会使用一个全局唯一,连续递增的编号作为标识,也就是起到了一个逻辑时钟的作用。
在每一个term时间片里,都会进行新的选举,每一个Candidate都会努力争取成为leader。获得票数最多的节点就会被选举为Leader。被选为Leader的这个节点,在一个term时间片里就会保持leader状态。这样,就会保证在同一时间段内,集群中只会有一个Leader。在某些情况下,选票可能会被各个节点瓜分,形成不了多数派,那这个term可能直到结束都没有leader,直到下一个term再重新发起选举,这也就没有了Zookeeper中的脑裂问题。而在每次重新选举的过程中, leader也有可能会退化成为follower。也就是说,在这个集群中, leader节点是会不断变化的。
然后,每次选举的过程中,每个节点都会存储当前term编号,并在节点之间进行交流时,都会带上自己的term编号。如果一个节点发现他的编号比另外一个小,那么他就会将自己的编号更新为较大的那一个。而如果leader或者candidate发现自己的编号不是最新的,他就会自动转成follower。如果接收到的请求term编号小于自己的编号,term将会拒绝执行。
在选举过程中,Raft协议会通过心跳机制发起leader选举。节点都是从follower状态开始的,如果收到了来自leader或者candidate的心跳RPC请求,那他就会保持follower状态,避免争抢成为candidate。而leader会往其他节点发送心跳信号,来确认自己的地位。如果follower一段时间(两个timeout信号)内没有收到Leader的心跳信号,他就会认为leader挂了,发起新一轮选举。
选举开始后,每个follower会增加自己当前的term,并将自己转为candidate。然后向其他节点发起投票请求,请求时会带上自己的编号和term,也就是说都会默认投自己一票。之后candidate状态可能会发生以下三种变化:
- 赢得选举,成为leader: 如果它在一个term内收到了大多数的选票,将会在接下的剩余term时间内称为leader,然后就可以通过发送心跳确立自己的地位。(每一个server在一个term内只能投一张选票,并且按照先到先得的原则投出)
- 其他节点成为leader: 在等待投票时,可能会收到其他server发出心跳信号,说明其他leader已经产生了。这时通过比较自己的term编号和RPC过来的term编号,如果比对方大,说明leader的term过期了,就会拒绝该RPC,并继续保持候选人身份; 如果对方编号不比自己小,则承认对方的地位,转为follower。
- 选票被瓜分,选举失败: 如果没有candidate获取大多数选票, 则没有leader产生, candidate们等待超时后发起另一轮选举. 为了防止下一次选票还被瓜分,必须采取一些额外的措施, raft采用随机election timeout(随机休眠时间)的机制防止选票被持续瓜分。通过将timeout随机设为一段区间上的某个值, 因此很大概率会有某个candidate率先超时然后赢得大部分选票。
所以以三个节点的集群为例,选举过程会是这样的:
- 集群启动时,三个节点都是follower,发起投票后,三个节点都会给自己投票。这样一轮投票下来,三个节点的term都是1,是一样的,这样是选举不出Leader的。
- 当一轮投票选举不出Leader后,三个节点会进入随机休眠,例如A休眠1秒,B休眠3秒,C休眠2秒。
- 一秒后,A节点醒来,会把自己的term加一票,投为2。然后2秒时,C节点醒来,发现A的term已经是2,比自己的1大,就会承认A是Leader,把自己的term也更新为2。实际上这个时候,A已经获得了集群中的多数票,2票,A就会被选举成Leader。这样,一般经过很短的几轮选举,就会选举出一个Leader来。
- 到3秒时,B节点会醒来,他也同样会承认A的term最大,他是Leader,自己的term也会更新为2。这样集群中的所有Candidate就都确定成了leader和follower.
- 然后在一个任期内,A会不断发心跳给另外两个节点。当A挂了后,另外的节点没有收到A的心跳,就会都转化成Candidate状态,重新发起选举。
然后,Dledger还会采用Raft协议进行多副本的消息同步
使用Dledger集群后,数据主从同步会分为两个阶段,一个是uncommitted阶段,一个是commited阶段。
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。
6.消息类型
6.1 顺序消息
一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。
顺序消息分为全局顺序消息与部分顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
如果想要实现全局顺序消息,那么只能使用一个队列,以及单个生产者,这是会严重影响性能。
顺序消费实际上有两个核心点,一个是生产者有序存储,另一个是消费者有序消费。
生产者有序发送:
- 只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储。
- RocketMQ支持生产者在投放消息的时候自定义投放策略,实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中。
- 顺序消息必须使用同步发送的方式才能保证生产者发送的消息有序。
- 实际上,采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的,如果增加了服务器,那么也会造成短暂的造成部分消息无序。
消费者有序消费:
- RockerMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。在消费的时候,还需要保证消费者注册MessageListenerOrderly类型的回调接口实现顺序消费,如果消费者采用Concurrently并行消费,则仍然不能保证消息消费顺序。
- 实际上,每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息。
- MessageListenerConcurrently是拉取到新消息之后就提交到线程池去消费,而MessageListenerOrderly则是通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据。
顺序消费模式使用3把锁来保证消费的顺序性:
- broker端的分布式锁:
1)在负载均衡的处理新分配队列的updateProcessQueueTableInRebalance方法,以及ConsumeMessageOrderlyService服务启动时的start方法中,都会尝试向broker申请当前消费者客户端分配到的messageQueue的分布式锁。
2)broker端的分布式锁存储结构为ConcurrentMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>,该分布式锁保证同一个consumerGroup下同一个messageQueue只会被分配给一个consumerClient。
3)获取到的broker端的分布式锁,在client端的表现形式为processQueue. locked属性为true,且该分布式锁在broker端默认60s过期,而在client端默认30s过期,因此ConsumeMessageOrderlyService#start会启动一个定时任务,每过20s向broker申请分布式锁,刷新过期时间。而负载均衡服务也是每20s进行一次负载均衡。
4)broker端的分布式锁最先被获取到,如果没有获取到,那么在负载均衡的时候就不会创建processQueue了也不会提交对应的消费请求了。 - messageQueue的本地synchronized锁:
1)在执行消费任务的开头,便会获取该messageQueue的本地锁对象objLock,它是一个Object对象,然后通过synchronized实现锁定。
2)这个锁的锁对象存储在MessageQueueLock.mqLockTable属性中,结构为ConcurrentMap<MessageQueue, Object>,所以说,一个MessageQueue对应一个锁,不同的MessageQueue有不同的锁。
3)因为顺序消费也是通过线程池消费的,所以这个synchronized锁用来保证同一时刻对于同一个队列只有一个线程去消费它。 - ProcessQueue的本地consumeLock:
1)在获取到broker端的分布式锁以及messageQueue的本地synchronized锁的之后,在执行真正的消息消费的逻辑messageListener#consumeMessage之前,会获取ProcessQueue的consumeLock,这个本地锁是一个ReentrantLock。
2)那么这把锁有什么作用呢?
?2-1)在负载均衡时,如果某个队列C被分配给了新的消费者,那么当前客户端消费者需要对该队列进行释放,它会调用removeUnnecessaryMessageQueue方法对该队列C请求broker端分布式锁的解锁。
?2-2)而在请求broker分布式锁解锁的时候,一个重要的操作就是首先尝试获取这个messageQueue对应的ProcessQueue的本地consumeLock。只有获取了这个锁,才能尝试请求broker端对该messageQueue的分布式锁解锁。
?2-3)如果consumeLock加锁失败,表示当前消息队列正在消息,不能解锁。那么本次就放弃解锁了,移除消息队列失败,只有等待下次重新分配消费队列时,再进行移除。
3)如果没有这把锁,假设该消息队列因为负载均衡而被分配给其他客户端B,但是由于客户端A正在对于拉取的一批消费消息进行消费,还没有提交消费点位,如果此时客户端A能够直接请求broker对该messageQueue解锁,这将导致客户端B获取该messageQueue的分布式锁,进而消费消息,而这些没有commit的消息将会发送重复消费。
4)所以说这把锁的作用,就是防止在消费消息的过程中,该消息队列因为发生负载均衡而被分配给其他客户端,进而导致的两个客户端重复消费消息的行为。
消费者使用MessageListenerOrderly顺序消费有个两个问题:
- 使用了很多的锁,降低了吞吐量。
- 前一个消息消费阻塞时后面消息都会被阻塞。如果遇到消费失败的消息,会自动对当前消息进行重试(每次间隔时间为1秒),无法自动跳过,重试最大次数是Integer.MAX_VALUE,这将导致当前队列消费暂停,因此通常需要设定有一个最大消费次数,以及处理好所有可能的异常情况。RocketMQ的消费者消息重试和生产者消息重投。
6.2 广播消息
广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。
6.3 延迟消息
延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);
开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这从哪里看出来的?其实从rocketmq-console控制台就能看出来。而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。
6.4 批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB。
实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
6.5 过滤消息
两种方式:
- Tag过滤
TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。 - 使用SQL表达式过滤
sql语句是按照SQL92标准来执行的。只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
6.6 事务消息
事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。
事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。
在提交完事务消息后执行:
- 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
- 返回ROLLBACK_MESSAGE状态的消息会被丢弃。
- 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。
事务消息的使用限制:
- 1、事务消息不支持延迟消息和批量消息。
- 2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener
类来修改这个行为。 - 3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionMsgTimeout
参数。 - 4、事务性消息可能不止一次被检查或消费。
- 5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
7.RocketMQ使用中常见的问题
7.1 使用RocketMQ如何保证消息不丢失?
完整方案
- 发送端:重试机制
- broker端:
同步刷盘;
主从复制改为同步复制,或者使用Dledger主从架构保证MQ主从复制时不会丢消息。 - 消费者端不要使用异步消费机制
- 整个MQ挂了之后准备降级方案:多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把消息缓存下来,然后起一个线程定时的扫描这些失败的消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。
7.2 使用RocketMQ如何快速处理积压消息?
1、如何确定RocketMQ有大量的消息积压?
在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,大家大概率就会集中处理数据库的问题。等好不容易把数据库恢复过来了,这时基于这个数据库服务的消费者程序就会积累大量的消息。或者网络波动等情况,也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的。所以消息积压是个需要时时关注的问题。
对于消息积压,如果是RocketMQ或者kafka还好,他们的消息积压不会对性能造成很大的影响。而如果是RabbitMQ的话,那就惨了,大量的消息积压可以瞬间造成性能直线下滑。
对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是使用web控制台,就能直接看到消息的积压情况。
在Web控制台的主题页面,可以通过 Consumer管理 按钮实时看到消息的积压情况。
2、如何处理大量积压的消息?
其实我们回顾下RocketMQ的负载均衡的内容就不难想到解决方案。
如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配多个MessageQueue来进行消费。这个时候,就可以简单的通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了。
而如果Topic下的MessageQueue配置得不够多的话,那就不能用上面这种增加Consumer节点个数的方法了。这时怎么办呢? 这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。
在官网中,还分析了一个特殊的情况。就是如果RocketMQ原本是采用的普通方式搭建主从架构,而现在想要中途改为使用Dledger高可用集群,这时候如果不想历史消息丢失,就需要先将消息进行对齐,也就是要消费者把所有的消息都消费完,再来切换主从架构。因为Dledger集群会接管RocketMQ原有的CommitLog日志,所以切换主从架构时,如果有消息没有消费完,这些消息是存在旧的CommitLog中的,就无法再进行消费了。这个场景下也是需要尽快的处理掉积压的消息。
7.3 RocketMQ的消息轨迹
1、RocketMQ消息轨迹数据的关键属性
2、消息轨迹配置
打开消息轨迹功能,需要在broker.conf中打开一个关键配置:
traceTopicEnable=true
3、消息轨迹数据存储
默认情况下,消息轨迹数据是存于一个系统级别的Topic ,RMQ_SYS_TRACE_TOPIC。这个Topic在Broker节点启动时,会自动创建出来。
在客户端的两个核心对象 DefaultMQProducer和DefaultMQPushConsumer,他们的构造函数中,都有两个可选的参数来打开消息轨迹存储
- enableMsgTrace:是否打开消息轨迹。默认是false。
- customizedTraceTopic:配置将消息轨迹数据存储到用户指定的Topic 。
参考
- 图灵vip课程https://vip.tulingxueyuan.cn/
- https://blog.csdn.net/weixin_43767015/article/details/121028059
- https://blog.csdn.net/demohui/article/details/119190796
- Dledger集群 https://www.cnblogs.com/Qing-840/p/16730988.html
- 主从同步 https://www.cnblogs.com/shanml/p/16950178.html