RabbitMQ的安装
这里暂时使用window环境,稍后补充linux下的安装。首先,因为RabbitMQ由ERLANG实现,所以首先需要安装ERLANG环境,安装好之后才是RabbitMQ的安装和配置。
ERLANG环境
1.先去官网下载最新的OPT。直接打开exe文件执行安装,记录下安装的目录文件路径。
2.将ERLANG加入系统环境:
3.此时打开erlang表示成功。
RabbitMQ安装
1.下载RabbitMQ,直接运行exe文件即可。
2.检验安装是否成功:
进入命令行,输入你的安装目录和rabbitmqctl status命令,看到一大串东西就说明安装成功。
3.安装 RabbitMQWeb的管理插件。继续命令行输入:rabbitmq-plugins enable rabbitmq_management,此时就可以访问http://localhost:15672啦。
默认的管理员账户和密码都是guest,我们可以通过命令来创建新的管理员。
1.新增加用户:命令行(rabbitmqctl.bat add_user 用户名 密码)。
2.将用户升级为管理员(这样才可以登陆后台管理界面)。输入rabbitmqctl.bat set_user_tags 用户名 administrator
3.使用test1 test1登陆成功,这样RabbitMQ就安装好了。
RabbitMQ入门
RabbitMQ消息队列一共有6种,从简单到复杂分别为简单模式、 Work模式、订阅模式、路由模式、通配符模式和RPC模式。
下面我将使用java简单做几个测试demo。
maven中加入的dependency:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
简单模式
一个生产者将消息发送到队列中,一个消费者从队列中获得消息。
我们创建两个独立的项目,分别表示消费者和生产者。
生产者端代码:
public class Producer {
private final static String QUEUE_NAME = "RabbitMQ.demo";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创建连工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置相关信息
factory.setHost("localhost");
// 创建一个链接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 在通道里声明一个队列
channel.queueDeclare("", false, false, false, null);
// 发布消息到队列中
String message = "This is 消息队列 demo";
for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes("UTF-8"));
TimeUnit.MILLISECONDS.sleep(300);
System.out.println("生产者发送了消息:" +"["+i+"]" +message);
}
// 关闭资源
channel.close();
connection.close();
}
}
解释一下queueDeclare中的参数。
Parameters:
queue :队列的名字,这没啥好说的。
durable :是否需要持久化,指的是队列持久化到数据库中,持久化设置为true的话,即使RabbitMQ服务崩溃也不会丢失队列里面的数据。
exclusive :是否排外,设置为true的队列只可以在本次的连接中被访问,新的队列就会被排外不能访问。
autoDelete :是否自动删除,也就是关闭connection的时候队列里面的数据是否自动删除。
arguments :队列的属性,如构造参数。(应该没翻译错吧other properties (construction arguments) for the queue)。
从RabbitMQ控制台可以知道,消息发送成功。
消费端代码
public class Customer {
private final static String QUEUE_NAME = "RabbitMQ.demo";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
// 创建连工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置相关信息
factory.setHost("localhost");
// 创建一个链接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 在通道里声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
System.out.println("消费者1消息队列开始关注消息");
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
//false表示接受消息需返回给消息队列确认
channel.basicConsume(QUEUE_NAME, true,consumer);
//获取消息
while(true){
TimeUnit.SECONDS.sleep(1);
QueueingConsumer.Delivery nextDelivery = consumer.nextDelivery();
//getBody()方法获得队列中的消息
String message = new String(nextDelivery.getBody());
System.out.println("消费者1队列获得了消息:"+message);
}
}
}
控制台输出:
Work模式
一个生产者,一个队列,多个消费者。
生产者代码和简单模式一样,消费端只需要复制多个消费者即可。
注意的是,默认情况下,多个消费者依次获得消息,不能越界,就算你已经早早的消费了消息,但是你还是需要等待,按照顺序从列队拿消息。这显然是不合理,应该是能者多劳。这时我们只需要在生产者加入一行:
channel.basicQos(1);//表示一个时间服务器只会发送一条消息给消费者
订阅模式
生产者将数据发送给交换机或多个队列,消费者从队列中获得。
生产者代码:
public class Producer {
//生产者和交换机绑定了
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创建连工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置相关信息
factory.setHost("localhost");
// 创建一个链接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
//使用交换机,就声明一个交还机,type=fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/*//一个时刻只会发送一条消息给消费者
channel.basicQos(1);*/
// 发布消息到队列中
String message = "This is 消息队列 demo";
for (int i = 0; i < 20; i++) {
//设置路由b
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes("UTF-8"));
TimeUnit.MILLISECONDS.sleep(300);
System.out.println("生产者发送了消息:" +"["+i+"]" +message);
}
// 关闭资源
channel.close();
connection.close();
}
}
消费者1:
public class Customer {
private final static String QUEUE_NAME = "RabbitMQ1.demo";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
// 创建连工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置相关信息
factory.setHost("localhost");
// 创建一个链接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 在通道里声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("消费者1消息队列开始关注消息");
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
//false表示接受消息需返回给消息队列确认
channel.basicConsume(QUEUE_NAME, true,consumer);
//获取消息
while(true){
TimeUnit.SECONDS.sleep(1);
QueueingConsumer.Delivery nextDelivery = consumer.nextDelivery();
String message = new String(nextDelivery.getBody());
System.out.println("消费者1队列获得了消息:"+message);
}
}
}
消费者2和消费者代码一样,只需要修改QUEUE_NAME即可。
控制台输出:
这表明,两个消费者都获得了相同的消息。
路由模式
生产者将特定的消息传给交换机,然会由交换机发送给特定的队列,消费者只需要指定特定的路由key即可。
生产者:我们只需要修改交换机的名称为direct,指定路由的key即可,这里设置key=b
消费者1:表示接受key=a的消息,其他的接收不到。
输出:
消费者2:表示接受key=b的消息,其他的接收不到。
输出:
统配符模式
统配符模式其实也可以归类于路由模式,只不过统配符模式中路由的key可以使用通配符表示。代码可以参考路由模式。只不过需要修改一下交换机类型为topic:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
.....
.....
//设置通配符为a.*
channel.basicPublish(EXCHANGE_NAME, "a.*", null, (message+i).getBytes("UTF-8"));
.....
.....
这样任何配以a.开头的消费者都能获得此生产者的消息。