Table of Contents
- Overview
- Environment Preparation
- Installation and Configuration
- Extract the Package
- Configuration Files
- flume-env.sh
- flume-conf.properties
- Start the Service
- Usage Examples
- memory channel
- file channel
- Spooling Directory Source
- Writing to HDFS
Overview
According to the Apache website, Flume is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of log data.
Environment Preparation
Three Linux machines, JDK 1.6 or higher.
Download apache-flume-1.6.0-bin.tar.gz from http://flume.apache.org/download.html or http://archive.apache.org/dist/flume/ and extract it to the /opt directory on a Linux machine.
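For example, the 1.6.0 binary can be pulled straight from the Apache archive (the path below follows the archive's usual layout):
cd /tmp
wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz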
Installation and Configuration
Extract the Package
cd /opt/
tar zxf XXXXX/apache-flume-1.6.0-bin.tar.gz
# XXXXX is the full path to the downloaded file
Configuration Files
flume-env.sh
cd /opt/apache-flume-1.6.0-bin/conf
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
Add the following line and save:
export JAVA_HOME=/usr/java/jdk1.8.0_181/
# adjust to the JAVA path on your own machine
If Hadoop is configured with HA, copy Hadoop's core-site.xml and hdfs-site.xml files into flume/conf.
If Flume is not running on a node of the Hadoop cluster, also put the relevant Hadoop jars into Flume's lib directory.
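A minimal sketch of the copy step, assuming Hadoop's configuration lives under $HADOOP_HOME/etc/hadoop:
cp $HADOOP_HOME/etc/hadoop/core-site.xml /opt/apache-flume-1.6.0-bin/conf/
cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml /opt/apache-flume-1.6.0-bin/conf/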
flume-conf.properties
Flume configuration is simple: the core of it is the configuration file under the conf directory, for which the bundled flume-conf.properties.template can serve as a format reference. A Flume agent consists of three components, Source, Channel, and Sink; configure each component's type and parameters as your use case requires.
Start with the simple example from the official site: events are sent to Flume, which then prints them to the console. The agent is named a1; its source is of type netcat, listening on port 44444; the channel is of type memory; the sink is of type logger.
The full configuration is as follows (you can keep several different configuration files and specify one of them when starting Flume):
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Service
When starting, specify example.conf as the configuration file:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
After startup, open another window and write data to port 44444 to check whether the agent prints it:
telnet localhost 44444
Type any data and matching log lines will be printed on the console, confirming the setup works.
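For reference, the logger sink prints each event's headers plus a hex dump of its body; the console output looks roughly like this (format as shown in the official quick-start guide):
INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }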
You can also reference variables in the configuration file and pass their values on the startup command line; see the official documentation for details. A sketch follows.
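Hedged sketch: environment-variable substitution in config files is documented for newer Flume releases (1.8+), so check your version's user guide before relying on it. Replace the literal port in example.conf:
a1.sources.r1.port = ${NC_PORT}
then start the agent with the variable set and the env-var resolver enabled:
NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties -Dflume.root.logger=INFO,console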
Usage Examples
The above is the simplest possible Flume configuration and startup. Flume provides a rich set of agent component types and parameters to cover many different scenarios. Some examples follow:
memory channel
Tail a log file (a quick test follows the config below):
# define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /var/log/nginx/access.log
# define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10
# define sinks
a1.sinks.k1.type = logger
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
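To try it out, start the agent with this configuration and append a line to the tailed file (the nginx log path is from the example; any file you can write to works):
echo "test event" >> /var/log/nginx/access.log
The new line should appear on the console through the logger sink.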
file channel
# define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /var/log/nginx/access.log
# define channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/datas/flume/file/check
a1.channels.c1.dataDirs = /opt/datas/flume/file/data
a1.channels.c1.transactionCapacity = 10
# define sinks
a1.sinks.k1.type = logger
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
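The file channel persists events to disk, trading some throughput for durability across agent restarts. Flume normally creates the checkpoint and data directories on startup; creating them beforehand simply avoids permission surprises:
mkdir -p /opt/datas/flume/file/check /opt/datas/flume/file/data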
Spooling Directory Source
Watch a directory: when a new file appears in the directory, its contents are written to the Channel, and once processing completes the file is renamed with a .COMPLETED suffix. To skip temporary files and the like, set ignorePattern with a regular expression filter (a quick test follows the config below).
# define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = spooldir
# directory to watch
a1.sources.s1.spoolDir = /opt/datas/spooling
a1.sources.s1.ignorePattern = ([^ ]*\.tmp$)
# define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10
# define sinks
a1.sinks.k1.type = logger
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
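To exercise it, create the spool directory, start the agent, and drop a file in (the file names here are illustrative):
mkdir -p /opt/datas/spooling
echo "hello spooldir" > /opt/datas/spooling/demo.log
Once processed, the file is renamed to demo.log.COMPLETED; a file matching the ignorePattern, such as demo.tmp, would be skipped.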
Writing to HDFS
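This example writes events to HDFS, partitioned by date. With hdfs.rollInterval and hdfs.rollCount both set to 0, files roll only on size (hdfs.rollSize, in bytes); hdfs.useLocalTimeStamp = true resolves the %Y-%m-%d escapes in the path against the agent's local clock rather than requiring a timestamp header on each event.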
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -n +0 -F /var/log/nginx/access.log
# define channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/var/checkpoint
a1.channels.c1.dataDirs=/var/tmp
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# define sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:8020/flume/event/hdfs/date=%Y-%m-%d
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
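Once the agent is running, you can verify the output on HDFS (the date directory below is illustrative; FlumeData is the sink's default file prefix):
hdfs dfs -ls /flume/event/hdfs/
hdfs dfs -cat /flume/event/hdfs/date=2018-09-01/FlumeData.*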
References
http://flume.apache.org/