【愚公系列】2022年12月 Elasticsearch数据库-.NET CORE的Serilog=>Rabbitmq=>Log

举报
愚公搬代码 发表于 2022/12/30 23:14:51 2022/12/30
【摘要】 前言Logstash是一种分布式日志收集框架,经常与ElasticSearch,Kibana配置,组成著名的ELK技术栈,非常适合用来做日志数据的分析。logstash具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。当然它可以单独出现,作为日志...

前言

Logstash是一种分布式日志收集框架,经常与ElasticSearch,Kibana配置,组成著名的ELK技术栈,非常适合用来做日志数据的分析。logstash具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。当然它可以单独出现,作为日志收集软件,你可以收集日志到多种存储系统或临时中转系统,如MySQL,Redis,Kakfa,HDFS, Lucene,Solr等,并不一定是ElasticSearch。

1.logstash的简介

  • logstash的概念:是一款开源的数据收集引擎,具有实时管道处理能力。
  • logstash具有200多个插件,可以接受各种各样的数据(如日志、网络请求、关系型数据库、传感器或物联网等等)
    • Logstash工作过程:
    • Logstash 就像管道符一样,读取输入数据,然后处理过滤数据,最后输出到目的地(数据在线程之间以事件的形式流传)
    • logstash数据处理过程包括三个部分:input、filter、output
    • input和output部分可以使用codecs对数据格式进行处理
    • Input,Filter,Output和codec:都是以插件形式存在
    • 用户可以通过设置pipeline配置文件,设置符合需求的input、filter、outer、codecs插件实现对指定数据的采集、处理和输出功能
  • logstash的三大部分的介绍:
    • input:从数据源获取数据。不同的数据源使用不同的插件。常用的插件有:file、jdbc、Redis、syslog、beats、http等
    • filter:用于处理数据。对从数据源获取到的数据按照需求进行处理(如:解析数据、删除字段、类型转换等)。常用的组件有:date、grok、dessect、mutate、json、geoip、ruby等
    • output:用于将数据输出到目的地。不同的目的地使用不同的插件。常用的插件有:elaticsearch、file、graphite、statsd等
    • codec:用于对数据进行编码解码。不是一个单独的流程,是用于input和output部分对数据进行编解码作用。常见的组件有:json、multiline等
    • logstash不是一个input-filter-output的数据流,而是一个 input | decode | filter |
      encode | output 的数据流。
  • logstash将数据转换为事件时候,会给事件添加一些额外的信息。下面介绍几个常见的额为信息:
    • @timestamp:用来标记事件的发生时间
    • host:标记事件发生地址
    • type:标记事件的唯一类型(input和output部分都可以配置多个不同的插件,每个插件可以用type来唯一标记这个插件,可以实现对不同的插件进行不同的处理)
    • tags:标记事件的某方面属性。这是一个数组,一个事件可以有多个标签
  • Logstash中的数据类型:
    • bool:use_column_value => true
    • string:jdbc_driver_class => “com.mysql.jdbc.Driver”
    • number:jdbc_page_size => 50000
    • array:hosts => [“192.168.57.100:9200”,“192.168.57.101:9200”,“192.168.57.102:9200”]
    • hash:options =>{key1 =>value1,key2 =>value2}
  • logastah中的逻辑运算符:
    • 相等关系:==、!=、<、>、<=、>=
    • 正则关系:=~(匹配正则)、!~(不匹配正则)
    • 包含关系:in、not in
    • 布尔操作:and(与)、or(或)、nand(非与)、xor(非或)
    • 一元运算符:!(取反)、()(复合表达式)、!() (对复合表达式结果取反)

Logstash的Gitee地址:https://gitee.com/chatopera/logstash
在这里插入图片描述

Logstash的下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.15.2-windows-x86_64.zip

一、.NET CORE的Serilog=>Rabbitmq=>Logstash=>Elasticsearch的日志传输

1.安装包

nuget安装:

Serilog.Sinks.RabbitMQ
Serilog
Serilog.AspNetCore
Serilog.Extensions.Logging
Serilog.Sinks.Async
Serilog.Sinks.Console
Serilog.Sinks.File

在这里插入图片描述

2.进行Serilog的配置

1、客户端配置

#region Serilog日志
builder.Host.UseSerilog((context, logger) =>//注册Serilog
{
    //第一种方式:配置形式进行
    logger.ReadFrom.Configuration(context.Configuration);
    logger.Enrich.FromLogContext();
    //输出到RabbitMQ
    logger.WriteTo.RabbitMQ((clientConfiguration, sinkConfiguration) =>
    {
        clientConfiguration.Hostnames.Add("127.0.0.1");
        clientConfiguration.Username = "guest";
        clientConfiguration.Password = "guest";
        clientConfiguration.Exchange = "rqlogstashExchange";
        clientConfiguration.ExchangeType = RabbitMQ.Client.ExchangeType.Direct;
        clientConfiguration.DeliveryMode = RabbitMQDeliveryMode.Durable;
        clientConfiguration.RouteKey = "rqlogstash";
        clientConfiguration.Port = 5672;
        sinkConfiguration.TextFormatter = new JsonFormatter();
    });
});
#endregion

在这里插入图片描述

"Serilog": {
    "MinimumLevel": {
      "Default": "Debug", //最小日志记录级别
      "Override": { //系统日志最小记录级别
        "Default": "Warning",
        "System": "Warning",
        "Microsoft": "Warning"
      }
    },
    "WriteTo": [
      { "Name": "Console" }, //输出到控制台
      {
        "Name": "Async", //Serilog.Sinks.Async
        "Args": {
          "configure": [
            {
              "Name": "File", //输出文件
              "Args": {
                "path": "log/log.txt",
                "outputTemplate": "{NewLine}Date:{Timestamp:yyyy-MM-dd HH:mm:ss.fff}{NewLine}LogLevel:{Level}{NewLine}Message:{Message}{NewLine}Exception:{Exception}{NewLine}---------------------------------------------------------------------------------------------------------",
                "rollingInterval": "3" //按天记录
              }
            }
          ]
        }
      }
    ]
    //serilog  end
  }

在这里插入图片描述

程序启动时候,进行主动创建一个Exchange为rqlogstashExchange的,RouteKey是rqlogstash的消息队列,包括生产者和消费者。

2、RabbitMQ客户端端配置

安装RabbitMQ.Client

using RabbitMQ.Client;

Console.WriteLine("Hello, World! 生产者");

var factory = new ConnectionFactory()       // 创建连接工厂对象
{
    HostName = "127.0.0.1",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 创建连接对象
var channel = connection.CreateModel();         // 创建连接会话对象


string routeKey = "rqlogstash";

string exchangeName = "rqlogstashExchange";

channel.ExchangeDeclare(exchange: exchangeName, type: RabbitMQ.Client.ExchangeType.Direct);

string str;
do
{
    Console.WriteLine("发送内容:");
    str = Console.ReadLine()!;

    byte[] body = System.Text.Encoding.UTF8.GetBytes(str); // 消息内容

    channel.BasicPublish(exchangeName, routeKey, null, body); // 发送消息
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

在这里插入图片描述

3.LogLogstash配置

复制/config/logstash-sample.conf配置文件为/bin/rabbitmq.conf 进行如下配置

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  rabbitmq {
    host => "127.0.0.1"
    # 虚拟机Host
    #vhost => "Ocsa_Cap" 
    port => 5672
    user => "guest"
    password => "guest"
    queue => "logstash"
    key => "rqlogstash"
    exchange => "rqlogstashExchange"
	  #持久化跟队列配置一致
    durable => true 
    #格式
    codec => "plain"       
  }
}


filter {
  grok {
    match => {"Timestamp" => "%{TIMESTAMP_ISO8601:ctime}"}
    add_field => ["create_time","%{@timestamp}"]
  }
  date {
    match => ["ctime","yyyy-MM-dd HH:mm:ss.SSS","ISO8601"]
    target => "@timestamp"
  }
  mutate {
    remove_field => ["@version","Properties","Timestamp","ctime"]
    rename => {"MessageTemplate" => "message"}
    rename => {"Level" => "level"}
  }
  ruby {
    code => "event.set('create_time',event.get('@timestamp').time.localtime)"
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:19200"]
    index => "rabbit-%{+YYYYMMdd}"
  }
}

在这里插入图片描述

启动logstash

logstash -f rabbitmq.conf

在这里插入图片描述

4.测试

1、客户端日志测试
在这里插入图片描述
Rabbitmq发现队列

在这里插入图片描述
Elasticsearch发现数据
在这里插入图片描述
在这里插入图片描述

2、服务端日志测试
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。