Apache IoTDB开发系统之MQTT 协议

举报
小云悠悠zZ 发表于 2023/09/12 08:43:56 2023/09/12
【摘要】 内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。MQTT 主题对应 IoTDB 时间序列。消息有效负载的格式可以格式化为 java SPI 加载的事件,缺省实现为 。

MQTT是一种机器对机器 (M2M)/“物联网”连接协议。它被设计为极其轻量级的发布/订阅消息传递传输。它对于与需要少量代码占用和/或网络带宽非常宝贵的远程位置的连接非常有用。

IoTDB 支持 MQTT v3.1(OASIS 标准)协议。IoTDB 服务器包含内置的 MQTT 服务,允许远程设备直接向 IoTDB 服务器发送消息。

MQTT - 图2

内置 MQTT 服务

内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。MQTT 主题对应 IoTDB 时间序列。消息有效负载的格式可以格式化为 java SPI 加载的事件,缺省实现为 。默认格式化程序支持两种 json 格式,下面是一个 MQTT 消息负载示例:PayloadFormatterJSONPayloadFormatterjson

  1. {
  2. "device":"root.sg.d1",
  3. "timestamp":1586076045524,
  4. "measurements":["s1","s2"],
  5. "values":[0.530635,0.530635]
  6. }

  1. {
  2. "device":"root.sg.d1",
  3. "timestamps":[1586076045524,1586076065526],
  4. "measurements":["s1","s2"],
  5. "values":[[0.530635,0.530635], [0.530655,0.530695]]
  6. }

MQTT - 图3

MQTT 配置

IoTDB MQTT 服务默认加载配置。${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties

配置如下:

NAME DESCRIPTION DEFAULT
enable_mqtt_service whether to enable the mqtt service false
mqtt_host the mqtt service binding host 0.0.0.0
mqtt_port the mqtt service binding port 1883
mqtt_handler_pool_size the handler pool size for handing the mqtt messages 1
mqtt_payload_formatter the mqtt message payload formatter json
mqtt_max_message_size the max mqtt message size in byte 1048576

举例

下面是一个 mqtt 客户端向 IoTDB 服务器发送消息的示例。

        MQTT mqtt = new MQTT();
        mqtt.setHost("127.0.0.1", 1883);
        mqtt.setUserName("root");
        mqtt.setPassword("root");
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            String payload = String.format("{\n" +
                    "\"device\":\"root.sg.d1\",\n" +
                    "\"timestamp\":%d,\n" +
                    "\"measurements\":[\"s1\"],\n" +
                    "\"values\":[%f]\n" +
                    "}", System.currentTimeMillis(), random.nextDouble());
            connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
        }
        connection.disconnect();
    }

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。