【最佳实践】使用Flink SQL实时分析日志信息

举报
云上精选 发表于 2020/04/23 18:58:49 2020/04/23
【摘要】 从DIS数据源读取数据,利用CS服务的Flink SQL作业,实时分析日志信息,并向OBS输出结果数据

场景概述

本次实践从DIS数据源读取数据,利用CS服务的Flink SQL作业,实时分析日志信息,并向OBS输出结果数据。

本次实践基本流程如下所示:

  1. 创建DIS通道和OBS桶
  2. 创建Flink SQL作业
  3. 发送DIS数据,查看结果

创建DIS通道和OBS桶

本次实践从DIS读取数据,流处理后将数据写入OBS,需先创建DIS输入通道和OBS桶。

  1. 创建DIS通道

    1. 登录公有云管理控制台。
    2. 在“服务列表”中,选择“EI企业智能 > 数据接入服务”,进入DIS管理控制台。
    3. 单击“购买接入通道”,进入“购买接入通道”页面,配置通道信息。
      • 区域:我们需要将DIS,OBS和CS服务设置为统一区域,此处我们选择“华北-北京1”。
      • 通道名称:input-dis。
      • 源数据类型:选择CSV。
      • 其他参数配置保持默认。
      图1 创建DIS通道
    4. 单击“立即购买”,确认通道规格信息后,单击“提交”。
    5. 单击“返回通道列表”,我们可以看到已经创建成功的input-dis通道。此通道用来作为CS服务的输入通道。

  2. 创建OBS桶。

    1. 在“服务列表”中,选择“存储 > 对象存储服务”,进入OBS管理控制台。
    2. 单击“创建桶”,进入“创建桶”页面,配置桶信息。
      • 区域:选择“华北-北京1”。
      • 桶名称:output-obs。
      • 其他参数配置保持默认。
      图2 创建OBS桶
    1. 单击“立即创建”,我们可以在桶列表中看到已经创建成功的桶。
    2. 单击桶名称output-obs,然后在左侧导航栏中单击“对象”,单击“新建文件夹”,输入文件夹名称logInfos。在output-obs桶内创建logInfos目录,用来保存输出数据。
      图3 创建桶文件夹

创建Flink SQL作业

  1. 在“服务列表”中,选择“EI企业智能 >实时流计算服务”,进入CS管理控制台。

    如果是首次登录CS管理控制台,请先根据页面提示申请服务并授权。

  2. 单击“新建作业”,弹出“新建作业”窗口,配置作业信息。

    • 类型:选择“Flink SQL作业“。
    • 名称:test。
    • 模板:选择“DIS-CS-OBS样例模板”。
    • 其他参数保持默认。
    图4 创建Flink SQL作业

  3. 单击“确认”,进入“编辑”页面,编辑作业。

    在SQL编辑区域内可以看到“DIS-CS-OBS样例模板”的内容,本次演示就用该样例模板进行运行。

    SQL编辑器中包含三部分内容:

    • source数据源:在with 语句中配置,实现与DIS的“input-dis”通道对接,使CS服务可以从“input-dis”通道实时获取数据。需要配置以下参数:
      • type = "dis" :类型选择dis。
      • region = "cn-north-1" :作业所在的区域, cn-north-1 表示华北-北京一区域。
      • channel = "input-dis" :DIS通道名称。
      • partition_count = "1" :DIS通道的分区数。
      • encode = "csv" :数据编码方式,选择CSV。
      • field_delimiter = "\\|\\|" :当编码格式为CSV时,属性之间的分隔符。
      • quote = "\u005c\u0022" :指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。
      • offset = "0" :偏移量,offset="0"时,表示CS服务从DIS服务的第0条数据开始处理。

        详细信息请参见配置DIS输入流

    • sink输出源:在with 语句中配置,实现与OBS桶对接,使CS服务能将处理结果输出到OBS桶中。需要配置以下参数:
      • type = "obs" :类型选择obs
      • region = "cn-north-1" :作业所在的区域, cn-north-1 表示华北-北京一区域。
      • encode = "csv" :数据编码方式,选择CSV。
      • field_delimiter = "," :当编码格式为CSV时,属性之间的分隔符。
      • row_delimiter = "\n" :行分隔符。
      • obs_dir = "output-obs/logInfos" :文件存储目录,格式为{桶名}/{目录名}。
      • file_prefix = "log_out" :输出文件名的前缀,默认为temp。
      • rolling_size = "100m" :单个文件最大允许大小。

      详细信息请参见配置OBS输出流

    • SQL 查询,示例如下:
      INSERT INTO log_out
      SELECT http_host,forward_ip,cast(cast(msec * 1000 as bigint) + 28800000  as timestamp),status,request_length, bytes_sent,string_to_array(request, '\\ ')[1],string_to_array(request, '\\ ')[2],http_referer,http_user_agent,
      upstream_cache_status,upstream_status,request_time,cookie_DedeUserID_cookie_sid_sent_http_logdata,upstream_response_time,
      upstream_addr,
      case IP_TO_PROVINCE(forward_ip) when "Guangxi" then "Guangxi Zhuang Autonomous Region"
      when "Ningxia" then "Ningxia Hui Autonomous Region"
      when "Taiwan" then "Taiwan Province"
      when "Macao" then "Macau"
      else IP_TO_PROVINCE(forward_ip) end,
      case when http_user_agent like "%Chrome%" then "Chrome"
      when http_user_agent like "%Firefox%" then "Firefox"
      when http_user_agent like "%Safari%" then "Safari"
      else "Others" end
      FROM log_infos;

  4. 设置作业运行参数。

    • SPU:Stream Processing Units 流处理单元,一个SPU为1核4G的资源,每SPU 0.5元/小时。最低2个SPU起。
    • 并行数:Flink作业算子并行度,缺省为1。
    • 开启Checkpoint:是否开启Flink快照。
    • 保存作业日志:是否保存作业日志到OBS桶中。
    • 开启作业异常告警:作业异常后是否推送SMN消息(邮件和短信)。
    图5 设置作业运行参数

  5. 完成作业编辑并且设置运行参数后,单击“语义校验”,确保语义校验成功。只有语义校验成功后,才可以执行“调试”、“提交”或“启动”作业的操作。
  6. 语义校验成功后,单击“提交”,进入作业配置确认页面,作业配置确认无误后,单击“确认”将作业提交并启动。

    提交作业后,系统将自动跳转到“作业管理”页面,新创建的作业将显示在作业列表中,在“状态”列中我们可以查看作业的状态。作业提交成功后,状态将变为“运行中”。

发送DIS数据,查看结果

这里使用DIS Agent向云上DIS通道发送CSV结构的数据,DIS Agent是一个本地运行的代理,监控本地文件变化,一旦文件中有新的数据追加,就会即时把新增的数据发送到DIS通道中,类似flume。

DIS Agent详细使用指南,请参见使用Agent上传数据

  1. 启动DIS Agent。

    1. 下载DIS Agent:https://dis-publish.obs-website.cn-north-1.myhuaweicloud.com/dis-agent-1.1.0.zip
    2. 将下载的DIS Agent在本地解压。
    3. 修改conf/agent.yml。
      ---
      # 不变
      region: cn-north-1
      # user ak (get from 'My Credential')
      ak: 填写你的AK
      # user sk (get from 'My Credential')
      sk: 填写你的SK
      ak/sk:进入console控制台->右上角 我的账号 选择"我的凭证"-> "管理访问秘钥"->"新增访问秘钥"
      # user project id (get from 'My Credential')
      projectId: 进入console控制台->右上角 我的账号 选择"我的凭证"-> "项目列表"中选择"cn-north-1"对应的"项目ID"
      # 不变。
      endpoint: https://dis.cn-north-1.myhuaweicloud.com:20004
      # config each flow to monitor file.
      flows:
      # 填写您创建的DIS通道的名称
      - DISStream: input-dis
      # 填写数据文件所在的路径
      filePattern: D:/disagent-cw/dis-agent-1.1.0/data/*.log
      # 不变
    4. 启动DIS Agent。
      • Linux环境:bin/start-dis-agent.sh
      • Windows环境:bin/start-dis-agent.bat

  2. 发送DIS数据。

    将您的数据文件放置在agent.yml中配置的文件路径处,您可以通过在本地写个小程序,如下所示,向文件中追加数据。您需将该小程序同样移到agent.yml中配置的文件路径处。

    1
    2
    3
    4
    5
    6
    import time
    
    for idx in range(10000):
    	with open("test.log", mode = "a+") as f:
    		f.write("api.huaweicloud.com||45.249.212.44||15421010072.675||200||651||228||POST /x/report/heartbeat HTTP/1.1||-||Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||200||0.033||-.918nw0fj-||0.033||140.206.227.10:80" + "\n" + "api.huaweicloud.com||45.249.212.52||15421010072.875||200||651||228||POST /details/jobs HTTP/1.1||-||Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||200||0.033||-.918nw0fj-||0.033||140.206.227.10:80" + "\n")
    	time.sleep(60)
    

  3. 登录OBS管理控制台,进入“output-obs” 桶下“logInfos”目录,单击“下载”,查看输出结果。
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。