前言、最近在生产上遇到一些消息队列的问题,翻阅了很多资料文档发现有不少的都是错误的,于是把自己从实践中得到的一些分享、记录在此;
一、MQ的应答模式
- acknowledge-mode: none(无应答模式)
在这种模式下,不管消费者异常消费,还是正常消费,MQ服务器中的队列都会自动删除已消费的消息 - acknowledge-mode: auto(自动应答模式)
当mq的应答模式配置为auto,或者没有进行配置时,系统会默认为自动应答模式。在这种情况下,只要我们的消费者,在消费消息的时候没有抛出异常,那服务端MQ会认为,消息消费正常,删除队列中的消息;如果消费过程中,抛出了异常,消息会进行自动补偿,重会队列头部,再次被拉到消费者的缓冲区(prefetch count),进行重复消费。此时如果缓冲区的大小设置为1,那么整个队列就会被阻塞,unacked也会显示为1(单线程消费的情况下)。 - acknowledge-mode: manual(手动应答模式)
当设置为应手动应答时,我们需要在消费消息的时候手动告诉MQ我们消费的情况,否者MQ会一直等待消费端的消息,如果一直没有应答,当消费数量达到缓冲区大小(prefetch count)后,队列会全部阻塞。 - channel.basicReject(deliveryTag, true)-------------拒绝deliveryTag(一个消费线程对应的一个消费ID,是自增的LONG类型)对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。该方法reject后,该消费者还是会消费到该条被reject的消息。
- channel.basicNack(deliveryTag, false, true)---------------不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue
与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。 - channel.basicRecover(true)------------是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
spring:
rabbitmq:
host: 121.40.65.123
port: 5672
username: rabbit
password: rabbit
listener:
type: simple
simple:
prefetch: 3 #缓冲区大小,这是对于每个消费线程来说的,默认的值是250
concurrency: 2 #消费的线程数,默认是1
acknowledge-mode: manual #应答模式,默认的是auto
二、配置
配置类(简洁方法)(推荐)
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQRouterConfig {
public static final String QUEUE_HELLO = "Queue@hello";
public static final String QUEUE_HI = "Queue@hi";
public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
public static final String ROUTINGKEY_HELLOS = "hello.#";
@Autowired
private AmqpAdmin amqpAdmin;
@Bean
public Object initBindingTest() {
amqpAdmin.declareExchange(new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false));
amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));
amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));
return new Object();
}
}
amqpAdmin.declareBinding
需要一个Binding对象作为参数
exchange:交换器名称
type:交换器类型。BuiltinExchangeType枚举类,有以下4中类型交换器:DIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”)
durable:设置是否持久化。true:持久化,false:非持久化。持久化可以将交换器存盘,在服务器重启时不会丢失相关消息。
autoDelete:设置是否自动删除。true:自动删除,false:不自动删除。自动删除的前提是至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此交换器解绑。
internal:设置是否内置的。true:内置交换器,false:非内置交换器。内置交换器,客户端无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
arguments:其他一些结构化参数。如备份交换器:alternate-exchange、超时时间。示例配置超时时间方法:
Map<String, Object> params = new HashMap();
params.put(“x-message-ttl”, 2000);
amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, params));
三、遇到的问题及解决办法
- 消费顺序问题----------在生产上有个监控功能使用了消息队列,会先发送一条插入数据库的消息到监控系统,当业务成功完成后再发一条删除数据的消息到监控系统。
- 重复消费日志文件打爆------------
- 队列阻塞问题---------------