A wonderful encounter between Flink and Elasticsearch: Nginx log analysis
In an era where everything is connected, data is generated around the clock. Data is wealth, time is wealth, and massive data is most valuable when it is fresh. The times make the hero: open-source frameworks for real-time stream computing have sprung up, and Flink stands out among them. Flink not only delivers real-time computation, but also manages state throughout the computation and handles out-of-order, time-ordered data gracefully. Pair a first-class stream computing framework with Elasticsearch, a highly scalable, near-real-time distributed search engine, and Nginx log analysis becomes a walk in the park.
First, let's look at what an Nginx log actually contains. Nginx produces two kinds of logs, access logs and error logs; here we only care about access logs. Below is a common set of fields in an Nginx access log:
| Parameter | Description |
| --- | --- |
| http_host | Requested host (the Host header) |
| forward_ip | Forwarded client IP |
| status | Response status code |
| request_length | Request length (request line, headers, and body) |
| bytes_sent | Number of bytes sent to the client |
| request | Full request line (method, URI, and HTTP protocol) |
| http_referer | Referring URL |
| http_user_agent | Browser (User-Agent) information |
| upstream_cache_status | Upstream cache status |
| upstream_status | Upstream response status |
| request_time | Request processing time |
| upstream_response_time | Upstream response time |
| upstream_addr | Upstream server address |
More variables are documented on the official site: http://nginx.org/en/docs/http/ngx_http_core_module.html#variables

Sample:
xxx.xxx.com||45.249.212.44||15421010072.675||200||651||228||POST /x/report/heartbeat HTTP/1.1||-||Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||200||0.033||0.033||140.206.227.10:80
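To make the field layout concrete, here is a minimal Python sketch (not part of the DLI pipeline itself) that splits the sample line above on the `||` delimiter; the field names follow the source-stream schema defined later in this article:

```python
# Sketch only: split one Nginx access-log line on the "||" delimiter
# and label the fourteen fields with the source-stream column names.
SAMPLE = ("xxx.xxx.com||45.249.212.44||15421010072.675||200||651||228||"
          "POST /x/report/heartbeat HTTP/1.1||-||"
          "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||"
          "200||0.033||0.033||140.206.227.10:80")

FIELDS = ["http_host", "forward_ip", "msec", "status", "request_length",
          "bytes_sent", "request", "http_referer", "http_user_agent",
          "upstream_cache_status", "upstream_status", "request_time",
          "upstream_response_time", "upstream_addr"]

record = dict(zip(FIELDS, SAMPLE.split("||")))
print(record["request"])  # the request line: method, URI, and protocol
```

The delimiter-separated layout is what the `encode = "csv"` / `field_delimiter` settings in the source-stream definition below describe.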
Imagine countless lines like this being produced every second, rich with information waiting to be mined. How should we process them? The paragraphs above already gave the answer. But do we really have to stand up a Flink cluster and configure Elasticsearch ourselves? No: Huawei Cloud Data Lake Insight (DLI) extends a friendly paw. Let's see how to build this scenario on DLI and stop walking away empty-handed from a mountain of log data. DLI supports standard Flink SQL and is compatible with the mainstream data formats, so plain SQL is all we need here.
```sql
CREATE SOURCE STREAM log_infos (
  http_host STRING,
  forward_ip STRING,
  msec DOUBLE,
  status INT,
  request_length INT,
  bytes_sent INT,
  request STRING,
  http_referer STRING,
  http_user_agent STRING,
  upstream_cache_status STRING,
  upstream_status INT,
  request_time DOUBLE,
  upstream_response_time DOUBLE,
  upstream_addr STRING
)
WITH (
  type = "dis",
  region = "cn-north-7",
  channel = "csinput",
  partition_count = "1",
  encode = "csv",
  field_delimiter = "\\|\\|",
  quote = "\u005c\u0022",
  offset = "0"
);
```
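Note that `field_delimiter = "\\|\\|"` is a regular expression: the pipes are escaped because `|` on its own means alternation in a regex. A small Python sketch (illustrative only, not DLI code) shows the same escaping at work:

```python
import re

line = "xxx.xxx.com||45.249.212.44||200"
# "\\|\\|" in the DDL is the regex \|\| , i.e. two literal pipe characters.
# An unescaped "||" would be an alternation of empty patterns and would not
# split the record on the intended "||" separator.
fields = re.split(r"\|\|", line)
print(fields)  # ['xxx.xxx.com', '45.249.212.44', '200']
```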
```sql
CREATE SINK STREAM log_out (
  http_host STRING,
  forward_ip STRING,
  msec LONG,
  status INT,
  request_length INT,
  bytes_sent INT,
  http_method STRING,
  url STRING,
  http_referer STRING,
  http_user_agent STRING,
  upstream_cache_status STRING,
  upstream_status INT,
  request_time DOUBLE,
  upstream_response_time DOUBLE,
  upstream_addr STRING,
  province STRING,
  browser STRING
)
WITH (
  type = "es",
  region = "cn-north-7",
  cluster_address = "192.168.0.205:9200,192.168.0.143:9200,192.168.0.66:9200",
  es_index = "nginxlog",
  es_type = "nginx",
  es_fields = "http_host,forward_ip,msec,status,request_length,bytes_sent,http_method,url,http_referer,http_user_agent,upstream_cache_status,upstream_status,request_time,upstream_response_time,upstream_addr,province,browser",
  batch_insert_data_num = "100"
);
```
```sql
INSERT INTO log_out
SELECT
  http_host,
  forward_ip,
  cast(msec * 1000 as bigint) + 28800000,
  status,
  request_length,
  bytes_sent,
  string_to_array(request, '\\ ')[1],
  string_to_array(request, '\\ ')[2],
  http_referer,
  http_user_agent,
  upstream_cache_status,
  upstream_status,
  request_time,
  upstream_response_time,
  upstream_addr,
  case IP_TO_PROVINCE(forward_ip)
    when "Guangxi" then "Guangxi Zhuang Autonomous Region"
    when "Ningxia" then "Ningxia Hui Autonomous Region"
    when "Taiwan" then "Taiwan Province"
    when "Macao" then "Macau"
    else IP_TO_PROVINCE(forward_ip)
  end,
  case
    when http_user_agent like "%Chrome%" then "Chrome"
    when http_user_agent like "%Firefox%" then "Firefox"
    when http_user_agent like "%Safari%" then "Safari"
    else "Others"
  end
FROM log_infos;
```
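The per-row logic of that SELECT can be sketched in plain Python to make it easier to follow (this mirrors the SQL above under two assumptions: `string_to_array` indexing is 1-based as the `[1]`/`[2]` usage implies, and `IP_TO_PROVINCE` is a DLI built-in we cannot reproduce here, so it is omitted):

```python
def transform(msec, request, http_user_agent):
    """Sketch of the row transformation performed by the INSERT INTO statement."""
    # cast(msec * 1000 as bigint) + 28800000:
    # epoch seconds -> epoch milliseconds, shifted by 8 hours
    # (28,800,000 ms) from UTC to Beijing time.
    ts_ms = int(msec * 1000) + 28800000

    # string_to_array(request, '\\ ')[1] / [2]: split the request line on
    # spaces; element 1 is the HTTP method, element 2 is the URL.
    parts = request.split(" ")
    http_method, url = parts[0], parts[1]

    # Browser bucketing, in the same order as the CASE expression.
    # Order matters: a Chrome User-Agent also contains "Safari".
    if "Chrome" in http_user_agent:
        browser = "Chrome"
    elif "Firefox" in http_user_agent:
        browser = "Firefox"
    elif "Safari" in http_user_agent:
        browser = "Safari"
    else:
        browser = "Others"
    return ts_ms, http_method, url, browser

print(transform(1542101007.675,
                "POST /x/report/heartbeat HTTP/1.1",
                "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0"))
```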
Beyond standard Flink SQL, DLI also ships a set of built-in functions. For example, IP_TO_PROVINCE in the sample above resolves each log line's request IP to a province, making useful information easier to extract. With a few convenient SQL statements we can then compute PV/UV, the top-ten requested URLs, the share of each request method, the distribution of response statuses, and more. Nginx logs no longer have to gather dust in a corner.
Appendix:
DLI console: https://console.huaweicloud.com/dli