当前位置: 首页>后端>正文

RabbitMq 交换机发送消息规则

一、默认交换机

默认交换机不需要声明交换机,也不需要指定路由key,在发送消息时交换机给空值,路由key给声明的队列名就可以了

       String message = "hello world";
        /**
         * 哪个交换机
         * 哪个路由key
         * 其他参数
         * 消息实体
         */
        channel.basicPublish("",queue_name,null,message.getBytes());

二、fanout 扇形交换机

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,即无视RoutingKey和BindingKey的匹配规则。使用扇形交换机,生产者会将消息广播发送给全部消费者

import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;
import java.util.Scanner;

/**
 * @Author: yokipang
 * @Date: 2022/5/12
 * 消息生产者
 */
public class EmitLogs {

    public static final String exchange_name = "logs";

    public static void main(String[] args) throws  Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        //channel.exchangeDeclare(exchange_name, "fanout");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 发送消息
             * 那个交换机
             * 哪个路由key
             * 其他参数  如果消息要持久化则填写MessageProperties.PERSISTENT_TEXT_PLAIN
             * 消息实体
             */
            channel.basicPublish(exchange_name, "", null, message.getBytes());//生产消息时 消息持久化
            System.out.println("生产者发出消息: " + message);
        }
    }
}


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;

/**
 * @Author: yokipang
 * @Date: 2022/5/12
 * 消费者 01 
 */
public class ReceiveLog01 {
    //交换机名称
    public static final String exchange_name = "logs";
    public static void main(String[] args) throws  Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(exchange_name, BuiltinExchangeType.FANOUT);
        //声明临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机与队列
        channel.queueBind(queueName,exchange_name,"");
        System.out.println("消息……");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try{
                System.out.println("log01接收到的消息"+new String(message.getBody()));
            }catch (Exception e){
                e.printStackTrace();
            }
        };

        CancelCallback cancelCallback = message->{
            System.out.println("取消消息"+message);
        };

        channel.basicConsume(queueName,false,deliverCallback,cancelCallback);
    }

}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;

/**
 * @Author: yokipang
 * @Date: 2022/5/12
 * 消费者 02
 */
public class ReceiveLog02 {

    public static final String exchang_name = "logs";

    public static void main(String[] args) throws  Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(exchang_name, BuiltinExchangeType.FANOUT);
        //声明临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机与队列
        channel.queueBind(queueName,exchang_name,"");
        System.out.println("消息……");


        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try{
                System.out.println("log02接收到的消息"+new String(message.getBody()));
            }catch (Exception e){
                e.printStackTrace();
            }

        };

        CancelCallback cancelCallback = message->{
            System.out.println("取消消息"+message);
        };
        //接受消息
        channel.basicConsume(queueName,false,deliverCallback,cancelCallback);
    }
}

结果如下


RabbitMq 交换机发送消息规则,第1张
结果

三、DIRECT交换机

它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
也就是在下面这个方法中的第二个参数指定路由key,消息就只会发送给与这个路由key绑定的队列
channel.basicPublish(exchange_name, "info", null, message.getBytes());

生产者代码↓

import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;
import java.util.Scanner;

/**
 * @Author: yokipang
 * @Date: 2022/5/12
 */
public class DirectLogs {
    //交换机名称
    public static final String exchange_name = "direct_log";
    public static void main(String[] args) throws  Exception {
        Channel channel = RabbitMqUtils.getChannel();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 发送消息
             * 那个交换机
             * 哪个路由key
             * 其他参数  如果消息要持久化则填写MessageProperties.PERSISTENT_TEXT_PLAIN
             * 消息实体
             */
            channel.basicPublish(exchange_name, "info", null, message.getBytes());
            System.out.println("生产者发出消息: " + message);
        }
    }
}

消费者01代码↓

import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;
import java.util.Scanner;

/**
 * @Author: yokipang
 * @Date: 2022/5/12
 */
public class DirectLogs {


    public static final String exchange_name = "direct_log";

    public static void main(String[] args) throws  Exception {
        Channel channel = RabbitMqUtils.getChannel();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 发送消息
             * 那个交换机
             * 哪个路由key
             * 其他参数  如果消息要持久化则填写MessageProperties.PERSISTENT_TEXT_PLAIN
             * 消息实体
             */
            channel.basicPublish(exchange_name, "info", null, message.getBytes());//生产消息时 消息持久化
            System.out.println("生产者发出消息: " + message);
        }
    }
}

消费者02代码↓

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;

/**
 * @Author: yokipang
 * @Date: 2022/5/12
 */
public class ReceiveLogsDirect02 {

    //交换机名称
    public static final String exchange_name = "direct_log";
    //队列名称
    public static final String queue_name = "disk";
    //路由名称
    public static final String routing_key1 = "error";

    public static void main(String[] args) throws  Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(exchange_name, BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(queue_name,false,false,false,null);
        //绑定交换机与队列
        channel.queueBind(queue_name,exchange_name,routing_key1);
        System.out.println("消息……");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try{
                System.out.println("log02接收到的消息"+new String(message.getBody()));
            }catch (Exception e){
                e.printStackTrace();
            }
        };

        CancelCallback cancelCallback = message->{
            System.out.println("取消消息"+message);
        };

        //消费消息
        channel.basicConsume(queue_name,false,deliverCallback,cancelCallback);
    }
}

生产者发送消息时指定了路由key为info,这样就只会由消费者01接收到消息


RabbitMq 交换机发送消息规则,第2张
结果

四、topic交换机

上面讲到direct类型的交换器路由规则是必须完全匹配BindingKey和RoutingKey,但这种严格的匹配方式在很多情况下无法满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器类似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但匹配规则略有不同,约定如下:
1.RoutingKey为一个点号"."分隔的字符串,被"."号分隔的每一段独立的字符串称为一个单词,如"com.rabbitmq.client"等。
2.Bindingkey和Routingkey一样也是"."分隔的字符串。
3.BindingKey中存在两种特殊宇符串"和#",用于做模糊匹配,其中"."用于匹配一个单词,"#"用于匹配多个单词(可以是零个)。

生产者代码↓


import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author: yokipang
 * @Date: 2022/5/12
 */
public class TopicLogs {
    public static final String exchange_name = "topic_logs";
    public static void main(String[] args)throws  Exception {

        Channel channel = RabbitMqUtils.getChannel();
        Map<String,String> bindingKeyMap=new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");
        bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");
        bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");
        bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");
        bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");
        bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("ouick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");
        for (Map.Entry<String,String> bindingKeyEntry : bindingKeyMap.entrySet()){
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(exchange_name,routingKey,null,message.getBytes());
            System.out.println("生产者发送消息:"+message);
        }
    }
}

消费者代码01 ↓

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;
/**
 * @Author: yokipang
 * @Date: 2022/5/12
 */
public class ReceiveLogsTopic01 {
    public static final String exchange_name = "topic_logs";
    public static final String queue_name = "Q1";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //创建交换机
        channel.exchangeDeclare(exchange_name, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(queue_name,true,false,false,null);
        //绑定交换机 设置路由规则
        channel.queueBind(queue_name,exchange_name,"*.orange.*");
        //取消消息回调函数
        CancelCallback cancelCallback = message->{
            System.out.println("Q1取消消息:"+message);
        };
        //接收消息回调函数
        DeliverCallback deliverCallback =( consumerTag,message )->{
            System.out.println("队列名称:"+queue_name+"    路由key:"+message.getEnvelope().getRoutingKey()+"    消息内容:"+new String(message.getBody()));
        };

        //接收消息
        channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);
    }
}

消费者代码02↓


import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;
/**
 * @Author: yokipang
 * @Date: 2022/5/12
 */
public class ReceiveLogsTopic02 {
    public static final String exchange_name = "topic_logs";
    public static final String queue_name = "Q2";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //创建交换机
        channel.exchangeDeclare(exchange_name, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(queue_name,true,false,false,null);
        //绑定交换机 设置路由规则
        channel.queueBind(queue_name,exchange_name,"*.*.rabbit");
        channel.queueBind(queue_name,exchange_name,"lazy.#");

        //取消消息回调函数
        CancelCallback cancelCallback = message->{
            System.out.println("Q2取消消息:"+message);
        };
        //接收消息回调函数
        DeliverCallback deliverCallback =( consumerTag,message )->{
            System.out.println("队列名称:"+queue_name+"    路由key:"+message.getEnvelope().getRoutingKey()+"    消息内容:"+new String(message.getBody()));
        };

        //接收消息
        channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);

    }

}

RabbitMq 交换机发送消息规则,第3张
生产者发送多条消息
RabbitMq 交换机发送消息规则,第4张
消费者01只接收到了与他的路由key规则匹配的消息
RabbitMq 交换机发送消息规则,第5张
消费者02只接收到了与他的路由key规则匹配的消息

https://www.xamrdz.com/backend/3xc1938380.html

相关文章: