Coap服务器搭建Python版本

举报
苌奥林 发表于 2021/12/01 22:43:06 2021/12/01
【摘要】 Coap服务器搭建Python版本 Coap 简介CoAP(Constrained Application Protocol)被设计用于同一受限网络(例如,低功耗、有损网络)上的设备之间、设备和因特网上的一般节点之间以及由因特网连接的不同受限网络上的设备之间使用。定义四种消息类型PUTGETPOSTDELETE基于请求/响应模型双向通信(M2M)轻量:轻量最小长度仅为4B基于UDP低功耗 ...

Coap服务器搭建Python版本

Coap 简介

  • CoAP(Constrained Application Protocol)被设计用于同一受限网络(例如,低功耗、有损网络)上的设备之间、设备和因特网上的一般节点之间以及由因特网连接的不同受限网络上的设备之间使用。
  • 定义四种消息类型
    • PUT
    • GET
    • POST
    • DELETE
  • 基于请求/响应模型
  • 双向通信(M2M)
  • 轻量:轻量最小长度仅为4B
  • 基于UDP
  • 低功耗 ,非长连接通信

版本

  • ubuntu 20.04
  • python 3.8.5
  • aiocoap

Step1 安装aiocoap

pip3 install --upgrade aiocoap

Step2 编写服务端程序

# coding=utf-8
# @filename     :server.py
# @time         :2021/12/1 10:28 pm
# @author       :changaolin@gmail.com
# @description  :
import asyncio

from aiocoap import Message, Context
from aiocoap.numbers import Code
from aiocoap.resource import Resource, Site


class Alarm(Resource):
    def __init__(self):
        super().__init__()
        self.state = "OFF"

    async def render_put(self, request):
        self.state = request.payload
        print('update: %s' % self.state)
        return Message(code=Code.CHANGED, payload=self.state)


def main():
    site = Site()
    site.add_resource(['alarm'], Alarm())
    asyncio.Task(Context.create_server_context(site, bind=('localhost', 5683)))
    print("开启coap 服务: coap://localhost/alarm")
    asyncio.get_event_loop().run_forever()


if __name__ == "__main__":
    main()

Step3 编写 客户端程序

# coding=utf-8
# @filename     :client.py
# @time         :2021/12/1 10:31 pm
# @author       :changaolin@gmail.com
# @description  :
import asyncio
import random

from aiocoap import Context, Message
from aiocoap.numbers.codes import Code


async def main():
    context = await Context.create_client_context()
    payload = random.choice([b'OFF', b'ON'])

    request = Message(code=Code.PUT, payload=payload, uri="coap://localhost/alarm")

    response = await context.request(request).response
    print('%s\n%r' % (response.code, response.payload))


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

Step4 执行测试

python server.py # 在终端启动,等待链接
# 开启coap 服务: coap://localhost/alarm
# update: b'ON'

python client.py # 在新终端启动
# 2.04 Changed
# b'ON'
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。