【云驻共创】深入学习Flume海量日志聚合
前言
Flume是开源日志系统。是一个分布式、可靠和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方。
一、Flume简介及架构
Flume是什么?
Flume是流式日志采集工具,Flume提供对数据进行简单处理并且写到各种数据接受方的能力,Flume提供从本地文件(spooling directory source)、实时日志(taildir、 exec)、REST消息、Thrift . Avro、Syslog、Kafka等数据源上收集数据的能力,并且把接收的数据传送到各种数据传输方。
Flume能干什么?
- 提供从固定自录下采集日志信息到目的地(HDFS,HBase,Kafka)能力。
- 提供实时采集日志信息(taildir)到目的地的能力。
- Flume支持级联(多个Flume对接起来),合并数据的能力。
- Flume支持按照用户定制采集数据的能力。
Flume Agent架构
Flume基础架构:Flume可以单节点直接采集数据,主要应用于集群内数据。
Flume多agent架构: Flume可以将多个节点连接起来,将最初的数据源经过收集,存储到最终的存储系统中。主要应用于集群外的数据导入到集群内。
Flume多Agent合并
通过配置多个一级agents,然后全部指向-一个agent的source,这在Flume上是可以实现的,二级agent的source合并接收events进一个单独channel,这个channel里面的events会被一个sink消费后进入到最终目的地。
Flume Agent 原理
通道是具体用来存放了缓存数据的,把消息缓存到了自己的队列中。Sink Runner的它主要是用来驱动Sink Processor,Sink Processor又进一步的驱动Sink,进行了消息的消费或者是二次数据的消费。Sink Processor可以去设置一些消费的策略或者拉取数据的策略,通常有负载均衡、直通的方式、故障转移的方式。Sink主要是从Channel中拉取数据、拉取日志信息,然后最后存放到了接收方,接收方可以是Flume Agent,也可以是数据最终的归宿,比如说HDFS、HBase等等这样的检索组件。
基本概念-Source
Source负责接收events或通过特殊机制产生events,并将events批量放到一个或多个Channels。有驱动和轮询2种类型的Source。
- 驱动型source:是外部主动发送数据给Flume,驱动Flume接受数据。
- 轮询source:是Flume周期性主动去获取数据。
- Source必须至少和一个channel关联。
Source类型
- exec source:执行某个命令或者脚本,并将其执行结果的输出作为数据源。
- avro source:提供一个基于avro协议的server,bind到某个端口上,等待avro协议客户端发过来的数据。
- thrift source:同avro,不过传输协议为thrift。
- http source:支持http的post发送数据。
- syslog source:采集系统syslog。
- spooling directory source:采集本地静态文件。
- jms source:从消息队列获取数据。
- Kafka source:从Kafka中获取数据。
Channel 基本概念
Channel位于Source和Sink之间,Channel的作用类似队列,用于临时缓存进来的events,当Sink成功地将events发送到下一跳的channel或最终目的,events从Channel移除。
不同的Channel提供的持久化水平也是不一样的:
- Memory Channel:不会持久化。
- File Channel:基于WAL((预写式日志Write-Ahead Log)实现。
- JDBC Channel:基于嵌入式Database实现。
Channels支持事务,提供较弱的顺序保证,可以连接任何数量的Source和Sink。
Memory Channel:消息存放在内存中,提供高吞吐,但不提供可靠性;可能丢失数据。
File Channel:对数据持久化;但是配置较为麻烦,需要配置数据目录和checkpoint目录;不同的file channel均需要配置一个checkpoint目录。
JDBC Channel:内置的derby数据库,对event进行了持久化,提供高可靠性;可以取代同样具有持久特性的file channel。
Sink基本概念
Sink负责将events传输到下一跳或最终目的,成功完成后将events从channel移除。必须作用于一个确切的channel。
Sink类型
- hdfs sink:将数据写到hdfs上。
- avro sink:使用avro协议将数据发送给另下一跳的flume。
- thift sink:同avro,不过传输协议为thrift。
- file roll sink:将数据保存在本地文件系统中。
- hbase sink:将数据写到hbase中。
- Kafka sink:将数据写入到Kafka中。
- MorphlineSolr sink:将数据写入到Solr中。
二、Flume关键特性介绍
Flume支持采集日志文件
Flume主要是用来做二次收集的,它非常的轻便、非常好用,使用它只需要去做一些配置文件,然后启动它就可以完成具体的任务。把信息从日志数据库中获取,然后把它发送到了我们的接收方。这种模式我们是采用的是单Agent进程,每个Agent进程对应一个数据的来源,并且最终把数据存储到接收方,它主要是用于将集群内的日志文件采集并归档到HDFS、HBase、Kafka上,供上层应用对数据分析、清洗数据使用。
Flume支持多级级联和多路复制
Flume支持将多个Flume级联起来,同时级联节点内部支持数据复制。
如果说HDFS它接收数据发生了宕机,Flume可以把发送的数据进行回滚,同时选择一个正常的链路把数据发送给下一个接收方,这是一种错误恢复的配置方式。
这个场景主要应用于:收集FusionInsight集群外上的节点上的日志,并通过多个Flume节点,最终汇聚到集群当中。
Flume级联消息压缩、加密
Flume级联节点之间的数据传输支持压缩和加密,提升数据传输效率和安全性。在同一个Flume内部进行传输时,不需要加密,为进程内部的数据交换。
比如前一个Agent它的输出信息经过Sink之后,它要发送给下一个Flume,在这个两个Agent之间,我们可以用Flume的api来对它进行压缩或者是加密,最后发送给Flume。在Flume的内部之后可以进行来解密。
Flume 数据监控
我们可以对Flume的一些信息进行监控。比如说呢我们可以通过华为的MRS manager对Flume来进行一些状态监控。Source接收的数据量,Channel缓存的数据量,Sink写入的数据量,这些都可以通过Manager图形化界面呈现出来。
Flume传输可靠性
Flume在传输数据过程中,采用事务管理方式,保证传输过程中数据不会丢失,增强了数据传输的可靠性,同时缓存在channel中的数据如果采用file channel,进程或者节点重启数据不会丢失。增加数据传输的可靠性,同时Flume可以采用一些持久性的通道,以此保证数据不会丢失。
Flume 传输可靠性(failover)
Flume在传输数据过程中,如果下一跳的Flume节点故障或者数据接收异常时,可以自动切换到另外一路上继续传输。
Flume对于下一跳的策略默认有三种,一种是Default方式,另外还有Load balancing。还有一种方式是容错(failover)方式。那容错方式当他发现接收通道出现问题之后,它会自动切换到另外一个通道上来继续传输。Load balancing是我们会根据情况自由地去选择,根据算法去选择了往哪个分支上发送数据。
Flume传输过程中数据过滤
Flume在传输数据过程中,可以简单的对数据简单过滤、清洗,可以去掉不关心的数据,同时如果需要对复杂的数据过滤,需要用户根据自己的数据特殊性,开发过滤插件,Flume支持第三方过滤插件调用。用户也可以去继承了某一个接口,自己去开发相应的过滤组件。
三、Flume应用举例
操作示例1
说明
本例子演示Flume通过采集集群内应用(比如网银系统)产生的日志到HDFS上。
1、数据准备
在集群某一个节点上创建日志目录/tmp/log_test。该目录作为监控目录mkdir /tmp/log_test。
2、下载Flume客户端
登录MRS MAanager集群管理界面,单击“服务管理> Flume >下载客户端。
3、安装Flume客户端
解压客户端
tar-xvf MRS_Flume_Client.tar
tar -xvf MRS_Flume_ClientConfig.tar
cd /tmp/MRS-client/MRS_Flume ClientConfig/Flume
tar -xvf FusionInsight-Flume-1.6.0.tar.gz
安装客户端
./install.sh -d /opt/FlumeClient -f hostlP -c
flume/conf/client.properties.properties
4、配置Flume source
server.sources = a1
server.channels = ch1
server.sinks = s1
#the source configuration of a1
server.sources.a1.type = spooldir
server.sources.a1.spoolDir = /tmp/log_test
server.sources.a1.fileSuffix = .COMPLETED
server.sources.a1.deletePolicy = never
server.sources.a1.trackerDir = .flumespool
server.sources.a 1.ignorePattern = A$
server.sources.a1.batchSize = 1000
server.sources.a1.inputCharset = UTF-8
server.sources.a1.deserializer = LINE
server.sources.a1.selector.type = replicating
server.sources.a1.fileHeaderKey = file
server.sources.a1.fileHeader = false
server.sources.a1.channels = ch1
5、配置Flume channel
# the channel configuration of ch1
server.channels.ch1.type = memory
server.channels.ch1.capacity = 10000
server.channels.ch1.transactionCapacity = 1000
server.channels.ch1.channlefullcount = 10
server.channels.ch1.keep-alive = 3
server.channels.ch1.byteCapacityBufferPercentage = 20
6、配置Flume sink
server.sinks.s1.type = hdfs
server.sinks.s1.hdfs.path= /tmp/flume_avro
server.sinks.s1.hdfs.filePrefix = over_%{basename}
server.sinks.s1.hdfs.inUseSuffix = .tmp
server.sinks.s1.hdfs.rolllnterval = 30
server.sinks.s1.hdfs.rollSize = 1024
server.sinks.s1.hdfs.rollCount = 10
server.sinks.s1.hdfs.fileType = DataStream
server.sinks.s1.hdfs.maxOpenFiles = 5000
server.sinks.s1.hdfs.writeFormat = Writable
server.sinks.s1.hdfs.callTimeout = 10000
server.sinks.s1.hdfs.threadsPoolSize = 10
server.sinks.s1.hdfs.failcount = 10
server.sinks.s 1.hdfs.fileCloseByEndEvent = true
server.sinks.s1.channel = ch1
7、将Flume agent的配置文件命名为properties.properties,并上传该配置文件
8、向监控目录/tmp/log_test生产数据:
mw /var/log/log.11 /tmp/log_test
查看数据是香sink到HDFS:hdfs dfs -ls /tmp/flume_avro
此时,log.11已被flume重命名为log.11.COMPLETED,表示已采集成功。
操作示例2
说明
本例子演示Flume实时采集点击流日志到Kafka,用于后续的实时分析处理。
数据准备
在集群某一个节点上创建日志目录/tmp/log_click 。数据采集到Kafka topic_1028中。
1、配置Flume source:
server.sources = a1
server.channels = ch1
server.sinks = s1
# the source configuration of a1
server.sources.a1.type = spooldir
server.sources.a1.spoolDir = /tmp/log_click
server.sources.a1.fileSuffix =.COMPLETED
server.sources.a1.deletePolicy = never
server.sources.a1.trackerDir = .flumespool
server.sources.a1.ignorePattern = A$
server.sources.a1.batchSize = 1000
server.sources.a1.inputCharset = UTF-8
server.sources.a1.selector.type = replicating
jserver.sources.a1.basenameHeaderKey = basename
server.sources.a1.deserializer.maxBatchLine = 1
server.sources.a1.deserializer.maxLineLength = 2048
server.sources.a1.channels = ch1
2、配置Flume channel:
# the channel configuration of ch1
server.channels.ch1.type = memory
server.channels.ch1.capacity = 10000
server.channels.ch1.transactionCapacity = 1000
server.channels.ch1.channlefullcount = 10
server.channels.ch1.keep-alive = 3
server.channels.ch1.byteCapacityBufferPercentage = 20
4、配置Flume sink,把kafka作为数据的接收方
# the sink configuration of s1
server.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
server.sinks.s1.kafka.topic = topic_1028
server.sinks.s1.flumeBatchSize = 1000
server.sinks.s1.kafka.producer.type = sync
server.sinks.s1.kafka.bootstrap.servers = 192.168.225.15:21007
server.sinks.s1.kafka.security.protocol = SASL_PLAINTEXT
server.sinks.s1.requiredAcks =0
server.sinks.s1.channel = ch1
5、上传配置文件到Flume。使用Kafka命令查看采集到Kafka topic_1028的数据。通过华为的大数据平台把数据上传到系统中。
总结
Flume的功能和应用场景,并对Flume的一些基本概念、作用、可靠性、配置项做了详细说明。清楚的知道了Flume的作用、适用场景以及如何正确配置使用Flume。
学习推荐
华为Learning网站: https://support.huawei.com/learning
华为e学云: https://support.huawei.com/learning/elearning
华为Support案例库:https://support.huawei.com/enterprise
本文整理自华为云社区【内容共创系列】活动。
查看活动详情:https://bbs.huaweicloud.com/blogs/314887
相关任务详情:Flume海量日志聚合
- 点赞
- 收藏
- 关注作者
评论(0)