为量化交易系统接入实时期货数据接口(WebSocket)

举报
yd_232559543 发表于 2026/03/03 11:31:19 2026/03/03
【摘要】 ​在量化交易系统中,实时行情数据是最核心的基础设施之一。相比 REST API 轮询,WebSocket 可以提供:低延迟推送实时成交明细实时盘口深度实时 K 线更新持久连接 + 心跳机制本文将围绕一个Infoway API的期货实时行情接口,教你如何构建一个稳定、可自动重连、带心跳机制的实时期货数据接入模块。我们先来看看整体的接入示例:import jsonimport timeimpor...


在量化交易系统中,实时行情数据是最核心的基础设施之一。相比 REST API 轮询,WebSocket 可以提供:

  • 低延迟推送
  • 实时成交明细
  • 实时盘口深度
  • 实时 K 线更新
  • 持久连接 + 心跳机制

本文将围绕一个Infoway API的期货实时行情接口,教你如何构建一个稳定、可自动重连、带心跳机制的实时期货数据接入模块。

我们先来看看整体的接入示例:

import json
import time
import schedule
import threading
import websocket
from loguru import logger
 
# 申请API KEY: www.infoway.io
class WebsocketExample:
    def __init__(self):
        self.session = None
        self.ws_url = "wss://data.infoway.io/ws?business=common&apikey=yourApikey"
        self.reconnecting = False
        self.is_ws_connected = False  # 添加连接状态标志
 
    def connect_all(self):
        """建立WebSocket连接并启动自动重连机制"""
        try:
            self.connect(self.ws_url)
            self.start_reconnection(self.ws_url)
        except Exception as e:
            logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")
 
    def start_reconnection(self, url):
        """启动定时重连检查"""
        def check_connection():
            if not self.is_connected():
                logger.debug("Reconnection attempt...")
                self.connect(url)
        
        # 使用线程定期检查连接状态
        schedule.every(10).seconds.do(check_connection)
        def run_scheduler():
            while True:
                schedule.run_pending()
                time.sleep(1)
        threading.Thread(target=run_scheduler, daemon=True).start()
 
    def is_connected(self):
        """检查WebSocket连接状态"""
        return self.session and self.is_ws_connected
 
    def connect(self, url):
        """建立WebSocket连接"""
        try:
            if self.is_connected():
                self.session.close()
            
            self.session = websocket.WebSocketApp(
                url,
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )
            
            # 启动WebSocket连接(非阻塞模式)
            threading.Thread(target=self.session.run_forever, daemon=True).start()
        except Exception as e:
            logger.error(f"Failed to connect to the server: {str(e)}")
 
    def on_open(self, ws):
        """WebSocket连接建立成功后的回调"""
        logger.info(f"Connection opened")
        self.is_ws_connected = True  # 设置连接状态为True
        
        try:
            # 发送实时成交明细订阅请求
            trade_send_obj = {
                "code": 10000,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "XAGUSD"}
            }
            self.send_message(trade_send_obj)
            
            # 不同请求之间间隔一段时间
            time.sleep(5)
            
            # 发送实时盘口数据订阅请求
            depth_send_obj = {
                "code": 10003,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "XAGUSD"}
            }
            self.send_message(depth_send_obj)
            
            # 不同请求之间间隔一段时间
            time.sleep(5)
            
            # 发送实时K线数据订阅请求
            kline_data = {
                "arr": [
                    {
                        "type": 1,
                        "codes": "XAGUSD"
                    }
                ]
            }
            kline_send_obj = {
                "code": 10006,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": kline_data
            }
            self.send_message(kline_send_obj)
            
            # 启动定时心跳任务
            schedule.every(30).seconds.do(self.ping)
            
        except Exception as e:
            logger.error(f"Error sending initial messages: {str(e)}")
 
    def on_message(self, ws, message):
        """接收消息的回调"""
        try:
            logger.info(f"Message received: {message}")
        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
 
    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭的回调"""
        logger.info(f"Connection closed: {close_status_code} - {close_msg}")
        self.is_ws_connected = False  # 设置连接状态为False
 
    def on_error(self, ws, error):
        """错误处理的回调"""
        logger.error(f"WebSocket error: {str(error)}")
        self.is_ws_connected = False  # 发生错误时设置连接状态为False
 
    def send_message(self, message_obj):
        """发送消息到WebSocket服务器"""
        if self.is_connected():
            try:
                self.session.send(json.dumps(message_obj))
            except Exception as e:
                logger.error(f"Error sending message: {str(e)}")
        else:
            logger.warning("Cannot send message: Not connected")
 
    def ping(self):
        """发送心跳包"""
        ping_obj = {
            "code": 10010,
            "trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
        }
        self.send_message(ping_obj)
 
# 使用示例
if __name__ == "__main__":
    ws_client = WebsocketExample()
    ws_client.connect_all()
    
    # 保持主线程运行
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Exiting...")
        if ws_client.is_connected():
            ws_client.session.close()

一、整体架构设计思路

一个成熟的实时行情接入模块应该具备:

  • WebSocket 持久连接
  • 自动重连机制
  • 订阅多种数据类型
  • 心跳保活
  • 消息处理分发
  • 可扩展对接交易策略

在我们上面的示例中已经包含了这些核心能力,接下来我们将逐步拆解。

二、环境准备

安装依赖

pip install websocket-client schedule loguru

​说明:
websocket-client:WebSocket通信
schedule:定时任务
loguru:日志系统

三、第一步:建立 WebSocket 连接

核心代码:

self.session = websocket.WebSocketApp(
    url,
    on_open=self.on_open,
    on_message=self.on_message,
    on_error=self.on_error,
    on_close=self.on_close
)

关键点说明:

  • WebSocketApp 是事件驱动模型
  • 通过回调函数处理不同事件
  • 使用线程启动连接,避免阻塞主线程
threading.Thread(target=self.session.run_forever, daemon=True).start()

四、第二步:连接成功后订阅数据

on_open() 中发送订阅请求。

示例订阅了三种数据:

实时成交明细

trade_send_obj = {
    "code": 10000,
    "trace": "uuid",
    "data": {"codes": "XAGUSD"}
}

实时盘口深度

depth_send_obj = {
    "code": 10003,
    "trace": "uuid",
    "data": {"codes": "XAGUSD"}
}

实时K线数据

kline_send_obj = {
    "code": 10006,
    "trace": "uuid",
    "data": {
        "arr": [
            {"type": 1, "codes": "XAGUSD"}
        ]
    }
}

注意:

  • 每次发送之间间隔 5 秒
  • 避免服务器限流
  • trace 建议使用真实 UUID

五、第三步:处理实时数据

所有服务器推送 的数据都会进入:

def on_message(self, ws, message):

目前示例只是打印:

logger.info(f"Message received: {message}")

实战建议:接入量化系统

在真实交易系统中,你应该:

data = json.loads(message)
 
if data["code"] == 20000:
    self.process_trade(data)
 
elif data["code"] == 20003:
    self.process_depth(data)
 
elif data["code"] == 20006:
    self.process_kline(data)

然后:

  1. 更新本地行情缓存
  2. 触发策略计算
  3. 写入内存数据库(Redis)
  4. 推送给内部撮合系统

六、第四步:自动重连机制

生产级行情系统必须具备自动重连能力。

示例通过:

schedule.every(10).seconds.do(check_connection)

每 10 秒检查一次:

if not self.is_connected():
    self.connect(url)

为什么要自己做重连?

因为:

  • 网络抖动
  • 服务器重启
  • 云厂商丢包
  • 运营商重置连接

这些都会导致连接断开。

七、第五步:心跳机制(防止服务器踢掉连接)

很多行情服务器要求:如果客户端长时间不发数据,就会断开连接。

schedule.every(30).seconds.do(self.ping)

发送心跳:

ping_obj = {
    "code": 10010,
    "trace": "uuid"
}

为什么不能依赖 WebSocket 内建 ping?

  • 有些行情系统使用业务层心跳
  • 只认自定义 ping code
  • 不认底层 TCP ping

八、第六步:改造成真正可用的量化行情模块

下面是推荐升级结构。

增加线程安全缓存

from collections import defaultdict
import threading
 
self.market_cache = defaultdict(dict)
self.lock = threading.Lock()

on_message 中:

with self.lock:
    self.market_cache[symbol] = data

提供对策略的接口

def get_latest_price(self, symbol):
    with self.lock:
        return self.market_cache.get(symbol)

策略层调用:

price = ws_client.get_latest_price("XAGUSD")

改造成独立模块

目录结构建议:

quant_system/
│
├── data_feed/
│   ├── websocket_client.py
│
├── strategy/
│   ├── trend_strategy.py
│
├── engine/
│   ├── backtest_engine.py
│
├── main.py

九、生产环境必须注意的 8 个问题

1. 不能只开一个线程

推荐使用 asyncio 或多进程隔离策略计算。

2. 不要在 on_message 里做复杂计算

否则:

  • 会阻塞消息接收
  • 导致行情堆积

推荐:

  • 使用队列(queue)
  • 单独策略线程消费

3. 要做数据校验

if "code" not in data:
    return

4. 做异常保护

防止 JSON 解析异常导致线程崩溃。

5. 做订阅恢复

重连成功后必须重新订阅。

6. 做流量控制

不要同时订阅几百个合约,或高频发订阅请求

7. 监控连接状态

建议Prometheus或日志告警

8. 做多数据源 冗余

专业量化系统通常两个行情源,主备切换

十、完整运行方式

if __name__ == "__main__":
    ws_client = WebsocketExample()
    ws_client.connect_all()
 
    while True:
        schedule.run_pending()
        time.sleep(1)

十一、接入到完整量化交易系统的流程

1️⃣ 接入行情
2️⃣ 本地缓存
3️⃣ 触发策略
4️⃣ 生成信号
5️⃣ 调用交易 API
6️⃣ 风控检查
7️⃣ 下单

十二、进阶升级建议

如果你要做真正专业级系统,建议:

  • 使用 asyncio 重写
  • 使用 Redis 做行情缓存
  • 使用 Kafka 做数据分发
  • 做行情持久化
  • 做回放系统
  • 使用 Docker 部署
  • 做高可用容灾
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。