如何用Java实现一个基于MQTT协议的发布订阅示例

举报
jackwangcumt 发表于 2021/10/23 16:59:51 2021/10/23
【摘要】 MQTT是ISO 标准下基于发布/订阅模式的协议。它基于TCP/IP协议,具有轻量、简单、开放和易于实现的特点。该协议广泛应用于机器间通信,即物联网领域。它是为硬件性能低下的远程设备以及网络状况不佳的情况下而设计协议,为此,它需要一个消息中间件(MQTT Broker,可以理解为Server端。)MQTT协议这些特点使它适用范围非常广泛。

1 MQTT协议概述


      根据百度百科定义,MQTT是ISO 标准下基于发布/订阅模式的协议。它基于TCP/IP协议,具有轻量、简单、开放和易于实现的特点。该协议广泛应用于机器间通信,即物联网领域。它是为硬件性能低下的远程设备以及网络状况不佳的情况下而设计协议,为此,它需要一个消息中间件(MQTT Broker,可以理解为Server端。)MQTT协议这些特点使它适用范围非常广泛。

     MQTT协议通信示意图如下所示:

0.jpg

    其中的消息发布和订阅需要通过中间代理MQTT Broker实现,而MQTT Client则可以发布消息,也可以订阅消息。换句话说,消息的发布者和订阅者都是客户端,消息代理是服务器(例如EMQTT、Mosquitto、Apollo等),消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(Payload)两部分。Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容。而Payload可以理解为消息的内容,是指订阅者具体要使用的内容。另外,MQTT协议是一个分布式通信协议,消息传递需要关心数据的质量,它有三个值分别是:
QoS 0:“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
QoS 1:“至少一次”,确保消息到达,但消息重复可能会发生。
QoS 2:“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

2 Mosquitto概述


      Mosquitto是一个开源消息代理(Broker),实现了MQTT协议版本3.1和3.1.1。它非常轻量,适用于低功耗单板计算机到完整服务器的所有设备。Mosquitto项目还提供了用于实现MQTT客户端的C库以及非常受欢迎的mosquitto_pub和mosquitto_sub命令行MQTT客户端。首先安装从官网 http://mosquitto.org/download/ 下载对应的安装软件,并安装:

1.jpg

   安装后,可以启动该服务,默认端口为1883。如下图所示:

3.jpg

另外,官网还提供MQTT客户端工具,可以非常方便的提供测试,下载地址为http://www.eclipse.org/paho/components/tool 可以下载org.eclipse.paho.mqtt.utility-1.0.0.jar这个Jar包,进行双击运行:

5.jpg

3 MQTT Java实现


      首先保证Mosquitto Broker运行,否则无法进行客户端的消息发布和订阅。首先需要用maven下载客户端库,代码如下:

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.5</version>
</dependency>

     下面给出发布者核心代码:

package com.example.demo.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttProducer {

    private MqttClient mqttClient;

    public MqttProducer(String SERVER_URI,String CLIENT_ID){
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            mqttClient = new MqttClient(SERVER_URI, CLIENT_ID,persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: "+ SERVER_URI);
            mqttClient.connect(connOpts);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }


    public void send(String topic, int qos, boolean retained, String payload) {
        if (mqttClient == null){
            return;
        }
        try {
            mqttClient.publish(topic, payload.getBytes(), qos, retained);
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }
}

  MqttClient是MQTT客户端的抽象,它需要指定Broker地址,如"tcp://localhost:1883",同时需要指定一个唯一的客户端ID,在发布消息时,mqttClient.publish(topic, payload.getBytes(), qos, retained),需要指定主题topic,内容payload,以及质量Qos等。下面再给出订阅者代码:

package com.example.demo.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttSubscriber {
    
    private MqttClient mqttClient;

    public MqttSubscriber(String SERVER_URI,String CLIENT_ID){
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            mqttClient = new MqttClient(SERVER_URI, CLIENT_ID,persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            mqttClient.connect(connOpts);

        }catch (Exception ex){
            ex.printStackTrace();
        }
    }


    public void subscribe(String topic) {
        if (mqttClient == null){
            return;
        }
        try {
            mqttClient.subscribe(topic);
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("连接丢失");
                }

                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println(topic);
                    System.out.println(mqttMessage.toString());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("delivery isComplete:" + iMqttDeliveryToken.isComplete());
                }
            });
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }
}

项目启动代码如下:

package com.example.demo.mqtt;

public class Test {
    public static void main(String[] args) {
        String serverURI="tcp://localhost:1883";
        String clientID="demo_mqtt";
        MqttProducer mqttProducer = new MqttProducer(serverURI, clientID);
        String msg ="";
        while (true){
            msg = "time:" + System.currentTimeMillis();
            mqttProducer.send("topic/msg02", 1, true, msg);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
/////////////////////////////////
package com.example.demo.mqtt;

public class Test2 {
    public static void main(String[] args) {
        String serverURI="tcp://localhost:1883";
        String clientID="demo_mqtt000";
        MqttSubscriber mqttSubscriber = new MqttSubscriber(serverURI, clientID);
        mqttSubscriber.subscribe("topic/msg02");
    }
}

运行后,控制台打印如下:

topic/msg02
time:1634979250907
topic/msg02
time:1634979251908
topic/msg02
time:1634979252908
topic/msg02
time:1634979253909
//.......................
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。