消息持久化的两个前提:
1、设置要求队列必须在持久化,保证RabbitMQ宕机后,信道不会消失
2、设置要求队列中的消息必须持久话,保证RabbitMQ宕机后,消息不会消失
设置两个值后,并不能完全保证消息不丢失,因为将消息保存到磁盘的过程中,服务器可能宕机,无法将消息完整地保留到磁盘上。所以,就需要发布确认。
发布确认:发布者将消息发送给消息队列,消息队列将消息保存到磁盘上,然后告诉给发布者,确认已经保存到磁盘中。这个过程就叫做发布确认。
发布确认的策略
开启发布确认的方法
// 发布确认 channel.confirmSelect();
单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。
ConfirmMessage.class
// 批量发消息的个数
public static final int MESSAGE_COUNT=1000;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 单个确认
publishMessageIndividually();
}
public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUitls.rabbitMqConnection();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long startTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
// 单个消息马上进行发布确认, true 为成功, false为失败
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功!");
}
}
long endTime = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条,单个发送确认所耗时间:" + (endTime-startTime) + "ms");
}
耗时:
RabbitMQ控制台:
批量确认发布
与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
在代码中,批量发布相对于单个发布,发布确认channel.waitForConfirms()的位置有所不同,需要在生产者发送批量确认数量时,进行发布确认。当然,这是最简单的示例。代码如下:
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUitls.rabbitMqConnection();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long startTime = System.currentTimeMillis();
// 定义批量确认数量100, 发送100条确认一次
int batchCount = 100;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
if (i%batchCount==0) {
channel.waitForConfirms();
System.out.println(i + "条消息发送确认成功");
}
}
long endTime = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条,单个发送确认所耗时间:" + (endTime-startTime) + "ms");
}
批量确认发布所耗时间:
异步确认发布
是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUitls.rabbitMqConnection();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false, false, null);
// 开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
/**
* 确认收到消息的一个回调
* 1.消息序列号
* 2.true 可以确认小于等于当前序列号的消息
* false 确认当前序列号消息
*/
// 消息成功
ConfirmCallback ackCallBack = (deliveryTag, multiple) -> {
System.out.println("确认的消息内容"+concurrentSkipListMap.get(deliveryTag) + "========确认的消息序号:"+ deliveryTag);
if (multiple) {
ConcurrentNavigableMap<Long, String> concurrentNavigableMap =
concurrentSkipListMap.headMap(deliveryTag);
concurrentNavigableMap.clear();
} else {
concurrentSkipListMap.clear();
}
};
// 消息失败
ConfirmCallback nackCallBack = (deliveryTag, multiple) -> {
String s = concurrentSkipListMap.get(deliveryTag);
System.out.println("未确认的消息内容"+s + "========未确认的消息序号:"+ deliveryTag);
};
channel.addConfirmListener(ackCallBack, nackCallBack);
// 开始时间
long startTime = System.currentTimeMillis();
// 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i ;
channel.basicPublish("",queueName,null,message.getBytes());
// 记录发送成功的消息
concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
}
long endTime = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条,异步发送确认所耗时间:" + (endTime-startTime) + "ms");
}
异步处理耗时:
注意点: 异步发布确认需要创建一个ConcurrentLinkedQueue(异步链接队列)队列--ConcurrentSkipListMap,用于记录发送成功的消息,然后将发送成功的消息清空,剩下的就是未成功的消息,返回给生产者。
ps:观点只作为个人理解,如有错误,请指正。