当前位置: 首页>大数据>正文

【ActiveMQ】5.ActiveMQ的自动重连实验过程

业务需求

ActiveMQ 服务重启后,我们的项目仍然可以实现对队列的监听。
目前:我们每次重启ActiveMQ,需要重启项目。

尝试

ActiveMQ failover

修改yml:

broker-url: failover:(tcp://127.0.01:61616) # 故障转移机制
...

可以实现项目启动后的自动重连,但是项目初次启动时必须要求ActiveMQ的服务能连接上。
找问题发现是 ActiveMQ 监听的问题,导致项目初次启动失败

初步考虑了几种实现方法:

  • 另起线程监听
  • 监听类的Lazy
  • 定时任务修改broker-url

1.在监听类 添加Lazy
不管ActiveMQ服务是否启动,项目初次启动都成功,但是重启 ActiveMQ 监听不到数据。
不修改 broker-url: failover,将Listener 定义为 Lazy ,并且在Listener中写一个空方法,在一个前端请求的接口中注入Listener 并且该调用空方法进行创建消费者。

测试问题:能够实现功能需求,但是刷新页面会创建多个消费者,后来测试又不能创建消费者了(这个问题不知道为啥)

最终实现方式

不采用注解监听的方式

 public void startListen() throws JMSException {
        Connection connection = JmsUtils.connection;
        if (connection == null){
            connection = connectionFactory.createConnection();
            connection.start();
            JmsUtils.connection = connection;
        }
        Session session = JmsUtils.session;
        if (session == null){
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            JmsUtils.session = session;
        }
        MessageConsumer consumer = JmsUtils.consumer;
        if (consumer == null){
            consumer = session.createConsumer(session.createQueue(ywmptMq));
            JmsUtils.consumer = consumer;
        }
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    readActiveYwmptQueue(((TextMessage)message).getText());
                } catch (JMSException e) {
                    System.err.println("报错了:" + e.getMessage());
                }
            }
        });
    }

JmsUtils.java

@Data
public class JmsUtils {
    public static Connection connection;
    public static MessageConsumer consumer;
    public static Session session;
}

定时任务重连

   @Scheduled(fixedDelay = 5000L)
    public void startListener() {
        try {
            activeMQQueueListener.startListen()
        } catch (Exception e) {
//            e.printStackTrace()
            log.info("JMS 连接异常")
            //把static 变量置空
            JmsUtils.connection?.close()
            JmsUtils.connection = null
            JmsUtils.session?.close()
            JmsUtils.session = null
            JmsUtils.consumer?.close()
            JmsUtils.consumer = null
        }
    }

关闭项目还存在消费者,解决方式:closeListener

    void closeConsumer(){
        try {
            Connection connection = JmsUtils.connection;

            if (connection != null){
                connection.close()
                log.info("关闭connection成功!")
            }
            Session session = JmsUtils.session
            if (session != null){
                session.close()
                log.info("关闭session成功!")
            }
            MessageConsumer consumer = JmsUtils.consumer
            if (consumer != null){
                consumer.close()
                log.info("关闭consumer成功!")
            }
        } catch (Exception e) {
            log.error("关闭JMS消费者失败:",e)
        }
    }

https://www.xamrdz.com/bigdata/7de1995246.html

相关文章: