当前位置: 首页>后端>正文

kafka的命令行的管理使用

kafka的命令行的管理使用

  • 1、创建topic

使用Zookeeper保存broker信息,每个topic有多少个分区,每个分区在哪些机器上面等等,有多少个副本(副本因子:是指整个数据保存积分,如果是2,就等价于保存存2份 )等等

  • kafka-topics.sh

  • node01执行以下命令创建topic

    cd /kkb/install/kafka_2.11-1.1.0/
    # 指定分区数、指定副本数、指定topic的名称、指定Zookeeper的连接地址
    bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
    
  • 2、查询所有的topic

__consumer_offsets:默认自带的topic 主要是用于存储消费的offset的值到了哪里了

  • kafka-topics.sh

    cd /kkb/install/kafka_2.11-1.1.0/
    bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 
    
  • 3、查看topic的描述信息

    • kafka-topics.sh
      cd /kkb/install/kafka_2.11-1.1.0/
      bin/kafka-topics.sh --describe --topic test --zookeeper node01:2181,node02:2181,node03:2181  
      
      # 执行结果
      # 1,topic的名称、分区的个数、副本因子的个数、一些其他的配置(Isr列表)
      Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
      # 2,紧接着是对应的显示结果
      # topic的名称、对应的分区(从0开始),主分区是谁,副本因子的节点,Isr列表的节点
              Topic: test     Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
              Topic: test     Partition: 1    Leader: 0       Replicas: 2,0   Isr: 0,2
              Topic: test     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
      
  • 4、删除topic

    • kafka-topics.sh
      cd /kkb/install/kafka_2.11-1.1.0/
      bin/kafka-topics.sh --delete --topic test --zookeeper node01:2181,node02:2181,node03:2181 
      
  • 5、node01模拟生产者写入数据到topic中
    node01执行以下命令,模拟生产者写入数据到kafka当中去

    cd /kkb/install/kafka_2.11-1.1.0/
    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 
    
  • 6、node01模拟消费者拉取topic中的数据
    node02执行以下命令,模拟消费者消费kafka当中的数据

    # 方式1:使用Zookeeper保存偏移量的方式
    cd /kkb/install/kafka_2.11-1.1.0/
    bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic test --from-beginning
    
    
    # 方式2:使用kafka内置的topic (__consumer_offsets) 来保存消息的偏移量
    cd /kkb/install/kafka_2.11-1.1.0/
    bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test --from-beginning
    
  • 7、任意kafka服务器执行以下命令可以增加topic分区数

    cd /kkb/install/kafka_2.11-1.1.0
    # 这里的zkhost是可以任意一个Zookeeper的主机名,或者使用多个
    bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
    

补充讲解:

  • 生产者生产的数据量非常大,总全部分区存的数据量非常多,造成消费者数据积压,怎么解决?
    答:可以通过增加分区的方式是是实现,数据写入到不同的分区中,那么消费者多启动几个线程从不同的分区中拉取数据;
    注意:不能随意的减少的分区,因为分区中保存这对应的数据,如果减少分区,那么分区中的数据处理如何处理将会是一个很大的问题。

https://www.xamrdz.com/backend/3nt1934916.html

相关文章: