SpringCloud微服务日志经kafka缓冲写入到ELK
【摘要】 启动kafka启动zookeeper新版本中将不再依赖外部的zookeeper$ bin/zookeeper-server-start.sh config/zookeeper.properties启动kafka$ bin/kafka-server-start.sh config/server.properties创建一个Topic$ bin/kafka-topics.sh --create...
启动kafka
启动zookeeper
新版本中将不再依赖外部的zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka
$ bin/kafka-server-start.sh config/server.properties
创建一个Topic
$ bin/kafka-topics.sh --create --topic logger-channel --bootstrap-server localhost:9092
启动elasticsearch
bin/elasticsearch
启动kibana
bin/kibana
启动logstash
编写配置文件kafka.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
kafka {
id => "my_plugin_id"
bootstrap_servers => "localhost:9092"
topics => ["logger-channel"]
auto_offset_reset => "latest"
}
}
filter {
#json
json {
source => "message"
}
date {
match => ["time", "yyyy-MM-dd HH:mm:ss.SSS"]
remove_field => ["time"]
}
}
output {
#stdout {}
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logs-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
启动服务
./bin/logstash -f ./config/kafka.conf --config.reload.automatic
配置SpringBoot服务日志文件
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。 scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,
默认单位是毫秒当scan为true时,此属性生效。默认的时间间隔为1分钟。 debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。
默认值为false。 -->
<!-- <configuration scan="false" scanPeriod="60 seconds" debug="false"> -->
<configuration>
<!--设置上下文名称,用于区分不同应用程序的记录。一旦设置不能修改, 可以通过%contextName来打印日志上下文名称 -->
<contextName>kafka-log-test</contextName>
<property name="applicationName" value="${spring.application.name:-paw-kelk}"/>
<!-- 定义日志的根目录 -->
<property name="logDir" value="/Users/rubble/logs/${applicationName}" />
<!-- 定义日志文件名称 -->
<property name="logName" value="${applicationName}"/>
<!-- profile -->
<property name="profileActive" value="${spring.profile.active:-dev}" />
<!-- ConsoleAppender 表示控制台输出 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!-- 日志输出格式: %d表示日期时间, %thread表示线程名, %-5level:级别从左显示5个字符宽度, %logger{50}
表示logger名字最长50个字符,否则按照句点分割。 %msg:日志消息, %n是换行符 -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 异常错误日志记录到文件 -->
<appender name="logfile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- <Encoding>UTF-8</Encoding> -->
<File>${logDir}/${logName}.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logDir}/history/${myspringboottest_log}.%d{yyyy-MM-dd}.rar</FileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="net.logstash.logback.layout.LogstashLayout" >
<includeContext>false</includeContext>
<includeCallerData>true</includeCallerData>
<customFields>{"appName":"${applicationName}","env":"${profileActive}"}</customFields>
<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
<!-- <fieldNames class="net.logstash.logback.fieldnames.LogstashFieldNames"/>-->
</layout>
<charset>UTF-8</charset>
</encoder>
<!--kafka topic 需要与配置文件里面的topic一致 否则kafka会沉默并鄙视你-->
<topic>logger-channel</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
<!-- 如果kafka不可用则输出到 appender -->
<appender-ref ref="logfile" />
</appender>
<!--异步写入kafka,尽量不占用主程序的资源-->
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<neverBlock>true</neverBlock>
<includeCallerData>true</includeCallerData>
<discardingThreshold>0</discardingThreshold>
<queueSize>2048</queueSize>
<appender-ref ref="kafkaAppender" />
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE" />
<!-- <appender-ref ref="logfile"/>-->
<!-- kafka 日志 -->
<appender-ref ref="ASYNC" />
</root>
</configuration>
启动springBoot服务,打印任意日志。
kibana查看日志
从菜单找到stack Management>>index patterns>> create index pattern 创建索引
菜单discover >> 选择刚创建的索引 即可查看日志了
至此SpringCloud微服务日志写入到kafka-elk完成。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)