【实践案例】通过Flume生产数据到kafka
【摘要】 Flume, Source, Sink, Kafka
本示例中使用到的Source、Channel、Sink分别是 SpoolDir source、File Channel、Kafka Sink。配置清单如下:
server.sources = spool111
server.channels = channel_111
server.sinks = sink111
#config the source
server.sources.spool111.type = spooldir
server.sources.spool111.spoolDir = /tmp/testflume
server.sources.spool111.fileSuffix = .COMPLETED
server.sources.spool111.ignorePattern = ^$
server.sources.spool111.trackerDir = /tmp/test/tracker
server.sources.spool111.maxBlobLength = 16384
server.sources.spool111.batchSize = 1998
server.sources.spool111.inputCharset = UTF-8
server.sources.spool111.deserializer = LINE
server.sources.spool111.selector.type = replicating
server.sources.spool111.fileHeaderKey = file
server.sources.spool111.fileHeader = false
server.sources.spool111.basenameHeader = true
server.sources.spool111.basenameHeaderKey = basename
server.sources.spool111.deletePolicy = never
server.sources.spool111.channels = channel_111
#config the channel
server.channels.channel_111.type = file
server.channels.channel_111.capacity = 7000
server.channels.channel_111.write-timeout = 1
server.channels.channel_111.transactionCapacity = 6000
server.channels.channel_111.maxFileSize = 2146435071
server.channels.channel_111.minimumRequiredSpace = 524288000
server.channels.channel_111.dataDirs = /tmp/test/dataDirs
server.channels.channel_111.checkpointDir = /tmp/test/checkpoint
#config the sink
server.sinks.sink111.type = org.apache.flume.sink.kafka.KafkaSink
server.sinks.sink111.kafka.bootstrap.servers = ${你的brokerIP实例}:21007,${你的brokerIP实例}:21007,${你的brokerIP实例}:21007
server.sinks.sink111.kafka.topic = sink-topic1
server.sinks.sink111.kafka.flumeBatchSize = 500
server.sinks.sink111.channel = channel_111
执行完成以后,可看到监控目录所在的文件被打上配置的完成标识符,completed.表示文件被source读完了。但这并不表示Kafka就一定正常消费了。 可以通过查看flume的日志,Kafka topic监控 页面以及kafka客户端命令来确认。
(1) 日志查看启动状态:
(2)页面观察 登录FI集群,选择Kafka服务,查看Kafka topic 监控页面。是否生成了以指定名称(此处为sink-topic)的topic.
(3) 通过Kafka客户端读取kafka topic中的数据。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)