当前位置: 首页>后端>正文

java rebbitmq的生产 rabbitmq 多个生产者

说明:

(1)内容说明:

          ● 这儿我们会创建一个项目,演示RabbitMQ最基础的内容;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_rabbitmq,第1张

通过,这个最简单的例子,先了解:如何使用RabbitMQ,如何连接RabbitMQ,如何发送消息,如何接收消息等最最基础的内容;

          ● 然后,会演示多个消费者平均压力的内容;

目录

一:第一个生产者和消费者;

0.创建一个maven项目rabbitmq,演示用;

1.引入RabbitMQ的Java客户端的,依赖;

2.第一个生产者;

3.第一个消费者;

4.瞅一眼RabbitMQ管理后台;

二: 根据消息内容的不同,采取不同的处理策略;(这儿演示的是一种思路)

三:当消息相对较多时,多个消费者平均压力; 

1.多个消费者,平均压力:引入; 

2.公平派遣;


一:第一个生产者和消费者;

0.创建一个maven项目rabbitmq,演示用;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_02,第2张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务器_03,第3张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_04,第4张

 RabbitMQ支持多语言,其中就包括Java;同时,RabbitMQ的API丰富,我们可以利用RabbitMQ针对Java提供的客户端的一系列API,来完成操作;

1.引入RabbitMQ的Java客户端的,依赖;

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.29</version>
        </dependency>
    </dependencies>

说明:

(1)依赖说明;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_05,第5张

2.第一个生产者;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_06,第6张

Send类:

package helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:发送消息的类:连接到RabbitMQ的服务端,然后发送一条消息,然后退出;
 */
public class Send {
    //我们发送消息时,需要指定要发到哪里去;所以,我们要指定队列的名字;所以,这儿我们定义队列的名字;
    //这个名字可随便取,待会在接收的消息时候,要使用这个队列;
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址)
        //这里面填写是RabbitMQ服务端所在服务器的ip地址
        connectionFactory.setHost("1**.***.***.**8");
        //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行;
        // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //PS:记得要放开RabbitMQ部署的服务器的,5672端口;
        //3.建立连接
        Connection connection = connectionFactory.newConnection();
        //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了)
        Channel channel = connection.createChannel();
        //5.声明队列(有了队列之后,我们就可以发布消息了)
        //参数说明:第一个参数(queue):队列名;
        // 第二个参数(durable):这个队列是否需要持久(即,服务重启后,这个队列是否需要还存在;这儿我们根据自己的需求,设为了false;)
        //第三个参数(exclusive):这个队列是否独有(即,这个队列是不是仅能给这个连接使用;这儿我们设为了false)
        //第四个参数(autoDelete):这个队列是否需要自动删除(即,在队列没有使用的时候,是否需要自动删除;这儿我们设为了false)
        //第五个参数(arguments);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //6.发布消息
        String message = "测试的消息";
        //参数说明:第一个参数(exchange)是交换机,这儿我们暂时不深入了解;
        // 第二个参数(routingKey)是路由键,这儿我们就写成队列的名字;
        //第三个参数(props),消息除了消息体外,还要有props作为它的配置;
        // 第四个参数(body)消息的内容,要求是byte[]类型的,同时,需要指定编码类型
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
        System.out.println("消息发送成功了:" + message);
        //7.关闭连接:先关闭channel信道,然后关闭connection连接;
        channel.close();
        connection.close();
    }


}

说明:

(0)RabbitMQ有三个所谓的“端”:这儿梳理一下;

          ● 服务端:就是我们安装了RabbitMQ的Linux系统;我们把RabbitMQ启动后,这个服务器中的RabbitMQ就是服务端;

          ● 管理后台:RabbitMQ的管理后台,就是我们在【RabbitMQ入门3:RabbitMQ管理后台,简介;】中,演示的在web端查看、管理RabbitMQ的一些配置的地方;(PS:要想在网页上访问管理后台,那么部署RabbitMQ的服务器,就要开发RabbitMQ的15672端口)

          ● 客户端:比如,在这儿,我们在我们的Java项目中,引入RabbitMQ提供的Java客户端后,我们就可以通过客户端去操作RabbitMQ了;(PS:要想在远端服务器,通过客户端访问RabbitMQ服务,那么部署RabbitMQ的服务器,就要开发RabbitMQ的5672端口)

(1)看注释;这儿的连接RabbitMQ的套路,都是相对固定的,后面如有需要,我们似乎也可以创建一个工具类;

(2)我们要想在其他服务器,通过客户端访问RabbitMQ,那么部署RabbitMQ的服务器,就要开发RabbitMQ的5672端口;关于Linux防火墙设置,可以参考【Linux进阶六:【firewall-cmd】防火墙设置;(以【对外开放Tomcat】为例来演示)】;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_07,第7张

(3)如果我们的RabbitMQ是安装在本机的话,就可以设置本机地址,然后其默认会使用guest用户去登录,所以设置用户就可以省略;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_08,第8张

(4)运行结果:

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_09,第9张

3.第一个消费者;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_10,第10张

Recv类:

package helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:接收消息的类:连接到RabbitMQ的服务端,然后接收消息;这个接收消息的类,会持续运行;
 */
public class Recv {
    //我们这儿想要接收的消息,就是Send类发送到“hello”这个队列中的消息;
    // 所以,在接收消息的时候,我们也要使用到“hello”这个队列;
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址)
        //这里面填写是RabbitMQ服务端所在服务器的ip地址
        connectionFactory.setHost("1**.***.***.**8");
        //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行;
        // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //PS:记得要放开RabbitMQ部署的服务器的,5672端口;
        //3.建立连接
        Connection connection = connectionFactory.newConnection();
        //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了)
        Channel channel = connection.createChannel();
        //5.声明队列:因为这儿想要接收的消息,就是Send类发送到“hello”这个队列中的消息;所以,这儿声明的队列和Send中声明的队列是一样的;
        //参数说明:第一个参数(queue):队列名;
        // 第二个参数(durable):这个队列是否需要持久(即,服务重启后,这个队列是否需要还存在;这儿我们根据自己的需求,设为了false;)
        //第三个参数(exclusive):这个队列是否独有(即,这个队列是不是仅能给这个连接使用;这儿我们设为了false)
        //第四个参数(autoDelete):这个队列是否需要自动删除(即,在队列没有使用的时候,是否需要自动删除;这儿我们设为了false)
        //第五个参数(arguments);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //6.接收消息,并消费
        //参数说明:第一个参数(queue)队列名;
        // 第二个参数(autoAck),是否去自动的确认收到;即,这儿接收到消息之后,是否需要通知消息发送者;
        //第三个参数(callback),消息收到后的处理
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
            }
        });
    }
}

说明:

(1)接收消息的时候,我们不需要关闭channel和connection,这个类会一直运行,即其会一直看队列中有没有数据,有的话就拿过来消费;

(2)看注释;

(3)我们接收到消息后,具体消息怎么处理,写在了handleDelivery()方法中;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务器_11,第11张

(4)运行结果;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_12,第12张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_13,第13张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_14,第14张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务端_15,第15张

4.瞅一眼RabbitMQ管理后台;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_16,第16张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_rabbitmq_17,第17张


二: 根据消息内容的不同,采取不同的处理策略;(这儿演示的是一种思路)

在实际开发中,有时可能需要根据消息内容的不同,采取不同的处理策略;本篇博客,就来演示一下;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务器_18,第18张

NewTask类:

package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:发送消息的类:其会发送
 */
public class NewTask {
    //我们发送消息时,需要指定要发到哪里去;所以,我们要指定队列的名字;所以,这儿我们定义队列的名字;
    //这个名字可随便取,待会在接收的消息时候,要使用这个队列;
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址)
        //这里面填写是RabbitMQ服务端所在服务器的ip地址
        connectionFactory.setHost("1**.***.***.**8");
        //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行;
        // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //PS:记得要放开RabbitMQ部署的服务器的,5672端口;
        //3.建立连接
        Connection connection = connectionFactory.newConnection();
        //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了)
        Channel channel = connection.createChannel();
        //5.声明队列(有了队列之后,我们就可以发布消息了)
        //参数说明:第一个参数(queue):队列名;
        // 第二个参数(durable):这个队列是否需要持久(即,服务重启后,这个队列是否需要还存在;这儿我们根据自己的需求,设为了true;)
        //第三个参数(exclusive):这个队列是否独有(即,这个队列是不是仅能给这个连接使用;这儿我们设为了false)
        //第四个参数(autoDelete):这个队列是否需要自动删除(即,在队列没有使用的时候,是否需要自动删除;这儿我们设为了false)
        //第五个参数(arguments);
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        //6.发布消息;这儿我们模拟发送10条消息(消息内容是,如"1...","2...","3..."……这样的)
        for (int i = 0; i < 10; i++) {
            String message = i + "...";
            //参数说明:第一个参数(exchange)是交换机,这儿我们暂时不深入了解;
            // 第二个参数(routingKey)是路由键,这儿我们就写成队列的名字;
            //第三个参数(props),消息除了消息体外,还要有props作为它的配置;
            // 第四个参数(body)消息的内容,要求是byte[]类型的,同时,需要指定编码类型
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("消息发送成功了:" + message);
        }
        //7.关闭连接:先关闭channel信道,然后关闭connection连接;
        channel.close();
        connection.close();
    }
}

说明:

(1)看注释;

(2)类内容说明;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_19,第19张

Worker类:

package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:接收消息的类:连接到RabbitMQ的服务端,然后接收消息;这个接收消息的类,会持续运行;
 */
public class Worker {
    //我们这儿想要接收的消息,就是NewTask类发送到“task_queue”这个队列中的消息;
    // 所以,在接收消息的时候,我们也要使用到“task_queue”这个队列;
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址)
        //这里面填写是RabbitMQ服务端所在服务器的ip地址
        connectionFactory.setHost("1**.***.***.**8");
        //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行;
        // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //PS:记得要放开RabbitMQ部署的服务器的,5672端口;
        //3.建立连接
        Connection connection = connectionFactory.newConnection();
        //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了)
        Channel channel = connection.createChannel();
        //5.声明队列:因为这儿想要接收的消息,就是Send类发送到“hello”这个队列中的消息;所以,这儿声明的队列和Send中声明的队列是一样的;
        //参数说明:第一个参数(queue):队列名;
        // 第二个参数(durable):这个队列是否需要持久(即,服务重启后,这个队列是否需要还存在;这儿我们根据自己的需求,设为了true;)
        //第三个参数(exclusive):这个队列是否独有(即,这个队列是不是仅能给这个连接使用;这儿我们设为了false)
        //第四个参数(autoDelete):这个队列是否需要自动删除(即,在队列没有使用的时候,是否需要自动删除;这儿我们设为了false)
        //第五个参数(arguments);
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("开始接收消息");
        //6.接收消息,并消费
        //参数说明:第一个参数(queue)队列名;
        // 第二个参数(autoAck),是否去自动的确认收到;即,这儿接收到消息之后,是否需要通知消息发送者;
        //第三个参数(callback),消息收到后的处理
        channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息" + message);
                try{
                    doWork(message);
                }finally {
                    System.out.println("完成消息处理");
                }
            }
        });
    }

    /**
     * 工具方法:处理消息;
     * @param task
     */
    private static void doWork(String task) {
        //根据具体消息内容的不同,去处理消息;
        // 即,如果消息中有'.'的话,那么我们就让其处理速度慢1秒;(PS:这儿仅仅是为了演示用的,玩具式程序)
        // 那么,这样一来,就会出现这个效果:如果消息中没有'.',处理的就会很快;如果有'.',处理速度就会慢的多;
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

说明:

(1)看注释;

(2)类内容说明;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务器_20,第20张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_21,第21张

(3)运行结果;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务端_22,第22张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务端_23,第23张


三:当消息相对较多时,多个消费者平均压力; 

可以看到,在(二: 根据消息内容的不同,采取不同的处理策略;(这儿演示的是一种思路))中,单靠Worker处理10条信息,需要10秒多;试想,我们再创建一个Worker,10条消息的处理速度就会提升;

1.多个消费者,平均压力:引入; 

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_rabbitmq_24,第24张

一个直接的想法就是,既然我们想让多个Worker一起来接收并处理消息,那么我们可不可以再运行一次Worker类?

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_25,第25张

进行一下配置,让Worker类可以有多个实例并行运行;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_26,第26张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务器_27,第27张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_java rebbitmq的生产_28,第28张

那么,此时的效果;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_29,第29张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_rabbitmq_30,第30张

默认情况下,如果有多个Worker的话,那么这多个Worker会并行工作;RabbitMQ会根据已启动worker和消息的情况,按顺序把每个消息发送给下一个Worker,在消息数量上是平均分配的;即,比如上面有10条消息,两个消费者,那么无论两个消费者处理能力如何,每个消费者都会收到5条消息;

但是,这种平均,是任务量的平均分配,而不一定是真实工作量(压力)的分配;比如,下面的案例;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_rabbitmq_31,第31张

此时,再发送消息,观察效果:

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务器_32,第32张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_服务器_33,第33张

而,为了解决这种,纯按数量和顺序分配,却没有按工作量(压力)平均分配的问题;就是下面公平派遣的内容了;

2.公平派遣;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_rabbitmq_34,第34张

公平派遣是在有多个消费者,而且是循环调度的情况下,来说的;公平派遣机制下,RabbitMQ会根据消费者的压力,来决定是否派遣;

要想实现公平派遣,还需要加入消息确认机制;主要目的是,消费者处理完消息后,发送一个确认消息,这样一来,RabbitMQ就知道你处理完了,然后就会给你发送下一个消息;

修改Worker这个Consumer类的内容如下:

package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:接收消息的类:连接到RabbitMQ的服务端,然后接收消息;这个接收消息的类,会持续运行;
 */
public class Worker {
    //我们这儿想要接收的消息,就是NewTask类发送到“task_queue”这个队列中的消息;
    // 所以,在接收消息的时候,我们也要使用到“task_queue”这个队列;
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址)
        //这里面填写是RabbitMQ服务端所在服务器的ip地址
        connectionFactory.setHost("1**.***.***.**8");
        //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行;
        // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //PS:记得要放开RabbitMQ部署的服务器的,5672端口;
        //3.建立连接
        Connection connection = connectionFactory.newConnection();
        //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了)
        final Channel channel = connection.createChannel();
        //5.声明队列:因为这儿想要接收的消息,就是Send类发送到“hello”这个队列中的消息;所以,这儿声明的队列和Send中声明的队列是一样的;
        //参数说明:第一个参数(queue):队列名;
        // 第二个参数(durable):这个队列是否需要持久(即,服务重启后,这个队列是否需要还存在;这儿我们根据自己的需求,设为了true;)
        //第三个参数(exclusive):这个队列是否独有(即,这个队列是不是仅能给这个连接使用;这儿我们设为了false)
        //第四个参数(autoDelete):这个队列是否需要自动删除(即,在队列没有使用的时候,是否需要自动删除;这儿我们设为了false)
        //第五个参数(arguments);
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("开始接收消息");

        //这句话的意思,这个消费者最希望处理的消息的数量;那么效果就是,这个消费者在处理完一个消息之前,是不会接收下一个消息的;
        channel.basicQos(1);

        //6.接收消息,并消费
        //参数说明:第一个参数(queue)队列名;
        // 第二个参数(autoAck),是否去自动的确认收到;即,这儿接收到消息之后,是否需要通知消息发送者;
        //为了演示公平派遣,我们这儿改成了false,即我么手动却发送处理了完成的消息
        //第三个参数(callback),消息收到后的处理
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息" + message);
                try{
                    doWork(message);
                }finally {
                    System.out.println("完成消息处理");
                    //消息处理完成后,去手动确认;
                    //第一个参数(deliveryTag)这个参数是固定的;
                    //第二参数(multiple)意思是,我们是否同时多个消息一起确认,这儿我们不需要,所以设为了false;
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }

    /**
     * 工具方法:处理消息;
     * @param task
     */
    private static void doWork(String task) {
        //根据具体消息内容的不同,去处理消息;
        // 即,如果消息中有'.'的话,那么我们就让其处理速度慢1秒;(PS:这儿仅仅是为了演示用的,玩具式程序)
        // 那么,这样一来,就会出现这个效果:如果消息中没有'.',处理的就会很快;如果有'.',处理速度就会慢的多;
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

说明:

(1)修改内容说明;

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_rabbitmq_35,第35张

(2)注意:此时,一定要记得主动发送【消息处理完成的通知】;否则,RabbitMQ就不知道,这个消息是否被处理完了,其就会认为没有被处理完,于是后续的消息就会得不到处理,越积越多,消耗内存;

(3)运行效果;此时,我们重开两个Worker,然后运行NewTask,发送10条消息;

 

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_36,第36张

java rebbitmq的生产 rabbitmq 多个生产者,java rebbitmq的生产 rabbitmq 多个生产者_中间件_37,第37张


https://www.xamrdz.com/backend/3bv1962958.html

相关文章: