使用filebeat将日志发布到kafka
架构图
Filbeat with kafka
我们可以配置filebeat从本地/远程服务器提取日志文件内容。Filebeat保证日志文件的内容将至少传递一次到配置的输出,并且没有数据丢失。它实现此行为是因为它将每个事件的交付状态存储在注册表文件中。
通常filebeat与ELK堆栈上的logstash集成在一起。Filebeat监听日志文件的新内容,并将它们发布到logstash。Logstash过滤并发布到elasticsearch。然后kibana会把它们显示在仪表板上。现在最新版本的filebeat支持将日志文件数据直接输出到kafka。
在这篇文章中,我将展示我是如何将filebeat与kafka集成在一起,从不同的服务中获取日志的。所有与这篇文章相关的源代码都可以在beats gitlab repo上找到。请克隆它并按照下面的步骤操作。我在用docker运行我所有的服务。
1. Kafka/Zookeeper docker-compose
以下是docker- composer的内容。对应于运行zookeeper和kafka。
ookeeper:
image: confluentinc/cp-zookeeper:5.1.0
container_name: zookeeper
environment:
- ZOOKEEPER_CLIENT_PORT=2181
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:5.1.0
container_name: kafka
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.252.94.61:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
ports:
- 9092:9092
links:
- zookeeper
2. 运行 kafka/zookeeper
我可以用下面的命令启动zookeeper和kafka。在这里,我运行的是zookeeper和kafka的单个实例。Kafka将在我的本地机器上使用9092端口。
docker-compose up -d zookeeper
docker-compose up -d kafka
3. Filebeat docker-compose
以下是docker-compose的内容。对应于运行filebeat的Yml。
filebeat:
image: docker.elastic.co/beats/filebeat:6.6.0
container_name: filebeat
user: root
environment:
- strict.perms=false
volumes:
- './filebeat.yml:/usr/share/filebeat/filebeat.yml:ro'
- '/private/var/services/octopus/filebeat:/usr/share/filebeat/data:rw'
- '/private/var/services/octopus/netflower/logs:/usr/share/services/netflower/logs'
- '/private/var/services/octopus/execer/logs:/usr/share/services/execer/logs'
在这里,我定义了四个docker卷、filbeat配置文件、filbeat数据目录和本地机器上存在的两个服务日志文件。在filebeat配置中,我将这些日志文件指定为输入(勘探者)。
4. Filebeat config
Filebeat配置文件存在于/usr/share/ Filebeat / Filebeat中。docker容器内的Yml。我用filebeat替换文件内容。存在于我的本地目录(与docker卷)的Yml文件。它定义了所有的日志文件配置和kafka输出配置。
filebeat.prospectors:
- type: log
enabled: true
tags:
- netflower
paths:
- /usr/share/services/netflower/logs/netflower.log
- type: log
enabled: true
tags:
- execer
paths:
- /usr/share/services/execer/logs/execer.log
output.kafka:
# specifying filebeat to take timestamp and message fields, other wise it
# take the lines as json and publish to kafka
codec.format:
string: '%{[@timestamp]} %{[message]}'
# kafka
# 10.252.94.61 is my local machine ip address
# publishing to 'log' topic
hosts: ["10.252.94.61:9092"]
topic: 'log'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
5. 运行 filebeat
现在我可以用下面的命令启动filebeat。它将开始读取定义了文件节拍的日志文件内容。并将它们推送到kafka主题日志。
docker-compose up -d filebeat
6. 测试with kafkacat
现在filebeat将日志文件内容发布到日志kafka主题中。为了查看发布内容,我们可以使用kafkacat命令创建一个kafka控制台消费者。你可以从这里带走卡夫卡猫。
# kafka broker - 10.252.94.61:9092
# kafka topic - log
kafkacat -C -b 10.252.94.61:9092 -t log
要使用Filebeat将日志发布到Kafka,需要进行以下步骤:
- 在Filebeat配置文件中,配置输入模块或者日志路径,以指定要收集的日志数据。
- 配置Filebeat的输出为Kafka,指定Kafka的连接信息(例如,主机地址和端口)以及要发布到的Kafka主题。
- 确保Filebeat已安装并正确配置了Kafka输出插件。
- 启动Filebeat,并确保它与Kafka建立连接。
- Filebeat将开始监听配置的日志路径或输入模块,收集日志数据,并将其发布到配置的Kafka主题。
- 确保Kafka消费者已订阅相应的主题以接收Filebeat发布的日志消息。
- 点赞
- 收藏
- 关注作者
评论(0)