文章目录
- 1. 案例一:监控端口数据
- 2. 案例二:监测(实时读取)本地文件到HDFS
- 3. 案例三:实时读取目录文件到HDFS
- 4. 案例四:扇出
- 5. 案例五:扇入
- 6. 案例六:Flume拦截器
- 6.1 时间拦截器
- 6.2 主机名拦截器
- 6.3 UUID拦截器
- 6.4 查询替换拦截器
- 6.5 正则过滤拦截器
- 6.6 正则抽取拦截器
- 6.7 自定义拦截器
- 7. 案例七:Flume对接Kafka
- 8. 案例八:Kafka对接Flume
所有任务的配置文件都放在/usr/local/flume/jobconf下
启动命令:
bin/flume-ng agent --conf conf/ --name a3 --conf-file myconf/flume33.conf
bin/flume-ng agent -c conf -n a3 -f myconf/flume33.conf
Source、Channel、Sink类型总结:
1. 案例一:监控端口数据
Flume监控一端Console,另一端Console发送消息,使被监控端实时显示。
监控端口source:type–>netcat
日志打印sink:type–>logger
- 创建并修改任务的配置文件 flume-telnet.conf
#定义Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义netcatsource
a1.sources.r1.type = netcat
a1.sources.r1.bind = topnpl200
a1.sources.r1.port = 44445
# 定义sink
a1.sinks.k1.type = logger
# 定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 双向链接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动:
/usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf/ --name a1 --conf-file /usr/local/flume/jobconf/flume-telnet.conf -Dflume.root.logger==INFO,console
- 测试:
telnet topnpl200 44445
输入 hello
日志中能监测到
# 判断44445端口是否被占用
netstat -tunlp | grep 44445
2. 案例二:监测(实时读取)本地文件到HDFS
监控文件(通过Shell命令)
source:type–>exec
sink:type–>hdfs
- 创建并修改任务的配置文件 flume-hdfs.conf
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# 2 source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/top_resources/flume_test
a2.sources.r2.shell = /bin/bash -c
# 3 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://topnpl200:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 10
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件(单位:秒)
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小(单位:字节)128M
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小副本数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
- 确认开启了HDFS
start-dfs.sh
- 启动
cd /usr/local/flume
# 用相对路径启动
bin/flume-ng agent \
--conf conf/ \
--name a2 \
--conf-file jobconf/flume-hdfs.conf
# 绝对路径启动(用这个吧,相对路径启动目前有问题)
/usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf/ --name a2 --conf-file /usr/local/flume/jobconf/flume-hdfs.conf
- 测试
往/opt/top_resources/flume_test文件里写数据 echo aaa >> flume_test(如果要清空flume_test里的数据,echo > flume_test),发现HDFS中有了/flume/20190831/23/logs-.1567266940527.tmp,当达到往HDFS中刷新的限制或者关掉监测后,tmp消失。 - 问题
发现HDFS中并没有/flume/190831/21/log-时间戳.tmp,查看log,发现报错了:
sink’s batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100
sink的batchSize的容量要小于在channel内的transactionCapactiy,否则文件会溢出。
修改:a2.sinks.k2.hdfs.batchSize = 1000 -> a2.sinks.k2.hdfs.batchSize = 10
3. 案例三:实时读取目录文件到HDFS
监控目录
source:type–>spooldir
sink:type–>hdfs
- 创建并修改任务的配置文件 flume-dir.conf
#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
#2 source
#监控目录的类型
a3.sources.r3.type = spooldir
#监控目录的路径
a3.sources.r3.spoolDir = /opt/top_resources/flume_dir
#哪个文件上传hdfs,然后给这个文件添加一个后缀
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传(可选)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://topnpl200:9000/flume/%H
#上传到hdfs的文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小副本数
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
- 确认开启了hdfs
start-dfs.sh
- 启动
cd /usr/local/flume
bin/flume-ng agent --conf /conf --name a3 --conf-file jobconf/flume-dir.conf
# 或者
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume-dir.conf
- 测试
在/opt/top_resources/flume_dir下创建几个文件,发现HDFS中有了/flume/00/upload-.1567270075479.tmp。当达到往HDFS中刷新的限制或者关掉监测后,tmp消失。
4. 案例四:扇出
使用flume1监控文件变动,flume1将变动内容传递给flume-2,flume-2负责存储到HDFS。
同时flume1将变动内容传递给flume-3,flume-3负责输出到local。
以下在三个节点中配置,也可都放在bigdata111上。
- 在bigdata111上,创建flume1.conf,用于监控某文件的变动,同时产生两个channel和两个sink分别输送给flume2和flume3。
# 1.agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给多个channel
a1.sources.r1.selector.type = replicating
# 2.source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test
a1.sources.r1.shell = /bin/bash -c
# 3.sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata112
a1.sinks.k1.port = 4141
# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata113
a1.sinks.k2.port = 4141
# 4.channel—1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4.channel—2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
- 在bigdata112上,创建flume-2.conf,用于接收flume1的event,同时产生1个channel和1个sink,将数据输送给hdfs。
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 4141
# 3 sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
#最小副本数
a2.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- 在bigdata113上,创建flume-3.conf,用于接收flume1的event,同时产生1个channel和1个sink,将数据输送给本地目录。
#1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141
#3 sink
a3.sinks.k1.type = file_roll
#备注:此处的文件夹需要先创建好,如果该目录不存在,并不会创建新的目录。
a3.sinks.k1.sink.directory = /opt/flume3
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
- 启动
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf
5. 案例五:扇入
flume11监控文件hive.log,flume-22监控某一个端口的数据流,flume11与flume-22将数据发送给flume-33,flume33将最终数据写入到HDFS。
- 在bigdata111上,创建flume11.conf,用于监控hive.log文件,同时sink数据到flume-33。
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test
a1.sources.r1.shell = /bin/bash -c
# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata113
a1.sinks.k1.port = 4141
# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 5 Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 在bigdata112上,创建flume-22.conf,用于监控端口44444数据流,同时sink数据到flume-33。
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 44444
#3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata113
a2.sinks.k1.port = 4141
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- 在bigdata113上,创建flume33.conf,用于接收flume11与flume22发送过来的数据流,最终合并后sink到HDFS。
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141
# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
- 启动
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf
6. 案例六:Flume拦截器
6.1 时间拦截器
功能:将时间戳放到event的header(Map<key,value>)
- 创建并修改任务的配置文件 flume-interceptors-timestamp.conf
#1.定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#2.具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /opt/top_resources/flume_test
#定义拦截器,为文件最后添加时间戳
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
#具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://topnpl200:9000/flume-interceptors/%H
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60
#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
- 启动
bin/flume-ng agent -c conf/ -n a4 -f jobconf/flume-interceptors-timestamp.conf -Dflume.root.logger=INFO,console
6.2 主机名拦截器
功能:将主机名放到event的header(Map<key,value>)
- 创建并修改任务的配置文件 flume-interceptors-host.conf
#1.定义agent
a1.sources= r1
a1.sinks = k1
a1.channels = c1
#2.定义source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
#拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
#参数为true时用IP192.168.1.111,参数为false时用主机名,默认为true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost
#3.定义sinks
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://topnpl200:9000/flume-interceptors/%{agentHost}
a1.sinks.k1.hdfs.filePrefix = during_%{agentHost}
#往生成的文件加后缀名.log
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动
bin/flume-ng agent -c conf/ -n a1 -f jobconf/flume-interceptors-host.conf -Dflume.root.logger=INFO,console
6.3 UUID拦截器
功能:将生成的UUID放到event的header(Map<key,value>)
- 创建并修改任务的配置文件 flume-interceptors-uuid.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
#type的参数不能写成uuid,得写具体,否则找不到类
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
#如果UUID头已经存在,它应该保存
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_
#如果sink类型改为HDFS,那么在HDFS的文本中没有headers的信息数据
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-uuid.conf -n a1 -Dflume.root.logger==INFO,console
6.4 查询替换拦截器
- 创建并修改任务的配置文件 flume-interceptors-search-replace.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
#遇到数字改成*,A123会替换为A***
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = ***
a1.sources.r1.interceptors.i1.charset = UTF-8
#3 sink
a1.sinks.k1.type = logger
#4 Chanel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-search-replace.conf -n a1 -Dflume.root.logger=INFO,console
6.5 正则过滤拦截器
- 创建并修改任务的配置文件 flume-interceptors-filter.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
#如果excludeEvents设为true,则表示过滤掉以A开头的events(表示符合正则的)。
#如果excludeEvents设为false,表示过滤掉不是以A开头的events(表示不符合正则的)。
a1.sources.r1.interceptors.i1.excludeEvents = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-filter.conf -n a1 -Dflume.root.logger=INFO,console
6.6 正则抽取拦截器
- 创建并修改任务的配置文件 flume-interceptors-extractor.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-extractor.conf -n a1 -Dflume.root.logger=INFO,console
- 测试
往flume_test.txt输入 hostname is topnpl200 ip is 39.104.119.200
控制台监控到如下:
2019-09-01 19:06:42,417 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{ip=39.104.119.200, cookieid=topnpl200} body: 68 6F 73 74 6E 61 6D 65 20 69 73 20 74 6F 70 6E hostname is topn }
注意:正则抽取拦截器的headers不会出现在文件名和文件内容中。只会在header中有想要获取的内容。
6.7 自定义拦截器
- 创建并修改任务的配置文件 flume-interceptors-custom.conf
#1.agent
a1.sources = r1
a1.sinks =k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
#全类名$Builder
a1.sources.r1.interceptors.i1.type = com.during.flume.ConvertInterceptor$Builder
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume-interceptors/custom-interceptors
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Java程序
package com.during.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public void close() {
}
/**
* 拦截source发送到通道channel中的消息
*
* @param event 接收过滤的event
* @return event 根据业务处理后的event
*/
@Override
public Event intercept(Event event) {
// 获取事件对象中的字节数据
byte[] arr = event.getBody();
// 将获取的数据转换成大写
event.setBody(new String(arr).toUpperCase().getBytes());
// 返回到消息中
return event;
}
// 接收被过滤事件集合
@Override
public List<Event> intercept(List<Event> events) {
List<Event> list = new ArrayList<Event>();
for (Event event : events) {
list.add(intercept(event));
}
return list;
}
public static class Builder implements Interceptor.Builder {
// 获取配置文件的属性
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
- 启动
jar包放在/usr/local/flume
bin/flume-ng agent -c conf/ -n a1 -f jobconf/flume-interceptors-custom.conf -C ./during-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console
7. 案例七:Flume对接Kafka
- 创建并修改任务的配置文件 flume2kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动
/opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf
8. 案例八:Kafka对接Flume
- 创建并修改任务的配置文件 kafka2flume.conf
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
# The channel can be defined as follows.
agent.sources.kafkaSource.channels = memoryChannel
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect=bigdata111:2181,bigdata112:2181,bigdata113:2181
agent.sources.kafkaSource.topic=calllog
#agent.sources.kafkaSource.groupId=flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms=100
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000
# the sink of hdfs
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://bigdata111:9000/kafka2flume
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream
#这两个不配置,会产生大量的小文件
agent.sinks.hdfsSink.hdfs.rollSize=0
agent.sinks.hdfsSink.hdfs.rollCount=0
- 启动
bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console