当前位置: 首页>后端>正文

SpringBoot笔记--RabbitMQ Topic模式

一、前言

Topic模式,又称主题模式,他和路由模式类似,但是比路由模式灵活,绑定在Topic交换机上的队列,可以设置一个带通配符的路由key,比如orange.**.black.*等,例如producer发送消息到Topic交换机时,带上路由可以为orange.1,则会将消息发送到orange.*这个队列。需要注意的是,比如队列绑定的路由key是*.black.*,那producer发送消息时,路由一定要black前后都有值才能匹配到,比如1.black.1才行,1.black是不行的。

二、生产者

1.TopicExchangeConfig.java

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicExchangeConfig {

public static final String TOPIC_EXCHANGE = "topic_exchange";

public static final String QUEUE_ORANGE = "queue_orange";
public static final String QUEUE_BLACK = "queue_black";

public static final String ROUTING_KEY_ORANGE = "orange.*";
public static final String ROUTING_KEY_BLACK = "*.black.*";

/**
 * 定义一个Topic类型交换机
 * @return
 */
@Bean
public TopicExchange topicExchange(){
    return new TopicExchange(TOPIC_EXCHANGE);
}

/**
 * 创建一个队列'queue_orange'
 * @return
 */
@Bean
public Queue queueOrange(){
    return new Queue(QUEUE_ORANGE);
}

/**
 * 创建一个队列'queue_black'
 * @return
 */
@Bean
public Queue queueBlack(){
    return new Queue(QUEUE_BLACK);
}

/**
 * 将队列'queue_orange'绑定到Topic交换机上,并设置路由key为'orange.*'
 * 该队列则可以接收路由key为orange.1,orange.2等样式的消息
 * @return
 */
@Bean
public Binding bindingQueueOrange(){
    return BindingBuilder.bind(queueOrange()).to(topicExchange()).with(ROUTING_KEY_ORANGE);
}

/**
 * 将队列'queue_orange'绑定到Topic交换机上,并设置路由key为'*.black.*'
 * 该队列则可以接收路由key为1.black.1,2.black.2等样式的消息
 * 注意:发送路由key为1.black是匹配不到这个队列的
 * @return
 */
@Bean
public Binding bindingQueueBlack(){
    return BindingBuilder.bind(queueBlack()).to(topicExchange()).with(ROUTING_KEY_BLACK);
}
}

2.ProducerController.java

@RestController
@RequestMapping(value = "send")
public class ProducerController {

@Resource
private RabbitTemplate rabbitTemplate;

/**
 * 发送路由key为"orange.1"的消息到Topic交换机,交换机会将该消息投递给队列"queue_orange"
 */
@GetMapping("topic/orange1")
public void sendOrange1(){
    String msg = "send routing key = orange.1 msg " + new Date().toString();
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "orange.1", msg);
}

/**
 * 发送路由key为"orange.2"的消息到Topic交换机,交换机会将该消息投递给队列"queue_orange"
 */
@GetMapping("topic/orange2")
public void sendOrange2(){
    String msg = "send routing key = orange.2 msg " + new Date().toString();
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "orange.2", msg);
}

/**
 * 发送路由key为"1.black.1"的消息到Topic交换机,交换机会将该消息投递给队列"queue_black"
 */
@GetMapping("topic/black1")
public void sendBlack1(){
    String msg = "send routing key = 1.black msg " + new Date().toString();
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "1.black.1", msg);
}

/**
 * 发送路由key为"2.black.2"的消息到Topic交换机,交换机会将该消息投递给队列"queue_black"
 */
@GetMapping("topic/black2")
public void sendBlack2(){
    String msg = "send routing key = 2.black msg " + new Date().toString();
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "2.black.2", msg);
}
}

3.查看队列

服务启动后,将RabbitMQ上将会看到这两个队列

SpringBoot笔记--RabbitMQ Topic模式,第1张
队列.png
SpringBoot笔记--RabbitMQ Topic模式,第2张
topic交换机.png

三、消费者

1.RabbitMqConfig.java

/**
 * 定义队列
 */
@Configuration
public class RabbitMqConfig {

public static final String QUEUE_ORANGE = "queue_orange";

public static final String QUEUE_BLACK = "queue_black";
}

2.RabbitMqReceiver.java

@Component
public class RabbitMqReceiver {

private final static Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);

/**
 * 监听队列queue_orange的消息
 */
@RabbitListener(queues = RabbitMqConfig.QUEUE_ORANGE)
public void receiverQueueOrange(String msg, Channel channel, Message message) throws IOException {
    logger.info("receiverQueueOrange 接收到消息为:"+msg);
}

/**
 * 监听队列queue_black的消息
 */
@RabbitListener(queues = RabbitMqConfig.QUEUE_BLACK)
public void receiverQueueBlack(String msg, Channel channel, Message message) throws IOException {
    logger.info("receiverQueueBlack 接收到消息为:"+msg);
}
}

https://www.xamrdz.com/backend/34f1926822.html

相关文章: