// Address of the HAProxy front end (without a load balancer, put the address of a specific node here).
private static final String HOST_MQ = "192.168.150.11";
// Explicit broker port, currently disabled; the methods visible here never call setPort.
// private static final int PORT_MQ = 5672;
// Credentials handed to the ConnectionFactory.
// NOTE(review): hard-coded in source - consider loading them from configuration.
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
// Channel created by getConsumer(); null until getConsumer() has run.
private static Channel channel = null;
// Connection created by getConsumer(); null until getConsumer() has run.
private static Connection connection = null;
/**
 * Publishes one message to the fanout exchange {@code MQ_1}.
 *
 * <p>Despite its name this method returns nothing. It opens its own connection and
 * channel, declares the exchange, publishes {@code body} with an empty routing key and
 * {@code null} message properties, and closes both resources before returning.
 *
 * <p>NOTE(review): this publishes to {@code MQ_1} while {@code getConsumer()} binds its
 * queue to {@code MQ_2} - confirm that the two are not meant to share an exchange.
 *
 * @param body the message payload
 * @throws RuntimeException wrapping the {@link IOException} or {@link TimeoutException}
 *     raised while connecting, declaring, publishing or closing
 */
public static void getConnection(byte[] body) {
    final String exchangeName = "MQ_1";

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(HOST_MQ);
    factory.setUsername(USERNAME);
    factory.setPassword(PASSWORD);

    // Local resources only: the static connection/channel fields are not touched here.
    try (Connection publishConnection = factory.newConnection();
         Channel publishChannel = publishConnection.createChannel()) {
        publishChannel.exchangeDeclare(exchangeName, "fanout");
        // The message goes to the named exchange declared above (not the default exchange);
        // the routing key is left empty.
        publishChannel.basicPublish(exchangeName, "", null, body);
    } catch (IOException | TimeoutException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Starts a message consumer on the fanout exchange {@code MQ_2}.
 *
 * <p>Opens a connection and a channel (stored in the static {@code connection} and
 * {@code channel} fields), declares the exchange, binds a server-named queue to it and
 * registers an auto-acknowledging consumer that passes every delivery to
 * {@code myPublishHandle}. On success the connection is left open so deliveries keep
 * arriving.
 *
 * <p>The original note says several consumers can be started to increase consumption
 * speed. NOTE(review): each call declares its own server-named queue on a fanout
 * exchange, so every consumer presumably receives its own copy of each message - confirm
 * this is the intended scaling model. Only the most recent call's connection and channel
 * are kept in the static fields.
 *
 * <p>If any setup step fails, the connection opened by this call is closed (instead of
 * being leaked) and the static fields are cleared before the failure is rethrown.
 *
 * @throws RuntimeException wrapping the {@link IOException} or {@link TimeoutException}
 *     raised during setup; a failure while closing is attached as a suppressed exception
 */
public static void getConsumer() {
    final String exchangeName = "MQ_2";
    // Tracks the connection created by THIS call so that cleanup never touches a
    // connection left in the static field by an earlier, successful call.
    Connection opened = null;
    try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST_MQ);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        opened = connectionFactory.newConnection();
        connection = opened;
        channel = opened.createChannel();

        // Fanout exchange type.
        channel.exchangeDeclare(exchangeName, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchangeName, "");

        // Delivery callback: hand each message to the handler.
        DeliverCallback deliverCallback =
                (consumerTag, message) -> myPublishHandle(consumerTag, message);
        // autoAck = true; consumer cancellation is ignored.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    } catch (IOException | TimeoutException e) {
        if (opened != null) {
            // Setup failed part-way: release the half-initialised connection.
            try {
                opened.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            if (connection == opened) {
                connection = null;
                channel = null;
            }
        }
        throw new RuntimeException(e);
    }
}