当前位置: 首页>后端>正文

Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者

1. 正文

1. 工作者模式: 
2. 发布订阅模式
3. 路由模式
4. topic主体模式。
5. Springboot整合rabbitMQ

2. 工作者模式:

Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者,Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者_java,第1张

特点:
    1. 一个生产者
    2. 由多个消费。
    3. 统一个队列。
    4. 这些消费者之间存在竞争关系。

用处:
    比如批量处理上. rabbitMQ里面积压了大量的消息。

生产者

package com.tyt.worker;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        设置主机地址
        connectionFactory.setHost("192.168.31.145");
//        创建 连接对象Connection
        Connection connection = connectionFactory.newConnection();

//创建信道
        Channel channel = connection.createChannel();
//        创建队列
        /**
         * String queue, 队列的名称
         * boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
         * boolean exclusive,是否该队列独占
         * boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
         * Map<String, Object> arguments 额外参数
         */
        channel.queueDeclare("qy129_worker", true, false, false, null);
        /**发生消息
         * String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
         * String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
         * BasicProperties props, 消息的一些额外配置 目前不加
         * byte[] body  消息的内容
         *
         */
        for (int i = 0; i <10 ; i++) {
            String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
            channel.basicPublish("","qy129_worker",null,msg.getBytes());
        }
        channel.close();
        connection.close();

    }
}

消费者01:

package com.tyt.worker;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest01 {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        设置主机地址
        connectionFactory.setHost("192.168.31.145");
//        创建 连接对象Connection
        Connection connection = connectionFactory.newConnection();
//创建信道
        Channel channel = connection.createChannel();
        //消费消息(接收消息)
        /**
         * String queue, 队列
         * boolean autoAck, 是否自动确认
         * Consumer callback 回调方法,当队列中存在信息后,会自动触发回调函数
         */
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("c1消息的内容:"+new String(body));
            }
        };
        channel.basicConsume("qy129_worker",true,callback);
    }
}

消费者02:

package com.tyt.worker;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest02 {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        设置主机地址
        connectionFactory.setHost("192.168.31.145");
//        创建 连接对象Connection
        Connection connection = connectionFactory.newConnection();
//创建信道
        Channel channel = connection.createChannel();
        //消费消息(接收消息)
        /**
         * String queue, 队列
         * boolean autoAck, 是否自动确认
         * Consumer callback 回调方法,当队列中存在信息后,会自动触发回调函数
         */
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("c2消息的内容:"+new String(body));
            }
        };
        channel.basicConsume("qy129_worker",true,callback);
    }
}

4. 发布订阅模式

Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者,Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者_重启_02,第2张

1. 特点:
    1.一个生产者
    2.多个消费者
    3.多个队列。
    4.交换机 转发消息。

生产者:

package com.tyt.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        设置主机地址
        connectionFactory.setHost("192.168.31.145");
//        创建 连接对象Connection
        Connection connection = connectionFactory.newConnection();

//创建信道
        Channel channel = connection.createChannel();
//        创建队列
        /**
         * String queue, 队列的名称
         * boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
         * boolean exclusive,是否该队列独占
         * boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
         * Map<String, Object> arguments 额外参数
         */
        channel.queueDeclare("qy129_fanout01", true, false, false, null);
        channel.queueDeclare("qy129_fanout02", true, false, false, null);
        /**
         * 创建交换机
         * String exchange, 交换机名称
         * BuiltinExchangeType type,交换机类型
         * boolean durable 是否持久化
         */
        channel.exchangeDeclare("qy129_exchange_fanout", BuiltinExchangeType.FANOUT, true);
        /**
         * 连接交换机和队列
         * String queue,队列名
         * String exchange,交换机名
         * String routingKey 路由器key
         */
        channel.queueBind("qy129_fanout01", "qy129_exchange_fanout", "");
        channel.queueBind("qy129_fanout02", "qy129_exchange_fanout", "");
        /**发生消息
         * String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
         * String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
         * BasicProperties props, 消息的一些额外配置 目前不加
         * byte[] body  消息的内容
         *
         */
        for (int i = 0; i <10 ; i++) {
            String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
            channel.basicPublish("qy129_exchange_fanout","",null,msg.getBytes());
        }
        channel.close();
        connection.close();

    }
}

消费者:

package com.tyt.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest01 {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        设置主机地址
        connectionFactory.setHost("192.168.31.145");
//        创建 连接对象Connection
        Connection connection = connectionFactory.newConnection();
//创建信道
        Channel channel = connection.createChannel();
        //消费消息(接收消息)
        /**
         * String queue, 队列
         * boolean autoAck, 是否自动确认
         * Consumer callback 回调方法,当队列中存在信息后,会自动触发回调函数
         */
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("c1消息的内容:"+new String(body));
            }
        };
        channel.basicConsume("qy129_fanout01",true,callback);
    }
}

5.路由模式

Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者,Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者_持久性_03,第3张

特点:
    1.一个生产者
    2.多个消费者
    3.多个队列。
    4.交换机 转发消息。
    5.routekey:路由key 只要routekey匹配的消息可以到达对应队列。

生产者:

package com.tyt.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        设置主机地址
        connectionFactory.setHost("192.168.31.145");
//        创建 连接对象Connection
        Connection connection = connectionFactory.newConnection();

//创建信道
        Channel channel = connection.createChannel();
//        创建队列
        /**
         * String queue, 队列的名称
         * boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
         * boolean exclusive,是否该队列独占
         * boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
         * Map<String, Object> arguments 额外参数
         */
        channel.queueDeclare("qy129_direct01", true, false, false, null);
        channel.queueDeclare("qy129_direct02", true, false, false, null);
        /**
         * 创建交换机
         * String exchange, 交换机名称
         * BuiltinExchangeType type,交换机类型
         * boolean durable 是否持久化
         */
        channel.exchangeDeclare("qy129_exchange_direct", BuiltinExchangeType.DIRECT, true);
        /**
         * 连接交换机和队列
         * String queue,队列名
         * String exchange,交换机名
         * String routingKey 路由器key
         */
        channel.queueBind("qy129_direct01", "qy129_exchange_direct", "error");

        channel.queueBind("qy129_direct02", "qy129_exchange_direct", "error");
        channel.queueBind("qy129_direct02", "qy129_exchange_direct", "info");
        channel.queueBind("qy129_direct02", "qy129_exchange_direct", "warning");
        /**发生消息
         * String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
         * String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
         * BasicProperties props, 消息的一些额外配置 目前不加
         * byte[] body  消息的内容
         *
         */
        for (int i = 0; i <10 ; i++) {
            String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
            channel.basicPublish("qy129_exchange_direct","info",null,msg.getBytes());
        }
        channel.close();
        connection.close();

    }
}

消费者基本没什么变化,只需要把通道换一下即可。

6. topic主体模式

Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者,Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者_rabbitmq_04,第4张

1. 绑定按照通配符的模式。
     *: 统配一个单词。
     #: 统配n个单词

hello.orange.rabbit

lazy.orange

生产者

package com.tyt.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        设置主机地址
        connectionFactory.setHost("192.168.31.145");
//        创建 连接对象Connection
        Connection connection = connectionFactory.newConnection();

//创建信道
        Channel channel = connection.createChannel();
//        创建队列
        /**
         * String queue, 队列的名称
         * boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
         * boolean exclusive,是否该队列独占
         * boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
         * Map<String, Object> arguments 额外参数
         */
        channel.queueDeclare("qy129_topic01", true, false, false, null);
        channel.queueDeclare("qy129_topic02", true, false, false, null);
        /**
         * 创建交换机
         * String exchange, 交换机名称
         * BuiltinExchangeType type,交换机类型
         * boolean durable 是否持久化
         */
        channel.exchangeDeclare("qy129_exchange_topic", BuiltinExchangeType.TOPIC, true);
        /**
         * 连接交换机和队列
         * String queue,队列名
         * String exchange,交换机名
         * String routingKey 路由器key
         */
        channel.queueBind("qy129_topic01", "qy129_exchange_topic", "*.orange.*");

        channel.queueBind("qy129_topic02", "qy129_exchange_topic", "*.*.rabbit");
        channel.queueBind("qy129_topic02", "qy129_exchange_topic", "#.tyt.#");
        channel.queueBind("qy129_topic02", "qy129_exchange_topic", "lazy.#");
        /**发生消息
         * String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
         * String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
         * BasicProperties props, 消息的一些额外配置 目前不加
         * byte[] body  消息的内容
         *
         */
        for (int i = 0; i <10 ; i++) {
            String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
            channel.basicPublish("qy129_exchange_topic","tyt",null,msg.getBytes());
        }
        channel.close();
        connection.close();

    }
}

7. rabbitMQ整合springboot

springboot引入了相关的依赖后,提供一个工具类RabbitTemplate.使用这个工具类可以发送消息。

Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者,Java rabbitmq消息订阅创建多个连接绑定交换机 rabbitmq配置多个生产者_rabbitmq_05,第5张

(1)父工程引入相关的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>product</module>
        <module>consumer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tyt</groupId>
    <artifactId>springboot-rabbit-parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbit-parent</name>
    <description>springboot整合rabbitMQ</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!--rabbitMQ的依赖: 启动类加载。读取配置文件:
           springboot自动装配原理: 引用starter启动依赖时,把对应的自动装配类加载进去,该自动装配类可以读取application配置文件中
           内容。 DispatherServlet
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

  
</project>

(2)相应的生产者和消费的配置

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.31.145
  datasource:
    druid:
      username: root
      password: 566666
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3308/hello

注意:(1)主机号需要填写你自己的(2)我的mysql的端口号是3308,默认是3306
(3)生产者

package com.tyt.controller;

import com.tyt.service.FruitService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private FruitService fruitService;
    @GetMapping("hello")
    public String hello() {
        String one = fruitService.getOne(1);
        rabbitTemplate.convertAndSend("qy129_exchange_fanout", "", one);//相当于经过序列化
        return "下单成功";
    }
}

消费者监听消息

package com.tyt.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {
    @RabbitListener(queues = "qy129_fanout01")
    public void listener(Message msg) {
        System.out.println("出库"+msg);
    }
}



https://www.xamrdz.com/backend/3l41962962.html

相关文章: