当前位置: 首页>后端>正文

java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列

Rabbitmq作为高并发的消息中间件,本文不在阐述基础概念。旨在分析与提供解决消息的丢失与重复的解决思路。


java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列,java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列_可持久化,第1张

RabbitMQ架构流程图



根据以上架构图,要明白消息走向的整个流程,生产者发送消息——》交换机(路由器)——》队列——》被消费者消费。以上几个流程任何环节出问题都会导致消息的丢失,下面详细分情况介绍。

一、消息丢失

首要前提是设置了队列,交换器都是可持久化的,更重要的是设置了消息的手动确认




java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列,java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列_主键_02,第2张


java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列,java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列_可持久化_03,第3张


通过以上两张源码截图可以发现队列和交换机默认都是可持久化的,如下设置手动消息确认


java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列,java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列_可持久化_04,第4张


1.发送者发送失败(网络原因等)

1.1 没有正常发送到exchange路由器(交换机)


java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列,java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列_任务调度_05,第5张


该方法ack参数为false时说明消息没有被正确发送到路由器,可将该消息保存本地消息表,使用任务调度器重新发送

1.2没有被正确的路由到队列


java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列,java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列_主键_06,第6张


进入该方法returnedMessage,则表明未被正确路由到消息队列,具体原因可以看replyCode和replyText,比如绑定规则有误等,可将该消息保存本地消息表,使用任务调度器重新发送。

至此发送端的消息丢失已经解决,我们在看下消费端的。

1.3消息在消费端处理业务时没有做异常处理就确认,从而使消息被移除队列导致消息丢失


java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列,java从rabbitmq指定的队列中获取消息 rabbitmq去重消息队列_可持久化_07,第7张


在此处分两种情况,一,消息被正确消费则basicAck,从队列删除消息。二,消息在处理业务时发生异常未被正确消费basicReject,不要重新入队列,而是本地消息表中记录,使用任务调度器重新发送。

二、消息去重

基于业务逻辑进行去重,不要依赖rabbitmq。在消费端应该有一个本地消息消费表记录已经被成功消费的消息信息,业务字段主键作为去重标准。

最后,有人会反思用了按照以上方案,用了rabbitmq还用本地消息表,降低了性能,在生产端我们消息失败表的记录,重新发送并且重新消费成功,就可以删除该条记录了,该表的记录不会太大,还有就是消费端的未正常消费的记录同理操作。去重表最好借助redis保存业务主键快速识别是否已经被消费,在判断是重复消息时一定要记得basicAck手工确认,避免再次发送。


https://www.xamrdz.com/backend/3yz1922051.html

相关文章: