华为云实时流计算服务CS

举报
砖头2333 发表于 2019/01/20 00:14:30 2019/01/20
【摘要】 1. CS服务说明具体可以参考官网 https://console.huaweicloud.com/cs/?region=cn-north-1#/dashboard ,根据我的学习了解,CS服务提供了一种监控预警的流水线系统。首先是监控,当我们关注的某些数据更新后,通过DIS agent上传至CS服务,然后通过Flink SQL作业任务筛选出以及判断关键数据是否符合要求,然后根据实际情况向...

 1. CS服务说明


具体可以参考官网 https://console.huaweicloud.com/cs/?region=cn-north-1#/dashboard ,根据我的学习了解,CS服务提供了一种监控预警的流水线系统。首先是监控,当我们关注的某些数据更新后,通过DIS agent上传至CS服务,然后通过Flink SQL作业任务筛选出以及判断关键数据是否符合要求,然后根据实际情况向用户发送消息,看到这里你看已经蒙了,但是后面会用我的实验过程消息描述。


 2. 服务对象


当我做完基础实验后基本了解了整个过程,实验是监控车速并预警,同理,很显然容易想到类似的使用场景,比如监控股票涨跌(这个对实时性要求较高)、灾害天气预警(这个可以有)、热点新闻推送(这个可以,但没必要)等等,根据上文介绍的流程以及我举出的例子,相信你已经明白了我接下来要介绍的CS服务。


本文建立在天气预警案例上,我的目标是获取北京市天气信息,若在一段时间内天气温度始终低于某个温度5摄氏度,那就发送预警消息。当然这只是简单的流程,理论上完整的天气预警应该是面向多个城市,实时性要求相对较高,判断条件应该是未来1到2小时内的天气预测信息,以及最高温度、最低温度、湿度和PM25等信息进行综合评价,然后发送给用户一个通过机器学习预测的结果。


 3. 实现过程


 3.1 天气数据获取


天气数据一般的获取手段是通过api获取,市面上有很多免费或者收费的api,发送请求就可以得到数据,还有一种傻一点的方式,通过爬虫爬取页面的天气信息。后者相对复杂,前者简单,但是需要靠谱的数据源,我这里找到了一个免费公开的api,然后用python获取数据并写入到文件中。

# ==============================================================================
"""向log文件中定时写入数据模拟天气变化
版本
   Python3.6.7
参考
   https://www.sojson.com/blog/305.html
数据来源不知名小站天气免费api源数据格式json目标是解析json文件获取data
按照10秒写入一次当前数据模拟天气的动态变化样例请求的是北京天气city_code为101010100
apihttp://t.weather.sojson.com/api/weather/city/101010100最后面是city_code
"""
import json
import time
import requests


def get_weather_data(city_code='101010100'):
"""根据city_code获取json数据CS服务接受json数据但是不熟悉所以还是最后使用csv
   数据样例:
   {
       "time": "2019-01-19 19:00:40",
       "cityInfo":{
           "city": "北京市",
           "cityId": "101010100",
           "parent": "北京",
           "updateTime": "18:47"
       },
       "date": "20190119",
       "message": "Success !",
       "status": 200,
       "data":{
           "shidu": "24%",
           "pm25": 14,
           "pm10": 37,
           "quality": "优",
           "wendu": "2",
           "ganmao": "各类人群可自由活动",
           "yesterday":{"date": "18", "sunrise": "07:34", "high": "高温 6.0℃", "low": "低温 -6.0℃",…},
           "forecast":[{"date": "19", "sunrise": "07:33", "high": "高温 4.0℃", "low": "低温 -5.0℃",…]
       }
   }
   Args:
       city_code: 城市代码比如北京是101010100
   Returns:
       data: 解析并筛选的数据
   """
url = ('http://t.weather.sojson.com/api/weather/city/{}').format(city_code)
# requests获取到的数据是byte类型我们decode为utf-8
response = requests.get(url)
data = response.content.decode('utf-8')
# decode后得到str类型数据若满足条件数组或对象之中的字符串必须使用双引号
# 不能使用单引号则可以使用json.loads解析为json字典数据
data = json.loads(data)
return data

def write2log(weather_data, file_path='./weather.log', delay=10):
"""将得到的list数据写入到.log文件中
   Args:
       weather_data: 解析得到的数据
       file_path: log文件保存路径
       delay: 模拟的时间间隔 单位秒
   """
my_weather_data = []
my_weather_data.append(weather_data['time'])
my_weather_data.append(weather_data['cityInfo']['city'])
my_weather_data.append(weather_data['cityInfo']['cityId'])
my_weather_data.append(weather_data['data']['shidu'])
my_weather_data.append(str(weather_data['data']['pm25']))
my_weather_data.append(weather_data['data']['quality'])
my_weather_data.append(weather_data['data']['wendu'])

my_weather_data = ','.join(my_weather_data)
print(my_weather_data)
with open(file_path, 'a+') as file:
file.write(str(my_weather_data) + '\n')
time.sleep(delay)

if __name__ == "__main__":
for i in range(200):
data = get_weather_data()
write2log(data)


 3.2 DIS Agent配置


基本是按照实验步骤配置的,通道、主题、订阅什么的就省略不写了,因为可以复用

---
# cloud region id
region: cn-north-1
# user ak (get from 'My Credential')
ak: AK
# user sk (get from 'My Credential')
sk: SK
# user project id (get from 'My Credential')
projectId: XX
# the dis gw endpoint
endpoint: https://dis.cn-north-1.myhuaweicloud.com:20004
# config each flow to monitor file.
flows:
# DIS stream
- DISStream: csinput
# only support specified directory, filename can use * to match some files. eg. * means match all file, test*.log means match test1.log or test-12.log and so on.
filePattern: /Users/zhoutao/Downloads/dis-agent/data/*.log
# from where to start: 'START_OF_FILE' or 'END_OF_FILE'
initialPosition: START_OF_FILE
# upload max interval(ms)
maxBufferAgeMillis: 5000


启动DIS Agent并且启动天气数据写入的脚本后成功运行的log日志(在log文件夹下)


log.png


python脚本运行结果


python.png


3.3 Flink SQL作业配置


大体上是参照基础实验做的,小部分的内容进行了修改,sql不会,随便改了点代码,理论上来说这个是直接支持json文件的,但是我还是按照样例csv来搞了

/**
 *
 * >>>>>样例输入<<<<<
 *  流名: weather_data(WeatherTime,City,CityID,Shidu,PM25,Quality,Wendu):
 *  2019-01-19 19:00:40,北京市,101010100,24%,14.0,优,2
 * >>>>>样例输出<<<<<
 *  流名: weather_info(WeatherTime,City,Shidu,PM25,Quality,Wendu):
 *  2019-01-19 19:00:40,北京市,24%,14.0,优,2
 *  流名: low_wendu_msg(MessageContent)
 *  北京市温度为2摄氏度请注意保暖
 **/
/** 创建输入流从DIS的csinput通道获取数据。
 *
 * 根据实际情况修改以下选项
 * channel数据所在通道名
 * partition_count该通道分区数
 * encode: 数据编码方式可以是csv或json
 * field_delimiter当编码格式为csv时属性之间的分隔符
 **/
CREATE SOURCE STREAM weather_data (
WeatherTime STRING,
City STRING,
CityID STRING,
Shidu STRING,
PM25 STRING,
Quality STRING,
Wendu FLOAT
)
WITH (
type = "dis",
region = "cn-north-1",
channel = "csinput",
partition_count = "1",
encode = "csv",
field_delimiter = ","
) TIMESTAMP BY proctime.proctime;

/** 创建输出流结果输出到DIS的csoutput通道。
 *
 * 根据实际情况修改以下选项
 * channel数据所在通道名
 * partition_key当通道有多个分区时用来分发的主键
 * encode 结果编码方式可以为csv或者json
 * field_delimiter: 当编码格式为csv时属性之间的分隔符
 **/
CREATE SINK STREAM weather_info (
WeatherTime STRING,
City STRING,
Shidu STRING,
PM25 STRING,
Quality STRING,
Wendu FLOAT
)
WITH (
type = "dis",
region = "cn-north-1",
channel = "csoutput",
partition_key = "City",
encode = "csv",
field_delimiter = ","
);

/** 将部分字段输出 **/
INSERT INTO weather_info
SELECT WeatherTime,City,Shidu,PM25,Quality,Wendu
FROM weather_data;

/** 创建输出流结果输出到SMN。
 *
 * 根据实际情况修改以下选项
 * topic_urnSMN服务的主题URN作为消息通知的目标主题需要提前在SMN服务中创建
 * message_subject发往SMN服务的消息标题
 * message_column输出流的列名其内容作为消息的内容
 **/
CREATE SINK STREAM low_wendu_msg (
MessageContent STRING
)
WITH (
type = "smn",
region = "cn-north-1",
topic_urn = "urn:smn:cn-north-1:5d72f4f9035c4b6e937aa5efa483e83f:message",
message_subject = "message",
message_column = "MessageContent"
);

/** 当100秒内温度在5度以下的次数超过5次发送告警消息到SMN服务实现用户手机终端实时告警功能 **/
INSERT INTO low_wendu_msg
SELECT City || "温度为" || CAST(Wendu as VARCHAR(20)) || "摄氏度请注意保暖"
FROM (
SELECT City, MAX(Wendu) AS Wendu, COUNT(Wendu) AS lowwendu_count
FROM weather_data
WHERE Wendu < 5
GROUP BY TUMBLE (proctime, INTERVAL '60' SECOND), City
)
WHERE lowwendu_count >= 5;


最后这部分的判断,需要sql了解一点,后续定个学习计划


4. 结果


Flink作业成功运行了,且接收到了数据


cs.png


最后成功接收到了定制的邮件,这里用的是icloud邮箱,qq可能收不到订阅请求,请自行尝试


email.png


后续思考,如果配合爬虫应该能更加灵活的配置CS服务,想一想还是可以搞得。



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。