一、通用配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.0.xsd"
default-lazy-init="true">
<description>RabbitMQ 配置</description>
<!-- RabbitMQ 配置 ,requested-heartbeat(单位毫秒) -->
<!-- cache-mode="CONNECTION" ;CONNECTION might be suitable for a listener
container. 使用CONNECTION时,可使用connection-cache-size="5" -->
<rabbit:connection-factory id="rabbitConnectionFactory"
host="${rabbit.host}" username="${rabbit.username}" password="${rabbit.password}"
port="${rabbit.port}" virtual-host="${rabbit.vhost}" cache-mode="CHANNEL"
channel-cache-size="25" requested-heartbeat="10000"
connection-timeout="60000" />
<!-- RabbitMQ Template -->
<rabbit:template id="rabbitTemplate"
connection-factory="rabbitConnectionFactory" retry-template="retryTemplate" />
<!-- retryTemplate start -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="retryContextCache" ref="mapRetryContextCache" />
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
</property>
</bean>
<bean id="mapRetryContextCache" class="org.springframework.retry.policy.MapRetryContextCache" />
<!-- retryTemplate end -->
<!-- auto-startup为true时,系统启动时,会自动创建,Exchange、Queue、Binding -->
<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="true" ignore-declaration-exceptions="false" />
<!-- 消费者监听器,AOP -->
<bean id="missingMessageIdAdvice"
class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
<constructor-arg index="0" ref="mapRetryContextCache" />
</bean>
<!-- 重做拦截器 -->
<bean id="retryOperationsInterceptor"
class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
<property name="retryOperations" ref="retryTemplate" />
</bean>
<util:list id="adviceChain">
<ref bean="missingMessageIdAdvice" />
<ref bean="retryOperationsInterceptor" />
</util:list>
</beans>
二、消息接收
参考:http://docs.spring.io/spring-amqp/docs/1.5.0.BUILD-SNAPSHOT/reference/html/_reference.html#containerAttributes
配置:
<description>RabbitMQ示例 接收消息 配置</description>
<!--属性文件 -->
<context:property-placeholder location="classpath*:demo.properties" />
<!-- 基础配置,注解声明,注解扫描路径等 -->
<import resource="classpath*:/applicationContext-base.xml" />
<!--引入基础配置 -->
<import resource="classpath*:/applicationContext-rabbitmq.xml" />
<!-- auto-delete="true",在监听器退出时,声明的队列将会被删除;exclusive="true",排他锁,声明的队列只能被当前对象使用 -->
<rabbit:queue id="queue.test.1" name="queue.test.1"
auto-declare="true" declared-by="rabbitAdmin" auto-delete="false"
exclusive="false" durable="true" />
<!-- -->
<rabbit:queue id="queue.test.2" name="queue.test.2"
auto-declare="true" declared-by="rabbitAdmin" auto-delete="false"
exclusive="false" durable="true" />
<!-- -->
<rabbit:queue id="queue.test.3" name="queue.test.3"
auto-declare="true" declared-by="rabbitAdmin" auto-delete="false"
exclusive="false" durable="true" />
<!-- 接收消息监听器 -->
<!-- acknowledge="auto",接收到消息后,服务端会直接删掉,不管消息在客户端有没有处理成功;acknowledge="manual",
服务端会在客户端处理完毕后,反馈处理成功消息后删掉消息 -->
<!-- 初始消费者线程数 concurrency="3" -->
<!-- 预取数量(一次从RabbitMQ Server上取到的数据数量),prefetch="10" -->
<rabbit:listener-container id="listner.test.1"
error-handler="" acknowledge="auto" concurrency="3" transaction-size="10"
prefetch="10" max-concurrency="5" auto-startup="true" advice-chain="adviceChain"
auto-declare="true" channel-transacted="true" connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="queue.test.1" ref="rabbitmqReceiverTest"
method="handTest1Msg" />
</rabbit:listener-container>
<rabbit:listener-container id="listner.test.2"
error-handler="" acknowledge="auto" concurrency="3"
transaction-size="10" prefetch="10" max-concurrency="5" auto-startup="true"
advice-chain="adviceChain" auto-declare="true" channel-transacted="true"
connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="queue.test.2" ref="rabbitmqReceiverTest"
method="handTest2Msg" />
</rabbit:listener-container>
<rabbit:listener-container id="listner.test.3"
error-handler="" acknowledge="auto" concurrency="3"
transaction-size="10" prefetch="10" max-concurrency="5" auto-startup="true"
advice-chain="adviceChain" auto-declare="true" channel-transacted="true"
connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="queue.test.3" ref="rabbitmqReceiverTest"
method="handTest3Msg" />
</rabbit:listener-container>
示例代码:
package cn.com.easy.rabbitmq.receive;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Service;
import cn.com.easy.utils.FastJSONUtils;
/**
*
* @author nibili 2015年5月1日
*
*/
@Service
public class RabbitmqReceiverTest {
@SuppressWarnings("resource")
public static void main(String[] args) {
new ClassPathXmlApplicationContext("/applicationContext-rabbitmq-receive-demo.xml");
}
/**
public void handTest1Msg(Object o) {
System.out.println("Test1 msg:" + FastJSONUtils.toJsonString(o));
}
/**
public void handTest2Msg(Object o) {
System.out.println("Test2 msg:" + FastJSONUtils.toJsonString(o));
}
/**
public void handTest3Msg(Object o) {
System.out.println("Test3 msg:" + FastJSONUtils.toJsonString(o));
}
}
三,发送消息
<description>RabbitMQ示例 发送消息配置</description>
<!--属性文件 -->
<context:property-placeholder location="classpath*:demo.properties" />
<!-- 基础配置,注解声明,注解扫描路径等 -->
<import resource="classpath*:/applicationContext-base.xml" />
<!--引入基础配置 -->
<import resource="classpath*:/applicationContext-rabbitmq.xml" />
<!-- auto-delete="true",在监听器退出时,声明的队列将会被删除;exclusive="true",排他锁,声明的队列只能被当前对象使用 -->
<rabbit:queue id="queue.test.1" name="queue.test.1"
auto-declare="true" declared-by="rabbitAdmin" auto-delete="false"
exclusive="false" durable="true" />
<!-- -->
<rabbit:queue id="queue.test.2" name="queue.test.2"
auto-declare="true" declared-by="rabbitAdmin" auto-delete="false"
exclusive="false" durable="true" />
<!-- -->
<rabbit:queue id="queue.test.3" name="queue.test.3"
auto-declare="true" declared-by="rabbitAdmin" auto-delete="false"
exclusive="false" durable="true" />
<!-- direct 交换器 ,auto-delete="true"时,当声明Exchange的连接断开时,Exchange会被删除掉 -->
<rabbit:direct-exchange id="exchange.direct"
auto-declare="true" name="exchange.direct" auto-delete="false"
durable="true" declared-by="rabbitAdmin">
<rabbit:bindings>
<rabbit:binding key="queue.test.1" queue="queue.test.1" />
<rabbit:binding key="queue.test.2" queue="queue.test.2" />
<rabbit:binding key="queue.test.3" queue="queue.test.3" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- topic 交换器 ,auto-delete="true"时,当声明Exchange的连接断开时,Exchange会被删除掉 -->
<rabbit:topic-exchange id="exchange.topic"
auto-declare="true" name="exchange.topic" auto-delete="false" durable="true"
declared-by="rabbitAdmin">
<rabbit:bindings>
<rabbit:binding pattern="#.1" queue="queue.test.1"
exchange="" />
<rabbit:binding pattern="queue.*.2" queue="queue.test.2"
exchange="" />
<rabbit:binding pattern="queue.test.3" queue="queue.test.3"
exchange="" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- fanout 交换器 ,auto-delete="true"时,当声明Exchange的连接断开时,Exchange会被删除掉 -->
<rabbit:fanout-exchange id="exchange.fanout"
auto-declare="true" name="exchange.fanout" auto-delete="false"
durable="true" declared-by="rabbitAdmin">
<rabbit:bindings>
<rabbit:binding queue="queue.test.1" exchange="" />
<rabbit:binding queue="queue.test.2" exchange="" />
<rabbit:binding queue="queue.test.3" exchange="" />
</rabbit:bindings>
</rabbit:fanout-exchange>
注意:
1、队列名称不能以 "AMP"开头,队列名以"AMP"开头的队列将不会被创建,这是在Spring-rabbit中控制的;
2、发送的对象必须继承 序列化接口Serializable 或者 String类型 或者 byte[]数组,否者对象不会被发送,而会发送一个空的字符“”;
Spring-rabbit源码对发送对象的一段处理,object即为要发送的对象
/**
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}