华为云实践课程:实时流计算日志分析操作
任务介绍
本示例从DIS数据源读取数据,实时分析日志信息,向OBS输出源写数据。通过本示例,你会学习到:
创建并运行Flink SQL应用
完成“日志分析”示例场景
实时流计算服务(Cloud Stream Service, 简称CS)提供实时处理流式大数据的全栈能力, 简单易用, 即时执行 Stream SQL或自定义作业。无需关心计算集群, 无需学习编程技能。完全兼容Apache Flink和Spark API。
任务执行
本示例的输入源由DIS服务提供,输出通道由OBS服务提供,需先开通DIS服务和OBS服务,在DIS服务里配置您的 通道,在OBS服务里创建您的桶,然后在CS服务中创建Flink SQL作业,提交运行。
第一步:创建DIS通道和OBS桶
1. 创建DIS通道
进入DIS控制台,点击右侧 购买接入通道 ,创建一个DIS通道:注意选择通道所在 区域 , 数据源类型 选择 CSV , 通 道名称 填写 input-dis 。
2. 创建OBS桶
进入OBS控制台, 点击右上角的 创建桶 ,创建一个OBS桶: janeconan-output-obs 作为数据输出通道。注意选择桶所在 区 域 。如果页面提示桶名已存在,您可按照个人喜好,取一个您喜欢的桶名。
在创建的 janeconan-output-obs 桶内创建 logInfos 目录,用来保存输出数据。进入 output-obs 桶,点击左侧的 对象 , 点击 新建文件夹 ,输入 logInfos ,完成目录创建。
第二步:创建Flink SQL作业
1. 进入CS控制台
方式一:直接进入 CS控制台 方式二:华为云官网 -> 产品 -> EI企业智能 -> 实时流计算服务,进入实时流计算的首页后,点击 立即使用
2. 新建Flink SQL作业
作业管理 -> 新建 :选择模板 [云生态]DIS-CS-OBS样例模板
编辑器:Flink SQL作业支持SQL编辑器和SQL可视化编辑器,这里选择 SQL编辑器
模版:目前提供了22个缺省模版,也支持用户自定义模版
点击“确认”,完成新建Flink SQL作业
3. 编辑Flink SQL作业
如果前面两个步骤您是按照指导文档给DIS通道和OBS取名,则Flink SQL作业不需要做任何修改,可以直接跳过此 步骤,进入下一步。
SQL编辑器中包含三部分内容:
source数据源:在 with 语句中配置,本示例选择的是DIS服务,因此需要配置以下参数:
配置DIS输入流
type = "dis"# 类型选择dis
region = "cn-north-1"# 作业所在的区域, cn-north-1 表示 华北-北京一 区域
channel = "input-dis" #DIS通道名称
partition_count = "1" # DIS通道的分区数
encode = "csv" # 数据编码方式,选择CSV
field_delimiter = "\\|\\|" # 当编码格式为csv时,属性之间的分隔符
quote = "\u005c\u0022" #指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普 通字符处理。当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义
offset = "0" #偏移量,offset="0"时,表示CS服务从DIS服务的第0条数据开始处理sink输出源:在 with 语句中配置,本示例选择的是OBS服务,因此需要配置以下参数:
配置OBS输出流 type = "obs" # 类型选择obs
region = "cn-north-1" # 作业所在的区域,
cn-north-1 表示 华北-北京一 区域
encode = "csv" # 数据编码方式,选择CSV
field_delimiter = "," # 当编码格式为csv时,属性之间的分隔符
row_delimiter = "\n" #行分隔符
obs_dir = "janeconan-output-obs/logInfos" #文件存储目录,格式为{桶名}/{目录名}
file_prefix = "log_out" #输出文件名的前缀,默认为temp
rolling_size = "100m" #单个文件最大允许大小SQL 查询:形如 INSERT INTO log_out SELECT http_host,forward_ip,cast(cast(msec * 1000 as bigint) + 28800000 as timestamp),status,request_length,bytes_sent,string_to_array(request, '\\ ')[1],string_to_array(request, '\\ ')[2],http_referer,http_user_agent,upstream_cache_status,upstream_status,request_time,cookie_DedeUserID_cookie_sid_sent_http_logdata,upstream_response_time, upstream_addr, case IP_TO_PROVINCE(forward_ip) when"Guangxi" then "Guangxi Zhuang Autonomous Region" when "Ningxia" then "Ningxia HuiAutonomous Region" when "Taiwan" then "Taiwan Province" when "Macao" then "Macau" elseIP_TO_PROVINCE(forward_ip) end, case when http_user_agent like "%Chrome%" then "Chrome"when http_user_agent like "%Firefox%" then "Firefox" when http_user_agent like"%Safari%" then "Safari" else "Others" end FROM log_infos
4. 设置运行参数
在SQL编辑器的右侧,设置如下参数:
SPU:Stream Processing Units 流处理单元,一个SPU为1核4G的资源,每SPU 0.5元/小时。最低2个SPU 起。必选
并行数:Flink作业算子并行度,缺省为1。必选
开启checkpoint:是否开启Flink快照。非必选
保存作业日志:作业日志是否保存,选中会保存作业日志到您个人的OBS桶中。非必选
开启作业异常告警:作业异常后是否推送SMN消息(邮件和短线)。非必选
5.提交作业
完成作业编辑并且设置完运行参数后,可点击作业编辑器顶部的 语义校验 ,检查SQL语句的语法是否正确,若语义 校验通过,则点击 提交 。
至此,实时流计算方面的工作已经完成了,下面就要接入数据,查看实时计算结果。
第三步:发送DIS数据,测试结果
这里使用DIS Agent向云上DIS通道发送CSV结构的数据,DIS Agent是一个本地运行的代理,监控本地文件变化, 一旦文件中有新的数据追加,就会即时把新增的数据发送到DIS通道中,类似flume。
DIS Agent使用方法: DIS Agent的官网地址,上面有详细的DIS Agent使用指南,您也可以根据网站上的指南进行 操作,也可以参考下文进行操作。
1.启动DIS Agent
1. 下载DIS Agent
2. 本地解压
3. 修改 conf/agent.yml
4. 启动 DIS Agent : Linux环境 bin/start-dis-agent.sh ; Windows环境 bin/start-dis-agent.bat
--# 不变 region: cn-north-1 # user ak (get from 'My Credential') ak: 填写你的AK # user sk (get from 'My Credential') sk: 填写你的SK ak/sk:进入console控制台->右上角 我的账号 选择"我的凭证"-> "管理访问秘钥"->"新增访问秘钥" # user project id (get from 'My Credential') projectId: 进入console控制台->右上角 我的账号 选择"我的凭证"-> "项目列表"中选择"cn-north-1"对应 的"项目ID" # 不变。 endpoint: https://dis.cn-north-1.myhwclouds.com:20004 # config each flow to monitor file. flows: # 填写您创建的DIS通道的名称 - DISStream: input-dis # 填写数据文件所在的路径 filePattern: D:/disagent-cw/dis-agent-1.1.0/data/*.log # 不变
2.发送DIS数据
将您的数据文件放置在 agent.yml 中配置的文件路径处,您可以通过在本地写个小程序,向文件中追加数据,这 里使用的python,即资料中 send.py 文件,您需将它同样移到 agent.yml 中配置的文件路径处。
查看输出到janeconan-output-obs/logInfos的结果
点击上图中的 下载 ,查看输出结果,您的输出应该类似于下图给出的结果:
资源删除
如果您已经完成本示例,记得删除以下资源,以免资源继续产生费用,首先删除CS的Flink SQL作业,然后删除DIS 通道,最后删除OBS桶,删除obs桶需要先删除桶内的数据,才能删除桶。
感受
其实最终感觉有意思的是在CS服务中的实时监控
- 点赞
- 收藏
- 关注作者
评论(0)