效果流程
生成者与消费者公用的pom.xml
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.2.12.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.2.6.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.6.RELEASE</version> </dependency></dependencies><build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins></build>
生产者编制
rabbitmq.properties? 配置文件
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.virtue_host=/
rabbitmq.username=guest
rabbitmq.password=guest
spring-rabbitmq.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
? ? ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? ? ? xmlns:context="http://www.springframework.org/schema/context"
? ? ? xmlns:rabbit="http://www.springframework.org/schema/rabbit"
? ? ? xmlns:rabbidt="http://www.springframework.org/schema/rabbit"
? ? ? xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
? ? ? http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
? ? <context:property-placeholder location="classpath:rabbitmq.properties"/>
数据确认模式开启:publisher-confirms="true"
数据回退开启 publisher-returns="true"-->
? ? <rabbit:connection-factory id="factory" host="${rabbitmq.host}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? username="${rabbitmq.username}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? password="${rabbitmq.password}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? port="${rabbitmq.port}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? virtual-host="${rabbitmq.virtue_host}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? publisher-confirms="true"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? publisher-returns="true"/>
? ? <rabbit:admin connection-factory="factory"/>
? ? <rabbit:template connection-factory="factory" id="rabbitTemplate"/>
? ? <rabbit:queue name="normal_queue" >
<rabbit:queue-arguments>
? ? ? ? ? ? <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
? ? ? ? ? ? <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
? ? ? ? ? ? <entry key="x-dead-letter-routing-key" value="dlx.key"/>
</rabbit:queue-arguments>
</rabbit:queue>
? ? <rabbit:topic-exchange name="nomal_exchange">
? ? ? ? <rabbit:bindings>
<rabbit:binding pattern="nomal.#" queue="normal_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
? ? <rabbit:queue name="dlx_queue"/>
? ? <rabbit:topic-exchange name="dlx_exchange">
? ? ? ? <rabbit:bindings>
? ? ? ? ? ? <rabbit:binding pattern="dlx.*" queue="dlx_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
测试启动文件
@RunWith(SpringRunner.class)
@ContextConfiguration(locations ="classpath:spring-rabbitmq.xml")
public class TestRabbitmq {
//导入rabbitmq模板
? ? @Autowired
? private RabbitTemplaterabbitTemplate;
@Test
? ? public void testRabbitmq(){
//设置交换机是否发送到队列信息的确认机制
? ? ? ? rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
? ? ? ? ? ? public void confirm(CorrelationData correlationData,boolean ack, String cause) {
// 第一个参数是连接数据 第二个参数是否以及确认 第三个是发送失败的原因
? ? ? ? ? ? ? ? if(ack){
System.out.println("信息已经发送到队列" );
}else {
System.out.println("信息未发送到队列:失败的原因"+cause);
}
}
});
//设置数据回退信息
? ? ? ? rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
? ? ? ? ? ? public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {
//第一个参数是发送的信息? 第二个是回复的错误的状态码 第三个是错误信息 第四个是交换机 第五个是路由键
? ? ? ? ? ? ? ? System.out.println(new String(message.getBody()));
System.out.println("replyCode="+replyCode);
System.out.println("replyText="+replyText);
System.out.println("exchange="+exchange);
System.out.println("routingKey="+routingKey);
}
});
//发送数据给正常的交换机
//第一个参数是交换机名 第二个是路由原则 第三个是发送的信息
? ? ? ? rabbitTemplate.convertAndSend("nomal_exchange","nomal.app","发送信息进行测试");
}
}
消费者创建
rabbitmq.properties配置文件
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.virtue_host=/
rabbitmq.username=guest
rabbitmq.password=guest
spring-rabbitmq.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
? ? ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? ? ? xmlns:context="http://www.springframework.org/schema/context"
? ? ? xmlns:rabbit="http://www.springframework.org/schema/rabbit"
? ? ? xmlns:rabbidt="http://www.springframework.org/schema/rabbit"
? ? ? xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
? ? ? http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
? ? <context:property-placeholder location="classpath:rabbitmq.properties"/>
-->
? ? <rabbit:connection-factory id="factory" host="${rabbitmq.host}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? username="${rabbitmq.username}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? password="${rabbitmq.password}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? port="${rabbitmq.port}"
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? virtual-host="${rabbitmq.virtue_host}"
? ? />
? ? <bean class="Consumer" id="consumer"/>
prefetch:表示签收数据量,可以起到限流作用
acknowledge为manual时是手动签收
-->
? ? <rabbit:listener-container connection-factory="factory" prefetch="1" acknowledge="manual">
? ? ? ? <rabbit:listener ref="consumer" queue-names="dlx_queue"/>
</rabbit:listener-container>
</beans>
Consumer类
public class Consumerimplements ChannelAwareMessageListener {
@Override
? ? public void onMessage(Message message, Channel channel)throws Exception {
//获取发送过来的标识
? ? ? ? long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//接收发送过来的数据
? ? ? ? ? ? System.out.println(new String(message.getBody()));
//做业务逻辑
? ? ? ? ? ? System.out.println("接收数据正常" );
//正常接收就自动确认
? ? ? ? ? ? channel.basicAck(deliveryTag,true);
}catch (IOException e) {
e.printStackTrace();
//接收异常,回滚数据
//第一个参数是标识 第二个是是否确认拒绝签收 第三个是否回滚数据,重新发送(true),要是为false 数据就会抛弃
? ? ? ? ? ? channel.basicNack(deliveryTag,true,true);
}
}
}
测试类
@RunWith(SpringRunner.class)
@ContextConfiguration(locations ="classpath:spring-rabbitmq.xml")
public class TestRabbitmq {
@Test
? ? public void test(){
while (true){
}
}
}