当前位置: 首页>后端>正文

RabbitMQ丢消息的解决方案

3种丢消息的场景

  • 发送消息到交换机或队列时,丢消息(设置2个回调)
  • 消息到MQ软件,MQ因宕机而要重启,丢消息(交换机、队列、消息的durable属性,要设置为持久化,Spring的RabbitTemplate默认就是将这3个都持久化的,一般不需要去改)
  • 消费者没有正常消费消息,丢消息(默认消费方是阅后即焚的,所以消息从队列出队给消费方后,队列中就没有这个消息了,消费方没有正常去消费,消息就丢失了)

发送消息到交换机或队列时,丢消息

  • 开启发送消息后的回调配置
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  • 设置RabbitTemplate的2个回调

  • setConfirmCallback,消息发送给交换机后回调,成功时ack参数为true,失败则为false

  • setReturnCallback,消息从交换机投递给队列失败时,才会回调,所以要在这个回调时,记录日志

@Slf4j
@Configuration
// 实现ApplicationContextAware接口,可以从已有的spring上下文取得已实例化的bean
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate实例
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        
        // 设置confirm callback,投递消息到交换机成功或失败,都会回调此方法
        // 注:如果投递成功,方法的ack参数为true,失败则为false
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息投递完成,ack = {}, cause = {}", ack, cause);
            }
        });
        
        // 设置return callback,从交换机投递到队列失败时,才会回调该方法
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // 记录日志
                log.info("消息投递失败,replyCode = {},replyText = {},exchange = {},routingKey = {},message = {}", replyCode, replyText, exchange, routingKey, message.toString());
            }
        });
    }
}

MQ软件重启,丢消息

  • 在Java代码中,创建交换机、队列时,就会设置为持久化,属性名为durable,在RabbitMQ的后台中看,有一个大写的D,就是设置了持久化的了,一般我们都会设置为持久化,保证MQ重启不丢消息

消费者没有正常消费消息,丢消息

默认消费者收到消息后,MQ就会将消息从队列中删除,也就是阅后即焚,我们需要设置MQ的确认模式,一般我们可以设置为auto自动或manual手动,以下以手动为例

手动模式

  • 在消费者的application.yml文件中,配置以下内容
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 确认模式有3种,manual、auto、none
  • 确认模式
    • manual:手动ack,需要在业务代码结束后,调用api发送ack。
    • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
    • 即,none是失败后什么都不处理,auto是类似事务机制,出现异常时返回nack,消息回滚到MQ,没有异常则返回ack,消息才从MQ中删除。manual是手动自己判断业务是否正常执行,成功则手动返回ack
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(Message msg, Channel channel) throws Exception {
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
        // 模拟异常
        System.out.println(1 / 0);
        
        // 业务执行正常,才回复ack
        // 参数一:deliveryTag,也就是消息的标识,从msg中获取
        // 参数二:multiple,如果MQ是集群,true则是需要通知集群中的所有MQ
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
        
        log.debug("消息处理完成!");
    }
}

自动模式

  • 将消费者的确认模式,修改为auto
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 确定模式:auto,为自动ack
  • 重新运行消费者,会发现消费者消费出现异常,然后将消息归还给MQ,然后消费者的Listener监听队列又有消息,又从队列中拿出来消息,导致出现的无限死循环!!

重试次数

  • 因此不能无限重试,我们应该限制重试的次数,以及重试完毕后的失败策略(例如重试了3、5次后,还是失败,则将消息投递到一个特定的错误消息交换机,然后再投递到错误消息队列)

  • 配置spring的retry机制,当消费者消费出现异常时,进行本地重试,而不是无限制的requeue重新入队到MQ队列中,其中enabled属性为开启失败重试,max-attempts为最大重试次数

  • 本地重试:也就是消息消费过程中,出现异常,不会将消息requeue到队列,而是在消费者本地进行重试,就不会出现频繁requeue,给MQ造成不必要的压力

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒 2  4  8  16  32
          multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 最后,重试达到最大次数后,Spring会返回ack,消息会被丢弃,所以我们还需要配置失败策略

失败策略

  • 默认,重试达到最大重试次数后,消息会丢失,这个是Spring的内部机制决定的,默认的失败处理策略是丢弃消息,我们可以配置策略的实现,策略接口为MessageRecoverer,有3种策略实现,分别是:

    • RejectAndDontRequeueRecoverer,到达最大重试次数后,直接reject,丢弃消息,默认就是这种
    • ImmediateRequeueMessageRecoverer,到达最大重试次数后,返回nack,消息重新requeue入队
    • RepublishMessageRecoverer,到达最大重试次数后,将失败消息投递到指定的交换机
  • 比较优雅的方式是选用RepublishMessageRecoverer,例如使用这种策略方式,当到达到最大重试次数后,将消息投递到一个错误消息交换机,然后交换机再投递到一个专门存放错误消息的错误队列,后续人工再集中处理

  • 在消费方,定义处理错误的交换机和队列

// 错误消息交换机
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}

// 错误消息队列
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}

// 绑定错误交换机和错误消息队列
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder
    // 队列
    .bind(errorQueue)
    // 交换机
    .to(errorMessageExchange)
    // 设置routingKey
    .with("error");
}
  • 定义失败策略,指定错误消息交换机和routingKey
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
  • 此时,再次启动生产者,发送消息,消费者重试3次失败后,执行失败策略,将消息投递到了error错误队列,在RabiitMQ的控制台中,点击get message按钮,就能获取到消息内容,以及错误消息的堆栈

完整代码

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

https://www.xamrdz.com/backend/3cj1945130.html

相关文章: