flume读取kafka的数据写入到HDFS

举报
一夜 发表于 2020/12/22 14:40:28 2020/12/22
【摘要】 操作场景Flume采集文件内容导入到habse前提条件已创建启用Kerberos认证的流集群。已在日志生成节点安装Flume客户端,请参见安装Flume客户端。已配置网络,使日志生成节点与流集群互通。操作步骤(1)  从HDFS客户端拷贝配置文件core-site.xml,hdfs-site.xml到Flume Client的配置目录 " /opt/FlumeClient/fusionins...

操作场景

Flume采集文件内容导入到habse

前提条件

  • 已创建启用Kerberos认证的流集群。
  • 已在日志生成节点安装Flume客户端,请参见安装Flume客户端
  • 已配置网络,使日志生成节点与流集群互通。

操作步骤

(1)  从HDFS客户端拷贝配置文件core-site.xml,hdfs-site.xml到Flume Client的配置目录 " /opt/FlumeClient/fusioninsight-flume-1.6.0/conf" 下。

通常可以在分析集群Master节点HBase客户端安装目录如“/opt/client/HDFS/hadoop/etc/hadoop/”下找到core-site.xml,hdfs-site.xml文件。

(2)  从MRS集群下载用户的认证凭据。

①在MRS Manager,单击“系统设置”。

 ②在“权限配置”区域,单击“用户管理”。

 ③在用户列表中选择需要的用户,单击后面的“更多”下载用户凭据。

 ④解压下载的用户凭据文件,获取krb5.conf和user.keytab文件。

(3) 将上一步获得的krb5.conf和user.keytab拷贝到Flume Client所在节点的配置目录 " /opt/FlumeClient/fusioninsight-flume-1.X.X/conf" 下。

(4)修改配置文件jaas.conf,

        ① 将/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 的jaas.conf文件拷贝到/opt/FlumeClient/fusioninsight-flume-1.X.XS/conf下

②  修改jaas.conf文件中的参数principal 与keyTab

Principal: 可以kinit 用户名进行认证查看参数

keyTab: 定义的用户认证文件完整路径即步骤(1)中保存user.keytab认证文件的目录

 (5) 修改配置文件flume-env.sh,文件所在目录 /opt/FlumeClient/fusioninsight-flume-1.6.0/conf。

在 “-XX:+UseCMSCompactAtFullCollection”后面,增加以下内容:-Djava.security.krb5.conf=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf /krb5.conf -Djava.security.auth.login.config=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf /jaas.conf -Dzookeeper.request.timeout=120000

 

krb5.conf与jaas.conf根据实际情况,修改配置文件路径,然后保存并退出配置文件。

(6)假设Flume客户端安装路径为“/opt/FlumeClient”,执行以下命令,重启Flume客户端:

cd /opt/FlumeClient/fusioninsight-flume-1.6.0/bin

./flume-manage.sh restart

 

(7)执行以下命令,修改Flume客户端配置文件“properties.properties”。

vi Flume客户端安装目录/fusioninsight-flume-1.6.0/conf/properties.properties

将以下内容保存到文件中:

client.sources = obs
client.channels = flume
client.sinks = hdfs

client.sources.obs.type = org.apache.flume.source.kafka.KafkaSource
client.sources.obs.monTime = 0
client.sources.obs.nodatatime = 0
client.sources.obs.batchSize = 1
client.sources.obs.batchDurationMillis = 1
client.sources.obs.keepTopicInHeader = false
client.sources.obs.keepPartitionInHeader = false
client.sources.obs.kafka.bootstrap.servers = 172.16.0.43:21007
client.sources.obs.kafka.consumer.group.id = consumer_group
client.sources.obs.kafka.topics = topic01
client.sources.obs.kafka.security.protocol = SASL_PLAINTEXT
client.sources.obs.kafka.kerberos.domain.name = hadoop.f178e9bd_eb34_4cdd_b5b1_2f9fd4ddf4b1.com
client.sources.obs.channels = flume


client.channels.flume.type = memory
client.channels.flume.capacity = 10000
client.channels.flume.transactionCapacity = 1000
client.channels.flume.channelfullcount = 10
client.channels.flume.keep-alive = 3
client.channels.flume.byteCapacity =
client.channels.flume.byteCapacityBufferPercentage = 20


client.sinks.hdfs.type = hdfs
client.sinks.hdfs.hdfs.path = hdfs://hacluster/tmp/ldp
client.sinks.hdfs.montime =
client.sinks.hdfs.hdfs.filePrefix = over_%{basename}
client.sinks.hdfs.hdfs.fileSuffix =
client.sinks.hdfs.hdfs.inUsePrefix =
client.sinks.hdfs.hdfs.inUseSuffix = .tmp
client.sinks.hdfs.hdfs.idleTimeout = 0
client.sinks.hdfs.hdfs.inUseSuffix = .tmp
client.sinks.hdfs.hdfs.idleTimeout = 0
client.sinks.hdfs.hdfs.batchSize = 1000
client.sinks.hdfs.hdfs.codeC =
client.sinks.hdfs.hdfs.fileType = DataStream
client.sinks.hdfs.hdfs.maxOpenFiles = 5000
client.sinks.hdfs.hdfs.writeFormat = Writable
client.sinks.hdfs.hdfs.callTimeout = 10000
client.sinks.hdfs.hdfs.threadsPoolSize = 10
client.sinks.hdfs.hdfs.rollTimerPoolSize = 1
client.sinks.hdfs.hdfs.kerberosPrincipal = lidengpeng
client.sinks.hdfs.hdfs.kerberosKeytab = /opt/FlumeClient/fusioninsight-flume-1.6.0/conf/user.keytab
client.sinks.hdfs.hdfs.round = false
client.sinks.hdfs.hdfs.roundUnit = second
client.sinks.hdfs.hdfs.useLocalTimeStamp = true
client.sinks.hdfs.hdfs.failcount = 10
client.sinks.hdfs.hdfs.fileCloseByEndEvent = false
client.sinks.hdfs.hdfs.rollInterval = 30
client.sinks.hdfs.hdfs.rollSize = 1024
client.sinks.hdfs.hdfs.rollCount = 10
client.sinks.hdfs.hdfs.batchCallTimeout = 0
client.sinks.hdfs.serializer.appendNewline = true
client.sinks.hdfs.channel = flume

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。