当前位置: 首页>前端>正文

Flume(三)—— Flume案例


文章目录

  • 1. 案例一:监控端口数据
  • 2. 案例二:监测(实时读取)本地文件到HDFS
  • 3. 案例三:实时读取目录文件到HDFS
  • 4. 案例四:扇出
  • 5. 案例五:扇入
  • 6. 案例六:Flume拦截器
  • 6.1 时间拦截器
  • 6.2 主机名拦截器
  • 6.3 UUID拦截器
  • 6.4 查询替换拦截器
  • 6.5 正则过滤拦截器
  • 6.6 正则抽取拦截器
  • 6.7 自定义拦截器
  • 7. 案例七:Flume对接Kafka
  • 8. 案例八:Kafka对接Flume


所有任务的配置文件都放在/usr/local/flume/jobconf下

启动命令:

bin/flume-ng agent --conf conf/ --name a3 --conf-file myconf/flume33.conf  
bin/flume-ng agent -c conf -n a3 -f myconf/flume33.conf

Source、Channel、Sink类型总结:

Flume(三)—— Flume案例,Flume(三)—— Flume案例_HDFS,第1张

1. 案例一:监控端口数据

Flume监控一端Console,另一端Console发送消息,使被监控端实时显示。

监控端口source:type–>netcat
日志打印sink:type–>logger

  1. 创建并修改任务的配置文件 flume-telnet.conf
#定义Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#定义netcatsource
a1.sources.r1.type = netcat
a1.sources.r1.bind = topnpl200
a1.sources.r1.port = 44445

# 定义sink
a1.sinks.k1.type = logger

# 定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 双向链接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动:
/usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf/ --name a1 --conf-file /usr/local/flume/jobconf/flume-telnet.conf -Dflume.root.logger==INFO,console
  1. 测试:
telnet topnpl200 44445

输入 hello
日志中能监测到

# 判断44445端口是否被占用
netstat -tunlp | grep 44445

2. 案例二:监测(实时读取)本地文件到HDFS

监控文件(通过Shell命令)

source:type–>exec
sink:type–>hdfs

  1. 创建并修改任务的配置文件 flume-hdfs.conf
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# 2 source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/top_resources/flume_test
a2.sources.r2.shell = /bin/bash -c

# 3 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://topnpl200:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 10
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件(单位:秒)
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小(单位:字节)128M
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小副本数
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
  1. 确认开启了HDFS
start-dfs.sh
  1. 启动
cd /usr/local/flume

# 用相对路径启动
bin/flume-ng agent \
--conf conf/ \
--name a2 \
--conf-file jobconf/flume-hdfs.conf

# 绝对路径启动(用这个吧,相对路径启动目前有问题)
/usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf/ --name a2 --conf-file /usr/local/flume/jobconf/flume-hdfs.conf
  1. 测试
    往/opt/top_resources/flume_test文件里写数据 echo aaa >> flume_test(如果要清空flume_test里的数据,echo > flume_test),发现HDFS中有了/flume/20190831/23/logs-.1567266940527.tmp,当达到往HDFS中刷新的限制或者关掉监测后,tmp消失。
  2. 问题
    发现HDFS中并没有/flume/190831/21/log-时间戳.tmp,查看log,发现报错了:
    sink’s batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100
    sink的batchSize的容量要小于在channel内的transactionCapactiy,否则文件会溢出。
    修改:a2.sinks.k2.hdfs.batchSize = 1000 -> a2.sinks.k2.hdfs.batchSize = 10

3. 案例三:实时读取目录文件到HDFS

监控目录

source:type–>spooldir
sink:type–>hdfs

  1. 创建并修改任务的配置文件 flume-dir.conf
#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

#2 source
#监控目录的类型
a3.sources.r3.type = spooldir
#监控目录的路径
  a3.sources.r3.spoolDir = /opt/top_resources/flume_dir
#哪个文件上传hdfs,然后给这个文件添加一个后缀
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传(可选)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://topnpl200:9000/flume/%H
#上传到hdfs的文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小副本数
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
  1. 确认开启了hdfs
start-dfs.sh
  1. 启动
cd /usr/local/flume
bin/flume-ng agent --conf /conf --name a3 --conf-file jobconf/flume-dir.conf 
# 或者
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume-dir.conf
  1. 测试
    在/opt/top_resources/flume_dir下创建几个文件,发现HDFS中有了/flume/00/upload-.1567270075479.tmp。当达到往HDFS中刷新的限制或者关掉监测后,tmp消失。

4. 案例四:扇出

使用flume1监控文件变动,flume1将变动内容传递给flume-2,flume-2负责存储到HDFS。
同时flume1将变动内容传递给flume-3,flume-3负责输出到local。
以下在三个节点中配置,也可都放在bigdata111上。

Flume(三)—— Flume案例,Flume(三)—— Flume案例_HDFS_02,第2张

  1. 在bigdata111上,创建flume1.conf,用于监控某文件的变动,同时产生两个channel和两个sink分别输送给flume2和flume3。
# 1.agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给多个channel
a1.sources.r1.selector.type = replicating

# 2.source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test
a1.sources.r1.shell = /bin/bash -c

# 3.sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata112
a1.sinks.k1.port = 4141

# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata113
a1.sinks.k2.port = 4141

# 4.channel—1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4.channel—2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
  1. 在bigdata112上,创建flume-2.conf,用于接收flume1的event,同时产生1个channel和1个sink,将数据输送给hdfs。
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 4141

# 3 sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
#最小副本数
a2.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#5 Bind 
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  1. 在bigdata113上,创建flume-3.conf,用于接收flume1的event,同时产生1个channel和1个sink,将数据输送给本地目录。
#1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141

#3 sink
a3.sinks.k1.type = file_roll
#备注:此处的文件夹需要先创建好,如果该目录不存在,并不会创建新的目录。
a3.sinks.k1.sink.directory = /opt/flume3

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf

5. 案例五:扇入

flume11监控文件hive.log,flume-22监控某一个端口的数据流,flume11与flume-22将数据发送给flume-33,flume33将最终数据写入到HDFS。

Flume(三)—— Flume案例,Flume(三)—— Flume案例_flume_03,第3张

  1. 在bigdata111上,创建flume11.conf,用于监控hive.log文件,同时sink数据到flume-33。
# 1 agent
 a1.sources = r1
 a1.sinks = k1
 a1.channels = c1
 
 # 2 source
 a1.sources.r1.type = exec
 a1.sources.r1.command = tail -F /opt/test
 a1.sources.r1.shell = /bin/bash -c
 
 # 3 sink
 a1.sinks.k1.type = avro
 a1.sinks.k1.hostname = bigdata113
 a1.sinks.k1.port = 4141
 
 # 4 channel
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000
 a1.channels.c1.transactionCapacity = 100
 
 # 5 Bind
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1
  1. 在bigdata112上,创建flume-22.conf,用于监控端口44444数据流,同时sink数据到flume-33。
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata112
a2.sources.r1.port = 44444

#3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata113
a2.sinks.k1.port = 4141

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  1. 在bigdata113上,创建flume33.conf,用于接收flume11与flume22发送过来的数据流,最终合并后sink到HDFS。
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata113
a3.sources.r1.port = 4141

# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 600	
#设置每个文件的滚动大小大概是128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf

6. 案例六:Flume拦截器

Flume(三)—— Flume案例,Flume(三)—— Flume案例_hdfs_04,第4张

6.1 时间拦截器

功能:将时间戳放到event的header(Map<key,value>)

  1. 创建并修改任务的配置文件 flume-interceptors-timestamp.conf
#1.定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1

#2.具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /opt/top_resources/flume_test

#定义拦截器,为文件最后添加时间戳
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

#具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://topnpl200:9000/flume-interceptors/%H
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60

#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent -c conf/ -n a4 -f jobconf/flume-interceptors-timestamp.conf -Dflume.root.logger=INFO,console

6.2 主机名拦截器

功能:将主机名放到event的header(Map<key,value>)

  1. 创建并修改任务的配置文件 flume-interceptors-host.conf
#1.定义agent
a1.sources= r1
a1.sinks = k1
a1.channels = c1

#2.定义source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt

#拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
#参数为true时用IP192.168.1.111,参数为false时用主机名,默认为true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost

 #3.定义sinks
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://topnpl200:9000/flume-interceptors/%{agentHost}
a1.sinks.k1.hdfs.filePrefix = during_%{agentHost}
#往生成的文件加后缀名.log
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent -c conf/ -n a1 -f jobconf/flume-interceptors-host.conf -Dflume.root.logger=INFO,console

6.3 UUID拦截器

功能:将生成的UUID放到event的header(Map<key,value>)

  1. 创建并修改任务的配置文件 flume-interceptors-uuid.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt

a1.sources.r1.interceptors = i1
#type的参数不能写成uuid,得写具体,否则找不到类
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
#如果UUID头已经存在,它应该保存
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_

#如果sink类型改为HDFS,那么在HDFS的文本中没有headers的信息数据
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-uuid.conf -n a1 -Dflume.root.logger==INFO,console

6.4 查询替换拦截器

  1. 创建并修改任务的配置文件 flume-interceptors-search-replace.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace

#遇到数字改成*,A123会替换为A***
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = ***
a1.sources.r1.interceptors.i1.charset = UTF-8

#3 sink
a1.sinks.k1.type = logger

#4 Chanel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-search-replace.conf -n a1 -Dflume.root.logger=INFO,console

6.5 正则过滤拦截器

  1. 创建并修改任务的配置文件 flume-interceptors-filter.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
#如果excludeEvents设为true,则表示过滤掉以A开头的events(表示符合正则的)。
#如果excludeEvents设为false,表示过滤掉不是以A开头的events(表示不符合正则的)。
a1.sources.r1.interceptors.i1.excludeEvents = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-filter.conf -n a1 -Dflume.root.logger=INFO,console

6.6 正则抽取拦截器

  1. 创建并修改任务的配置文件 flume-interceptors-extractor.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动
bin/flume-ng agent -c conf/ -f jobconf/flume-interceptors-extractor.conf -n a1 -Dflume.root.logger=INFO,console
  1. 测试
    往flume_test.txt输入 hostname is topnpl200 ip is 39.104.119.200
    控制台监控到如下:
2019-09-01 19:06:42,417 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{ip=39.104.119.200, cookieid=topnpl200} body: 68 6F 73 74 6E 61 6D 65 20 69 73 20 74 6F 70 6E hostname is topn }

注意:正则抽取拦截器的headers不会出现在文件名和文件内容中。只会在header中有想要获取的内容。

6.7 自定义拦截器

  1. 创建并修改任务的配置文件 flume-interceptors-custom.conf
#1.agent 
a1.sources = r1
a1.sinks =k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/top_resources/flume_test.txt
a1.sources.r1.interceptors = i1
#全类名$Builder
a1.sources.r1.interceptors.i1.type = com.during.flume.ConvertInterceptor$Builder
 
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume-interceptors/custom-interceptors
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. Java程序
package com.during.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
	@Override
	public void initialize() {
	}

	@Override
	public void close() {
	}

	/**
	 * 拦截source发送到通道channel中的消息
	 *
	 * @param event 	接收过滤的event
	 * @return event  根据业务处理后的event
	 */
	@Override
	public Event intercept(Event event) {
	    // 获取事件对象中的字节数据
	    byte[] arr = event.getBody();
	    // 将获取的数据转换成大写
	    event.setBody(new String(arr).toUpperCase().getBytes());
	    // 返回到消息中
	    return event;
	}

	// 接收被过滤事件集合
	@Override
	public List<Event> intercept(List<Event> events) {
	    List<Event> list = new ArrayList<Event>();
	    for (Event event : events) {
	        list.add(intercept(event));
	    }
	    return list;
	}

	public static class Builder implements Interceptor.Builder {
	    // 获取配置文件的属性
	    @Override
	    public Interceptor build() {
	        return new MyInterceptor();
	    }

	    @Override
	    public void configure(Context context) {

	    }
	}
}
  1. 启动
    jar包放在/usr/local/flume
bin/flume-ng agent -c conf/ -n a1 -f jobconf/flume-interceptors-custom.conf -C ./during-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console

7. 案例七:Flume对接Kafka

  1. 创建并修改任务的配置文件 flume2kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动
/opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf

8. 案例八:Kafka对接Flume

  1. 创建并修改任务的配置文件 kafka2flume.conf
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink

# The channel can be defined as follows.
agent.sources.kafkaSource.channels = memoryChannel
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect=bigdata111:2181,bigdata112:2181,bigdata113:2181
agent.sources.kafkaSource.topic=calllog
#agent.sources.kafkaSource.groupId=flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms=100

agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000

# the sink of hdfs
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://bigdata111:9000/kafka2flume
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream
#这两个不配置,会产生大量的小文件
agent.sinks.hdfsSink.hdfs.rollSize=0
agent.sinks.hdfsSink.hdfs.rollCount=0
  1. 启动
bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console



https://www.xamrdz.com/web/2um1926582.html

相关文章: