Spring AMQP 是对 Spring 基于 AMQP 的消息收发解决方案,它是一个抽象层,不依赖于特定的 AMQP Broker 实现和客户端的抽象,所以可以很方便地替换。比如我们可以使用 spring-rabbit 来实现。
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列(生产者与消费者都需要声明队列,是防止队列不存在,重复声明不会影响)
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定(回调机制,绑定函数但要收到消息才执行)
快速入门
- 在父工程中引入spring-amqp的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在publisher服务中编写application.yml
spring:
rabbitmq:
host: 192.168.187.128 #rabbitmq的ip地址
port: 5672 #端口号
username: ylf
password: 123456
virtual-host: / #虚拟主机
-
创建一个队列
编写测试方法
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {
@Autowired RabbitTemplate rabbitTemplate;
@Test
public void testSendMsg() {
String queueName = "simple.queue"; // 队列名称
String message = "发送成功!"; // 消息内容
rabbitTemplate.convertAndSend(queueName, message);
}
}
- 在consumer模块中消费消息
- application.yml中添加mq连接信息
spring:
rabbitmq:
host: 192.168.187.128 #rabbitmq的ip地址
port: 5672 #端口号
username: ylf
password: 123456
virtual-host: / #虚拟主机
- 编写一个监听类
package com.yy.comsumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
//声明为bean,让Spring能够找到
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
//告诉spring要监听了队列
public void listenerSimpleQueueMessage(String msg) throws InterruptedException{
//String 是列表发的什么类型,此处就对应定义为什么消息
System.out.println("Spring 消费者接收到消:【" + msg +"】");
}
}
Work Queue工作队列
- Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenerSimpleQueueMessage(String msg) {
System.out.println("消费者消息:" + msg);
}
@RabbitListener(queues = "simple.queue")
public void listenerSimpleQueueMessage2(String msg) {
System.out.println("消费者消息:" + msg);
}
}
默认情况Rabbitmq会使用预取机制平均的分配消息,如果想要改变这种状态的话
spring:
rabbitmq:
host: 192.168.187.128 #rabbitmq的ip地址
port: 5672 #端口号
username: ylf
password: 123456
virtual-host: / #虚拟主机
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
发布( Publish )、订阅( Subscribe )
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
常见exchange类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
Fanout exchange
- Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue
- 创建交换机和队列,并且进行一个绑定
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author ylf
* @version 1.0
*/
@Configuration
public class FanoutConfig {
// 创建Fanout交换机命名为ylf.fanout
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("ylf.fanout", true, false);
}
// 创建队列1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1", true);
}
// 队列1和交换机绑定
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 创建队列2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
// 队列1和交换机绑定
@Bean
public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
- 消息接收
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author ylf
* @version 1.0
*/
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenerFanoutQueue1(String msg) throws InterruptedException {
System.out.println("消费者接收到了fanout.queue1消息:" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenerFanoutQueue2(String msg) throws InterruptedException {
System.out.println("消费者接收到了fanout.queue2消息:" + msg);
}
}
- 消息发送
@Test
public void testSendFanoutMsg() {
// 交换机名称
String exchangeName = "ylf.fanout";
// 消息
String message = "大家好";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
发布订阅-DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
- 监听消息,并且使用@RabbitListener注解来创建交换机,队列并且绑定
package cn.itcast.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author ylf
* @version 1.0
*/
@Component
public class SpringRabbitListener {
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "ylf.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}))
public void listenerDirectQueue1(String msg) {
System.out.println("消费者接收到了direct.queue1消息:" + msg);
}
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "ylf.direct", type = ExchangeTypes.DIRECT),
key = {"blue", "yellow"}))
public void listenerDirectQueue2(String msg) {
System.out.println("消费者接收到了direct.queue2消息:" + msg);
}
}
- 消息发送
@Test
public void testSendDirectMsg() {
// 交换机名称
String exchangeName = "ylf.direct";
// 消息
String message = "red你好";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
1.#:代指0个或多个单词
2.*:代指一个单词
- 创建交换机队列并且绑定
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "ylf.topic", type = ExchangeTypes.TOPIC),
key = "china.#"))
public void listenerTopicQueue1(String msg) {
System.out.println("消费者接收到了direct.queue1:" + msg);
}
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "ylf.topic", type = ExchangeTypes.TOPIC),
key = "#.news"))
public void listenerTopicQueue2(String msg) {
System.out.println("消费者接收到了direct.queue2:" + msg);
}
- 发送消息
@Test
public void testSendTopicMsg() {
// 交换机名称
String exchangeName = "ylf.topic";
// 消息
String message = "蔡徐坤打篮球";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消息转换器
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
- 如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
- 发送消息:
- 我们在父工程引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 我们在publisher服务声明MessageConverter:
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
- 接收消息:
- 我们在consumer服务定义MessageConverter:
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
- 然后定义一个消费者,监听object.queue队列并消费消息:
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg){
System.out.println("消费者接收到了object.queue:" + msg);
}