物联网网关开发:基于MQTT消息总线的设计过程丨【拜托了,物联网!】

道哥 发表于 2021/09/26 17:07:27 2021/09/26
【摘要】 一、前言 二、网关的作用 2.1 指令转发 2.2 外网通信 2.3 协议转换 2.4 设备管理 2.5 边沿计算(自动化控制) 三、网关内部进程之间的通信 3.1 网关中需要哪些进程 3.2 MQTT消息总线 3.3 Topic 的设计 3.4 与 DBUS 总线的对比 四、网关与云平台之间的通信 4.1 与云平台之间的 MQTT 连接 4.2 Proc_Bridge 进程:外部和内部消...

一、前言

在一个嵌入式系统中,利用MQTT消息总线在各进程之间进行通信,是一个很常见的进行通信方式。

这样的通信模型,对于非工控产品来说,通信速度完全足够。以前做过测试,在x86平台和ARM平台,一条数据从本地到云端绕一下,然后再回到本地,可以控制在毫秒级别

这篇文章,我们来具体的聊一聊物联网系统中的网关内部程序可以利用 MQTT 消息总线如何进行设计。

阅读这篇文章,你可以有如下收获:

  1. 物联网系统中,设备之间是如何通信的;
  2. 网关中的进程之间消息总线通信模型;
  3. 网关内部消息总线上的数据如何与服务器进行通信;
  4. 作为消遣,了解一下物联网系统中的一些基本知识;

二、网关的作用

物联网这个词语的范畴太广,似乎所有的硬件设备,只要能够接入网络,就可以称之为物联网产品,似乎物联网这个词可以把一切都纳入到其中。这么空洞的词语不利于我们的讲解,因此我们就用一个可以感知、想象的场景来代替,那就是智能家居系统,这是最能代表物联网时代的典型产品了。

2.1 指令转发

在一个智能家居系统中,假设有这么几个设备:

这些设备的通信模块,如果是 WiFi 或者是蓝牙,那么一般都可以直接通过手机来控制(当然,需要厂家提供相应的手机 APP),手机就相当于一个中心节点,控制着所有的设备。目前市面上的一些智能设备单品都是这样的通信方式,例如:空调、吸尘器、空气净化器、冰箱等等。只要在这些设备中加一个无线通信模块即可(例如:ESP8266模块)。

如果通信模块是其它的通信模块,例如:RF433、ZigBee、ZWave等,由于手机没有这些通信模块,因此就需要一个网关来“转发”指令。手机和网关都连接到家中的路由器,处于同一个局域网中,手机把控制指令发送给网关,网关再把指令转发给相应的设备。通信模型如下:

2.2 外网通信

在上面的通信模型中,手机和网关由于处于同一个局域网中,因此可以直接通信。如果手机不在局域网中呢?那么就要通过云端的服务器来转发了,通信模型如下:

  1. 手机把指令发到服务器;
  2. 服务器把指令转发给网关;
  3. 网关把指令发给指定的设备;

以上描述的是控制指令的流程,如果是设备发出的报警信息呢,数据的流向就是倒过来进行的

可以看出,网关是所有设备之间通信的中心节点,也是内网与外网之间通信的中转节点,也就是把各种智能设备连接到互联网的中转器。

2.3 协议转换

上面已经提到,硬件设备上的通信模块都是确定的(RF,ZigBee,ZWave等等),一般来说,可以把这些通信模块称呼为无线通信协议。在一套智能家居系统中,所有设备的无线通信协议大部分都是相同的。

那么,不同类型的无线通信协议设备是否可以共存在同一个系统中呢?

答案是:可以。只要在网关中,集成了相应的无线通信协议模块就可以达到这个目的!如下图所示:

从手机APP上看,所有的设备都是相同的,不会关心设备的无线通信协议是什么,因此,发出的控制指令都是协议无关的。

当网关接收到控制指令时,首先根据指令内容查找出目标设备,然后确定目标设备的无线通信协议,最后把指令发送给对应的硬件通信模块,由该通信模块通过无线电信号把控制指令发送到设备。

从这个指令的传输过程来看,网关就充当着协议转换的角色

另外还有一种通信场景:当系统中的一个“输入”设备与一个“输出”设备进行绑定/关联时,例如:

  1. 红外感应器与声光报警器绑定:当红外感应器监测到人体时,发出信号,然后控制声光报警器发出报警;
  2. 门磁与灯绑定:当开门时,门磁发出信号,自动打开灯光;

如果“输入”设备与“输出”设备是不同类型的无线通信协议,也需要网关来进行协议转换

2.4 设备管理

在一个智能家居系统中,设备可多可少,对这些设备进行管理也是很重要的事情。网关作为系统的中心节点,对设备进行管理的重任理所当然就由网关来承担。

设备管理功能包括:

设备的添加和删除;
设备状态的管理(电量、设备断网、失联等等);
设备树的管理;

2.5 边沿计算(自动化控制)

在正常的情况下,网关是可以通过路由器,与服务器保持着长连接的。如果服务器的处理能力比较强大,智能家居系统中所有需要处理的事情都可以丢给服务器来计算、处理,服务器在计算之后把处理结果再发送给网关。看起来想法很完美!

但是,考虑下面这 2 种情况:

  1. 路由器出现问题了,网关无法连接到服务器,因此就无法把本地数据及时上报;
  2. 系统中出现了异常情况,需要紧急处理,如果把信息上报到服务器,由服务器计算之后再回传给网关,耗费的时间可能超过了可容忍时间,该如何处理?(可以用车联网系统来脑补一下这个场景:自动驾驶中的汽车遇到紧急情况,如果把所有信息上传给服务器,然后等待服务器的下一步指令?)

对于上面的这些场景,把一些计算、处理操作放在网关这一端来处理也许更合适!这也是近几年比较流行的边沿计算

1. 边缘计算,是指在靠近物或数据源头的一侧,采用网络、计算、存储、应用核心能力为一体的开放平台,就近提供最近端服务。

2. 其应用程序在边缘侧发起,产生更快的网络服务响应,满足行业在实时业务、应用智能、安全与隐私保护等方面的基本需求。

3. 边缘计算处于物理实体和工业连接之间,或处于物理实体的顶端。而云端计算,仍然可以访问边缘计算的历史数据

三、网关内部进程之间的通信

在设计一个应用程序的架构时,可以通过多线程来实现,也可以通过</font color=orange>多进程来实现,每个人的习惯都不一样,各有各的好处。我们这里不去讨论孰优孰劣,因为我对多进程这样的设计思想比较偏爱,所以就直接按照多进程的程序架构来讨论。

3.1 网关中需要哪些进程

网关中需要执行的所有进程,是根据网关的功能来决定的,假设包括如下的功能:

(1)连接外网的进程 Proc_Bridge

网关需要连接到云端的服务器,需要一个进程与服务器之间保持长连接,这样就可以及时接收到服务器发来的控制指令,以及把系统内部数据及时上报给服务器。

这个进程需要把从服务器接收到的指令转发到网关系统内部,把从系统内部接收到的信息转发给服务器,类似于桥接的功能,因此命名为 Proc_Bridge。

(2)设备管理进程 Proc_DevMgr

这个进程用来执行设备管理功能,设备的添加(入网)、删除(退网),都由此进程来管理。

(3)协议转换进程 Proc_Protocol

下行:把应用层的统一通信协议,转换成不同类型无线通信协议,发送给相应的无线模块。

上行:把设备上报的、不同类型的无线通信协议,转换成应用层的统一通信协议。

(4)边沿计算进程(自动化控制) Proc_Auto

很明显,这需要一个独立的进程来处理各种计算,这个进程就相当于系统的大脑

(5)无线通信协议相关的进程 Proc_ZigBee, Proc_RF, Proc_ZWave

在硬件上,每一种无线通信模块通过串口或其他硬件连接方式与到网关的 CPU 进行通信,因此,每一种无线通信模块都需要一个相应的进程来处理。

(6)其他“软设备”进程 Proc_Xxx

在之前的项目中,还遇到一些硬件设备,它们与门磁、插座等设备在逻辑上处于同一个层次,但是与网关之间是通过 TCP 来连接。对于这样的设备,也可以使用一个独立的进程来进行管理。

上面的这些进程,在网关中的运行模型如下:

3.2 MQTT消息总线

以上这些进程之间需要相互通信,不是简单的点对点通信,而是一个网状的通信模型。比如:

  1. 设备管理进程 Proc_DevMgr:当任何一种设备被添加到系统中时,都需要处进行处理,因此它需要与 Proc_ZigBee, Proc_RF, Proc_ZWave 这些进程进行通信;
  2. 当某个设备上报数据时(例如:Proc_ZigBee),Proc_Protocol 进程需要把数据进行协议转换,然后 Proc_Bridge 进程把转换后的数据上报给服务器,同时 Proc_Auto 进程需要检查这个设备上报的数据是否触发了其他相关联的设备;

也就是说,这些进程中间的通信是相互交叉的,如果通过传统的 IPC 方式(共享内存、命名管道、消息队列、Socket)等,处理起来比较复杂。

引入了 MQTT 消息总线之后,每个进程只需要挂载到总线上。每个进程只需要监听自己感兴趣的 topic,就可以接收到相应的数据。

既然这些进程之间的通信关系比较复杂,那么一个良好的 topic 设计规范就显得很重要了!

3.3 Topic 的设计

MQTT 的通信模型是基于订阅/发布的模式,一个客户端(进程)接入到消息总线之后,需要注册自己感兴趣的 主题 topic,其他客户端(进程)往这个 topic 发送消息,即可被订阅者接收到。

主题 topic 是一个以反斜线(/)分割的字符串,用来表示多层的分级结构,例如下面的这 2 个 topic,是亚马逊 AWS 平台中在线升级(OTA)相关的 topic:

  1. /aws/things/MyThing/jobs/get/accepted
  2. /aws/things/MyThing/jobs/get/rejected

在我们的示例场景中,可以按照下面这样来设计主题 topic:

(1) Proc_DevMgr

订阅主题:

/iot/v1/ZigBee/Register
/iot/v1/ZigBee/UnRegister
/iot/v1/RF/Register
/iot/v1/RF/UnRegister
/iot/v1/ZWave/Register
/iot/v1/ZWave/UnRegister

(2) Proc_Bridge

订阅主题:

/iot/v1/Device/Report

发出数据的主题:

/iot/v1/Device/Control
/iot/v1/Device/Remove
/iot/v1/Auto/AddRule
/iot/v1/Auto/RemoveRule

(3) Proc_Protocol

订阅主题:

/iot/v1/Device/Control
/iot/v1/Device/Remove
/iot/v1/ZigBee/Report
/iot/v1/RF/Report
/iot/v1/ZWave/Report

发送数据主题:

/iot/v1/Device/Report
/iot/v1/ZigBee/Control
/iot/v1/ZigBee/Remove
/iot/v1/RF/Control
/iot/v1/RF/Remove
/iot/v1/ZWave/Control
/iot/v1/ZWave/Remove

(4) Proc_Auto

订阅主题:

/iot/v1/Auto/AddRule
/iot/v1/Auto/RemoveRule
/iot/v1/Device/Report

发送数据主题:

/iot/v1/Device/Control

(5) Proc_ZigBee

订阅主题:

/iot/v1/ZigBee/Control
/iot/v1/ZigBee/Remove

发送数据主题:

/iot/v1/ZigBee/Register
/iot/v1/ZigBee/UnRegister
/iot/v1/ZigBee/Report

(6) Proc_RF

订阅主题:

/iot/v1/RF/Control
/iot/v1/RF/Remove

发送数据主题:

/iot/v1/RF/Register
/iot/v1/RF/UnRegister
/iot/v1/RF/Report

(7) Proc_ZWave

订阅主题:

/iot/v1/ZWave/Control
/iot/v1/ZWave/Remove

发送数据主题:

/iot/v1/ZWave/Register
/iot/v1/ZWave/UnRegister
/iot/v1/ZWave/Report

以上这些主题 topic 的设计,还是有些粗略的。如果借助通配符(#, +, $),可以设计出更灵活的层次结构。

  1. 多层通配符: “#”是用于匹配主题中任意层级的通配符,多层通配符表示它的父级和任意数量的子层级。
  2. 单层通配符:“+”加号是只能用于单个主题层级匹配的通配符,在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。
  3. 通配符:“$”表示匹配一个字符,只要不是放在主题的最开头,其它情况下都表示匹配一个字符。

我们以一个控制指令为例,来梳理一下数据是如何通过 topic 进行流动:

  1. Proc_Bridge 进程从服务器接收到控制指令后,发送到消息总线上的 topic: /iot/v1/Device/Control。
  2. 由于 Proc_Protocol 进程订阅了这个 topic,所以立刻接收到指令。
  3. Proc_Protocol 分析指令内容,发现是一个 ZigBee 设备,于是进行协议转换,发送一个 ZigBee 控制指令到消息总线上的 topic: /iot/v1/ZigBee/Control。
  4. 由于 Proc_ZigBee 进程订阅了这个 topic,因此它接收到这个控制指令。
  5. Proc_ZigBee 把控制指令转换成 ZigBee 无线通信模块要求的格式,通过硬件发送给设备灯泡。

我们再分析一下设备数据上报的场景:

先关注图中红色箭头,忽略蓝色箭头:

  1. 门磁打开后,通过无线通信把信息上报给进程 Proc_CF。
  2. Proc_RF 进程接收到 RF433 通信模块上报的数据,把“门磁打开”这个信息发送到消息总线上的 topic:/iot/v1/RF/Report。
  3. 由于 Proc_Protocol 进程订阅了这个 topic,因此接收到上报的门磁数据。
  4. Proc_Protocol 分析数据,把 RF433 协议的数据转成统一的应用层协议的数据,发送到消息总线上的 topic:/iot/v1/Device/Report。
  5. 由于 Proc_Bridge 进程订阅了这个 topic,因此就接收到了这次上报的数据。
  6. Proc_Bridge 进程把数据上报给服务器。

再来看一下蓝色箭头流程:

在上面的第 4 步:Proc_Protocol 进程把 RF433 协议数据转成应用层统一协议之后,把数据发送到消息总线上的 topic:/iot/v1/Device/Report 之后,Proc_Auto 进程同时进行如下操作:

  1. 由于 Proc_Auto 也订阅了这个 topic,因此它也接收到了门磁上报的这个应用层协议的数据。
  2. Proc_Auto 查找自己的配置信息(假设用户已经提前配置好了一条规则:当门磁打开的时候,就触发声光报警器),发现匹配到了“门磁->报警器”这条规则,于是发出一条控制报警器的指令,发送到消息总线上的 topic: /iot/v1/Device/Control。

后面的 7,8,9,10 这四个步骤就与上面的控制指令流程完全一样了

3.4 与 DBUS 总线的对比

从上面描述的 3 个数据流向的场景中,是不是感觉到使用 topic 为“数据管道”的这种通信方式,与 Linux 系统中的 DBUS 总线特别的相似?

DBUS 总线也是用于进程之间的通信,按照我个人的理解,DBUS中其实是把进程之间的两种通信组织在一起了:

  1. 基于信号的数据传输;
  2. 基于方法的 RPC 远程调用;

DBUS 总线包含的概念更复杂一些,包括:路径、对象、接口、方法等等,这些概念组织在一起共同定位到一个具体的服务提供者了。

相比较而言,我感觉 MQTT 这样的方式更简洁一些。

所谓的 RPC 远程调用,就是调用位于远程机器上的一个函数,主要解决两个问题:

  1. 网络连接;
  2. 数据的序列化和反序列化;

后面我会专门写一篇文章,利用 protobuf 框架来实现 RPC 调用。

四、网关与云平台之间的通信

上面讲解的设计过程,是网关内部的各功能模块之间通信方式,这也是我们作为嵌入式开发者能充分发挥的部分

网关与云平台之间的通信方式一般都是客户指定的,就那么几种(阿里云、华为云、腾讯云、亚马逊AWS平台)。一般都要求网关与云平台之间处于长连接的状态,这样云端的各种指令就可以随时发送到网关。

当然了,这些云平台都会提供相应的 SDK 开发包,一般使用 MQTT 协议来连接云平台的更多一些。在一些文档中,会把位于云端的 MQTT 服务器称作 Broker,其实就是一个服务器。

进程 Proc_Bridge 的功能主要有 2 点:

  1. 与云平台的数据传输通道;
  2. 协议转换:把云平台相关的协议转换成网关内部的协议,以及相反的转换。

也就是说:Proc_Bridge 进程需要同时连接到云平台的 MQTT Broker 和网关内部的 MQTT 消息总线

4.1 与云平台之间的 MQTT 连接

目前的几大物联网云平台,都提供了不同的接入方式。对于网关来说,应用最多的就是 MQTT 接入。

我们知道,MQTT 只是一个协议而已,不同的编程语言中都有实现,在 C 语言中也有好几个实现。

在网关内部,运行着一个后台 deamon: MQTT Broker,其实就是 mosquitto 这个可执行程序,它充当着消息总线的功能。这里请大家注意:因为这个消息总线是运行在嵌入式系统的内部,接入总线的客户端就是需要相互通信的那些进程。这些进程的数量是有限的,即使是一个比较复杂的系统,最多十几个进程也就差不多了。因此,mosquitto 这个实现是完全可以支撑系统负载的

那么,如果在云端部署一个 MQTT Broker,理论上是可以直接使用 mosquitto 这个实现来作为消息总线的,但是你要评估接入的客户端(也就是网关)在一个什么样的数量级,考虑到并发的问题,一定要做压力测试。

对于后台开发,我的经验不多,不敢(也不能)多言,误导大家就罪过了。不过,对于一般的学习和测试来说,在云端直接部署 mosquitto 作为消息总线,是没有问题的。

4.2 Proc_Bridge 进程:外部和内部消息总线之间的桥接器

下面这张图,说明了 Proc_Bridge 进程在这个模型中的作用:

  1. 从云平台消息总线接收到的消息,需要转发到内部的消息总线;
  2. 从内部消息总线接收到的消息,需要转发到云平台的消息总线;

如果用 mosquitto 来实现,应该如何来实现呢?

1. mosquitto 的 API 接口

mosquitto 这个实现是基于回调函数的机制来运行的,例如:

// 连接成功时的回调函数
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    // ...
}

// 连接失败时的回调函数
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result)
{
    // ...
}

// 接收到消息时的回调函数
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
	// ..
}

int main()
{
    // 其他代码
    // ...
    
    // 创建一个 mosquitto 对象
    struct mosquitto g_mosq = mosquitto_new("client_name", true, NULL);
    
    // 注册回调函数
    mosquitto_connect_callback_set(g_mosq, my_connect_callback);
	mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback);
	mosquitto_message_callback_set(g_mosq, my_message_callback);
	// 这里还有其他的回调函数设置
	
	// 开始连接到消息总线
	mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60);
	
	while(1)
	{
		int rc = mosquitto_loop(g_mosq, -1, 1);
		if (rc) {
			printf("mqtt_portal: mosquitto_loop rc = %d \n", rc);
			sleep(1);
			mosquitto_reconnect(g_mosq);
		}
	}

	mosquitto_destroy(g_mosq);
	mosquitto_lib_cleanup();
	return 0;
}

以上代码就是一个 mosquitto 客户端的最简代码了,使用回调函数的机制,让程序的开发非常简单。

mosquitto 把底层的细节问题都帮助我们处理了,只要我们注册的函数被调用了,就说明发生了我们感兴趣的事件

这样的回调机制在各种开源软件中使用的比较多,比如:glib 里的定时器、libevent通讯处理、libmodbus 里的数据处理、linux 内核中的驱动开发和定时器,都是这个套路,一通百通!

在网关中的每个进程,只需要添加上面这部分代码,就可以挂载到消息总线上,从而可以与其它进程进行收发数据了。

2. 利用 UserData 指针,实现多个 MQTT 连接

上面的实例仅仅是连接到一个消息总线上,对于一个普通的进程来说,达到了通信的目的。

但是对于 Proc_Bridge 进程来说,还没有达到目的,因为这个进程处于桥接的位置,需要同时连接到远程和本地这两个消息总线上。那么应该如何实现呢?

看一下 mosquitto_new 这个函数的签名:

/*
 * obj - A user pointer that will be passed as an argument to any
 *      callbacks that are specified.
*/
libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);

最后一个参数的作用是:可以设置一个用户自己的数据(作为指针传入),那么 mosquitto 在回调我们的注册的任何一个函数时,都会把这个指针传入。因此,我们可以利用这个参数来区分这个连接是远程连接?还是本地连接。

所以,我们可以定义一个结构体变量,把一个 MQTT 连接的所有信息都记录在这里,然后注册给 mosquitto。当 mosquitto 回调函数时,把这个结构体变量的指针回传给我们,这样就拿到了这个连接的所有数据,在某种程度上来说,这也是一种面向对象的思想。

// 从来表示一个 MQTT 连接的结构体
typedef struct{
	char *id;
	char *name;
	char *pw;
	char *host;
	int port;
	pthread_t tHandle;
	struct mosquitto *mosq;
	int mqtt_num;
}MQData;

完整的代码已经放到网盘里了,为了让你先从原理上看明白,我把关键几个地方的代码贴在这里:

// 分配结构体变量
MQData userData = (MQData *)malloc(sizeof(MQData));

// 设置属于这里连接的参数: id, name 等等

// 创建 mosquitto 对象时,传入 userData。
struct mosquitto *mosq = mosquitto_new(userData->id, true, userData);

// 在回调函数中,把 obj 指针前转成 MQData 指针
static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
	MQData *userData = (MQData *)obj;
	
	// 此时就可以根据 userData 指针中的内容分辨出这是哪一个链接了
}

另外一个问题:不知道你是否注意到示例中的 mosquitto_loop() 这个函数?这个函数需要放在 while 死循环中不停的调用,才能出发 mosuiqtto 内部的事件。(其实在 mosuiqtto 中,还提供了另一个简化的函数 mosquitto_loop_forever)。

也就是说:在每个连接中,需要持续的触发 mosquitto 底层的事件,才能让消息系统顺利的收发。因此,在示例代码中,使用两个线程分别连接到云平台的总线和内部的总线。

五、总结

这篇文章,基本上把一个物联网系统的网关中,最基本的通信模型聊完了,相当于是一个程序的骨架吧,剩下的事情就是处理业务层的细节问题了。

万里长征,这才是第一步!

对于一个网关来说,还有其他更多的问题需要处理,比如:MQTT 连接的鉴权(用户名+密码,证书)、通信数据的序列化和反序列化、加密和解密等等,以后慢慢聊吧,希望我们一路前行!

【拜托了,物联网!】有奖征文火热进行中:https://bbs.huaweicloud.com/blogs/296704

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。