什么是消费端限流?
假设一个场景,首先,我们RabbitMQ服务器有上完条未处理的消息,我们随便打开一个消费客户端,会出现下面情况:
巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这些多数据。
另一种情况时 生产者一分钟生产上千条数据,但是我们消费端一分钟只能消费几百条数据!
但是我生产端的数据量就是这么大,一般都是在消费端做限流,防止消费端的资源被耗尽!
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息也就是消息堆积(通过基于consume或者channel设置)未被确认前,不进行新的消息。
方法:
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) 一般值 BasicQos(0, 1, false)
prefetchSize 限制大小,比图多少M,0表示不做限制,一般在消费端做,生产端不做
prefetchCount 给一个消费者一次最多处理多少条消息, 一般设置为 1 (N) , 也即是一旦有N个消息还没有ack,则该consumer将block掉,直到消息ack
global 表示在哪一个层面设置限制,在RabbitMQ里面只有两个,一个是channel, 一个消费端! true表示channel层面,false表示消费端层面, 建议在消费端层面
prefetchSize 和 global 这两项,rabbitmq没有实现,暂且不研究prefetch_count 在 no_ask=false 的情况下失效,即在自动应答的情况下这两个值时不生效的
自定义消费
package com.qiyexue.api.limit;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* 自定义消费者
* @author 七夜雪
* @date 2018-12-16 8:20
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------自定义消费者------------");
System.out.println("consumerTag : " + consumerTag);
System.out.println("envelope : " + envelope);
System.out.println("properties : " + properties);
System.out.println("body : " + new String(body));
// 手工签收, 第二个参数表示是否批量签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
生产者
package com.qiyexue.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生产者
*
* @author 七夜雪
* @date 2018-12-15 19:56
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 创建ConnectionFactory, 并设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 创建连接
Connection connection = factory.newConnection();
// 3. 创建channel
Channel channel = connection.createChannel();
String exchangeName = "test_limit_exchange";
String routingKey = "limit.qiye";
// 发送消息
String msg = "自定义消费者, 消息发送 : Hello, 七夜雪";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
}
// 关闭连接
channel.close();
connection.close();
}
}
消费者
package com.qiyexue.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消费者
*
* @author 七夜雪
* @date 2018-12-15 20:07
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂并设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 创建连接
Connection connection = factory.newConnection();
// 3. 创建channel
Channel channel = connection.createChannel();
// 4. 声明Exchange
String exchangeName = "test_limit_exchange";
String exchangeType = "topic";
String routingKey = "limit.*";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
// 5. 声明消息队列
String queueName = "test_limit_queue";
channel.queueDeclare(queueName, true, false, false, null);
// 6. 绑定队列和Exchange
channel.queueBind(queueName, exchangeName, routingKey);
// 表示不限制消息大小, 一次只处理一条消息, 限制只是当前消费者有效
channel.basicQos(0, 1, false);
// 7. 设置消费者为自定义的消费者, 要进行限流必须关闭自动签收
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}