一、简介
RabbitMQ是一个部署最广泛的开源消息代理中间件。它具有轻量级、支持多语言多平台部署、支持多种消息传递协议等优点,并且支持分布式部署以满足大规模、高可用性的业务需求。如下是RabbitMQ官网列出的功能:
二、运行原理
2.1 RabbitMQ中的一些概念
-
Message
消息,由消息头和消息体组成:消息头由一系列可选属性构成,包括routing-key(路由键)、priority(优先权)、delivery-mode(是否需要持久性存储)等,这些属性规定了该消息的发送规则;而消息体则指我们需要发送的具体消息内容,它是不透明的。 -
Publisher
消息的生产者,向消息代理(Message Broker)发布消息的客户端应用程序。 -
Exchange
交换器,接收生产者发送的消息并将这些消息路由给服务器中的零个或多个队列。使用的路由算法取决于Exchange类型和Binding规则。AMQP中定义了四种交换类型,与RabbitMQ的交换器定义对应关系如下:
不同类型的Exchange转发消息的策略有所不同。headers匹配AMQP消息的header而不是路由键,headers 交换器和direct 交换器原理一样,但性能相差很大,目前几乎用不到。
三种交换方式简析
-
Direct Exchange
如果消息中的路由键(routing key)和Binding中的binding key一致,交换器就将该消息发送到对应的队列中。在direct Exchange模式下,两个key需要完全匹配。 -
Fanout Exchange
一个fanout交换器可以绑定多个队列,fanout交换器不处理路由键,只是将消息发送到每个被绑定的队列上。与子网广播类似,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。 -
Topic Exchange
topic交换器通过模式匹配分配消息的路由键属性,将路由键和绑定键进行匹配。路由键和绑定键的字符串用点切分为单词,以及通配符“#”和“”,“#”匹配0个或多个单词,“”匹配一个单词。
-
Queue
消息队列,相当于一个消息容器,用来保存消息。一个消息可被放入一个或多个队列,消费者通过与消息队列连接,将消息取出。 -
Binding
绑定,基于路由键将Exchange与Queue连接起来的路由规则,可以是多对多的关系。 -
Connection
网络连接,如一个TCP连接。 -
Channel
信道,是TCP里面的虚拟连接,一个TCP连接中容纳多个信道。信道与TCP就好比光纤与电缆,信道是一条独立光纤束。在实际应用中,一个线程使用一个信道,保证了线程之间的独立和安全性,也减少了系统开销。TCP一旦打开,AMQP信道就会创建。发布消息、接收消息、订阅队列这些动作都是通过信道来完成。
在没有信道的情况下,多个线程进行消息的生产和消费时,每个线程都会创建一个TCP连接。而TCP的创建需要3次握手,销毁需要4次分手,资源消耗非常大。高峰时,每秒成千上万条TCP连接会造成巨大的资源浪费,并且操作系统每秒处理的TCP连接数也是有限的,这也会造成性能降低。
-
Consumer
消费者,从消息队列中取得消息的客户端应用程序。 -
Virtual Host
虚拟主机,本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制,vhost是共享相同的身份认证和加密环境的独立服务器域。vhost是AMQP的基础,必须在连接时指定,RabbitMQ默认的vhost是/。 -
Broker
消息队列服务器实体
三、在Spring Boot中的实践
- 在Docker上安装RabbitMQ
// 下载带management的版本,表示有管理界面
docker pull registry.docker-cn.com/library/rabbitmq:3-management
//5672是RabbitMQ的端口号;15672是RabbitMQ的Web页面的端口号
docker run -d -p 5672:5672 -p15672:15672 --name myrabbitmq [镜像id]
- 引入spring-boot-starter-amqp依赖
<dependency>
<groupId>org.springframework.boot<groupId>
<artifactId>spring-boot-starter-amqp<artifactId>
<dependency>
-
application.yml配置
类RabbitProperties中定义了RabbitMQ的相关属性,我们也可以在application.yml中用spring.rabbitmq自定义属性。
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
/**
*RabbitMQ host
*/
private String host = "localhost";
/**
*RabbitMQ port
*/
private int port = 5672;
...
}
application.yml文件中可以如下配置:
spring.rabbitmq.host=118.24.33.122
spring.rabbitmq.username=guest
spring.rabbitmq.username=guest
- 测试RabbitMQ
- RabbitTemplate:消息发送处理组件
public class HelloWorldMainApplicationTest {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 点对点模式
*/
@Test
public void contextLoads() {
//需要自己构造一个Message:消息体内容和消息头
//rabbitTemplate.send(exchange,routeKey,message);
//object默认被当成消息体,当传入一个对象时,RabbitTemplate会把它自动序列化后发送给rabbimq
//rabbitTemplate.convertAndSend(exchange,routeKey,object);
Map<String,Object> map = new HashMap<>();
map.put("msg", "第一个消息");
map.put("data", Arrays.asList("helloworld", 123, 333));
rabbitTemplate.convertAndSend("exchange.direct", "direct.route",map);
}
/**
* 接收数据
*/
@Test
public void recieve() {
Object o = rabbitTemplate.receiveAndConvert("direct.route");
System.out.println(o.getClass());
System.out.println(o);
}
- RabbitMQ监听消息队列内容---@RabbitListener&@EnableRabbit
@EnableRabbit表示开启基于注解的RabbitMQ,放在启动类上
@RabbitListener可以监听指定消息队列的变化
例如:
@Service
public class BookService {
@RabbitListener(queues = "web.news")
public void receive(Book book) {
System.out.println("message" + book);
}
//将监听到的消息定义为Message对象,可以获取消息头
@RabbitListener(queues = "web.news")
public void receive(Message message) {
System.out.println("消息头:" + message.getMessageProperties());
System.out.println("消息体" + message.getBody());
}
}
-
AmqpAdmin:RabbitMQ系统管理组件:可以用于声明Exchange、Queue和Binding等;
AMQP中定义了接口Exchange、Queue和Binding接口,而AmqpAdmin中对应了各种方法用来创建、删除上述对象: - declareExchange\declareQueue\declareBinding
-
deleteExchange\deleteQueue\deleteBinding
我们在定义这些对象时,可以看一下需要定义哪些属性。比如,Exchange接口有如下属性及实现: