华为云实时流计算服务初体验
初次听说华为云实时流计算服务(Cloud Stream Service, 简称CS)可以以纯SQL的方式去提交Spark/Flink实时流作业,就想去狠狠体验一把。一来想解决心中的一个梗(之前有基于可视化配置+Spark SQL封装流作业,但着实太low),二来想看看CS到底能带给我什么样的惊喜。
CS控制台
CS控制台左侧导航包含四部分:总览、作业管理、作业模板、集群管理。
总览
作业管理
创建、编辑、提交、监控作业。
作业模板
可自定义作业模板,提高后期作业编写效率。
集群管理
驾驶行为实时分析
目的
车载设备实时上报驾驶数据,每条日志包含四个字段:数据时间,经度,纬度,速度。
这里想要实现:
指定时间段内超速情况发送到手机短信。
指定时间段内平均速度发送到手机短信。
偏航情况发送到手机短信。
前提
开通DIS Channel服务
注:DIS 暂可简单理解为Kafka集群、Channel 可理解为Kafka Topic。
开通SMN短信服务
注:SMN短信服务可将结果发送到对应手机。
安装配置DIS Agent
注:DIS Agent可简单理解成Kafka Producer Client。这里DIS Agent以类似Tail -F的方式监控文件并将日志近实时发送到DIS Channel。
#下载解压 curl -O tar -zxvf dis-agent-1.1.0.zip #配置(配置DIS Channel和待监控文件) vim dis-agent-1.1.0/conf/agent.yml region: cn-north-1 # user ak (get from 'My Credential') ak: your_ak # user sk (get from 'My Credential') sk: your_sk # user project id (get from 'My Credential') projectId: your_projectId # the dis gw endpoint endpoint: https://dis.cn-north-1.myhuaweicloud.com:20004 # config each flow to monitor file. flows: # DIS Channel名称 - DISStream: input-dis-C0Db # 待监控文件或目录,支持正则 filePattern: /Users/wangpei/software/dis-agent-1.1.0/data/*.log # from where to start: 'START_OF_FILE' or 'END_OF_FILE' initialPosition: START_OF_FILE # upload max interval(ms) maxBufferAgeMillis: 5000 #启动Dis Agent cd ${DIS_AGENT_HOME} bin/start-dis-agent.sh #查看Dis Agent 日志 tail -f dis-agent-1.1.0/logs/dis-agent.log
向监控文件追加日志
这里用一段python向本地文件追加日志。也可用shell脚本。
#日志格式:数据时间,经度,纬度,速度 import time import random if __name__ == '__main__': f = open("/Users/wangpei/software/dis-agent-1.1.0/data/test.log",mode="a") geoPoints=[(23.1234532,35.3321232),(23.1234532,35.3411262),(23.1234532,35.3511272)] while True: dataTime=int(time.time())*1000 longitude,latitude=random.choice(geoPoints) velocity=random.randint(80,110) log=",".join([str(dataTime),str(longitude),str(latitude),str(velocity)]) f.write(log) f.write("\n") f.flush() print "写入:"+log time.sleep(1)
查看日志是否发送功
tail -f dis-agent-1.1.0/logs/dis-agent.log [INFO ] (sender-001-dis:input-dis-C0Db:/Users/wangpei/software/dis-agent-1.1.0/data/*.log) com.bigdata.dis.agent.tailing.DISSender Put 6 records to [input-dis-C0Db] spend 212ms, latestSequenceNumber [ShardId: shardId-0000000000, SequenceNumber: 298476]
创建作业
在作业管理中新建作业。
类型:可以选Flink SQL 作业 或 Spark 作业。
编辑器:可选SQL编辑器和可视化编辑器。
模板:可直接基于已有模板快速创建作业。
编辑作业并提交
支持SQL语法检查、Debug调试、保存为模板、SQL格式化、切换主题颜色、修改作业描述、帮助等。
/* 输入 DIS Channel 类似 Kafka Topic */ CREATE SOURCE STREAM driver_behavior ( DataTime BIGINT, /* 数据时间 */ Longitude FLOAT, /* 经度 */ Latitude FLOAT, /* 纬度 */ Velocity FLOAT /* 车速 */ ) WITH ( type = "dis", region = "cn-north-1", channel = "input-dis-C0Db", partition_count = "1", encode = "csv", field_delimiter = "," ) TIMESTAMP BY DataTime.rowtime SET WATERMARK (RANGE interval 1 second, interval 1 second); /* 输出 SMN短信 */ CREATE SINK STREAM toMSG ( MessageContent STRING ) WITH ( type = "smn", region = "cn-north-1", topic_urn = "urn:smn:cn-north-1:2b7ca6e07a344157af769b76bea3551c:out-put-smn", message_subject = "out-put-smn", message_column = "MessageContent" ); /** 业务逻辑 **/ /** 业务1: 30秒内超速三次的设备的超速次数和最大速度 **/ INSERT INTO toMSG SELECT "30秒内超速次数: " || CAST(overspeed_count as VARCHAR(20)) || "次,最大速度为:" || CAST(Velocity as VARCHAR(20)) FROM ( SELECT MAX(Velocity) AS Velocity, //最大速度 COUNT(Velocity) AS overspeed_count //超速次数 FROM driver_behavior WHERE Velocity > 90 GROUP BY TUMBLE (DataTime, INTERVAL '30' SECOND) ) WHERE overspeed_count >= 3; /** 业务2: 10秒内的平均速度 AVG_SPEED:地理函数,通过地理点计算一段时间内的平均速度 **/ INSERT INTO toMSG SELECT "10秒内的平均速度:" || cast(avg_speed as VARCHAR(20)) from( SELECT AVG_SPEED(ST_POINT(Longitude, Latitude)) as avg_speed FROM driver_behavior GROUP BY TUMBLE (DataTime, INTERVAL '10' SECOND) ); /** 业务3: 偏航告警 ST_WITHIN:地理函数,一个点是否包含在几何体,几何体定义了既定路径 **/ INSERT INTO toMSG SELECT "已偏移既定路径,速度为 " || CAST(Velocity as VARCHAR(20)) FROM driver_behavior --ST_WITHIN 为地理位置函数,见华为官网 WHERE NOT ST_WITHIN(ST_POINT(cast(Longitude as DOUBLE), cast(Latitude as DOUBLE)), ST_BUFFER(ST_LINE(ARRAY[ST_POINT(34.585555,105.725221),ST_POINT(34.586313,105.725168),ST_POINT(34.586982,105.72515),ST_POINT(34.587064,105.72603),ST_POINT(34.587123,105.727593),ST_POINT(34.587071,105.728698),ST_POINT(34.587012,105.72956),ST_POINT(34.586982,105.730674),ST_POINT(34.586922,105.73177),ST_POINT(34.586863,105.732758),ST_POINT(34.586818,105.733782),ST_POINT(34.586789,105.73486),ST_POINT(34.586729,105.735974),ST_POINT(34.586655,105.73698),ST_POINT(34.58661,105.738202),ST_POINT(34.58661,105.739226),ST_POINT(34.586492,105.740538),ST_POINT(34.586388,105.741651),ST_POINT(34.586298,105.742783),ST_POINT(34.586224,105.743879),ST_POINT(34.586165,105.745227),ST_POINT(34.586076,105.746359),ST_POINT(34.58609,105.747562),ST_POINT(34.586135,105.748712),ST_POINT(34.58612,105.749952),ST_POINT(34.587086,105.750006),ST_POINT(34.588691,105.74997)]),0.001))
作业运行时
提交作业后,作业就进入运行阶段。点击作业管理>作业名称,查看作业运行情况。
作业详情
图表化展示作业输入输出的速率、总记录数、总字节数。
执行计划
类似Spark / Flink DAG。展示每一步执行情况。
任务列表
展示当前作业执行的任务列表。
运行日志
展示Flink 作业JobManager、TaskManager运行日志。
作业输出
总结
优势
流处理,只需编写SQL即可,大大降低了开发难度,提高了开发效率。
可视化界面统一管理流作业,后期方便维护。
同时兼容Flink和Spark两大组件。
输入输出可配置。输入输出已经以插件形式存在,编写作业时只需简单配置即可。
对开发友好。有SQL语法检查、Debug、作业监控、执行计划、运行日志、一键启停等等。
对运营和BI友好,数据实时洞察只需要托拉拽即可快速触发流式作业。
需要改进
个人认为有以下几点可能需要强化或改进:
增强对更复杂逻辑的支持,以应对复杂流处理。
增强对Spark SQL的支持,目前Spark流作业的触发需要依赖提交的Jar包。
增强对主流输入输出的插件化支持。增强托拉拽触发流作业的功能,流处理除了要面对开发,还需要面对运营产品等非SQL人员。
增强对实时处理和批处理的结合。
增强对流作业的监控和告警。如流作业发不出短信,也不报错,应该有短信告警,告知那个环节出了问题。
最后
华为云实时流计算服务CS确实让我很惊喜,应用于普通实时监控、实时分析等场景应该没什么问题。同时,从诸多细节,可以发现,
华为云确实有在用心打磨这个产品。
期待有朝一日,华为云实时流计算服务能大规模应用到各种实时流计算场景,甚至应用到机器学习、深度学习、图计算等领域!
最后,由衷感谢华为云推出实时流计算服务,大大降低了流处理门槛,推动流计算领域的发展!感谢!
- 点赞
- 收藏
- 关注作者
评论(0)