【日志可观测】经典组合Fluent-bit×Kafka×Logstash×Elasticsearch×Kibana

举报
可以交个朋友 发表于 2025/12/11 11:11:47 2025/12/11
【摘要】 一、背景随着企业数字化转型进程加快,日志数据在运维监控和数据分析中的重要性日益凸显,部分用户在使用K8S过程中,存在日志采集不全面、数据处理效率低、无法关联上下文分析能力弱,本方案基于 Fluent-bit+Kafka+Logstash+Elasticsearch+Kibana,解决云原生环境下日志采集不全、高并发丢包、格式混乱问题,实现容器日志统一采集、流量削峰、标准化处理,支撑故障快速...

一、背景

随着企业数字化转型进程加快,日志数据在运维监控和数据分析中的重要性日益凸显,部分用户在使用K8S过程中,存在日志采集不全面、数据处理效率低、无法关联上下文分析能力弱,本方案基于 Fluent-bit+Kafka+Logstash+Elasticsearch+Kibana,解决云原生环境下日志采集不全、高并发丢包、格式混乱问题,实现容器日志统一采集、流量削峰、标准化处理,支撑故障快速定位、业务监控告警,让日志高效服务运维与业务决策。

二、方案简介

本方案采用先进的ELK(Elasticsearch, Logstash, Kibana)技术栈结合fluent-bit进行日志管理,旨在实现高效率的数据流处理和可视化展示。整体架构分为四个主要模块:日志采集插件 (fluent-bit)消息队列缓冲层 (kafka)解析转换过滤层 (logstash)存储索引展示层 (elasticsearch & kibana),通过在k8s集群(cce集群)中部署fluent-bit容器实例来采集日志,并发送到外部kafka,再经过logstash对日志数据进行解析、转换、过滤,经过处理后的日志数据由elasticsearch进行存储,通过web服务kibana界面化的展示日志数据。最终形成了一套完整的云原生日志采集与分析系统
image.png

三、技术原理分析

Fluent-bit
工作原理:日志通过数据管道从数据源发送到目的地,一个数据管道通常由 Input、Parser、Filter、Buffer、Routing 、Output 组成  。

  • Input:用于从数据源抽取数据,一个数据管道中可以包含多个 Input。
  • Parser:负责将 Input 抽取的非结构化数据转化为标准的结构化数据,每个 Input 均可以定义自己的 Parser(可选)。
  • Filter:负责对格式化数据进行过滤和修改。一个数据管道中可以包含多个 Filter,Filter 会顺序执行,其执行顺序与配置文件中的顺序一致。
  • Buffer:用户缓存经过 Filter 处理的数据,默认情况下buffer把input插件的数据缓存到内存中,直到路由传递到output为止。
  • Routing:将 Buffer 中缓存的数据路由到不同的 Output。
  • Output:负责将数据发送到不同的目的地,一个数据管道中可以包含多个 Output。

Kafka
工作原理:Kafka 是一个分布式的消息队列系统,采用发布-订阅模型。生产者将消息发送到 Kafka 的主题(Topic)中,消费者从主题中订阅消息 博客园 。Kafka 将消息存储在磁盘上,并通过分区(Partition)和副本(Replica)机制来保证高可用性和可扩展性
Logstash
工作原理:Logstash 事件处理管道拥有三个阶段:输入 → 过滤 → 输出 。

  • 输入(Inputs):使用 Input 插件将数据输入至 Logstash 中。一些常用的 Input 插件有 file、syslog、redis、beats 等  。
  • 过滤(Filters):过滤器插件是在 Logstash 管道中的中间处理设备。如果事件符合特定的条件,则可以将过滤器与条件结合使用 以对事件执行操作。一些常用的 filters 包括 grok、mutate、drop、clone、geoip 等  。
  • 输出(Outputs):输出是 Logstash 管道的最后阶段。一个事件可以通过多个输出进行传递,但一旦所有输出处理都完成了该事件就完成了全部流程的执行。一些常用的 outputs 包括 elasticsearch、file、graphite、statsd 等  。

Elasticsearch
工作原理:Elasticsearch 是一个基于 Lucene 的搜索引擎,采用倒排索引的方式存储和检索数据。它支持全文搜索、结构化搜索、分析等功能。数据被存储在索引(Index)中,每个索引可以包含多个文档

Kibana
工作原理:Kibana 是一个开源的分析和可视化平台,用于与 Elasticsearch 配合使用 。它允许用户通过可视化的方探索和分析数据,创建仪表板、图表、地图等 。Kibana 通过 Elasticsearch 的 API 来获取数据,并将其展示在用户界面上 。

四、具体使用场景

4.1 企业级日志管理场景

  • 大规模分布式系统日志收集与分析:在大型企业中,系统通常由多个分布式服务组成,产生海量的日志数据。通过 Fluent-bit 在各服务节点上轻量级地采集日志,借助 Kafka 的高吞吐量和可扩展性进行缓冲和传输,Logstash 对日志进行进一步的解析和处理,最终存储到 Elasticsearch 中,再利用 Kibana 进行可视化展示和分析,可帮助企业快速定位问题、监控系统运行状态。
  • 多业务系统日志整合:企业内部可能有多种不同的业务系统,如 ERP、CRM、OA 等,这些系统产生的日志格式各异。该方案可以将不同业务系统的日志统一收集、处理和存储,方便企业进行整体的日志管理和审计。

4.2 容器化环境日志管理场景

  • Kubernetes 集群日志采集:在 Kubernetes 环境中,容器的生命周期较短且数量众多,日志管理较为复杂。Fluent-bit 以 DaemonSet 的形式部署在 Kubernetes 集群中,能够高效地采集容器的日志,并将其发送到 Kafka,后续流程与上述一致。这种方案不仅解决了容器日志的收集问题,还通过 Kafka 缓存机制保障了日志数据的完整性,避免因容器重启或删除导致日志丢失。
  • 容器化微服务架构日志监控:对于采用容器化微服务架构的应用,该方案可以实时收集各个微服务产生的日志,通过 Logstash 对日志进行分析和处理,从而实现对微服务架构的全面监控,及时发现服务之间的调用问题、性能瓶颈等。

5.Fluent Bit核心配置详解

Fluent Bit通过配置文件来设置各个组件的行为,配置文件定义了Fluent bit将如何采集日志,如何处理日志,如何将日志对接到其他平台等行为。
因此在运行服务之前,需要深入了解一下如何编写正确的配置文件。
Fluent Bit的配置文件主要由四部分组成: 【SERVICE】、【INPUT】、【FILTER】、【OUTPUT】

SERVICE 部分:定义了全局服务级别的配置

[SERVICE]
      Flush           1
      Log_Level       info
      Daemon          off
      Parsers_File    parsers.conf
      HTTP_Server     On
      HTTP_Listen     0.0.0.0
      HTTP_Port       2020
      Health_Check    On
      scheduler.cap   180
      scheduler.base  2
  • Flush: 设置数据刷新到输出插件(output)的时间间隔,默认5s,每到刷新时间,fluent bit都会把数据刷新到输出插件
  • Log_Level: 日志级别有 error、warning、info、debug和trace
  • Daemon: 用于设置fluent bit为守护进程,当设置为on时 fluent bit会在启动后转入后台运行,不会阻塞当前的终端或命令行界面,通常用于在系统启动时自动启动Fluent Bit,而不需要手动干预。在Kubernetes环境中,通常不需要将Fluent Bit设置为守护进程,因为Kubernetes会负责管理Pod的生命周期,包括自动重启失败的容器。在这种情况下,Fluent Bit容器通常会以always重启策略运行,确保在任何情况下都能自动重启。
  • Parsers_File: 指定解析器配置文件
  • HTTP_Server:用于启用Fluent Bit的内置HTTP服务器,启用后可以通过HTTP接口来获取Fluent Bit的状态信息、配置信息、统计数据等也可以用于调试和实时监控Fluent Bit的行为
  • HTTP_Listen: 设置HTTP服务器监听的地址,默认为0.0.0.0(即监听所有接口)
  • HTTP_Port: 设置HTTP服务器的端口,默认为2020
  • Health_Check: 开启健康检查功能,默认为off
  • scheduler.cap: 用于设置在进行数据刷新时,如果输出插件(output)返回重试状态,调度器scheduler将等待的最大重试时间。这个配置项是与scheduler.base一起使用的,共同决定了重试前的等待时间,默认2000s。
  • scheduler.base : 设置输出插件重试等待时间的下限,默认是5s。fluent bit使用指数退避和抖动算法来确定等待时间。等待时间是一个可配置的上下限的随机数,对于第n次重试,下限是scheduler.base的值,上限是min(base*(2^n),cap)。这里的base是指数退避的基数,而cap是上限值。通过合理配置这些值,可以优化Fluent Bit在面对网络波动或输出插件暂时不可用时的重试行为,从而提高数据传输的可靠性和效率。

INPUT部分: 负责从不同的数据源收集数据,定义了日志采集数据输入的配置

[INPUT]
     Alias                      in_tail_default-stdout
     Name                       tail
     Buffer_Chunk_Size          128k
     Buffer_Max_Size            512k
     DB                         /db/default-stdout.db
     Mem_Buf_Limit              40MB
     Path                       /var/log/containers-symlinks/default-stdout/*/*/*/*/container-stdout*
     Refresh_Interval           5
     Skip_Empty_Lines           On
     Skip_Long_Lines            On
     Tag                        kube.*
     multiline.parser           docker, cri
     threaded                   On
  • Alias: 为输入插件指定一个别名。这个别名可以用来区分同一个INPUT输入插件的不同实例或者在配置文件中为特定的输入插件提供一个更易于识别的名称
  • Name:FluentBit中,每个输入插件(input)都需要一个名称来标识它,这个名称通过Name配置项来设置。Name配置项告诉fluentbit需要加载并使用哪个插件。不同的输入插件有不同的配置选项。常用的输入插件有tail,tail输入插件用于监控一个或多个文本文件,类似于Unix中的tail -f命令,支持多行日志的合并,可以处理由于行分割导致的跨日志条目的情况
  • Buffer_Chunk_Size:用于设置读取文件数据的初始缓冲区大小,这个值用于增加缓冲区大小,以适应更长的日志行。默认情况下Buffer_Chunk_Size的值为32k
  • Buffer_Max_Size: 用于设置每个被监控文件的缓冲区大小限制,限制内存缓冲区可以增长的最大额度。如果读取的文件内容超过此限制,则将该文件从监控列表中移除。这些缓冲区配置对于处理大量日志数据和防止内存过度使用至关重要。
  • DB: DB配置项与tail输入插件一起使用,用于跟踪文件的状态和偏移量,当使用tail插件监控日志文件时,DB文件会记录每个文件的读取位置,这样即使Fluent Bit服务重启,它也能从上次停止的地方继续读取,而不是从头开始。DB文件是一个SQLite数据库,它存储了每个被监控文件的最后读取位置。这个数据库文件通常放在一个持久化存储路径下,以确保服务重启后仍然可用。
  • Mem_Buf_Limit: 用于限制tail输入插件在将数据添加到处理引擎时可以使用的内存量。如果达到了这个极限,tail插件将暂停接受更多的数据,直到已经刷新的数据被输出插件处理,释放了一部分内存后,它才会恢复接受数据。这个参数对于避免高负载情况下过度使用内存非常重要,特别是在日志数据生成速度超过输出速度时。通过设置Mem_Buf_Limit ,可以防止Fluent Bit占用过多内存,从而可能导致系统性能问题或服务崩溃。Mem_Buf_Limit仅适用于Tail输入插件,并且与Buffer_Chunk_Size和Buffer_Max_Size参数一起使用,进一步控制内存使用和文件处理。

Filter 部分:负责对格式化数据进行过滤和更改

[Filter]
    Name        kubernetes
    match       kube.*
    match_Regex   *
  • name: 过滤器名称(必填,fluent-bit 通过name知道应该加载哪个插件)
  • match: 传入记录标签匹配,* 作为统配符
  • match_Regex:通过正则表达式语法进行配(和Match同时定义,Match_Regex优先)

Output 部分:负责将数据发送到不同的目的地,一个数据管道中可以包含多个 Output。

[OUTPUT]
      name          es
      match         kube.*
      Host          192.168.30.40
      Port          9200
      Logstash_Format   true
      Logstash_Prefix   cb-logstash-log
      time_Key        @time-satmp
  • name: 过滤器名称(必填,fluent-bit 通过name知道应该加载哪个插件)
  • match: 传入记录标签匹配,* 作为统配符
  • Host:es实例的地址
  • Port:es端口
  • Logstash_Format:启用Logtash格式兼容性
  • Logstash_Prefix:用 Logstash_Format 时,索引名称由前缀和后缀组成,例如:如果 Logstash_Prefix 等于 ‘mydata’ 和 Logstash_Suffix 等于 ‘YYYY.MM.DD’ 的索引将变为 ‘mydata-YYYY.MM.DD’
  • time_Key:启用 Logstash_Format 后,每条记录都会获得一个新的时间戳字段

六、操作步骤

6.1 创建cce集群

6.2 通过helm方式在cce集群中部署Fluent-Bit

helm repo add fluent https://fluent.github.io/helm-charts
helm upgrade --install fluent-bit fluent/fluent-bit

部署完成后会生成对应的DaemonSet和ConfigMap
image.png
image.png

6.3 Fluent Bit对接自建kafka

6.31 安装kafka以及创建topic

6.311部署zookeeper
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: zookeeper:3.8
          ports:
            - containerPort: 2181

---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service
spec:
  selector:
    app: zookeeper
  ports:
    - protocol: TCP
      port: 2181
      targetPort: 2181
  type: ClusterIP
6.312 部署kafka
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "0"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://0.0.0.0:9092
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-service:9092
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "true"
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        image: confluentinc/cp-kafka:7.5.0
        name: kafka
        ports:
        - containerPort: 9092

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: default
spec:
  type: ClusterIP
  ports:
    - port: 9092
      targetPort: 9092
  selector:
    app: kafka

image.png

创建一个topic(test-log)用于本次对接测试。

kubectl exec -it kafka-xxx -- /bin/bash
kafka-topics --create --topic test-log --bootstrap-server kafka-service:9092 --partitions 1 --replication-factor 1

image.png

6.32 修改配置文件fluent-bit,添加kafka相关配置

kafka相关配置如下,其中brokers为kafka服务地址,topics为上面创建的topic,其中Match匹配参考上面的OUTPUT进行配置(即匹配数据来源)。

[OUTPUT]
    Match kube*
    Name kafka
    Retry_Limit 10
    Workers 8
    brokers kafka-service.default.svc.cluster.local:9092
    topics test-log

修改配置后,删除fluent-bit下的实例重新生成,使其配置生效。brokers为kafka service的集群内部访问地址

6.4 部署logstash

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: elk
  labels:
    app: logstash-cm
data:
  logstash.conf: |-
        input {
                kafka {
                        bootstrap_servers => ["kafka-service.default.svc.cluster.local:9092"]
                        client_id => "k8s_logstash"
                        group_id => "k8s"
                        auto_offset_reset => "latest"
                        consumer_threads => 3
                        decorate_events => false
                        topics => ["test-log"]
                        codec => "json"
                }
        }
        input {
                beats {
                        host => "0.0.0.0"
                        port => 5044
                        codec => json
                }
        }
        output {
                elasticsearch {
                          hosts => ["elasticsearch:9200"]
                          index => "cce-log-%{+YYYY.MM.dd}"
                        }

        }
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-yml
  namespace: elk
  labels:
    type: logstash
data:
  logstash.yml: |-
    http.host: "0.0.0.0"
    xpack.monitoring.elasticsearch.hosts: http://elasticsearch:9200
    xpack.monitoring.enabled: true
---
apiVersion: v1
kind: Service
metadata:
  name: logstash-svc
  namespace: elk
  labels:
    app: logstash
spec:
  type: NodePort
  selector:
    app: logstash-7.12.1
  ports:
  - name: logstashport
    port: 5044
    targetPort: logport
    protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-deployment
  namespace: elk
spec:
  replicas: 1
  selector:
    matchLabels:
        app: logstash-7.12.1
  template:
    metadata:
      labels:
          app: logstash-7.12.1
    spec:
      containers:
      - name: logstash
        image: logstash:7.12.1
        imagePullPolicy: IfNotPresent
        env:
        - name: ES_JAVA_OPTS
          value: "-Xms256m -Xmx1024m"
        - name: TZ
          value: "Asia/Shanghai"
        ports:
        - name: logport
          containerPort: 5044
          protocol: TCP
        command: ["logstash","-f","/usr/share/logstash/config/logstash.conf"]
        volumeMounts:
        - name: logstash-config
          mountPath: /usr/share/logstash/config/logstash.conf
          subPath: logstash.conf
        - name: timezone
          mountPath: /etc/localtime
        - name: logstash-yml
          mountPath: /usr/share/logstash/config/logstash.yml
          subPath: logstash.yml
      volumes:
      - name: logstash-config
        configMap:
          name: logstash-configmap
      - name: timezone
        hostPath:
          path: /etc/localtime
      - name: logstash-yml
        configMap:
          name: logstash-yml

logstash中配置了输入(即设置kafka相关配置,如kafka服务地址和topics名称),输出(推送到elasticsearch服务)。

6.5 部署elasticsearch

这里以单机版elasticsearch为例进行部署验证。

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch
  namespace: elk
  labels:
    k8s-app: elasticsearch
spec:
  serviceName: elasticsearch
  replicas: 1
  selector:
    matchLabels:
      k8s-app: elasticsearch
  template:
    metadata:
      labels:
        k8s-app: elasticsearch
    spec:
      containers:
      - image: elasticsearch:7.12.1
        name: elasticsearch
        resources:
          limits:
            cpu: 2
            memory: 2048Mi
          requests:
            cpu: 1
            memory: 256Mi
        env:
          - name: "discovery.type"
            value: "single-node"
          - name: ES_JAVA_OPTS
            value: "-Xms256m -Xmx2048m" 
        ports:
        - containerPort: 9200
          name: db
          protocol: TCP
        volumeMounts:
        - name: elasticsearch-data
          mountPath: /usr/share/elasticsearch/data
  volumeClaimTemplates:
  - metadata:
      name: elasticsearch-data
    spec:
      storageClassName: "csi-disk-topology"
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi

---

apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  namespace: elk
spec:
  clusterIP: None
  ports:
  - port: 9200
    protocol: TCP
    targetPort: db
  selector:
    k8s-app: elasticsearch

6.6 部署kibana

6.61 部署kibana web平台用来进行日志查看

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  namespace: elk
  labels:
    k8s-app: kibana
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: kibana
  template:
    metadata:
      labels:
        k8s-app: kibana
    spec:
      containers:
      - name: kibana
        image: kibana:7.12.1
        resources:
          limits:
            cpu: 1
            memory: 500Mi
          requests:
            cpu: '0.5'
            memory: 200Mi
        env:
          - name: ELASTICSEARCH_HOSTS
            value: http://elasticsearch:9200
        ports:
        - containerPort: 5601
          name: ui
          protocol: TCP

---
apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: elk
spec:
  ports:
  - port: 5601
    protocol: TCP
    targetPort: 5601
  type: NodePort
  selector:
    k8s-app: kibana

---

通过web浏览器访问kibana服务

6.62 进入管理界面,查看索引信息

6.63 创建索引模式

image.png

索引名字前缀保持一致。
image.png

时间字段这里选择时间戳。
image.png

image.png

6.64 kibana平台日志查看

创建完成后在discover中查看日志信息,可以根据索引进行切换,便于查看不同索引中的日志。
image.png
image.png

这边部署了一个nginx应用,通过循环访问nginx,标准输出日志会推送kafka,查看kibana上是否有nginx应用的标准输出日志。
image.png

kibana查看对应日志。
image.png

可以查看到nginx的标准输出日志。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。