当前位置: 首页>后端>正文

RabbitMQ Java 客户端(发布和订阅模式)


 // Address of the HAProxy proxy; when no load balancer is used, put the address of a
    // specific node here instead.
    private static final String HOST_MQ = "192.168.150.11";

    // Credentials passed to the ConnectionFactory by the methods below.
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    //  private static final int PORT_MQ = 5672;

    // Connection and channel assigned by getConsumer(); null until then.
    private static Connection connection;
    private static Channel channel;

    /**
     * Publishes one message to the {@code "MQ_1"} fanout exchange.
     *
     * <p>Despite its name, this method returns nothing: it opens a fresh connection and
     * channel, declares the exchange, publishes {@code body} with an empty routing key and
     * no message properties, and closes both resources again before returning.
     *
     * <p>NOTE(review): {@code getConsumer()} binds its queue to {@code "MQ_2"}, not
     * {@code "MQ_1"} — confirm the two exchange names are meant to differ.
     *
     * @param body raw message payload handed to {@code basicPublish}
     * @throws RuntimeException wrapping the {@code IOException} or {@code TimeoutException}
     *     raised while connecting, declaring, publishing or closing
     */
    public static void getConnection(byte[] body) {
        final String exchangeName = "MQ_1";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_MQ);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // Locals are named so that they do not shadow the static connection/channel fields.
        try (Connection publishConnection = factory.newConnection();
             Channel publishChannel = publishConnection.createChannel()) {
            publishChannel.exchangeDeclare(exchangeName, "fanout");
            // The message is sent to the exchange declared above (not the default exchange),
            // with an empty routing key.
            publishChannel.basicPublish(exchangeName, "", null, body);
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts a message consumer on the {@code "MQ_2"} fanout exchange (several consumers
     * can be started to increase consumption speed).
     *
     * <p>Opens a connection and channel, stores them in the static {@code connection} and
     * {@code channel} fields, declares the exchange, binds a freshly declared queue to it
     * with an empty routing key, and registers a delivery callback that forwards every
     * message to {@code myPublishHandle}. The connection is deliberately left open on
     * success so that deliveries keep arriving.
     *
     * <p>If any setup step fails, the connection opened by this call is aborted and the
     * static fields are reset to {@code null}, so a half-configured connection is not leaked.
     *
     * <p>NOTE(review): {@code getConnection(byte[])} publishes to {@code "MQ_1"}, not
     * {@code "MQ_2"} — confirm the two exchange names are meant to differ.
     *
     * @throws RuntimeException wrapping the {@code IOException} or {@code TimeoutException}
     *     raised while connecting or setting up the consumer
     */
    public static void getConsumer() {
        final String exchangeName = "MQ_2";

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST_MQ);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Tracks only the connection created by this call, so that cleanup never touches a
        // connection stored in the static field by an earlier call.
        Connection opened = null;
        boolean consuming = false;
        try {
            opened = connectionFactory.newConnection();
            connection = opened;
            Channel consumerChannel = opened.createChannel();
            channel = consumerChannel;

            // Fanout exchange type.
            consumerChannel.exchangeDeclare(exchangeName, "fanout");
            String queueName = consumerChannel.queueDeclare().getQueue();
            consumerChannel.queueBind(queueName, exchangeName, "");

            // Delivery callback: hand each message to myPublishHandle.
            DeliverCallback deliverCallback =
                    (consumerTag, message) -> myPublishHandle(consumerTag, message);
            // Second argument (autoAck) is true; the cancel callback does nothing.
            consumerChannel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
            consuming = true;
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            if (!consuming && opened != null) {
                // abort() discards any close-time errors, so the original failure
                // is the one that propagates to the caller.
                opened.abort();
                connection = null;
                channel = null;
            }
        }
    }


https://www.xamrdz.com/backend/3h21944460.html

相关文章: