深入浅出MQTT协议
物联网(Internet of Things,IoT)时代机器之间(Machine-to-Machine,M2M)的大规模沟通需要发布/订阅(Publish/Subscribe)模式,轻量级、易扩展的MQTT(Message Queuing Telemetry Transport)顺势推进。MQTT是基于二进制消息的发布/订阅编程模式的消息协议,适合需要低功耗和网络带宽有限的IoT场景。 协议就是通信双方的一个约定,即,表示第1位传输的什么、第2位传输的什么……。在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、 可变头(Variable header)、 消息体(payload)三部分构成。 MQTT 数据包结构:固定头部 可变头部 消息体 固定头(Fixed header),存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识 可变头(Variable header),存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容 消息体(Payload),存在于部分MQTT数据包中,表示客户端收到的具体内容 MQTT固定头:固定头存在于所有MQTT数据包中,其结构如下: Bit 7 6 5 4 | 3 2 1 0 byte 1 MQTT数据包类型 | 不同类型MQTT数据包的具体标识 byte 2… 剩余长度 MQTT可变头:它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是做为包的标识: Bit 7 6 5 4 3 2 1 0 byte 1 包标签符(MSB) byte 2… 包标签符(LSB) Payload消息体位MQTT数据包的第三部分,CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息 有消息体: CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码。 SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。 SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。 UNSUBSCRIBE,消息体内容是要订阅的主题。 MQTT协议特点: 轻量级的 machine-to-machine 通信协议。 publish/subscribe模式。 基于TCP/IP。 支持QoS。 适合于低带宽、不可靠连接、嵌入式设备、CPU内存资源紧张。 是一种比较不错手机APP消息推送方案。 主题: 1.可以通过反斜杠表示多个层级关系。 2.主题可以通过通配符进行过滤,但是不允许使用通配符广播。( 可以过滤一个层级,而*只能出现在主题最后表示过滤任意级别的层级) MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性: 级别0:尽力而为。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。 级别1:至少一次。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。 级别2:恰好一次。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。级别2所提供的不重不丢很多情况下是最理想的,不过往返多次的确认一定对并发和延迟带来影响。 消息类型: CONNECT:客户端连接到MQTT代理 TCP连接建立完毕后,Client向Server发出一个Request。 如果一段时间内接收不到Server的Response,则关闭socket,重新建立一个session连接。 如果一个ClientID已经与服务器连接,则持有同样ClientID的旧有连接必须由服务器关闭后,新建立才能建立。 CONNACK:连接确认 PUBLISH:新发布消息 关于Topic通配符 /:用来表示层次,比如a/b,a/b/c。 #:表示匹配>=0个层次,比如a/#就匹配a/,a/b,a/b/c。 单独的一个#表示匹配所有。 不允许 a#和a/#/c。 :表示匹配一个层次,例如a/ 匹配a/b,a/c,不匹配a/b/c。 单独的一个 是允许的,a 不允许,a/ /b不允许 PUBACK:新发布消息确认,是QoS 1给PUBLISH消息的回复 PUBREC:QoS 2消息流的第一部分,表示消息发布已记录 PUBREL:QoS 2消息流的第二部分,表示消息发布已释放 PUBCOMP:QoS 2消息流的第三部分,表示消息发布完成 QoS=0:最多一次,有可能重复或丢失。 QoS=1:至少一次,有可能重复。 Server向Client发布该确认(Client收到确认后删除),订阅者向Server发布确认。 QoS=2:只有一次,确保消息只到达一次(用于比较严格的计费系统)。 1. Server->Client发布PUBREC(已收到); 2. Client->Server发布PUBREL(已释放); 3. Server->Client发布PUBCOMP(已完成),Client删除msg; 订阅者也会向Server发布类似过程确认。 SUBSCRIBE:客户端订阅某个主题 SUBACK:对于SUBSCRIBE消息的确认 UNSUBSCRIBE:客户端终止订阅的消息 UNSUBACK:对于UNSUBSCRIBE消息的确认 PINGREQ:心跳 Client有责任发送KeepAliveTime时长告诉给Server。在一个时长内,发送PINGREQ,Server发送PINGRES确认。 Server在1.5个时长内未收到PINGREQ,就断开连接。 Client在1个时长内未收到PINGRES,断开连接。 一般来说,时长设置为几个分钟。最大18hours,0表示一直未断开。 PINGRESP:确认心跳 DISCONNECT:客户端终止连接前优雅地通知MQTT代理 Clean Session 如果为false(flag=0),Client断开连接后,Server应该保存Client的订阅信息。 如果为true(flag=1),表示Server应该立刻丢弃任何会话状态信息。 MQTT提供了多个层次的安全特性: 网络层:有条件可以通过拉专线或者使用VPN来连接设备与MQTT代理,以提高网络传输的安全性。 传输层:传输层使用TLS加密是确保安全的一个好手段,可以防止中间人攻击(Man-In-The-Middle Attack)。客户端证书不但可以作为设备的身份凭证,还可以用来验证设备。 应用层:MQTT还提供客户标识(Client Identifier)以及用户名密码,在应用层验证设备。 加密: TLS是非常成熟的安全协议,在握手的时候便可以创建安全连接,使得黑客无法窃听或者篡改内容了。使用TLS的时候有以下注意点: 1.尽可能使用高版本的TLS。 2.验证X509证书链防止中间人攻击。 3.尽量使用有CA发布的证书。 4.TLS会增加连接时开销,对低运算能力的设备而言是额外的负担,不过如果设备是长连接的话就会避免反复连接的开销。 MQTT支持两种层次的认证: 传输层:传输层使用TLS不但可以加密通讯,还可以使用X509证书来认证设备。 应用层:MQTT支持客户标识、用户名密码以及X509证书,在应用层验证设备。 mosquitto是一个开源的轻量级的C实现的MQTT代理,完全兼容了MQTT 3.1和MQTT 3.1.1。mqtt本身是基于TCP的,但是代理对WebSockets的支持,可以支持http。 Mosquitto原生支持了TLS加密,生成证书后再配置一下MQTT代理即可。 安装 mosquitto,mosquitto-clients 源码安装: wget http://mosquitto.org/files/source/mosquitto-1.4.5.tar.gz tar zxfv mosquitto-1.4.5.tar.gz cd mosquitto-1.4.5 make make install 订阅消息: mosquitto_sub -d -t 'floor-5/temperature' Received CONNACK Received SUBACK Subscribed (mid: 1): 0 发送消息: mosquitto_pub -d -t 'floor-5/temperature' -m '15' Received CONNACK Sending PUBLISH (d0, q0, r0, m1, 'floor-5/temperature', ... (2 bytes)) 接收消息: Received PUBLISH (d0, q0, r0, m0, 'floor-5/temperature', ... (2 bytes)) 15 Sending PINGREQ Received PINGRESP 参数: 【-t】指定主题,此处为sensor 【-v】打印更多的调试信息 【-m】指定消息内容 在JAVA环境下: 在Maven中引用MQTT-Client <dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <repositories> <repository> <id>fusesource.snapshots</id> <name>FuseSource Snapshot Repository</name> <url>http://repo.fusesource.com/nexus/content/repositories/snapshots</url> <snapshots><enabled>true</enabled></snapshots> <releases><enabled>false</enabled></releases> </repository> </repositories> 建立连接: MQTT mqtt = new MQTT(); mqtt.setHost('localhost', 1883); 或者 mqtt.setHost('tcp://localhost:1883'); MQTT设置说明 setClientId:用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成。 setCleanSession:若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true。 setKeepAlive:定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待。 setUserName:服务器认证用户名。 setPassword:服务器认证密码。 setWillTopic:设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 setWillMessage:设置“遗嘱”消息的内容,默认是长度为零的消息。 setWillQos:设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE。 setWillRetain:若想要在发布“遗嘱”消息时拥有retain选项,则为true。 MQTT.connectBlocking方法建立并返回一个阻塞API连接。 BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); 使用publish方法发布消息: connection.publish('foo', 'Hello'.toBytes(), QoS.AT_LEAST_ONCE, false); 利用subscribe方法订阅多个主题: Topic[] topics = {new Topic('foo', QoS.AT_LEAST_ONCE)}; byte[] qoses = connection.subscribe(topics); 利用receive和ack方法,获取并应答消息: Message message = connection.receive(); System.out.println(message.getTopic()); byte[] payload = message.getPayload(); message.ack(); 结束连接: connection.disconnect(); 文章转载于http://www.360doc.com/content/17/1109/19/47869400_702448421.shtml |
- 点赞
- 收藏
- 关注作者
评论(0)