当前位置: 首页>后端>正文

TTL+死信队列 组合实现延迟队列的效果

效果流程

TTL+死信队列 组合实现延迟队列的效果,第1张

生成者与消费者公用的pom.xml

<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.2.12.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.2.6.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.6.RELEASE</version> </dependency></dependencies><build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins></build>

生产者编制

rabbitmq.properties? 配置文件

rabbitmq.host=localhost

rabbitmq.port=5672

rabbitmq.virtue_host=/

rabbitmq.username=guest

rabbitmq.password=guest

spring-rabbitmq.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

? ? ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

? ? ? xmlns:context="http://www.springframework.org/schema/context"

? ? ? xmlns:rabbit="http://www.springframework.org/schema/rabbit"

? ? ? xmlns:rabbidt="http://www.springframework.org/schema/rabbit"

? ? ? xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

https://www.springframework.org/schema/context/spring-context.xsd

http://www.springframework.org/schema/rabbit

? ? ? http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

? ? <context:property-placeholder location="classpath:rabbitmq.properties"/>

数据确认模式开启:publisher-confirms="true"

数据回退开启 publisher-returns="true"-->

? ? <rabbit:connection-factory id="factory" host="${rabbitmq.host}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? username="${rabbitmq.username}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? password="${rabbitmq.password}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? port="${rabbitmq.port}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? virtual-host="${rabbitmq.virtue_host}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? publisher-confirms="true"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? publisher-returns="true"/>

? ? <rabbit:admin connection-factory="factory"/>

? ? <rabbit:template connection-factory="factory" id="rabbitTemplate"/>

? ? <rabbit:queue name="normal_queue" >

<rabbit:queue-arguments>

? ? ? ? ? ? <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>

? ? ? ? ? ? <entry key="x-dead-letter-exchange" value="dlx_exchange"/>

? ? ? ? ? ? <entry key="x-dead-letter-routing-key" value="dlx.key"/>

</rabbit:queue-arguments>

</rabbit:queue>

? ? <rabbit:topic-exchange name="nomal_exchange">

? ? ? ? <rabbit:bindings>

<rabbit:binding pattern="nomal.#" queue="normal_queue"></rabbit:binding>

</rabbit:bindings>

</rabbit:topic-exchange>

? ? <rabbit:queue name="dlx_queue"/>

? ? <rabbit:topic-exchange name="dlx_exchange">

? ? ? ? <rabbit:bindings>

? ? ? ? ? ? <rabbit:binding pattern="dlx.*" queue="dlx_queue"></rabbit:binding>

</rabbit:bindings>

</rabbit:topic-exchange>

</beans>

测试启动文件

@RunWith(SpringRunner.class)

@ContextConfiguration(locations ="classpath:spring-rabbitmq.xml")

public class TestRabbitmq {

//导入rabbitmq模板

? ? @Autowired

? private RabbitTemplaterabbitTemplate;

@Test

? ? public void testRabbitmq(){

//设置交换机是否发送到队列信息的确认机制

? ? ? ? rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

? ? ? ? ? ? public void confirm(CorrelationData correlationData,boolean ack, String cause) {

// 第一个参数是连接数据 第二个参数是否以及确认 第三个是发送失败的原因

? ? ? ? ? ? ? ? if(ack){

System.out.println("信息已经发送到队列" );

}else {

System.out.println("信息未发送到队列:失败的原因"+cause);

}

}

});

//设置数据回退信息

? ? ? ? rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

@Override

? ? ? ? ? ? public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {

//第一个参数是发送的信息? 第二个是回复的错误的状态码 第三个是错误信息 第四个是交换机 第五个是路由键

? ? ? ? ? ? ? ? System.out.println(new String(message.getBody()));

System.out.println("replyCode="+replyCode);

System.out.println("replyText="+replyText);

System.out.println("exchange="+exchange);

System.out.println("routingKey="+routingKey);

}

});

//发送数据给正常的交换机

//第一个参数是交换机名 第二个是路由原则 第三个是发送的信息

? ? ? ? rabbitTemplate.convertAndSend("nomal_exchange","nomal.app","发送信息进行测试");

}

}

消费者创建

rabbitmq.properties配置文件

rabbitmq.host=localhost

rabbitmq.port=5672

rabbitmq.virtue_host=/

rabbitmq.username=guest

rabbitmq.password=guest


spring-rabbitmq.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

? ? ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

? ? ? xmlns:context="http://www.springframework.org/schema/context"

? ? ? xmlns:rabbit="http://www.springframework.org/schema/rabbit"

? ? ? xmlns:rabbidt="http://www.springframework.org/schema/rabbit"

? ? ? xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

https://www.springframework.org/schema/context/spring-context.xsd

http://www.springframework.org/schema/rabbit

? ? ? http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

? ? <context:property-placeholder location="classpath:rabbitmq.properties"/>

-->

? ? <rabbit:connection-factory id="factory" host="${rabbitmq.host}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? username="${rabbitmq.username}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? password="${rabbitmq.password}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? port="${rabbitmq.port}"

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? virtual-host="${rabbitmq.virtue_host}"

? ? />

? ? <bean class="Consumer" id="consumer"/>

prefetch:表示签收数据量,可以起到限流作用

acknowledge为manual时是手动签收

-->

? ? <rabbit:listener-container connection-factory="factory" prefetch="1" acknowledge="manual">

? ? ? ? <rabbit:listener ref="consumer" queue-names="dlx_queue"/>

</rabbit:listener-container>

</beans>

Consumer类

public class Consumerimplements ChannelAwareMessageListener {

@Override

? ? public void onMessage(Message message, Channel channel)throws Exception {

//获取发送过来的标识

? ? ? ? long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

//接收发送过来的数据

? ? ? ? ? ? System.out.println(new String(message.getBody()));

//做业务逻辑

? ? ? ? ? ? System.out.println("接收数据正常" );

//正常接收就自动确认

? ? ? ? ? ? channel.basicAck(deliveryTag,true);

}catch (IOException e) {

e.printStackTrace();

//接收异常,回滚数据

//第一个参数是标识 第二个是是否确认拒绝签收 第三个是否回滚数据,重新发送(true),要是为false 数据就会抛弃

? ? ? ? ? ? channel.basicNack(deliveryTag,true,true);

}

}

}

测试类

@RunWith(SpringRunner.class)

@ContextConfiguration(locations ="classpath:spring-rabbitmq.xml")

public class TestRabbitmq {

@Test

? ? public void test(){

while (true){

}

}

}


https://www.xamrdz.com/backend/3sg1941833.html

相关文章: