【愚公系列】2022年12月 Elasticsearch数据库-.NET CORE的Serilog=>Rabbitmq=>Log
前言
Logstash是一种分布式日志收集框架,经常与ElasticSearch,Kibana配置,组成著名的ELK技术栈,非常适合用来做日志数据的分析。logstash具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。当然它可以单独出现,作为日志收集软件,你可以收集日志到多种存储系统或临时中转系统,如MySQL,Redis,Kakfa,HDFS, Lucene,Solr等,并不一定是ElasticSearch。
1.logstash的简介
- logstash的概念:是一款开源的数据收集引擎,具有实时管道处理能力。
- logstash具有200多个插件,可以接受各种各样的数据(如日志、网络请求、关系型数据库、传感器或物联网等等)
- Logstash工作过程:
- Logstash 就像管道符一样,读取输入数据,然后处理过滤数据,最后输出到目的地(数据在线程之间以事件的形式流传)
- logstash数据处理过程包括三个部分:input、filter、output
- input和output部分可以使用codecs对数据格式进行处理
- Input,Filter,Output和codec:都是以插件形式存在
- 用户可以通过设置pipeline配置文件,设置符合需求的input、filter、outer、codecs插件实现对指定数据的采集、处理和输出功能
- logstash的三大部分的介绍:
- input:从数据源获取数据。不同的数据源使用不同的插件。常用的插件有:file、jdbc、Redis、syslog、beats、http等
- filter:用于处理数据。对从数据源获取到的数据按照需求进行处理(如:解析数据、删除字段、类型转换等)。常用的组件有:date、grok、dessect、mutate、json、geoip、ruby等
- output:用于将数据输出到目的地。不同的目的地使用不同的插件。常用的插件有:elaticsearch、file、graphite、statsd等
- codec:用于对数据进行编码解码。不是一个单独的流程,是用于input和output部分对数据进行编解码作用。常见的组件有:json、multiline等
- logstash不是一个input-filter-output的数据流,而是一个 input | decode | filter |
encode | output 的数据流。
- logstash将数据转换为事件时候,会给事件添加一些额外的信息。下面介绍几个常见的额为信息:
- @timestamp:用来标记事件的发生时间
- host:标记事件发生地址
- type:标记事件的唯一类型(input和output部分都可以配置多个不同的插件,每个插件可以用type来唯一标记这个插件,可以实现对不同的插件进行不同的处理)
- tags:标记事件的某方面属性。这是一个数组,一个事件可以有多个标签
- Logstash中的数据类型:
- bool:use_column_value => true
- string:jdbc_driver_class => “com.mysql.jdbc.Driver”
- number:jdbc_page_size => 50000
- array:hosts => [“192.168.57.100:9200”,“192.168.57.101:9200”,“192.168.57.102:9200”]
- hash:options =>{key1 =>value1,key2 =>value2}
- logastah中的逻辑运算符:
- 相等关系:==、!=、<、>、<=、>=
- 正则关系:=
~
(匹配正则)、!~
(不匹配正则) - 包含关系:in、not in
- 布尔操作:and(与)、or(或)、nand(非与)、xor(非或)
- 一元运算符:!(取反)、()(复合表达式)、!() (对复合表达式结果取反)
Logstash的Gitee地址:https://gitee.com/chatopera/logstash
Logstash的下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.15.2-windows-x86_64.zip
一、.NET CORE的Serilog=>Rabbitmq=>Logstash=>Elasticsearch的日志传输
1.安装包
nuget安装:
Serilog.Sinks.RabbitMQ
Serilog
Serilog.AspNetCore
Serilog.Extensions.Logging
Serilog.Sinks.Async
Serilog.Sinks.Console
Serilog.Sinks.File
2.进行Serilog的配置
1、客户端配置
#region Serilog日志
builder.Host.UseSerilog((context, logger) =>//注册Serilog
{
//第一种方式:配置形式进行
logger.ReadFrom.Configuration(context.Configuration);
logger.Enrich.FromLogContext();
//输出到RabbitMQ
logger.WriteTo.RabbitMQ((clientConfiguration, sinkConfiguration) =>
{
clientConfiguration.Hostnames.Add("127.0.0.1");
clientConfiguration.Username = "guest";
clientConfiguration.Password = "guest";
clientConfiguration.Exchange = "rqlogstashExchange";
clientConfiguration.ExchangeType = RabbitMQ.Client.ExchangeType.Direct;
clientConfiguration.DeliveryMode = RabbitMQDeliveryMode.Durable;
clientConfiguration.RouteKey = "rqlogstash";
clientConfiguration.Port = 5672;
sinkConfiguration.TextFormatter = new JsonFormatter();
});
});
#endregion
"Serilog": {
"MinimumLevel": {
"Default": "Debug", //最小日志记录级别
"Override": { //系统日志最小记录级别
"Default": "Warning",
"System": "Warning",
"Microsoft": "Warning"
}
},
"WriteTo": [
{ "Name": "Console" }, //输出到控制台
{
"Name": "Async", //Serilog.Sinks.Async
"Args": {
"configure": [
{
"Name": "File", //输出文件
"Args": {
"path": "log/log.txt",
"outputTemplate": "{NewLine}Date:{Timestamp:yyyy-MM-dd HH:mm:ss.fff}{NewLine}LogLevel:{Level}{NewLine}Message:{Message}{NewLine}Exception:{Exception}{NewLine}---------------------------------------------------------------------------------------------------------",
"rollingInterval": "3" //按天记录
}
}
]
}
}
]
//serilog end
}
程序启动时候,进行主动创建一个Exchange为rqlogstashExchange的,RouteKey是rqlogstash的消息队列,包括生产者和消费者。
2、RabbitMQ客户端端配置
安装RabbitMQ.Client
包
using RabbitMQ.Client;
Console.WriteLine("Hello, World! 生产者");
var factory = new ConnectionFactory() // 创建连接工厂对象
{
HostName = "127.0.0.1",
Port = 5672,
UserName = "guest",
Password = "guest"
};
var connection = factory.CreateConnection(); // 创建连接对象
var channel = connection.CreateModel(); // 创建连接会话对象
string routeKey = "rqlogstash";
string exchangeName = "rqlogstashExchange";
channel.ExchangeDeclare(exchange: exchangeName, type: RabbitMQ.Client.ExchangeType.Direct);
string str;
do
{
Console.WriteLine("发送内容:");
str = Console.ReadLine()!;
byte[] body = System.Text.Encoding.UTF8.GetBytes(str); // 消息内容
channel.BasicPublish(exchangeName, routeKey, null, body); // 发送消息
} while (str.Trim().ToLower() != "exit");
channel.Close();
connection.Close();
3.LogLogstash配置
复制/config/logstash-sample.conf
配置文件为/bin/rabbitmq.conf 进行如下配置
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
rabbitmq {
host => "127.0.0.1"
# 虚拟机Host
#vhost => "Ocsa_Cap"
port => 5672
user => "guest"
password => "guest"
queue => "logstash"
key => "rqlogstash"
exchange => "rqlogstashExchange"
#持久化跟队列配置一致
durable => true
#格式
codec => "plain"
}
}
filter {
grok {
match => {"Timestamp" => "%{TIMESTAMP_ISO8601:ctime}"}
add_field => ["create_time","%{@timestamp}"]
}
date {
match => ["ctime","yyyy-MM-dd HH:mm:ss.SSS","ISO8601"]
target => "@timestamp"
}
mutate {
remove_field => ["@version","Properties","Timestamp","ctime"]
rename => {"MessageTemplate" => "message"}
rename => {"Level" => "level"}
}
ruby {
code => "event.set('create_time',event.get('@timestamp').time.localtime)"
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:19200"]
index => "rabbit-%{+YYYYMMdd}"
}
}
启动logstash
logstash -f rabbitmq.conf
4.测试
1、客户端日志测试
Rabbitmq发现队列
Elasticsearch发现数据
2、服务端日志测试
- 点赞
- 收藏
- 关注作者
评论(0)