当前位置: 首页>后端>正文

消费端限流 Qos

什么是消费端限流?

假设一个场景,首先,我们RabbitMQ服务器有上完条未处理的消息,我们随便打开一个消费客户端,会出现下面情况:

巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这些多数据。

另一种情况时 生产者一分钟生产上千条数据,但是我们消费端一分钟只能消费几百条数据!

但是我生产端的数据量就是这么大,一般都是在消费端做限流,防止消费端的资源被耗尽!

RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息也就是消息堆积(通过基于consume或者channel设置)未被确认前,不进行新的消息。

方法:

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) 一般值 BasicQos(0, 1, false)

prefetchSize 限制大小,比图多少M,0表示不做限制,一般在消费端做,生产端不做
prefetchCount 给一个消费者一次最多处理多少条消息, 一般设置为 1 (N) , 也即是一旦有N个消息还没有ack,则该consumer将block掉,直到消息ack
global 表示在哪一个层面设置限制,在RabbitMQ里面只有两个,一个是channel, 一个消费端! true表示channel层面,false表示消费端层面, 建议在消费端层面

prefetchSize 和 global 这两项,rabbitmq没有实现,暂且不研究prefetch_count 在 no_ask=false 的情况下失效,即在自动应答的情况下这两个值时不生效的

自定义消费

package com.qiyexue.api.limit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * 自定义消费者
 * @author 七夜雪
 * @date 2018-12-16 8:20
 */
public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("-------------自定义消费者------------");
        System.out.println("consumerTag : " + consumerTag);
        System.out.println("envelope : " + envelope);
        System.out.println("properties : " + properties);
        System.out.println("body : " + new String(body));
        // 手工签收, 第二个参数表示是否批量签收
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

生产者

package com.qiyexue.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 生产者
 *
 * @author 七夜雪
 * @date 2018-12-15 19:56
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        // 1. 创建ConnectionFactory, 并设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 创建channel
        Channel channel = connection.createChannel();


        String exchangeName = "test_limit_exchange";
        String routingKey = "limit.qiye";

        // 发送消息
        String msg = "自定义消费者, 消息发送 : Hello, 七夜雪";
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
        }

        // 关闭连接
        channel.close();
        connection.close();

    }

}

消费者

package com.qiyexue.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消费者
 *
 * @author 七夜雪
 * @date 2018-12-15 20:07
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 创建channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange
        String exchangeName = "test_limit_exchange";
        String exchangeType = "topic";
        String routingKey = "limit.*";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);

        // 5. 声明消息队列
        String queueName = "test_limit_queue";
        channel.queueDeclare(queueName, true, false, false, null);

        // 6. 绑定队列和Exchange
        channel.queueBind(queueName, exchangeName, routingKey);

        // 表示不限制消息大小, 一次只处理一条消息, 限制只是当前消费者有效
        channel.basicQos(0, 1, false);

        // 7. 设置消费者为自定义的消费者, 要进行限流必须关闭自动签收
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    }

}


https://www.xamrdz.com/backend/3kv1933819.html

相关文章: