1. 正文
1. 工作者模式:
2. 发布订阅模式
3. 路由模式
4. topic主体模式。
5. Springboot整合rabbitMQ
2. 工作者模式:
特点:
1. 一个生产者
2. 由多个消费。
3. 统一个队列。
4. 这些消费者之间存在竞争关系。
用处:
比如批量处理上. rabbitMQ里面积压了大量的消息。
生产者
package com.tyt.worker;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.31.145");
// 创建 连接对象Connection
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
// 创建队列
/**
* String queue, 队列的名称
* boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
* boolean exclusive,是否该队列独占
* boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
* Map<String, Object> arguments 额外参数
*/
channel.queueDeclare("qy129_worker", true, false, false, null);
/**发生消息
* String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
* String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
* BasicProperties props, 消息的一些额外配置 目前不加
* byte[] body 消息的内容
*
*/
for (int i = 0; i <10 ; i++) {
String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
channel.basicPublish("","qy129_worker",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
消费者01:
package com.tyt.worker;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest01 {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.31.145");
// 创建 连接对象Connection
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//消费消息(接收消息)
/**
* String queue, 队列
* boolean autoAck, 是否自动确认
* Consumer callback 回调方法,当队列中存在信息后,会自动触发回调函数
*/
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("c1消息的内容:"+new String(body));
}
};
channel.basicConsume("qy129_worker",true,callback);
}
}
消费者02:
package com.tyt.worker;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest02 {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.31.145");
// 创建 连接对象Connection
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//消费消息(接收消息)
/**
* String queue, 队列
* boolean autoAck, 是否自动确认
* Consumer callback 回调方法,当队列中存在信息后,会自动触发回调函数
*/
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("c2消息的内容:"+new String(body));
}
};
channel.basicConsume("qy129_worker",true,callback);
}
}
4. 发布订阅模式
1. 特点:
1.一个生产者
2.多个消费者
3.多个队列。
4.交换机 转发消息。
生产者:
package com.tyt.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.31.145");
// 创建 连接对象Connection
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
// 创建队列
/**
* String queue, 队列的名称
* boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
* boolean exclusive,是否该队列独占
* boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
* Map<String, Object> arguments 额外参数
*/
channel.queueDeclare("qy129_fanout01", true, false, false, null);
channel.queueDeclare("qy129_fanout02", true, false, false, null);
/**
* 创建交换机
* String exchange, 交换机名称
* BuiltinExchangeType type,交换机类型
* boolean durable 是否持久化
*/
channel.exchangeDeclare("qy129_exchange_fanout", BuiltinExchangeType.FANOUT, true);
/**
* 连接交换机和队列
* String queue,队列名
* String exchange,交换机名
* String routingKey 路由器key
*/
channel.queueBind("qy129_fanout01", "qy129_exchange_fanout", "");
channel.queueBind("qy129_fanout02", "qy129_exchange_fanout", "");
/**发生消息
* String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
* String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
* BasicProperties props, 消息的一些额外配置 目前不加
* byte[] body 消息的内容
*
*/
for (int i = 0; i <10 ; i++) {
String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
channel.basicPublish("qy129_exchange_fanout","",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
消费者:
package com.tyt.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest01 {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.31.145");
// 创建 连接对象Connection
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//消费消息(接收消息)
/**
* String queue, 队列
* boolean autoAck, 是否自动确认
* Consumer callback 回调方法,当队列中存在信息后,会自动触发回调函数
*/
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("c1消息的内容:"+new String(body));
}
};
channel.basicConsume("qy129_fanout01",true,callback);
}
}
5.路由模式
特点:
1.一个生产者
2.多个消费者
3.多个队列。
4.交换机 转发消息。
5.routekey:路由key 只要routekey匹配的消息可以到达对应队列。
生产者:
package com.tyt.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.31.145");
// 创建 连接对象Connection
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
// 创建队列
/**
* String queue, 队列的名称
* boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
* boolean exclusive,是否该队列独占
* boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
* Map<String, Object> arguments 额外参数
*/
channel.queueDeclare("qy129_direct01", true, false, false, null);
channel.queueDeclare("qy129_direct02", true, false, false, null);
/**
* 创建交换机
* String exchange, 交换机名称
* BuiltinExchangeType type,交换机类型
* boolean durable 是否持久化
*/
channel.exchangeDeclare("qy129_exchange_direct", BuiltinExchangeType.DIRECT, true);
/**
* 连接交换机和队列
* String queue,队列名
* String exchange,交换机名
* String routingKey 路由器key
*/
channel.queueBind("qy129_direct01", "qy129_exchange_direct", "error");
channel.queueBind("qy129_direct02", "qy129_exchange_direct", "error");
channel.queueBind("qy129_direct02", "qy129_exchange_direct", "info");
channel.queueBind("qy129_direct02", "qy129_exchange_direct", "warning");
/**发生消息
* String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
* String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
* BasicProperties props, 消息的一些额外配置 目前不加
* byte[] body 消息的内容
*
*/
for (int i = 0; i <10 ; i++) {
String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
channel.basicPublish("qy129_exchange_direct","info",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
消费者基本没什么变化,只需要把通道换一下即可。
6. topic主体模式
1. 绑定按照通配符的模式。
*: 统配一个单词。
#: 统配n个单词
hello.orange.rabbit
lazy.orange
生产者
package com.tyt.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.31.145");
// 创建 连接对象Connection
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
// 创建队列
/**
* String queue, 队列的名称
* boolean durable,是否该队列持久性,rabbitMQ服务重启后该存放是否存在
* boolean exclusive,是否该队列独占
* boolean autoDelete,是否自动删除,如果长时间没有发生消息则自动删除
* Map<String, Object> arguments 额外参数
*/
channel.queueDeclare("qy129_topic01", true, false, false, null);
channel.queueDeclare("qy129_topic02", true, false, false, null);
/**
* 创建交换机
* String exchange, 交换机名称
* BuiltinExchangeType type,交换机类型
* boolean durable 是否持久化
*/
channel.exchangeDeclare("qy129_exchange_topic", BuiltinExchangeType.TOPIC, true);
/**
* 连接交换机和队列
* String queue,队列名
* String exchange,交换机名
* String routingKey 路由器key
*/
channel.queueBind("qy129_topic01", "qy129_exchange_topic", "*.orange.*");
channel.queueBind("qy129_topic02", "qy129_exchange_topic", "*.*.rabbit");
channel.queueBind("qy129_topic02", "qy129_exchange_topic", "#.tyt.#");
channel.queueBind("qy129_topic02", "qy129_exchange_topic", "lazy.#");
/**发生消息
* String exchange, 交换机的名称,如果没有则使用“” ,它会自动默认
* String routingKey, 路由key 如果没有交换机的绑定,使用队列的名称
* BasicProperties props, 消息的一些额外配置 目前不加
* byte[] body 消息的内容
*
*/
for (int i = 0; i <10 ; i++) {
String msg = "Hello,rabbitmq,fuck,李浩辰"+i;
channel.basicPublish("qy129_exchange_topic","tyt",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
7. rabbitMQ整合springboot
springboot引入了相关的依赖后,提供一个工具类RabbitTemplate.使用这个工具类可以发送消息。
(1)父工程引入相关的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>product</module>
<module>consumer</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tyt</groupId>
<artifactId>springboot-rabbit-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbit-parent</name>
<description>springboot整合rabbitMQ</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--rabbitMQ的依赖: 启动类加载。读取配置文件:
springboot自动装配原理: 引用starter启动依赖时,把对应的自动装配类加载进去,该自动装配类可以读取application配置文件中
内容。 DispatherServlet
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
(2)相应的生产者和消费的配置
server:
port: 8080
spring:
rabbitmq:
host: 192.168.31.145
datasource:
druid:
username: root
password: 566666
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3308/hello
注意:(1)主机号需要填写你自己的(2)我的mysql的端口号是3308,默认是3306
(3)生产者
package com.tyt.controller;
import com.tyt.service.FruitService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private FruitService fruitService;
@GetMapping("hello")
public String hello() {
String one = fruitService.getOne(1);
rabbitTemplate.convertAndSend("qy129_exchange_fanout", "", one);//相当于经过序列化
return "下单成功";
}
}
消费者监听消息
package com.tyt.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerListener {
@RabbitListener(queues = "qy129_fanout01")
public void listener(Message msg) {
System.out.println("出库"+msg);
}
}