目录
素材
一、Flume的概述
1、Flume的认识
2、Flume的运行机制
(1)Source(数据采集器)
(2)Channel(缓冲通道)
(3)Sink(接收器)
3、Flume的日志采集系统结构
(1)简单结构
(2)复杂结构
二、Flume的基本使用
1、系统要求
2、Flume安装
(1)下载Flume
(2)解压
(3)重命名
(4)配置Flume环境
3、Flume的入门使用
(1)配置Flume采集方案
(2)指定采集方案启动Flume
(3)Flume数据采集测试
三、Flume采集方案配置说明
1、Flume Source
(1)Avro Source
(2)Spooling Directory Source
(3)Taildir Source
(4)HTTP Source
2、Flume channel
(1)Memory Channel
(2)File channel
3、Flume Sinks
(1)HDFS Sink
(2)Logger Sink
(3)Avro Sink
四、Flume的可靠性保证
1、负载均衡
(1)搭建并配置Flume机器
(2)配置Flume采集方案
a、exec-avro.conf
b、netcat-logger.conf
(2)启动Flume系统
(3)Flume系统负载均衡测试
2、故障转移
(1)配置Flume采集方案
a、avro-logger-memory.conf
b、exec-avro-failover.conf
(2)启动Flume系统
(3)Flume系统故障转移测试
五、Flume拦截器
1、Timestamp interceptor
2、 Static interceptor
3、Search and Replace Interceptor
六、案例——日志采集
1、配置采集方案
(1)exec-avro_logCollection.conf
(2)avro-hdfs_logCollection.conf
2、启动hadoop集群
3、启动Flume系统
4、 日志采集系统测试
参考书籍
素材
http://链接: https://pan.baidu.com/s/19cSqan67QhB_x3vdnXsANQ?pwd=fh35 提取码: fh35http://xn--gzu811i//pan.baidu.com/s/19cSqan67QhB_x3vdnXsANQ?pwd=fh35%20%E6%8F%90%E5%8F%96%E7%A0%81:%20fh35
一、Flume的概述
1、Flume的认识
Flume原是Cloudera公司提供的一个高可用的、高可靠的、分布式海量日志采集、聚合和传输系统,而后纳人到了Apache旗下,作为一个顶级开源项目。Apache Flume不仅只限日志数据的采集,由于Flume 采集的数据源是可定制的,因此Flume 还可用于传输大量事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息以及几乎任可能的数据源。
当前 Flume 分为两个版本:Flume 0.9x版本,统称Flume-og(original generation)和 Flume 1.x版本,统称Flumeng(nextgeneration)。由于早期的Flume-og存在设计不合理、代码臃肿、不易扩展等问题,因此在Flume纳入到Apache旗下后,开发人员对Clouden Flume 的代码进行了重构,同时对 Flume 功能进行了补充和加强,并重命名为 Apache Flume于是就出现了 Flume-ng 与Flume-og两种截然不同的版本。而在实际开发中,多数使用目前比较流行的Flumeng 版本进行 Flume 开发。
2、Flume的运行机制
Flume的核心是把数据从数据源(Web Server)通过数据采集器(Source)收集过来,再将收集的数据通过缓冲通道(Channel)汇集到指定的接收器(Sink)。可参考下面官方文档图片。
Flume 基本架构中有一个 Agent(代理),它是Flume 的核心角色,meAgent是一个JVM进程,它承载着数据从外部源流向下一个目标的3个核心组件Source、Channel、Sink。
(1)Source(数据采集器)
用于源数据的采集,从一个Web服务器采集源数据,然后将采集到的数据写入到 Channel 中并流向 Sink;
(2)Channel(缓冲通道)
底层是一个缓冲队列,对Source中的数据进行缓存,将数据高效,准确地写人 Sink,待数据全部到达 Sink后,Flume 就会删除该缓存通道中的数据;
(3)Sink(接收器)
接收并汇集流向 Sink 所有数据,根据需求,可以直接进行集中式存储(采用HDFS进行存储),也可以继续作为数据源传人其他远程服务器或者 Source 中。
在整个数据传输的过程中,Flume将流动的数据封装到一个event(事件)中,它是 Flume 内部数据传输的基本单元。一个完整的event 包含 headers和body,其中 headers 包含了一些标识信息,而body中就是Flume 收集到的数据信息。
3、Flume的日志采集系统结构
(1)简单结构
当需要采集数据的生产源比较单一、简单时,可以直接使用一个 Agent 来进行数据采集并最终存储。
(2)复杂结构
当需要采集数据的数据源分布在不同的服务器上时,使用一个Agent 进行数据采集不再适用,这时就可以根据业务需要部署多个 Agent 进行数据采集并最终存储。也就是说,对每一个需要收集数据的 Web 服务端都搭建了一个 Agent 进行数据采集,接着再将这多个 Agent 中的数据作为下一个 Agent 的 Source 进行采集并最终集中存储到 HDFS 中。除此之外,在开发过程工作中可能遇到从同一个服务端采集数据,然后通过多路复用流分别传输并存储到不同目的地情况,根据具体需求,将一个 Agent 采集的数据通过不同的 Channel 分别流向了不同的 Sink,然后再进行下一阶段的传输或存储。
二、Flume的基本使用
1、系统要求
作为 Apache 旗下的一个顶级项目,想要使用 Flume 进行开发,必须满足一定的系统要求,这里以官方说明为准,具体要求如下。
(1)安装Java 1.8 或更高版本 Java 运行环境(针对本次使用的 Flume 1.8版本);
(2)为Source(数据采集器)、Channel(缓冲通道)和Sink(接收器)的配置提供足够的内存空间;
(3)为Channel(缓冲通道)和 Sink(接收器)的配置提供足够的磁盘空间;
(4)保证Agent(代理)对要操作的目录有读写权限。
上述系统要求中,Java运行环境的版本与将要安装使用的Flume版本是对应的,如果使用Flume 1.6 版本,则要求使用Java 1.6及以上运行环境,由于本章后续将以Flume 1.8.0 为准,所以要求安装 Java 1.8 及以上运行环境。
2、Flume安装
(1)下载Flume
下载flume到 /export/software/ 目录中
https://dlcdn.apache.org/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
(2)解压
进入目录/export/software/,执行命令
tar -xzvf apache-flume-1.8.0-bin.tar.gz -C /export/servers/
(3)重命名
进入目录/export/servers/,执行命令
mv apache-flume-1.8.0-bin flume
(4)配置Flume环境
配置 flume-env.sh
cd /export/servers/flume/conf
cp flume-env.sh.template flume-env.sh
vi flume-env.sh #编辑文件,增加如下行
export JAVA_HOME=/export/servers/jdk
配置 /etc/profile
vi /etc/profile #编辑文件,增加如下行
export FLUME_HOME=/export/servers/flume
export PATH=$PATH:$FLUME_HOME/bin
3、Flume的入门使用
(1)配置Flume采集方案
在/export/servers/flume/conf的目录下配置netcat-logger.conf,相关代码如下
# 示例配置方案: 单节点 Flume 配置
#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a、采集方案的名称可以自定义,但为了方便管理和使用,通常会根据数据源类型和收集的结果类型进行命名。如 netcat-logger.conf 表示采集 netcat 类型数据源并最终作为 logger 日志信息收集。
b、采集方案文件的位置可以自定义存放,在使用的时候会要求指定配置方案的具体位置,为了方便统一管理,通常会将采集方案统一存放。如本案例中,会将所有自定义的采集方案文件保存在/export/servers/flume/conf目录下。
c、采集方案中的sources、channels、sinks是在具体编写时根据业务需求进行配置的,不能随意定义。Flume支持采集的数据类型可以通过查看官网进行详细了解(地址https://flume.apache.org/FlumeUserGuide.html ),同时针对不同的 sources type、channelstype和 sinks type 需要编写不同的配置属性。
注意:配置采集方案中,在编写Source、Sink 与Channel 关联绑定时特别容易出错,如文件netcat-logger.conf中所示的al.sources.rl.channels = c1 和 al.sinks. kl.channel = cl, sources 的 channels 比 sinks 的 channel 多了一个s。这是因为,在一个 Agent 中,同一个Source 可以有多个Channel,所以配置时使用 channels(channel 的复数形式);而同一个 Sink 只能为一个 Channel 服务,所以配置时必须使用 Channel。
(2)指定采集方案启动Flume
进入 /export/servers/flume 目录,使用指定采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
执行上述指令后,就会使用前面编写的采集方案 netcat-logger.conf来启动Flume.该 Flume系统会根据采集方案的配置监听当前主机 localhost下44444 端口发送的neteat类型源数据,并将信息收集接收到类型为 logger 的 Sink中。
接下来,对上述指令中的各部分内容进行说明,具体如下所示。
a、flume-ng agent:表示使用flume-ng启动一个agent;
b、-confconf/:-conf选项指定了Flume自带的配置文件路径,可用-c简写格式;
c、-conf-file conf/netcat-logger.conf:-conf-file 选项指定了开发者编写的采集方案,可用-f简写格式,需要注意配置文件所在路径,建议读者使用绝对路径指定采集方案,否则将提示文件不存在的错误;
d、-nameal:表示启动的agent名称为al,该名称al 必须与采集方案中agent的名称保持一致;
e、-Dflume.root.logger=INFO,console:表示将采集处理后的信息通过logger日志的信息输出到控制台进行展示。
(3)Flume数据采集测试
如果出现”-bash: telnet: command not found",请使用以下指令安装telnet工具。
yum -y install telnet
使用telnet 连接到本地主机 localhost端口44444,用来持续发送数据信息作为Flume将要采集的源数据。
telnet localhost 44444
登录后发送信息:
hello
OK(收到信息)
world
OK(收到信息)
三、Flume采集方案配置说明
1、Flume Source
在编写 Flume 采集方案时,首先必须明确的是采集的数据源类型、出处;接着,根据这些信息与Flume 已提供的支持的 Flume Source 进行匹配,选择对应的数据采集器类型(source. type);然后,再根据选择的数据采集器类型,匹配必要和非必要的数据采集器属性。Flume提供的并支持的 Flume Source 有很多,具体的可前往官网查看https://flume.apache.org/FlumeUserGuide.html#flume-sources
https://flume.apache.org/FlumeUserGuide.html#flume-sources 这里列举一些常用的 Flume Source。(1)Avro Source
监听 Avro 端口并从外部 Avro 客户端流中接收 event 数据,当与另外一个 Flume Agent 上的 Avro Sink 配对时,它可以创建分层集合拓扑,利用 Avro Source 可以实现多级流动、扇出流、扇入流等效果。
Avro Source常用属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
channels | —— | |
type | —— | 组件类型名必须是 avro |
bind | —— | 要监听的主机名或 IP 地址 |
port | —— | 要监听的服务端口 |
threads | —— | 要生成的工作线程的最大数目 |
ssl | false | 将此设置为 true 以启用 SSL 加密,则必须指定 keystore 和 keystore-password |
keystore | —— | SSL 所必需的通往 Java 密钥存储路径 |
keystore-password | —— | SSL 所必需的 Java 密钥存储的密码 |
使用 Avro Source 采集器配置一个名称为 a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=avro
a1.sources.r1.channels=c1
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141
(2)Spooling Directory Source
Spooling Directory Source 允许对指定磁盘上的文件目录进行监控来提取数据,它将查看文件的指定目录的新增文件,并将文件中的数据读取出来。
Spooling Directory Source常用属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
channels | —— | |
type | —— | 组件类型名必须是spooldir |
spoolDir | —— | 从中读取文件的目录 |
fileSuffix | . COMPLETED | 附加到完全摄取的文件后缀 |
deletePolicy | never | 何时删除已完成的文件:never 或 immediate |
fileHeader | false | 是否添加存储绝对路径文件名的标头 |
includePattern | ^. * $ | 正则表达式,指定要包含的文件 |
ignorePattern | ^ $ | 正则表达式,指定要忽略的文件 |
使用 Spooling Directory Source 采集器配置一个名称为 a1 的 Agent。
a1.channels=ch-1
a1.sources=src-1
a1.sources.src-1.type=spooldir
a1.sources.src-1.channels=ch-1
a1.sources.src-1.spoolDir=/var/log/apache/flumeSpool
a1.sources.src-1.fileHeader=true
(3)Taildir Source
Taildir Source 用于观察指定的文件,几乎可以实时监测到添加到每个文件的新行。如果文件正在写入新行,则此采集器将重试采集它们以等待写入完成。
Taildir Source常用属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
channels | —— | |
type | —— | 组件类型名必须是 TAILDIR |
filegroups | —— | 以空格分隔的文件组列表。每个文件组都指定了要监控的一系列文件 |
filegroups. <filegroupName> | —— | 文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名 |
idle Timeout | 120000 | 关闭非活动文件的时间(ms)。如果关闭的文件附加了新行,则此源将自动重新打开它 |
writePosInterval | 3000 | 写入位置文件上每个文件的最后位置的间隔时间(ms) |
batchSize | 100 | 一次读取和发送到通道的最大行数。使用默认值通常效果较好 |
backoffSleepIncrement | 1000 | 当最后一次尝试未找到任何新数据时,每次重新尝试轮询新数据之间的最大时间延迟 |
fileHeader | false | 是否添加存储绝对路径文件名的标头 |
fileHeaderKey | file | 将绝对路径文件名附加到 event header 时使用的 header 关键字 |
使用 Taildir Source 采集器配置一个名称为 a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=TAILDIR
a1.sources.r1.channels=c1
a1.sources.r1.positionFile=/var/log/flume/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/var/log/test1/example.log
a1.sources.r1.headers.f1.headerKet1=value1
a1.sources.r1.filegroups.f2=/var/log/test2/. * log. *
a1.sources.r1.headers.f2.headersKey1=value2
a1.sources.r1.headers.f2.headersKey2=value2-2
a1.sources.r1.fileHeader=true
(4)HTTP Source
HTTP Source 可以通过 HTTP POST 和 GET 请求方式接收 event 数据,GET 通常只能用于测试使用。HTTP 请求会被实现了 HTTPSourceHandler 接口的 handler(处理器)可插拔插件转成 Flume events,这个 handler 接收 HttpServletRequest,返回 Flume events列表。一个 HTTP请求处理的所有事件都在一个事务中提交给通道,从而允许在诸如file channel 之类的 channel 上提高效率。如果 handler 抛出异常,source 会返回400;如果 channel 满了或者 source 不能再向 channel 追加 event,source 会返回 503。在一个 POST 请求发送的所有的 events 都被认为是一个批次,会在一个事务中插入 channel。
HTTP Source常用配置属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
channels | —— | |
type | 组件类型名必须是 http | |
port | —— | 采集源要绑定的端口 |
bind | 0.0.0.0 | 要监听绑定的主机名或 IP 地址 |
handler | org.apache.flume.source.http.JSONHandler | handler 类的全路径名 |
handler. * | —— | 配置 handler 的参数 |
使用 HTTP Source 采集器配置一个名称为a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=5140
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.example.rest.RestHandler
a1.sources.r1.handler.nickname=random props
2、Flume channel
Channels 通道是event 在Agent 上暂存的存储库,Source 向 Channel 中添加event,Sink 在读取完数据后再删除它。在配置Channels时,需要明确的是将要传输的 sources 数据源类型;接着,根据这些信息并结合开发中的实际需求,选择Flume 已提供支持的 Flume Channels;然后,再根据选择的Channel类型,配置必要和非必要的Channel属性。
这是官方文档
https://flume.apache.org/FlumeUserGuide.html#flume-channels
https://flume.apache.org/FlumeUserGuide.html#flume-channels(1)Memory Channel
Memory Channel 会将 event 存储在具有可配置最大尺寸的内存队列中,它非常适用于需要更高吞吐量的流量,但是在 Agent发生故障时会丢失部分阶段数据。
Memory Channel常用配置属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
type | —— | 组件类型名必须是memory |
capacity | 100 | 存储在 Channel 中的最大 even数 |
transactionCapacity | 100 | Channel 将从 Source 接收或向 Sink传递的每一个事务中的最大 event数 |
keep-alive | 3 | 添加或删除 event 的超时时间(s) |
byteCapacityBufferPercentage | 20 | 定义 byteCapacity与Channel中所有event的估计总大小之间的缓冲区百分比,以计算 header中的数据 |
byteCapacity | (见说明) | 允许此Channel 中所有event的最大内存字节数总和。该统计仅计算Eventbody,这也是提供byteCapacityBufferPercentage 配置参数的原因。默认计算值,等于JVM可用的最大内存的 80%(即命令行传递的-Xmx 值的 80%) |
使用Memory Channel 通道配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000
(2)File channel
File channel 是 Flum 的持久通道,它将所有的 event 写入磁盘,因此不会丢失进程或机器关机、崩溃时的数据。File channel 通过在一次事务中提交多个 event 来提高吞吐量,做到了只要事务被提交,那么数据就不会有丢失。
File channel常用配置属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
type | —— | 组件类型名必须是file |
checkpointDir | ~/.flume/file-channel/checkpoint | 检测点文件所存储的目录 |
useDualCheckpoints | false | 备份检测点如果设置为 true,backupCheckpointDir 必须设置 |
backupCheckpointDir | —— | 备份检查点目录。此目录不能与数据目最或检查点目录相同 |
dataDirs | ~/.flume/file-channel/data | 数据存储所在的目录设置 |
transactionCapacity | 10000 | 事务容量的最大值设置 |
checkpointInterval | 30000 | 检测点之间的时间值设置(ms) |
maxFileSize | 2146435071 | 一个单一日志的最大值设置(以字节为单位) |
capacity | 100000 | Channel的最大容量 |
使用 File channel 通道配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/mnt/flume/checkpoint
a1.channels.c1.dataDirs=/mnt/flume/data
3、Flume Sinks
Flume Sources 采集到的数据通过 Channels 就会流向 Sink 中,此时的 Sink类似一个区结的递进中心,它需要根据后续需求进行配置,从而最终选择是将数据直接进行集中式存储(例如,直接存储到 HDFS中),还是继续作为其他 Agent 的 Source 进行传输。在配置Sinks时,需要明确的就是将要传输的数据目的地、结果类型;接着,根据这些实际需求信息,选择Flume已提供支持的 Flume Sinks;然后,再根据选择的 Sinks类型,配置必要和非必要的 Sinks 属性。
具体可前往官方文档查看
https://flume.apache.org/FlumeUserGuide.html#flume-sinks
https://flume.apache.org/FlumeUserGuide.html#flume-sinks(1)HDFS Sink
HDFS Sink 将 event 写入 Hadoop 分布式文件系统(HDFS),它目前支持创建文本和序列文件,以及两种类型的压缩文件。HDFS Sink 可以基于经过的时间或数据大小或 event数量来周期性地滚动文件(关闭前文件并创建新文件),同时,它还通过属性(如 event发生的时间戳或机器)来对数据进分桶/分区。HDFS 目录路径可能包含将由 HDFS 接收器替换的格式化转义序列,以生用于存储 event 的目录/文件名,使用 HDFS Sink 时需要安装 Hadoop,以便 Flume 可以有用Hadoop jar 与HDFS 集群进行通信。
HDFS Sink常用配置属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
channel | —— | |
type | —— | 组件类型名必须是hdfs |
hdfs.path | —— | HDFS目录路径(如hdfs://namenode/ume/webdata/) |
hdfs.filePrefix | FlumeData | 为在 hdis 目录中由 Flume 创建的文件指定前缀 |
hdís.round | false | 是否应将时间戳向下舍人(如果为 true.则影响除%;之外的 所有基于时间的转义序列) |
hdfs.roundValue | 1 | 舍人到此最高倍数(在使用 hdfs.roundUnit 配置的单位中),小于当前时间 |
hdfs.roundUnit | second | 舍人值的单位(秒、分钟或小时) |
hdfs. rollInterval | 30 | 滚动当前文件之前等待的秒数(0==根据时间间隔从不滚动) |
hdfs.rollSize | 1024 | 触发滚动的文件大小,以字节为单位(0:永不基于文件大小滚动) |
hdfs.rollCount | 10 | 在滚动之前写入文件的事件数(0==从不基于事件数滚动) |
hdfs.batchSize | 100 | 在将文件刷新到 HDFS之前写人文件的 event 数 |
hdfs.useLocalTimeStamp | false | 替换转义序列时,请使用本地时间(而不是event beader 中的时间戳) |
使用 HDFS Sink 配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix=events-
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute
(2)Logger Sink
Logger Sink用于记录 INFO 级别 event,它通常用于调试。Logger Sink 接收器的不同之处是它不需要在“记录原始数据”部分中说明额外配置。
Logger Sink常用配置属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
channel | —— | |
type | —— | 组件类型名必须是 logger |
maxBytes ToLog | 16 | 要记录的 event body 的最大字节数 |
使用 Logger Sink 配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
(3)Avro Sink
Avro Sink 形成了 Flume 的分层收集支持的一半,发送到此接收器的 Flume event 将转换为 Avro event 并发送到配置的主机名/端口对上,event 将从配置的 Channel中批量获取配置的批处理大小。
Avro Sink常用配置属性(加粗部分为必须属性)
属性名称 | 默认值 | 说明 |
channel | —— | |
type | —— | 组件类型名必须是 avro |
hostname | —— | 要监听的主机名或IP地址 |
port | —— | 要监听的服务端口 |
batch-size | 100 | 要一起批量发送的 event 数 |
connect-timeout | 20000 | 允许第一次(握手)请求的时间量(ms) |
request-timeout | 20000 | 在第一个之后允许请求的时间量 |
使用 Avro Sink 配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=avro
a1.sinks.k1.channel=c1
a1.sinks.k1.hostname=10.10.10.10
a1.sinks.k1.port=4545
四、Flume的可靠性保证
前面讲解的Flume 人门使用中,配置的采集方案是通过唯一一个Sink作为接收器接收后续需要的数据,但有时候会出现当前 Sink故障或者数据收集请求量较大的情况,这候单一的 Sink 配置可能就无法保证 Flume 开发的可靠性。为此,Flume 提供了Flume Sink Processors(Flume Sink 处理器)来解决上述问题。
Sink 处理器允许开发者定义一个 Sink groups(接收器组),将多个 Sink 分组到一个实体中,这样Sink处理器就可以通过组内的多个Sink为服务提供负载均衡功能,或者是在某个Sink出现短暂故障的时候实现从一个 Sink 到另一个 Sink的故障转移。
1、负载均衡
负载均衡接收器处理器(Load balancing sink processor)提供了在多个 Sink 上进行负载均衡流量的功能,它维护了一个活跃的Sink索引列表,必须在其上分配负载。Load balancing sink processor 支持使用 round_robin (轮询)和 random (随机)选择机制进行流量分配,其默认选择机制为 round_robin,但可以通过配置进行覆盖。还支持继承 AbstractSinkSelector 的自定义类来自定义选择机制。
在使用时选择器(selector)会根据配置的选择机制挑选下一个可用的Sink并进行调用。对于round_robin和random两种选择机制,如果所选 Sink无法收集 event,则处理器会通过其配置的选择机制选择下一个可用 Sink。这种实现方案不会将失败的Sink列人黑单,而是继续乐观地尝试每个可用的 Sink。如果所有 Sink 都调用失败,则选择器将故障传播到接收器运行器(sink runner)。
如果启用了 backoff 属性,则 Sink处理器会将失败的Sink列人黑名单。当超时结束时,如果Sink仍然没有响应,则超时会呈指数级增加,以避免在无响应的 Sink 上长时间等待时卡住。在禁用backoff 功能的情况下,在 round_robin 机制下,所有失败的 Sink 将被传递到 Sink 队列中的下一个 Sink后,因此不再均衡。
Load balancing sink processor提供的配置属性(加粗部分为必须属性)。
属性名称 | 默认值 | 说明 |
sinks | —— | 以空格分隔的参与 sink 组的 sink 列表 |
processor.type | default | 组件类型名必须是 load_balance |
processor.backoff | false | 设置失败的 sink 进人黑名单 |
processor.selector | round_robin | 选择机制。必须是 round_robin、random或是继承自AbstractSinkSelector 的自定义选择机制类全路径名 |
processor.selector.maxTimeOut | 30000 | 失败 sink 放置在黑名单的超时时间,失败sink在指 |
processor.type 属性的默认值为 default,这是因为 Sink 处理器的 processor.type 提供了3 种处理机制:default(默认值)、failover 和 load_balance。其中, default 表示配置单独一个sink,配置和使用非常简单,同时也不强制要求使用 sink group 进行封装;另外的 failover 和 load_balance 就分别代表故障转移和负载均衡情况下的配置属性。
使用 Load balancing sink processor 配置一个名称为 al 的 Agent。
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=kl k2
a1.sinkgroups.g1.processor.type=load_balance
al.sinkgroups.g1.processor.backoff-true
al.ainkgroups.g1.processor.selector=random
(1)搭建并配置Flume机器
在hadoop01上,将Flume同步到hadoop02、hadoop03上
scp -r /export/servers/flume hadoop02.bgd01:/export/servers/
scp -r /export/servers/flume hadoop03.bgd01:/export/servers/
scp -r /etc/profile hadoop02.bgd01:/etc/profile
scp -r /etc/profile hadoop03.bgd01:/etc/profile
分别在hadoop02、hadoop03上执行如下命令,立即刷新配置
source /etc/profile
(2)配置Flume采集方案
a、exec-avro.conf
在hadoop01.bgd01上配置第一级采集配置,在/export/servers/flume/conf/目录下编写采集方案exec-avro.conf。
# 配置load balancing sink processor一级采集方案
a1.sources = r1
#用空格分别配置2个sink
a1.sinks = k1 k2
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/123.log
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 设置sink1,流向Hadoop02,由Hadoop02上的Agent进行采集
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02.bgd01
a1.sinks.k1.port = 52020
# 设置sink2,流向Hadoop03,由Hadoop03上的Agent进行采集
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03.bgd01
a1.sinks.k2.port = 52020
# 配置sink组及处理器策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.timeout = 10000
b、netcat-logger.conf
分别在hadoop02.bgd01和hadoop03.bgd01上配置第二级采集配置,在/export/servers/flume/conf/目录下编写采集方案netcat-logger.conf。
# 配置load balancing sink processor二级采集方案的一个Sink分支
#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02.bgd01
a1.sources.r1.port = 52020
# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 配置load balancing sink processor二级采集方案的一个Sink分支
#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03.bgd01
a1.sources.r1.port = 52020
# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
两个采集方案内容的唯一区别就是 source.bind 的不同,hadoop02.bgd01 机器的source.bind=hadoop02.bgd01,而hadoop03.bgd01 机器的 source.bind=hadoop03.bgd01,在 r述两个文件中,均设置了一个名为 al的Agent,在该 Agent 内部设置了 source. type= avro、source.bind=hadoop02.bgd01/hadoop03.bgd01 以及 source. port=52020,特意用来对接在 hddoop01.bgd01中前一个Agent收集后到 Sink的数据类型和配置传输的目标;最后,又设置了二采集方案的 sink.type=logger,将二次收集的数据作为日志收集打印。
(2)启动Flume系统
1、分别在hadoop02、hadoop03上,进入 /export/servers/flume 目录,使用netcat-logger采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
2、在hadoop01上,进入 /export/servers/flume 目录,使用exec-avro.conf采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console
hadoop02.bgd01
hadoop03.bgd01
hadoop01.bgd01
(3)Flume系统负载均衡测试
在hadoop01.bgd01的root目录创建logs目录
mkdir -p /root/logs
在hadoop01上,重新打开一个终端,执行如下命令:
while true; do echo "access access ..." >> /root/logs/123.log; sleep 1; done
hadoop02.bgd01
hadoop03.bgd01
2、故障转移
故障转移接收器处理器(Failover Sink Processor)维护一个具有优先级的sink列表,保证在处理 event 只要有一个可用的 sink 即可。故障转移机制的工作原理是将故障的sink降级到故障池中,在池中为它们分配一个冷却期,在重试之前冷却时间会增加,当 sink 成功发送 event后,它将恢复到活跃池中。sink具有与之相关的优先级,数值越大,优先级越高。如果在发送 event 时 sink 发生故障,则会尝试下一个具有最高优先级的 sink来继续发送event。如果未指定优先级,则根据配置文件中指定 sink 的顺序确定优先级。
Failover Sink Processor配置属性(加粗部分为必须属性)。
属性名称 | 默认值 | 说明 |
sinks | —— | 以空格分隔的参与 sink 组的 sink 列表 |
processor.type | default | 组件类型名必须是 failover |
processor.priority.<sinkName> | —— | 设置 sink 的优先级取值 |
processor.maxpenalty | 30000 | 失败 sink 的最大退避时间 |
使用 Failover Sink Processor 配置一个名称为al的Agent。
al.sinkgroups=g1
a1.sinkgroups.g1.sinks=kl k2
al.sinkgroups.g1.processor.type=failover
al.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.gl.processor.priority.k2-10 al.sinkgroups.g1.processor.maxpenalty=10000
(1)配置Flume采集方案
a、avro-logger-memory.conf
在hadoop01.bgd01上配置第一级采集配置,在/export/servers/flume/conf/目录下编写采集方案avro-logger-memory.conf。
# 配置load balancing sink processor一级采集方案
a1.sources = r1
#用空格分别配置2个sink
a1.sinks = k1 k2
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/456.log
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 设置sink1,流向Hadoop02,由Hadoop02上的Agent进行采集
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02.bgd01
a1.sinks.k1.port = 52020
# 设置sink2,流向Hadoop03,由Hadoop03上的Agent进行采集
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03.bgd01
a1.sinks.k2.port = 52020
# 配置sink组及处理器策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
a1.sinkgroups.g1.processor.timeout = 10000
b、exec-avro-failover.conf
分别在hadoop02.bgd01和hadoop03.bgd01上配置第二级采集配置,在/export/servers/flume/conf/目录下编写采集方案exec-avro-failover.conf。
# 配置load balancing sink processor二级采集方案的一个Sink分支
#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02.bgd01
a1.sources.r1.port = 52020
# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 配置load balancing sink processor二级采集方案的一个Sink分支
#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03.bgd01
a1.sources.r1.port = 52020
# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)启动Flume系统
1、分别在hadoop02、hadoop03上,进入 /export/servers/flume 目录,使用avro-logger-memory.conf采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/avro-logger-memory.conf --name a1 -Dflume.root.logger=INFO,console
2、在hadoop01上,进入 /export/servers/flume 目录,使用exec-avro-failover.conf采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/exec-avro-failover.conf --name a1 -Dflume.root.logger=INFO,console
hadoop01.bgd01
hadoop02.bgd01
hadoop03.bgd01
(3)Flume系统故障转移测试
在hadoop01.bgd01上,重新打开一个终端,执行如下命令:
while true; do echo "access access ..." >> /root/logs/456.log; sleep 1; done
hadoop02.bgd01
hadoop03.bgd01
五、Flume拦截器
FlumeInterceptors(拦截器)主要用于实现对Flume系统数据流中 event 的修改操作。在使用 Flume 拦截器时,只需要参考官方配置属性在采集方案中选择性地配置即可,当涉及配置多个拦截器时,拦截器名称中间需要用空格分隔,并且拦截器的配置顺序就是拦微顺序。这里只简述常用的几种的,具体可前往官方文档查看。
https://flume.apache.org/FlumeUserGuide.html#flume-interceptors
https://flume.apache.org/FlumeUserGuide.html#flume-interceptors1、Timestamp interceptor
TimestampInterceptor(时间戳拦截器)会将流程执行的时间插入到event的header头部。此拦截器插人带有 timestamp 键(或由 header 属性指定键名)的标头,其值为对应时间戳。如果配置中已存在时间戳时,此拦截器可以保留现有的时间戳。
Timestamp Interceptor常用配置属性(加粗部分为必须属性)。
属性名称 | 默认值 | 说明 |
type | —— | 组件类型名必须是timestamp |
header | timestamp | 用于放置生成的时间戳的标头的名称 |
preserveExisting | false | 如果时间戳已存在,是否应保留,true或false |
为名称为a1的Agent中配置Timestamp interceptor。
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels=c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
2、 Static interceptor
Static Interceptor(静态拦截器)允许用户将具有舒志值的静态头附加到所有even,当前实现不支持一次指定多个 header头,但是用户可以定又多个 Static Intercrptor 来为每一个拦截器都追加一个 header。
Static Interceptor常用配置属性(加粗部分为必须属性)。
属性名称 | 默认值 | 说明 |
type | —— | 组件类型名必须是static |
preserveExisting | true | 如果配置的header已存在,是否应保服 |
key | key | 应创建的 header 的名称 |
value | value | 应创建的header 对应的静态值 |
为名称是al的Agent 中配置 Static Intercepior。
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels=c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=static
a1.sources.r1.interceptors.i1.key=datacenter
a1.sources.r1.interceptors.i1.value=BET_JING
3、Search and Replace Interceptor
Search and Replace Interceptor(查询和替换拦戴器)基于Java正则表达式提供了简用的用于字符串的搜索和替换功能,同时还具有进行回溯/群组捕捉功能。此拦截器的便用与Matcher.replaceAllO方法具有相同的规则。
Sarch and Replace Intereeptor常用配置属性(加租部分为必须属性)
属性名称 | 默认值 | 说明 |
type | —— | 组件类型名必须是 search_replane |
searchPattern | —— | 要查询或替换的模式 |
replaceString | —— | 替换的字符串 |
charset | UTF-8 | event body 的字符集,默认为 UTF-8 |
为名称为al的Agent 中配置 Search and Replace Interceptor的示例如下。
al.sources=r1
a1.channels=cl
a1.sources.r1.channels=cl
a1.sources.r1.type=seg
a1.sources.avroSrc.interceptors=i1
al.sources.avroSrc.interceptors.i1.type=search_replace
# 影除 event body 中的前导字母数字字符
a1.sources.avroSrc.interceptors.i1.searchPattern=^[A-Za-z0-9_]+ al.sources.avroSrc.interceptors.i1.replacesString=
六、案例——日志采集
1、配置采集方案
(1)exec-avro_logCollection.conf
分别在hadoop02.bgd01和hadoop03.bgd01上配置同样的采集目录,在/export/servers/flume/conf/目录下编写采集方案exec-avro_logCollection.conf。
# 配置 Agent 组件
# 用3个source采集不同的日志类型数据
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# 描述并配置第一个sources组件(数据类型、采集数据源的应用地址、包括自带的拦截器)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
# 描述并配置第二个sources组件(数据类型、采集数据源的应用地址、包括自带的拦截器)
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/logs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
# 描述并配置第三个sources组件(数据类型、采集数据源的应用地址、包括自带的拦截器)
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /root/logs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200000
a1.channels.c1.transactionCapacity = 100000
# 描述并配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01.bgd01
a1.sinks.k1.port = 41414
# 将Source、Sink 与Channel 进行关联绑定
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
(2)avro-hdfs_logCollection.conf
在hadoop01.bgd01上配置第二级日志采集方案,在/export/servers/flume/conf/目录下编写采集方案exec-hdfs_logCollection.conf。
# 配置load balancing sink processor二级采集方案的一个Sink分支
#配置 Agent 组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop01.bgd01
a1.sources.r1.port = 41414
# 描述并配置拦截器,用于后续%Y%m%d获取时间
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://hadoop01.bgd01:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#生成的文本不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
#生成的文本不按时间生成
a1.sinks.k1.hdfs.rollInterval = 0
#生成的文本按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
#批量写入HDFS的个数
a1.sinks.k1.hdfs.batchSize = 20
#Flume操作HDFS的线程数(包括新建、写入)
a1.sinks.k1.hdfs.threadsPoolSize = 10
#操作HDFS超时时间
a1.sinks.k1.hdfs.callTimeout = 30000
# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2、启动hadoop集群
start-dfs.sh #启动HDFS
start-yarn.sh #启动YARN
3、启动Flume系统
在hadoop01上启动Flume系统
打开终端。进入 /export/servers/flume 目录,使用avro-hdfs_logCollection.conf采集方案启动FLume。命令如下:
flume-ng agent --conf conf/ --conf-file conf/avro-hdfs_logCollection.conf --name a1 -Dflume.root.logger=INFO,console
分别在hadoop02、hadoop03上,进入 /export/servers/flume 目录,使用exec-avro_logCollection.conf采集方案启动FLume。命令如下:
flume-ng agent --conf conf/ --conf-file conf/exec-avro_logCollection.conf --name a1 -Dflume.root.logger=INFO,console
4、 日志采集系统测试
1、在hadoop02上,创建目录/root/logs;然后打开3个终端,分别执行执行如下命令,用来产生日志数据:
while true; do echo "access access ..." >> /root/logs/access.log; sleep 1; done
while true; do echo "nginx nginx ..." >> /root/logs/nginx.log; sleep 1; done
while true; do echo "web web ..." >> /root/logs/web.log; sleep 1; done
2、在hadoop03上,创建目录/root/logs;然后打开3个终端,分别执行执行如下命令,用来产生日志数据:
while true; do echo "access access ..." >> /root/logs/access.log; sleep 1; done
while true; do echo "nginx nginx ..." >> /root/logs/nginx.log; sleep 1; done
while true; do echo "web web ..." >> /root/logs/web.log; sleep 1; done
回到在hadoop01.bgd01上启动Flume系统的终端窗口,观察日志采集信息。
在hadoop01.bgd01上,打开FireFox浏览器,在地址栏输入地址 http://hadoop01.bgd01:50070(集群IP/主机名+端口),进入到Hadoop集群UI,可以看到集群下新添加了一个Source目录。单击进入Source目录,查看内部文件存储结构。
参考书籍
《Hadoop大数据技术原理与应用》