Kafka ecosystem
Apache Kafka
Environment
kafka: 3.3.1
os: CentOS Linux 7
JDK: 1.8.0_291
Note: Kafka 3.3.1 requires Java 8 or later; this walkthrough was done with Java 11. Kafka 4.0 is expected to drop support for Java 8.
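Before installing, it is worth confirming which Java the shell will actually pick up. A minimal check (assumes nothing beyond a POSIX shell; the hint text is illustrative):

```shell
# Print the active Java version, or a hint if no JDK is on the PATH.
if command -v java >/dev/null 2>&1; then
  java -version 2>&1 | head -n 1
else
  echo "java not found: install a JDK (8 or later) first"
fi
```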
Installing Kafka
- Download the Kafka tarball
Official download page: https://kafka.apache.org/downloads
- Extract it
sudo tar zxvf kafka.tgz -C /opt/module/
- Rename the directory
sudo mv /opt/module/kafka_2.12-3.3.1 /opt/module/kafka
- Create a logs folder under /opt/module/kafka, and point the log path at it in the config
mkdir logs
- Edit the configuration file
cd config/
vi server.properties
A reference configuration follows:
# Globally unique broker id; in a cluster, every machine must use a different value
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum request size the socket accepts
socket.request.max.bytes=104857600
# Where Kafka stores its data (log segments)
log.dirs=/opt/module/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Threads per data dir used for recovery and cleanup of data
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained before deletion
log.retention.hours=168
# Zookeeper cluster connection string
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
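A quick grep over the file helps catch typos in the settings above. The sketch below writes a minimal sample to a temp directory so it can run anywhere; on a real install you would grep /opt/module/kafka/config/server.properties directly (the temp file and its values are illustrative only):

```shell
# Write a minimal sample server.properties (illustrative values)
conf="$(mktemp -d)/server.properties"
cat > "$conf" <<'EOF'
broker.id=0
delete.topic.enable=true
log.dirs=/opt/module/kafka/logs
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
EOF
# broker.id must be unique per broker; log.dirs holds message data (segment
# files), not application logs, so the directory must exist and be writable
grep -E '^(broker\.id|delete\.topic\.enable|log\.dirs|zookeeper\.connect)=' "$conf"
```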
Next we start the broker. Zookeeper must be started first, then the Kafka server, in that order.
- Start zookeeper in the background (later versions may no longer require zookeeper)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # run from the kafka directory
- Then start the Kafka server in the background
bin/kafka-server-start.sh -daemon config/server.properties # run from the kafka directory
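After both daemons start, a quick way to tell whether the broker came up is to probe its port. This sketch uses nc (netcat), which may need to be installed separately; it prints a status either way:

```shell
# Probe the broker port; prints "up" if something is listening on 9092.
if nc -z localhost 9092 2>/dev/null; then
  echo "kafka broker: up"
else
  echo "kafka broker: down (check logs/server.log under the kafka directory)"
fi
```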
- Create a topic
Events published by a producer are persisted in a topic, which is how they get routed to the right consumers. So before reading or writing events, we need to create a topic.
Run the following commands:
# Create a topic
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# Inspect topics
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Delete a topic
bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092 # requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion (or the broker must be restarted)
# Modify a topic; in the --describe output:
Partition: the partition index. Note: the partition count can only be increased, never decreased.
Leader: the broker.id of the broker currently leading the partition.
Replicas: the broker.ids of the brokers holding replicas of the partition.
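The increase-only rule for partitions can be scripted as a guard before calling --alter. A minimal sketch: current and requested are hypothetical values (in practice you would read the current count from --describe output), and the alter command is echoed rather than executed since no broker is assumed:

```shell
current=1    # hypothetical current partition count
requested=3  # desired partition count
if [ "$requested" -gt "$current" ]; then
  # Growing the partition count is allowed:
  echo "kafka-topics.sh --alter --topic quickstart-events --partitions $requested --bootstrap-server localhost:9092"
else
  echo "refused: partitions can only be increased (current=$current)"
fi
```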
- Reading and writing events
Next we read and write events with the console-consumer and console-producer tools that ship with Kafka.
With console-producer, every line we type followed by Enter writes one event to the topic.
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
Press Ctrl+C to exit. Then we read the events back with console-consumer; the events we just wrote appear.
# --from-beginning reads from the start of the topic, so events written before the consumer started are read too
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
We can keep the producer and consumer running in two sessions; events written by the producer are read by the consumer in real time.
- Stopping Kafka
bin/kafka-server-stop.sh
Kafka commands
kafka-topics.sh --zookeeper hadoop102:2181 --list # pre-3.0 syntax; Kafka 3.x removed --zookeeper, use --bootstrap-server instead
Installing with Docker
See the Kafka images on Docker Hub for reference.
- Create a network
app-tier: the network name
--driver: network type, bridge
docker network create app-tier --driver bridge
- Install zookeeper
Kafka depends on zookeeper, so install zookeeper first.
-p: port mapping (default 2181)
-d: run in the background
docker run -d --name zookeeper-server \
--network app-tier \
-e ALLOW_ANONYMOUS_LOGIN=yes \
bitnami/zookeeper:latest
- Install Kafka
--name: container name
-p: port mapping (default 9092)
-d: run in the background
ALLOW_PLAINTEXT_LISTENER: allow unauthenticated (plaintext) access
KAFKA_CFG_ZOOKEEPER_CONNECT: the zookeeper to connect to
KAFKA_CFG_ADVERTISED_LISTENERS: the current host's IP or address, e.g. -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.101:9092 (important: on a server deployment use the server's IP or domain, otherwise clients will fail with an address error when consuming)
docker run -d --name kafka-server \
--network app-tier \
-p 9092:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.172.131:9092 \
bitnami/kafka:latest
- Enter the Kafka container to run commands; Kafka is installed at
/opt/bitnami/kafka
- kafka-map, a web UI for managing Kafka
Access at: http://<server IP>:9020/
DEFAULT_USERNAME: default account, admin
DEFAULT_PASSWORD: default password, admin
Git repository: https://github.com/dushixiang/kafka-map/blob/master/README-zh_CN.md
docker run -d --name kafka-map \
--network app-tier \
-p 9020:8080 \
-v /home/yzj/kafka-map/data:/usr/local/kafka-map/data \
-e DEFAULT_USERNAME=admin \
-e DEFAULT_PASSWORD=admin \
--restart always dushixiang/kafka-map:latest
Kafka cluster
- kafka.yml
version: "3.6"
services:
  zookeeper:
    container_name: zookeeper
    image: 'bitnami/zookeeper:3.8.0'
    user: root
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      netkafka:
        ipv4_address: 172.23.0.10
  kafka1:
    container_name: kafka1
    image: 'bitnami/kafka:3.3.1'
    user: root
    depends_on:
      - zookeeper
    ports:
      - '19092:9092'
    environment:
      # Enable KRaft mode
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_ZOOKEEPER_CONNECT=192.168.1.21:2181
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # Socket listeners inside the container (internal Docker IP and ports)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # Security protocol for each listener
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Address advertised to external clients (host IP and mapped port)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.21:19092
      - KAFKA_BROKER_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Broker maximum and initial heap size
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
    volumes:
      - /home/vagrant/kafka/volume/broker01:/bitnami/kafka:rw
    networks:
      netkafka:
        ipv4_address: 172.23.0.11
  kafka2:
    container_name: kafka2
    image: 'bitnami/kafka:3.3.1'
    user: root
    ports:
      - '29092:9092'
      - '29093:9093'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_ZOOKEEPER_CONNECT=192.168.1.21:2181
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.21:29092 # change to the host IP
      - KAFKA_BROKER_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI # must be identical on all three nodes
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
    volumes:
      - /home/vagrant/kafka/volume/broker02:/bitnami/kafka:rw
    networks:
      netkafka:
        ipv4_address: 172.23.0.12
  kafka3:
    container_name: kafka3
    image: 'bitnami/kafka:3.3.1'
    user: root
    ports:
      - '39092:9092'
      - '39093:9093'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_ZOOKEEPER_CONNECT=192.168.1.21:2181
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.21:39092 # change to the host IP
      - KAFKA_BROKER_ID=3
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
    volumes:
      - /home/vagrant/kafka/volume/broker03:/bitnami/kafka:rw
    networks:
      netkafka:
        ipv4_address: 172.23.0.13
networks:
  netkafka:
    driver: bridge
    name: netkafka
    ipam:
      driver: default
      config:
        - subnet: 172.23.0.0/25
          gateway: 172.23.0.1
1. Replace the host IP (192.168.1.21) in each KAFKA_CFG_ADVERTISED_LISTENERS entry with your own host's IP.
2. Adjust the mount path /home/vagrant/kafka/volume to suit your host.
- Start
From the directory containing kafka.yml, run: docker-compose -f kafka.yml up -d
- 执行
#3.1进入容器
docker exec -it kafka1 bash
#3.2进入kafka目录
cd /opt/bitnami/kafka/bin
#3.3 创建Topic
#创建一个副本为3、分区为5的topic
./kafka-topics.sh --create --topic foo --partitions 5 --replication-factor 3 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
Created topic foo.
#查看topic详细信息
kafka-topics.sh --describe --topic foo --bootstrap-server kafka1:9092, kafka2:9092, kafka3:9092
Topic: foo TopicId: 1tHGERe8QA6z24abtVKCLg PartitionCount: 5 ReplicationFactor: 3 Configs:
Topic: foo Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: foo Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: foo Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: foo Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: foo Partition: 4 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
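When scripting against a cluster, output like the above is easy to post-process. A sketch that extracts the leader of each partition; the sample lines are the first two partitions of the --describe output captured above, and in practice you would pipe kafka-topics.sh --describe straight into awk:

```shell
# Sample --describe output (two partitions shown)
describe='Topic: foo Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: foo Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1'
# Field 4 is the partition id, field 6 the leader broker.id
printf '%s\n' "$describe" | awk '/Partition:/ {print "partition " $4 " -> leader " $6}'
```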
# 3.4 Verify producing and consuming
# Open two windows, each inside a container at the same directory
Produce on kafka1: kafka-console-producer.sh --topic foo --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
Consume on kafka2 and kafka3: /opt/bitnami/kafka# bin/kafka-console-consumer.sh --topic foo --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
# 3.5 Delete the topic
kafka-topics.sh --delete --topic foo --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092