Flume (Part 3): Flume Examples


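  • 1. Example 1: Monitoring port data
  • 2. Example 2: Tailing a local file into HDFS in real time
  • 3. Example 3: Reading files from a directory into HDFS in real time
  • 4. Example 4: Fan-out
  • 5. Example 5: Fan-in
  • 6. Example 6: Flume interceptors
  • 6.1 Timestamp interceptor
  • 6.2 Host interceptor
  • 6.3 UUID interceptor
  • 6.4 Search-and-replace interceptor
  • 6.5 Regex filtering interceptor
  • 6.6 Regex extractor interceptor
  • 6.7 Custom interceptor
  • 7. Example 7: Flume to Kafka
  • 8. Example 8: Kafka to Flume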



1. Example 1: Monitoring port data



  1. Create and edit the job configuration file flume-telnet.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = topnpl200
a1.sources.r1.port = 44445

# Define the sink
a1.sinks.k1.type = logger

# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Start the agent:
/usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf/ --name a1 --conf-file /usr/local/flume/jobconf/flume-telnet.conf -Dflume.root.logger=INFO,console
  3. Test:
telnet topnpl200 44445

Type hello

# Check whether port 44445 is already in use
netstat -tunlp | grep 44445
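
The two binding lines at the end of flume-telnet.conf are easy to get wrong, and a mistake there usually shows up only when the agent starts. As a rough sketch, the file can be loaded with java.util.Properties and the wiring checked beforehand. FlumeWiringCheck and its problems method are made-up names for this illustration and are not part of Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, not part of Flume: checks that every source and sink
// of an agent is bound to a channel that the agent actually declares.
final class FlumeWiringCheck {

    static List<String> problems(String agent, String configText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> channels = new HashSet<>(names(p, agent + ".channels"));
        List<String> problems = new ArrayList<>();
        // A source may feed several channels (whitespace-separated list).
        for (String source : names(p, agent + ".sources")) {
            List<String> bound = names(p, agent + ".sources." + source + ".channels");
            if (bound.isEmpty()) {
                problems.add("source " + source + " has no channels");
            }
            for (String c : bound) {
                if (!channels.contains(c)) {
                    problems.add("source " + source + " uses undeclared channel " + c);
                }
            }
        }
        // A sink reads from exactly one channel.
        for (String sink : names(p, agent + ".sinks")) {
            String c = p.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (!channels.contains(c)) {
                problems.add("sink " + sink + " uses undeclared channel '" + c + "'");
            }
        }
        return problems;
    }

    // Splits a whitespace-separated property value into names.
    private static List<String> names(Properties p, String key) {
        String value = p.getProperty(key, "").trim();
        if (value.isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(value.split("\\s+"));
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
            "a1.sources = r1",
            "a1.sinks = k1",
            "a1.channels = c1",
            "a1.sources.r1.channels = c1",
            "a1.sinks.k1.channel = c1");
        System.out.println(problems("a1", conf)); // prints []
    }
}
```

The same check applies to the multi-channel configurations later in this post, since a source's channels value is treated as a list.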

2. Example 2: Tailing a local file into HDFS in real time



  1. Create and edit the job configuration file flume-hdfs.conf
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# 2 source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/top_resources/flume_test
a2.sources.r2.shell = /bin/bash -c

# 3 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://topnpl200:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 10
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
  2. Make sure HDFS is running
  3. Start the agent
cd /usr/local/flume

# Start with relative paths
bin/flume-ng agent \
--conf conf/ \
--name a2 \
--conf-file jobconf/flume-hdfs.conf

# Start with absolute paths (use this one; the relative-path form currently has problems)
/usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf/ --name a2 --conf-file /usr/local/flume/jobconf/flume-hdfs.conf
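  4. Test
    Append data to /opt/top_resources/flume_test with echo aaa >> flume_test (to clear the file, use echo > flume_test). A file such as /flume/20190831/23/logs-.1567266940527.tmp then appears in HDFS. Once a roll condition is reached or the agent is stopped, the file is closed and the .tmp suffix disappears.
  5. Problem
    sink's batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100
    Cause: the sink takes up to batchSize events from the channel in one transaction, so batchSize must not exceed the channel's transactionCapacity.
    Fix: change a2.sinks.k2.hdfs.batchSize = 1000 to a2.sinks.k2.hdfs.batchSize = 10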
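
To see how hdfs.path = .../flume/%Y%m%d/%H turns into the directory /flume/20190831/23 from the test, here is a small sketch that maps a timestamp to that directory, rounding down to the hour as round = true, roundValue = 1, roundUnit = hour ask for. It rests on two assumptions that the post does not state: the number in the .tmp file name is treated as an approximate wall-clock time in milliseconds, and the machine's time zone is taken to be Asia/Shanghai. HdfsPathDemo is a made-up name, and the code only imitates the escape sequences with java.time.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Illustration only: imitates %Y%m%d/%H with hourly rounding using java.time.
final class HdfsPathDemo {

    static String expand(long epochMillis, String zoneId) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of(zoneId))
                // round = true, roundValue = 1, roundUnit = hour
                .truncatedTo(ChronoUnit.HOURS);
        // %Y%m%d/%H corresponds to yyyyMMdd/HH
        return "/flume/" + t.format(DateTimeFormatter.ofPattern("yyyyMMdd/HH"));
    }

    public static void main(String[] args) {
        // Assumed inputs: timestamp taken from the .tmp file name, zone UTC+8.
        System.out.println(expand(1567266940527L, "Asia/Shanghai")); // prints /flume/20190831/23
    }
}
```

With useLocalTimeStamp = true the sink uses the agent's clock for this timestamp; without it, every event must carry a timestamp header.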

3. Example 3: Reading files from a directory into HDFS in real time



  1. Create and edit the job configuration file flume-dir.conf
#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

#2 source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/top_resources/flume_dir
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://topnpl200:9000/flume/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
  2. Make sure HDFS is running
  3. Start the agent
cd /usr/local/flume
bin/flume-ng agent --conf /usr/local/flume/conf --name a3 --conf-file jobconf/flume-dir.conf
# or
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume-dir.conf
  4. Test
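
When files are dropped into /opt/top_resources/flume_dir, the fileSuffix and ignorePattern settings decide which ones the source picks up. The sketch below approximates that selection. It assumes the pattern is applied to the whole file name (the behavior of Matcher.matches()), that hidden files are skipped, and that already processed files are recognized by the .COMPLETED suffix. SpoolDirFilterDemo is a made-up name.

```java
import java.util.regex.Pattern;

// Approximation of which files in the spooling directory are candidates.
final class SpoolDirFilterDemo {
    // Values taken from flume-dir.conf.
    private static final Pattern IGNORE = Pattern.compile("([^ ]*\\.tmp)");
    private static final String COMPLETED_SUFFIX = ".COMPLETED";

    static boolean isCandidate(String fileName) {
        return !fileName.endsWith(COMPLETED_SUFFIX)    // already processed
            && !fileName.startsWith(".")               // hidden file
            && !IGNORE.matcher(fileName).matches();    // matches ignorePattern
    }

    public static void main(String[] args) {
        String[] names = {"access.log", "access.log.tmp", "access.log.COMPLETED", "my file.tmp"};
        for (String name : names) {
            System.out.println(name + " -> " + isCandidate(name));
        }
    }
}
```

Note the last sample: because [^ ]* cannot match a space, a .tmp file whose name contains a space is not covered by this ignorePattern.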

4. Example 4: Fan-out


  1. On bigdata111, create flume1.conf. It monitors a file for changes and uses two channels and two sinks to forward the events to flume2 and flume3.
# 1.agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to every channel
a1.sources.r1.selector.type = replicating

# 2.source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test
a1.sources.r1.shell = /bin/bash -c

# 3.sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata112
a1.sinks.k1.port = 4141

# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata113
a1.sinks.k2.port = 4141

# 4. channel 1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4. channel 2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
  2. On bigdata112, create flume2.conf. It receives the events from flume1 and uses one channel and one sink to write the data to HDFS.
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 4141

# 3 sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
a2.sinks.k1.hdfs.filePrefix = flume2-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 600
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#5 Bind 
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  3. On bigdata113, create flume3.conf. It receives the events from flume1 and uses one channel and one sink to write the data to a local directory.
#1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141

#3 sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/flume3

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
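  4. Start each agent on its own host (starting the receiving agents a2 and a3 before a1 avoids initial connection failures in a1's Avro sinks)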
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf
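
The key line in flume1.conf is selector.type = replicating: every event from the source is put into both c1 and c2, and each sink then drains its own channel independently. The toy model below shows only that idea with plain queues. ReplicatingFanOutDemo is a made-up name, and nothing here is Flume's own implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of a replicating channel selector: one put, N copies.
final class ReplicatingFanOutDemo {
    private final List<Queue<String>> channels = new ArrayList<>();

    ReplicatingFanOutDemo(int channelCount) {
        for (int i = 0; i < channelCount; i++) {
            channels.add(new ArrayDeque<>());
        }
    }

    // The source hands over one event; every channel receives it.
    void put(String event) {
        for (Queue<String> channel : channels) {
            channel.add(event);
        }
    }

    Queue<String> channel(int index) {
        return channels.get(index);
    }

    public static void main(String[] args) {
        ReplicatingFanOutDemo selector = new ReplicatingFanOutDemo(2);
        selector.put("line-1");
        // The sink on channel 0 takes its copy; channel 1 still holds its own.
        selector.channel(0).poll();
        System.out.println(selector.channel(0).size() + " " + selector.channel(1).size()); // prints 0 1
    }
}
```

Because each channel keeps its own copy, a slow or stopped downstream agent delays only its own branch, until that branch's channel fills up.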

5. Example 5: Fan-in


  1. On bigdata111, create flume11.conf. It monitors a file (/opt/test in this configuration) and sinks the data to flume33.
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test
a1.sources.r1.shell = /bin/bash -c
# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata113
a1.sinks.k1.port = 4141
# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 5 Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. On bigdata112, create flume22.conf. It monitors the data stream on port 44444 and sinks the data to flume33.
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 44444

#3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata113
a2.sinks.k1.port = 4141

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  3. On bigdata113, create flume33.conf. It receives the data streams sent by flume11 and flume22, merges them, and sinks the result to HDFS.
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141

# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
a3.sinks.k1.hdfs.filePrefix = flume3-
a3.sinks.k1.hdfs.round = true
a3.sinks.k1.hdfs.roundValue = 1
a3.sinks.k1.hdfs.roundUnit = hour
a3.sinks.k1.hdfs.useLocalTimeStamp = true
a3.sinks.k1.hdfs.batchSize = 100
a3.sinks.k1.hdfs.fileType = DataStream
a3.sinks.k1.hdfs.rollInterval = 600
a3.sinks.k1.hdfs.rollSize = 134217700
a3.sinks.k1.hdfs.rollCount = 0
a3.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
  4. Start the agents
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf

6. Example 6: Flume interceptors

6.1 Timestamp interceptor


  1. Create and edit the job configuration file flume-interceptors-timestamp.conf
# 1. Define the agent's source, channel, and sink names
a4.sources = r1
a4.channels = c1
a4.sinks = k1

a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /opt/top_resources/flume_test

a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://topnpl200:9000/flume-interceptors/%H
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
a4.sinks.k1.hdfs.rollCount = 0
a4.sinks.k1.hdfs.rollSize = 134217728
a4.sinks.k1.hdfs.rollInterval = 60

a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
  2. Start the agent
bin/flume-ng agent -c conf/ -n a4 -f jobconf/flume-interceptors-timestamp.conf -Dflume.root.logger=INFO,console

6.2 Host interceptor


  1. Create and edit the job configuration file flume-interceptors-host.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://topnpl200:9000/flume-interceptors/%{agentHost}
a1.sinks.k1.hdfs.filePrefix = during_%{agentHost}
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Start the agent
bin/flume-ng agent -c conf/ -n a1 -f jobconf/flume-interceptors-host.conf -Dflume.root.logger=INFO,console
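
The host interceptor stores the agent's host name in the agentHost header, and the sink then substitutes %{agentHost} in hdfs.path and hdfs.filePrefix. The sketch below imitates only that substitution step with a regular expression. HeaderEscapeDemo is a made-up name, and replacing a missing header with an empty string is my assumption about the sink's behavior rather than something the post shows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Imitates %{header} substitution in sink path settings.
final class HeaderEscapeDemo {
    private static final Pattern HEADER_ESCAPE = Pattern.compile("%\\{([^}]+)\\}");

    static String expand(String template, Map<String, String> headers) {
        Matcher m = HEADER_ESCAPE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Assumption: a header that is not set expands to an empty string.
            String value = headers.containsKey(m.group(1)) ? headers.get(m.group(1)) : "";
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("agentHost", "topnpl200"); // what the host interceptor would add
        System.out.println(expand("hdfs://topnpl200:9000/flume-interceptors/%{agentHost}", headers));
        System.out.println(expand("during_%{agentHost}", headers));
    }
}
```

With useIP = false the header holds a host name instead of an IP address, so events from different agents land in per-host directories.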

6.3 UUID interceptor


  1. Create and edit the job configuration file flume-interceptors-uuid.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Start the agent
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-uuid.conf -n a1 -Dflume.root.logger=INFO,console
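
What prefix = UUID_ and preserveExisting = true amount to can be sketched in a few lines: a prefixed random UUID is written to a header unless that header is already set. The header name id is an assumption here (the configuration does not set headerName, and I take id to be the default). UuidHeaderDemo is a made-up name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch of the UUID interceptor's effect on an event's headers.
final class UuidHeaderDemo {

    static void addId(Map<String, String> headers, String headerName,
                      String prefix, boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(headerName)) {
            return; // keep the id assigned earlier in the flow
        }
        headers.put(headerName, prefix + UUID.randomUUID());
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        addId(headers, "id", "UUID_", true); // "id" is an assumed header name
        System.out.println(headers);
        addId(headers, "id", "UUID_", true); // second call leaves the header unchanged
        System.out.println(headers);
    }
}
```

Keeping an existing id matters in multi-hop flows, where the same event should carry one identifier from the first agent to the last.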

6.4 Search-and-replace interceptor

  1. Create and edit the job configuration file flume-interceptors-search-replace.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace

a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = ***
a1.sources.r1.interceptors.i1.charset = UTF-8

#3 sink
a1.sinks.k1.type = logger

#4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Start the agent
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-search-replace.conf -n a1 -Dflume.root.logger=INFO,console
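
Flume's patterns are Java regular expressions, so the effect of searchPattern = [0-9]+ with replaceString = *** can be tried out with plain java.util.regex before starting the agent. This is only a sketch of the replacement on the decoded event body; SearchReplaceDemo and the sample lines are made up.

```java
import java.util.regex.Pattern;

// Tries out the search/replace settings from the configuration on sample text.
final class SearchReplaceDemo {
    private static final Pattern SEARCH = Pattern.compile("[0-9]+");
    private static final String REPLACE = "***";

    static String mask(String body) {
        // Every run of digits becomes three asterisks, whatever its length.
        return SEARCH.matcher(body).replaceAll(REPLACE);
    }

    public static void main(String[] args) {
        System.out.println(mask("order 123 cost 45")); // prints order *** cost ***
    }
}
```

Because [0-9]+ is greedy, a run of digits of any length collapses into a single ***, so the length of the original number is not leaked.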

6.5 Regex filtering interceptor

  1. Create and edit the job configuration file flume-interceptors-filter.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
a1.sources.r1.interceptors.i1.excludeEvents = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Start the agent
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-filter.conf -n a1 -Dflume.root.logger=INFO,console
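
With regex = ^A.* and excludeEvents = true, lines that start with a capital A are dropped and everything else passes. The sketch below mimics that decision, assuming the interceptor searches the body with Matcher.find() (my reading of its behavior). RegexFilterDemo and the sample lines are made up.

```java
import java.util.regex.Pattern;

// Mimics the keep/drop decision of the regex filter configuration.
final class RegexFilterDemo {
    private static final Pattern REGEX = Pattern.compile("^A.*");
    private static final boolean EXCLUDE_EVENTS = true;

    // Returns true when the event would be passed on to the channel.
    static boolean passes(String body) {
        boolean matched = REGEX.matcher(body).find();
        return EXCLUDE_EVENTS ? !matched : matched;
    }

    public static void main(String[] args) {
        for (String line : new String[] {"Apple", "apple", "banana", "an Apple"}) {
            System.out.println(line + " -> " + (passes(line) ? "kept" : "dropped"));
        }
    }
}
```

The match is case-sensitive and anchored at the start of the body, so apple and an Apple are both kept.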

6.6 Regex extractor interceptor

  1. Create and edit the job configuration file flume-interceptors-extractor.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Start the agent
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-extractor.conf -n a1 -Dflume.root.logger=INFO,console
  3. Test
    Append the line hostname is topnpl200 ip is to flume_test.txt; the logger sink prints:
2019-09-01 19:06:42,417 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{ip=, cookieid=topnpl200} body: 68 6F 73 74 6E 61 6D 65 20 69 73 20 74 6F 70 6E hostname is topn }
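
The header values in that log line can be reproduced with plain java.util.regex: capture group 1 goes to the first serializer's header (cookieid) and group 2 to the second (ip). The sketch assumes the interceptor uses Matcher.find() and maps groups to serializers in order. RegexExtractorDemo is a made-up name, and 192.168.1.200 is an invented sample address.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Reproduces the group-to-header mapping of the regex extractor configuration.
final class RegexExtractorDemo {
    private static final Pattern REGEX = Pattern.compile("hostname is (.*?) ip is (.*)");
    // Header names from serializers s1 and s2, in order.
    private static final String[] HEADER_NAMES = {"cookieid", "ip"};

    static Map<String, String> extract(String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = REGEX.matcher(body);
        if (m.find()) {
            for (int i = 0; i < HEADER_NAMES.length && i < m.groupCount(); i++) {
                headers.put(HEADER_NAMES[i], m.group(i + 1));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        // Sample address below is invented for the demo.
        System.out.println(extract("hostname is topnpl200 ip is 192.168.1.200"));
        // Trailing space after "ip is": matches, ip is empty (as in the log above).
        System.out.println(extract("hostname is topnpl200 ip is "));
        // No trailing space: the regex does not match, so no headers are added.
        System.out.println(extract("hostname is topnpl200 ip is"));
    }
}
```

The regex contains a space between "ip is" and the second group, so a line needs at least that trailing space to match. An empty ip header, as in the log, suggests the test line ended with one.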


6.7 Custom interceptor

  1. Create and edit the job configuration file flume-interceptors-custom.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.during.flume.MyInterceptor$Builder
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume-interceptors/custom-interceptors
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type to generate; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
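  2. Java program
package com.during.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
	@Override
	public void initialize() {
	    // No setup required
	}

	@Override
	public void close() {
	    // No resources to release
	}

	/**
	 * Intercepts an event sent from the source to the channel.
	 * @param event  the event to process
	 * @return event  the event after business processing
	 */
	@Override
	public Event intercept(Event event) {
	    // Get the byte payload of the event
	    byte[] arr = event.getBody();
	    // Convert the payload to upper case (explicit charset avoids platform differences)
	    event.setBody(new String(arr, StandardCharsets.UTF_8).toUpperCase().getBytes(StandardCharsets.UTF_8));
	    // Hand the event back to the flow
	    return event;
	}

	// Process a batch of events
	@Override
	public List<Event> intercept(List<Event> events) {
	    List<Event> list = new ArrayList<Event>();
	    for (Event event : events) {
	        list.add(intercept(event));
	    }
	    return list;
	}

	public static class Builder implements Interceptor.Builder {
	    // Create the interceptor instance
	    @Override
	    public Interceptor build() {
	        return new MyInterceptor();
	    }

	    // Read properties from the configuration file (none are needed here)
	    @Override
	    public void configure(Context context) {
	    }
	}
}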
  3. Start the agent
bin/flume-ng agent -c conf/ -n a1 -f jobconf/flume-interceptors-custom.conf -C ./during-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console

7. Example 7: Flume to Kafka

  1. Create and edit the job configuration file flume2kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c

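# sink
# Note: brokerList, topic, batchSize and requiredAcks are the older property names; since Flume 1.7 they are deprecated in favor of kafka.bootstrap.servers, kafka.topic, flumeBatchSize and kafka.producer.acks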
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Start the agent
/opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf

8. Example 8: Kafka to Flume

  1. Create and edit the job configuration file kafka2flume.conf
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink

# The channel can be defined as follows.
agent.sources.kafkaSource.channels = memoryChannel


# the sink of hdfs
agent.sinks.hdfsSink.channel = memoryChannel
  2. Start the agent
bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console

