当前位置: 首页>后端>正文

RocketMQ

RocketMQ,第1张

消息队列版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是现在分布式系统中不可或缺的中间件。这次我们一起来看下使用很广泛的 RocketMQ 的一些特性。

RocketMQ结构

RocketMQ,第2张

RocketMQ 官方推荐的部署结构如上图所示,主要分为 NameServer、Broker、Producer、Consumer 四种角色

物理结构

  • Producer:消息的发布者。Producer 通过 NameServer 获取 Topic 及 Broker 信息,并向相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息的消费者。Consumer 通过 NameServer 获取 Topic 及 Broker 信息,支持以 pull 拉,push 推(实际也是pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。
  • BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务高可用保证。为了高可用一般会采用 Master + Slave 的一对多部署方式,Broker 启动后需要完成一次将自己注册至 NameServer 的操作,随后定期向 NameServer 上报 Topic 路由信息。
  • NameServer:NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 Zookeeper,支持 Broker 的动态注册与发现。主要包括两个功能:Broker 管理,接受 Broker 的注册信息,检查 Broker 是否还存活;路由信息管理, Producer 和Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。

逻辑结构

  • Producer Group:用来表示一个发送消息应用,一个 Producer Group 下包含多个 Producer 实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个 Producer 对象。一个 Producer Group 可以发送多个 Topic 消息,Producer Group 作用如下:
    • 标识一类 Producer
    • 可以通过运维工具查询这个发送消息应用下有多个 Producer 实例
    • 发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态。
  • Consumer Group:用来表示一个消费消息应用,一个 Consumer Group 下包含多个 Consumer 实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个 Consumer 对象。一个 Consumer Group 下的多个 Consumer 以均摊方式消费消息,如果设置为广播方式,那么这个 Consumer Group 下的每个实例都消费全量数据。

消息存储

存储文件说明

├── abort
├── checkpoint
├── config
│   ├── consumerOffset.json
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json
│   ├── subscriptionGroup.json.bak
│   ├── topics.json
│   └── topics.json.bak
├── commitlog
│   ├── 00000000000000000000
│   ├── 00000000001073741824
│   └── 00000000002147483648 
├── consumequeue
│   ├── %DLQ%ConsumerGroupA
│   │   └── 0
│   │       └── 00000000000060000000
│   ├── %RETRY%ConsumerGroupA
│   │   └── 0
│   │       └── 00000000000000000000
│   ├── %RETRY%ConsumerGroupB
│   │   └── 0
│   │       └── 00000000000000000000
│   ├── SCHEDULE_TOPIC_XXXX
│   │   ├── 2
│   │   │   └── 00000000000060000000
│   │   └── 3
│   │       └── 00000000000060000000
│   ├── TopicA
│   │   ├── 0
│   │   │   ├── 00000000000000000000
│   │   │   ├── 00000000000060000000
│   │   │   └── 00000000000120000000
│   │   └── 1
│   │       ├── 00000000000060000000
│   │       └── 00000000000120000000
│   └── TopicB
│       ├── 0
│       │   └── 00000000000240000000
│       ├── 1
│       │   └── 00000000000240000000
│       └── 2
│           └── 00000000000240000000
├── index
│   └── 20200720163452641
└── lock

MQ 实际的文件存储结构如上所示,最核心的为 CommitLog 及 ConsumeQueue 下面进行详细介绍:

CommitLog

消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000代表了第一个文件,起始偏移量为 0 ,文件大小为 1G = 1073741824B ;当第一个文件写满了,第二个文件为00000000001073741824 ,起始偏移量为 1073741824 ,以此类推。

ConsumeQueue

消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于 Topic 的订阅模式,消息消费是针对 Topic 进行,如果要遍历 CommitLog 文件中根据 Topic 检索消息是非常低效的。ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了 Topic 下的消息在 CommitLog中 的起始物理偏移量 offset 、消息大小和消息 Tag 的HashCode 值。ConsumeQueue 文件可以看成是基于 Topic 的 CommitLog 索引文件,故ConsumeQueue 文件夹的组织方式如下:topic/queue/file 三层组织结构。

同样 ConsumeQueue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4字节的消息长度、8字节tag hashcode ,单个文件由 30W 个条目组成。名字长度也为20位,左边补零,剩余为起始偏移量,偏移量单位即文件的大小 6000000。

Index

索引文件提供了一种可以通过 key 或时间区间来查询消息的能力。文件名 fileName 是以创建时的时间戳命名的,单个IndexFile 文件大小约为 400M ,一个 IndexFile 可以保存 2000W 个索引,索引文件其底层实现为 hash 索引。

在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即一个 Broker 实例下所有的队列共用一个日志文件(即CommitLog)来存储。Producer 发送消息至 Broker ,Broker 使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。ConsumerQueue 则相当于 CommitLog 的索引文件,Consumer 消费时会先从 ConsumerQueue 中查找消息的在 CommitLog 中的 offset ,再去CommitLog 中找元数据。

MsgId说明

RocketMQ,第3张
msgId

MsgId 总共 16 字节,包含消息存储主机地址,消息 CommitLog Offset。从 MsgId 中解析出 Broker 的地址和 CommitLog 的偏移地址,然后按照存储格式所在位置消息 buffer 解析成一个完整的消息。

刷盘策略

RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时直接从内存读取。为适用各种场景,MQ 支持同步/异步刷盘方式,可以通过 Broker 的 flushDiskType 进行配置。

异步刷盘

RocketMQ,第4张
异步刷盘

能够充分利用 OS 的PageCache的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。

同步刷盘

RocketMQ,第5张
同步刷盘

只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般金融业务应用该模式较多。

高可用

RocketMQ 采用 Master + Slave 的模式来保证高可用。与刷盘策略类似,也提供了同步双写与异步复制两种方式来满足不同场景需求,可以通过 Broker 的 brokerRole 参数进行控制。如果对消息的可靠性要求比较严格,可以采用 SYNC_MASTER(同步双写)的部署方式。如果对消息可靠性要求不高,可以采用 ASYNC_MASTER(异步复制)的部署方式。

异步复制的实现思路也非常简单,Slave 启动一个线程,不断从 Master 拉取 CommitLog 中的数据,然后再异步构建出 ConsumeQueue 数据结构。整个实现过程基本同 Mysql 主从同步类似。

高性能保障

顺序写读

PageCache 是操作系统对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于使用 PageCache 机制对读写访问操作进行了性能优化,可以参见下图的的性能表现。

RocketMQ,第6张
随机顺序读写

结合 MQ 的存储结构发现这两个特点,①所有数据存储到一个 CommitLog,完全顺序写但随机读②对最终用户展现的 ConsumerQueue 实际只存储消息在 CommitLog 的位置信息,并且串行方式刷盘。这种设计方案带来 了相应的优缺点,具体如下:

优点

  1. 队列轻量化,单个 ConsumerQueue 数据量非常少。
  2. 对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致 IOWAIT 增高。

缺点

  1. 写虽然完全是顺序写,但是读却变成了完全的随机读。
  2. 读一条消息,会先读 ConsumeQueue ,再读 CommitLog ,增加了开销。
  3. 要保证 CommitLog 与 ConsumeQueue 完全的一致,增加了编程的复杂度。

缺点应对方案

  1. 对随机读,尽可能让读命中 PageCache ,减少 IO 操作,所以内存越大越好。当系统中堆积的消息过多,读数据要访问磁盘时候,通过如下方式提高效率:
    a. 访问 PageCache 时,即使只访问 1k 的消息,也会提前预读更多的数据,在下次读时就可能命中缓存。
    b. 随机访问 CommitLog 磁盘数据,IO 调度算法设置为 NOOP 方式,会在一定程度上将完全的随机读变成顺序跳跃方式,而顺序跳跃方式读较完全的随机读性能会高 5 倍以上。
  2. 由于 ConsumeQueue 存储数据量极少,而且是顺序读,在预读作用下,ConsumeQueue 的读性能几乎与内存一致。所以可认为 ConsumeQueue 基本不会阻碍读性能。
  3. CommitLog 中存储了所有的元数据,类似于 Mysql 的 redolog,所以只要 CommitLog 存在,ConsumeQueue 即使数据丢失,仍然可以恢复出来。

零拷贝应用

RocketMQ 主要通过 mmap 零拷贝技术对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型将磁盘上的物理文件直接映射到用户态的内存地址中,将对文件的操作转化为直接对内存地址的操作,从而极大地提高了文件的读写效率。正因为需要使用内存映射机制,故 RocketMQ 的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存。

事务消息

RocketMQ,第7张
事务消息

事务消息流程概要

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

  1. 事务消息发送及提交:
    a. 发送消息(half消息)。
    b. 服务端响应消息写入结果。
    c. 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)。
    d. 根据本地事务状态执行 Commit 或者 Rollback( Commit 操作生成消息索引,消息对消费者可见)
  2. 补偿流程:
    a. 对没有 Commit/Rollback 的事务消息,从服务端发起一次“回查”
    b. Producer 收到回查消息,检查回查消息对应的本地事务的状态
    c. 根据本地事务状态,重新 Commit 或者 Rollback 其中。

RocketMQ 并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ 默认回滚该消息。

其他说明

特殊队列

延迟队列

定时消息(延迟队列)是指消息发送到 Broker 后,不会立即被消费,等待特定时间投递给真正的 Topic。 Broker 有配置项 messageDelayLevel ,存在“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h” 18个 level 。注意,messageDelayLevel 是 Broker 的属性,不属于某个 Topic 。发消息时,设置delayLevel 等级即可。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 Topic 中,并根据 delayTimeLevel 存入特定的queue ,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX ,将消息写入真实的 Topic 。
需要注意的是,定时消息会在第一次写入和调度写入真实 Topic 时都会计数,因此发送数量、tps 都会变高。

重试队列

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销)。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,依赖接口不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(注意:这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

重试会自动进行16次,每次间隔时间依次如下

LEVEL 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
时间 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),名称为“%DLQ%+consumerGroup”。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

消息重投(Producer)

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的 Broker,尝试向其他 Broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常。当出现 RemotingException、MQClientException 和部分MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 Broker ,仅在同一个Broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

低延迟消息

在消息不堆积情况下,消息到达 Broker 后,能立刻到达 Consumer。
RocketMQ 使用长轮询 Pull 方式,可保证消息非常实时,消息实时性不低于 Push。类似于 Web QQ 收发消息的机制。

At least Once

是指每个消息必须投递一次。

RocketMQ Consumer 先 pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

Exactly Only Once

是指每个消息只被投递一次。

  1. 发送消息阶段,不允许发送重复的消息。
  2. 消费消息阶段,不允许消费重复的消息。

只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以 RocketMQ 为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。

kafka 在 0.11.x 版本引入了 exactly-once 语义。

PageCache

PageCache 是操作系统对文件的缓存,用于加速对文件的读写,类似于 CPU 访问内存时候的三级缓存,都是为了平衡不同运行效率的硬件,从而提高整体运行性能的技术。

对于数据文件的写入,操作系统会先写入至 PageCache 内,随后通过异步的方式由 pdflush 内核线程将 PageCache 内的数据刷盘至物理磁盘上。对于数据文件的读取,如果一次读取文件时出现未命中 PageCache 的情况,操作系统从物理磁盘上读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。当对于文件的顺序读写时候,在 PageCache 机制的预读取作用下,此时读写性能接近于内存。


https://www.xamrdz.com/backend/3wf1938555.html

相关文章: