1. RocketMQ 使用
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.4</version>
</dependency>
1.1 Spring Boot 整合 RocketMQ 包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2. 生产者 (Producer) 示例
/**
 * Sends one synchronous demo message to {@code my_topic} / {@code my_tag} and prints the result.
 *
 * <p>The producer is created and started by hand here, so it is also shut down by hand
 * once the send has finished, whether or not the send succeeded.
 *
 * @throws MQClientException propagated from {@code MessageProducer.init()}
 */
@Test
public void sendDemo() throws MQClientException {
    MessageProducer messageProducer = new MessageProducer();
    messageProducer.init();
    try {
        String keys = UUID.randomUUID().toString().replace("-", "");
        MessageExt messageExt = new MessageExt();
        // Message header (user property)
        messageExt.putUserProperty("SCENE_TYPE", "CREATE_ORDER");
        // Message body
        OrderDTO orderDTO = new OrderDTO("S001", "10001");
        messageExt.setBody(JSON.toJSONBytes(orderDTO));
        // Send synchronously and print the outcome
        ResultDTO<SendResult> sendResult = messageProducer.send("my_topic", "my_tag", messageExt, keys, true);
        System.out.println(sendResult);
    } finally {
        // Release the producer started above; otherwise it outlives the test
        messageProducer.shutdown();
    }
}
2.1 发送同步消息
/**
 * REST endpoints under {@code /message} that publish demo order messages
 * through the injected {@code MessageProducer}.
 */
@RestController
@RequestMapping("/message")
public class MessageController {

    @Resource
    private MessageProducer messageProducer;

    /**
     * Builds a demo order message, sends it synchronously and returns the send result as JSON.
     *
     * @return JSON rendering of the result object returned by the producer
     */
    @RequestMapping("/send")
    public String send() {
        // Message key: a random UUID with the dashes stripped
        String messageKey = UUID.randomUUID().toString().replace("-", "");

        MessageExt outgoing = new MessageExt();
        // Message body: the serialized order
        outgoing.setBody(JSON.toJSONBytes(new OrderDTO("S001", "10001")));
        // Message header (user property)
        outgoing.putUserProperty("SCENE_TYPE", "CREATE_ORDER");

        // Synchronous send; the outcome is handed back to the HTTP caller
        ResultDTO<SendResult> outcome = messageProducer.send("my_topic", "my_tag", outgoing, messageKey, true);
        return JSON.toJSONString(outcome);
    }
}
/**
 * Spring component wrapping a RocketMQ {@code DefaultMQProducer}.
 *
 * <p>The producer is created and started in {@link #init()} and released in {@link #shutdown()}.
 */
@Component
public class MessageProducer {

    private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    private DefaultMQProducer producer;

    /**
     * Creates the producer for group {@code producer_group} and starts it.
     *
     * @throws MQClientException if the producer fails to start
     */
    @PostConstruct
    public void init() throws MQClientException {
        // NOTE(review): no name server address is configured here; presumably it is supplied
        // externally (system property / environment) — confirm for your deployment.
        producer = new DefaultMQProducer("producer_group");
        producer.start();
    }

    /**
     * Shuts the producer down if it was created.
     */
    @PreDestroy
    public void shutdown() {
        if (producer != null) {
            producer.shutdown();
            logger.info("rocket_producer_destroy_success");
        }
    }

    /**
     * Sends a message synchronously.
     *
     * @param topic      topic written onto the message before sending
     * @param tags       tags written onto the message before sending
     * @param messageExt message carrying the body and user properties
     * @param keys       message keys written onto the message before sending
     * @param printLog   whether to log the outgoing parameters and body
     * @return a success result wrapping the {@code SendResult}, or a failure result carrying
     *         the exception message when sending throws
     */
    public ResultDTO<SendResult> send(String topic, String tags, MessageExt messageExt, String keys, boolean printLog) {
        if (printLog) {
            logger.info("send_message_params, topic={}, tags={}, messageExt={}, body={}",
                    topic, tags, messageExt, bodyAsText(messageExt));
        }
        try {
            messageExt.setTopic(topic);
            messageExt.setTags(tags);
            messageExt.setKeys(keys);
            SendResult sendResult = producer.send(messageExt);
            if (sendResult != null && SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                logger.info("send_message_success, messageId={}", sendResult.getMsgId());
            } else {
                // The result is still returned to the caller, but a non-OK status must not go unnoticed
                logger.warn("send_message_not_ok, topic={}, tags={}, keys={}, sendResult={}",
                        topic, tags, keys, sendResult);
            }
            return ResultDTO.success(sendResult);
        } catch (Exception e) {
            // bodyAsText never throws, so the original failure is always the one reported
            logger.error("send_message_exception, topic={}, tags={}, messageExt={}, body={}, exception={}",
                    topic, tags, messageExt, bodyAsText(messageExt), e);
            return ResultDTO.fail("send_message_error", e.getMessage());
        }
    }

    /**
     * Renders the message body for logging: decoded as UTF-8 instead of the platform default
     * charset, and {@code "null"} when the message or its body is absent.
     */
    private static String bodyAsText(MessageExt messageExt) {
        if (messageExt == null || messageExt.getBody() == null) {
            return "null";
        }
        return new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
2.2 发送异步消息
/**
 * Builds a demo order message and hands it to the producer for asynchronous sending.
 *
 * @return a fixed acknowledgement string; it is returned as soon as the producer call
 *         returns and does not carry the send outcome
 */
@RequestMapping("/sendAsync")
public String sendAsync() {
    // Message key: a random UUID with the dashes stripped
    String messageKey = UUID.randomUUID().toString().replace("-", "");

    MessageExt outgoing = new MessageExt();
    // Message body: the serialized order
    outgoing.setBody(JSON.toJSONBytes(new OrderDTO("S001", "10001")));
    // Message header (user property)
    outgoing.putUserProperty("SCENE_TYPE", "CREATE_ORDER");

    messageProducer.sendAsync("my_topic", "my_tag", outgoing, messageKey, true);
    return "send async message success";
}
/**
 * Sends a message asynchronously; the outcome is reported to the log by a callback.
 *
 * @param topic      topic written onto the message before sending
 * @param tags       tags written onto the message before sending
 * @param messageExt message carrying the body and user properties
 * @param keys       message keys written onto the message before sending
 * @param printLog   whether to log the outgoing parameters and body
 */
public void sendAsync(String topic, String tags, MessageExt messageExt, String keys, boolean printLog) {
    if (printLog) {
        logger.info("send_message_params, topic={}, tags={}, messageExt={}, body={}",
                topic, tags, messageExt, decodeBodyForLog(messageExt));
    }
    try {
        messageExt.setTopic(topic);
        messageExt.setTags(tags);
        messageExt.setKeys(keys);
        // Register the callback that receives the send outcome
        producer.send(messageExt, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                logger.info("send_message_onSuccess, messageId={}", sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable throwable) {
                // Pass the throwable to the logger so the stack trace reaches the log
                // instead of being printed to stderr
                logger.error("send_message_onException, exception={}", throwable.getMessage(), throwable);
            }
        });
    } catch (Exception e) {
        logger.error("send_message_exception, topic={}, tags={}, messageExt={}, body={}, exception={}",
                topic, tags, messageExt, decodeBodyForLog(messageExt), e);
    }
}

/**
 * Renders the message body for logging: decoded as UTF-8 instead of the platform default
 * charset, and {@code "null"} when the message or its body is absent.
 */
private static String decodeBodyForLog(MessageExt messageExt) {
    if (messageExt == null || messageExt.getBody() == null) {
        return "null";
    }
    return new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
}
2.3 失败重试
/**
 * Builds a message from raw parts and sends it synchronously, retrying on failure.
 *
 * <p>Up to three attempts are made (one initial send plus two retries). The return value
 * reflects the real outcome: {@code false} when every attempt failed.
 *
 * @param topic          destination topic; a blank topic is rejected
 * @param tags           message tags
 * @param msg            message body; {@code null} is rejected
 * @param keys           message keys, applied only when not blank
 * @param userProperties user properties copied onto the message, applied only when not empty
 * @return {@code true} if one of the attempts succeeded; {@code false} if the arguments were
 *         rejected or all attempts failed
 */
public boolean send(String topic, String tags, byte[] msg, String keys, Map<String, String> userProperties) {
    if (StringUtils.isBlank(topic) || msg == null) {
        return false;
    }
    Message message = new Message(topic, tags, msg);
    if (StringUtils.isNotBlank(keys)) {
        message.setKeys(keys);
    }
    if (MapUtils.isNotEmpty(userProperties)) {
        userProperties.forEach(message::putUserProperty);
    }
    // One initial attempt plus two automatic retries
    final int maxAttempts = 3;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        if (send(message)) {
            return true;
        }
    }
    logger.error("send_message_exception, topic={}, tags={}, message={}",
            topic, tags, new String(msg, java.nio.charset.StandardCharsets.UTF_8));
    return false;
}
/**
 * Performs a single synchronous send attempt.
 *
 * @param message the message to send
 * @return {@code true} only when a result came back with status {@code SEND_OK};
 *         {@code false} for any other status, a missing result, or an exception (which is logged)
 */
protected boolean send(Message message) {
    try {
        SendResult outcome = producer.send(message);
        return outcome != null && SendStatus.SEND_OK == outcome.getSendStatus();
    } catch (Exception e) {
        logger.error("send_message_exception, message={}, exception={}", message, e);
        return false;
    }
}
3. 消费者 (Consumer) 示例
/**
 * Spring component that starts a RocketMQ push consumer for {@code my_topic} / {@code tag_test}
 * and logs every message it receives.
 *
 * <p>The consumer is started in {@link #init()} and released in {@link #shutdown()}.
 */
@Component
public class MessageConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    private DefaultMQPushConsumer consumer;

    /**
     * Creates, configures and starts the consumer for group {@code consumer_group}.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        // To receive messages of every tag under the topic, subscribe with "*" instead:
        // consumer.subscribe("my_topic", "*");
        consumer.subscribe("my_topic", "tag_test");
        // CONSUME_FROM_LAST_OFFSET (the default): start from the newest message, skipping earlier ones
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Upper bound on the number of messages handed to the listener per call
        consumer.setConsumeMessageBatchMaxSize(1);
        // Message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt message : messages) {
                        // Decode explicitly as UTF-8 rather than with the platform default charset
                        logger.info("consumer_message, topic={}, tags={}, msgId={}, properties={}, body={}",
                                message.getTopic(), message.getTags(), message.getMsgId(), message.getProperties(),
                                new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // Ask for redelivery; the exception is passed along so the stack trace is logged
                    logger.error("consumer_message_exception_retry, topic=my_topic, tags=tag_test, exception={}",
                            e.getMessage(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        logger.info("consumer_message_success");
    }

    /**
     * Shuts the consumer down if it was created, mirroring the producer's lifecycle handling.
     */
    @PreDestroy
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
            logger.info("rocket_consumer_destroy_success");
        }
    }
}
3.2 消息过滤
/**
 * Starts a push consumer on {@code my_topic} that only receives messages tagged
 * {@code tag1} or {@code tag2}.
 *
 * @throws MQClientException if subscribing or starting the consumer fails
 */
public void consumerMessage() throws MQClientException {
    consumer = new DefaultMQPushConsumer("consumer_group");
    consumer.setConsumeMessageBatchMaxSize(1);
    consumer.setNamesrvAddr("localhost:9876");

    // Alternative overload: subscribe(String topic, MessageSelector messageSelector)
    //   filter by content:  MessageSelector.bySql("status = 1001 or status = 1002");
    //   filter by property: MessageSelector.bySql("userProperties['sceneType'] = '1'");
    // NOTE(review): the two bySql expressions are carried over from the original notes;
    // verify their syntax against the RocketMQ SQL92 filtering documentation before use.

    // Tag filter: tag1 or tag2
    consumer.subscribe("my_topic", "tag1 || tag2");
    // consumer.subscribe("my_topic", MessageSelector.byTag("tag1 || tag2"));

    // Message listener
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt received : messages) {
                    // do something...
                }
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    logger.info("consumer_message_success");
}
3.3 RocketMQ:Message filtering
文档 https://www.alibabacloud.com/help/en/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering?spm=a2c63.p38356.0.0.2e626814CogIhJ
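| SQL92 syntax | Description | Example |
|---|---|---|
| AND, OR | Logical AND / logical OR of two conditions | `a IS NOT NULL AND a > 100` |
| =, <> | Equal to / not equal to | `a = 'abc'`, `a <> 'def'` |
| IS NULL | Attribute a does not exist | `a IS NULL` |
| IS NOT NULL | Attribute a exists | `a IS NOT NULL` |
| >, >=, <, <= | Numeric comparison | `a > 100` |
| IN (xxx, xxx) | Attribute value is one of the listed values | `a IN ('abc', 'def')` |
| BETWEEN xxx AND xxx | Attribute value lies within the range (bounds included) | `a BETWEEN 10 AND 100` |
| NOT BETWEEN xxx AND xxx | Attribute value lies outside the range | `a NOT BETWEEN 10 AND 100` |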