1、Kafka rebalance机制
kafka是以消费者组进行消费,一个消费者组,由多个consumer组成,他们和topic的消费规则如下:
topic的一个分区只能被消费组中的一个消费者消费。
消费者组中的一个消费者可以消费topic一个或者多个分区。
那么group中每个消费者consumer消费topic中的哪个partition是由谁决定的呢?
1.1 消费者分区策略
主要有三种rebalance的策略:range(范围)、round-robin(轮询)、sticky(粘性)、CooperativeSticky。
Kafka 提供了消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况为range分配策略。
假设一个主题有10个分区(0-9),现在有三个consumer消费:
range策略:就是按照分区序号排序(范围分配),假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。
比如分区03给一个consumer,分区46给一个consumer,分区7~9给一个consumer。round-robin策略就是轮询分配
比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、8给一个consumerSticky是粘性的意思,它是从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,在rebalance会尽量保持原有分配的分区不变化,这样可以节省开销。
Cooperative Sticky和Sticky类似,但是它会将原来的一次大规模rebalance操作,拆分成了多次小规模的rebalance,直至最终平衡完成,所以体验上会更好。
默认策略是Range + CooperativeSticky
1.2 rebalance(再均衡)
再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者,比如consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他。
Kafka 再平衡是外部触发导致的,触发 Kafka 再平衡的有以下几种情况:
- 消费组成员发生变更,有新消费者加入或者离开,或者有消费者宕机;
消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向Group Coordinator发送心跳等情况时,GroupCoordinator 会认为消费者己下线 - 消费组订阅的主题数量发生变更;
- 消费组订阅的分区数发生变更。
rebalance过程中,消费者无法从kafka消费消息,这对kafka的TPS会有影响,如果kafka集群内节点较多,比如数百个,那重平衡可能会耗时极多,所以应尽量避免在系统高峰期的重平衡发生。
1.3 rebalance的影响
Rebalance对我们数据的影响主要有以下几点:
1、可能重复消费: Consumer被踢出消费组,可能还没有提交offset,Rebalance时会Partition重新分配其它Consumer,会造成重复消费,虽有幂等操作但耗费消费资源,亦增加集群压力
2、集群不稳定:Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大
3、影响消费速度:频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance
1.4 避免rebalance措施
1、业务需要不可避免(分区、消费者扩容)
(1)针对分区个数的增加, 一般不会常有,是需要增加的时候都是业务及数据需求,不可避免
(2)对Topic的订阅增加或取消亦不可避免
2、合理设置消费者参数
下边是我们遇到的,要格外关注及重视
(1)未能及时发送心跳而Rebalance
session.timeout.ms 一次session的连接超时时间
heartbeat.interval.ms 心跳时间,一般为超时时间的1/3,Consumer在被判定为死亡之前,能够发送至少 3 轮的心跳请求
(2)Consumer消费超时而Rebalance
max.poll.interval.ms 每隔多长时间去拉取消息。合理设置预期值,尽量但间隔时间消费者处理完业务逻辑,否则就会被coordinator判定为死亡,踢出Consumer Group,进行Rebalance
max.poll.records 一次从拉取出来的数据条数。根据消费业务处理耗费时长合理设置,如果每次max.poll.interval.ms 设置的时间较短,可以max.poll.records设置小点儿,少拉取些,这样不会超时。
总之,尽可能在max.poll.interval.ms时间间隔内处理完max.poll.records条消息,让Coordinator认为消费Consumer还活着
2、消费者位移offset管理
Offset记录着下一条将要发送给Consumer的消息的序号。
Offset从语义上来看拥有两种:Current Offset和Committed Offset。
2.1 Current Offset
Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll消息时,都能够收到不重复的消息。
2.2 Committed Offset
Committed Offset保存在Broker上,它表示Consumer已经确认消费过的消息的序号。主要通过commitSync和commitAsync
API来操作。举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用consumer.commitAsync()或consumer.commitSync()来提交Committed Offset,那么此时Committed Offset依旧是0。
Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的过程中,一个partition被分配给了一个Consumer,那么这个Consumer该从什么位置开始消费消息呢?答案就是Committed Offset。另外,如果一个Consumer消费了5条消息(poll并且成功commitSync)之后宕机了,重新启动之后它仍然能够从第6条消息开始消费,因为Committed Offset已经被Kafka记录为5。
总结一下,Current Offset是针对Consumer的poll过程的,它可以保证每次poll都返回不重复的消息;而Committed Offset是用于Consumer Rebalance过程的,它能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。
在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中(zookeeper其实并不适合进行大批量的读写操作,尤其是写操作)。而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。
2.3 Group Coordinator
Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及Consumer Rebalance。
对于每一个Consumer Group,Group Coordinator都会存储以下信息:
- 订阅的topics列表
- Consumer Group配置信息,包括session timeout等
- 组中每个Consumer的元数据。包括主机名,consumer id
- 每个Group正在消费的topic partition的当前offsets
- Partition的ownership元数据,包括consumer消费的partitions映射关系
Consumer Group如何确定自己的coordinator是谁呢? 简单来说分为两步:
确定Consumer Group offset信息将要写入__consumers_offsets topic的哪个分区。具体计算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % offsets.topic.num.partitions) //offsets.topic.num.partitions默认值为50。
该分区leader所在的broker就是被选定的coordinator
2.4 Offset存储模型
由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以 groupid-topic-partition -> offset 的方式保存。
因此consumer poll消息时,已知groupid和topic,又通过Coordinator分配partition的方式获得了对应的partition,自然能够通过Coordinator查找__consumers_offsets的方式获得最新的offset了。