当前位置: 首页>后端>正文

RocketMQ源码解析

RocketMQ基本模块

RocketMQ源码解析,第1张
a.png

先看下官方简单介绍:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

NameServer

NameServer是一个无状态的节点 可以集群部署 节点之间无任何信息同步,那么NameServer又是干啥的?如何实现的?跟其他的服务有什么联系呢

NameServer作用

NameServer主要作用是用来消息生产者Producer 和 消息消费者Consumer 提供相关Topic路由信息 以及管理Broker节点 包括Broker的注册和删除 以及 监控Broker的状态

NameServer具体实现

通过源码中nameSrc模块我们简单分析 NameServer的实现
启动获取配置文件以及启动Netty服务这里就不做解释了 笔者主要分析下内部的Topic路由实现 以及 Broker的管理

首先我们查看一个RouteInfoManager类,大致结构如下

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

topicQueueTable

Topic消息队列路由 key为Topic value为QueueData
QueueData 中包含 brokerName 和当前这个Broker中的读写队列数量 以及 tpicSyncFlag 同步标记

brokerAddrTable

key 为BrokerName value为BrokerData BrokerData 包含集成名称 broker名称 以及 broker主备列表 这里broker主备列表一般为 一主一备 当然也支持一主多备 BrokerId为0的为Master 大于0的为Slave

clusterAddrTable

key为集群名称 value为集群下所有broker的名称

brokerLiveTable

broker地址对应的状态信息,每次收到心跳包的时候会替换对应Broker的状态信息
brokerLiveInfo中 包含最近一次更新时间,version 以及 channel 和 haServerAddr

这里简单说明下haServerAddr,haServerAddr代表的是Master地址,第一次请求的时候为空值,是Slave向NameServer注册后返回的。

filterServerTable

Broker地址对应每一个Broker上的FilterServer列表 一般用来做消息过滤

NameServer通讯机制

[图片上传失败...(image-aa69cf-1666770168025)]

上图三台NameSrv和三台Broker, 在启动NameServer之后需要给 三台Broker都添加三台NameServer的配置地址,这个地址是需要写死的 在每一个Broker启动之后都会连接所有的NameServer,并且会给NameServer发送心跳包

Broker每30s会给NameServer发送一次心跳包,NameServer收到心跳包后会更新BrokerLiveTable中的数据

在NameSerer会每隔10s扫描一次BrokerLiveData 对应NamesrvController中代码如下

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

   @Override
   public void run() {
       NamesrvController.this.routeInfoManager.scanNotActiveBroker();
   }
}, 5, 10, TimeUnit.SECONDS);

当超过120s还未收到心跳包则删除对应的Broker路由信息

RocketMQ中的网络传输是基于Netty实现的,这里就不过展开Netty相关的东西了 后续笔者会整理Neety相关文章

NameServer之间不做通讯不需要连接 所以NameServer被称之为无状态的服务

心跳包处理机制

基于Neety接受数据后会通过DefaultRequestProcessor将对应消息序列化,最后到RouteInfoManager中的registerBroker方法。

该方法主要干了如下事情(整个过程是写锁ReentrantReadWriteLock):

  1. 将Broker加入对应集群列表中 clusterAddrTable
  2. 更新brokerAddrTable中的值 比如切换SlaveMaster
  3. 判断Topic是否发送改变或者Topic中队列是否发生改变(通过 brokerLiveTable中version判断) 发生改变更新最新路由数据到topicQueueTable
  4. 更新filterServerTable
  5. 如果当前Broker非master的节点,查找对应master节点,更新 haServerAddr值为master节点的地址

写到这里的时候就能发现 当Topic路由数据发送改变 NameServer并没有将改变的数据推送给对应连接的客户端, NameServer是通过客户端定时更新的方式更新路由信息,所以说这里的发生改变会有一定的延时性。

Broker

Broker 真正负责消息存储的服务,也就是每一个Broker就是一个MQ服务的实例,上面介绍了NameServer的时候说了 每一个Broker都会将自己注册到所有的NameServer中 并且每隔30s会给NameServer发送自己当前最新的数据.

先简单描述下Broker的集群方式,这里的集群分为二个维度

  • 第一个维度 保证单个Broker高可用,那么就是Broker的主备方式
  • 第二个维度 是整个Broker大集群的分布式消息 增加消息的吞吐量

[图片上传失败...(image-5ee1b4-1666770168026)]

这一块后面在做详细的介绍本文先简单

Broker中主要涉及到消息的存储机制,消费以及offset 和消息查询 这一块的东西比较多 我后面会单独拆分几篇文章进行分享 这里先简单介绍下后续再通过源码解释

主从复制策略

同步复制

  1. Producer将消息发送给broker的Master
  2. Broker Master 将消息同步复制给 Slave节点
  3. 返回生产者消息发送成功

异步复制

  1. Producer将消息发送给broker的Master
  2. 返回生产消息发送成功 异步将消息复制给Slave

优缺点:

同步复制 能保证整个消息在极端情况下也不会丢失但是会导致整个系统的吞吐量下降 发送消息TPS降低

异步复制 能提高整个系统的吞吐量,但是在极端的情况下会丢失消息

刷盘策略

RocketMQ 是将所有消息都存放到commitlog文件中 可以说RocketMQ是面向文件编程 所以写入磁盘的机制尤为重要

同步异步刷盘

同步刷盘,异步刷盘 通过submitFlushRequest函数实现区中间有一个isWaitStoreMsgOK

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
   // Synchronization flush
   if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
       final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
       if (messageExt.isWaitStoreMsgOK()) {
           GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                   this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
           service.putRequest(request);
           return request.future();
       } else {
           service.wakeup();
           return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
       }
   }
   // Asynchronous flush
   else {
       if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
           flushCommitLogService.wakeup();
       } else  {
           commitLogService.wakeup();
       }
       return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
   }
}

同步刷盘 其实和同步复制优缺点差不多只是一个是外部IO 一个是磁盘IO ,就是生产者生产消息发送给Broker, Broker需要将对应数据存储到磁盘上,如果是同步刷盘 相当于每次都要等待数据真正写入磁盘才能返回成功

在RocketMQ中文件基本都是通过RandomAccessFile这个类去实现的,Buffer通过RandomAccessFile的channel中map方法得到MappedByteBuffer

this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

关于RandomAccessFile零拷贝这一块本文不做描述,简单来说零拷贝就是在内存中没有复制操作, 我们简单描述下pagecachemap使用的系统内核中的mmap方法.

在这里我们需要明确一个概念,向文件磁盘IO操作全都是我们的系统内核去完成的,调用mmap就是内存到文件的一个映射 这一块的实现全部由操作系统完成。在linux中是将文件映射到内存中的pagecache中,每一个pagecache默认为4K 也就是我们写入buffer数据 不代表真的写入到磁盘了,这个时候的写入都是在pagecache中,只是当前的pagecache接收到数据写入后会变成脏页,脏页数据也不是立即flush到磁盘中的,但是Rocketmq并不是依赖系统自动刷写pagecache的 而是通过 GroupCommitService去做的后台10ms刷盘操作

关于脏页数据刷入到磁盘在kernel内核中有一段配置

vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500
vm.dirtytime_expire_seconds = 43200

dirty_backgound_ratio=xx
后台脏页阈值去回写到磁盘 也就是后台flush
xx代表 如果10G内存 就是10* xx / 100.0 后台回写阈值比例 , 通过内核的后台线程去刷写到磁盘,刷写完成后 会将PageCache 标记为非脏状态

drity_ratio 也就是同步刷写的意思 这个同步刷写 是指的刷写服务 一般将内存数据写入磁盘都是通过协处理器(DMA)实现的

在内核中如何强制将pagecache数据回写到磁盘呢其实就是flush操作 对应fileChannel.force或者mappedByteBuffer.force强制刷写到磁盘中

理解了Pagecache 我们其实就能理解同步刷盘策略和异步刷盘策略了, 同步刷盘相当于每次都需要将pagecache中的数据flush到磁盘中,异步刷盘就是在写入数据的时候将数据追加到mappedByteBuffer

异步刷盘

if (currentPos < this.fileSize) {
   ByteBuffer byteBuffer = writeBuffer != null writeBuffer.slice() : this.mappedByteBuffer.slice();
   byteBuffer.position(currentPos);
   AppendMessageResult result;
   //将数据在追加到buffer中
   if (messageExt instanceof MessageExtBrokerInner) {
       result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
               (MessageExtBrokerInner) messageExt, putMessageContext);
   } else if (messageExt instanceof MessageExtBatch) {
       result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
               (MessageExtBatch) messageExt, putMessageContext);
   } else {
       return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
   }
   this.wrotePosition.addAndGet(result.getWroteBytes());
   this.storeTimestamp = result.getStoreTimestamp();
   return result;
}

优劣势其实很明显了:

同步刷盘 数据不会丢失 但是非常影响吞吐量 IO刷写成为瓶颈 无法使用内存缓冲 性能低

异步刷盘 吞吐量高,在极端情况下可能出现数据丢失,比如宕机的情况 数据在pagecache中还未flush到磁盘

GroupCommitService 会每隔10ms做一次flush操作 当有GroupCommitRequest的时候就会执行对应的doCommit,当异步刷盘的时候会先返回对应客户端成功,然后GroupCommitService在会触发刷盘操作,所以异步刷盘丢失的数据最多也就是毫秒级别.

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}

Broker的集群方式

单Master节点

单节点也就是 起一个master节点 没有salve 没有分布式队列 所有消息发送都到这个节点,一旦节点挂了整个mq就不可以使用了 这个没人会在生产环境使用 一般我们自己测试是这种方式

多Master节点

多Master节点 就是有分布式队列 但是每一个broker没有第一维度的高可用,也就是当一个master节点挂了会导致这个master的节点上的数据无法被消费,这个也没人会在生产环境上使用 毕竟我们生产环境都是需要高可用,部分消息消费不到了也是无法接受的

多Master多Slave节点

这个其实就是我们文章开始的 rocketmq给的架构图了 当然master 也可以配置多个Slave节点

异步复制

即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样

即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样

优点: 即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样

缺点: Master宕机,磁盘损坏情况下会丢失少量消息

同步双写

同步双写在4.7.0版本之前是一个完全串行的执行流程,在这个版本之前 使用同步双写的时候 TPS会大大的降低,所以Rocketmq社区在4.7.0版本的时候 对同步双写的机制做了优化,最后的实现的TPS和异步复制很接近。

在4.7.0版本之前 同步双写主要步骤

1.Producer发送消息到Broker
Broker接受消息处理存储到Commitlog
提交GroupCommitRequest将消息提交给HAService 复制给其他Slave
等待HAService复制结果
通知客户端发送成功

在这种阻塞模式下,broker中SendMessageProcessor线程需要等待消息被同步到Slave才能处理新的请求,SendMessageProcessor处理消息的线程就会被堵塞直到 HAService同步成功后才能处理新的请求,导致CPU处理闲置状态 TPS上不去.

所以Rocket社区在4.7.0针对同步双写模式做了优化使用异步编程模型CompletableFuture的方式

SendMessageProcessor线程不再等待消息复制到Slave节点后再处理新请求,而是提前生成CompletableFuture并返回,并马上处理新的请求不堵塞Processor线程,HAService中的线程在Slave的复制位点超过消息的写入位点后去完成对应的CompletableFuture,并通知remoting模块响应客户端。通过上述方式我们对消息的写入过程进行切分并将其流水线化,减少了线程等待,提高性能。

SendMessageProcessor#asyncProcessRequest的方法接受到新的发送消息请求包装成CompletableFuture的方式 具体代码如下

asyncProcessRequest(ctx, request)
        .thenAcceptAsync(responseCallback::callback,
                this.brokerController.getSendMessageExecutor());

CommitLog#asyncPutMessage中处理消息 可以通过下面代码看到包装成了submitFlushRequestsubmitReplicaRequest两个CompletableFuture最后使用Combine合并2个Future

....
// 存盘CompletableFuture
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 复制副本CompletableFuture
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
        ....
    }
    return putMessageResult;
});

submitFlushRequest中通过GroupCommitRequest包装请求传递到GroupCommitService队列中

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
return request.future();

submitReplicaRequest中通过GroupCommitRequest包装请求传递到 HAService

整个流程基本上为以下步骤

1.SendMessageProcessor将数据解析传递给CommitLog中
2.CommitLog在完成初始校验以及写入buffer之后将同步刷盘请求传递给flushCommitLogService返回CompletableFuture
3.包装GroupCommitRequest传递给HAService处理返回 返回CompletableFuture
4.合并 flush和HAService的CompletableFuture 返回给上一级的SendMessageProcessor
5.SendMessageProcessor 接受到执行结果后 切换到SendMessageExecutor线程返回给用户

整体来说就是充分的利用了CompletableFuture将执行的流程添加到Pipeline

下图就是Rocket官方给出的压测下 Pipelinewait以前同步双写的 以及 async的三者的TPC以及发送消息的耗时

[图片上传失败...(image-82082c-1666770168026)]

[图片上传失败...(image-925b00-1666770168026)]

DLedger

在Rocketmq4.5之前 我们一般使用的都是Broker对应一个Slave的方式去做Broker的高可用,但是这样 会议个比较明显的缺点 就是当Master挂了自后Salve节点不能自动切换为Master节点 还需要人为去手动切换.
所以在4.5版本之后RocketMQ基于raft协议实现了 主从自动切换 也就是Dledger.
为什么要采用raft协议去实现自动切换容灾实现?
一般有两种方式实现自动选主功能:

  1. 使用zookeeper 或者 etcd 第三方服务来实现自动选举,像zookeeper又需要搭建一套zookeeper集群部署方式就比较复杂了
  2. 自己实现选主协议

而RocketMQ选择自己实现raft协议来实现主从自动切换 关于raft协议感兴趣的可以自己去了解下 关于DLedger集群模式具体分析 笔者会单独写一篇文章进行介绍 包含对于raft协议的一些改进以及复制异步模型,网络分区的一些优化,可靠性的优化

[图片上传失败...(image-50d40-1666770168026)]

Deleger的配置

//开启Dleger
enableDLegerCommitLog=true
//这个跟BorkerName保持一致
dLegerGroup=RaftNode00
//包含当前DLeger组中所有节点 id-ip:port; xxx
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
//当前节点的id 在一个DLeger组中必须唯一
dLegerSelfId=n0
//发送线程个数,建议配置成 Cpu 核数
sendMessageThreadPoolNums=16
name 含义 举例
enableDLegerCommitLog 是否启动 DLedger true
dLegerGroup DLedger Raft Group的名字,建议和 brokerName 保持一致 RaftNode00
dLegerPeers DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 n0
sendMessageThreadPoolNums 发送线程个数,建议配置成 Cpu 核数 16

Producer

消息生产者,也就是我们平常服务使用client 发送消息到broker的一个过程

消息发送的流程

1.验证消息

发送消息的时候会通过一次消息长度验证 最大长度是不能超过4M的 最小不能小余等于0

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
 throws MQClientException {
 if (null == msg) {
     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
 }
 // topic
 Validators.checkTopic(msg.getTopic());
 Validators.isNotAllowedSendTopic(msg.getTopic());

 // body
 if (null == msg.getBody()) {
     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
 }

 if (0 == msg.getBody().length) {
     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
 }

 if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
         "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
 }
}

2.查找路由信息

tryToFindTopicPublishInfo查找路由信息

通过topicPublishInfoTable一个map结构的 topic为key value为TopicPublishInfo 包含当前Topic下所有的queue列表以及TopicRouteData数据

TopicPublishInfo

//是否为顺序消息
private boolean orderTopic = false;
//是否有topic 路由信息
private boolean haveTopicRouterInfo = false;
// 当前topic下所有队列信息
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 记录选择队列编号 每一次选择消息队列的时候 都会自增 用于选择消息队列的一些算法记录标识
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
//路由信息
private TopicRouteData topicRouteData;

TopicRouteData

// 顺序消息配置信息
private String orderTopicConf;
// 队列的一些配置数据 比如读写队列数 权限等
private List<QueueData> queueDatas;
// broker的信息 集群名称 地址等
private List<BrokerData> brokerDatas;
// broker对应的一些过滤信息
private HashMap<String, List<String>> filterServerTable;

tryToFindTopicPublishInfo
尝试查找topic的路由信息 如果路由信息不存在会尝试去NameServer拉取对应的路由信息

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3.选择队列

MQFaultStrategy#selectOneMessageQueue 通过选择策略选择需要发送的队列

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                //取模轮询的方式
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //队列可用直接返回
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }

            //从容错中找到选择一个随机选择一个broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //从路由信息中找到broker对应的queue
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                //随机选择一个队列
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                   //设置队列的broker为容错的broker
                    mq.setBrokerName(notBestBroker);
                    //修改队列id
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                   
                }
                return mq;
            } else {
                // 如果队列不可用删除选择的broker
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

先解释下latencyFaultTolerance是什么:

latencyFaultTolerance 是对之前失败的,按一定的时间做规避。比如,如果上次请求的latency超过550ms,就修改规避时间为3000ms 也就在这3000ms内默认认为这个broker不可用;超过1000ms,就退避60000ms;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制主要是为了隔离之前发送失败的broker队列 选择其他的可以使用的队列进行发送, 隔离时间过后 有可能再次重试之前隔离的队列,提高了发送成功的几率提高了发送消息的成功率.

在消息发送完成之后 无论是否失败都会更新FaultItem

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

currentLatency代表发送延迟,isolation表示是否规避隔离 在发送消息失败的时候 isolation为true
成功isolation则为false(这里的成功表示服务能通没有出现网络连接的异常) ,computeNotAvailableDuration 计算了隔离时间 传递给latencyFaultTolerance,这样我们可以认为latencyFaultTolerance是每次发送消息后 本地缓存的一个broker的状态以及broker需要隔离的时间(其实是因为发送失败导致的隔离).

isAvailable判断方式就是查看当前broker是不是在隔离期

public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

如果开启了sendLatencyFaultEnable步骤为

1 根据对消息队列进行轮询获取一个消息队列
2 验证该消息队列是否在隔离期, latencyFaultTolerance.isAvailable(mq.getBrokerName())
3 如果返回的 MessageQueue可用直接返回
4 如果所有的队列都在隔离器 那么随机选择一个队列 进行发送

如果没有开启sendLatencyFaultEnable 那么就是走selectOneMessageQueue的逻辑

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

步骤为:

随机选择一个MessageQueue
如果发送失败了 重试的时候随机选择 规避掉上一次选择的Broker

还可以通过自己定义MessageQueueSelector自定义消息的队列选择 这一块我们可以根据自己定义我们业务中所需要的一些消息选择策略

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

4发送消息

Rocket发送消息支持三种方式 sync同步、async异步、oneway单向

同步: 发送者向MQ执行发送消息,同步等待broker返回发送成功通知
异步: 发送者向MQ发送消息指定发送结构回调函数,不阻塞发送线程 发送状态通过回调函数通知
单向: 发送消息给MQ后直接返回,不接受broker处理消息后的状态 简单来说就是只管发送就完了

可以通过以下代码看到
oneway调用后直接返回null 也就是没有任何发送状态返回 只管发送

ASYNC异步发送代码sendMessageAsync调用后也是直接返回null 但是有一个sendCallback回调触发

SYNC同步发送sendMessageSync发送直接阻塞返回 result

switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        final AtomicInteger times = new AtomicInteger();
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
            retryTimesWhenSendFailed, times, context, producer);
        return null;
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
        assert false;
        break;
}

具体消息发送的逻辑 因为涉及到批量发送以及消息的数据结构 以及序列化方式 笔者后面有时间会单独文章仔细说明 这里主要是说明发送消息的几个重要的流程

Consumer

Consumer 涉及的东西会比较多 这里只是大概介绍下Consumer的几种方式 后续单独一篇文章再去说明整个Consumer的源码解析

Consumer 消息消费者在Rocketmq Consumer都是以ConsumerGroup的方式存在

在Rocketmq中 一个queue只能对应一个Consumer,Rocketmq没有按照任何的消息中间件的协议去实现,所以这一点和其他的mq不一样 比如我们在Rabbitmq中一个queue可以有多个Consumer

消费者组

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

消费模式

消费模式主要分为两种 Pull和Push消费方式

Pull

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程

Push

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

具体Consumer我这边会单独写一篇文章跟消息的几种类型一起说明


https://www.xamrdz.com/backend/3tw1945766.html

相关文章: