华为云短信服务教你用Python实现Cmpp协议

举报
张俭 发表于 2024/03/01 13:17:58 2024/03/01
【摘要】 引言&协议概述中国移动通信短信网关协议(CMPP)是中国移动为实现短信业务而制定的一种通信协议,全称叫做China Mobile Point to Point,用于在客户端(SP,Service Provider)和中国移动短信网关之间传输短消息,有时也叫做移动梦网短信业务。CMPP3.0是该协议的第三个版本,相比于前两个版本,它增加了对长短信的支持、优化了数据结构等。本文对CMPP协议进...

引言&协议概述

中国移动通信短信网关协议(CMPP)是中国移动为实现短信业务而制定的一种通信协议,全称叫做China Mobile Point to Point,用于在客户端(SP,Service Provider)和中国移动短信网关之间传输短消息,有时也叫做移动梦网短信业务。CMPP3.0是该协议的第三个版本,相比于前两个版本,它增加了对长短信的支持、优化了数据结构等。本文对CMPP协议进行介绍,并给出Python实现CMPP协议栈的思路。

Python的asyncio模块提供了一套简洁的异步IO编程模型,非常适合用于实现协议栈。

CMPP协议基于客户端/服务端模型工作。由客户端(短信应用,如手机,应用程序等)先和ISMG(Internet Short Message Gateway 互联网短信网关)建立起TCP长连接,并使用CMPP命令与ISMG进行交互,实现短信的发送和接收。在CMPP协议中,无需同步等待响应就可以发送下一个指令,实现者可以根据自己的需要,实现同步、异步两种消息传输模式,满足不同场景下的性能要求。

连接成功,发送短信并查询短信发送成功

CMPP Client (Transmitter)ISMGconnectconnect_resp连接成功,准备发送短信submit (send SMS)submit_resp (message id)短信发送成功,准备查询状态query (message id)query_resp (return status)terminateterminate_respCMPP Client (Transmitter)ISMG

连接成功,从ISMG接收到短信

CMPP Client (Receiver)ISMGconnectconnect_resp连接成功,等待接收短信deliver (短信到达)deliver_respterminateterminate_respCMPP Client (Receiver)ISMG

协议帧介绍

在CMPP协议中,每个PDU都包含两个部分:CMPP Header和CMPP Body。

image.png

CMPP Header

Header包含以下字段,大小长度都是4字节

  • Total Length:整个PDU的长度,包括Header和Body。
  • Command ID:用于标识PDU的类型(例如,Connect、Submit等)。
  • Sequence Id:序列号,用来匹配请求和响应。

用Python Asyncio实现CMPP协议栈里的建立连接

本文的代码均已上传到cmpp-python。可以以本文的代码作为基础,很容易地在上面扩展。

代码结构组织如下:

.
├── LICENSE
├── README.md
├── cmpp
│   ├── __init__.py
│   ├── client.py
│   ├── protocol.py
│   └── utils.py
├── requirements.txt
├── setup.cfg
└── setup.py
  • cmpp/protocol.py:定义不同 CMPP 协议数据单元 (PDU) 的数据类,包括 CmppHeader、CmppConnect、CmppConnectResp、CmppSubmit 和 CmppSubmitResp
  • cmpp/client.py:该类处理与 ISMG(互联网短消息网关)的连接以及发送/接收 PDU。 主要 asyncio 进行异步 I/O 操作
  • cmpp/utils.py:定义 BoundAtomic 类,它是一种线程安全的方式来管理具有最小值和最大值的序列号。保证CMPP序列号在一定的范围内
  • setup.py:配置要分发的包,指定包名称、版本、作者和依赖项等元数据。

利用Python锁实现sequence_id

sequence_id是从1到0x7FFFFFFF的值

import threading

class BoundAtomic:
    def __init__(self, min_val: int, max_val: int):
        assert min_val <= max_val, "min must be less than or equal to max"
        self.min = min_val
        self.max = max_val
        self.value = min_val
        self.lock = threading.Lock()

    def next_val(self) -> int:
        with self.lock:
            if self.value >= self.max:
                self.value = self.min
            else:
                self.value += 1
            return self.value

在Python中定义CMPP PDU,篇幅有限,仅定义数个PDU

from dataclasses import dataclass
from typing import Union, List

@dataclass
class CmppHeader:
    total_length: int
    command_id: int
    sequence_id: int

@dataclass
class CmppConnect:
    source_addr: str
    authenticator_source: bytes
    version: int
    timestamp: int

@dataclass
class CmppConnectResp:
    status: int
    authenticator_ismg: str
    version: int

@dataclass
class CmppSubmit:
    msg_id: int
    pk_total: int
    pk_number: int
    registered_delivery: int
    msg_level: int
    service_id: str
    fee_user_type: int
    fee_terminal_id: str
    fee_terminal_type: int
    tp_pid: int
    tp_udhi: int
    msg_fmt: int
    msg_src: str
    fee_type: str
    fee_code: str
    valid_time: str
    at_time: str
    src_id: str
    dest_usr_tl: int
    dest_terminal_id: List[str]
    dest_terminal_type: int
    msg_length: int
    msg_content: bytes
    link_id: str

@dataclass
class CmppSubmitResp:
    msg_id: int
    result: int

@dataclass
class CmppPdu:
    header: CmppHeader
    body: Union[CmppHeader, CmppConnectResp, CmppSubmit, CmppSubmitResp]

实现编解码方法

@dataclass
class CmppConnect:
    source_addr: str
    authenticator_source: bytes
    version: int
    # MMDDHHMMSS format
    timestamp: int

    def encode(self) -> bytes:
        source_addr_bytes = self.source_addr.encode('utf-8').ljust(6, b'\x00')
        version_byte = self.version.to_bytes(1, 'big')
        timestamp_bytes = self.timestamp.to_bytes(4, 'big')
        return source_addr_bytes + self.authenticator_source + version_byte + timestamp_bytes

@dataclass
class CmppConnectResp:
    status: int
    authenticator_ismg: str
    version: int

    @staticmethod
    def decode(data: bytes) -> 'CmppConnectResp':
        status = int.from_bytes(data[0:4], 'big')
        authenticator_ismg = data[4:20].rstrip(b'\x00').decode('utf-8')
        version = data[20]
        return CmppConnectResp(status=status, authenticator_ismg=authenticator_ismg, version=version)

@dataclass
class CmppPdu:
    header: CmppHeader
    body: Union[CmppConnect, CmppConnectResp, CmppSubmit, CmppSubmitResp]

    def encode(self) -> bytes:
        body_bytes = self.body.encode()
        self.header.total_length = len(body_bytes) + 12
        header_bytes = (self.header.total_length.to_bytes(4, 'big') +
                        self.header.command_id.to_bytes(4, 'big') +
                        self.header.sequence_id.to_bytes(4, 'big'))
        return header_bytes + body_bytes

    @staticmethod
    def decode(data: bytes) -> 'CmppPdu':
        header = CmppHeader(total_length=int.from_bytes(data[0:4], 'big'),
                            command_id=int.from_bytes(data[4:8], 'big'),
                            sequence_id=int.from_bytes(data[8:12], 'big'))

        body_data = data[12:header.total_length]
        if header.command_id == CONNECT_RESP_ID:
            body = CmppConnectResp.decode(body_data)
        else:
            raise NotImplementedError("not implemented yet.")

        return CmppPdu(header=header, body=body)

asyncio tcp流相关代码

class CmppClient:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.sequence_id = BoundAtomic(1, 0x7FFFFFFF)
        self.reader = None
        self.writer = None

    async def connect(self):
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)

    async def close(self):
        if self.writer:
            self.writer.close()

实现同步的connect_ismg方法

    async def connect_ismg(self, request: CmppConnect):
        if self.writer is None or self.reader is None:
            raise ConnectionError("Client is not connected")
        sequence_id = self.sequence_id.next_val()
        header = CmppHeader(0, command_id=CONNECT_ID, sequence_id=sequence_id)
        pdu: CmppPdu = CmppPdu(header=header, body=request)
        self.writer.write(pdu.encode())
        await self.writer.drain()

        length_bytes = await self.reader.readexactly(4)
        response_length = int.from_bytes(length_bytes)

        response_data = await self.reader.readexactly(response_length)

        return CmppPdu.decode(response_data)

运行example,验证连接成功

async def main():
    client = CmppClient(host='localhost', port=7890)

    await client.connect()
    print("Connected to ISMG")

    connect_request = CmppConnect(
        source_addr='source_addr',
        authenticator_source=b'authenticator_source',
        version=0,
        timestamp=1122334455,
    )

    connect_response = await client.connect_ismg(connect_request)
    print(f"Connect response: {connect_response}")

    await client.close()
    print("Connection closed")

asyncio.run(main())

image.png

相关开源项目

总结

本文简单对CMPP协议进行了介绍,并尝试用python实现协议栈,但实际商用发送短信往往更加复杂,面临诸如流控、运营商对接、传输层安全等问题,可以选择华为云消息&短信(Message & SMS)服务通过http协议接入华为云短信服务是华为云携手全球多家优质运营商和渠道,为企业用户提供的通信服务。企业调用API或使用群发助手,即可使用验证码、通知短信服务。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。