Using Flume to Read Data from Kafka and Write It to HDFS
Scenario
Use Flume to consume data from Kafka and import it into HDFS.
Prerequisites
- A streaming cluster with Kerberos authentication enabled has been created.
- The Flume client has been installed on the log generation node. For details, see Installing the Flume Client.
- The network has been configured so that the log generation node can communicate with the streaming cluster.
Procedure
(1) Copy the core-site.xml and hdfs-site.xml configuration files from the HDFS client to the Flume client configuration directory /opt/FlumeClient/fusioninsight-flume-1.6.0/conf.
The core-site.xml and hdfs-site.xml files can usually be found on the Master node of the analysis cluster, in the HDFS client installation directory, for example /opt/client/HDFS/hadoop/etc/hadoop/.
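For example, if the Flume client node can reach the Master node over SSH, the two files can be copied as follows (the host name master-node is illustrative; adjust the paths to your environment):

scp root@master-node:/opt/client/HDFS/hadoop/etc/hadoop/core-site.xml /opt/FlumeClient/fusioninsight-flume-1.6.0/conf/
scp root@master-node:/opt/client/HDFS/hadoop/etc/hadoop/hdfs-site.xml /opt/FlumeClient/fusioninsight-flume-1.6.0/conf/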
(2) Download the user's authentication credentials from the MRS cluster.
① On MRS Manager, click "System Settings".
② In the "Permission Configuration" area, click "User Management".
③ In the user list, locate the required user and click "More" in its row to download the user credentials.
④ Decompress the downloaded credential file to obtain the krb5.conf and user.keytab files.
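For example, if the downloaded credential package is a tar archive (the file name below is a placeholder; the actual name depends on the user and download time), it can be decompressed with:

tar -xvf <user>_credentials.tar

If the package is a zip file instead, use unzip. Either way, the extracted files include krb5.conf and user.keytab.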
(3) Copy the krb5.conf and user.keytab files obtained in the previous step to the Flume client configuration directory /opt/FlumeClient/fusioninsight-flume-1.X.X/conf on the Flume client node.
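For example, if the credentials were extracted on the local machine and the Flume client node is reachable over SSH (the host name flume-client-node is illustrative):

scp krb5.conf user.keytab root@flume-client-node:/opt/FlumeClient/fusioninsight-flume-1.X.X/conf/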
(4) Modify the jaas.conf configuration file.
① Copy the jaas.conf file from /opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/ to /opt/FlumeClient/fusioninsight-flume-1.X.X/conf.
② Modify the principal and keyTab parameters in the jaas.conf file:
principal: the name of the authentication user; you can run kinit <username> to authenticate and verify it.
keyTab: the full path of the user authentication file, that is, the directory where the user.keytab file was saved in step (3).
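After the two parameters are modified, the KafkaClient entry in jaas.conf should look roughly like the following sketch (the principal lidengpeng is taken from the sink configuration later in this article; whether a realm suffix is required depends on your cluster, so treat the value as an assumption to verify):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/opt/FlumeClient/fusioninsight-flume-1.6.0/conf/user.keytab"
  principal="lidengpeng"
  useTicketCache=false
  storeKey=true
  debug=true;
};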
(5) Modify the flume-env.sh configuration file located in /opt/FlumeClient/fusioninsight-flume-1.6.0/conf.
After "-XX:+UseCMSCompactAtFullCollection", append the following: -Djava.security.krb5.conf=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf/krb5.conf -Djava.security.auth.login.config=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf/jaas.conf -Dzookeeper.request.timeout=120000
Adjust the krb5.conf and jaas.conf paths according to your actual environment, then save and exit the configuration file.
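As a sketch (assuming flume-env.sh sets the JVM options through an exported JAVA_OPTS variable, as in a stock Apache Flume client; the leading "..." stands for whatever options are already present in your file), the resulting line would look like:

export JAVA_OPTS="... -XX:+UseCMSCompactAtFullCollection -Djava.security.krb5.conf=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf/krb5.conf -Djava.security.auth.login.config=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf/jaas.conf -Dzookeeper.request.timeout=120000"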
(6) Assuming the Flume client is installed in /opt/FlumeClient, run the following commands to restart the Flume client:
cd /opt/FlumeClient/fusioninsight-flume-1.6.0/bin
./flume-manage.sh restart
(7) Run the following command to edit the Flume client configuration file properties.properties:
vi <Flume client installation directory>/fusioninsight-flume-1.6.0/conf/properties.properties
Save the following content to the file:
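# Agent "client": one Kafka source (obs), one memory channel (flume), one HDFS sink (hdfs)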
client.sources = obs
client.channels = flume
client.sinks = hdfs
client.sources.obs.type = org.apache.flume.source.kafka.KafkaSource
client.sources.obs.monTime = 0
client.sources.obs.nodatatime = 0
client.sources.obs.batchSize = 1
client.sources.obs.batchDurationMillis = 1
client.sources.obs.keepTopicInHeader = false
client.sources.obs.keepPartitionInHeader = false
client.sources.obs.kafka.bootstrap.servers = 172.16.0.43:21007
client.sources.obs.kafka.consumer.group.id = consumer_group
client.sources.obs.kafka.topics = topic01
client.sources.obs.kafka.security.protocol = SASL_PLAINTEXT
client.sources.obs.kafka.kerberos.domain.name = hadoop.f178e9bd_eb34_4cdd_b5b1_2f9fd4ddf4b1.com
client.sources.obs.channels = flume
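# Memory channel buffering events between the Kafka source and the HDFS sink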
client.channels.flume.type = memory
client.channels.flume.capacity = 10000
client.channels.flume.transactionCapacity = 1000
client.channels.flume.channelfullcount = 10
client.channels.flume.keep-alive = 3
client.channels.flume.byteCapacity =
client.channels.flume.byteCapacityBufferPercentage = 20
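# HDFS sink: write events to hdfs://hacluster/tmp/ldp, authenticating with Kerberos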
client.sinks.hdfs.type = hdfs
client.sinks.hdfs.hdfs.path = hdfs://hacluster/tmp/ldp
client.sinks.hdfs.montime =
client.sinks.hdfs.hdfs.filePrefix = over_%{basename}
client.sinks.hdfs.hdfs.fileSuffix =
client.sinks.hdfs.hdfs.inUsePrefix =
client.sinks.hdfs.hdfs.inUseSuffix = .tmp
client.sinks.hdfs.hdfs.idleTimeout = 0
client.sinks.hdfs.hdfs.batchSize = 1000
client.sinks.hdfs.hdfs.codeC =
client.sinks.hdfs.hdfs.fileType = DataStream
client.sinks.hdfs.hdfs.maxOpenFiles = 5000
client.sinks.hdfs.hdfs.writeFormat = Writable
client.sinks.hdfs.hdfs.callTimeout = 10000
client.sinks.hdfs.hdfs.threadsPoolSize = 10
client.sinks.hdfs.hdfs.rollTimerPoolSize = 1
client.sinks.hdfs.hdfs.kerberosPrincipal = lidengpeng
client.sinks.hdfs.hdfs.kerberosKeytab = /opt/FlumeClient/fusioninsight-flume-1.6.0/conf/user.keytab
client.sinks.hdfs.hdfs.round = false
client.sinks.hdfs.hdfs.roundUnit = second
client.sinks.hdfs.hdfs.useLocalTimeStamp = true
client.sinks.hdfs.hdfs.failcount = 10
client.sinks.hdfs.hdfs.fileCloseByEndEvent = false
client.sinks.hdfs.hdfs.rollInterval = 30
client.sinks.hdfs.hdfs.rollSize = 1024
client.sinks.hdfs.hdfs.rollCount = 10
client.sinks.hdfs.hdfs.batchCallTimeout = 0
client.sinks.hdfs.serializer.appendNewline = true
client.sinks.hdfs.channel = flume
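With the configuration in place and the Flume client running, one way to verify the pipeline end to end is to produce a few test messages to topic01 and then list the HDFS output directory. The following is a sketch only: the Kafka client path /opt/client/Kafka/kafka/bin and the producer.properties file (which must carry the same SASL_PLAINTEXT/Kerberos settings as the source above) are assumptions about your environment, not part of this article's setup.

cd /opt/client/Kafka/kafka/bin
./kafka-console-producer.sh --broker-list 172.16.0.43:21007 --topic topic01 --producer.config ../config/producer.properties
(type a few test lines, then press Ctrl+C)
hdfs dfs -ls /tmp/ldp

New files should appear under /tmp/ldp once rollInterval (30 seconds), rollSize (1024 bytes), or rollCount (10 events) is reached.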