当前位置: 首页>前端>正文

RocketMQ 基本用法

1. RocketMQ 使用

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.4</version>
</dependency>

1.1 Spring Boot 整合 RocketMQ 包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 生产者 (Producer) 示例

/**
 * Demo test: builds one sample order message, sends it synchronously through
 * {@link MessageProducer} and prints the returned result.
 *
 * <p>The producer is instantiated by hand here (not by a container), so its
 * lifecycle methods are invoked explicitly: {@code init()} before sending and
 * {@code shutdown()} afterwards, so the started producer is always released.
 *
 * @throws MQClientException if the producer fails to start
 */
@Test
public void sendDemo() throws MQClientException {
    MessageProducer messageProducer = new MessageProducer();
    messageProducer.init();
    try {
        String keys = UUID.randomUUID().toString().replace("-", "");
        MessageExt messageExt = new MessageExt();
        // Message header (user property)
        messageExt.putUserProperty("SCENE_TYPE", "CREATE_ORDER");
        // Message body
        OrderDTO orderDTO = new OrderDTO("S001", "10001");
        messageExt.setBody(JSON.toJSONBytes(orderDTO));
        // Send synchronously and obtain the result
        ResultDTO<SendResult> sendResult = messageProducer.send("my_topic", "my_tag", messageExt, keys, true);
        System.out.println(sendResult);
    } finally {
        // Always release the producer that init() started, even if the send path throws.
        messageProducer.shutdown();
    }
}

2.1 发送同步消息

/**
 * REST controller mapped under {@code /message} that publishes a sample
 * message through the injected {@link MessageProducer}.
 */
@RestController
@RequestMapping("/message")
public class MessageController {

    @Resource
    private MessageProducer messageProducer;

    /**
     * Builds a sample order message, sends it synchronously and returns the
     * producer's result serialized as a JSON string.
     *
     * @return JSON representation of the {@code ResultDTO} returned by the producer
     */
    @RequestMapping("/send")
    public String send() {
        // Dash-free UUID used as the message key.
        final String messageKey = UUID.randomUUID().toString().replace("-", "");
        // Message body: the sample order serialized to JSON bytes.
        final byte[] payload = JSON.toJSONBytes(new OrderDTO("S001", "10001"));

        final MessageExt outgoing = new MessageExt();
        // Message header (user property).
        outgoing.putUserProperty("SCENE_TYPE", "CREATE_ORDER");
        outgoing.setBody(payload);

        // Synchronous send; the result is handed back to the HTTP caller as JSON.
        return JSON.toJSONString(
                messageProducer.send("my_topic", "my_tag", outgoing, messageKey, true));
    }
}
/**
 * Component that owns a single {@link DefaultMQProducer} (group
 * {@code "producer_group"}), starts it after construction, shuts it down
 * before destruction, and exposes a synchronous {@code send} helper.
 */
@Component
public class MessageProducer {
    private final static Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    private DefaultMQProducer producer;

    /**
     * Creates and starts the underlying producer.
     *
     * @throws MQClientException if the producer cannot be started
     */
    @PostConstruct
    public void init() throws MQClientException {
        // NOTE(review): no name-server address is configured here, while the consumer
        // example in this document calls setNamesrvAddr("localhost:9876"); presumably
        // it is supplied externally (environment / system property) - confirm.
        producer = new DefaultMQProducer("producer_group");
        producer.start();
    }

    /**
     * Shuts the producer down if it was created.
     */
    @PreDestroy
    public void shutdown() {
        if (producer != null) {
            producer.shutdown();
            logger.info("rocket_producer_destroy_success");
        }
    }

    /**
     * Sends a message synchronously.
     *
     * <p>The topic, tags and keys are written onto {@code messageExt} before sending.
     *
     * @param topic      destination topic
     * @param tags       tag set on the message
     * @param messageExt message to send; mutated by this call (topic/tags/keys)
     * @param keys       message keys set on the message
     * @param printLog   whether to log the send parameters before sending
     * @return {@code ResultDTO.success} wrapping the {@link SendResult} whenever the
     *         underlying send returns without throwing (the send status is only logged,
     *         not checked); {@code ResultDTO.fail} with the exception message otherwise
     */
    public ResultDTO<SendResult> send(String topic, String tags, MessageExt messageExt, String keys, boolean printLog) {
        if (printLog) {
            logger.info("send_message_params, topic={}, tags={}, messageExt={}, body={}",
                    topic, tags, messageExt, bodyAsText(messageExt));
        }
        try {
            messageExt.setTopic(topic);
            messageExt.setTags(tags);
            messageExt.setKeys(keys);

            SendResult sendResult = producer.send(messageExt);
            if (sendResult != null && SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                logger.info("send_message_success, messageId={}", sendResult.getMsgId());
            }
            return ResultDTO.success(sendResult);
        } catch (Exception e) {
            // Pass the message for the "exception={}" placeholder AND the throwable itself:
            // SLF4J treats a trailing Throwable as the exception (stack trace) and does not
            // use it to fill a placeholder.
            logger.error("send_message_exception, topic={}, tags={}, messageExt={}, body={}, exception={}",
                    topic, tags, messageExt, bodyAsText(messageExt), e.getMessage(), e);
            return ResultDTO.fail("send_message_error", e.getMessage());
        }
    }

    /**
     * Renders the message body for logging without ever throwing.
     *
     * <p>Decodes explicitly as UTF-8 instead of the platform default charset, and
     * tolerates a null message or null body so that logging (in particular inside the
     * catch handler) cannot raise a secondary {@code NullPointerException}.
     */
    private static String bodyAsText(MessageExt messageExt) {
        if (messageExt == null || messageExt.getBody() == null) {
            return "null";
        }
        return new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}

2.2 发送异步消息

/**
 * Builds a sample order message and hands it to the producer's asynchronous send.
 *
 * @return a fixed acknowledgement string, returned right after the producer call;
 *         it does not reflect the outcome reported to the send callback
 */
@RequestMapping("/sendAsync")
public String sendAsync() {
    // Dash-free UUID used as the message key.
    final String messageKey = UUID.randomUUID().toString().replace("-", "");
    // Sample order serialized to JSON bytes for the body.
    final byte[] payload = JSON.toJSONBytes(new OrderDTO("S001", "10001"));

    final MessageExt outgoing = new MessageExt();
    outgoing.putUserProperty("SCENE_TYPE", "CREATE_ORDER");
    outgoing.setBody(payload);

    messageProducer.sendAsync("my_topic", "my_tag", outgoing, messageKey, true);
    return "send async message success";
}
/**
 * Sends a message asynchronously; the outcome is reported to a callback that logs it.
 *
 * <p>The topic, tags and keys are written onto {@code messageExt} before sending.
 * Exceptions thrown while submitting the message are logged and not rethrown.
 *
 * @param topic      destination topic
 * @param tags       tag set on the message
 * @param messageExt message to send; mutated by this call (topic/tags/keys)
 * @param keys       message keys set on the message
 * @param printLog   whether to log the send parameters before sending
 */
public void sendAsync(String topic, String tags, MessageExt messageExt, String keys, boolean printLog) {
    if (printLog) {
        logger.info("send_message_params, topic={}, tags={}, messageExt={}, body={}",
                topic, tags, messageExt, decodeBody(messageExt));
    }
    try {
        messageExt.setTopic(topic);
        messageExt.setTags(tags);
        messageExt.setKeys(keys);

        // Register the completion callback
        producer.send(messageExt, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                logger.info("send_message_onSuccess, messageId={}", sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable throwable) {
                // Log through the logger with the throwable attached so the stack trace
                // lands in the application log instead of stderr (printStackTrace).
                logger.error("send_message_onException, exception={}", throwable.getMessage(), throwable);
            }
        });
    } catch (Exception e) {
        // e.getMessage() fills the "exception={}" placeholder; the trailing throwable is
        // taken by SLF4J as the exception to print, not as a placeholder argument.
        logger.error("send_message_exception, topic={}, tags={}, messageExt={}, body={}, exception={}",
                topic, tags, messageExt, decodeBody(messageExt), e.getMessage(), e);
    }
}

/**
 * Renders a message body for logging: UTF-8 decoded, and "null" when the message or
 * its body is absent, so that logging itself can never throw.
 */
private static String decodeBody(MessageExt messageExt) {
    if (messageExt == null || messageExt.getBody() == null) {
        return "null";
    }
    return new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
}

2.3 失败重试

/**
 * Builds a message from raw bytes and sends it, retrying on failure.
 *
 * <p>The message is attempted up to three times in total (one initial attempt plus two
 * retries). If every attempt fails, an error is logged and {@code false} is returned.
 *
 * @param topic          destination topic; a blank topic is rejected
 * @param tags           tag set on the message
 * @param msg            message body; {@code null} is rejected
 * @param keys           message keys, applied only when not blank
 * @param userProperties user properties copied onto the message, applied only when not empty
 * @return {@code true} if one of the attempts succeeded; {@code false} if the arguments
 *         were rejected or all attempts failed
 */
public boolean send(String topic, String tags, byte[] msg, String keys, Map<String, String> userProperties) {
    if (StringUtils.isBlank(topic) || msg == null) {
        return false;
    }
    Message message = new Message(topic, tags, msg);
    if (StringUtils.isNotBlank(keys)) {
        message.setKeys(keys);
    }
    if (MapUtils.isNotEmpty(userProperties)) {
        userProperties.forEach(message::putUserProperty);
    }

    // One initial attempt plus two automatic retries.
    final int maxAttempts = 3;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        if (send(message)) {
            return true;
        }
    }

    logger.error("send_message_exception, topic={}, tags={}, message={}",
            topic, tags, new String(msg, java.nio.charset.StandardCharsets.UTF_8));
    // Report the failure to the caller instead of unconditionally returning true.
    return false;
}

/**
 * Performs a single synchronous send attempt.
 *
 * @param message the message to send
 * @return {@code true} only when a result is returned with status {@code SEND_OK};
 *         {@code false} when the status differs, the result is {@code null}, or the
 *         send throws (the exception is logged, not rethrown)
 */
protected boolean send(Message message) {
    try {
        SendResult result = producer.send(message);
        if (result != null && SendStatus.SEND_OK.equals(result.getSendStatus())) {
            return true;
        }
        // Make a non-OK outcome visible; previously this path failed without any log line.
        logger.warn("send_message_not_ok, message={}, result={}", message, result);
    } catch (Exception e) {
        // e.getMessage() fills the "exception={}" placeholder; the trailing throwable is
        // consumed by SLF4J as the exception to print and never fills a placeholder.
        logger.error("send_message_exception, message={}, exception={}", message, e.getMessage(), e);
    }
    return false;
}

3. 消费者 (Consumer) 示例

/**
 * Component that owns a {@link DefaultMQPushConsumer} (group {@code "consumer_group"}),
 * subscribes it to {@code my_topic} and logs every message it receives.
 */
@Component
public class MessageConsumer {
    private final static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    private DefaultMQPushConsumer consumer;

    /**
     * Creates, configures and starts the push consumer.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe to every tag under the topic:
        // consumer.subscribe("my_topic", "*");
        // NOTE(review): the producer examples in this document send with tag "my_tag",
        // which the expression "tag_test" below would not match - confirm the intended tag.
        consumer.subscribe("my_topic", "tag_test");

        // CONSUME_FROM_LAST_OFFSET (described by the original author as the default):
        // start from the newest messages, ignoring earlier ones.
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // Upper bound on the number of messages handed to the listener per call
        consumer.setConsumeMessageBatchMaxSize(1);

        // Message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt message : messages) {
                        // Decode explicitly as UTF-8 rather than the platform default charset.
                        logger.info("consumer_message, topic={}, tags={}, msgId={}, properties={}, body={}",
                                message.getTopic(), message.getTags(), message.getMsgId(), message.getProperties(),
                                new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // Ask for redelivery; the throwable is attached so the stack trace is logged.
                    logger.error("consumer_message_exception_retry, topic=my_topic, tags=tag_test, exception={}",
                            e.getMessage(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();
        logger.info("consumer_message_success");
    }

    /**
     * Shuts the consumer down if it was created, mirroring the producer's
     * {@code @PreDestroy} hook so the started client is released on teardown.
     */
    @PreDestroy
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
            logger.info("rocket_consumer_destroy_success");
        }
    }
}

3.2 消息过滤

/**
 * Creates and starts a push consumer on {@code my_topic} that only receives
 * messages matching the tag expression {@code "tag1 || tag2"}.
 *
 * <p>Alternative: {@code subscribe(String topic, MessageSelector messageSelector)}.
 * The original author lists these selector examples:
 * <ul>
 *   <li>{@code MessageSelector.bySql("status = 1001 or status = 1002")}
 *       (labelled "filter by content")</li>
 *   <li>{@code MessageSelector.bySql("userProperties['sceneType'] = '1'")}
 *       (labelled "filter by property")</li>
 * </ul>
 * NOTE(review): both examples are reproduced as written and are not exercised by this
 * code - verify the expression syntax against the RocketMQ message-filtering docs.
 *
 * @throws MQClientException if subscribing or starting the consumer fails
 */
public void consumerMessage() throws MQClientException {
    consumer = new DefaultMQPushConsumer("consumer_group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setConsumeMessageBatchMaxSize(1);

    // Tag-based filtering; equivalent selector form:
    // consumer.subscribe("my_topic", MessageSelector.byTag("tag1 || tag2"));
    consumer.subscribe("my_topic", "tag1 || tag2");

    // Listener held in a typed local instead of an inline cast.
    MessageListenerConcurrently listener = (batch, ctx) -> {
        try {
            for (MessageExt received : batch) {
                // do something...
            }
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    };
    consumer.registerMessageListener(listener);

    consumer.start();
    logger.info("consumer_message_success");
}

3.3 RocketMQ:Message filtering

官方文档(阿里云 ApsaraMQ for RocketMQ 5.x,消息过滤):https://www.alibabacloud.com/help/en/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering

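| SQL92 syntax | Description | Example |
| --- | --- | --- |
| AND OR | Logical operators. | a IS NOT NULL AND a > 100 |
| = <> | Checks whether an attribute equals / does not equal a value. | a IS NOT NULL AND (a = 'abc' OR a <> 'def') |
| IS NULL | Checks that an attribute does not exist. | a IS NULL: Attribute a does not exist |
| IS NOT NULL | Checks that an attribute exists. | a IS NOT NULL: Attribute a exists |
| > >= < <= | Compares numeric values. | a IS NOT NULL AND a > 100 |
| IN (xxx, xxx) | Checks whether an attribute value is in a set. | a IS NOT NULL AND (a IN ('abc', 'def')) |
| BETWEEN xxx AND xxx | Checks whether a numeric value lies within a range. | a IS NOT NULL AND (a BETWEEN 10 AND 100) |
| NOT BETWEEN xxx AND xxx | Checks whether a numeric value lies outside a range. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100) |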

https://www.xamrdz.com/web/2g31994035.html

相关文章: