一、前言
Topic模式,又称主题模式,他和路由模式类似,但是比路由模式灵活,绑定在Topic交换机上的队列,可以设置一个带通配符的路由key
,比如orange.*
,*.black.*
等,例如producer发送消息到Topic交换机时,带上路由可以为orange.1
,则会将消息发送到orange.*
这个队列。需要注意的是,比如队列绑定的路由key是*.black.*
,那producer发送消息时,路由一定要black前后都有值才能匹配到,比如1.black.1
才行,1.black
是不行的。
二、生产者
1.TopicExchangeConfig.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicExchangeConfig {
public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String QUEUE_ORANGE = "queue_orange";
public static final String QUEUE_BLACK = "queue_black";
public static final String ROUTING_KEY_ORANGE = "orange.*";
public static final String ROUTING_KEY_BLACK = "*.black.*";
/**
* 定义一个Topic类型交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 创建一个队列'queue_orange'
* @return
*/
@Bean
public Queue queueOrange(){
return new Queue(QUEUE_ORANGE);
}
/**
* 创建一个队列'queue_black'
* @return
*/
@Bean
public Queue queueBlack(){
return new Queue(QUEUE_BLACK);
}
/**
* 将队列'queue_orange'绑定到Topic交换机上,并设置路由key为'orange.*'
* 该队列则可以接收路由key为orange.1,orange.2等样式的消息
* @return
*/
@Bean
public Binding bindingQueueOrange(){
return BindingBuilder.bind(queueOrange()).to(topicExchange()).with(ROUTING_KEY_ORANGE);
}
/**
* 将队列'queue_orange'绑定到Topic交换机上,并设置路由key为'*.black.*'
* 该队列则可以接收路由key为1.black.1,2.black.2等样式的消息
* 注意:发送路由key为1.black是匹配不到这个队列的
* @return
*/
@Bean
public Binding bindingQueueBlack(){
return BindingBuilder.bind(queueBlack()).to(topicExchange()).with(ROUTING_KEY_BLACK);
}
}
2.ProducerController.java
@RestController
@RequestMapping(value = "send")
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送路由key为"orange.1"的消息到Topic交换机,交换机会将该消息投递给队列"queue_orange"
*/
@GetMapping("topic/orange1")
public void sendOrange1(){
String msg = "send routing key = orange.1 msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "orange.1", msg);
}
/**
* 发送路由key为"orange.2"的消息到Topic交换机,交换机会将该消息投递给队列"queue_orange"
*/
@GetMapping("topic/orange2")
public void sendOrange2(){
String msg = "send routing key = orange.2 msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "orange.2", msg);
}
/**
* 发送路由key为"1.black.1"的消息到Topic交换机,交换机会将该消息投递给队列"queue_black"
*/
@GetMapping("topic/black1")
public void sendBlack1(){
String msg = "send routing key = 1.black msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "1.black.1", msg);
}
/**
* 发送路由key为"2.black.2"的消息到Topic交换机,交换机会将该消息投递给队列"queue_black"
*/
@GetMapping("topic/black2")
public void sendBlack2(){
String msg = "send routing key = 2.black msg " + new Date().toString();
rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "2.black.2", msg);
}
}
3.查看队列
服务启动后,将RabbitMQ上将会看到这两个队列
三、消费者
1.RabbitMqConfig.java
/**
* 定义队列
*/
@Configuration
public class RabbitMqConfig {
public static final String QUEUE_ORANGE = "queue_orange";
public static final String QUEUE_BLACK = "queue_black";
}
2.RabbitMqReceiver.java
@Component
public class RabbitMqReceiver {
private final static Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);
/**
* 监听队列queue_orange的消息
*/
@RabbitListener(queues = RabbitMqConfig.QUEUE_ORANGE)
public void receiverQueueOrange(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueOrange 接收到消息为:"+msg);
}
/**
* 监听队列queue_black的消息
*/
@RabbitListener(queues = RabbitMqConfig.QUEUE_BLACK)
public void receiverQueueBlack(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueBlack 接收到消息为:"+msg);
}
}