当前位置: 首页>编程语言>正文

flume 二次开发 flume搭建


文章目录

  • 整体介绍
  • 环境准备
  • 安装配置
  • 解压安装包
  • 配置文件
  • flume-env.sh
  • flume-conf.propertise
  • 启动服务
  • 使用示例
  • memory channel
  • file channel
  • Spooling Directory Source
  • 写到HDFS


整体介绍

按照Apache官网介绍,Flume是一个分布式的、可靠的、可用的系统逛街,用来高效的收集、聚集、移动大规模的日志数据(collecting, aggregating, and moving large amounts of log data)。

环境准备

三台Linux机器、JDK1.6以上
http://flume.apache.org/download.htmlhttp://archive.apache.org/dist/flume/ 下载apache-flume-1.6.0-bin.tar.gz解压到linux机器/opt目录下。

安装配置

解压安装包

cd /opt/
tar zxf XXXXX/apache-flume-1.6.0-bin.tar.gz
#XXXXX 为下载的文件完整路径

配置文件

flume-env.sh
cd /opt/apache-flume-1.6.0-bin/conf
mv flume-env.sh.template flume-env.sh
vi flume-env.sh

添加以下后保存

export JAVA_HOME=/usr/java/jdk1.8.0_181/
#根据自身机器JAVA路径配置

如果Hadoop配置了HA,将Hadoop的core-site和hdfs-site文件拷贝到flume/conf下
如果Flume不在Hadoop集群中,将Hadoop相关jar包放到lib目录下。

flume-conf.propertise

Flume的配置非常简单,核心的配置就是 conf目录下的配置文件,可参照flume-conf.propertise的格式进行配置。Flume agent包括Source、Channel、Sink三个组件,每个组件的类型、参数都按照需求配置即可。
从官网的简单例子入手配置。生成event给Flume后在把他们打印到控制台输出。agent的名称是a1,a1的source是netstat的类型,监听44444端口;channel是memory类型;sink是logger类型。
具体配置如下(根据需要可以配置多个不同的文件,在启动Flume的时候指定其中一个)

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动服务

启动时指定使用的配置文件example.conf

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

启动后在另一个窗口往44444端口写入数据,测试程序是否成功打印出数据。

telnet localhost 44444

随便输入数据,将在控制台打印出相关日志,测试成功。
也可以在配置文件中配置变量,在启动命令中传入变量值,具体可参照官网。

使用示例

以上实现了最简单的Flume配置和启动。Flume提供了丰富的agent配置类型和参数,用以满足各种使用场景。下面举例说明:

memory channel

监听日志文件

# define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /var/log/nginx/access.log
# define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10
# define sinks
a1.sinks.k1.type = logger
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

file channel

# define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail =f /var/log/nginx/access.log
# define channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/datas/flume/file/check
a1.channels.c1.dataDirs = /opt/datas/flume/file/data
a1.channels.c1.transactionCapacity = 10
# define sinks
a1.sinks.k1.type = logger
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

Spooling Directory Source

监听目录:当目录有新文件出现时,会将文件写入Channel,完成后会在文件名后加后缀.COMPLETED。如果不想监听一些临时文件等,可以设置ignorePattern用正则过滤。

# define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = spooldir
#指定监听目录
a1.sources.s1.spoolDir = /opt/datas/spooling
a1.sources.s1.ignorePattern = ([^ ]*\.tmp$)
# define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10
# define sinks
a1.sinks.k1.type = logger
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

写到HDFS

a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -n +0 -F /var/log/nginx/access.log
# define channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/var/checkpoint
a1.channels.c1.dataDirs=/var/tmp
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# define sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:8020/flume/event/hdfs/date=%Y-%m_%d
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

参考链接
http://flume.apache.org/



https://www.xamrdz.com/lan/5df1931502.html

相关文章: