当前位置: 首页>后端>正文

深入了解Kafka可靠性(二)

前言

我们知道多副本机制的设计让 Kafka 提升了备份容灾的能力,能实现故障自动转移,但是也带来了新的问题——数据丢失数据不一致。接下来,我们一起看下 Kafka 面对这两个问题是如何处理的。

在此之前,先简单复习几个相关的概念:

  • 副本( Replica )是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。
  • 这篇文章提及的副本是相对分区而言的,即副本是指某个分区的副本
  • 在多副本情况下,其中一个副本为 leader,其它均为 follower。只有 leader 对外提供服务,follower 仅同步leader 数据
  • 分区中所有的副本集合称为 AR,ISR 是与 leader 保持同步状态的副本集合,leader 也是 ISR 中的一员
  • LEO 是每个分区下一条消息要写入的位置
  • HW 是 ISR 中最小的 LEO,消费者只能拉取 HW 之前的消息

失效副本

正常情况下,分区中所有副本都应该属于 ISR,但是网络具有不可靠性。因此,难免在某一个时刻会有一些成员会被踢出 ISR,这些副本要么处于同步失效状态,要么功能失效,这些副本统称为失效副本

功能失效指的是无法工作,比如某个 broker 宕机了,那么在它上面的分区就会失效。

同步失效又是怎么判断是否同步失效的呢?是通过参数 replica.lag.time.max.ms 来判断的,默认是 10000 毫秒。当某个 follower 同步滞后 leader 的时间超过 10 秒,则判定为同步失效

具体实现原理如下


深入了解Kafka可靠性(二),第1张
副本失效判定.png

当 follower 将 leader LEO 之前的消息全部同步完成,那么会认为该 follower 已经追上 leader,并更新 lastCaughtUpTimeMs。Kafka 的副本管理器有一个副本过期检测的定时任务,如果发现 当前时间 - lastCaughtUpTimeMs > 10秒,则判定同步失效。

除了时间设置以外,还有另一个参数 replica.lag.max.message (默认4000,这个是 broker 级别的参数),也是用来判定失效副本的。

一般情况下,这两个参数都是使用默认值就好了,因为如果没有调优经验,自己乱配置,容易导致 ISR 变动过于频繁。同时,需要监控失效副本的数量,因为它是衡量 Kafka 是否健康的一个很重要的指标。

PS:新加入的副本因子/宕机恢复后重新加入的副本在追赶上 leader 之前,也会一直处于失效状态。

失效副本的作用

失效副本为 Kafka 带来了什么收益呢?为什么需要设计这么一个状态呢?
大家不妨试想下:假设允许 ISR 中有一个副本同步一直跟不上 leader。当 leader 发生宕机时,这个 follower 被选举成了新的 leader,那么这时就会发生消息丢失。
一般会造成副本失效都是以下两个原因:

  • follower 副本进程卡顿,在一段时间内无法发起同步请求,比如说频繁发生 FULL GC
  • follower 同步过慢,在一段时间内无法追上 leader,比如 I/O有问题(笔者实际工作中遇到过一次,公司搭建自己的物理机房,用了二手服务器,有一台服务器I/O老化导致读写数据慢,导致了副本失效,消息堆积等问题)

LEO 与 HW

这一小节会更进一步去讲解它们之间的关系,让大家可以更清楚 Kafka 的副本同步机制。

假设现在有 3 个 broker,某个 topic 有 1 个分区,3 个副本。现在有一个 producer 发送了一条消息,这 3 个副本会发生些什么操作。


深入了解Kafka可靠性(二),第2张
异步拉取leader日志.png

具体步骤如下:

  1. producer 发送消息到 leader
  2. leader 将消息追加到日志,并且更新日志的偏移量
  3. follower 执行定时任务向 leader 发送 fetch request 同步数据,该请求会带上自己的 LEO
  4. leader 读取本地日志,并更新 follower 的信息
  5. leader 返回 fetch response 给 follower,response 会包含 HW
  6. follower 将消息追加到本地日志,并更新日志的偏移量

为了更直观地理解上面的步骤,下面将会用图来展示

  1. 一个新建的 topic 被写入了 5 条消息,两个 follower 去拉取数据


    深入了解Kafka可靠性(二),第3张
    同步消息初始状态.png
  2. leader 给 follower 返回 fetch response,并且 leader 又被写入了 5 条消息


    深入了解Kafka可靠性(二),第4张
    follower同步消息第一次更新HW.png

    其中 follower1 同步了 2 条数据,而 follower2 同步了 3 条数据。
    而 follower 的 HW = min(自己的LEO, 同步回来的HW)

  3. follower 再次同步数据,同时 leader 又被写入了 5 条消息


    深入了解Kafka可靠性(二),第5张
    follower第2次同步消息.png

    leader 更新了 HW

  4. leader 给 follower 返回 fetch response


    深入了解Kafka可靠性(二),第6张
    follower同步消息第2次更新HW.png

    根据公式,follower 更新 HW = 3

在一个分区中,leader 所在 broker 会记录所有副本的 LEO 和 自己的 HW;而 follower 所在的 broker 只会记录自己的 LEO 和 HW。因此,在逻辑层面上,我们可以得到下图


深入了解Kafka可靠性(二),第7张
LEO和HW在各个副本中的维护情况.png

0.11.0.0版本之前,Kafka 是基于 HW 的同步机制,但是这个设计有可能出现数据丢失数据不一致的问题。Kafka 后面的版本通过 leader epoch 来进行优化。

数据丢失 & 数据不一致的解决方案

我们先一起来看下这两问题是如何产生的。

数据丢失

假设某一分区在某一时刻的状态如下图(L 代表是 leader)


深入了解Kafka可靠性(二),第8张
LEO HW更新机制数据丢失场景1.png

可以看见副本A的 LEO 是 2,HW 是 1;副本B的 LEO 是 2,HW 是 2。显然,哪怕没有新的消息写入副本B中,副本A也要过一小段时间才能追上副本A,并更新 HW。

假设在副本A更新 HW = 2之前,A宕机了,随后立马就恢复。这里会有一个截断机制——根据宕机之前持久化的HW 恢复消息。也就是说,A只恢复了 m1,m2 丢失了

再假设 A 刚恢复,B 也宕机了,A 成为了 leader。这时 B 又恢复了,并成为了 follower。由于 follower 的 HW 不能比 leader 的 HW 高,所以 B 的 m2 也丢失了

深入了解Kafka可靠性(二),第9张
LEO HW更新机制数据丢失场景2.png

总结:这里大家可以发现 follower 的 HW 更新是有一定间隙的,像我这个例子其实 follower 是拿到 m2 了,只不过 HW 要追上 leader 需要等下一次的 fetch request。除非配置 acks=-1 并且配置 min.insync.replicas 大于 1,unclean.leader.election.enable = true 才行。

数据不一致

深入了解Kafka可靠性(二),第10张
LEO HW更新机制数据不一致场景1.png

假设某一分区在某一时刻,副本A 的 HW = 2,LEO = 2;副本B 的 HW = 1,LEO = 1。
又假设它们同时挂了,B 先恢复。这时,B 会成为 leader,如下图


深入了解Kafka可靠性(二),第11张
LEO HW更新机制数据不一致场景2.png

此时,B 写入新消息 m3,并将 HW、LEO 更新为 2。此时,A 也恢复了。由于 A 的 HW 也是 2,所以没有截断消息。如下图
深入了解Kafka可靠性(二),第12张
LEO HW更新机制数据不一致场景3.png

这样一来,A 中 offset = 1 的消息是 m2,B 中 offset = 1 的消息是 m3,数据不一致了。

leader epoch

为了优化上面这两个问题,Kafka 从 0.11.0.0 开始引入 leader epoch,在需要截断时使用 leader epoch 作为依据,而不再是 HW。

如果看框架代码比较多的同学应该知道 epoch 是相当于版本的这么一个东西。leader epoch 的初始值是 0,每变更一次 leader,leader epoch 就会增加 1。另外,每个副本中还会增加一个矢量<LeaderEpoch => StartOffset>,其中 StartOffset 是当前 leader epoch 下写入第一条消息的偏移量。每个副本的 Log 下都有一个 leader-epoch-checkpoint 文件,在发生 leader 变更时,会将对应的矢量追加到这个文件中。

解决数据丢失问题

深入了解Kafka可靠性(二),第13张
解决数据丢失1.png

还是上面的例子,只不过多了 leader epoch 矢量信息。

副本A:HW=1,LEO=2,LE(leader epoch)=0,Offset(StartOffset)=0

leader-副本B:HW=2,LEO=2,LE=0,Offset(StartOffset)=0

假设在副本A更新 HW = 2之前,A宕机了,随后立马就恢复。不过这里不会立马进行截断日志操作,而是会发送一个 OffsetsForLeaderEpochRequest 请求给 B,B 作为目前的 leader 在收到请求之后会返回 OffsetsForLeaderEpochResponse 给 A。

我们先来看下 OffsetsForLeaderEpochRequest 和 OffsetsForLeaderEpochResponse 的数据结构。如下图

  • OffsetsForLeaderEpochRequest


    深入了解Kafka可靠性(二),第14张
    OffsetsForLeaderEpochRequest请求体结构.png

    A 会将自己的 leader epoch 信息给 leader(A的 leader epoch 这里简化成 LE_A)。这里会出现两种情况:

    • 变更了 leader

      B 会返回 LE_A+1 的 StartOffset 给 A

    • 没有变更 leader

      B 会返回 A 的 LEO 给 A

    因此,我们可以把 OffsetsForLeaderEpochRequest 看作是一个查询 follower 当前 leader_epoch 的 LEO。

  • OffsetsForLeaderEpochResponse


    深入了解Kafka可靠性(二),第15张
    OffsetsForLeaderEpochResponse响应体结构.png

    这个例子中,B 会返回2给 A,而此时的 A 的 LEO 刚好是 2,所以不用进行截断日志。如下图


    深入了解Kafka可靠性(二),第16张
    解决数据丢失2.png

    如果此时B挂了,A成了 leader,并有 m3 写入,就会得到下图
    深入了解Kafka可靠性(二),第17张
    解决数据丢失3.png

    可以看见 m2 并没有丢失,并且也更新了 leader_epoch 矢量为 (1,2)。

解决数据不一致问题

深入了解Kafka可靠性(二),第18张
解决数据不一致1.png

还是前面的例子。副本A是 leader,B 是 follower。

A 的 HW=2,LEO=2,LE=(0,0)

B 的 HW=1,LEO=1,LE=(0,0)

此时,A 和 B 同时宕机,并且 B 先恢复成为了 leader。此时,epoch 变成了 1。另外,新消息 m3 成功写入,就会得到下图


深入了解Kafka可靠性(二),第19张
解决数据不一致2.png

接着,A 也恢复了,这时 A 不会急着截断日志,而是给 leader 发送 OffsetsForLeaderEpochRequest,B 会返回 LEO = 1 给 A。因此,A 会截断日志,删除 m2。之后,再给 B 发送 fetch request,得到 B 的响应并更新后,将得到下图


深入了解Kafka可靠性(二),第20张
解决数据不一致3.png

这样数据不一致问题就解决了。

这里大家可能会有疑问,m2不是丢失了吗?是的,这种设计因为更新具有一定的间隙,并且没有事务管理,所以会有丢失消息的风险。

从 CAP 定理来看,这里的设计属于 AP。为什么这么说呢?大家不妨想一下,如果为了不丢失数据,这里加了事务控制的设计,那么对于分区而言它的吞吐量是会下降的,甚至是不可用的,因为响应速度是由短板的副本所决定的。对于定位是高吞吐量的 Kafka 而言,这显然是不可接受的。

总结

Kafka 通过多副本机制增强了备份容灾的能力,并且基于多副本机制实现了故障转移,避免了单点问题,但同时也引进了新的问题——数据丢失和数据不一致。从 0.11.0.0 版本开始,Kafka 增加了 leader epoch,它对这两个问题进行了优化。虽然无法完全避免消息丢失,但是从实际的使用角度而言,这个问题其实并不大。有实际工作经验的同学应该都知道,我们发送消息难以避免需要重推,哪怕消息中间件做到了百分百不丢失,其实我们在使用时仍然会做防止消息丢失的设计。相对而言,数据一致性就更重要了,否则很容易让订阅消息的下游系统出现脏数据。


https://www.xamrdz.com/backend/3f21941044.html

相关文章: