1. 消息结构
1.1 消息模型 (Message)
生产者(Producer)通常使用 Message 来构造待发送的消息
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
/**
* KEYS: 主键
* TAGS: 消息标签
*/
private Map<String, String> properties;
private byte[] body;
private String transactionId;
public void setTopic(String topic) {
this.topic = topic;
}
public void setKeys(String keys) {
this.putProperty(MessageConst.PROPERTY_KEYS, keys);
}
public void setTags(String tags) {
this.putProperty(MessageConst.PROPERTY_TAGS, tags);
}
public void putUserProperty(final String name, final String value) {
// ...
this.putProperty(name, value);
}
}
1.2 Message扩展类 (MessageExt)
消费者(Consumer)从Broker拉取消息时,实际接收到的是 MessageExt 类型的对象,这样消费者不仅可以获取到原始的消息内容,还能获取到消息在整个生命周期内的详细轨迹信息
public class MessageExt extends Message {
private static final long serialVersionUID = 5720810158625748049L;
private String brokerName;
private int queueId;
private int storeSize;
private long queueOffset;
private int sysFlag;
private long bornTimestamp;
private SocketAddress bornHost;
private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;
private long preparedTransactionOffset;
}
2. 生产者
两个实现类
org.apache.rocketmq.client.producer.DefaultMQProducer
org.apache.rocketmq.client.producer.TransactionMQProducer
2.2 公共配置类 (ClientConfig)
public class ClientConfig {
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
private String clientIP = NetworkUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
protected String namespace;
}
2.3 DefaultMQProducer
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private String producerGroup;
public void start() throws MQClientException { }
// 关闭本地已注册的生产者,关闭已注册到Broker的客户端
public void shutdown() { }
// 发送同步消息
public SendResult send(Collection<Message> msgs) { }
// 发送异步消息
public void send(Collection<Message> msgs, SendCallback sendCallback) { }
// 发送单向消息, 只负责发送消息,不管发送结果
public void sendOneway(Message msg) { }
}
3. 消费者
- 两种消费模式
- 拉模式 (Pull Consumer)
- 推模式 (Push Consumer)
3.2 重试-死信机制
RocketMQ的消费过程分为 3个阶段:正常消费、重试消费和死信
消息消费失败,并且重试也失败了,那么消息会被保存到死信Topic中,进入死信Topic的消息不能被再次消费
3.3 消费服务
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
// 并行消费
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
// 顺序消费
@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
}