producer消息发送原理
Producer 将消息发送给 Broker ,并形成最终的可供 Consumer 消费的 Log。
- Producer 先从 Zookeeper 中找到要写入 Partition 的 Leader。
- Producer 将该消息push(推送)给 Leader。
- Leader 将消息先写入本地的Log,并通过ISR列表(副本同步列表)通知到相关的Followers。
-
Followers
从Leader中 Pull(拉取)信息,写入Log后向Leader
发送 Ack。 - Leader收到副本列表中的Followers的Acks之后,增加可以消费数据的最高偏移量(Offset),然后向Producer发送Ack,表示消息写入成功。
producer核心参数
(1), 常见异常处理
- 不管是异步还是同步,都可能让你处理异常,常见的异常如下:
- 1)LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可。如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException
- 2)NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可
- 3)NetworkException:网络异常,重试即可
我们之前配置了一个参数,retries
,他会自动重试的,但是如果重试几次之后还是不行,就会提供Exception给我们来处理了。- retries:重新发送数据的次数,默认为0,表示不重试
- etry.backoff.ms:两次重试之间的时间间隔,默认为100ms
(2), 提升消息吞吐量
-
buffer.memory
- 设置发送消息的缓冲区,默认值是33554432,就是32MB
如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住
-
compression.type
- producer用于压缩数据的压缩类型。默认是none表示无压缩。可以指定gzip、snappy
- 压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
-
batch.size
- producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。
- 默认是16384Bytes,即16kB,也就是一个batch满了16kB就发送出去
如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。
-
linger.ms
- 这个值默认是0,就是消息必须立即被发送
一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kB,自然就会发送出去。 但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
(3), 请求超时
-
max.request.size
- 这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb
- 这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些(企业一般设置成10M)
-
request.timeout.ms
- 这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒
- 如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理
(4), ACK参数
- Producer向Broker中发送消息,在Producer中提供了消息确认机制,也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算成功。
- 这个消息确认机制可以通过
request.required.acks
参数的值进行设置,不同的值可以在不同程度上保证数据的可靠性。- 配置示例:
properties.put("acks","0")
- acks参数,其实是控制发送出去的消息的持久化机制的。
-
==acks=0==
- 此时表示 “异步发送”:Producer 向 Kafka 发送信息而不需要 Kafka 反馈成功的Ack。这种方式效率最高,但是可靠性最低。
producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了。 你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的。会让你的发送吞吐量会提升很多,你发送弄一个batch出去,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图。
-
==acks=1==
- 只要leader写入成功,就认为消息成功了。
Producer 将信息发送给 Kafka,Broker 的 Partition Leader 在收到信息之后会马上发送成功的Ack,而不需要等待副本列表中Follows同步完成。
此时如果 Producer 没有收到 Kafka的返回的Ack,那么可以确定是消息是的发送失败的;但是即时是返回了Ack,此时Leader得到的结果是成功写入,但是如果此时还没来得及把数据同步到 副本列表(ISR列表) 中的Followers就挂了,那么此时会出现消息丢失的情况。
默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader.
- 只要leader写入成功,就认为消息成功了。
-
==acks=all 或者 acks=-1==
- 需要Leader和Follower都将信息写入成功才可以。
Producer 发送消息给 Kafka,Kafka在收到信息之后要等到副本列表中所有的 Followers都同步完消息之后,才向生产者发送成功的Ack。
此时如果Producer收到Kafka 返回的Ack,则认为消息是发送成功的;但是如果一直没有收到Kafka的Ack,则认为消息发送失败,会自动重发消息。该种方式会出现重复接收的情况。
这种方式数据最安全,但是性能最差。
- 需要Leader和Follower都将信息写入成功才可以。
-
==如果要想保证数据不丢失,得如下设置==
(1)min.insync.replicas = 2 ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,不能只有一个leader存活,连一个follower都没有了。 (2)acks = -1 每次写成功一定是leader和follower都成功才可以算做成功,这样leader挂了,follower上是一定有这条数据,不会丢失。 (3)retries = Integer.MAX_VALUE 无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试。 除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
-
(5), 重试乱序
- max.in.flight.requests.per.connection
- 每个网络连接已经发送但还没有收到服务端响应的请求个数最大值
消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer必须把一个请求发送的数据发送成功了再发送后面的请求。避免数据出现乱序
broker核心参数
server.properties配置文件核心参数
【broker.id】
每个broker都必须自己设置的一个唯一id
【log.dirs】
这个极为重要,kafka的所有数据就是写入这个目录下的磁盘文件中的,如果说机器上有多块物理硬盘,那么可以把多个目录挂载到不同的物理硬盘上,然后这里可以设置多个目录,这样kafka可以数据分散到多块物理硬盘,多个硬盘的磁头可以并行写,这样可以提升吞吐量。
【zookeeper.connect】
连接kafka底层的zookeeper集群的
【Listeners】
broker监听客户端发起请求的端口号,默认是9092
【unclean.leader.election.enable】
默认是false,意思就是只能选举ISR列表里的follower成为新的leader,1.0版本后才设为false,之前都是true,允许非ISR列表的follower选举为新的leader
【delete.topic.enable】
默认true,允许删除topic
【log.retention.hours】
可以设置一下,要保留数据多少个小时(默认168小时),这个就是底层的磁盘文件,默认保留7天的数据,根据自己的需求来就行了