【实践案例】通过Flume生产数据到kafka

举报
尘枫 发表于 2021/06/25 17:27:55 2021/06/25
【摘要】 Flume, Source, Sink, Kafka

本示例中使用到的Source、Channel、Sink分别是 SpoolDir source、File Channel、Kafka Sink。配置清单如下:

server.sources = spool111
server.channels = channel_111
server.sinks = sink111

#config the source
server.sources.spool111.type = spooldir
server.sources.spool111.spoolDir = /tmp/testflume
server.sources.spool111.fileSuffix = .COMPLETED
server.sources.spool111.ignorePattern = ^$
server.sources.spool111.trackerDir = /tmp/test/tracker
server.sources.spool111.maxBlobLength = 16384
server.sources.spool111.batchSize = 1998
server.sources.spool111.inputCharset = UTF-8
server.sources.spool111.deserializer = LINE
server.sources.spool111.selector.type = replicating
server.sources.spool111.fileHeaderKey = file
server.sources.spool111.fileHeader = false
server.sources.spool111.basenameHeader = true
server.sources.spool111.basenameHeaderKey = basename
server.sources.spool111.deletePolicy = never
server.sources.spool111.channels = channel_111

#config the channel
server.channels.channel_111.type = file
server.channels.channel_111.capacity = 7000
server.channels.channel_111.write-timeout = 1
server.channels.channel_111.transactionCapacity = 6000
server.channels.channel_111.maxFileSize = 2146435071
server.channels.channel_111.minimumRequiredSpace = 524288000
server.channels.channel_111.dataDirs = /tmp/test/dataDirs
server.channels.channel_111.checkpointDir = /tmp/test/checkpoint


#config the sink

server.sinks.sink111.type = org.apache.flume.sink.kafka.KafkaSink
server.sinks.sink111.kafka.bootstrap.servers = ${你的brokerIP实例}:21007,${你的brokerIP实例}:21007,${你的brokerIP实例}:21007
server.sinks.sink111.kafka.topic = sink-topic1
server.sinks.sink111.kafka.flumeBatchSize = 500
server.sinks.sink111.channel = channel_111

执行完成以后,可看到监控目录所在的文件被打上配置的完成标识符,completed.表示文件被source读完了。但这并不表示Kafka就一定正常消费了。 可以通过查看flume的日志,Kafka topic监控 页面以及kafka客户端命令来确认。
image.png
(1) 日志查看启动状态:
image.png

(2)页面观察 登录FI集群,选择Kafka服务,查看Kafka topic 监控页面。是否生成了以指定名称(此处为sink-topic)的topic.
image.png
(3) 通过Kafka客户端读取kafka topic中的数据。
image.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。