当前位置: 首页>后端>正文

ELFK+Kafka集群搭建

背景

为了缓解ES压力,在ELFK架构上引入Kafka作为数据缓冲,提升日志收集效率。

规划

  • 系统版本为Ubuntu18.04
  • 本次kafka的版本为kafka_2.12-3.4.0.tgz,节点为3个节点
192.168.2.112  192.168.2.113   192.168.2.114
  • zookeeper使用kafka内置的

整体思路

  • 安装kafka (三台机器)
  • 把kafka内置的zookeeper配置为集群,再配置kafka
  • 简单测试kafka集群生产/消费,并把后面elfk要用的topic创建好
  • 将filebeat配置output配置修改为输出至kafka
  • 将logstash配置input配置修改为从kafka消费数据
  • 日志收集测试

一.Kafka集群搭建

以下步骤均三台机器分别执行,注意文件中程序路径

1.1) 安装kafka

解压目录并重命名(三台机器上目录分别为/opt/kafka-01,/opt/kafka-02,/opt/kafka-03)

tar zxvf kafka_2.12-3.4.0.tgz -C /opt/
cd /opt/ && mv kafka_2.12-3.4.0 kafka-01
#后面两台一样操作,注意目录名字区分

1.2) 将 kafka 内置的 zookeeper 配置为系统服务

cat >/lib/systemd/system/zk.service <<EOF
[Unit]
Description=Zookeeper in Kafka package
After=network.target

[Service]
Type=simple
WorkingDirectory=/opt/kafka-01

Environment=JAVA_HOME=/opt/jdk1.8.0_144

Restart=on-failure
LimitNOFILE=100000

ExecStart=/opt/kafka-01/bin/zookeeper-server-start.sh /opt/kafka-01/config/zookeeper.properties

[Install]
WantedBy=multi-user.target
EOF
#开机启动
systemctl daemon-reload
systemctl enable zk

1.3) 将 kafka 配置为系统服务

cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
Requires=zk.service
After=network.target
After=zk.service


[Service]
Type=simple
WorkingDirectory=/opt/kafka-03

Environment=JAVA_HOME=/opt/jdk1.8.0_144

Restart=on-failure
LimitNOFILE=100000

ExecStart=/opt/kafka-03/bin/kafka-server-start.sh /opt/kafka-03/config/server.properties

[Install]
WantedBy=multi-user.target
EOF
#开机启动
systemctl daemon-reload
systemctl enable kafka

1.4)修改zookeeper配置文件

配置数据目录,日志目录,地址配为集群,修改后配置如下:
vim /opt/kafka-01/config/zookeeper.properties

clientPort=2181
admin.enableServer=false
dataDir=/opt/kafka-01/zk_data_new
dataLogDir= /opt/kafka-01/zk_log
tickTime=2000
initLimit=10
syncLimit=5
 
server.1=192.168.2.112:2888:3888
server.2=192.168.2.113:2888:3888
server.3=192.168.2.114:2888:3888

1.5) 创建zookeeper所需目录以及创建myid

上面的配置中server.1 代表192.168.2.112这个节点在zk集群中id为1,需要一个myid文件来存放这个id

mkdir /opt/kafka-01/zk_data_new zk_log
echo "1" > /opt/kafka-01/zk_data_new/myid
#后面两台以此类推,分别id为2,3

以上执行完,zookeeper集群就配置完了,下面配置kafka集群

1.6) 修改kafka配置文件

vim /opt/kafka-01/config# cat server.properties (以下的ip地址可配成内网域名解析)修改后为:

broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.2.112:9092
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=125829120
log.dirs=/opt/kafka-01/kafka-logs 
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=209715200
log.retention.check.interval.ms=300000
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.2.112:2181,192.168.2.113:2181,192.168.2.114:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
delete.topic.enable=true
queued.max.requests = 1000
auto.create.topics.enable=false
offsets.retention.minutes=43200
log.cleaner.enable=true
message.max.bytes=104857600
log.roll.hours=2
  • 参数解读:
=====================================================================================
broker.id=1  #这里1是kafka-01的id,每台机器不同,唯一的
listeners  #监听kafka地址:
advertised.listeners  #本机对外声明的地址:
zookeeper.connect  #配置zk集群地址
log.dirs  #kafka日志目录,可逗号配置多个
message.max.bytes   #允许的最大消息大小,要小于 socket.request.max.bytes,并且最好和消息生产者最大能产生的消息一致
=====================================================================================
num.network.threads 和 num.io.threads
分别是网络和 IO 的线程数量
=====================================================================================
socket.send.buffer.bytes 和 socket.receive.buffer.bytes
分别是发送缓冲区大小和接收缓冲区大小
=====================================================================================
socket.request.max.bytes
服务器允许请求的最大值,用来防止OOM(应该要大于 message.max.bytes)(为了防止OOM,这个值应该小于JVM 的堆内存)
=====================================================================================
num.io.threads
num.partitions
创建 topic 时如果没有指定 partition 数量则使用这个值。
=====================================================================================
num.recovery.threads.per.data.dir
在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,一般磁盘可以不增加,默认为1
=====================================================================================
offsets.topic.replication.factor、transaction.state.log.replication.factor 和 transaction.state.log.min.isr
在生产环境下并且有多个 kafka 节点时建议配置为3,这些是内部 topic 使用副本数量等信息。
=====================================================================================
log.retention.hours
日志(消息)保留时间,单位是小时
=====================================================================================
log.retention.bytes
日志(消息)最大保留多少空间,单位是字节。与 log.retention.hours 同时生效,独立控制。
这里要注意,这个值原来是限制 partition 使用的大小而不是限制 broker 使用的大小
=====================================================================================
log.segment.bytes
日志(消息)存储时多少内容拆分一个 segment 文件,单位是字节。topic 创建时可以单独指定这个值
=====================================================================================
log.retention.check.interval.ms
多长时间去检查一次存储是否符合 log.retention.hours 和 log.retention.bytes,单位是毫秒
=====================================================================================
zookeeper.connect 和 zookeeper.connection.timeout.ms
连接 zookeeper 的地址和超时时间
=====================================================================================
group.initial.rebalance.delay.ms
有消费者加入到这个组后多久进行重新平衡,生产建议 3 秒,调试时可以配置为 0
=====================================================================================
delete.topic.enable
是否允许删除 topic
=====================================================================================
queued.max.requests
I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。应该是一种自我保护机制
=====================================================================================
auto.create.topics.enable
是否允许自动创建 topics
=====================================================================================
offsets.retention.minutes
内部 topic 中 offset 的过期时间,一般建议比 log.retention.hours 长,否则 offset 先过期的话数据可能被重新发送
=====================================================================================
log.cleaner.enable
是否启用日志压缩
=====================================================================================
log.roll.hours=2
和 log.segment.bytes 共同决定多久(多大)更换一个 segment 存储文件
=====================================================================================

1.7) 启动zk和kafka

systemctl start zk
systemctl start kafka

启动后可以看看是否正常启动,jps可查看到

24017 Logstash
7159 QuorumPeerMain
16698 Kafka
31771 Jps

二. Kafka创建Topic

2.1) 创建一个topic

(replication-factor-副本数,partitions-分区数,组成集群后server指定任意一个节点创建即可,后面节点会自动同步到其他节点)

bin/kafka-topics.sh --create --bootstrap-server 192.168.2.112:9092 --replication-factor 1 --partitions 1 --topic new_filebeat

2.2) 简单测试生产/消费(命令行)

--bootstrap-server 可随意指定集群任意节点,如果不同节点查不到同步的topic或者拿不到消息也说明集群没有搭好。
查看topic

./bin/kafka-topics.sh --bootstrap-server 192.168.2.112:9092 --topic new_filebeat --describe

kafka生产者

bin/kafka-console-producer.sh --broker-list  192.168.2.114:9092 --topic new_filebeat

kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.112:9092  --topic new_filebeat

正常结果如下:

ELFK+Kafka集群搭建,第1张

ELFK+Kafka集群搭建,第2张

三.调整Filebeat配置

将filbeat的输出由之前的output到logstash改成到kafka

output.kafka:
  hosts: ['192.168.2.112:9092','192.168.2.113:9092','192.168.2.114:9092']
  topic: 'new_filebeat'
  enabled: true
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 104857600

参数解读:

reachable_only: true: 设置是否仅将消息发送到可到达的broker服务端,避免消息发送失败。
required_acks: 1: 设置等待确认的应答数,1表示只需等待写入到leader分区后,就可以发送ACK确认消息。
compression: gzip: 设置消息传输时的压缩算法,gzip表示使用gzip算法进行压缩。
max_message_bytes: 104857600: 设置每个消息的最大字节数限制,104857600表示最大是100MB。
这些参数可以根据实际情况进行调整,以避免消息传输失败,提高效率。

重启服务重新加载配置systemctl restart filebeat

四.调整Logstash配置

将logstash的输入由之前的iutput从filebeat拿数据改成从kafka拿

input {
   kafka {
      bootstrap_servers => ["192.168.2.112:9092,192.168.2.113:9092,192.168.2.114:9092"]
      topics_pattern => "new_filebeat"
      group_id => "logs"
      codec => "json"
      consumer_threads => 70
   }
}

参数解读:

topics_pattern: 之前kafka中创建的topic名
codec: 编码格式
consumer_threads: 指定用于处理Kafka消息的线程数。默认为1,当应用程序需要同时处理大量Kafka消息时,配置大于1的数,可提高处理能力。
group_id : 用于定义消费者的组 ID。消费者组是一组 Kafka 消费者,它们共享同一个组 ID,并从一个或多个 Kafka 主题消费消息。当多个消费者共享相同的组 ID 时,
Kafka 会确保它们不会同时消费同一个分区中的消息,以保证分区内数据的负载均衡和整体消费速度的增加。在配置 Kafka 输入插件时,group_id 参数是必须配置的。
如果没有配置 group_id,则会使用默认值 logstash,这可能会导致多个 Logstash 实例都使用相同的 group_id,从而造成消费冲突和数据重复。

重启服务重新加载配置systemctl restart logstash./bin/logstash -f logstash.conf

五.测试日志消息

5.1)运行kafka消费者命令行指定我们上面创建的用于日志收集的topicnew_filebeat来监测消息

./kafka-console-consumer.sh --bootstrap-server 192.168.2.112:9092 --topic new_filebeat

5.2)输入测试数据到filebeat收集的日志路径

这里是指定收集/opt/nginx.log的日志

echo '8.8.8.11 - - [22/Mar/2023:14:15:12 +0800] "GET / HTTP/1.1" 404 0 "-" Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 - 0.001  192.168.1.132:8005 900 0.000'>>/opt/nginx.log

5.3)观察消费端发现有消息写到kafka的topic中了:

ELFK+Kafka集群搭建,第3张

5.4)观察kibana也能看到日志了:

ELFK+Kafka集群搭建,第4张

遇到的问题

Q1.zookeeper集群启动失败

ERROR Unable to load database on disk (org.apache.zookeeper.server.quorum.QuorumPeer)
 java.io.IOException: No snapshot found, but there are log entries. Something is broken!
  at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:281)
  at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:285)
  at org.apache.zookeeper.server.quorum.QuorumPeer.loadDataBase(QuorumPeer.java:1094)
  at org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:1079)
  at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:227)
  at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:136)
  at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
ERROR Unexpected exception, exiting abnormally (org.apache.zookeeper.server.quorum.QuorumPeerMain)
  java.lang.RuntimeException: Unable to run quorum server
  at org.apache.zookeeper.server.quorum.QuorumPeer.loadDataBase(QuorumPeer.java:1149)
  at org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:1079)
  at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:227)
  at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:136)
  at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)

原因:

之前搭建zk的时候是单点去初始化启动的,产生了数据文件,后加入了集群配置再次启动,zk会根据zk的log目录下的version-2下的数据和日志信息去启动,已经不匹配了,所以报没有可用的快照文件从磁盘加载。

解决:

删除zk数据目录中verrsion-2下的内容,删除zk的日志目录下的version-2下的内容,再次重新启动。
或者 重新修改数据目录和日志目录重新初始化。

Q2. kafka集群启动失败

ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID kVSgfurUQFGGpHMTBqBPiw doesn't match stored clusterId Some(0Qftv9yBTAmf2iDPSlIk7g) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
    at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:84)
    at kafka.Kafka.main(Kafka.scala)

原因:

集群的id是第一次启动的时候就会自动产生,如果改过配置或者修改数据目录再次启动可能重新触发新的id产生,这会造成冲突

解决:

删除meta.proeteis (在kafka2.7.0版本中这个文件在kafka的log目录下)或者 修改meta.proeteis的id为报错中提示的id即可。 重启服务,jps查看进程是否有kafka


https://www.xamrdz.com/backend/3b21940637.html

相关文章: