问题导览
Kafka 是采用 Scala 语言开发的一个分布式、多分区、多副本且基于 zookeeper 协调的分布式发布与订阅消息系统。
- Kafka特征?
- Kafka如何实现高吞吐率?
- 为什么Kafka速度那么快?
- 如何保证数据的有序性?
- kafka数据的数据丢失问题,以及如何保证数据不丢失?
- Kafka对于不同的重复消费的问题?
详细剖析
2.kafka设计目标
Kafka 是采用 Scala 语言和Java语言开发的一个分布式、多分区、多副本且基于 zookeeper 协调的分布式发布与订阅消息系统。
(1),Kafka特征?
- 持久性、可靠性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率、低延时:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
- 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中,支持分布式消费,同时保证每个Partition内的消息顺序传输。
- 高并发:支持数千个客户端同时读写;支持在线水平扩展。
(2).如何实现高吞吐率?
- 存储大:其将所有的消息都写入了低速大容量的硬盘中。
- 顺序读写:Kafka将消息写入了分区 Partition 中,在分区中的消息又是顺序读写的。顺序读写+机械硬盘读写的速度是大于固态硬盘的。
- 零拷贝:在内核层直接将文件内容传送给网络 socket,从而避免了应用层的数据拷贝,减小 IO 开销;生产者和消费者对于Kafka 中的消息都是采用的SendFile的方式实现的零拷贝。
- 批量发送:Kafka允许批量发送和接收数据
-
消息压缩:Kafka允许对消息集合进行压缩,并且支持的 压缩方式比较只有
GZip
和Snappy
(3),为什么Kafka速度那么快??
参考资料:
[1,Kafka如何在千万级别时优化JVM GC问题?](D:\BigData\Kafka\KafkaSplits\Kafka如何在千万级别时优化JVM GC问题?.pdf)
Kafka是大数据领域无处不在的消息中间件,目前广泛使用在企业内部的实时数据管道,并帮助企业构建自己的流计算应用程序。
Kafka虽然是基于磁盘做的数据存储,但却具有高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万,这其中的原由值得我们一探究竟。
1), 顺序读写
- 磁盘顺序读写性能要远高于固态硬盘、甚至内存的随机读写。
引用一组Kafka官方给出的测试数据(Raid-5,7200rpm):
Sequence I/O: 600MB/s
Random I/O: 100KB/s
众所周知Kafka是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对Kafka性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。
磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升。
2), Page Cache(页缓存)
为了优化读写性能,Kafka利用了操作系统本身的
Page Cache(页缓存)
,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:
- (1)避免Object消耗:如果是使用Java堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。
- (2)避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题。
Kafka重度依赖底层操作系统提供的PageCache功能。
当上层有写操作时,操作系统只是将数据写入 PageCache,同时标记Page属性为Dirty。
当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。
实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小,所以现代的OS都支持PageCache。使用PageCache功能同时可以避免在JVM内部缓存数据,JVM虽然可以为我们提供了强大的GC能力,但是同时也引入了一些问题,所以并不适用于Kafka的设计。
- 如果在Heap(堆)内存管理缓存,JVM的GC线程会频繁扫描Heap空间,带来不必要的开销。如果Heap过大,执行一次Full GC对系统的可用性来说将是极大的挑战。
- 所有在在JVM内的对象都不免带有一个 Object Overhead(千万不可小视),内存的有效空间利用率会因此降低。
- 所有的In-Process Cache在OS中都有一份同样的PageCache。所以通过将缓存只放在PageCache,可以至少让可用缓存空间翻倍。
- 如果Kafka重启,所有的In-Process Cache都会失效,而OS管理的PageCache依然可以继续使用。
3), 零拷贝(Zero-Copy)
Kafka中的零拷贝的实现是采用的 Sendfile 的方式实现的
- 零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中,比如下图中展示便是传统网络的I/O操作流程:
- OS 从硬盘把数据读到内核区的PageCache。
- 用户进程把数据从内核区Copy到用户区。
- 然后用户进程再把数据写入到Socket,数据流入内核区的Socket Buffer上。
- OS 再把数据从Buffer中Copy到网卡的Buffer上,这样完成一次发送。
整个过程共经历两次Context Switch,四次System Call。同一份数据在内核Buffer与用户Buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是Sendfile所解决的问题,经过Sendfile优化后,整个I/O过程就变成了下面这个样子。
通过以上的介绍不难看出,Kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。
内核层直接将文件内容传送给网络 socket,避免应用层数据拷贝,减小 IO 开销;
如果Producer和Consumer之间生产和消费进度上配合得当,完全可以实现数据交换零I/O。
(4),如何保证数据的有序性?
方式1,可以强制只有一个分区,那么在一个分区中就是有序的,那么整体就是有序的。但是只有一个分区kafka的吞吐量就不高了。
-
方式2,从业务的角度考虑:
- 两种角度的分析:
- 生产者:可以通过自定义分区的策略(
org.apache.kafka.clients.Partitioner
)将满足指定规则的数据存储在同一个分区中,从而实现有序;
- 消费者:另外对应的如果是在多线程消费的场景下,指定是,每个线程分别消费对应的一个分区中的数据,这样就能保证顺序性。
- 生产者:可以通过自定义分区的策略(
- 实际应用场景的举例:
- (1),比如,电商的场景:同一个订单的不同状态的消息存储在同一个分区中
- (2),或者,用户的场景:同一个登录的用户的各类操作存储在同一个分区中
- 两种角度的分析:
(5).kafka数据的数据丢失问题,以及如何保证数据不丢失?
1).数据可能出现丢失的情况?
-
acks=0
的时候: 使用异步模式的时候,该模式下 kafka 无法保证消息,有可能会丢。 -
acks=1
的时候: (只保证写入 leader 成功),如果刚好 leader 挂了,则数据会丢失。
2).Broker 如何保证数据不丢失?
-
acks=-1(或者acks=all)
:在Leader写入成功之后,将信息同步到副本列表中Followers之后再发送成功的Ack。 -
min.insync.replicas=2
:消息至少要被同步到多少副本才算成功。 -
retries="一个合理值"
:kafka 发送数据失败后的重试值(如果总是失败,则可能是网络原因)。 -
unclean.leader.election.enable=false
:默认表示关闭 unclean leader 选举,即不允许 非副本列表 中的副本被选举为 leader,以避免数据丢失。
3).Consumer 如何保证数据不丢失?
如果在消息处理完成之前就提交了offset,那么就有可能造成数据的丢失;那么我么可以将自动提交关闭,然后在处理完成之后选择手动提交;
enable.auto.commit=false
: 关闭自动提交 Offset,进行手动维护Offset
(6).Kafka对于不同的重复消费的问题?
1).同一个 Consumer 重复消费
比如当某个数据刚好消费完成,但是正准备提交Offset 的时候,消费时间超时,则Broker认为该条消息未消费成功,这时就会产生消费重复的问题。
解决方案:适当延长Offse自动提交的时间。
2).不同的 Consumer 重复消费
当 Consumer 消费了信息,但是还没有提交Offset 时宕机,那么这些已经被消费过的信息会被重复消费。
解决方案:将自动提交更改为手动提交。
3).也可以从架构设计上防止重复消费
- 所有的消息都设置和保存一个唯一的 uuid ,在消费信息的时候,先去持久化系统中查询这个消息是否被消费过,如果没有则进行消费,若消费了则直接丢弃就行。
- 利用数据库的唯一键约束等等。