一:消息发送
过程:
- producer根据不同场景发送不同的消息,如顺序消息,事务消息等,默认使用DefaultMQProducer,发送需要经过五个步骤:
- 设置groupName,相同的groupName会分在一个组
- 设置instanceName, 如果一个jvm上需要启动多个实例,需要instanceName来区分。默认为"DEFAULT"
- 设置nameServer地址,从nameServer获取topic所在的broker地址
- 组装消息并发送
消息返回状态:
- FLUSH_DISK_TIMEOUT: 同步刷盘超时(broker的刷盘策略为SYNC_FLUSH时才会报这个错)
- FLUSH_SLAVE_TIMEOUT: 主备模式下,主从复制超时(需要master实例设置为同步复制:SYNC_MASTER)
- SLAVE_NOT_AVAILABLE: 主备模式下,主从复制没有可用的slave(需要master实例设置为同步复制:SYNC_MASTER)
- SEND_OK:发送成功
如何提升消息发送的性能:
- 如果对可靠性不要求,可以使用Oneway的方式。 只管发送到服务器,不需要应答。
- 增加producer实例,增大并发数量(不用担心多个同时写入会降低写磁盘的效率,rocket引入了一个并发窗口,消息是并发写入DirectMem中,而后使用异步方式刷盘)。
- 采用顺序写commitLog可以保持高性能。
broker配置:
二:消息消费
消息消费的几个要点:
- 消费方式: pull和push
- 消费模式: 广播和集群
- 流量控制: 可结合sentinel
- 消息过滤: Tag
- 并发线程设置
- 消息积压:
- 优化系统逻辑,提升消费速度
- 增加单个consumer的并行处理的线程数(修改consumeThreadMin和consumeThreadMax)
- 增加group内的consumer实例数量,但是不要超过topic下的Read Queue数量,因为超过的实例不能接收到消息
- 批量方式消费消息(修改consumeMessageBatchMaxSize,默认1)。 这个要看场景,如果场景不需要立即消费消息,可以批量来处理
- 丢弃不重要的消息
三:消息存储
消息存储结构
文件系统:rocket刷盘采用的是机器的文件系统来做持久化。 目前的高性能磁盘,顺序写速度可达600MB/s,超过一般网卡传输速度,随机写只有100kb/s,相差6000倍。 而rocket正是采用顺序写。
系统结构:前面我们知道,消息发送到某一个topic,是存在messageQueue中。 而存储实际是使用ConsumeQueue和CommitLog配合完成,每个messageQueue会对应一个从consumerQueue。 当消息路由到messageQueue时,先把消息顺序写到CommitLog中,然后将索引信息写到consumerQueue中,以便消费时从commitLog中快速找到。 具体来看看
- CommitLog:文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
- ConsumeQueue:
- 逻辑队列,目的是提高消费性能,消费时不用去遍历commitLog。
- 存储了消息在commitLog中的偏移量offset,消息大小和Tag的hashCode值 。
- 我们知道,consumerGroup中的每个consumer对应消费一个messageQueue,其实就是对应一个consumerQueue,先读取到索引信息,再去commitLog中找。
四:负载均衡
rocketmq的负载均衡分为producer和consumer端,都是由client端自己实现的策略。 当然,也可由使用者在代码中指定。
producer端:
producer体现在发送消息时,对messageQueue的选择。 它会先根据topic获取TopicPublishInfo信息,里面记录了topic下可选择的messageQueue列表,轮询选择。
consumer端:
- pull consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
- push consumer: 默认负载均衡策略是AllocateMessageQueueAveragely,每个consumer在启动时,或者group中成员数量发生变化,都会触发doRebalance动作。 此动作分为几步:
- broker实例接收consumer发送的心跳包,返回topic的queue信息(数量,id等),group中的consumer信息等。
- consumer在RebalanceImpl类中,对mq排序和consumer排序,排序后确定那些mq分给那些consumer。
- 根据当前的consumer的id,返回其消费的queue
五:消息重试
producer端的重试:
//同步发送消息,如果5秒内没有发送成功,则重试5次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);
consumer端的重试:
1、顺序消息的重试
顺序消息失败后,每隔1s会重试。 这样可能发生阻塞,一定要保证能及时处理消费失败的情况
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_04_01");
consumer.setNamesrvAddr("node1:9876");
//设置并发线程和批量拉取数量为1
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
// 消息订阅
consumer.subscribe("tp_demo_04", "*");
// 并发消费
// consumer.setMessageListener(new MessageListenerConcurrently() {
// @Override
// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// return null;
// }
// });
// 顺序消费
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getMsgId() + "\t" + msg.getQueueId() +
"\t" + new String(msg.getBody()));
}
return null;
}
});
consumer.start();
}
2、无序消息的重试
对于无序消息(普通、定时、延时、事务消息)重试 只针对集群消费方式生效, RocketMQ 默认允许每条消息最多重试 16 次,时间间隔如下
默认16次,可自行配置次数:consumer.setMaxReconsumeTimes(20); 超过16次时间间隔都是2h。
配置方式:
public class MyConcurrentlyMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//处理消息
doConsumeMessage(msgs);
//方式1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,消息将重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
如果不想重试,则返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//处理消息
try {
doConsumeMessage(msgs);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//消息处理正常,直接返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
还可以获取消息当前重试的次数,来决定是否继续重试
for (MessageExt msg : msgs) {
System.out.println(msg.getReconsumeTimes());
}
doConsumeMessage(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
六:死信队列
消息超过重试最大次数后,会放到私信队列Dead-Letter Queue。 处理问题后,可在控制台手动触发,重新消费。 来看看可视化控制台的搭建:
可视化工具: rocketmq-console 下载地址:
https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip
NameSrv地址端口,编译启动后访问即可
#
编译打包
mvn clean package
-DskipTests
#
运行工具
java
-jar
target/rocketmq-console-ng-1.0.0.jar
当可以访问后,在界面中选择相应的主题和消息,便可手动重新发送,再次消费。
特性: 死信队列的消息存活3天
七:延迟消息
定时消息: rocket中自带了一个topic:SCHEDULE_TOPIC_XXXX,该topic下绑定了代表不同定时时间的queue。 如果发送定时消息,会先发到该topic,并根据具体时间路由都对应的queue中,并在达到合适的时间后发送到业务topic中。
默认时间有18个等级,delayTimeLevel:1-18(18个queue,queueId=level-1): 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
- 在producer发送message时,指定等级level:message.setDelayTimeLevel(level);
八:顺序消息
前面第一章已提过顺序消息,就是为了保证消费时候按生成消息的顺序消费。顺序消息包括: 全局顺序和部分顺序。现在,仔细来看一下。
先解释几个参数:
- MessageListenerOrderly: consumer端使用该模式代表开启顺序消费模式,在一次拉取消息后消费逻辑没有处理完,不会进行下一次拉取
- setConsumeThreadMin:consumer最少开启多少线程同时去拉取
- setConsumeThreadMax:consumer最多开启多少线程同时去拉取
- setPullBatchSize:每个线程每次最多拉取多少消息
- setConsumeMessageBatchMaxSize: 拉取消息后,调用listener处理消费逻辑时,一次传入多少条消息
全局顺序消息(很少场景用)
全局顺序消息,要保证同一个topic下所有的消息都有序,那就只能使用一个producer发消息,一个consumer消费,broker实例和其messageQueue也只能有一个。 另外,上面所有的参数也要设置为1,预防从同一个queue中一次拉取多条消息,却没按序消费。
- 创建主题并指定读写实例节点为1个:mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 1 -t tp_demo_07_01 -w 1
- producer端: 使用一个producer轮询往同一个queue中发消息
public class GlobalOrderProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("producer_grp_07_02"); producer.setNamesrvAddr("node1:9876"); producer.start(); Message message = null; for (int i = 0; i < 100; i++) { message = new Message("tp_demo_07_01", ("hello lagou" + i).getBytes()); producer.send(message); } producer.shutdown(); } }
- consumer端:
public class GlobalOrderConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_07_03"); consumer.setNamesrvAddr("node1:9876"); consumer.subscribe("tp_demo_07_01", "*"); consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); consumer.setPullBatchSize(1); consumer.setConsumeMessageBatchMaxSize(1); consumer.setMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } }
局部顺序消息(常用)
比如一个订单的创建,支付,完成3个消息,需要保证有序,但是不同订单之间可以并行。 要保证部分有序,需要producer端将同一订单的3条消息,发送到同一个队列。因为同一个队列只能被指定的一个consumer消费(集群模式),所以,在consumer端如果保证一个线程消费
producer:
producer.start();
Message message = null;
List<MessageQueue> queues = producer.fetchPublishMessageQueues("tp_demo_07");
System.err.println(queues.size());
MessageQueue queue = null;
for(int i = 0; i < 100; i++) {
queue = queues.get(i % 8);
message = new Message("tp_demo_07", ("hello lagou - order create" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello lagou - order payed" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello lagou - order ship" + i).getBytes());
producer.send(message, queue);
}
producer.shutdown();
consumer:
代码同全局顺序的consumer,只不过那4个参数可以不为1,因为,MessageListenerOrderly模式为每个consumerQueue上了锁,保证同一时间只能有一个线程访问,达到了局部有序的作用。 但是对不同的queue之间还是可以并发的。
九:事务消息
事务消息就是为了保证,投递消息和本地其他操作同时成功或失败,采用二阶段提交,流程如下:
- producer发送事务消息到mq,mq发现是事务消息,标记为待确认-Pending状态 并持久化,返回发送成功。
- 这个时候消息并没有发送到真正的topic,而是mq将消息(包含要发送的topic和queue的信息)发送到了另一个topic:RMQ_SYS_TRANS_HALF_TOPIC,这种情况下consumer端是无法消费的
- producer收到发送成功响应,执行本地逻辑,完成后向mq发送二次确认(commit或rollback),如果是commit,mq会将消息投递到真正的topic和queue,consumer开始消费;否则,删除消息。
- 如果producer端没能正常二次确认,mq在固定时间内会发起会查请求二次确认(默认最多回查15次,如不能确定,回滚),然后执行以上操作。这时候,如果该producer不能正常使用,会调用同组下其他producer。
producer端使用三个类配合完成是事务消息的发送:
- LocalTransaction-Executer: 执行本地事务,并返回二次确认的状态: LocalTransactionState.ROLLBACK_MESSAGE 或 LocalTransactionState.COMMIT_MESSAGE
- TransactionMQProducer: 用来发消息,比DefaultMQProducer多设置本地事务处理函数和回查状态函数
- TransactionCheckListener: 注册的回调函数,处理回查请求
具体使用:
public class TxProducer {
public static void main(String[] args) throws MQClientException {
TransactionListener listener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
System.out.println("执行本地事务,参数为:" + arg);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// return LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产 者回查生产者本地事务的状态
// 该方法用于获取本地事务执行的状态。
System.out.println("检查本地事务的状态:" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
};
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_08");
producer.setTransactionListener(listener);
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
message = new Message("tp_demo_08", "hello lagou - tx".getBytes());
producer.sendMessageInTransaction(message, " {\"name\":\"zhangsan\"}");
}
}
十:sentinel限流
sentinel可以让消息在一定时间内允许消费,不至于一下大批量消息压垮下游。 具体用法如下
引入依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>x.y.z</version>
</dependency>
consumer:
import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class PullDemo {
private static final String GROUP_NAME = "consumer_grp_13_05";
private static final String TOPIC_NAME = "tp_demo_13";
private static final String KEY = String.format("%s:%s", GROUP_NAME, TOPIC_NAME);
private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ExecutorService pool = Executors.newFixedThreadPool(32);
private static final AtomicLong SUCCESS_COUNT = new AtomicLong(0);
private static final AtomicLong FAIL_COUNT = new AtomicLong(0);
public static void main(String[] args) throws MQClientException {
// 初始化哨兵的流控
initFlowControlRule();
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(GROUP_NAME);
consumer.setNamesrvAddr("node1:9876");
consumer.start();
//拉取所有队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(TOPIC_NAME);
for (MessageQueue mq : mqs) {
System.out.printf("Consuming messages from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
//拉取当前队列里面的消息,从指定偏移量位置拉取,每次最多32条(如果没有消息,该方法会阻塞等待消息)
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
if (pullResult.getMsgFoundList() != null) {
for (MessageExt msg : pullResult.getMsgFoundList()) { //消费每一条消息
doSomething(msg);
}
}
long nextOffset = pullResult.getNextBeginOffset();
// 将每个mq对应的偏移量记录在本地HashMap中
putMessageQueueOffset(mq, nextOffset);
consumer.updateConsumeOffset(mq, nextOffset);
switch (pullResult.getPullStatus()) {
case NO_NEW_MSG:
break SINGLE_MQ; //根据标识跳出while循环
case FOUND:
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
/**
* 对每个收到的消息使用一个线程提交任务
* @param message
*/
private static void doSomething(MessageExt message) {
pool.submit(() -> { //对每个收到的消息使用一个线程提交任务
Entry entry = null;
try {
//这两步具体干啥的我不知道,但在这里的作用是,体现限流。没有被限流的才会往下走,否则抛出BlockException 异常
//因为我们的QPS是5,每秒处理一个消息,队列超时时间是5s,所以,即使submit了32个线程,也只有5个能被处理。 剩下的27个被丢弃,抛出BlockException 异常
ContextUtil.enter(KEY);
entry = SphU.entry(KEY, EntryType.OUT);
// 在这里处理业务逻辑,此处只是打印
System.out.printf("[%d][%s][Success: %d] Receive New Messages: %s %n", System.currentTimeMillis(), Thread.currentThread().getName(), SUCCESS_COUNT.addAndGet(1), new String(message.getBody()));
} catch (BlockException ex) {
// Blocked. // NOTE: 在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息
System.out.println("Blocked: " + FAIL_COUNT.addAndGet(1));
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
});
}
/**
* 流控规则
*/
private static void initFlowControlRule() {
FlowRule rule = new FlowRule();
// 消费组名称:主题名称 字符串
rule.setResource(KEY);
// 根据QPS进行流控
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 如果是5,则表示每秒5个消息,请求间隔200ms
// 1表示QPS为1,请求间隔1000ms。
rule.setCount(1);
rule.setLimitApp("default");
// 调用使用固定间隔。如果qps为1,则请求之间间隔为1s
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
// 如果请求太多,就将这些请求放到等待队列中
// 该队列有超时时间。如果等待队列中请求超时,则丢弃
// 此处设置超时时间为5s,如果一次拉取了32个消息,当前QPS在5s内只能处理5个,就有25个被丢弃
rule.setMaxQueueingTimeMs(5 * 1000);
FlowRuleManager.loadRules(Collections.singletonList(rule));
}
// 获取指定MQ的偏移量
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSET_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0;
}
// 在本地HashMap中记录偏移量
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSET_TABLE.put(mq, offset);
}
}
十一:客户端配置总结
客户端包括producer和consumer端
公共配置:
Producer 配置:
PushConsumer 配置
PullConsumer 配置
Message 数据结构