Kafka的学习与使用
简介
起初是由LinkedIn公司采用Scala语言开发的一个分布式、多分区、多副本且基于zookeeper协调的分布式消息系统,现已捐献给Apache基金会。它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark、Flink等都支持与Kafka集成。
kafka是用Scala写的,一下两种框架都是使用scala语言写的:
- kafka(消息队列) : 分布式消息队列 内部代码经常用来处理并发的问题 用scala可以大大简化其代码。
- spark(实时的大数据分析框架,和flink功能相似) : 处理多线程场景方便 另外 spark主要用作内存计算 经常要用来实现复杂的算法 利用scala这种函数式编程语言 可以大大简化代码
Kafka与zk的关系
zk为kafka提供一下几点作用**(需要注意,在0.9版本之前offset存储在ZK0.9版本及之后offset存储在kafka本地内部topic自己保存)**
1.Broker注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokers/ids
每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。
Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
2.Topic注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:/borkers/topics
Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
Kafka的架构图解析
Kafka安装步骤
kafka是依赖zookeeper的,所以在安装前你需要安装并启动zookeeper。
- 上传文件
这里我使用的kafka版本比较老,因为一般公司使用的都不会是最新的版本。在linux中的/opt
文件夹中新增kafka文件夹,并把安装包上传,可以使用xftp进行上传。
- 解压
使用命令tar -zxvf kafka_2.11-0.11.0.0.tgz
解压kafka的文件,解压后使用mv 命令进行重命名,把解压后的文件夹重命名为kafka,这样方便以后输入。
- 修改配置
进入kafka文件夹的config目录。可以看到目录结构如下
首先修改 server.properties
文件,命令vi server.properties
先在kafka文件夹下新建一个logs的文件夹,下面需要使用
这里主要修改四处,在高版本中已经没有删除 topic 功能这个配置可以自行忽略
#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#kafka 运行日志存放的路径
log.dirs=/opt/kafka/logs
#zk的地址,这里改成自己的zk地址就好
zookeeper.connect=localhost:2181
这里需要注意的点是默认kafka消息默认是存储168小时也就是7天,这个也是在这个文件中配置的。
然后配置环境变量
使用命令:vi /etc/profile
,在文件的最后配置kafka
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
最后如果是集群安装还要分发安装包,这里暂时不需要。分发使用xsync kafka/
,分发后记得修改brokerid,在集群中需要唯一。
- 启动
启动的时候要带上我们的配置文件,且以守护进程的方式启动
bin/kafka-server-start.sh -daemon config/server.properties
如果是集群可以自己写群起脚本。
Kafka 命令行操作
首先进入kafka安装目录的bin目录。
- 查看当前服务器中的所有 topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
- 创建 topic
kafka-topics.sh --zookeeper 127.0.0.1:12181 --create --replication-factor 3 --partitions 1 --topic first
选项说明:
–topic 定义 topic 名
–replication-factor 定义副本数
–partitions 定义分区数 - 删除 topic
kafka-topics.sh --zookeeper 127.0.0.1:12181 --delete --topic first
需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。 - 发送消息
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first
输入命令后就可以进行发送消息了 - 消费消息(多种方式)
这是之前老版本的方式,连接zk,现在不推荐kafka-console-consumer.sh --zookeeper 127.0.0.1:12181 --topic first
新版本的关于offset等数据已经存储在kafka本地了,所以使用这个命令kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first
–from-beginning:会把主题中以往所有的数据都读取出来。 - 查看某个 Topic 的详情
kafka-topics.sh --zookeeper 127.0.0.1:12181 --describe --topic first
- 修改分区数
kafka-topics.sh --zookeeper127.0.0.1:12181 --alter --topic first --partitions 6
127.0.0.1:12181 --describe --topic first`
- 修改分区数
kafka-topics.sh --zookeeper127.0.0.1:12181 --alter --topic first --partitions 6