3种丢消息的场景
- 发送消息到交换机或队列时,丢消息(设置2个回调)
- 消息到MQ软件,MQ因宕机而要重启,丢消息(交换机、队列、消息的durable属性,要设置为持久化,Spring的RabbitTemplate默认就是将这3个都持久化的,一般不需要去改)
- 消费者没有正常消费消息,丢消息(默认消费方是阅后即焚的,所以消息从队列出队给消费方后,队列中就没有这个消息了,消费方没有正常去消费,消息就丢失了)
发送消息到交换机或队列时,丢消息
- 开启发送消息后的回调配置
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
设置RabbitTemplate的2个回调
setConfirmCallback,消息发送给交换机后回调,成功时ack参数为true,失败则为false
setReturnCallback,消息从交换机投递给队列失败时,才会回调,所以要在这个回调时,记录日志
@Slf4j
@Configuration
// 实现ApplicationContextAware接口,可以从已有的spring上下文取得已实例化的bean
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate实例
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置confirm callback,投递消息到交换机成功或失败,都会回调此方法
// 注:如果投递成功,方法的ack参数为true,失败则为false
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息投递完成,ack = {}, cause = {}", ack, cause);
}
});
// 设置return callback,从交换机投递到队列失败时,才会回调该方法
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 记录日志
log.info("消息投递失败,replyCode = {},replyText = {},exchange = {},routingKey = {},message = {}", replyCode, replyText, exchange, routingKey, message.toString());
}
});
}
}
MQ软件重启,丢消息
- 在Java代码中,创建交换机、队列时,就会设置为持久化,属性名为
durable
,在RabbitMQ的后台中看,有一个大写的D
,就是设置了持久化的了,一般我们都会设置为持久化,保证MQ重启不丢消息
消费者没有正常消费消息,丢消息
默认消费者收到消息后,MQ就会将消息从队列中删除,也就是阅后即焚,我们需要设置MQ的确认模式,一般我们可以设置为auto
自动或manual
手动,以下以手动为例
手动模式
- 在消费者的
application.yml
文件中,配置以下内容
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 确认模式有3种,manual、auto、none
- 确认模式
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- 即,none是失败后什么都不处理,auto是类似事务机制,出现异常时返回nack,消息回滚到MQ,没有异常则返回ack,消息才从MQ中删除。manual是手动自己判断业务是否正常执行,成功则手动返回ack
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(Message msg, Channel channel) throws Exception {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
// 模拟异常
System.out.println(1 / 0);
// 业务执行正常,才回复ack
// 参数一:deliveryTag,也就是消息的标识,从msg中获取
// 参数二:multiple,如果MQ是集群,true则是需要通知集群中的所有MQ
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
log.debug("消息处理完成!");
}
}
自动模式
- 将消费者的确认模式,修改为auto
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 确定模式:auto,为自动ack
- 重新运行消费者,会发现消费者消费出现异常,然后将消息归还给MQ,然后消费者的Listener监听队列又有消息,又从队列中拿出来消息,导致出现的无限死循环!!
重试次数
因此不能无限重试,我们应该限制重试的次数,以及重试完毕后的失败策略(例如重试了3、5次后,还是失败,则将消息投递到一个特定的错误消息交换机,然后再投递到错误消息队列)
配置spring的retry机制,当消费者消费出现异常时,进行本地重试,而不是无限制的
requeue
重新入队到MQ队列中,其中enabled
属性为开启失败重试,max-attempts
为最大重试次数本地重试:也就是消息消费过程中,出现异常,不会将消息
requeue
到队列,而是在消费者本地进行重试,就不会出现频繁requeue
,给MQ造成不必要的压力
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒 2 4 8 16 32
multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
- 最后,重试达到最大次数后,Spring会返回ack,消息会被丢弃,所以我们还需要配置失败策略
失败策略
-
默认,重试达到最大重试次数后,消息会丢失,这个是Spring的内部机制决定的,默认的失败处理策略是丢弃消息,我们可以配置策略的实现,策略接口为
MessageRecoverer
,有3种策略实现,分别是:- RejectAndDontRequeueRecoverer,到达最大重试次数后,直接reject,丢弃消息,默认就是这种
- ImmediateRequeueMessageRecoverer,到达最大重试次数后,返回nack,消息重新
requeue
入队 - RepublishMessageRecoverer,到达最大重试次数后,将失败消息投递到指定的交换机
比较优雅的方式是选用
RepublishMessageRecoverer
,例如使用这种策略方式,当到达到最大重试次数后,将消息投递到一个错误消息交换机,然后交换机再投递到一个专门存放错误消息的错误队列,后续人工再集中处理在消费方,定义处理错误的交换机和队列
// 错误消息交换机
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
// 错误消息队列
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
// 绑定错误交换机和错误消息队列
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder
// 队列
.bind(errorQueue)
// 交换机
.to(errorMessageExchange)
// 设置routingKey
.with("error");
}
- 定义失败策略,指定错误消息交换机和routingKey
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
- 此时,再次启动生产者,发送消息,消费者重试3次失败后,执行失败策略,将消息投递到了error错误队列,在RabiitMQ的控制台中,点击
get message
按钮,就能获取到消息内容,以及错误消息的堆栈
完整代码
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}