华为云实践课程:实时流计算日志分析操作

举报
JaneConan 发表于 2019/01/16 15:36:41 2019/01/16
【摘要】 任务介绍 本示例从DIS数据源读取数据,实时分析日志信息,向OBS输出源写数据。通过本示例,你会学习到: 创建并运行Flink SQL应用 完成“日志分析”示例场景实时流计算服务(Cloud Stream Service, 简称CS)提供实时处理流式大数据的全栈能力, 简单易用, 即时执行 Stream SQL或自定义作业。无需关心计算集群, 无需学习编程技能。完全兼容Apache Flin...

任务介绍 

本示例从DIS数据源读取数据,实时分析日志信息,向OBS输出源写数据。通过本示例,你会学习到: 


创建并运行Flink SQL应用 

完成“日志分析”示例场景


实时流计算服务(Cloud Stream Service, 简称CS)提供实时处理流式大数据的全栈能力, 简单易用, 即时执行 Stream SQL或自定义作业。无需关心计算集群, 无需学习编程技能。完全兼容Apache Flink和Spark API。


任务执行 

本示例的输入源由DIS服务提供,输出通道由OBS服务提供,需先开通DIS服务和OBS服务,在DIS服务里配置您的 通道,在OBS服务里创建您的桶,然后在CS服务中创建Flink SQL作业,提交运行。 


第一步:创建DIS通道和OBS桶

1. 创建DIS通道 

进入DIS控制台,点击右侧 购买接入通道 ,创建一个DIS通道:注意选择通道所在 区域 , 数据源类型 选择 CSV , 通 道名称 填写 input-dis 。

image.png

2. 创建OBS桶 

进入OBS控制台, 点击右上角的 创建桶 ,创建一个OBS桶: janeconan-output-obs 作为数据输出通道。注意选择桶所在 区 域 。如果页面提示桶名已存在,您可按照个人喜好,取一个您喜欢的桶名。

image.png

在创建的 janeconan-output-obs 桶内创建 logInfos 目录,用来保存输出数据。进入 output-obs 桶,点击左侧的 对象 , 点击 新建文件夹 ,输入 logInfos ,完成目录创建。

image.png


第二步:创建Flink SQL作业 

1. 进入CS控制台 

方式一:直接进入 CS控制台 方式二:华为云官网 -> 产品 -> EI企业智能 -> 实时流计算服务,进入实时流计算的首页后,点击 立即使用

image.png


2. 新建Flink SQL作业 

作业管理 -> 新建 :选择模板 [云生态]DIS-CS-OBS样例模板

image.png


编辑器:Flink SQL作业支持SQL编辑器和SQL可视化编辑器,这里选择 SQL编辑器 

模版:目前提供了22个缺省模版,也支持用户自定义模版 


点击“确认”,完成新建Flink SQL作业



3. 编辑Flink SQL作业 

如果前面两个步骤您是按照指导文档给DIS通道和OBS取名,则Flink SQL作业不需要做任何修改,可以直接跳过此 步骤,进入下一步。


SQL编辑器中包含三部分内容: 

  1. source数据源:在 with 语句中配置,本示例选择的是DIS服务,因此需要配置以下参数:
    配置DIS输入流
    type = "dis"# 类型选择dis
    region = "cn-north-1"# 作业所在的区域, cn-north-1 表示 华北-北京一 区域
    channel = "input-dis" #DIS通道名称
    partition_count = "1" # DIS通道的分区数
    encode = "csv" # 数据编码方式,选择CSV
    field_delimiter = "\\|\\|" # 当编码格式为csv时,属性之间的分隔符
    quote = "\u005c\u0022" #指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普 通字符处理。当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义
    offset = "0" #偏移量,offset="0"时,表示CS服务从DIS服务的第0条数据开始处理

  2. sink输出源:在 with 语句中配置,本示例选择的是OBS服务,因此需要配置以下参数:
    配置OBS输出流 type = "obs" # 类型选择obs
    region = "cn-north-1" # 作业所在的区域,
    cn-north-1 表示 华北-北京一 区域
    encode = "csv" # 数据编码方式,选择CSV
    field_delimiter = "," # 当编码格式为csv时,属性之间的分隔符
    row_delimiter = "\n" #行分隔符
    obs_dir = "janeconan-output-obs/logInfos" #文件存储目录,格式为{桶名}/{目录名}
    file_prefix = "log_out" #输出文件名的前缀,默认为temp
    rolling_size = "100m" #单个文件最大允许大小

  3. SQL 查询:形如 INSERT INTO log_out SELECT http_host,forward_ip,cast(cast(msec * 1000 as bigint) + 28800000 as timestamp),status,request_length,bytes_sent,string_to_array(request, '\\ ')[1],string_to_array(request, '\\ ')[2],http_referer,http_user_agent,upstream_cache_status,upstream_status,request_time,cookie_DedeUserID_cookie_sid_sent_http_logdata,upstream_response_time, upstream_addr, case IP_TO_PROVINCE(forward_ip) when"Guangxi" then "Guangxi Zhuang Autonomous Region" when "Ningxia" then "Ningxia HuiAutonomous Region" when "Taiwan" then "Taiwan Province" when "Macao" then "Macau" elseIP_TO_PROVINCE(forward_ip) end, case when http_user_agent like "%Chrome%" then "Chrome"when http_user_agent like "%Firefox%" then "Firefox" when http_user_agent like"%Safari%" then "Safari" else "Others" end FROM log_infos


4. 设置运行参数 

在SQL编辑器的右侧,设置如下参数:

SPU:Stream Processing Units 流处理单元,一个SPU为1核4G的资源,每SPU 0.5元/小时。最低2个SPU 起。必选 

并行数:Flink作业算子并行度,缺省为1。必选 

开启checkpoint:是否开启Flink快照。非必选 

保存作业日志:作业日志是否保存,选中会保存作业日志到您个人的OBS桶中。非必选 

开启作业异常告警:作业异常后是否推送SMN消息(邮件和短线)。非必选 


image.png

5.提交作业 

完成作业编辑并且设置完运行参数后,可点击作业编辑器顶部的 语义校验 ,检查SQL语句的语法是否正确,若语义 校验通过,则点击 提交 。

至此,实时流计算方面的工作已经完成了,下面就要接入数据,查看实时计算结果。 


第三步:发送DIS数据,测试结果 

这里使用DIS Agent向云上DIS通道发送CSV结构的数据,DIS Agent是一个本地运行的代理,监控本地文件变化, 一旦文件中有新的数据追加,就会即时把新增的数据发送到DIS通道中,类似flume。 

DIS Agent使用方法: DIS Agent的官网地址,上面有详细的DIS Agent使用指南,您也可以根据网站上的指南进行 操作,也可以参考下文进行操作。 

1.启动DIS Agent 

1. 下载DIS Agent
2. 本地解压
3. 修改 conf/agent.yml
4. 启动 DIS Agent : Linux环境 bin/start-dis-agent.sh ; Windows环境 bin/start-dis-agent.bat

--# 不变 region: cn-north-1 # user ak (get from 'My Credential') ak: 填写你的AK # user sk (get from 'My Credential') sk: 填写你的SK ak/sk:进入console控制台->右上角 我的账号 选择"我的凭证"-> "管理访问秘钥"->"新增访问秘钥" # user project id (get from 'My Credential') projectId: 进入console控制台->右上角 我的账号 选择"我的凭证"-> "项目列表"中选择"cn-north-1"对应 的"项目ID" # 不变。 endpoint: https://dis.cn-north-1.myhwclouds.com:20004 # config each flow to monitor file. flows:  # 填写您创建的DIS通道的名称  - DISStream: input-dis    # 填写数据文件所在的路径    filePattern: D:/disagent-cw/dis-agent-1.1.0/data/*.log    # 不变

2.发送DIS数据 

将您的数据文件放置在 agent.yml 中配置的文件路径处,您可以通过在本地写个小程序,向文件中追加数据,这 里使用的python,即资料中 send.py 文件,您需将它同样移到 agent.yml 中配置的文件路径处。

image.png

image.png

image.png

查看输出到janeconan-output-obs/logInfos的结果

点击上图中的 下载 ,查看输出结果,您的输出应该类似于下图给出的结果:

image.png

资源删除 

如果您已经完成本示例,记得删除以下资源,以免资源继续产生费用,首先删除CS的Flink SQL作业,然后删除DIS 通道,最后删除OBS桶,删除obs桶需要先删除桶内的数据,才能删除桶。



感受


其实最终感觉有意思的是在CS服务中的实时监控

image.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200