当前位置: 首页>后端>正文

RabbitMQ预取值

现有消费者Worker02 、Worker03

public class Worker02 {

    public static final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUitls.rabbitMqConnection();
        System.out.println("C1等待接收消息,处理时间较短");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("C1接收消息:" + new String(message.getBody()));
            SleepUtil.sleep(1);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        // 采用手动应答
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, CancelCallback->{
            System.out.println("消费者取消接收时的回调函数");
        });

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C1接收消息:" + new String(body));
                SleepUtil.sleep(1);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 设置prefetchCount=1 不公平分发
        //channel.basicQos(1);

        // 设置预取值 prefetchCount>1; 设置预取值,并不是说此信道只能处理两条,而是当前信道总共只能堆积2条,如果超过,则由其他信道接收处理
        channel.basicQos(5);
        channel.basicConsume(TASK_QUEUE_NAME,false,consumer);
    }
}
public class Worker03 {

    public static final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUitls.rabbitMqConnection();
        System.out.println("C2等待接收消息,处理时间较长");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("C2接收消息:" + new String(message.getBody()));
            SleepUtil.sleep(30);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        // 设置prefetchCount=1 不公平分发
        //channel.basicQos(1);

        // 设置预取值 prefetchCount>1
        channel.basicQos(2);
        // 采用手动应答
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, CancelCallback->{
            System.out.println("消费者取消接收时的回调函数");
        });
    }
}

我们为Worker02设置预取值为5,Worker03 为2。 就是说02的信道可以堆积5条,03可以堆积2条。但是,02的处理速度要比03快的多,在信道还没有堆积5条时,先接收的消息已经被02处理完毕,所以,02可能不止接收到5条。

生产者发送7条数据


RabbitMQ预取值,第1张

Worker02接收情况:

RabbitMQ预取值,第2张

Worker03 接收情况:


RabbitMQ预取值,第3张

当我们将03的预取值改为5,02改为2,因为03的处理速度很慢,所以信道堵塞,最多堵塞5条。

03堵塞:


RabbitMQ预取值,第4张

ps:观点只作为个人理解,如有错误,请指正。


https://www.xamrdz.com/backend/3q71920640.html

相关文章: