当前位置: 首页>后端>正文

RocketMQ 消息模型

1. 消息结构

1.1 消息模型 (Message)

生产者(Producer)通常使用 Message 来构造待发送的消息

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    /**
     * KEYS: 主键
     * TAGS: 消息标签
     */
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

    public void setTopic(String topic) {
        this.topic = topic;
    }
    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }
    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }
    public void putUserProperty(final String name, final String value) {
        // ...
        this.putProperty(name, value);
    }
}

1.2 Message扩展类 (MessageExt)

消费者(Consumer)从Broker拉取消息时,实际接收到的是 MessageExt 类型的对象,这样消费者不仅可以获取到原始的消息内容,还能获取到消息在整个生命周期内的详细轨迹信息

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;

    private String brokerName;

    private int queueId;

    private int storeSize;

    private long queueOffset;
    private int sysFlag;
    private long bornTimestamp;
    private SocketAddress bornHost;

    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    private long commitLogOffset;
    private int bodyCRC;
    private int reconsumeTimes;

    private long preparedTransactionOffset;
}

2. 生产者

两个实现类
org.apache.rocketmq.client.producer.DefaultMQProducer
org.apache.rocketmq.client.producer.TransactionMQProducer

RocketMQ 消息模型,第1张

2.2 公共配置类 (ClientConfig)

public class ClientConfig {

    private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
    private String clientIP = NetworkUtil.getLocalAddress();
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

    protected String namespace;
}

2.3 DefaultMQProducer

public class DefaultMQProducer extends ClientConfig implements MQProducer {
    private String producerGroup;

    public void start() throws MQClientException { }

    // 关闭本地已注册的生产者,关闭已注册到Broker的客户端
    public void shutdown() { }

    // 发送同步消息
    public SendResult send(Collection<Message> msgs) { }

    // 发送异步消息
    public void send(Collection<Message> msgs, SendCallback sendCallback) { }

    // 发送单向消息, 只负责发送消息,不管发送结果
    public void sendOneway(Message msg) { }
}

3. 消费者

  • 两种消费模式
    • 拉模式 (Pull Consumer)
    • 推模式 (Push Consumer)
RocketMQ 消息模型,第2张

3.2 重试-死信机制

RocketMQ的消费过程分为 3个阶段:正常消费、重试消费和死信

消息消费失败,并且重试也失败了,那么消息会被保存到死信Topic中,进入死信Topic的消息不能被再次消费

3.3 消费服务

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    
    // 并行消费
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

    // 顺序消费
    @Override
    public void registerMessageListener(MessageListenerOrderly messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }
}

https://www.xamrdz.com/backend/3q71941512.html

相关文章: