IoT开发,将数据转发至OBS长期储存【零代码零硬件玩转华为云IoT】

举报
福州司马懿 发表于 2024/04/17 12:04:39 2024/04/17
【摘要】 本文将使用 Python 编写一个MQTT应用程序,通过该程序上报数据至华为云IoT平台。华为云IoT平台收到数据后,根据预先配置的数据转发规则,将数据转存至对象存储服务(OBS)。

文章摘要

本文将使用 Python 编写一个MQTT应用程序,通过该程序上报数据至华为云IoT平台。华为云IoT平台收到数据后,根据预先配置的数据转发规则,将数据转存至对象存储服务(OBS)。

图片.png

理论介绍

对于设备上报的数据,有两种方式

  1. 让平台将设备上报数据推送给应用服务器,由应用服务器进行保存
  2. 让平台将设备上报数据转发给对象存储服务(OBS),由OBS进行存储

对象存储服务(Object Storage Service,OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。

本文主要介绍第二种方式 —— 使用华为云OBS服务转存数据

实验讲解

创建产品

要在华为云IoT平台上进行开发,第一步要做的就是定义产品模型。大致步骤如下

  • 需求分析
  • 定义服务
  • 定义属性
  • 定义命令

具体的可以看我这篇博客:https://bbs.huaweicloud.com/blogs/381937

已经写的很详细了,就不再赘述了。下面的实验均基于上述产品定义完成的基础上

注册设备

首先,进入华为云官网 https://www.huaweicloud.com/

依次点击:产品 —> IoT物联网 —> 设备接入IoTDA

图片.png

点击 免费试用

图片.png

设备 —> 所有设备 —> 注册设备

图片.png

填写如下内容。设备认证类型选择:密钥(输入你的密码)

图片.png

对话框中有一个Topic链接,导向华为云的帮助文档。地址为:https://support.huaweicloud.com/api-iothub/iot_06_v5_3004.html

Topic指的是:设备使用MQTT协议接入平台时,平台和设备通过Topic进行通信。

图片.png

确定后,会弹窗提示设备创建成功

图片.png

点击保存并关闭后,会自动下载一个文件:DEVICES-KEY

图片.png

DEVICES-KEY 的文件内容如下

{
    "device_id": "635d15a8222a8601d402e79c_air-conditioning",
    "secret": "(省略密码)"
}

刚创建好的设备还是“未激活”状态,需要真实连接上华为云IoT平台后,该状态才会改变

图片.png

编写应用程序代码(Python语言)

安装依赖

Python开发MQTT应用程序比较通用的项目是:paho-mqtt

PIPY官网地址为:https://pypi.org/project/paho-mqtt/

图片.png

试用如下命令安装依赖

pip install paho-mqtt

图片.png

基础代码模板

发布客户端 pub.py

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, reason_code, properties):
    print("Connected with result code: " + str(reason_code))
    client.publish('IoT平台的Topic', payload='报文内容', qos=0)

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client(client_id='注册的客户端端ID', 
                     clean_session=True, 
                     userdata=None, 
                     protocol=mqtt.MQTTv311, 
                     transport="tcp", 
                     callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set('用户名', '密码')
client.connect('IoT平台地址', 1883, 600) # 600为keepalive的时间间隔
client.loop_forever()

接受客户端 sub.py

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, reason_code, properties):
    print("Connected with result code: " + str(reason_code))
    client.subscribe('IoT平台的Topic', qos=0)

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client(client_id='注册的客户端端ID', 
                     clean_session=True, 
                     userdata=None, 
                     protocol=mqtt.MQTTv311, 
                     transport="tcp", 
                     callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set('用户名', '密码')
client.connect('IoT平台地址', 1883, 600) # 600为keepalive的时间间隔
client.loop_forever() # 保持连接

接入地址

其中接入地址位置:IoT平台 —> 总览 —> 接入信息

我这里的“接入地址”是

f5e4964ab8.iot-mqtts.cn-north-4.myhuaweicloud.com

图片.png

Topic

首先点击产品,进入你的产品

图片.png

切换到 “Topic管理”页面

图片.png

我们这里选择“设备上报属性数据”这条Topic(发布者:设备 | 订阅者:平台)

$oc/devices/{device_id}/sys/properties/report

设备ID

设备ID在:设备 —> 所有设备 —> 设备列表

我这里的“设备ID”是

635d15a8222a8601d402e79c_air-conditioning

图片.png

MQTT 连接参数

依次点击:设备 —> 所有设备 —> 设备详情 —> MQTT连接参数

图片.png

图片.png

记录如下信息:

根据实际信息完善代码

这里以 pub.py 为例,上报 DeviceInfo 属性

图片.png

这里可以使用虚拟设备在线调试,然后复制其报文,再贴入代码中

图片.png

这里的body信息如下

{ "Manufacturer": "美的", "ProductionDate": { "begin": "2024-05-16T12:48:17.000Z", "end": "2024-05-16T12:48:22.000Z" }, "IndoorUnitModel": "EA35", "OutdoorUnitModel": "MW12345", "FluorinationPercent": 89, "DustPercent": 14, "ErrorCode": "0" }

通过源码可以看到 Payload 是一个自定义类型

图片.png

它是一个联合类型,可以是如下几种类型之一

  • str
  • bytes
  • bytearray
  • int
  • float
  • None

图片.png

因此需要将上述body的报文转成JSON字符串来发送

最终的代码如下

import paho.mqtt.client as mqtt
import json

host = 'f5e4964ab8.iot-mqtts.cn-north-4.myhuaweicloud.com'
port = 1883
username = '635d15a8222a8601d402e79c_air-conditioning'
password = '(密码略)'
client_id = '635d15a8222a8601d402e79c_air-conditioning_0_0_2024041701'
device_id = '635d15a8222a8601d402e79c_air-conditioning'
topic = f'$oc/devices/{device_id}/sys/properties/report'
payload = { "Manufacturer": "美的", "ProductionDate": { "begin": "2024-05-16T12:48:17.000Z", "end": "2024-05-16T12:48:22.000Z" }, "IndoorUnitModel": "EA35", "OutdoorUnitModel": "MW12345", "FluorinationPercent": 89, "DustPercent": 14, "ErrorCode": "0" }
payloadstr = json.dumps(payload)
print(f'topic = {topic}')
print(f'payloadstr = {payloadstr}')

def on_connect(client, userdata, flags, reason_code, properties):
    print("Connected with result code: " + str(reason_code))
    
    client.publish(topic, payload=payloadstr, qos=0)

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client(client_id=client_id, 
                     clean_session=True, 
                     userdata=None, 
                     protocol=mqtt.MQTTv311, 
                     transport="tcp", 
                     callback_api_version=mqtt.CallbackAPIVersion.VERSION2)

client.on_connect = on_connect
client.on_message = on_message



client.username_pw_set(username=username, password=password)
client.connect(host, port, 60) # 600为keepalive的时间间隔

client.loop_forever()

点击在线调试,选择相应的设备

图片.png

点击绿色的三角形运行代码,可以看到已经连接成功了

图片.png

可以在消息跟踪这里看到对应的日志记录

图片.png

图片.png

同时,在线调试这里,也可以看到 鉴权、数据流转、属性上报 的调试信息

图片.png

购买OBS

OBS的费用说明

对象存储服务OBS的定价表如下,本次实验所涉及的资费项,主要有以下两个

  1. 存储费用:实验中会选择“低频访问存储数据”这块所占用的空间是需要付费的
  2. 流量费用:IoT平台收到数据后,转发到OBS属于“内/公网流入流量”这块是免费的

图片.png

购买OBS套餐

来到华为云OBS主页,点击购买

https://www.huaweicloud.com/product/obs.html

图片.png

选择“标准存储单AZ”或者“标准存储多AZ”都行,很便宜,一个月才1块钱,就能存40GB的数据了。但是注意:资源包不能抵扣已产生的用量,已购买成功的资源包不支持退订。

图片.png

图片.png

挑选完毕后,点击右下角的“加入清单”,然后点击“立即购买”

图片.png

这里扫码付款即可

图片.png

图片.png

创建桶的方式如下,首先进入“桶列表”,然后点击“创建桶”

图片.png

依次选择/填写

  • 区域
  • 桶名称(注意:桶名称仅支持小写字母、数字、中划线、英文点号)
  • 数据冗余存储策略
  • 默认存储类别

图片.png

然后点击立即创建

图片.png

创建好后,会自动返回桶列表。然后就可以看到刚刚创建的桶了

图片.png

配置转发规则

回到IoT设备接入页面,在左侧点击 规则 —> 数据转发 —> 规则列表,点击“创建规则”按钮

图片.png

新建数据转发规则主要有以下3步骤

  1. 设置转发数据:针对部分类型数据提供的快速配置,将引导您完成简单的业务设置。您也可以直接编辑过滤语句,实现更复杂的查询要求

总共支持以下几种数据来源

  • 设备
  • 设备属性
  • 设备消息
  • 设备消息状态
  • 设备状态
  • 批量任务
  • 产品
  • 设备异步命令状态

图片.png

我们这里要做的是当接收到设备端(或模拟器)的数据上报时,转发到OBS,因此这里数据来源选择“设备属性”,触发事件选择属性上报。都设置完后,点击“创建规则”

图片.png

  1. 设置转发目标:您可以设置将数据转发至华为云其他服务(如OBS)或私有服务器

最多支持添加10个转发目标,而且支持跨区域转发(例如这里IoT平台位于北京四,但是你的转发目标可以设置为广州的OBS)

图片.png

首次使用需要进行“访问授权”,直接同意即可,非常方便

图片.png

图片.png

下面配置存储OBS的桶、数据存储的目录、文件名。文件类型用JSON或者CSV均可

图片.png

这里已经设置完成了

图片.png

  1. 启动规则:完成完整的规则定义后,您就可以控制规则的运行,以实现数据转发

现在这条规则还未被启动,点击右上角的开关按钮启动规则即可

图片.png

切换开关后,这个规则就已经启动了

图片.png

回到规则列表也,就可以看到刚刚新增的这条规则了

图片.png

测试转发到OBS

进入 监控运维 —> 消息跟踪 页面,在“设备列表”中选中你的设备。然后点击右上角的“清除数据”(方便后续调试、查看日志)

图片.png

切换到“在线调试”页面,选择我们的设备

图片.png

回到python代码,执行该代码

图片.png

可以看到这里已经收到“属性上报”,并且触发了“流转规则”

图片.png

这里虽然触发了“流转规则”但是执行失败了

图片.png

这是点击“详情”弹出的对话框

图片.png

这是点击“定位建议”弹出的对话框

图片.png

建议是:请您检查触发属性上报的设备对应产品的产品模型定义中是否包含您所上报数据中的服务ID

从他的建议中,可以发现应该是我们上送的报文出错了。我们可以使用模拟器发送同样的指令,来看看具体报文该怎么写的

在线调试页面,选择同一个产品中的“虚拟设备”(如果没有就创建一个)

图片.png

切换到设备模拟器,选择对应的服务,填写数据完成后,点击发送

图片.png

我这里之前创建的虚拟设备被冻结了,因此需要解冻设备

图片.png

在设备列表中选择该设备,点击操作中的解冻即可

图片.png

从新选择设备,发送命令。可以看到,这次设备模拟器发送成功了,应用模拟器也接收成功了

图片.png

从“消息跟踪”这里可以看到,模拟器上报的属性,触发对象存储服务(OBS)流转成功

图片.png

切换到 obs桶列表 —> 对象,可以看到数据创建成功

图片.png

点击查看文件信息,默认选项卡是“对象ACL”

图片.png

“元数据”如下

图片.png

从消息跟踪里找到模拟器上报属性的日志

图片.png

点击“详情”,复制data里的内容

{"services":[{"properties":{"Manufacturer":"美的","ProductionDate":{"begin":"2024-04-01T15:21:21.000Z","end":"2024-05-01T15:21:23.000Z"},"IndoorUnitModel":"ABC","OutdoorUnitModel":"XYZ","FluorinationPercent":93,"DustPercent":24,"ErrorCode":"-1"},"service_id":"DeviceInfo","event_time":null}]}

图片.png

使用在线格式化后的JSON如下,工具地址:https://www.json.cn/

图片.png

修正后的载荷如下

time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
payload = {"services":[{
    "properties":{
        "Manufacturer":"美的",
        "ProductionDate": {
            "begin":"2024-04-01T15:21:21.000Z",
            "end":"2024-05-01T15:21:23.000Z"
        },
        "IndoorUnitModel":"ABC",
        "OutdoorUnitModel":"XYZ",
        "FluorinationPercent":93,
        "DustPercent":24,
        "ErrorCode":"-1"
    },
    "service_id":"DeviceInfo",
    "event_time":time_str
    }]}

再次执行代码

图片.png

可以看到“消息跟踪”里,python代码上报的属性,也成功触发了OBS转发规则了(两者时间一致)

图片.png

再次查看OBS对象记录,可以看到大小比刚刚增加了一倍(也就是说二次提交成功),并且最后修改时间也跟上面python脚本执行时间对上了

图片.png

注意:帮助文档里有说**“华为云对象存储服务OBS禁止通过OBS的默认域名(桶访问域名或静态网站访问域名)在线预览桶内对象,因此如果你想查看OBS文件的内容,就需要下载到本地进行查看了(不过下载需要支付流量费)”**

地址如下:https://support.huaweicloud.com/obs_faq/obs_03_0087.html

图片.png

最后附上最终的python代码(经过修订载荷后成功跑通的代码)

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 16 20:31:23 2024
@author: Administrator
"""

import json
from datetime import datetime
import paho.mqtt.client as mqtt

host = 'f5e4964ab8.iot-mqtts.cn-north-4.myhuaweicloud.com'
port = 1883
username = '635d15a8222a8601d402e79c_air-conditioning'
password = '(略)'
client_id = '635d15a8222a8601d402e79c_air-conditioning_0_0_2024041701'
device_id = '635d15a8222a8601d402e79c_air-conditioning'
topic = f'$oc/devices/{device_id}/sys/properties/report'

time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
payload = {"services":[{
    "properties":{
        "Manufacturer":"美的",
        "ProductionDate": {
            "begin":"2024-04-01T15:21:21.000Z",
            "end":"2024-05-01T15:21:23.000Z"
        },
        "IndoorUnitModel":"ABC",
        "OutdoorUnitModel":"XYZ",
        "FluorinationPercent":93,
        "DustPercent":24,
        "ErrorCode":"-1"
    },
    "service_id":"DeviceInfo",
    "event_time":time_str
    }]}
payloadstr = json.dumps(payload)
print(f'topic = {topic}')
print(f'payloadstr = {payloadstr}')

def on_connect(client, userdata, flags, reason_code, properties):
    print("Connected with result code: " + str(reason_code))
    client.publish(topic, payload=payloadstr, qos=0)
    print('property publish finished!')

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client(client_id=client_id, 
                     clean_session=True, 
                     userdata=None, 
                     protocol=mqtt.MQTTv311, 
                     transport="tcp", 
                     callback_api_version=mqtt.CallbackAPIVersion.VERSION2)

client.on_connect = on_connect
client.on_message = on_message

client.username_pw_set(username=username, password=password)
client.connect(host, port, 60) # 600为keepalive的时间间隔

client.loop_forever()

释放资源

  1. 停止/删除转发规则

图片.png

如果后续还要用到这个规则,可以先停止该规则(否则直接删除即可)

图片.png

停止后,状态会变为“未启动”

图片.png

  1. 删除OBS桶

使用完后,建议删除OBS桶(虽然OBS桶是免费的,但是里面存东西需要付费)

图片.png

结束语

通过这篇文章,可以看出,华为云IoT平台的功能基本是涵盖了IoT开发的方方面面,基本所有你能想到的,平台上都给出了一整套完整的解决方案,熟练掌握该平台,可以方便你今后快速开发迭代IoT相关的应用程序。

上面的例子中,还展示了一个实际生产过程中可能遇到的报文数据错误的问题,并演示了如何通过IoT平台的“定位建议”功能,定位到该问题,并展示了一个快速的解决问题的方案 —— 通过模拟器发送指令,并核对两者日志的差异,最终完美并快速的解决问题。

我正在参加【有奖征文 第29期】零代码零硬件玩转华为云IoT物联网平台多场景
链接:https://bbs.huaweicloud.com/blogs/423245

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。