当前位置: 首页>后端>正文

RabbitMQ发布确认

消息持久化的两个前提:
1、设置要求队列必须在持久化,保证RabbitMQ宕机后,信道不会消失
2、设置要求队列中的消息必须持久话,保证RabbitMQ宕机后,消息不会消失

设置两个值后,并不能完全保证消息不丢失,因为将消息保存到磁盘的过程中,服务器可能宕机,无法将消息完整地保留到磁盘上。所以,就需要发布确认

发布确认:发布者将消息发送给消息队列,消息队列将消息保存到磁盘上,然后告诉给发布者,确认已经保存到磁盘中。这个过程就叫做发布确认

发布确认的策略

开启发布确认的方法

// 发布确认 channel.confirmSelect();

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。

ConfirmMessage.class

// 批量发消息的个数
    public static final int MESSAGE_COUNT=1000;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
       // 单个确认
        publishMessageIndividually();
    }

    public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUitls.rabbitMqConnection();
        // 队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            // 单个消息马上进行发布确认, true 为成功, false为失败
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息发送成功!");
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"条,单个发送确认所耗时间:" + (endTime-startTime) + "ms");
    }

耗时:


RabbitMQ发布确认,第1张
单个发布确认.png

RabbitMQ控制台:


RabbitMQ发布确认,第2张
单个发布确认RabbitMQ控制台.png

批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

在代码中,批量发布相对于单个发布,发布确认channel.waitForConfirms()的位置有所不同,需要在生产者发送批量确认数量时,进行发布确认。当然,这是最简单的示例。代码如下:

public static void publishMessageBatch() throws Exception {
        Channel channel = RabbitMqUitls.rabbitMqConnection();
        // 队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long startTime = System.currentTimeMillis();

        // 定义批量确认数量100, 发送100条确认一次
        int batchCount = 100;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            if (i%batchCount==0) {
                channel.waitForConfirms();
                System.out.println(i + "条消息发送确认成功");
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"条,单个发送确认所耗时间:" + (endTime-startTime) + "ms");
    }

批量确认发布所耗时间:


RabbitMQ发布确认,第3张
批量发布确认.png

异步确认发布

是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,

 public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUitls.rabbitMqConnection();
        // 队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况
         * 1.轻松的将序号与消息进行关联
         * 2.轻松批量删除条目 只要给到序列号
         * 3.支持并发访问
         */
        ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new  ConcurrentSkipListMap<>();

        /**
         * 确认收到消息的一个回调
         * 1.消息序列号
         * 2.true 可以确认小于等于当前序列号的消息
         * false 确认当前序列号消息
         */
        // 消息成功
        ConfirmCallback ackCallBack = (deliveryTag, multiple) -> {
            System.out.println("确认的消息内容"+concurrentSkipListMap.get(deliveryTag) + "========确认的消息序号:"+ deliveryTag);
            if (multiple) {
                ConcurrentNavigableMap<Long, String> concurrentNavigableMap =
                        concurrentSkipListMap.headMap(deliveryTag);
                concurrentNavigableMap.clear();
            } else {
                concurrentSkipListMap.clear();
            }
        };
        // 消息失败
        ConfirmCallback nackCallBack = (deliveryTag, multiple) -> {
            String s = concurrentSkipListMap.get(deliveryTag);
            System.out.println("未确认的消息内容"+s + "========未确认的消息序号:"+ deliveryTag);
        };

        channel.addConfirmListener(ackCallBack, nackCallBack);
        // 开始时间
        long startTime = System.currentTimeMillis();

        // 发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i ;
            channel.basicPublish("",queueName,null,message.getBytes());
            // 记录发送成功的消息
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"条,异步发送确认所耗时间:" + (endTime-startTime) + "ms");
    }

异步处理耗时:


RabbitMQ发布确认,第4张
异步发布确认.png

注意点: 异步发布确认需要创建一个ConcurrentLinkedQueue(异步链接队列)队列--ConcurrentSkipListMap,用于记录发送成功的消息,然后将发送成功的消息清空,剩下的就是未成功的消息,返回给生产者。

ps:观点只作为个人理解,如有错误,请指正。


https://www.xamrdz.com/backend/3v71929081.html

相关文章: