【华为云IoTEdge开发实战】Java开发如何通过集成IoT边缘ModuleSDK进行数据处理
前面已经介绍了Java如何集成IoT边缘ModuleSDK,现在我们来看看如何通过Java开发集成IoT边缘ModuleSDK完成数据处理场景。
进行实操前,请先下载demo,解压并导入示例工程。
- dc-driver: 此模块主要演示ot数采集成。
- erp-integration:此模块主要演示It集成服务。
- modbus-driver:此模块主要演示协议转换。
- monitor-app:此模块主要演示数据处理。
协议转换Demo,解压并导入示例工程。
操作场景
开发应用集成ModuleSDK进行数据处理。
在节点接入一个电机设备,设备遇到问题上报信息“error”给节点,节点监听到设备的“error”信息,下发命令让设备进行重启。
示例工程为monitor-app。
代码解析
代码解释使用ModuleSDK开发应用集成ModuleSDK进行数据处理。
AppClient类有以下几个关键方法(具体参考JavaDoc)。
- createFromEnv(): AppClient创建时由此方法自动获取环境变量。
- setBusMessageCallback(): 设置总线消息回调,用于对设备上报的数据进行处理
- sendBusMessage(): 向总线发送消息,用于将处理后的设备数据发送到总线
- callDeviceCommand(): 调用设备命令
- getDevicesInfo(): 查询设备状态
MonitorApp代码解析
片段一
privatestaticfinalString INPUT = "input";
public static final String OUTPUT = "output";
定义输入和输出的端点,关于取值需要需在创建应用版本的inputs参数中定义,创建应用时输入端点与输出端点以及数据流转规则的配置与此是对应的。例如此处定义了输入端点为“input”,输出端点为“output”,则创建应用时的端点和软件配置输入端点需要配置为input,输出端点需要配置为output。
应用部署后还需要设置数据流转规则后,决定数据的流向。
/** * 电机设备的产品ID */ public static final String MOTOR_PRODUCT_ID = "60988d94aa3bcc02c0200667";
单击设备的产品ID,需要在IoTDA设备接入创建产品时获取。
片段二
private AppClient appClient;
publicMonitorApp() throws GeneraException {
appClient = AppClient.createFromEnv();
}
定义并创建AppClient, AppClient.createFromEnv()创建其配置参数将会自动从边缘节点环境中获取,数据的传输将会依赖AppClient。
片段三
publicvoidstart() throws GeneraException {
//设置回调,打开客户端
appClient.setBusMessageCallback(INPUT, this);//设置收到设备数据的回调
appClient.open();
}
appClient在接收到数据后的处理动作需要用户定义,具体操作是设置回调。 这里设置回调方法并传入输入端点后,appClient将会开启并启动一个监听器监听输入端点的数据传输,接收到设备经过hub发来的数据后会调用回调进行数据处理。
片段四
public void onMessageReceived(BusMessage busMessage) {
try {
if (busMessage.getProductId().equals(MOTOR_PRODUCT_ID)) {
//马达设备状态错误时对马达进行重启
MotorData motorData = JsonUtil.fromJson(
JsonUtil.toJson(busMessage.getServices().get(0).getProperties()), MotorData.class);
if (motorData.getStatus().equals("error")) {
Command command = new Command(busMessage.getDeviceId(),"power", "power_control", "restart"
);
appClient.callDeviceCommand(command, FIVE_SECOND);
}
} else {
//其他设备数据发布到总线
appClient.sendBusMessage(OUTPUT, busMessage);
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
onMessageReceived(BusMessage busMessage)是前面设置回调具体的回调函数,它是BusMessageCallback接口的方法,MonitorApp要实现BusMessageCallback接口并实现此方法,AppClient通过设置的输入端口input监听到设备发送数据时调用此方法进行数据处理,用户关于设备发送的数据的处理逻辑在此方法内实现,处理后的数据通过设置的输出端口output发送经过hub发送到云端。
appClient.callDeviceCommand(command, FIVE_SECOND)是示例演示应用在接受到设备传来的error信号后,向设备发送秒后重启命令。
注册节点
注册节点请参照注册边缘节点。
创建产品
创建产品具体教程参照创建产品_设备接入 IoTD ,以下是具体配置中的参照。
1.创建产品
说明:
将修改代码里代码的产品ID复制到高级设置>定制ProjectID,自定义ID后可省去修改代码步骤。
2.在新建产品后需要在产品页的模型定义中添加服务。
3.新增属性
4.新增命令
5.单击确定完成创建
修改代码
查看所创建产品的id,查看方式:IoTDA->产品列表。
根据id修改代码。
/** * 电机设备的产品ID */ public static final String MOTOR_PRODUCT_ID = "60988d94aa3bcc02c0200667";
项目打包
打包参考项目打包
将monitor-app进行打包得到monitor-app.jar。
制作镜像包
将jar文件打包成镜像文件上,请参照制作镜像包或插件包。
创建应用
以容器镜像方式为例,镜像包上传到容器镜像服务SWR后,创建应用。
1.在IoT边缘单击创建应用
2.添加边缘应用-软件和运行配置
3.添加边缘应用-端点和部署配置
4.勾选立即发布并单击确定完成创建。
部署应用
1.部署应用,具体参考部署应用。
注意:
IT应用需要依赖APIGW,在部署ITy应用之前,请先部署系统应用$sys_edge_apigw。
2.添加流转规则
说明:
流转规则是非必选的,OT应用需要添加数据流转规则。驱动应用和IT应用不用添加。
添加边缘设备
添加子设备请参照设备接入,以下是添加边缘设备(MQTT设备)配置时的参考:
记住设备ID和密码,用于设备接入平台认证 。
设备接入
使用MQTT.fx模拟设备接入。
1.下载fx及证书,MQTT.fx下载地址/证书下载地址。
安装完成后打开,MQTT.fx软件界面如下:
注意:
Connect左边的蓝色齿轮为设置。
Publish是消息发送,Subscribe为消息接受,Log可查看日志。
2.单击设置-General,输入以下信息
Broker Address:输入节点的公网地址。
Broker Port: MQTTS协议使用的端口,默认为7883。
尝试连接时间和保持连接时间等自定义。
3.单击设置-User Credentials
Client ID和密码需要工具进行转换。利用网页转换工具进行转换。
填写添加设备(IoT边缘)后生成的设备ID和设备密钥,生成连接信息(ClientId、Username、Password)。
4.单击设置-SSL/TLS
勾选Enable SSL/TLS,单击CA certificate,选择下载的证书文件。
单击Apply应用设置后返回。
单击Connect连接,连接成功后右边红点会变成绿色,IoTDA也会显示在线。
5.选择publish输入topic地址。
Topic:$oc/devices/hwiotedgedevice2/sys/properties/report
其中,hwiotedgedevice2为设备ID,请替换为实际值,可在IoTDA->产品管理中查看。消息体输入:
{
"services": [{
"service_id": "power",
"properties": {
"status":"error" },
"event_time": "20210508T173342Z"
} ]
}
6.输入订阅的topic,可在IoTDA->产品->topic管理中查看。
opic:$oc/devices/hw_iotedge_device2/sys/commands/#
其中,hw_iotedge_device2为设备ID,请替换为实际值,设备ID请进入设备详情查看。
返回到publish,单击publish按钮后进入Subscribe,可以看到订阅命令收到一条command。
7.进入边缘设备查看数据上报情况
进入IoTDA单击设备,进入概览发现并无数据上报,说明设备发送的数据在节点本地被集成SDK的monitor-app应用拦截,当数据内容为error时,在本地向hub调用重启命令。应用实现了数据处理和命令下发的功能。
延伸阅读:
- 点赞
- 收藏
- 关注作者
评论(0)