华为云实时流计算服务初体验

举报
天天向上✌️ 发表于 2018/09/11 22:57:24 2018/09/11
【摘要】 初次听说华为云实时流计算服务(Cloud Stream Service, 简称CS)可以以纯SQL的方式去提交Spark/Flink实时流作业,就想去狠狠体验一把。一来想解决心中的一个梗(之前有基于可视化配置+Spark SQL封装流作业,但着实太low),二来想看看CS到底能带给我什么样的惊喜。

    初次听说华为云实时流计算服务(Cloud Stream Service, 简称CS)可以以纯SQL的方式去提交Spark/Flink实时流作业,就想去狠狠体验一把。一来想解决心中的一个梗(之前有基于可视化配置+Spark SQL封装流作业,但着实太low),二来想看看CS到底能带给我什么样的惊喜。


CS控制台


CS控制台左侧导航包含四部分:总览、作业管理、作业模板、集群管理。


总览

image.png

作业管理

创建、编辑、提交、监控作业。


image.png


作业模板


可自定义作业模板,提高后期作业编写效率。


image.png

集群管理


image.png


驾驶行为实时分析



目的

    车载设备实时上报驾驶数据,每条日志包含四个字段:数据时间,经度,纬度,速度。

    这里想要实现:

  • 指定时间段内超速情况发送到手机短信。

  • 指定时间段内平均速度发送到手机短信。

  • 偏航情况发送到手机短信。

前提


  • 开通DIS Channel服务


    注:DIS 暂可简单理解为Kafka集群、Channel 可理解为Kafka Topic。


  • 开通SMN短信服务


    注:SMN短信服务可将结果发送到对应手机。


  • 安装配置DIS Agent


    注:DIS Agent可简单理解成Kafka Producer Client。这里DIS Agent以类似Tail -F的方式监控文件并将日志近实时发送到DIS Channel。


#下载解压
curl -O 

tar -zxvf dis-agent-1.1.0.zip 


#配置(配置DIS Channel和待监控文件)
vim dis-agent-1.1.0/conf/agent.yml

region: cn-north-1
# user ak (get from 'My Credential')
ak: your_ak
# user sk (get from 'My Credential')
sk: your_sk
# user project id (get from 'My Credential')
projectId: your_projectId
# the dis gw endpoint
endpoint: https://dis.cn-north-1.myhuaweicloud.com:20004
# config each flow to monitor file.
flows:
  # DIS Channel名称
  - DISStream: input-dis-C0Db
    # 待监控文件或目录,支持正则
    filePattern: /Users/wangpei/software/dis-agent-1.1.0/data/*.log
    # from where to start: 'START_OF_FILE' or 'END_OF_FILE'
    initialPosition: START_OF_FILE
    # upload max interval(ms)
    maxBufferAgeMillis: 5000
    
    
#启动Dis Agent
cd ${DIS_AGENT_HOME}
bin/start-dis-agent.sh


#查看Dis Agent 日志
tail -f dis-agent-1.1.0/logs/dis-agent.log


  • 向监控文件追加日志


    这里用一段python向本地文件追加日志。也可用shell脚本。


#日志格式:数据时间,经度,纬度,速度
import time
import random

if __name__ == '__main__':

    f = open("/Users/wangpei/software/dis-agent-1.1.0/data/test.log",mode="a")

    geoPoints=[(23.1234532,35.3321232),(23.1234532,35.3411262),(23.1234532,35.3511272)]

    while True:
        dataTime=int(time.time())*1000
        longitude,latitude=random.choice(geoPoints)
        velocity=random.randint(80,110)
        log=",".join([str(dataTime),str(longitude),str(latitude),str(velocity)])
        f.write(log)
        f.write("\n")
        f.flush()
        print "写入:"+log
        time.sleep(1)


  • 查看日志是否发送功


tail -f dis-agent-1.1.0/logs/dis-agent.log

[INFO ] (sender-001-dis:input-dis-C0Db:/Users/wangpei/software/dis-agent-1.1.0/data/*.log)
 com.bigdata.dis.agent.tailing.DISSender  Put 6 records to [input-dis-C0Db] spend 212ms, latestSequenceNumber [ShardId: shardId-0000000000, SequenceNumber: 298476]


创建作业


在作业管理中新建作业。


类型:可以选Flink SQL 作业 或 Spark 作业。


编辑器:可选SQL编辑器和可视化编辑器。


模板:可直接基于已有模板快速创建作业。


image.png


编辑作业并提交


支持SQL语法检查、Debug调试、保存为模板、SQL格式化、切换主题颜色、修改作业描述、帮助等。



image.png

/* 输入   DIS Channel 类似 Kafka Topic */
CREATE SOURCE STREAM driver_behavior (
    DataTime BIGINT, /* 数据时间 */
    Longitude FLOAT, /* 经度 */
    Latitude FLOAT, /* 纬度 */
    Velocity FLOAT /* 车速 */
) WITH (
  type = "dis",
  region = "cn-north-1",
  channel = "input-dis-C0Db",
  partition_count = "1",
  encode = "csv",
  field_delimiter = ","
) TIMESTAMP BY DataTime.rowtime SET WATERMARK (RANGE interval 1 second, interval 1 second);

/* 输出   SMN短信 */
CREATE SINK STREAM toMSG (
  MessageContent STRING
) WITH (
  type = "smn",
  region = "cn-north-1",
  topic_urn = "urn:smn:cn-north-1:2b7ca6e07a344157af769b76bea3551c:out-put-smn",
  message_subject = "out-put-smn",
  message_column = "MessageContent"
);

/** 业务逻辑 **/

/**
    业务1: 30秒内超速三次的设备的超速次数和最大速度
 **/
INSERT INTO toMSG
SELECT "30秒内超速次数: " || CAST(overspeed_count as VARCHAR(20)) || "次,最大速度为:" || CAST(Velocity as VARCHAR(20))
FROM (
  SELECT
    MAX(Velocity) AS Velocity, //最大速度
    COUNT(Velocity) AS overspeed_count //超速次数
  FROM driver_behavior
  WHERE Velocity > 90
  GROUP BY TUMBLE (DataTime, INTERVAL '30' SECOND)
) WHERE overspeed_count >= 3;


/**
    业务2: 10秒内的平均速度
    AVG_SPEED:地理函数,通过地理点计算一段时间内的平均速度
**/
INSERT INTO toMSG
SELECT "10秒内的平均速度:" || cast(avg_speed as VARCHAR(20))
from(
SELECT AVG_SPEED(ST_POINT(Longitude, Latitude)) as avg_speed
FROM driver_behavior
GROUP BY TUMBLE (DataTime, INTERVAL '10' SECOND)
);

/**
    业务3: 偏航告警
    ST_WITHIN:地理函数,一个点是否包含在几何体,几何体定义了既定路径
 **/
INSERT INTO toMSG
SELECT "已偏移既定路径,速度为 " || CAST(Velocity as VARCHAR(20))
FROM driver_behavior
--ST_WITHIN 为地理位置函数,见华为官网
WHERE NOT ST_WITHIN(ST_POINT(cast(Longitude as DOUBLE), cast(Latitude as DOUBLE)), ST_BUFFER(ST_LINE(ARRAY[ST_POINT(34.585555,105.725221),ST_POINT(34.586313,105.725168),ST_POINT(34.586982,105.72515),ST_POINT(34.587064,105.72603),ST_POINT(34.587123,105.727593),ST_POINT(34.587071,105.728698),ST_POINT(34.587012,105.72956),ST_POINT(34.586982,105.730674),ST_POINT(34.586922,105.73177),ST_POINT(34.586863,105.732758),ST_POINT(34.586818,105.733782),ST_POINT(34.586789,105.73486),ST_POINT(34.586729,105.735974),ST_POINT(34.586655,105.73698),ST_POINT(34.58661,105.738202),ST_POINT(34.58661,105.739226),ST_POINT(34.586492,105.740538),ST_POINT(34.586388,105.741651),ST_POINT(34.586298,105.742783),ST_POINT(34.586224,105.743879),ST_POINT(34.586165,105.745227),ST_POINT(34.586076,105.746359),ST_POINT(34.58609,105.747562),ST_POINT(34.586135,105.748712),ST_POINT(34.58612,105.749952),ST_POINT(34.587086,105.750006),ST_POINT(34.588691,105.74997)]),0.001))


作业运行时


提交作业后,作业就进入运行阶段。点击作业管理>作业名称,查看作业运行情况。



image.png


  • 作业详情


图表化展示作业输入输出的速率、总记录数、总字节数。


  • 执行计划


类似Spark / Flink DAG。展示每一步执行情况。


image.png


  • 任务列表


展示当前作业执行的任务列表。



image.png


  • 运行日志


展示Flink 作业JobManager、TaskManager运行日志。


image.png


作业输出


image.png


总结

优势

  1. 流处理,只需编写SQL即可,大大降低了开发难度,提高了开发效率。

  2. 可视化界面统一管理流作业,后期方便维护。

  3. 同时兼容Flink和Spark两大组件。

  4. 输入输出可配置。输入输出已经以插件形式存在,编写作业时只需简单配置即可。

  5. 对开发友好。有SQL语法检查、Debug、作业监控、执行计划、运行日志、一键启停等等。

  6. 对运营和BI友好,数据实时洞察只需要托拉拽即可快速触发流式作业。


需要改进

   个人认为有以下几点可能需要强化或改进:

  1. 增强对更复杂逻辑的支持,以应对复杂流处理。

  2. 增强对Spark SQL的支持,目前Spark流作业的触发需要依赖提交的Jar包。

  3. 增强对主流输入输出的插件化支持。增强托拉拽触发流作业的功能,流处理除了要面对开发,还需要面对运营产品等非SQL人员。

  4. 增强对实时处理和批处理的结合。

  5. 增强对流作业的监控和告警。如流作业发不出短信,也不报错,应该有短信告警,告知那个环节出了问题。


最后


    华为云实时流计算服务CS确实让我很惊喜,应用于普通实时监控、实时分析等场景应该没什么问题。同时,从诸多细节,可以发现,

华为云确实有在用心打磨这个产品。

    

    期待有朝一日,华为云实时流计算服务能大规模应用到各种实时流计算场景,甚至应用到机器学习、深度学习、图计算等领域!


    最后,由衷感谢华为云推出实时流计算服务,大大降低了流处理门槛,推动流计算领域的发展!感谢!

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。