kafka整合Flume
前提是已经安装完成flume
-
1、添加启动flume时的配置文件
# node01执行以下命令开发flume的配置文件 cd /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/ ### 编辑配置文件 vi flume-kafka.conf
#为我们的source channel sink起名 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #指定我们的source数据收集策略 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /kkb/install/flumeData/files a1.sources.r1.inputCharset = utf-8 #指定我们的source收集到的数据发送到哪个管道 a1.sources.r1.channels = c1 #指定我们的channel为memory,即表示所有的数据都装进memory当中 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据 a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = kaikeba a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1
-
2、node01执行以下命令创建Topic
cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-topics.sh --create --topic kaikeba --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
-
3、启动Flume
node01执行以下命令启动flume
bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka.conf -Dflume.root.logger=info,console
-
4、启动kafka控制台消费者,验证数据写入成功
node01执行以下命令消费kafka当中的数据
cd /kkb/install/kafka_2.11-1.1.0/ bin/kafka-console-consumer.sh --topic kaikeba --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning