当前位置: 首页>编程语言>正文

kafka 推送策略优化 用kafka实现消息推送平台

概述

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。

是一种高吞吐量的分布式发布订阅消息系统,以容错的方式记录消息流,以文件的方式来存储消息流,它可以处理消费者在网站中的所有动作流数据(包括网页浏览,搜索、功能统计等行为)。

Kafka的目的是通过集群来提供实时的消息消息管道和消息处理。

消息系统一般有两种应用模式:1、队列;2、发布订阅,显然kafka属于第二种。

下面从如下几个方面介绍下其相关理论:

目录

概述

架构

核心知识点:

部署方式:

优缺点分析

缺点:

常见应用场景:

调优经验:

API应用:


架构

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_big data,第1张

核心知识点:

1Zookeeper

kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

2Broker

处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群,整个集群可运行在一个或多个(可跨不同数据中心)服务器上。

3Message

消息,每一条发送的消息主体。

4Topic

消息主题,Kafka根据topic对消息进行归类,kafka的数据就保存在topic,发布到Kafka集群的每条消息都需要指定一个topic。

5Partition

topic的分区,对topic从物理上进行分组,每个topic可以有多个分区,其数据被分散存储到多个partition。同一个topic在不同的分区的数据是不重复的,每个partition是一个有序队列,表现形式就是一个一个的文件夹!

在写入数据到topic时:(支持自定义分区,通过覆盖Partitioner下的 configure 和 partition 方法,实现自定义选择分区逻辑)

PS

1)、没有设置key的消息就会被轮训的发送到不同的分区。

2)、设置了key,kafka自带的分区控制,会根据key计算出来一个hash值,这个hash值会对应某一个分区。如果key相同的,那么hash值必然相同,key相同的值,必然是会被发送到同一个分区。

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_kafka_02,第2张

6Segment

分区的数据段,partition物理上由多个segment组成,每个segment存着message信息。

每个Partition都会对应一个日志目录:{topicName}-{partitionid}/,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由三部分组成,分别为“.index”文件、“.log”文件、.timeindex文件(早期版本中没有),log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

索引文件使用稀疏索引的方式,避免对日志每条数据建索引,节省存储空间。kafka就是利用分段 + 索引的方式来解决查找效率的问题。

其中每个.log文件默认大小为1G,文件的命名是以该segment最小offset来命名的,如x.log,x.index,x.timeindex, 其中x取值为0到368795之间。

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_big data_03,第3张

7Replication:

分区的副本,包括主分区(Leader)和备份分区(Follower),同一个partition可能会有多个Replication,多个副本之间的数据是一样的。

副本的作用是做备份,当主分区(Leader)故障的时候会选择一个备份(Follower)上位,成为Leader。

在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_big data_04,第4张

8ISR

同步副本集合(in-sync replicas),每个分区的元数据信息都存储在一个ISR集合里。 在该集合里的副本具有如下特点:

a、每一个副本都与leader保持同步

b、才有资格被选举为leader

c、提交到kafka的消息只有该集合中的副本都接收到才认为“已提交”成功。

9offset:

记录偏移量,最大值为64位的long大小(0到2^64 -1),Kafka通过offset保证消息在分区内的顺序,offset的顺序性不跨分区。Kafka0.10以后,使用一个专门的topic __consumer_offset保存offset。__consumer_offset日志留存方式为compact,也就是说,该topic会对key相同的消息进行整理。

kafka提交offset的方式,通过设置enable.auto.commit=true(默认) 若自己控制offset的提交方式,设置enable.auto.commit=false

kafka提供三种方式来配置offset的消费处理,通过设置auto.offset.reset = earliest(默认)

earliest : 当各分区下有已提交的 Offset 时,从“提交的 Offset”开始消费;无提交的Offset 时,从头开始消费;

latest : 当各分区下有已提交的 Offset 时,从提交的 Offset 开始消费;无提交的 Offset时,消费新产生的该分区下的数

none : Topic 各分区都存在已提交的 Offset 时,从 Offset 后开始消费;只要有一个分区不存在已提交的 Offset,则抛出异常。

在kafka内部,将offset的相关信息,保存在__consumer_offset内:

Consumer group组元数据消息

Consumer group位移消息

Tombstone消息

offset的信息内容如下:

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_big data_05,第5张

Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset

High Watermark:已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未成功备份到其他的 replicas 中

LastCommittedOffset: 上次提交记录

CurrentPosition: 当前位置

offset的具有如下优点:

1)、消费者可以根据需求,灵活指定offset消费。

2)、保证了消息不变性,为并发消费提供了线程安全的保证。每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题。

 消息访问的并行高效性。每个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减少竞争,增加了程序的并行能力。

 3)、增加消息系统的可伸缩性。

每个topic中保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配。

 4)、保证消息可靠性。

消息消费完成之后不会删除,可以通过重置offset重新消费,保证了消息不会丢失。

 5)、灵活的持久化策略。

可以通过指定时间段(如最近一天)来保存消息,节省broker存储空间。

 6)、备份高可用性。

消息以partition为单位分配到多个server,并以partition为单位进行备份。备份策略为:1个leader和N个followers,leader接受读写请求,followers被动复制leader。leader和followers会在集群中打散,保证partition高可用。

消息可靠性原理(消息发送模式及ACK响应模式):

A、消息发送模式:

kafka自身提供下面三种方式来保证消息可靠性

1)、At most once

   消息至多会被发送一次,但如果产生网络延迟等原因消息就会有丢失。

2)、At least once

   消息至少会被发送一次,上面既然有消息会丢失,那么给它加一个消息确认机制(延迟 + 重试)即可解决,但是消息确认阶段也还会出现同样问题,这样消息就有可能被发送两次,导致数据重复。

3)、Exactly once

   消息只会被发送一次,保证消息处理幂等,这是我们想要的效果。

但是由于网络或其他意外宕机情况,在生产端、消费端、kafka自身都有可能发生处理失败,导致消息丢失或重复发送,从而影响业务的正确性。所以理论上要完全保证消息的可靠性,必须做下面的处理:

1)、在生产端、消费端,都需要设计可靠性机制来处理

2)、对kafka自身,需要手动管理offset,也是通过设计可靠性机制来处理

目前大部分可靠性机制都是处理这么几个过程(处理前先检查并落地数据、处理时对比校验、正确处理后提交并删除落地数据、失败时重试)。

BACK响应模式:

1)、request.required.acks = 0;

   只要请求已发送出去,就算是发送完了,不关心有没有写成功。性能很好,如果是对一些日志进行分析,可以承受丢数据的情况,用这个参数,性能会很好。

2)、request.required.acks = 1; 

   发送一条消息,当leader partition写入成功以后,才算写入成功。不过这种方式也有丢数据的可能。

3)、request.required.acks = -1;  

   需要ISR列表里面,所有副本都写完以后,这条消息才算写入成功。

10Controller

控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。

集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。

换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。

官网上有个名为 activeController 的 JMX 指标,可以帮助我们实时监控控制器的存活状态。这个 JMX 指标非常关键,你在实际运维操作过程中,一定要实时查看这个指标的值。

controller选举规则:启动时,第一个在Zookeeper中成功创建 /controller 节点的 Broker 会被指定为控制器。

controller主要作用如下:

1.主题管理(创建、删除、增加分区)

这里的主题管理,就是指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。

2.分区重分配

管理topic的分区扩容,分区副本的分配,对已有主题分区进行细粒度的分配控制。

分区副本的分配策略有:Range范围策略、RoundRobin轮询策略、Stricky均匀分配(分区的分配和上次分配保持一致)

3.Preferred 领导者选举

Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。

4.集群成员管理

管理broker的伸缩,包括broker的上线、下线处理、自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。

比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕

,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。

侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,

该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。

5.数据服务

就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

11Producer:

消息和数据的生产者,是消息的入口,向kafka的一个topic发布消息的进程/代码/服务,在发送消息之前,会对消息进行分类,即指定Topic。

发送流程

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_big data_06,第6张

12Consumer:

消息和数据的消费者,是消息的出口,向kafka集群订阅消息(Topic)并且处理消息的进程/代码/服务。

一般消息系统,consumer存在两种消费模型:

 push优势在于消息实时性高。劣势在于没有考虑consumer消费能力和饱和情况,容易导致producer压垮consumer。

 pull优势在可以控制消费速度和消费数量,保证consumer不会出现饱和。劣势在于当没有数据,会出现空轮询,消耗cpu。

kafka采用pull,并采用可配置化参数保证当存在数据并且数据量达到一定量的时候,consumer端才进行pull操作,否则一直处于block状态。kakfa采用整数值consumer position来记录单个分区的消费状态,并且单个分区单个消息只能被consumer group内的一个consumer消费,维护简单开销小。消费完成,

broker收到确认,position指向下次消费的offset。由于消息不会删除,在完成消费,position更新之后,consumer依然可以重置offset重新消费历史消息。

12Consumer Group:

消费者组,一个组包含多个consumer,用户可以指定consumer的group,在kafka设计原则中每个消息只能由一个group中的一个consumer消费,如果一个消息想要被多个consumer消费,这些consumer必须在不同的group中,这样可以提高kafka处理消息的吞吐量。若每个消费者都属于不同的group,则所有的消费者都可以消费同一条消息,俗称“消息广播”。

实际上kafka通过配置可以支持两种模式:

a、当所有consumer的consumer group相同时,系统变成队列模式

b、当每个consumer的consumer group都不相同时,系统变成发布订阅

PS:对于一个 Consumer Group 而言,消费者的数量不应该多余分区的数量,因为在一个 Consumer Group 中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费

  因此,若一个 Consumer Group 中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。

14Coordinator

协调者,可以理解为kafka消费组的管理员。

确定原则:

1)、同一个group中的消费者会发送一个请求(GroupCoordinatorRequest)给kafak集群

2)、集群会返回一个最小负载的broker节点的id,担任Coordinator角色

本质上通过broker抢占zookeeper临时节点来选择Coordinator,协调者与集群中所有borker都保持长连接。

协调者要执行下面一些主要工作:

1)、组成员管理

2)、offset位移管理

3)、主分区leader选举

   协调者通过ISR中的副本,依赖zookeeper来选举主分区。具体有下图几种方式。

  

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_kafka_07,第7张

4)、消费组rebalance

再平衡,kafka集群发生配置变动时,需要重新分配集群数据,这个过程俗称“再平衡”。

再平衡发生的5种情况:

a、有新的消费者加入Consumer Group。

b、有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送HeartbeatRequest时,GroupCoordinator会认为消费者下线。

c、有消费者主动退出Consumer Group。

d、Consumer Group订阅的任一Topic出现分区数量的变化。

e、消费者调用unsubscrible()取消对某Topic的订阅。

15、零拷贝:

领拷贝的本质是减少了一次数据的拷贝(无需经过 os cache 到 应用端缓存)。

A、传统数据消费流程:

1)、消费者发送请求给kafka服务

2)、kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)

3)、从磁盘读取了数据到os cache缓存中

4)、os cache复制数据到kafka应用程序中

5)、kafka将数据(复制)发送到socket cache中

6)、socket cache通过网卡传输给消费者

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_分布式_08,第8张

B、借助于Linux sendfile技术实现“零拷贝”

基于“零拷贝”的数据消费流程:

1、消费者发送请求给kafka服务

2、kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)

3、从磁盘读取了数据到os cache缓存中

4、os cache直接将数据发送给网卡

5、通过网卡将数据传输给消费者

kafka 推送策略优化 用kafka实现消息推送平台,kafka 推送策略优化 用kafka实现消息推送平台_分布式_09,第9张

部署方式:

官方推荐最小的集群部署需要3个节点,这里节点指的是运行kafka程序的计算机(物理机或虚拟机最好分别建立在不同的物理机上的虚拟机)。

具体有下面三种部署方式:

1)、单机模式

仅在一台机器上部署kafka程序,主要用于研发测试,不能用于生产环境,没办法保证高可用。这种模式推荐直接用kafka自带的Zookeeper即可。

2)、伪集群模式

也部署在一台机器上,但是区分不同的端口(至少需要部署三个端口),只是需要建3份不同的配置文件,分别指定不同的配置文件来启动kafka实例。

这种模式推荐直接用kafka自带的Zookeeper即可。

一般不推荐用于生产环境,因为只有一台机器,同样没办法保证高可用,在机器没问题的情况下可以保证高可用。

3)、集群模式

最少部署3个机器节点,为了优化适应选举一般推荐部署奇数个机器节点,主要用于生产环境,保证高可用。

PS:集群模式推荐应用单独的Zookeeper集群。

优缺点分析

缺点:

  1. 依赖Zookeeper

承Zookeeper的缺点。  

  1. 消息乱序

     kafka只能保证单分区内消息有序,不能保证同一topic不同的分区的消息有序。

  1. 丢失数据、消息重复

      由于网络或其他意外原因,可能导致提交offset发生错误,从而导致数据丢失(生产时)、重复消费(消费时)

  1. 消息积压

   当生产能力大于消费能力时,会导致消息大量积压,从而大量消耗kafka的资源,严重时会撑爆,促使集群宕机。

  1. 无法弹性扩容来增强kafka的写入能力

   因为数据都需要写入partition,每个partition只有一个leader,不能通过扩容提高leader的写入能力,所以该分区受限于其leader所在broken的处理能力。

  1. 扩容成本高

   集群中新增的broker只会处理新topic,如果要分担老topic-partition的压力,需要手动迁移partition,这时会占用大量集群带宽,增加网络延迟;

  1. 消费者端发生变化会产生rebalance

   发生消费者增加、退出、掉线、订阅topic的分区发生变化都会导致rebalance,这样会导致数据重复消费,影响消费速度,占用集群带宽,增加网络延迟;

  1. partition过多时性能显著下降

   因为kafka的元数据依赖Zookeeper保存,过多时Zookeeper压力较大,同时broker上partition过多让磁盘顺序写几乎退化成随机写。

优点:

1、分布式

    多分区(partition),多副本(容错扩展性),多订阅者,多生产者,基于zookeeper集中管理元数据及选举、分配分区等。

   

2、高性能:

   高吞吐量,低延迟,高并发,支持批量操作。

3、持久性:

   数据支持持久化

4、可扩展性:

   支持broker、partition横向扩展,消息自动再平衡,避免单个服务器压力大,支持高可用。

常见应用场景:

  1. 消息系统

kafka具有高吞吐量,内置的分区,备份冗余分布式的特点,为大规模消息处理提供了一种很好的解决方案,kafka每秒可以生产约20万消息,每秒可以处理55万消息,也就是提供异步处理的通道。

  1. 应用监控

利用kafka采集应用程序和服务器健康相关的指标,如CPU占有率,IO,内存,连接数,TPS,QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘,曲线图等可视化监控系统。可以通过kafka + ELK整合构建应用服务系统。

  1. 日志聚合

可以将各类系统日志经过kafka的管道汇聚到日志分析平台(例如HDFS)

  1. 用户行为追踪

为了更好的了解用户行为,操作习惯。将用户的操作轨迹,内容等信息发送到kafka集群上,再通过大数据进行数据分析处理,生成相应的统计报告

  1. 流处理

需要将已收集的流数据提供给其他计算框架进行处理,可以理解为应用解耦、削峰等。

  1. 存储系统

应用于高性能、低延迟,支持复制和传播的分布式文件系统。

调优经验:

PS:后续逐渐把实践调优过程补充上来

API应用:

各个平台一般都有相应的操作组件,下面介绍下java和DoNet平台下的访问组件。

java平台:

1kafka-clients

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-clients</artifactId>

    <version>0.10.2.0</version>

</dependency>

2Spring Boot整合kafka

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

    <version>版本</version>

</dependency>

DoNet平台:

1confluent kafka 推荐使用这个

Nuget: 下载confluent kafka

2kafka-net 

Nuget: 下载kafka-net

3rdkafka-dotnet

Nuget: 下载rdkafka-dotnet


https://www.xamrdz.com/lan/5x61937157.html

相关文章: