kafka的命令行的管理使用
- 1、创建topic
使用Zookeeper保存broker信息,每个topic有多少个分区,每个分区在哪些机器上面等等,有多少个副本(副本因子:是指整个数据保存积分,如果是2,就等价于保存存2份 )等等
kafka-topics.sh
-
node01执行以下命令创建topic
cd /kkb/install/kafka_2.11-1.1.0/ # 指定分区数、指定副本数、指定topic的名称、指定Zookeeper的连接地址 bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
2、查询所有的topic
__consumer_offsets:默认自带的topic 主要是用于存储消费的offset的值到了哪里了
-
kafka-topics.sh
cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
-
3、查看topic的描述信息
- kafka-topics.sh
cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181
# 执行结果 # 1,topic的名称、分区的个数、副本因子的个数、一些其他的配置(Isr列表) Topic:test PartitionCount:3 ReplicationFactor:2 Configs: # 2,紧接着是对应的显示结果 # topic的名称、对应的分区(从0开始),主分区是谁,副本因子的节点,Isr列表的节点 Topic: test Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test Partition: 1 Leader: 0 Replicas: 2,0 Isr: 0,2 Topic: test Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
- kafka-topics.sh
-
4、删除topic
- kafka-topics.sh
cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181
- kafka-topics.sh
-
5、node01模拟生产者写入数据到topic中
node01执行以下命令,模拟生产者写入数据到kafka当中去cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
-
6、node01模拟消费者拉取topic中的数据
node02执行以下命令,模拟消费者消费kafka当中的数据# 方式1:使用Zookeeper保存偏移量的方式 cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning # 方式2:使用kafka内置的topic (__consumer_offsets) 来保存消息的偏移量 cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning
-
7、任意kafka服务器执行以下命令可以增加topic分区数
cd /kkb/install/kafka_2.11-1.1.0 # 这里的zkhost是可以任意一个Zookeeper的主机名,或者使用多个 bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
补充讲解:
- 生产者生产的数据量非常大,总全部分区存的数据量非常多,造成消费者数据积压,怎么解决?
答:可以通过增加分区的方式是是实现,数据写入到不同的分区中,那么消费者多启动几个线程从不同的分区中拉取数据;
注意:不能随意的减少的分区,因为分区中保存这对应的数据,如果减少分区,那么分区中的数据处理如何处理将会是一个很大的问题。