当前位置: 首页>编程语言>正文

rockemq 设置密码 rocketm2设置

一:消息发送

过程:

  1. producer根据不同场景发送不同的消息,如顺序消息,事务消息等,默认使用DefaultMQProducer,发送需要经过五个步骤:
  2. 设置groupName,相同的groupName会分在一个组
  3. 设置instanceName, 如果一个jvm上需要启动多个实例,需要instanceName来区分。默认为"DEFAULT"
  4. 设置nameServer地址,从nameServer获取topic所在的broker地址
  5. 组装消息并发送

 消息返回状态:

  1. FLUSH_DISK_TIMEOUT: 同步刷盘超时(broker的刷盘策略为SYNC_FLUSH时才会报这个错)
  2. FLUSH_SLAVE_TIMEOUT: 主备模式下,主从复制超时(需要master实例设置为同步复制:SYNC_MASTER)
  3. SLAVE_NOT_AVAILABLE:  主备模式下,主从复制没有可用的slave(需要master实例设置为同步复制:SYNC_MASTER)
  4. SEND_OK:发送成功

如何提升消息发送的性能:

  1.  如果对可靠性不要求,可以使用Oneway的方式。 只管发送到服务器,不需要应答。
  2. 增加producer实例,增大并发数量(不用担心多个同时写入会降低写磁盘的效率,rocket引入了一个并发窗口,消息是并发写入DirectMem中,而后使用异步方式刷盘)。
  3. 采用顺序写commitLog可以保持高性能。

broker配置:

rockemq 设置密码 rocketm2设置,rockemq 设置密码 rocketm2设置_rockemq 设置密码,第1张

二:消息消费 

消息消费的几个要点:

  1. 消费方式: pull和push
  2. 消费模式: 广播和集群
  3. 流量控制: 可结合sentinel
  4. 消息过滤: Tag
  5. 并发线程设置
  6. 消息积压:
  1. 优化系统逻辑,提升消费速度
  2. 增加单个consumer的并行处理的线程数(修改consumeThreadMin和consumeThreadMax
  3. 增加group内的consumer实例数量,但是不要超过topic下的Read Queue数量,因为超过的实例不能接收到消息
  4. 批量方式消费消息(修改consumeMessageBatchMaxSize,默认1)。 这个要看场景,如果场景不需要立即消费消息,可以批量来处理
  5. 丢弃不重要的消息

三:消息存储 

消息存储结构

文件系统:rocket刷盘采用的是机器的文件系统来做持久化。 目前的高性能磁盘,顺序写速度可达600MB/s,超过一般网卡传输速度,随机写只有100kb/s,相差6000倍。 而rocket正是采用顺序写。

系统结构:前面我们知道,消息发送到某一个topic,是存在messageQueue中。 而存储实际是使用ConsumeQueue和CommitLog配合完成,每个messageQueue会对应一个从consumerQueue。 当消息路由到messageQueue时,先把消息顺序写到CommitLog中,然后将索引信息写到consumerQueue中,以便消费时从commitLog中快速找到。    具体来看看

  1. CommitLog:文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
  2. ConsumeQueue
  1. 逻辑队列,目的是提高消费性能,消费时不用去遍历commitLog。
  2. 存储了消息在commitLog中的偏移量offset,消息大小和Tag的hashCode值  。
  3. 我们知道,consumerGroup中的每个consumer对应消费一个messageQueue,其实就是对应一个consumerQueue,先读取到索引信息,再去commitLog中找。

四:负载均衡

rocketmq的负载均衡分为producer和consumer端,都是由client端自己实现的策略。 当然,也可由使用者在代码中指定。

producer端:

producer体现在发送消息时,对messageQueue的选择。 它会先根据topic获取TopicPublishInfo信息,里面记录了topic下可选择的messageQueue列表,轮询选择。

consumer端:

  • pull consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
  • push consumer: 默认负载均衡策略是AllocateMessageQueueAveragely,每个consumer在启动时,或者group中成员数量发生变化,都会触发doRebalance动作。 此动作分为几步:
  • broker实例接收consumer发送的心跳包,返回topic的queue信息(数量,id等),group中的consumer信息等。
  • consumer在RebalanceImpl类中,对mq排序和consumer排序,排序后确定那些mq分给那些consumer。
  • 根据当前的consumer的id,返回其消费的queue




五:消息重试

producer端的重试:

//同步发送消息,如果5秒内没有发送成功,则重试5次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(5);
producer.send(msg,5000L);

consumer端的重试:

1、顺序消息的重试

顺序消息失败后,每隔1s会重试。 这样可能发生阻塞,一定要保证能及时处理消费失败的情况

DefaultMQPushConsumer consumer = new
                DefaultMQPushConsumer("consumer_grp_04_01");
        consumer.setNamesrvAddr("node1:9876");
        //设置并发线程和批量拉取数量为1
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(1);
        // 消息订阅
        consumer.subscribe("tp_demo_04", "*");
        // 并发消费
//         consumer.setMessageListener(new MessageListenerConcurrently() {
//             @Override
//             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//                 return null;
//             }
//         });
        
        // 顺序消费
            consumer.setMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                           ConsumeOrderlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getMsgId() + "\t" + msg.getQueueId() +
                                "\t" + new String(msg.getBody()));
                    }
                    return null;
                }
            });
            consumer.start();
        }

2、无序消息的重试


对于无序消息(普通、定时、延时、事务消息)重试 只针对集群消费方式生效, RocketMQ 默认允许每条消息最多重试 16 次,时间间隔如下



rockemq 设置密码 rocketm2设置,rockemq 设置密码 rocketm2设置_rockemq 设置密码_02,第2张

默认16次,可自行配置次数:consumer.setMaxReconsumeTimes(20);   超过16次时间间隔都是2h。

配置方式:

public class MyConcurrentlyMessageListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        //处理消息 
        doConsumeMessage(msgs);
        //方式1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,消息将重试 
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        //方式2:返回 null,消息将重试 
        return null;
        //方式3:直接抛出异常, 消息将重试 
        throw new RuntimeException("Consumer Message exceotion");

    }
}

如果不想重试,则返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    //处理消息
    try {
        doConsumeMessage(msgs);
    } catch (Throwable e) {
        //捕获消费逻辑中的所有异常,并返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    //消息处理正常,直接返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

还可以获取消息当前重试的次数,来决定是否继续重试

for (MessageExt msg : msgs) { 
    System.out.println(msg.getReconsumeTimes()); 
}
doConsumeMessage(msgs); 
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

六:死信队列

消息超过重试最大次数后,会放到私信队列Dead-Letter Queue。 处理问题后,可在控制台手动触发,重新消费。   来看看可视化控制台的搭建:

可视化工具: rocketmq-console 下载地址:

https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip

NameSrv地址端口,编译启动后访问即可

#  
   编译打包 
 
  
mvn clean package  
   -DskipTests 
 
  
#  
   运行工具 
 
  
java  
   -jar  
   target/rocketmq-console-ng-1.0.0.jar

当可以访问后,在界面中选择相应的主题和消息,便可手动重新发送,再次消费。

特性: 死信队列的消息存活3天

七:延迟消息

定时消息: rocket中自带了一个topic:SCHEDULE_TOPIC_XXXX,该topic下绑定了代表不同定时时间的queue。 如果发送定时消息,会先发到该topic,并根据具体时间路由都对应的queue中,并在达到合适的时间后发送到业务topic中。

默认时间有18个等级,delayTimeLevel:1-18(18个queue,queueId=level-1): 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h
  • 在producer发送message时,指定等级level:message.setDelayTimeLevel(level);

八:顺序消息

前面第一章已提过顺序消息,就是为了保证消费时候按生成消息的顺序消费。顺序消息包括: 全局顺序和部分顺序。现在,仔细来看一下。

先解释几个参数:

  • MessageListenerOrderly: consumer端使用该模式代表开启顺序消费模式,在一次拉取消息后消费逻辑没有处理完,不会进行下一次拉取
  • setConsumeThreadMin:consumer最少开启多少线程同时去拉取
  • setConsumeThreadMax:consumer最多开启多少线程同时去拉取
  • setPullBatchSize:每个线程每次最多拉取多少消息
  • setConsumeMessageBatchMaxSize: 拉取消息后,调用listener处理消费逻辑时,一次传入多少条消息


全局顺序消息(很少场景用)



全局顺序消息,要保证同一个topic下所有的消息都有序,那就只能使用一个producer发消息,一个consumer消费,broker实例和其messageQueue也只能有一个。    另外,上面所有的参数也要设置为1,预防从同一个queue中一次拉取多条消息,却没按序消费。

  1. 创建主题并指定读写实例节点为1个:mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 1 -t tp_demo_07_01 -w 1
  2. producer端: 使用一个producer轮询往同一个queue中发消息

public class GlobalOrderProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_07_02");
        producer.setNamesrvAddr("node1:9876");
        producer.start();
        Message message = null;
        for (int i = 0; i < 100; i++) {
            message = new Message("tp_demo_07_01", ("hello lagou" + i).getBytes());
            producer.send(message);
        }
        producer.shutdown();
    }
}
  1. consumer端:

public class GlobalOrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_07_03");
        consumer.setNamesrvAddr("node1:9876");
        consumer.subscribe("tp_demo_07_01", "*");
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(1);
        consumer.setPullBatchSize(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

局部顺序消息(常用)

比如一个订单的创建,支付,完成3个消息,需要保证有序,但是不同订单之间可以并行。 要保证部分有序,需要producer端将同一订单的3条消息,发送到同一个队列。因为同一个队列只能被指定的一个consumer消费(集群模式),所以,在consumer端如果保证一个线程消费

producer:

producer.start(); 
Message message = null; 
List<MessageQueue> queues = producer.fetchPublishMessageQueues("tp_demo_07"); 
System.err.println(queues.size()); 
MessageQueue queue = null; 
for(int i = 0; i < 100; i++) { 
    queue = queues.get(i % 8); 
    message = new Message("tp_demo_07", ("hello lagou - order create" + i).getBytes()); 
    producer.send(message, queue); 
    message = new Message("tp_demo_07", ("hello lagou - order payed" + i).getBytes()); 
    producer.send(message, queue); 
    message = new Message("tp_demo_07", ("hello lagou - order ship" + i).getBytes()); 
    producer.send(message, queue); 
}
producer.shutdown();

consumer:

代码同全局顺序的consumer,只不过那4个参数可以不为1,因为,MessageListenerOrderly模式为每个consumerQueue上了锁,保证同一时间只能有一个线程访问,达到了局部有序的作用。  但是对不同的queue之间还是可以并发的。

九:事务消息

事务消息就是为了保证,投递消息和本地其他操作同时成功或失败,采用二阶段提交,流程如下:

  • producer发送事务消息到mq,mq发现是事务消息,标记为待确认-Pending状态  并持久化,返回发送成功。
  • 这个时候消息并没有发送到真正的topic,而是mq将消息(包含要发送的topic和queue的信息)发送到了另一个topic:RMQ_SYS_TRANS_HALF_TOPIC,这种情况下consumer端是无法消费的
  • producer收到发送成功响应,执行本地逻辑,完成后向mq发送二次确认(commit或rollback),如果是commit,mq会将消息投递到真正的topic和queue,consumer开始消费;否则,删除消息。
  • 如果producer端没能正常二次确认,mq在固定时间内会发起会查请求二次确认(默认最多回查15次,如不能确定,回滚),然后执行以上操作。这时候,如果该producer不能正常使用,会调用同组下其他producer。

producer端使用三个类配合完成是事务消息的发送: 

  • LocalTransaction-Executer: 执行本地事务,并返回二次确认的状态:  LocalTransactionState.ROLLBACK_MESSAGE 或 LocalTransactionState.COMMIT_MESSAGE
  • TransactionMQProducer: 用来发消息,比DefaultMQProducer多设置本地事务处理函数和回查状态函数
  • TransactionCheckListener: 注册的回调函数,处理回查请求

具体使用:

public class TxProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionListener listener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
                System.out.println("执行本地事务,参数为:" + arg);
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // return LocalTransactionState.ROLLBACK_MESSAGE;
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产 者回查生产者本地事务的状态
                // 该方法用于获取本地事务执行的状态。
                System.out.println("检查本地事务的状态:" + msg);
                return LocalTransactionState.COMMIT_MESSAGE;
                // return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        };
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_08");
        producer.setTransactionListener(listener);
        producer.setNamesrvAddr("node1:9876");
        producer.start();
        Message message = null;
        message = new Message("tp_demo_08", "hello lagou - tx".getBytes());
        producer.sendMessageInTransaction(message, " {\"name\":\"zhangsan\"}");
    }
}

十:sentinel限流

sentinel可以让消息在一定时间内允许消费,不至于一下大批量消息压垮下游。 具体用法如下

引入依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>x.y.z</version>
</dependency>

consumer:

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class PullDemo {
    private static final String GROUP_NAME = "consumer_grp_13_05";
    private static final String TOPIC_NAME = "tp_demo_13";
    private static final String KEY = String.format("%s:%s", GROUP_NAME, TOPIC_NAME);
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();
    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static final ExecutorService pool = Executors.newFixedThreadPool(32);
    private static final AtomicLong SUCCESS_COUNT = new AtomicLong(0);
    private static final AtomicLong FAIL_COUNT = new AtomicLong(0);

    public static void main(String[] args) throws MQClientException {
        // 初始化哨兵的流控
        initFlowControlRule();
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(GROUP_NAME);
        consumer.setNamesrvAddr("node1:9876");
        consumer.start();
        //拉取所有队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(TOPIC_NAME);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consuming messages from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    //拉取当前队列里面的消息,从指定偏移量位置拉取,每次最多32条(如果没有消息,该方法会阻塞等待消息)
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    if (pullResult.getMsgFoundList() != null) {
                        for (MessageExt msg : pullResult.getMsgFoundList()) { //消费每一条消息
                            doSomething(msg);
                        }
                    }
                    long nextOffset = pullResult.getNextBeginOffset();
                    // 将每个mq对应的偏移量记录在本地HashMap中
                    putMessageQueueOffset(mq, nextOffset);
                    consumer.updateConsumeOffset(mq, nextOffset);
                    switch (pullResult.getPullStatus()) {
                        case NO_NEW_MSG:
                            break SINGLE_MQ;  //根据标识跳出while循环
                        case FOUND:
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    /**
     * 对每个收到的消息使用一个线程提交任务
     * @param message
     */
    private static void doSomething(MessageExt message) {
        pool.submit(() -> { //对每个收到的消息使用一个线程提交任务
            Entry entry = null;
            try {
                //这两步具体干啥的我不知道,但在这里的作用是,体现限流。没有被限流的才会往下走,否则抛出BlockException 异常
                //因为我们的QPS是5,每秒处理一个消息,队列超时时间是5s,所以,即使submit了32个线程,也只有5个能被处理。 剩下的27个被丢弃,抛出BlockException 异常
                ContextUtil.enter(KEY);
                entry = SphU.entry(KEY, EntryType.OUT);
                // 在这里处理业务逻辑,此处只是打印
                System.out.printf("[%d][%s][Success: %d] Receive New Messages: %s %n", System.currentTimeMillis(), Thread.currentThread().getName(), SUCCESS_COUNT.addAndGet(1), new String(message.getBody()));
            } catch (BlockException ex) {
                // Blocked.  // NOTE: 在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息
                System.out.println("Blocked: " + FAIL_COUNT.addAndGet(1));
            } finally {
                if (entry != null) {
                    entry.exit();
                }
                ContextUtil.exit();
            }
        });
    }

    /**
     * 流控规则
     */
    private static void initFlowControlRule() {
        FlowRule rule = new FlowRule();
        // 消费组名称:主题名称 字符串
        rule.setResource(KEY);
        // 根据QPS进行流控
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        // 如果是5,则表示每秒5个消息,请求间隔200ms
        // 1表示QPS为1,请求间隔1000ms。
        rule.setCount(1);
        rule.setLimitApp("default");
        // 调用使用固定间隔。如果qps为1,则请求之间间隔为1s
        rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
        // 如果请求太多,就将这些请求放到等待队列中
        // 该队列有超时时间。如果等待队列中请求超时,则丢弃
        // 此处设置超时时间为5s,如果一次拉取了32个消息,当前QPS在5s内只能处理5个,就有25个被丢弃
        rule.setMaxQueueingTimeMs(5 * 1000);
        FlowRuleManager.loadRules(Collections.singletonList(rule));
    }

    // 获取指定MQ的偏移量
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }

    // 在本地HashMap中记录偏移量
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}

十一:客户端配置总结

客户端包括producer和consumer端

公共配置:

rockemq 设置密码 rocketm2设置,rockemq 设置密码 rocketm2设置_java_03,第3张


Producer 配置:



rockemq 设置密码 rocketm2设置,rockemq 设置密码 rocketm2设置_java_04,第4张


PushConsumer 配置




rockemq 设置密码 rocketm2设置,rockemq 设置密码 rocketm2设置_开发语言_05,第5张



PullConsumer 配置 



rockemq 设置密码 rocketm2设置,rockemq 设置密码 rocketm2设置_List_06,第6张



Message 数据结构




rockemq 设置密码 rocketm2设置,rockemq 设置密码 rocketm2设置_分布式_07,第7张



https://www.xamrdz.com/lan/5js1937213.html

相关文章: