当前位置: 首页>编程语言>正文

spring rocketmq 多个tag消费 rocketmq 并发消费

1 拉消息流程回顾

学习消息是如何被消费的原理之前,我们去回顾之前拉取消息的流程。
首先,消费者实现类DefaultMQPushConsumerImpl里面有Rebalance对象,其触发的时机是在客户端实例里面的RebalanceService服务,RebalanceService有自己的线程资源,每二十秒执行一次doRebalance方法,该方法会调用RebalanceImpl的doRebalance方法,根据消费者所订阅的主题去计算出来分配给当前消费者的队列,并为该队列创建ProcessQueue(Broker端的队列在消费者端的快照,后文简称PQ),再为PQ创建PullRequest对象,将PullRequest交给PullMessageService,该服务里面的run方法基于PullRequestQueue作take操作,然后调用该消费者的pullMessage方法,传递PullRequest对象。pullMessage方法会创建PullCallback(最终处理服务器端返回的消息结果的),然后发起网络请求到broker,broker会根据请求参数查询消息封装到response对象,客户端接收到消息之后就将消息转成PullResultExt对象,将该对象交给PullCallback,PullCallback拿到该对象之后执行PullAPIWrapper.processPullresult预处理方法(该方法的流程:会根据PullResultExt里保存的suggestWhichBrokerId推荐主机id到本地的推荐映射表,将PullResultExt里的二进制数组messageBinary解析出来为msg的List,然后消费者本地的过滤,然后保存到msgFoundList)。预处理之后会根据pullStatus进行相对应的处理(见前文),pullStatus==FOUND为成功拉取到消息,然后会更新PullRequest对象的nextOffset(下次拉消息的offset),将拉取到的消息msg保存到PQ快照中,然后将msg、PQ和messageQueue一起提交到ConsumerMessageService消费服务,消费服务会提交到线程池进行消息的消费。
本篇文章内容主要针对消费服务如何消费消息展开学习,消息消费服务支持并发消费和顺序消费,首先学习并发消息消费。

2 并发消息消费

如下图所示。

  • 消费者门面:充当的是配置的角色,在消费服务里面作为限制条件;
  • 消息监听器:消息的处理代码都是写在消息监听器里,需要在消费者门面里注册,最核心的方法就是consumerMessage;
  • 消费任务线程池的大小由消费者门面管理,默认情况最大为20;
  • 调度线程池主要在内部使用,比如在需要延迟任务逻辑,需要在调度线程池里调度到消费任务线程池;
  • 清理过期任务调度线程池:会清理的任务有:PQ里treeMap存储的有没有超过15min还没被消费的过期消息,回退到服务器(先不纠结这个逻辑)

spring rocketmq 多个tag消费 rocketmq 并发消费,spring rocketmq 多个tag消费 rocketmq 并发消费_1024程序员节,第1张

submitConsumerRequest方法的逻辑如下:msg长度最大值为32。

spring rocketmq 多个tag消费 rocketmq 并发消费,spring rocketmq 多个tag消费 rocketmq 并发消费_学习_02,第2张

提交消费任务后会执行run方法,如下图所示:

  • 为何要保留PQ呢?因为处理完消息后需要将该消息从PQ中移除;
  • 默认请情况下消费任务需要消费的消息List只有一条消息;
  • messageListener就是在消费者门面注册的消息监听器;
  • 消息失效为什么需要控制延迟?因为如果不控制延迟,消息失效后立刻又拿到消息,又进行消费大概率还是会失效,这样减轻了服务的压力。broker端控制的延迟级别原理为:每失败一次延迟级别+1,级别越高延迟时间越长;
  • 消费上下文的作用主要在于可以控制延迟级别;

spring rocketmq 多个tag消费 rocketmq 并发消费,spring rocketmq 多个tag消费 rocketmq 并发消费_学习_03,第3张

最后处理消费结果,逻辑如下图:

该方法的传参为:

  1. ConsumeConcurrentlyStatus status 消费状态
  2. ConsumeConcurrentlyContext context 消费上下文
  3. ConsumeRequest consumeRequest 消费任务对象
    对于回退失败的消息并不能保证百分百回退成功(因为是网络调用),因此需要有回退失败的逻辑,使用调度线程池scheduleExecutorService,延迟五秒执行
    检查本地缓存中该mq的进度是否比storeOffset大,如果大则不更新

回退的逻辑如下图:

在broker中回退后并不是继续将原来的消息进行重试,而是生成新的消息,因此保存了原始消息的msgId和原始消息的主题,然后放到重试主题里面(重试主题为%RETRY%+consumerGroup)

spring rocketmq 多个tag消费 rocketmq 并发消费,spring rocketmq 多个tag消费 rocketmq 并发消费_学习_04,第4张



https://www.xamrdz.com/lan/5nf1959783.html

相关文章: