IoT开发,将数据转发至OBS长期储存【零代码零硬件玩转华为云IoT】
文章摘要
本文将使用 Python 编写一个MQTT应用程序,通过该程序上报数据至华为云IoT平台。华为云IoT平台收到数据后,根据预先配置的数据转发规则,将数据转存至对象存储服务(OBS)。
理论介绍
对于设备上报的数据,有两种方式
- 让平台将设备上报数据推送给应用服务器,由应用服务器进行保存
- 让平台将设备上报数据转发给对象存储服务(OBS),由OBS进行存储
对象存储服务(Object Storage Service,OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。
本文主要介绍第二种方式 —— 使用华为云OBS服务转存数据
实验讲解
创建产品
要在华为云IoT平台上进行开发,第一步要做的就是定义产品模型。大致步骤如下
- 需求分析
- 定义服务
- 定义属性
- 定义命令
具体的可以看我这篇博客:https://bbs.huaweicloud.com/blogs/381937
已经写的很详细了,就不再赘述了。下面的实验均基于上述产品定义完成的基础上
注册设备
首先,进入华为云官网 https://www.huaweicloud.com/
依次点击:产品 —> IoT物联网 —> 设备接入IoTDA
点击 免费试用
设备 —> 所有设备 —> 注册设备
填写如下内容。设备认证类型选择:密钥(输入你的密码)
对话框中有一个Topic链接,导向华为云的帮助文档。地址为:https://support.huaweicloud.com/api-iothub/iot_06_v5_3004.html
Topic指的是:设备使用MQTT协议接入平台时,平台和设备通过Topic进行通信。
确定后,会弹窗提示设备创建成功
点击保存并关闭后,会自动下载一个文件:DEVICES-KEY
DEVICES-KEY 的文件内容如下
{
"device_id": "635d15a8222a8601d402e79c_air-conditioning",
"secret": "(省略密码)"
}
刚创建好的设备还是“未激活”状态,需要真实连接上华为云IoT平台后,该状态才会改变
编写应用程序代码(Python语言)
安装依赖
Python开发MQTT应用程序比较通用的项目是:paho-mqtt
。
PIPY官网地址为:https://pypi.org/project/paho-mqtt/
试用如下命令安装依赖
pip install paho-mqtt
基础代码模板
发布客户端 pub.py
:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, reason_code, properties):
print("Connected with result code: " + str(reason_code))
client.publish('IoT平台的Topic', payload='报文内容', qos=0)
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client(client_id='注册的客户端端ID',
clean_session=True,
userdata=None,
protocol=mqtt.MQTTv311,
transport="tcp",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set('用户名', '密码')
client.connect('IoT平台地址', 1883, 600) # 600为keepalive的时间间隔
client.loop_forever()
接受客户端 sub.py
:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, reason_code, properties):
print("Connected with result code: " + str(reason_code))
client.subscribe('IoT平台的Topic', qos=0)
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client(client_id='注册的客户端端ID',
clean_session=True,
userdata=None,
protocol=mqtt.MQTTv311,
transport="tcp",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set('用户名', '密码')
client.connect('IoT平台地址', 1883, 600) # 600为keepalive的时间间隔
client.loop_forever() # 保持连接
接入地址
其中接入地址位置:IoT平台 —> 总览 —> 接入信息
我这里的“接入地址”是
f5e4964ab8.iot-mqtts.cn-north-4.myhuaweicloud.com
Topic
首先点击产品,进入你的产品
切换到 “Topic管理”页面
我们这里选择“设备上报属性数据”这条Topic(发布者:设备 | 订阅者:平台)
$oc/devices/{device_id}/sys/properties/report
设备ID
设备ID在:设备 —> 所有设备 —> 设备列表
我这里的“设备ID”是
635d15a8222a8601d402e79c_air-conditioning
MQTT 连接参数
依次点击:设备 —> 所有设备 —> 设备详情 —> MQTT连接参数
记录如下信息:
- client_id:635d15a8222a8601d402e79c_air-conditioning_0_0_2024041701
- username:635d15a8222a8601d402e79c_air-conditioning
- password:(省略密码)
- hostname:f5e4964ab8.iot-mqtts.cn-north-4.myhuaweicloud.com
根据实际信息完善代码
这里以 pub.py
为例,上报 DeviceInfo
属性
这里可以使用虚拟设备在线调试,然后复制其报文,再贴入代码中
这里的body信息如下
{ "Manufacturer": "美的", "ProductionDate": { "begin": "2024-05-16T12:48:17.000Z", "end": "2024-05-16T12:48:22.000Z" }, "IndoorUnitModel": "EA35", "OutdoorUnitModel": "MW12345", "FluorinationPercent": 89, "DustPercent": 14, "ErrorCode": "0" }
通过源码可以看到 Payload 是一个自定义类型
它是一个联合类型,可以是如下几种类型之一
- str
- bytes
- bytearray
- int
- float
- None
因此需要将上述body的报文转成JSON字符串来发送
最终的代码如下
import paho.mqtt.client as mqtt
import json
host = 'f5e4964ab8.iot-mqtts.cn-north-4.myhuaweicloud.com'
port = 1883
username = '635d15a8222a8601d402e79c_air-conditioning'
password = '(密码略)'
client_id = '635d15a8222a8601d402e79c_air-conditioning_0_0_2024041701'
device_id = '635d15a8222a8601d402e79c_air-conditioning'
topic = f'$oc/devices/{device_id}/sys/properties/report'
payload = { "Manufacturer": "美的", "ProductionDate": { "begin": "2024-05-16T12:48:17.000Z", "end": "2024-05-16T12:48:22.000Z" }, "IndoorUnitModel": "EA35", "OutdoorUnitModel": "MW12345", "FluorinationPercent": 89, "DustPercent": 14, "ErrorCode": "0" }
payloadstr = json.dumps(payload)
print(f'topic = {topic}')
print(f'payloadstr = {payloadstr}')
def on_connect(client, userdata, flags, reason_code, properties):
print("Connected with result code: " + str(reason_code))
client.publish(topic, payload=payloadstr, qos=0)
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client(client_id=client_id,
clean_session=True,
userdata=None,
protocol=mqtt.MQTTv311,
transport="tcp",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username=username, password=password)
client.connect(host, port, 60) # 600为keepalive的时间间隔
client.loop_forever()
点击在线调试,选择相应的设备
点击绿色的三角形运行代码,可以看到已经连接成功了
可以在消息跟踪这里看到对应的日志记录
同时,在线调试这里,也可以看到 鉴权、数据流转、属性上报 的调试信息
购买OBS
OBS的费用说明
对象存储服务OBS的定价表如下,本次实验所涉及的资费项,主要有以下两个
- 存储费用:实验中会选择“低频访问存储数据”这块所占用的空间是需要付费的
- 流量费用:IoT平台收到数据后,转发到OBS属于“内/公网流入流量”这块是免费的
购买OBS套餐
来到华为云OBS主页,点击购买
https://www.huaweicloud.com/product/obs.html
选择“标准存储单AZ”或者“标准存储多AZ”都行,很便宜,一个月才1块钱,就能存40GB的数据了。但是注意:资源包不能抵扣已产生的用量,已购买成功的资源包不支持退订。
挑选完毕后,点击右下角的“加入清单”,然后点击“立即购买”
这里扫码付款即可
创建桶的方式如下,首先进入“桶列表”,然后点击“创建桶”
依次选择/填写
- 区域
- 桶名称(注意:桶名称仅支持小写字母、数字、中划线、英文点号)
- 数据冗余存储策略
- 默认存储类别
然后点击立即创建
创建好后,会自动返回桶列表。然后就可以看到刚刚创建的桶了
配置转发规则
回到IoT设备接入页面,在左侧点击 规则 —> 数据转发 —> 规则列表,点击“创建规则”按钮
新建数据转发规则主要有以下3步骤
- 设置转发数据:针对部分类型数据提供的快速配置,将引导您完成简单的业务设置。您也可以直接编辑过滤语句,实现更复杂的查询要求
总共支持以下几种数据来源
- 设备
- 设备属性
- 设备消息
- 设备消息状态
- 设备状态
- 批量任务
- 产品
- 设备异步命令状态
我们这里要做的是当接收到设备端(或模拟器)的数据上报时,转发到OBS,因此这里数据来源选择“设备属性”,触发事件选择属性上报。都设置完后,点击“创建规则”
- 设置转发目标:您可以设置将数据转发至华为云其他服务(如OBS)或私有服务器
最多支持添加10个转发目标,而且支持跨区域转发(例如这里IoT平台位于北京四,但是你的转发目标可以设置为广州的OBS)
首次使用需要进行“访问授权”,直接同意即可,非常方便
下面配置存储OBS的桶、数据存储的目录、文件名。文件类型用JSON或者CSV均可
这里已经设置完成了
- 启动规则:完成完整的规则定义后,您就可以控制规则的运行,以实现数据转发
现在这条规则还未被启动,点击右上角的开关按钮启动规则即可
切换开关后,这个规则就已经启动了
回到规则列表也,就可以看到刚刚新增的这条规则了
测试转发到OBS
进入 监控运维 —> 消息跟踪 页面,在“设备列表”中选中你的设备。然后点击右上角的“清除数据”(方便后续调试、查看日志)
切换到“在线调试”页面,选择我们的设备
回到python代码,执行该代码
可以看到这里已经收到“属性上报”,并且触发了“流转规则”
这里虽然触发了“流转规则”但是执行失败了
这是点击“详情”弹出的对话框
这是点击“定位建议”弹出的对话框
建议是:请您检查触发属性上报的设备对应产品的产品模型定义中是否包含您所上报数据中的服务ID
从他的建议中,可以发现应该是我们上送的报文出错了。我们可以使用模拟器发送同样的指令,来看看具体报文该怎么写的
在线调试页面,选择同一个产品中的“虚拟设备”(如果没有就创建一个)
切换到设备模拟器,选择对应的服务,填写数据完成后,点击发送
我这里之前创建的虚拟设备被冻结了,因此需要解冻设备
在设备列表中选择该设备,点击操作中的解冻即可
从新选择设备,发送命令。可以看到,这次设备模拟器发送成功了,应用模拟器也接收成功了
从“消息跟踪”这里可以看到,模拟器上报的属性,触发对象存储服务(OBS)流转成功
切换到 obs桶列表 —> 对象,可以看到数据创建成功
点击查看文件信息,默认选项卡是“对象ACL”
“元数据”如下
从消息跟踪里找到模拟器上报属性的日志
点击“详情”,复制data里的内容
{"services":[{"properties":{"Manufacturer":"美的","ProductionDate":{"begin":"2024-04-01T15:21:21.000Z","end":"2024-05-01T15:21:23.000Z"},"IndoorUnitModel":"ABC","OutdoorUnitModel":"XYZ","FluorinationPercent":93,"DustPercent":24,"ErrorCode":"-1"},"service_id":"DeviceInfo","event_time":null}]}
使用在线格式化后的JSON如下,工具地址:https://www.json.cn/
修正后的载荷如下
time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
payload = {"services":[{
"properties":{
"Manufacturer":"美的",
"ProductionDate": {
"begin":"2024-04-01T15:21:21.000Z",
"end":"2024-05-01T15:21:23.000Z"
},
"IndoorUnitModel":"ABC",
"OutdoorUnitModel":"XYZ",
"FluorinationPercent":93,
"DustPercent":24,
"ErrorCode":"-1"
},
"service_id":"DeviceInfo",
"event_time":time_str
}]}
再次执行代码
可以看到“消息跟踪”里,python代码上报的属性,也成功触发了OBS转发规则了(两者时间一致)
再次查看OBS对象记录,可以看到大小比刚刚增加了一倍(也就是说二次提交成功),并且最后修改时间也跟上面python脚本执行时间对上了
注意:帮助文档里有说**“华为云对象存储服务OBS禁止通过OBS的默认域名(桶访问域名或静态网站访问域名)在线预览桶内对象,因此如果你想查看OBS文件的内容,就需要下载到本地进行查看了(不过下载需要支付流量费)”**
地址如下:https://support.huaweicloud.com/obs_faq/obs_03_0087.html
最后附上最终的python代码(经过修订载荷后成功跑通的代码)
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 16 20:31:23 2024
@author: Administrator
"""
import json
from datetime import datetime
import paho.mqtt.client as mqtt
host = 'f5e4964ab8.iot-mqtts.cn-north-4.myhuaweicloud.com'
port = 1883
username = '635d15a8222a8601d402e79c_air-conditioning'
password = '(略)'
client_id = '635d15a8222a8601d402e79c_air-conditioning_0_0_2024041701'
device_id = '635d15a8222a8601d402e79c_air-conditioning'
topic = f'$oc/devices/{device_id}/sys/properties/report'
time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
payload = {"services":[{
"properties":{
"Manufacturer":"美的",
"ProductionDate": {
"begin":"2024-04-01T15:21:21.000Z",
"end":"2024-05-01T15:21:23.000Z"
},
"IndoorUnitModel":"ABC",
"OutdoorUnitModel":"XYZ",
"FluorinationPercent":93,
"DustPercent":24,
"ErrorCode":"-1"
},
"service_id":"DeviceInfo",
"event_time":time_str
}]}
payloadstr = json.dumps(payload)
print(f'topic = {topic}')
print(f'payloadstr = {payloadstr}')
def on_connect(client, userdata, flags, reason_code, properties):
print("Connected with result code: " + str(reason_code))
client.publish(topic, payload=payloadstr, qos=0)
print('property publish finished!')
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client(client_id=client_id,
clean_session=True,
userdata=None,
protocol=mqtt.MQTTv311,
transport="tcp",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username=username, password=password)
client.connect(host, port, 60) # 600为keepalive的时间间隔
client.loop_forever()
释放资源
- 停止/删除转发规则
如果后续还要用到这个规则,可以先停止该规则(否则直接删除即可)
停止后,状态会变为“未启动”
- 删除OBS桶
使用完后,建议删除OBS桶(虽然OBS桶是免费的,但是里面存东西需要付费)
结束语
通过这篇文章,可以看出,华为云IoT平台的功能基本是涵盖了IoT开发的方方面面,基本所有你能想到的,平台上都给出了一整套完整的解决方案,熟练掌握该平台,可以方便你今后快速开发迭代IoT相关的应用程序。
上面的例子中,还展示了一个实际生产过程中可能遇到的报文数据错误的问题,并演示了如何通过IoT平台的“定位建议”功能,定位到该问题,并展示了一个快速的解决问题的方案 —— 通过模拟器发送指令,并核对两者日志的差异,最终完美并快速的解决问题。
我正在参加【有奖征文 第29期】零代码零硬件玩转华为云IoT物联网平台多场景
链接:https://bbs.huaweicloud.com/blogs/423245
- 点赞
- 收藏
- 关注作者
评论(0)