当前位置: 首页>后端>正文

消息队列 处理日志 消息队列常见问题

为什么使用消息队列?

  1. 应用解耦
  2. 流量削峰
  3. 异步处理
  4. 消息通讯
  5. 远程调用

消息队列如何解决消息丢失问题?

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_大数据,第1张

因此保证MQ不丢失消息可以从这三个阶段来进行阐述:

  • 生产者保证不丢消息
  • 存储端不丢消息
  • 消费者不丢消息

生产者保证不丢消息

生产段保证不丢消息就是确保生产的消息能够到达存储端。

如果是RocketMQ消息中间件,Producer生产者提供了三种发送消息的方式,分别是:

  • 同步发送
  • 异步发送
  • 单向发送

生产者要想发消息时保证消息不丢失,可以:

  • 采用同步方式发送,send消息方法返回成功状态,就表示消息正常到达了存储端Broker。
  • 如果send消息异常或者返回非成功状态,可以重试。
  • 可以使用事务消息,RocketMQ的事务消息机制就是为了保证零丢失来设计的。

存储端不丢消息

确保消息持久化到磁盘,很容易想到的就是刷盘策略。

磁盘机制分同步和异步刷盘:

  • 生产者消息发送过来的时候,只有持久化到磁盘,RocketMQ的存储端Broker才返回一个成功的ACK相应,这就是同步刷盘。它保证消息不丢失,但是影响了性能。
  • 异步刷盘的话,只要消息写入PageCache缓存,就返回一个成功的ACK响应。这样提高了MQ的性能,但是如果这个时候机器断电了,就会丢失消息。

Broker一般都是集群部署的,有master主节点和slave从节点。消息到Broker存储端,只有主节点和从节点都写入成功,才反馈成功的ACK给生产者,这就是同步复制,它保证消息不丢失,但是降低了系统的吞吐量。与之对应的就是异步复制,只要消息写入主节点成功,就返回成功的ACK,它速度快但是会有性能问题。

消费阶段不丢消息

消费者执行完业务逻辑,再反馈给Broker说消费成功,这样才可以保证消费阶段消息不丢失。

消息队列如何保证消息的顺序性?

有些业务对消息的顺序是有要求的,比如先下单后付款,最后再完成订单,这样等。假设生产者先后产生了两条消息,分别是下单消息(M1),付款消息(M2),M1比M2先产生,如何保证M1比M2先消费呢?

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_大数据_02,第2张

为了保证消息的顺序性,可以将M1和M2发送到同一个Server上,当M1发送完收到ACK后,M2再发送。如图:

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_java_03,第3张

但是这样可能是有问题的,因为MQ服务器到消费端,可能存在网络延迟,虽然M1先发送,但是它比M2晚到。

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_大数据_04,第4张

那还能怎么办才能保证消息的顺序性呢?将M1和M2发往同一个消费者,且发送M1后,等到消费端ACK成功后,才发送M2就行了。

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_消息队列_05,第5张

消息队列保证顺序性的整体思路就是这样。比如Kafka的全局有序消息,就是这种思想的体现:就是生产者发消息时,1个Topic只能对应1个Partition,一个Consumer,内部单线程消费。

但是这样吞吐量太低,一般保证消息局部有序即可(不需要整体有序)。在发消息的时候指定Partition Key,Kafka对其进行Hash计算,根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。然后多消费者单线程消费指定的Partition。

消息队列有可能发生重复消费,如何避免,如何做到幂等?

消息队列是有可能发生重复消费的。

  • 生产端为了保证消息的可靠性,他可能往MQ服务器重复发送消息,直到拿到成功的ACK。
  • 再然后就是消费端,消费端消费消息一般是这个流程:拉取消息业务逻辑处理提交消费位移。假设业务逻辑处理完,事务提交了,但是需要更新消费位移时,消费者却挂了,这个时候另一个消费者就会拉到重复消息了。

如果幂等处理重复消息呢?

搞一个本地表,带唯一业务标记的,利用主键或者唯一性索引,每次处理业务先校验一下就可以了。又或者是用Redis缓存一下业务标记,每次看一下是否处理过了。

如何处理消息队列的消息积压问题?

消息积压是因为生产者的生产速度,大于消费者是消费速度。遇到消息积压的时候,我们需要先排查,是不是有BUG了。

如果不是BUG,我们可以优化一下消费的逻辑,比如之前是一条一条消息处理的话,我们可以确认是不是可以优为批量处理消息。如果还是满,我们也可以考虑水平扩容,增加Topic的队列数,和消费组机器的数量,提升整体的消费能力。

如果是BUG导致几百万消息持续积压几个小时。又该如何处理呢?需要解决BUG,临时紧急扩容。大概思路如下:

  • 先修复consumer消费者的问题,以确保其恢复消费速度,然后将现有consumer都停掉。
  • 新建一个topic,partition是原来的10倍,临时建立好原先10倍的queue数量。
  • 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询吸入临时简历好的10倍数量的queue。
  • 接着临时用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据,这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer机器来消费消息。

如何保证数据一致性,事务消息如何实现?

事务消息:事务就是为了一些更新操作要么都成功,要么都失败,不会有中间状态的产生的消息。

一条普通的MQ消息,从产生到消费,大概流程如下:

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_分布式_06,第6张

那么如何保证数据一致性呢?可以使用事务消息。一起来看一下事务消息是如何实现的吧!

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_消息队列 处理日志_07,第7张

让你写一个消息队列,该如何进行架构设计?

这其实并不要求你真的可以写出来,只是进行设计而已。

我们可以从这几个方向来进行思考:

  • 消息队列的整体流程
  • RPC如何设计
  • broker如何考虑持久化
  • 消费关系如何保存
  • 消息可靠性如何保证
  • 消息队列的高可用
  • 消息事务特性
  • MQ的伸缩性和可拓展性

消息队列和RPC之间有什么关系?

RPC:远程过程调用,主要解决远程通信间的问题,不需要了解底层网络的通信机制。

RPC经历的步骤

  • 建立通信:A机器想要调用B机器,首先得建立起通信连接,主要是通过在客户端和服务器之间建立TCP连接。
  • 服务寻址:A服务器上如何连接到B服务器以及待定的端口,方法的名称是什么。
  • 网络传输
  • 序列化:当A服务器上的应用发起一个RPC调用时,调用方法和参数数据都需要先进行序列化。
  • 反序列化:当B服务器接收到A服务器的请求之后,又需要对接收到的参数等信息进行反序列化操作。
  • 服务调用:B服务器进行本地调用(通过代理Proxy)之后得到了返回值,此时还需要再把返回值发送回A服务器,同样也需要经过序列化操作,然后再经过网络传输将二进制数据发送回A服务器。
    通常,一次完整的PRC调用需要经历如上4个步骤。

RPC和MQ的区别和关联

  • 在架构上,RPC和MQ的差异点是,Message有一个Message Queue可以把消息存储。
    RPC:

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_大数据_08,第8张

MQ:

消息队列 处理日志 消息队列常见问题,消息队列 处理日志 消息队列常见问题_大数据_09,第9张

  • 同步调用:对于要立即等待返回处理结果的场景,RPC是首选。
  • MQ是异步调用,RPC是立即返回,是同步调用,随着业务增长可以把同步调用改成异步调用。



https://www.xamrdz.com/backend/3w21964251.html

相关文章: