当前位置: 首页>数据库>正文

Flume有国产化 flume ng

软件版本:

  • CentOS 6.7
  • hadoop-2.7.4
  • apache-flume-1.6.0

一、Flume NG简述

  • Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
  • Flume将采集到的文件,socket数据包等各种形式的数据源,输出到HDFS、Hbase、hive、kafka等众多外部存储系统中
  • Flume针对特殊场景也具备良好的自定义扩展能力,因此flume适用于大部分的日常数据采集场景
  • 一般的采集需求,通过对flume的简单配置即可实现
  • Flume分布式系统中最核心的角色时agent,flume采集系统就是由一个个agent所连接起来形成的
  • 每个agent相当于一个数据传递员(Source到Channel到Sink之间传递数据的形式时event事件,event事件是一个数据流单元)

Flume的架构图中有3个组件,分别是source、channel、sink

  • Source:采集数据源,用于和数据源对接,获取数据
  • Sink:下沉,采集数据传送目的地,用于往下一级agent传递数据或者往最终的存储系统传递数据
  • Channel:agent的内部传输管道,用于将数据源以事件event的形式,从Source到Sink

Flume有国产化 flume ng,Flume有国产化 flume ng_Flume有国产化,第1张

运行流程:

从外部系统(Web Server)中收集产生的日志,然后通过Agent的Source组件将数据发送到临时存储Channel组件,最后传递给Sink组件,Sink组件将满足预设值的临时文件,存储到HDFS文件系统中。

二、搭建单点Flume NG

1、解压软件包

tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/

2、配置环境变量

export FLUME_HOME=/export/servers//flume-1.6.0
export PATH=$PATH:$FLUME_HOME/bin

3、修改flume配置文件

$FLUME_HOME/conf/flume-env.sh(flume-env.sh.template修改成flume-env.sh)

export JAVA_HOME=/export/servers/jdk1.8.0_171

4、简单测试—采集指定文件到 HDFS

服务器会在指定目录下,会不断产生新的日志文件,每当有新的日志文件产生,flume自动将新产生的数据源采集到文件存储系统HDFS中

创建配置文件spooldir-hdfs.properties

# Name the components on this agent 
a1.sources = r1 	# agent的别名
a1.sinks = k1 
a1.channels = c1 
 
# Describe/configure the source 
# 采集数据的类型
a1.sources.r1.type = exec       
# 指定执行命令(flume自动执行该命令)
a1.sources.r1.command = tail -F /export/data/callLog.log    
  
# Describe the sink
# 指定采集信息下沉到哪里
a1.sinks.k1.type = hdfs 
# 指定采集到的数据存放在hdfs文件系统的哪个路径下
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# 指定保存信息文件名的前缀
a1.sinks.k1.hdfs.filePrefix = events- 
# 是否舍弃已下沉的文件,舍弃指的是:根据指定的时间间隔创建文件夹
a1.sinks.k1.hdfs.round = true
#指定每10分钟舍弃已下沉的文件
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute 
# 每30s,将临时文件,持久化到hdfs文件系统中;设置为0,不滚动,滚动即下沉
a1.sinks.k1.hdfs.rollInterval = 30 	
# 临时文件达到指定字节就滚动,默认1024;设置为0,不根据临时文件大小来滚动文件
a1.sinks.k1.hdfs.rollSize = 1024 
# 临时文件的事件event个数达到指定值就滚动,默认10;如果设置成0,不根据events数据来滚动文件	
a1.sinks.k1.hdfs.rollCount = 10
# 每个事件写入的行数,默认100
a1.sinks.k1.hdfs.batchSize = 100
# 是否使用本地的时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
# 生成的文件类型,默认是 Sequencefile,可用 DataStream(普通文本)
a1.sinks.k1.hdfs.fileType = DataStream
 
# Use a channel which buffers events in memory
# channels数据缓存类型
a1.channels.c1.type = memory
# 该通道中最大的可以存储的event数量	
a1.channels.c1.capacity = 1000
# 每次最大可以从source中拿到或者送到sink中的event数量
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

提示:配置文件中的注释,在虚拟机配置时尽可能删除,如果保留可能会报错

启动flume

flume-ng agent -c conf -f conf/spooldir-hdfs.properties -n a1 -Dflume.root.logger=INFO,console

命令行参数解释: 

  • -c conf   指定flume自身的配置文件所在目录
  • -f conf/spooldir-hdfs.properties 指定我们所描述的采集方案
  • -n a1  指定我们这个agent的名字

下面截图是复制的节点启动发送数据源后,收到的数据源

Flume有国产化 flume ng,Flume有国产化 flume ng_flume集群_02,第2张

复制该节点,产生数据,这里是自己编写的java代码,也可以使用下面shell命令测试

[root@node01 flume-1.6.0]# while true;do echo test >> /export/data/callLog.log;sleep 0.5;done

Flume有国产化 flume ng,Flume有国产化 flume ng_flume集群_03,第3张

 三、搭建高可用Flume NG

高可用的Flume NG集群,架构图如下所示:

由于电脑性能的限制,将agent减少到1个节点,Collector维持原来的2个节点。主要是为了后面测试负载均衡、以及容错考虑

Flume有国产化 flume ng,Flume有国产化 flume ng_容错机制_04,第4张

1、节点分配

 图中所示,Agent数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。在上图中,有3个产生日志服务器分布在不同的机房,要把所有的日志都收集到一个集群中存储。下面我们开发配置Flume NG集群

2、Flume 的 load-balance

负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如上图 Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的Collector1和Collector2中

2.1、agent端的配置文件:exec-avro.properties

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/data/callLog.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2

#set load_balance
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

2.2、Collector1端的配置文件:avro-logger.properties

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

提示:Collector2的配置文件与Collector1的基本相同,只需要修改a1.sources.r1.bind

2.3、启动测试

  1. 分别启动Collector1、Collector2、agent
  2. 复制agent虚拟机,执行shell命令:while true;do echo test >> /export/data/callLog.log;sleep 0.5;done

Collector1的运行结果

Flume有国产化 flume ng,Flume有国产化 flume ng_flume集群_05,第5张

 Collector2的运行结果

Flume有国产化 flume ng,Flume有国产化 flume ng_Flume有国产化_06,第6张

3、Flume 的 failover

3.1、故障转移机制

实现 failover 功能,具体流程类似 load balance,但是内部处理机制与load balance完全不同

  • Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink 组件可用,Event就会被传递到下一个组件。
  • 故障转移机制的作用是将失败的Sink 降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,增加下次该Sink的重试时间。但只要失败的Sink成功发送一个Event,它将恢复到活动池。
  • Sink具有与之相关的优先级,数值越大,优先级越高。

例如,具有优先级为10的sink在优先级为8的Sink之前被激活。在激活过程中,如果发送事件时汇聚失败,则将尝试让优先级为8的Sink发送事件。如果没有指定优先级,则根据在配置中指定Sink的顺序来确定发送顺序

3.2、配置如下:

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/data/callLog.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
a1.sinkgroups.g1.processor.type = failover
# 如果开启,则将失败的 sink 放入黑名单
a1.sinkgroups.g1.processor.backoff = true
# 还支持random
a1.sinkgroups.g1.processor.selector = round_robin
#在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
#优先级值, 绝对值越大表示优先级越高,若不设置,则按照sink的先后顺序
a1.sinkgroups.g1.processor.priority.k1 = 5  
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 6
#失败的 Sink 的最大回退期(millis)
a1.sinkgroups.g1.processor.maxpenalty = 20000

3.3、测试参考负载均衡,这里就不测试了


https://www.xamrdz.com/database/6ax1962862.html

相关文章: