背景
为了缓解ES压力,在ELFK架构上引入Kafka作为数据缓冲,提升日志收集效率。
规划
- 系统版本为Ubuntu18.04
- 本次kafka的版本为
kafka_2.12-3.4.0.tgz
,节点为3个节点
192.168.2.112 192.168.2.113 192.168.2.114
- zookeeper使用kafka内置的
整体思路
- 安装kafka (三台机器)
- 把kafka内置的zookeeper配置为集群,再配置kafka
- 简单测试kafka集群生产/消费,并把后面elfk要用的topic创建好
- 将filebeat配置output配置修改为输出至kafka
- 将logstash配置input配置修改为从kafka消费数据
- 日志收集测试
一.Kafka集群搭建
以下步骤均三台机器分别执行,注意文件中程序路径
1.1) 安装kafka
解压目录并重命名(三台机器上目录分别为/opt/kafka-01,/opt/kafka-02,/opt/kafka-03)
tar zxvf kafka_2.12-3.4.0.tgz -C /opt/
cd /opt/ && mv kafka_2.12-3.4.0 kafka-01
#后面两台一样操作,注意目录名字区分
1.2) 将 kafka 内置的 zookeeper 配置为系统服务
cat >/lib/systemd/system/zk.service <<EOF
[Unit]
Description=Zookeeper in Kafka package
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/kafka-01
Environment=JAVA_HOME=/opt/jdk1.8.0_144
Restart=on-failure
LimitNOFILE=100000
ExecStart=/opt/kafka-01/bin/zookeeper-server-start.sh /opt/kafka-01/config/zookeeper.properties
[Install]
WantedBy=multi-user.target
EOF
#开机启动
systemctl daemon-reload
systemctl enable zk
1.3) 将 kafka 配置为系统服务
cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
Requires=zk.service
After=network.target
After=zk.service
[Service]
Type=simple
WorkingDirectory=/opt/kafka-03
Environment=JAVA_HOME=/opt/jdk1.8.0_144
Restart=on-failure
LimitNOFILE=100000
ExecStart=/opt/kafka-03/bin/kafka-server-start.sh /opt/kafka-03/config/server.properties
[Install]
WantedBy=multi-user.target
EOF
#开机启动
systemctl daemon-reload
systemctl enable kafka
1.4)修改zookeeper配置文件
配置数据目录,日志目录,地址配为集群,修改后配置如下:
vim /opt/kafka-01/config/zookeeper.properties
clientPort=2181
admin.enableServer=false
dataDir=/opt/kafka-01/zk_data_new
dataLogDir= /opt/kafka-01/zk_log
tickTime=2000
initLimit=10
syncLimit=5
server.1=192.168.2.112:2888:3888
server.2=192.168.2.113:2888:3888
server.3=192.168.2.114:2888:3888
1.5) 创建zookeeper所需目录以及创建myid
上面的配置中server.1
代表192.168.2.112这个节点在zk集群中id为1,需要一个myid文件来存放这个id
mkdir /opt/kafka-01/zk_data_new zk_log
echo "1" > /opt/kafka-01/zk_data_new/myid
#后面两台以此类推,分别id为2,3
以上执行完,zookeeper集群就配置完了,下面配置kafka集群
1.6) 修改kafka配置文件
vim /opt/kafka-01/config# cat server.properties (以下的ip地址可配成内网域名解析)修改后为:
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.2.112:9092
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=125829120
log.dirs=/opt/kafka-01/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=209715200
log.retention.check.interval.ms=300000
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.2.112:2181,192.168.2.113:2181,192.168.2.114:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
delete.topic.enable=true
queued.max.requests = 1000
auto.create.topics.enable=false
offsets.retention.minutes=43200
log.cleaner.enable=true
message.max.bytes=104857600
log.roll.hours=2
- 参数解读:
=====================================================================================
broker.id=1 #这里1是kafka-01的id,每台机器不同,唯一的
listeners #监听kafka地址:
advertised.listeners #本机对外声明的地址:
zookeeper.connect #配置zk集群地址
log.dirs #kafka日志目录,可逗号配置多个
message.max.bytes #允许的最大消息大小,要小于 socket.request.max.bytes,并且最好和消息生产者最大能产生的消息一致
=====================================================================================
num.network.threads 和 num.io.threads
分别是网络和 IO 的线程数量
=====================================================================================
socket.send.buffer.bytes 和 socket.receive.buffer.bytes
分别是发送缓冲区大小和接收缓冲区大小
=====================================================================================
socket.request.max.bytes
服务器允许请求的最大值,用来防止OOM(应该要大于 message.max.bytes)(为了防止OOM,这个值应该小于JVM 的堆内存)
=====================================================================================
num.io.threads
num.partitions
创建 topic 时如果没有指定 partition 数量则使用这个值。
=====================================================================================
num.recovery.threads.per.data.dir
在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,一般磁盘可以不增加,默认为1
=====================================================================================
offsets.topic.replication.factor、transaction.state.log.replication.factor 和 transaction.state.log.min.isr
在生产环境下并且有多个 kafka 节点时建议配置为3,这些是内部 topic 使用副本数量等信息。
=====================================================================================
log.retention.hours
日志(消息)保留时间,单位是小时
=====================================================================================
log.retention.bytes
日志(消息)最大保留多少空间,单位是字节。与 log.retention.hours 同时生效,独立控制。
这里要注意,这个值原来是限制 partition 使用的大小而不是限制 broker 使用的大小
=====================================================================================
log.segment.bytes
日志(消息)存储时多少内容拆分一个 segment 文件,单位是字节。topic 创建时可以单独指定这个值
=====================================================================================
log.retention.check.interval.ms
多长时间去检查一次存储是否符合 log.retention.hours 和 log.retention.bytes,单位是毫秒
=====================================================================================
zookeeper.connect 和 zookeeper.connection.timeout.ms
连接 zookeeper 的地址和超时时间
=====================================================================================
group.initial.rebalance.delay.ms
有消费者加入到这个组后多久进行重新平衡,生产建议 3 秒,调试时可以配置为 0
=====================================================================================
delete.topic.enable
是否允许删除 topic
=====================================================================================
queued.max.requests
I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。应该是一种自我保护机制
=====================================================================================
auto.create.topics.enable
是否允许自动创建 topics
=====================================================================================
offsets.retention.minutes
内部 topic 中 offset 的过期时间,一般建议比 log.retention.hours 长,否则 offset 先过期的话数据可能被重新发送
=====================================================================================
log.cleaner.enable
是否启用日志压缩
=====================================================================================
log.roll.hours=2
和 log.segment.bytes 共同决定多久(多大)更换一个 segment 存储文件
=====================================================================================
1.7) 启动zk和kafka
systemctl start zk
systemctl start kafka
启动后可以看看是否正常启动,jps
可查看到
24017 Logstash
7159 QuorumPeerMain
16698 Kafka
31771 Jps
二. Kafka创建Topic
2.1) 创建一个topic
(replication-factor-副本数,partitions-分区数,组成集群后server指定任意一个节点创建即可,后面节点会自动同步到其他节点)
bin/kafka-topics.sh --create --bootstrap-server 192.168.2.112:9092 --replication-factor 1 --partitions 1 --topic new_filebeat
2.2) 简单测试生产/消费(命令行)
--bootstrap-server
可随意指定集群任意节点,如果不同节点查不到同步的topic或者拿不到消息也说明集群没有搭好。
查看topic
./bin/kafka-topics.sh --bootstrap-server 192.168.2.112:9092 --topic new_filebeat --describe
kafka生产者
bin/kafka-console-producer.sh --broker-list 192.168.2.114:9092 --topic new_filebeat
kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.112:9092 --topic new_filebeat
正常结果如下:
三.调整Filebeat配置
将filbeat的输出由之前的output到logstash改成到kafka
output.kafka:
hosts: ['192.168.2.112:9092','192.168.2.113:9092','192.168.2.114:9092']
topic: 'new_filebeat'
enabled: true
partition.round_robin:
reachable_only: true
required_acks: 1
compression: gzip
max_message_bytes: 104857600
参数解读:
reachable_only: true: 设置是否仅将消息发送到可到达的broker服务端,避免消息发送失败。
required_acks: 1: 设置等待确认的应答数,1表示只需等待写入到leader分区后,就可以发送ACK确认消息。
compression: gzip: 设置消息传输时的压缩算法,gzip表示使用gzip算法进行压缩。
max_message_bytes: 104857600: 设置每个消息的最大字节数限制,104857600表示最大是100MB。
这些参数可以根据实际情况进行调整,以避免消息传输失败,提高效率。
重启服务重新加载配置systemctl restart filebeat
四.调整Logstash配置
将logstash的输入由之前的iutput从filebeat拿数据改成从kafka拿
input {
kafka {
bootstrap_servers => ["192.168.2.112:9092,192.168.2.113:9092,192.168.2.114:9092"]
topics_pattern => "new_filebeat"
group_id => "logs"
codec => "json"
consumer_threads => 70
}
}
参数解读:
topics_pattern: 之前kafka中创建的topic名
codec: 编码格式
consumer_threads: 指定用于处理Kafka消息的线程数。默认为1,当应用程序需要同时处理大量Kafka消息时,配置大于1的数,可提高处理能力。
group_id : 用于定义消费者的组 ID。消费者组是一组 Kafka 消费者,它们共享同一个组 ID,并从一个或多个 Kafka 主题消费消息。当多个消费者共享相同的组 ID 时,
Kafka 会确保它们不会同时消费同一个分区中的消息,以保证分区内数据的负载均衡和整体消费速度的增加。在配置 Kafka 输入插件时,group_id 参数是必须配置的。
如果没有配置 group_id,则会使用默认值 logstash,这可能会导致多个 Logstash 实例都使用相同的 group_id,从而造成消费冲突和数据重复。
重启服务重新加载配置systemctl restart logstash
或./bin/logstash -f logstash.conf
五.测试日志消息
5.1)运行kafka消费者命令行指定我们上面创建的用于日志收集的topicnew_filebeat
来监测消息
./kafka-console-consumer.sh --bootstrap-server 192.168.2.112:9092 --topic new_filebeat
5.2)输入测试数据到filebeat收集的日志路径
这里是指定收集/opt/nginx.log的日志
echo '8.8.8.11 - - [22/Mar/2023:14:15:12 +0800] "GET / HTTP/1.1" 404 0 "-" Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 - 0.001 192.168.1.132:8005 900 0.000'>>/opt/nginx.log
5.3)观察消费端发现有消息写到kafka的topic中了:
5.4)观察kibana也能看到日志了:
遇到的问题
Q1.zookeeper集群启动失败
ERROR Unable to load database on disk (org.apache.zookeeper.server.quorum.QuorumPeer)
java.io.IOException: No snapshot found, but there are log entries. Something is broken!
at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:281)
at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:285)
at org.apache.zookeeper.server.quorum.QuorumPeer.loadDataBase(QuorumPeer.java:1094)
at org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:1079)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:227)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:136)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
ERROR Unexpected exception, exiting abnormally (org.apache.zookeeper.server.quorum.QuorumPeerMain)
java.lang.RuntimeException: Unable to run quorum server
at org.apache.zookeeper.server.quorum.QuorumPeer.loadDataBase(QuorumPeer.java:1149)
at org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:1079)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:227)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:136)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
原因:
之前搭建zk的时候是单点去初始化启动的,产生了数据文件,后加入了集群配置再次启动,zk会根据zk的log目录下的version-2下的数据和日志信息去启动,已经不匹配了,所以报没有可用的快照文件从磁盘加载。
解决:
删除zk数据目录中verrsion-2下的内容,删除zk的日志目录下的version-2下的内容,再次重新启动。
或者 重新修改数据目录和日志目录重新初始化。
Q2. kafka集群启动失败
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID kVSgfurUQFGGpHMTBqBPiw doesn't match stored clusterId Some(0Qftv9yBTAmf2iDPSlIk7g) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:84)
at kafka.Kafka.main(Kafka.scala)
原因:
集群的id是第一次启动的时候就会自动产生,如果改过配置或者修改数据目录再次启动可能重新触发新的id产生,这会造成冲突
解决:
删除meta.proeteis (在kafka2.7.0版本中这个文件在kafka的log目录下)或者 修改meta.proeteis的id为报错中提示的id即可。 重启服务,jps查看进程是否有kafka