现有消费者Worker02 、Worker03
public class Worker02 {
public static final String TASK_QUEUE_NAME="ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUitls.rabbitMqConnection();
System.out.println("C1等待接收消息,处理时间较短");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("C1接收消息:" + new String(message.getBody()));
SleepUtil.sleep(1);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// 采用手动应答
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, CancelCallback->{
System.out.println("消费者取消接收时的回调函数");
});
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("C1接收消息:" + new String(body));
SleepUtil.sleep(1);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 设置prefetchCount=1 不公平分发
//channel.basicQos(1);
// 设置预取值 prefetchCount>1; 设置预取值,并不是说此信道只能处理两条,而是当前信道总共只能堆积2条,如果超过,则由其他信道接收处理
channel.basicQos(5);
channel.basicConsume(TASK_QUEUE_NAME,false,consumer);
}
}
public class Worker03 {
public static final String TASK_QUEUE_NAME="ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUitls.rabbitMqConnection();
System.out.println("C2等待接收消息,处理时间较长");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("C2接收消息:" + new String(message.getBody()));
SleepUtil.sleep(30);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// 设置prefetchCount=1 不公平分发
//channel.basicQos(1);
// 设置预取值 prefetchCount>1
channel.basicQos(2);
// 采用手动应答
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, CancelCallback->{
System.out.println("消费者取消接收时的回调函数");
});
}
}
我们为Worker02设置预取值为5,Worker03 为2。 就是说02的信道可以堆积5条,03可以堆积2条。但是,02的处理速度要比03快的多,在信道还没有堆积5条时,先接收的消息已经被02处理完毕,所以,02可能不止接收到5条。
生产者发送7条数据:
Worker02接收情况:
Worker03 接收情况:
当我们将03的预取值改为5,02改为2,因为03的处理速度很慢,所以信道堵塞,最多堵塞5条。
03堵塞:
ps:观点只作为个人理解,如有错误,请指正。