1、创建topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first --partitions 1 --replication-factor 1
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic first1
config cleanup.policy包含delete/compact
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 12 --topic first2 --config delete.retention.ms=86400000 --config cleanup.policy=compact
2、查询topic,进入kafka目录:bin/kafka-topics.sh --list --zookeeper localhost:2181
3、查询topic./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
4、查询topic内容:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning
5、查看topic 为 first的 详细信息./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first
返回的信息如下
Topic:first PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: first Partition: 0 Leader: 1 Replicas: 1 Isr: 1
6、往topic 为 first 的内部生产消息./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
输入以下九条数据(包含空格)
nihao
nihao
你好
quit
exit
实时消费的
7、从topic 为first的内部消费消息,此时打开消费就能收到实时发送的消息了./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
8、查询当前topic某分区中数据的偏移量信息,注: time为-1时表示最大值,time为-2时表示最小值(可查看到包含9条数据信息)
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic first --time -1 --partitions 0
返回的信息
first:0:9
9、从指定的分区--partition 0,指定偏移量—offset 5,最大消息输出--max-messages 5(当接收够五条就会关闭接收请求,不设置次选项就会一直接受)查询当前topic信息
./bin/kafka-console-consumer.sh? --bootstrap-server localhost:9092 --topic first --partition 0 --offset 5 --max-messages 5 --property? print.timestamp=true
CreateTime:1684392637695 quit
CreateTime:1684392641949 exit
CreateTime:1684392913335
CreateTime:1684392923551 实时消费的