Logstash集成Kafka配置方法(基于Docker)

举报
菊花茶 发表于 2018/01/22 01:39:29 2018/01/22
【摘要】 Kafka因为具有优秀的并发读写性能在大数据收集传输过程中有的非常重要的作用。近期项目计划将Kafka作为数据建模分析的总线使用,但是因为项目的特殊性需求,我们在两个不同的地理位置上部署了两套Kafka队列,并且要求其中一套能把数据传递到另一套的指定队列,说简单点就是需要从一个Kafka队列里读数据,然后写到另一个Kafka队列里,读写Kafka队列的工具有很多,相比Apache Flume我们更

Kafka因为具有优秀的并发读写性能在大数据收集传输过程中有的非常重要的作用。近期项目计划将Kafka作为数据建模分析的总线使用,但是因为项目的特殊性需求,我们在两个不同的地理位置上部署了两套Kafka队列,并且要求其中一套能把数据传递到另一套的指定队列,说简单点就是需要从一个Kafka队列里读数据,然后写到另一个Kafka队列里,读写Kafka队列的工具有很多,相比Apache Flume我们更倾向于Logstash。本文简单介绍一下何如基于Docker容器配置Logstash集成Kafka。

实验场景

假设有两个Kafka的Topic:

  • 第一个Logstash实例读取实验机器的linux系统日志,将文本原样发送到Test2队列;

  • 第二个Logtash实例从Test2队列读取消息,转发到Test3队列

实现用Logstash读/写队列实验。

场景说明

Kafka环境准备

基于Docker搭建简单Kafka的指导请参考《基于Docker搭建分布式消息队列Kafka》

配置Logstash

基于Docker安装Logstash非常简单,不用准备一大堆环境变量什么的,直接拉镜像就行了。Logstash对Kafka的操作需要用到相应的插件,但是当前的版本默认就带这些插件,所以也不需要额外注意什么。

docker pull logstash

从日志文件到Kafka

logstash.conf

input {
  file {
    path => "/log/*"
  }
}

output {
  kafka {
    topic_id => "test2"
    bootstrap_servers => "192.168.2.249:9092"
  }
 stdout {}
}

配置很简单,我们为了方便调试,在output里加了一个stdout把日志输出到控制台,实际使用的时候最好还是去掉。这里还要注意一点,因为配置是跑在docker容器里面的,如果我们实际要监控的是宿主机的系统日志,那么在启容器的时候一定要把监控的目标路径映射到容器里,然后配置的path指向的是映射到容器里的位置。

logstash.conf所在的目录也要映射到容器里面,启动容器:

docker run -it --rm \
-v /root/config-dir:/config-dir \
-v /var/log/messages:/log/messages \
logstash -f /config-dir/logstash.conf

容器启动以后可以看到控制台上有日志输出,同时我们也可以登录到Kafka的容器里用命令查看kafka队列的写入情况:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.249:9092 --topic test2 --from-beginning

从Kafka到Kafka

logstash2.conf 从test2队列中读取数据,写入新的队列test3:

input{
      kafka{
        bootstrap_servers => "192.168.2.249:9092"
        client_id => "test"
        group_id => "test"
        auto_offset_reset => "latest" 
        consumer_threads => 4
        decorate_events => true 
        topics => ["test2"]
        type => "bhy"
      }
}
output {
  kafka {
    topic_id => "test3"
    bootstrap_servers => "192.16.2.249:9092"
  }

  stdout {}
}
  • auto_offset_reset => “latest” //从最新的偏移量开始消费

  • decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中

  • topics => [“logq”,”loge”] //数组类型,可配置多个topic

  • type => “bhy” //所有插件通用属性,尤其在input里面配置多个数据源时很有用


启动容器同样需要映射相应的目录:

docker run -it --privileged --rm \
-v /root/config-dir:/config-dir \
logstash -f /config-dir/logstash2.conf

同样的,可以用kafka命令行查看消息是否正确写入。我们也可以用Kafka的UI工具来查看,结果如下,可以看到test2和test3同时都有数据写入

Kafka Manger


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200