概述
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。
是一种高吞吐量的分布式发布订阅消息系统,以容错的方式记录消息流,以文件的方式来存储消息流,它可以处理消费者在网站中的所有动作流数据(包括网页浏览,搜索、功能统计等行为)。
Kafka的目的是通过集群来提供实时的消息消息管道和消息处理。
消息系统一般有两种应用模式:1、队列;2、发布订阅,显然kafka属于第二种。
下面从如下几个方面介绍下其相关理论:
目录
概述
架构
核心知识点:
部署方式:
优缺点分析
缺点:
常见应用场景:
调优经验:
API应用:
架构
核心知识点:
1、Zookeeper:
kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
2、Broker:
处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群,整个集群可运行在一个或多个(可跨不同数据中心)服务器上。
3、Message:
消息,每一条发送的消息主体。
4、Topic:
消息主题,Kafka根据topic对消息进行归类,kafka的数据就保存在topic,发布到Kafka集群的每条消息都需要指定一个topic。
5、Partition:
topic的分区,对topic从物理上进行分组,每个topic可以有多个分区,其数据被分散存储到多个partition。同一个topic在不同的分区的数据是不重复的,每个partition是一个有序队列,表现形式就是一个一个的文件夹!
在写入数据到topic时:(支持自定义分区,通过覆盖Partitioner下的 configure 和 partition 方法,实现自定义选择分区逻辑)
PS
1)、没有设置key的消息就会被轮训的发送到不同的分区。
2)、设置了key,kafka自带的分区控制,会根据key计算出来一个hash值,这个hash值会对应某一个分区。如果key相同的,那么hash值必然相同,key相同的值,必然是会被发送到同一个分区。
6、Segment:
分区的数据段,partition物理上由多个segment组成,每个segment存着message信息。
每个Partition都会对应一个日志目录:{topicName}-{partitionid}/,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由三部分组成,分别为“.index”文件、“.log”文件、.timeindex文件(早期版本中没有),log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
索引文件使用稀疏索引的方式,避免对日志每条数据建索引,节省存储空间。kafka就是利用分段 + 索引的方式来解决查找效率的问题。
其中每个.log文件默认大小为1G,文件的命名是以该segment最小offset来命名的,如x.log,x.index,x.timeindex, 其中x取值为0到368795之间。
7、Replication:
分区的副本,包括主分区(Leader)和备份分区(Follower),同一个partition可能会有多个Replication,多个副本之间的数据是一样的。
副本的作用是做备份,当主分区(Leader)故障的时候会选择一个备份(Follower)上位,成为Leader。
在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
8、ISR:
同步副本集合(in-sync replicas),每个分区的元数据信息都存储在一个ISR集合里。 在该集合里的副本具有如下特点:
a、每一个副本都与leader保持同步
b、才有资格被选举为leader
c、提交到kafka的消息只有该集合中的副本都接收到才认为“已提交”成功。
9、offset:
记录偏移量,最大值为64位的long大小(0到2^64 -1),Kafka通过offset保证消息在分区内的顺序,offset的顺序性不跨分区。Kafka0.10以后,使用一个专门的topic __consumer_offset保存offset。__consumer_offset日志留存方式为compact,也就是说,该topic会对key相同的消息进行整理。
kafka提交offset的方式,通过设置enable.auto.commit=true(默认) 若自己控制offset的提交方式,设置enable.auto.commit=false
kafka提供三种方式来配置offset的消费处理,通过设置auto.offset.reset = earliest(默认)
earliest : 当各分区下有已提交的 Offset 时,从“提交的 Offset”开始消费;无提交的Offset 时,从头开始消费;
latest : 当各分区下有已提交的 Offset 时,从提交的 Offset 开始消费;无提交的 Offset时,消费新产生的该分区下的数
none : Topic 各分区都存在已提交的 Offset 时,从 Offset 后开始消费;只要有一个分区不存在已提交的 Offset,则抛出异常。
在kafka内部,将offset的相关信息,保存在__consumer_offset内:
Consumer group组元数据消息
Consumer group位移消息
Tombstone消息
offset的信息内容如下:
Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset
High Watermark:已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未成功备份到其他的 replicas 中
LastCommittedOffset: 上次提交记录
CurrentPosition: 当前位置
offset的具有如下优点:
1)、消费者可以根据需求,灵活指定offset消费。
2)、保证了消息不变性,为并发消费提供了线程安全的保证。每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题。
消息访问的并行高效性。每个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减少竞争,增加了程序的并行能力。
3)、增加消息系统的可伸缩性。
每个topic中保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配。
4)、保证消息可靠性。
消息消费完成之后不会删除,可以通过重置offset重新消费,保证了消息不会丢失。
5)、灵活的持久化策略。
可以通过指定时间段(如最近一天)来保存消息,节省broker存储空间。
6)、备份高可用性。
消息以partition为单位分配到多个server,并以partition为单位进行备份。备份策略为:1个leader和N个followers,leader接受读写请求,followers被动复制leader。leader和followers会在集群中打散,保证partition高可用。
消息可靠性原理(消息发送模式及ACK响应模式):
A、消息发送模式:
kafka自身提供下面三种方式来保证消息可靠性
1)、At most once
消息至多会被发送一次,但如果产生网络延迟等原因消息就会有丢失。
2)、At least once
消息至少会被发送一次,上面既然有消息会丢失,那么给它加一个消息确认机制(延迟 + 重试)即可解决,但是消息确认阶段也还会出现同样问题,这样消息就有可能被发送两次,导致数据重复。
3)、Exactly once
消息只会被发送一次,保证消息处理幂等,这是我们想要的效果。
但是由于网络或其他意外宕机情况,在生产端、消费端、kafka自身都有可能发生处理失败,导致消息丢失或重复发送,从而影响业务的正确性。所以理论上要完全保证消息的可靠性,必须做下面的处理:
1)、在生产端、消费端,都需要设计可靠性机制来处理
2)、对kafka自身,需要手动管理offset,也是通过设计可靠性机制来处理
目前大部分可靠性机制都是处理这么几个过程(处理前先检查并落地数据、处理时对比校验、正确处理后提交并删除落地数据、失败时重试)。
B、ACK响应模式:
1)、request.required.acks = 0;
只要请求已发送出去,就算是发送完了,不关心有没有写成功。性能很好,如果是对一些日志进行分析,可以承受丢数据的情况,用这个参数,性能会很好。
2)、request.required.acks = 1;
发送一条消息,当leader partition写入成功以后,才算写入成功。不过这种方式也有丢数据的可能。
3)、request.required.acks = -1;
需要ISR列表里面,所有副本都写完以后,这条消息才算写入成功。
10、Controller
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。
集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。
换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。
官网上有个名为 activeController 的 JMX 指标,可以帮助我们实时监控控制器的存活状态。这个 JMX 指标非常关键,你在实际运维操作过程中,一定要实时查看这个指标的值。
controller选举规则:启动时,第一个在Zookeeper中成功创建 /controller 节点的 Broker 会被指定为控制器。
controller主要作用如下:
1).主题管理(创建、删除、增加分区)
这里的主题管理,就是指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。
2).分区重分配
管理topic的分区扩容,分区副本的分配,对已有主题分区进行细粒度的分配控制。
分区副本的分配策略有:Range范围策略、RoundRobin轮询策略、Stricky均匀分配(分区的分配和上次分配保持一致)
3).Preferred 领导者选举
Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。
4).集群成员管理
管理broker的伸缩,包括broker的上线、下线处理、自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。
比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕
,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,
该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
5).数据服务
就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
11、Producer:
消息和数据的生产者,是消息的入口,向kafka的一个topic发布消息的进程/代码/服务,在发送消息之前,会对消息进行分类,即指定Topic。
发送流程
12、Consumer:
消息和数据的消费者,是消息的出口,向kafka集群订阅消息(Topic)并且处理消息的进程/代码/服务。
一般消息系统,consumer存在两种消费模型:
push:优势在于消息实时性高。劣势在于没有考虑consumer消费能力和饱和情况,容易导致producer压垮consumer。
pull:优势在可以控制消费速度和消费数量,保证consumer不会出现饱和。劣势在于当没有数据,会出现空轮询,消耗cpu。
kafka采用pull,并采用可配置化参数保证当存在数据并且数据量达到一定量的时候,consumer端才进行pull操作,否则一直处于block状态。kakfa采用整数值consumer position来记录单个分区的消费状态,并且单个分区单个消息只能被consumer group内的一个consumer消费,维护简单开销小。消费完成,
broker收到确认,position指向下次消费的offset。由于消息不会删除,在完成消费,position更新之后,consumer依然可以重置offset重新消费历史消息。
12、Consumer Group:
消费者组,一个组包含多个consumer,用户可以指定consumer的group,在kafka设计原则中每个消息只能由一个group中的一个consumer消费,如果一个消息想要被多个consumer消费,这些consumer必须在不同的group中,这样可以提高kafka处理消息的吞吐量。若每个消费者都属于不同的group,则所有的消费者都可以消费同一条消息,俗称“消息广播”。
实际上kafka通过配置可以支持两种模式:
a、当所有consumer的consumer group相同时,系统变成队列模式
b、当每个consumer的consumer group都不相同时,系统变成发布订阅
PS:对于一个 Consumer Group 而言,消费者的数量不应该多余分区的数量,因为在一个 Consumer Group 中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费
因此,若一个 Consumer Group 中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。
14、Coordinator:
协调者,可以理解为kafka消费组的管理员。
确定原则:
1)、同一个group中的消费者会发送一个请求(GroupCoordinatorRequest)给kafak集群
2)、集群会返回一个最小负载的broker节点的id,担任Coordinator角色
本质上通过broker抢占zookeeper临时节点来选择Coordinator,协调者与集群中所有borker都保持长连接。
协调者要执行下面一些主要工作:
1)、组成员管理
2)、offset位移管理
3)、主分区leader选举
协调者通过ISR中的副本,依赖zookeeper来选举主分区。具体有下图几种方式。
4)、消费组rebalance
再平衡,kafka集群发生配置变动时,需要重新分配集群数据,这个过程俗称“再平衡”。
再平衡发生的5种情况:
a、有新的消费者加入Consumer Group。
b、有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送HeartbeatRequest时,GroupCoordinator会认为消费者下线。
c、有消费者主动退出Consumer Group。
d、Consumer Group订阅的任一Topic出现分区数量的变化。
e、消费者调用unsubscrible()取消对某Topic的订阅。
15、零拷贝:
领拷贝的本质是减少了一次数据的拷贝(无需经过 os cache 到 应用端缓存)。
A、传统数据消费流程:
1)、消费者发送请求给kafka服务
2)、kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)
3)、从磁盘读取了数据到os cache缓存中
4)、os cache复制数据到kafka应用程序中
5)、kafka将数据(复制)发送到socket cache中
6)、socket cache通过网卡传输给消费者
B、借助于Linux 的 sendfile技术实现“零拷贝”
基于“零拷贝”的数据消费流程:
1、消费者发送请求给kafka服务
2、kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)
3、从磁盘读取了数据到os cache缓存中
4、os cache直接将数据发送给网卡
5、通过网卡将数据传输给消费者
部署方式:
官方推荐最小的集群部署需要3个节点,这里节点指的是运行kafka程序的计算机(物理机或虚拟机最好分别建立在不同的物理机上的虚拟机)。
具体有下面三种部署方式:
1)、单机模式
仅在一台机器上部署kafka程序,主要用于研发测试,不能用于生产环境,没办法保证高可用。这种模式推荐直接用kafka自带的Zookeeper即可。
2)、伪集群模式
也部署在一台机器上,但是区分不同的端口(至少需要部署三个端口),只是需要建3份不同的配置文件,分别指定不同的配置文件来启动kafka实例。
这种模式推荐直接用kafka自带的Zookeeper即可。
一般不推荐用于生产环境,因为只有一台机器,同样没办法保证高可用,在机器没问题的情况下可以保证高可用。
3)、集群模式
最少部署3个机器节点,为了优化适应选举一般推荐部署奇数个机器节点,主要用于生产环境,保证高可用。
PS:集群模式推荐应用单独的Zookeeper集群。
优缺点分析
缺点:
- 依赖Zookeeper
承Zookeeper的缺点。
- 消息乱序
kafka只能保证单分区内消息有序,不能保证同一topic不同的分区的消息有序。
- 丢失数据、消息重复
由于网络或其他意外原因,可能导致提交offset发生错误,从而导致数据丢失(生产时)、重复消费(消费时)
- 消息积压
当生产能力大于消费能力时,会导致消息大量积压,从而大量消耗kafka的资源,严重时会撑爆,促使集群宕机。
- 无法弹性扩容来增强kafka的写入能力
因为数据都需要写入partition,每个partition只有一个leader,不能通过扩容提高leader的写入能力,所以该分区受限于其leader所在broken的处理能力。
- 扩容成本高
集群中新增的broker只会处理新topic,如果要分担老topic-partition的压力,需要手动迁移partition,这时会占用大量集群带宽,增加网络延迟;
- 消费者端发生变化会产生rebalance
发生消费者增加、退出、掉线、订阅topic的分区发生变化都会导致rebalance,这样会导致数据重复消费,影响消费速度,占用集群带宽,增加网络延迟;
- partition过多时性能显著下降
因为kafka的元数据依赖Zookeeper保存,过多时Zookeeper压力较大,同时broker上partition过多让磁盘顺序写几乎退化成随机写。
优点:
1、分布式
多分区(partition),多副本(容错扩展性),多订阅者,多生产者,基于zookeeper集中管理元数据及选举、分配分区等。
2、高性能:
高吞吐量,低延迟,高并发,支持批量操作。
3、持久性:
数据支持持久化
4、可扩展性:
支持broker、partition横向扩展,消息自动再平衡,避免单个服务器压力大,支持高可用。
常见应用场景:
- 消息系统
kafka具有高吞吐量,内置的分区,备份冗余分布式的特点,为大规模消息处理提供了一种很好的解决方案,kafka每秒可以生产约20万消息,每秒可以处理55万消息,也就是提供异步处理的通道。
- 应用监控
利用kafka采集应用程序和服务器健康相关的指标,如CPU占有率,IO,内存,连接数,TPS,QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘,曲线图等可视化监控系统。可以通过kafka + ELK整合构建应用服务系统。
- 日志聚合
可以将各类系统日志经过kafka的管道汇聚到日志分析平台(例如HDFS)
- 用户行为追踪
为了更好的了解用户行为,操作习惯。将用户的操作轨迹,内容等信息发送到kafka集群上,再通过大数据进行数据分析处理,生成相应的统计报告
- 流处理
需要将已收集的流数据提供给其他计算框架进行处理,可以理解为应用解耦、削峰等。
- 存储系统
应用于高性能、低延迟,支持复制和传播的分布式文件系统。
调优经验:
PS:后续逐渐把实践调优过程补充上来
API应用:
各个平台一般都有相应的操作组件,下面介绍下java和DoNet平台下的访问组件。
java平台:
1、kafka-clients
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
2、Spring Boot整合kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>版本</version>
</dependency>
DoNet平台:
1、confluent kafka 推荐使用这个
Nuget: 下载confluent kafka
2、kafka-net
Nuget: 下载kafka-net
3、rdkafka-dotnet
Nuget: 下载rdkafka-dotnet