所谓天才,只不过是把别人喝咖啡的功夫都用在工作上了。
????????????????????????????????????????????????????????????????????????????????——鲁迅
大纲
RocketMQ其他功能
1.消息轨迹
? ??源码目录下也有说明文档:
????rocketmq-all-4.8.0-source-release\docs\cn\msg_trace\user_guide.md
(1)配置
? ??Broker 端服务器开启配置:traceTopicEnable=true?
????RocketMQ 集群中每一个 Broker 节点均用于存储 Client 端收集并发送过来的消息轨迹数据。因此,对于 RocketMQ 集群中的 Broker 节点数量并无要求和限制。
????对于消息轨迹数据量较大的场景,可以在 RocketMQ 集群中选择其中一个 Broker 节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理 IO 完全隔离,互不影响。在该模式下,RockeMQ 集群中至少有两个 Broker 节点,其中一个 Broker 节点定义为存储消息轨迹数据的服务端。
(2)保存消息轨迹的Topic定义
? ??RocketMQ 的消息轨迹特性支持两种存储轨迹数据的方式:
????系统级的 TraceTopic
????在默认情况下,消息轨迹数据是存储于系统级的 TraceTopic 中(其名称为:RMQ_SYS_TRACE_TOPIC,队列个数为 1)。该 Topic 在 Broker 节点启动时,会自动创建出来(如上所叙,需要在 Broker 端的配置文件中将 traceTopicEnable 的开关变量设置为 true)。
????用户自定义的 TraceTopic
????如果用户不准备将消息轨迹的数据存储于系统级的默认 TraceTopic,也可以自己定义并创建用户级的 Topic 来保存轨迹(即为创建普通的 Topic用于保存消息轨迹数据)。自定义的话需要在 Client 客户端的处理的时候自定义的 TraceTopic。具体见案例。一般推荐使用系统及的 TraceTopic。
(3)案例
? ??代码在 example 目录下,tracemessage 目录下。
????RocketMQ 在消息审核消费时采用对原来接口增加一个开关参数(enableMsgTrace)来实现消息轨迹是否开启;并新增一个自定义参数(customizedTraceTopic)来实现用户存储消息轨迹数据至自己创建的用户级 Topic。
????对于定义轨迹的主题,需要先创建这个主题才能收到消息,RocketMQ 不会自动创建该主题。
(4)关键属性
(5)源码解读
? ??代码实现的核心类是 AsyncTraceDispatcher
????org.apache.rocketmq.client.trace.AsyncTraceDispatcher
? ??记录消息的轨迹主要是集中在消息发送前后、消息消费前后,可以通过 RokcetMQ 的 Hook 机制。通过如下两个接口来定义钩子函数
????SendMessageHook
ConsumeMessageHook
? ??通过实行上述两个接口,可以实现在消息发送、消息消费前后记录消息轨迹,为了不明显增加消息发送与消息消费的时延,记录消息轨迹最好使用 异步发送模式。
? ??核心步骤如下:
????1:遍历收集的消息轨迹数据
????2:获取存储消息轨迹的 Topic
????3:对 TraceContext 进行编码,这里是消息轨迹的传输数据。
????4:将编码后的数据发送到 Broker 服务器。
2.权限控制
? ??在 RocketMQ4.4.0 版本升级中加入了 ACL 权限管控,ACL 是 access control list 的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、 权限、角色等概念。
????用户的概念,即支持用户名、密码。
????资源:需要保护的对象,消息发送涉及的 Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。????
????权限:针对资源,能进行的操作。 角色:RocketMQ 中,只定义两种角色:是否是管理员。
(1)配置
? ??acl 默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config 目录下
? ??需要使用 acl 必须在服务端开启此功能,在 Broker 的配置文件中配置,aclEnable = true 开启此功能
????RocketMQ 的权限控制存储的默认实现是基于 yml 配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动 Broker 服务节点,因为Broker端有文件监听机制,每隔 500ms 监听、处理、加载文件的变更内容。
????如果 ACL 与高可用部署(Master/Slave 架构)同时启用,那么需要在 Broker Master 节点的${ROCKETMQ_HOME}/store/conf/plain_acl.yml 配置文件中 设置全局白名单信息,即为将 Slave 节点的 ip 地址设置至 Master 节点 plain_acl.yml 配置文件的全局白名单中。
(2)案例实战
? ??在 Broker 的配置文件中配置,aclEnable = true 开启此功能
? ? 代码得加入一个返回 RPCHook 的方法
????如果权限受限制:
????加入权限对应用户:
????发送成功。
????很多时候像控制台配置了后会报错。
????把全局的白名单打开。
常见问题
1.MQ百万消息积压处理
? ??发生了线上故障,几千万条数据在 MQ 里积压很久。是修复 consumer 的问题,让他恢复消费速度,然后等待几个小时消费完毕?这是个解决方案。 不过有时候我们还会进行临时紧急扩容。
????一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟是 18 万条。1000 多万条,所以如果积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。
????一般如果消息不重要的话就在 consume 上直接释放掉。
????如果 topic 的 messageQueue 设置的比较多,比如设置了 20 个,consume 实例只有 4 个,那么每个 consume 实例对应 5 个 messageQueue,这个时候可以申请临时加机器,增加 consume 实例为 20 个,达到快速消费的目的。
????如果 messageQueue 设置的比较少,比如只设置了 4 个,那么这个时候就不能通过加 consume 机器来解决了,这时候就需要修改消费者代码了,不再消费者消费,而是把要消费的消息放到 mq 的另一个 topic 中,这个 topic 设置 20 个 messageQueue,对应 20 个 consume 实例,进行消费。
2.消息幂等
(1)消息重复的场景
? ?生产者
? ??重试机制导致的问题,消息成功发送到 MQ 中,但 MQ 因网络原因未能成功返回,导致重试机制重复发送到 MQ 。