【日志可观测】经典组合Fluent-bit×Kafka×Logstash×Elasticsearch×Kibana
一、背景
随着企业数字化转型进程加快,日志数据在运维监控和数据分析中的重要性日益凸显,部分用户在使用K8S过程中,存在日志采集不全面、数据处理效率低、无法关联上下文分析能力弱,本方案基于 Fluent-bit+Kafka+Logstash+Elasticsearch+Kibana,解决云原生环境下日志采集不全、高并发丢包、格式混乱问题,实现容器日志统一采集、流量削峰、标准化处理,支撑故障快速定位、业务监控告警,让日志高效服务运维与业务决策。
二、方案简介
本方案采用先进的ELK(Elasticsearch, Logstash, Kibana)技术栈结合fluent-bit进行日志管理,旨在实现高效率的数据流处理和可视化展示。整体架构分为四个主要模块:日志采集插件 (fluent-bit)、消息队列缓冲层 (kafka)、解析转换过滤层 (logstash) 和 存储索引展示层 (elasticsearch & kibana),通过在k8s集群(cce集群)中部署fluent-bit容器实例来采集日志,并发送到外部kafka,再经过logstash对日志数据进行解析、转换、过滤,经过处理后的日志数据由elasticsearch进行存储,通过web服务kibana界面化的展示日志数据。最终形成了一套完整的云原生日志采集与分析系统

三、技术原理分析
Fluent-bit
工作原理:日志通过数据管道从数据源发送到目的地,一个数据管道通常由 Input、Parser、Filter、Buffer、Routing 、Output 组成 。
- Input:用于从数据源抽取数据,一个数据管道中可以包含多个 Input。
- Parser:负责将 Input 抽取的非结构化数据转化为标准的结构化数据,每个 Input 均可以定义自己的 Parser(可选)。
- Filter:负责对格式化数据进行过滤和修改。一个数据管道中可以包含多个 Filter,Filter 会顺序执行,其执行顺序与配置文件中的顺序一致。
- Buffer:用户缓存经过 Filter 处理的数据,默认情况下buffer把input插件的数据缓存到内存中,直到路由传递到output为止。
- Routing:将 Buffer 中缓存的数据路由到不同的 Output。
- Output:负责将数据发送到不同的目的地,一个数据管道中可以包含多个 Output。
Kafka
工作原理:Kafka 是一个分布式的消息队列系统,采用发布-订阅模型。生产者将消息发送到 Kafka 的主题(Topic)中,消费者从主题中订阅消息 博客园 。Kafka 将消息存储在磁盘上,并通过分区(Partition)和副本(Replica)机制来保证高可用性和可扩展性
Logstash
工作原理:Logstash 事件处理管道拥有三个阶段:输入 → 过滤 → 输出 。
- 输入(Inputs):使用 Input 插件将数据输入至 Logstash 中。一些常用的 Input 插件有 file、syslog、redis、beats 等 。
- 过滤(Filters):过滤器插件是在 Logstash 管道中的中间处理设备。如果事件符合特定的条件,则可以将过滤器与条件结合使用 以对事件执行操作。一些常用的 filters 包括 grok、mutate、drop、clone、geoip 等 。
- 输出(Outputs):输出是 Logstash 管道的最后阶段。一个事件可以通过多个输出进行传递,但一旦所有输出处理都完成了该事件就完成了全部流程的执行。一些常用的 outputs 包括 elasticsearch、file、graphite、statsd 等 。
Elasticsearch
工作原理:Elasticsearch 是一个基于 Lucene 的搜索引擎,采用倒排索引的方式存储和检索数据。它支持全文搜索、结构化搜索、分析等功能。数据被存储在索引(Index)中,每个索引可以包含多个文档
Kibana
工作原理:Kibana 是一个开源的分析和可视化平台,用于与 Elasticsearch 配合使用 。它允许用户通过可视化的方探索和分析数据,创建仪表板、图表、地图等 。Kibana 通过 Elasticsearch 的 API 来获取数据,并将其展示在用户界面上 。
四、具体使用场景
4.1 企业级日志管理场景
- 大规模分布式系统日志收集与分析:在大型企业中,系统通常由多个分布式服务组成,产生海量的日志数据。通过 Fluent-bit 在各服务节点上轻量级地采集日志,借助 Kafka 的高吞吐量和可扩展性进行缓冲和传输,Logstash 对日志进行进一步的解析和处理,最终存储到 Elasticsearch 中,再利用 Kibana 进行可视化展示和分析,可帮助企业快速定位问题、监控系统运行状态。
- 多业务系统日志整合:企业内部可能有多种不同的业务系统,如 ERP、CRM、OA 等,这些系统产生的日志格式各异。该方案可以将不同业务系统的日志统一收集、处理和存储,方便企业进行整体的日志管理和审计。
4.2 容器化环境日志管理场景
- Kubernetes 集群日志采集:在 Kubernetes 环境中,容器的生命周期较短且数量众多,日志管理较为复杂。Fluent-bit 以 DaemonSet 的形式部署在 Kubernetes 集群中,能够高效地采集容器的日志,并将其发送到 Kafka,后续流程与上述一致。这种方案不仅解决了容器日志的收集问题,还通过 Kafka 缓存机制保障了日志数据的完整性,避免因容器重启或删除导致日志丢失。
- 容器化微服务架构日志监控:对于采用容器化微服务架构的应用,该方案可以实时收集各个微服务产生的日志,通过 Logstash 对日志进行分析和处理,从而实现对微服务架构的全面监控,及时发现服务之间的调用问题、性能瓶颈等。
5.Fluent Bit核心配置详解
Fluent Bit通过配置文件来设置各个组件的行为,配置文件定义了Fluent bit将如何采集日志,如何处理日志,如何将日志对接到其他平台等行为。
因此在运行服务之前,需要深入了解一下如何编写正确的配置文件。
Fluent Bit的配置文件主要由四部分组成: 【SERVICE】、【INPUT】、【FILTER】、【OUTPUT】
SERVICE 部分:定义了全局服务级别的配置
[SERVICE]
Flush 1
Log_Level info
Daemon off
Parsers_File parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
Health_Check On
scheduler.cap 180
scheduler.base 2
- Flush: 设置数据刷新到输出插件(output)的时间间隔,默认5s,每到刷新时间,fluent bit都会把数据刷新到输出插件
- Log_Level: 日志级别有 error、warning、info、debug和trace
- Daemon: 用于设置fluent bit为守护进程,当设置为on时 fluent bit会在启动后转入后台运行,不会阻塞当前的终端或命令行界面,通常用于在系统启动时自动启动Fluent Bit,而不需要手动干预。在Kubernetes环境中,通常不需要将Fluent Bit设置为守护进程,因为Kubernetes会负责管理Pod的生命周期,包括自动重启失败的容器。在这种情况下,Fluent Bit容器通常会以always重启策略运行,确保在任何情况下都能自动重启。
- Parsers_File: 指定解析器配置文件
- HTTP_Server:用于启用Fluent Bit的内置HTTP服务器,启用后可以通过HTTP接口来获取Fluent Bit的状态信息、配置信息、统计数据等也可以用于调试和实时监控Fluent Bit的行为
- HTTP_Listen: 设置HTTP服务器监听的地址,默认为0.0.0.0(即监听所有接口)
- HTTP_Port: 设置HTTP服务器的端口,默认为2020
- Health_Check: 开启健康检查功能,默认为off
- scheduler.cap: 用于设置在进行数据刷新时,如果输出插件(output)返回重试状态,调度器scheduler将等待的最大重试时间。这个配置项是与scheduler.base一起使用的,共同决定了重试前的等待时间,默认2000s。
- scheduler.base : 设置输出插件重试等待时间的下限,默认是5s。fluent bit使用指数退避和抖动算法来确定等待时间。等待时间是一个可配置的上下限的随机数,对于第n次重试,下限是scheduler.base的值,上限是min(base*(2^n),cap)。这里的base是指数退避的基数,而cap是上限值。通过合理配置这些值,可以优化Fluent Bit在面对网络波动或输出插件暂时不可用时的重试行为,从而提高数据传输的可靠性和效率。
INPUT部分: 负责从不同的数据源收集数据,定义了日志采集数据输入的配置
[INPUT]
Alias in_tail_default-stdout
Name tail
Buffer_Chunk_Size 128k
Buffer_Max_Size 512k
DB /db/default-stdout.db
Mem_Buf_Limit 40MB
Path /var/log/containers-symlinks/default-stdout/*/*/*/*/container-stdout*
Refresh_Interval 5
Skip_Empty_Lines On
Skip_Long_Lines On
Tag kube.*
multiline.parser docker, cri
threaded On
- Alias: 为输入插件指定一个别名。这个别名可以用来区分同一个INPUT输入插件的不同实例或者在配置文件中为特定的输入插件提供一个更易于识别的名称
- Name:FluentBit中,每个输入插件(input)都需要一个名称来标识它,这个名称通过Name配置项来设置。Name配置项告诉fluentbit需要加载并使用哪个插件。不同的输入插件有不同的配置选项。常用的输入插件有tail,tail输入插件用于监控一个或多个文本文件,类似于Unix中的tail -f命令,支持多行日志的合并,可以处理由于行分割导致的跨日志条目的情况
- Buffer_Chunk_Size:用于设置读取文件数据的初始缓冲区大小,这个值用于增加缓冲区大小,以适应更长的日志行。默认情况下Buffer_Chunk_Size的值为32k
- Buffer_Max_Size: 用于设置每个被监控文件的缓冲区大小限制,限制内存缓冲区可以增长的最大额度。如果读取的文件内容超过此限制,则将该文件从监控列表中移除。这些缓冲区配置对于处理大量日志数据和防止内存过度使用至关重要。
- DB: DB配置项与tail输入插件一起使用,用于跟踪文件的状态和偏移量,当使用tail插件监控日志文件时,DB文件会记录每个文件的读取位置,这样即使Fluent Bit服务重启,它也能从上次停止的地方继续读取,而不是从头开始。DB文件是一个SQLite数据库,它存储了每个被监控文件的最后读取位置。这个数据库文件通常放在一个持久化存储路径下,以确保服务重启后仍然可用。
- Mem_Buf_Limit: 用于限制tail输入插件在将数据添加到处理引擎时可以使用的内存量。如果达到了这个极限,tail插件将暂停接受更多的数据,直到已经刷新的数据被输出插件处理,释放了一部分内存后,它才会恢复接受数据。这个参数对于避免高负载情况下过度使用内存非常重要,特别是在日志数据生成速度超过输出速度时。通过设置Mem_Buf_Limit ,可以防止Fluent Bit占用过多内存,从而可能导致系统性能问题或服务崩溃。Mem_Buf_Limit仅适用于Tail输入插件,并且与Buffer_Chunk_Size和Buffer_Max_Size参数一起使用,进一步控制内存使用和文件处理。
Filter 部分:负责对格式化数据进行过滤和更改
[Filter]
Name kubernetes
match kube.*
match_Regex *
- name: 过滤器名称(必填,fluent-bit 通过name知道应该加载哪个插件)
- match: 传入记录标签匹配,* 作为统配符
- match_Regex:通过正则表达式语法进行配(和Match同时定义,Match_Regex优先)
Output 部分:负责将数据发送到不同的目的地,一个数据管道中可以包含多个 Output。
[OUTPUT]
name es
match kube.*
Host 192.168.30.40
Port 9200
Logstash_Format true
Logstash_Prefix cb-logstash-log
time_Key @time-satmp
- name: 过滤器名称(必填,fluent-bit 通过name知道应该加载哪个插件)
- match: 传入记录标签匹配,* 作为统配符
- Host:es实例的地址
- Port:es端口
- Logstash_Format:启用Logtash格式兼容性
- Logstash_Prefix:用 Logstash_Format 时,索引名称由前缀和后缀组成,例如:如果 Logstash_Prefix 等于 ‘mydata’ 和 Logstash_Suffix 等于 ‘YYYY.MM.DD’ 的索引将变为 ‘mydata-YYYY.MM.DD’
- time_Key:启用 Logstash_Format 后,每条记录都会获得一个新的时间戳字段
六、操作步骤
6.1 创建cce集群
6.2 通过helm方式在cce集群中部署Fluent-Bit
helm repo add fluent https://fluent.github.io/helm-charts
helm upgrade --install fluent-bit fluent/fluent-bit
部署完成后会生成对应的DaemonSet和ConfigMap


6.3 Fluent Bit对接自建kafka
6.31 安装kafka以及创建topic
6.311部署zookeeper
apiVersion: apps/v1
kind: Deployment
metadata:
name: zookeeper-deployment
spec:
replicas: 1
selector:
matchLabels:
app: zookeeper
template:
metadata:
labels:
app: zookeeper
spec:
containers:
- name: zookeeper
image: zookeeper:3.8
ports:
- containerPort: 2181
---
apiVersion: v1
kind: Service
metadata:
name: zookeeper-service
spec:
selector:
app: zookeeper
ports:
- protocol: TCP
port: 2181
targetPort: 2181
type: ClusterIP
6.312 部署kafka
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- env:
- name: KAFKA_BROKER_ID
value: "0"
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper-service:2181
- name: KAFKA_LISTENERS
value: PLAINTEXT://0.0.0.0:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka-service:9092
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: "true"
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
image: confluentinc/cp-kafka:7.5.0
name: kafka
ports:
- containerPort: 9092
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service
namespace: default
spec:
type: ClusterIP
ports:
- port: 9092
targetPort: 9092
selector:
app: kafka

创建一个topic(test-log)用于本次对接测试。
kubectl exec -it kafka-xxx -- /bin/bash
kafka-topics --create --topic test-log --bootstrap-server kafka-service:9092 --partitions 1 --replication-factor 1

6.32 修改配置文件fluent-bit,添加kafka相关配置
kafka相关配置如下,其中brokers为kafka服务地址,topics为上面创建的topic,其中Match匹配参考上面的OUTPUT进行配置(即匹配数据来源)。
[OUTPUT]
Match kube*
Name kafka
Retry_Limit 10
Workers 8
brokers kafka-service.default.svc.cluster.local:9092
topics test-log
修改配置后,删除fluent-bit下的实例重新生成,使其配置生效。brokers为kafka service的集群内部访问地址
6.4 部署logstash
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-configmap
namespace: elk
labels:
app: logstash-cm
data:
logstash.conf: |-
input {
kafka {
bootstrap_servers => ["kafka-service.default.svc.cluster.local:9092"]
client_id => "k8s_logstash"
group_id => "k8s"
auto_offset_reset => "latest"
consumer_threads => 3
decorate_events => false
topics => ["test-log"]
codec => "json"
}
}
input {
beats {
host => "0.0.0.0"
port => 5044
codec => json
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "cce-log-%{+YYYY.MM.dd}"
}
}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-yml
namespace: elk
labels:
type: logstash
data:
logstash.yml: |-
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: http://elasticsearch:9200
xpack.monitoring.enabled: true
---
apiVersion: v1
kind: Service
metadata:
name: logstash-svc
namespace: elk
labels:
app: logstash
spec:
type: NodePort
selector:
app: logstash-7.12.1
ports:
- name: logstashport
port: 5044
targetPort: logport
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: logstash-deployment
namespace: elk
spec:
replicas: 1
selector:
matchLabels:
app: logstash-7.12.1
template:
metadata:
labels:
app: logstash-7.12.1
spec:
containers:
- name: logstash
image: logstash:7.12.1
imagePullPolicy: IfNotPresent
env:
- name: ES_JAVA_OPTS
value: "-Xms256m -Xmx1024m"
- name: TZ
value: "Asia/Shanghai"
ports:
- name: logport
containerPort: 5044
protocol: TCP
command: ["logstash","-f","/usr/share/logstash/config/logstash.conf"]
volumeMounts:
- name: logstash-config
mountPath: /usr/share/logstash/config/logstash.conf
subPath: logstash.conf
- name: timezone
mountPath: /etc/localtime
- name: logstash-yml
mountPath: /usr/share/logstash/config/logstash.yml
subPath: logstash.yml
volumes:
- name: logstash-config
configMap:
name: logstash-configmap
- name: timezone
hostPath:
path: /etc/localtime
- name: logstash-yml
configMap:
name: logstash-yml
logstash中配置了输入(即设置kafka相关配置,如kafka服务地址和topics名称),输出(推送到elasticsearch服务)。
6.5 部署elasticsearch
这里以单机版elasticsearch为例进行部署验证。
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: elasticsearch
namespace: elk
labels:
k8s-app: elasticsearch
spec:
serviceName: elasticsearch
replicas: 1
selector:
matchLabels:
k8s-app: elasticsearch
template:
metadata:
labels:
k8s-app: elasticsearch
spec:
containers:
- image: elasticsearch:7.12.1
name: elasticsearch
resources:
limits:
cpu: 2
memory: 2048Mi
requests:
cpu: 1
memory: 256Mi
env:
- name: "discovery.type"
value: "single-node"
- name: ES_JAVA_OPTS
value: "-Xms256m -Xmx2048m"
ports:
- containerPort: 9200
name: db
protocol: TCP
volumeMounts:
- name: elasticsearch-data
mountPath: /usr/share/elasticsearch/data
volumeClaimTemplates:
- metadata:
name: elasticsearch-data
spec:
storageClassName: "csi-disk-topology"
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
name: elasticsearch
namespace: elk
spec:
clusterIP: None
ports:
- port: 9200
protocol: TCP
targetPort: db
selector:
k8s-app: elasticsearch
6.6 部署kibana
6.61 部署kibana web平台用来进行日志查看
apiVersion: apps/v1
kind: Deployment
metadata:
name: kibana
namespace: elk
labels:
k8s-app: kibana
spec:
replicas: 1
selector:
matchLabels:
k8s-app: kibana
template:
metadata:
labels:
k8s-app: kibana
spec:
containers:
- name: kibana
image: kibana:7.12.1
resources:
limits:
cpu: 1
memory: 500Mi
requests:
cpu: '0.5'
memory: 200Mi
env:
- name: ELASTICSEARCH_HOSTS
value: http://elasticsearch:9200
ports:
- containerPort: 5601
name: ui
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: kibana
namespace: elk
spec:
ports:
- port: 5601
protocol: TCP
targetPort: 5601
type: NodePort
selector:
k8s-app: kibana
---
通过web浏览器访问kibana服务
6.62 进入管理界面,查看索引信息
6.63 创建索引模式

索引名字前缀保持一致。

时间字段这里选择时间戳。


6.64 kibana平台日志查看
创建完成后在discover中查看日志信息,可以根据索引进行切换,便于查看不同索引中的日志。


这边部署了一个nginx应用,通过循环访问nginx,标准输出日志会推送kafka,查看kibana上是否有nginx应用的标准输出日志。

kibana查看对应日志。

可以查看到nginx的标准输出日志。
- 点赞
- 收藏
- 关注作者
评论(0)