【云驻共创】深入学习Flume海量日志聚合

举报
Mr红凯 发表于 2021/12/19 21:25:50 2021/12/19
【摘要】 Flume的功能和应用场景,并对Flume的一些基本概念、作用、可靠性、配置项做了详细说明。通过学习,能够清楚知道Flume的作用、适用场景以及如何正确配置使用Flume。

前言

Flume是开源日志系统。是一个分布式、可靠和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方。


一、Flume简介及架构

Flume是什么?

Flume是流式日志采集工具,Flume提供对数据进行简单处理并且写到各种数据接受方的能力,Flume提供从本地文件(spooling directory source)、实时日志(taildir、 exec)、REST消息、Thrift . Avro、Syslog、Kafka等数据源上收集数据的能力,并且把接收的数据传送到各种数据传输方。

Flume能干什么?

  • 提供从固定自录下采集日志信息到目的地(HDFS,HBase,Kafka)能力。
  • 提供实时采集日志信息(taildir)到目的地的能力。
  • Flume支持级联(多个Flume对接起来),合并数据的能力。
  • Flume支持按照用户定制采集数据的能力。


Flume Agent架构

Flume基础架构:Flume可以单节点直接采集数据,主要应用于集群内数据。

Flume多agent架构: Flume可以将多个节点连接起来,将最初的数据源经过收集,存储到最终的存储系统中。主要应用于集群外的数据导入到集群内。

Flume多Agent合并

通过配置多个一级agents,然后全部指向-一个agent的source,这在Flume上是可以实现的,二级agent的source合并接收events进一个单独channel,这个channel里面的events会被一个sink消费后进入到最终目的地。


Flume Agent 原理

通道是具体用来存放了缓存数据的,把消息缓存到了自己的队列中。Sink  Runner的它主要是用来驱动Sink  Processor,Sink  Processor又进一步的驱动Sink,进行了消息的消费或者是二次数据的消费。Sink  Processor可以去设置一些消费的策略或者拉取数据的策略,通常有负载均衡、直通的方式、故障转移的方式。Sink主要是从Channel中拉取数据、拉取日志信息,然后最后存放到了接收方,接收方可以是Flume Agent,也可以是数据最终的归宿,比如说HDFS、HBase等等这样的检索组件。

基本概念-Source

Source负责接收events或通过特殊机制产生events,并将events批量放到一个或多个Channels。有驱动和轮询2种类型的Source。

  • 驱动型source:是外部主动发送数据给Flume,驱动Flume接受数据。
  • 轮询source:是Flume周期性主动去获取数据。
  • Source必须至少和一个channel关联。

Source类型

  • exec source:执行某个命令或者脚本,并将其执行结果的输出作为数据源。
  • avro source:提供一个基于avro协议的server,bind到某个端口上,等待avro协议客户端发过来的数据。
  • thrift source:同avro,不过传输协议为thrift。
  • http source:支持http的post发送数据。
  • syslog source:采集系统syslog。
  • spooling directory  source:采集本地静态文件。
  • jms source:从消息队列获取数据。
  • Kafka source:从Kafka中获取数据。


Channel 基本概念

Channel位于Source和Sink之间,Channel的作用类似队列,用于临时缓存进来的events,当Sink成功地将events发送到下一跳的channel或最终目的,events从Channel移除。

不同的Channel提供的持久化水平也是不一样的:

  • Memory Channel:不会持久化。
  • File Channel:基于WAL((预写式日志Write-Ahead Log)实现。
  • JDBC Channel:基于嵌入式Database实现。

Channels支持事务,提供较弱的顺序保证,可以连接任何数量的Source和Sink。

Memory Channel:消息存放在内存中,提供高吞吐,但不提供可靠性;可能丢失数据。

File Channel:对数据持久化;但是配置较为麻烦,需要配置数据目录和checkpoint目录;不同的file channel均需要配置一个checkpoint目录。

JDBC Channel:内置的derby数据库,对event进行了持久化,提供高可靠性;可以取代同样具有持久特性的file channel。


Sink基本概念

Sink负责将events传输到下一跳或最终目的,成功完成后将events从channel移除。必须作用于一个确切的channel。

Sink类型

  • hdfs sink:将数据写到hdfs上。
  • avro sink:使用avro协议将数据发送给另下一跳的flume。
  • thift sink:同avro,不过传输协议为thrift。
  • file roll sink:将数据保存在本地文件系统中。
  • hbase sink:将数据写到hbase中。
  • Kafka sink:将数据写入到Kafka中。
  • MorphlineSolr sink:将数据写入到Solr中。

二、Flume关键特性介绍

Flume支持采集日志文件

Flume主要是用来做二次收集的,它非常的轻便、非常好用,使用它只需要去做一些配置文件,然后启动它就可以完成具体的任务。把信息从日志数据库中获取,然后把它发送到了我们的接收方。这种模式我们是采用的是单Agent进程,每个Agent进程对应一个数据的来源,并且最终把数据存储到接收方,它主要是用于将集群内的日志文件采集并归档到HDFS、HBase、Kafka上,供上层应用对数据分析、清洗数据使用。

Flume支持多级级联和多路复制

Flume支持将多个Flume级联起来,同时级联节点内部支持数据复制。

如果说HDFS它接收数据发生了宕机,Flume可以把发送的数据进行回滚,同时选择一个正常的链路把数据发送给下一个接收方,这是一种错误恢复的配置方式。

这个场景主要应用于:收集FusionInsight集群外上的节点上的日志,并通过多个Flume节点,最终汇聚到集群当中。

Flume级联消息压缩、加密

Flume级联节点之间的数据传输支持压缩和加密,提升数据传输效率和安全性。在同一个Flume内部进行传输时,不需要加密,为进程内部的数据交换。

比如前一个Agent它的输出信息经过Sink之后,它要发送给下一个Flume,在这个两个Agent之间,我们可以用Flume的api来对它进行压缩或者是加密,最后发送给Flume。在Flume的内部之后可以进行来解密。

Flume 数据监控

我们可以对Flume的一些信息进行监控。比如说呢我们可以通过华为的MRS manager对Flume来进行一些状态监控。Source接收的数据量,Channel缓存的数据量,Sink写入的数据量,这些都可以通过Manager图形化界面呈现出来。


Flume传输可靠性

Flume在传输数据过程中,采用事务管理方式,保证传输过程中数据不会丢失,增强了数据传输的可靠性,同时缓存在channel中的数据如果采用file channel,进程或者节点重启数据不会丢失。增加数据传输的可靠性,同时Flume可以采用一些持久性的通道,以此保证数据不会丢失。

Flume 传输可靠性(failover)

Flume在传输数据过程中,如果下一跳的Flume节点故障或者数据接收异常时,可以自动切换到另外一路上继续传输。

Flume对于下一跳的策略默认有三种,一种是Default方式,另外还有Load balancing。还有一种方式是容错(failover)方式。那容错方式当他发现接收通道出现问题之后,它会自动切换到另外一个通道上来继续传输。Load balancing是我们会根据情况自由地去选择,根据算法去选择了往哪个分支上发送数据。


Flume传输过程中数据过滤

Flume在传输数据过程中,可以简单的对数据简单过滤、清洗,可以去掉不关心的数据,同时如果需要对复杂的数据过滤,需要用户根据自己的数据特殊性,开发过滤插件,Flume支持第三方过滤插件调用。用户也可以去继承了某一个接口,自己去开发相应的过滤组件。


三、Flume应用举例

操作示例1

说明

本例子演示Flume通过采集集群内应用(比如网银系统)产生的日志到HDFS上。

1、数据准备

在集群某一个节点上创建日志目录/tmp/log_test。该目录作为监控目录mkdir /tmp/log_test。

2、下载Flume客户端

登录MRS MAanager集群管理界面,单击“服务管理> Flume >下载客户端。

3、安装Flume客户端

解压客户端

tar-xvf MRS_Flume_Client.tar

tar -xvf MRS_Flume_ClientConfig.tar

cd /tmp/MRS-client/MRS_Flume ClientConfig/Flume

tar -xvf FusionInsight-Flume-1.6.0.tar.gz

安装客户端

./install.sh -d /opt/FlumeClient -f hostlP -c

flume/conf/client.properties.properties

4、配置Flume source

server.sources = a1

server.channels = ch1

server.sinks = s1

#the source configuration of a1

server.sources.a1.type = spooldir

server.sources.a1.spoolDir = /tmp/log_test

server.sources.a1.fileSuffix = .COMPLETED

server.sources.a1.deletePolicy = never

server.sources.a1.trackerDir = .flumespool

server.sources.a 1.ignorePattern = A$

server.sources.a1.batchSize = 1000

server.sources.a1.inputCharset = UTF-8

server.sources.a1.deserializer = LINE

server.sources.a1.selector.type = replicating

server.sources.a1.fileHeaderKey = file

server.sources.a1.fileHeader = false

server.sources.a1.channels = ch1

5、配置Flume channel

# the channel configuration of ch1

server.channels.ch1.type = memory

server.channels.ch1.capacity = 10000

server.channels.ch1.transactionCapacity = 1000

server.channels.ch1.channlefullcount = 10

server.channels.ch1.keep-alive = 3

server.channels.ch1.byteCapacityBufferPercentage = 20

6、配置Flume sink

server.sinks.s1.type = hdfs

server.sinks.s1.hdfs.path= /tmp/flume_avro

server.sinks.s1.hdfs.filePrefix = over_%{basename}

server.sinks.s1.hdfs.inUseSuffix = .tmp

server.sinks.s1.hdfs.rolllnterval = 30

server.sinks.s1.hdfs.rollSize = 1024

server.sinks.s1.hdfs.rollCount = 10

server.sinks.s1.hdfs.fileType = DataStream

server.sinks.s1.hdfs.maxOpenFiles = 5000

server.sinks.s1.hdfs.writeFormat = Writable

server.sinks.s1.hdfs.callTimeout = 10000

server.sinks.s1.hdfs.threadsPoolSize = 10

server.sinks.s1.hdfs.failcount = 10

server.sinks.s 1.hdfs.fileCloseByEndEvent = true

server.sinks.s1.channel = ch1

7、将Flume agent的配置文件命名为properties.properties,并上传该配置文件

8、向监控目录/tmp/log_test生产数据:

mw /var/log/log.11 /tmp/log_test

查看数据是香sink到HDFS:hdfs dfs -ls /tmp/flume_avro

此时,log.11已被flume重命名为log.11.COMPLETED,表示已采集成功。


操作示例2

说明

本例子演示Flume实时采集点击流日志到Kafka,用于后续的实时分析处理。

数据准备

在集群某一个节点上创建日志目录/tmp/log_click 。数据采集到Kafka topic_1028中。

1、配置Flume source:

server.sources = a1

server.channels = ch1

server.sinks = s1

# the source configuration of a1

server.sources.a1.type = spooldir

server.sources.a1.spoolDir = /tmp/log_click

server.sources.a1.fileSuffix =.COMPLETED

server.sources.a1.deletePolicy = never

server.sources.a1.trackerDir = .flumespool

server.sources.a1.ignorePattern = A$

server.sources.a1.batchSize = 1000

server.sources.a1.inputCharset = UTF-8

server.sources.a1.selector.type = replicating

jserver.sources.a1.basenameHeaderKey = basename

server.sources.a1.deserializer.maxBatchLine = 1

server.sources.a1.deserializer.maxLineLength = 2048

server.sources.a1.channels = ch1

2、配置Flume channel:

# the channel configuration of ch1

server.channels.ch1.type = memory

server.channels.ch1.capacity = 10000

server.channels.ch1.transactionCapacity = 1000

server.channels.ch1.channlefullcount = 10

server.channels.ch1.keep-alive = 3

server.channels.ch1.byteCapacityBufferPercentage = 20

4、配置Flume sink,把kafka作为数据的接收方

# the sink configuration of s1

server.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink

server.sinks.s1.kafka.topic = topic_1028

server.sinks.s1.flumeBatchSize = 1000

server.sinks.s1.kafka.producer.type = sync

server.sinks.s1.kafka.bootstrap.servers = 192.168.225.15:21007

server.sinks.s1.kafka.security.protocol = SASL_PLAINTEXT

server.sinks.s1.requiredAcks =0

server.sinks.s1.channel = ch1

5、上传配置文件到Flume。使用Kafka命令查看采集到Kafka topic_1028的数据。通过华为的大数据平台把数据上传到系统中。


总结

Flume的功能和应用场景,并对Flume的一些基本概念、作用、可靠性、配置项做了详细说明。清楚的知道了Flume的作用、适用场景以及如何正确配置使用Flume。

学习推荐

华为Learning网站: https://support.huawei.com/learning

华为e学云: https://support.huawei.com/learning/elearning

华为Support案例库:https://support.huawei.com/enterprise


本文整理自华为云社区【内容共创系列】活动。
查看活动详情:https://bbs.huaweicloud.com/blogs/314887
相关任务详情:Flume海量日志聚合

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。