为量化交易系统接入实时期货数据接口(WebSocket)
在量化交易系统中,实时行情数据是最核心的基础设施之一。相比 REST API 轮询,WebSocket 可以提供:
- 低延迟推送
- 实时成交明细
- 实时盘口深度
- 实时 K 线更新
- 持久连接 + 心跳机制
本文将围绕一个Infoway API的期货实时行情接口,教你如何构建一个稳定、可自动重连、带心跳机制的实时期货数据接入模块。
我们先来看看整体的接入示例:
import json
import time
import schedule
import threading
import websocket
from loguru import logger
# 申请API KEY: www.infoway.io
class WebsocketExample:
def __init__(self):
self.session = None
self.ws_url = "wss://data.infoway.io/ws?business=common&apikey=yourApikey"
self.reconnecting = False
self.is_ws_connected = False # 添加连接状态标志
def connect_all(self):
"""建立WebSocket连接并启动自动重连机制"""
try:
self.connect(self.ws_url)
self.start_reconnection(self.ws_url)
except Exception as e:
logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")
def start_reconnection(self, url):
"""启动定时重连检查"""
def check_connection():
if not self.is_connected():
logger.debug("Reconnection attempt...")
self.connect(url)
# 使用线程定期检查连接状态
schedule.every(10).seconds.do(check_connection)
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)
threading.Thread(target=run_scheduler, daemon=True).start()
def is_connected(self):
"""检查WebSocket连接状态"""
return self.session and self.is_ws_connected
def connect(self, url):
"""建立WebSocket连接"""
try:
if self.is_connected():
self.session.close()
self.session = websocket.WebSocketApp(
url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 启动WebSocket连接(非阻塞模式)
threading.Thread(target=self.session.run_forever, daemon=True).start()
except Exception as e:
logger.error(f"Failed to connect to the server: {str(e)}")
def on_open(self, ws):
"""WebSocket连接建立成功后的回调"""
logger.info(f"Connection opened")
self.is_ws_connected = True # 设置连接状态为True
try:
# 发送实时成交明细订阅请求
trade_send_obj = {
"code": 10000,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "XAGUSD"}
}
self.send_message(trade_send_obj)
# 不同请求之间间隔一段时间
time.sleep(5)
# 发送实时盘口数据订阅请求
depth_send_obj = {
"code": 10003,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "XAGUSD"}
}
self.send_message(depth_send_obj)
# 不同请求之间间隔一段时间
time.sleep(5)
# 发送实时K线数据订阅请求
kline_data = {
"arr": [
{
"type": 1,
"codes": "XAGUSD"
}
]
}
kline_send_obj = {
"code": 10006,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": kline_data
}
self.send_message(kline_send_obj)
# 启动定时心跳任务
schedule.every(30).seconds.do(self.ping)
except Exception as e:
logger.error(f"Error sending initial messages: {str(e)}")
def on_message(self, ws, message):
"""接收消息的回调"""
try:
logger.info(f"Message received: {message}")
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
def on_close(self, ws, close_status_code, close_msg):
"""连接关闭的回调"""
logger.info(f"Connection closed: {close_status_code} - {close_msg}")
self.is_ws_connected = False # 设置连接状态为False
def on_error(self, ws, error):
"""错误处理的回调"""
logger.error(f"WebSocket error: {str(error)}")
self.is_ws_connected = False # 发生错误时设置连接状态为False
def send_message(self, message_obj):
"""发送消息到WebSocket服务器"""
if self.is_connected():
try:
self.session.send(json.dumps(message_obj))
except Exception as e:
logger.error(f"Error sending message: {str(e)}")
else:
logger.warning("Cannot send message: Not connected")
def ping(self):
"""发送心跳包"""
ping_obj = {
"code": 10010,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
}
self.send_message(ping_obj)
# 使用示例
if __name__ == "__main__":
ws_client = WebsocketExample()
ws_client.connect_all()
# 保持主线程运行
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Exiting...")
if ws_client.is_connected():
ws_client.session.close()
一、整体架构设计思路
一个成熟的实时行情接入模块应该具备:
- WebSocket 持久连接
- 自动重连机制
- 订阅多种数据类型
- 心跳保活
- 消息处理分发
- 可扩展对接交易策略
在我们上面的示例中已经包含了这些核心能力,接下来我们将逐步拆解。
二、环境准备
安装依赖
pip install websocket-client schedule loguru
说明:
websocket-client:WebSocket通信
schedule:定时任务
loguru:日志系统
三、第一步:建立 WebSocket 连接
核心代码:
self.session = websocket.WebSocketApp(
url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
关键点说明:
WebSocketApp是事件驱动模型- 通过回调函数处理不同事件
- 使用线程启动连接,避免阻塞主线程
threading.Thread(target=self.session.run_forever, daemon=True).start()
四、第二步:连接成功后订阅数据
在 on_open() 中发送订阅请求。
示例订阅了三种数据:
实时成交明细
trade_send_obj = {
"code": 10000,
"trace": "uuid",
"data": {"codes": "XAGUSD"}
}
实时盘口深度
depth_send_obj = {
"code": 10003,
"trace": "uuid",
"data": {"codes": "XAGUSD"}
}
实时K线数据
kline_send_obj = {
"code": 10006,
"trace": "uuid",
"data": {
"arr": [
{"type": 1, "codes": "XAGUSD"}
]
}
}
注意:
- 每次发送之间间隔 5 秒
- 避免服务器限流
trace建议使用真实 UUID
五、第三步:处理实时数据
所有服务器推送 的数据都会进入:
def on_message(self, ws, message):
目前示例只是打印:
logger.info(f"Message received: {message}")
实战建议:接入量化系统
在真实交易系统中,你应该:
data = json.loads(message)
if data["code"] == 20000:
self.process_trade(data)
elif data["code"] == 20003:
self.process_depth(data)
elif data["code"] == 20006:
self.process_kline(data)
然后:
- 更新本地行情缓存
- 触发策略计算
- 写入内存数据库(Redis)
- 推送给内部撮合系统
六、第四步:自动重连机制
生产级行情系统必须具备自动重连能力。
示例通过:
schedule.every(10).seconds.do(check_connection)
每 10 秒检查一次:
if not self.is_connected():
self.connect(url)
为什么要自己做重连?
因为:
- 网络抖动
- 服务器重启
- 云厂商丢包
- 运营商重置连接
这些都会导致连接断开。
七、第五步:心跳机制(防止服务器踢掉连接)
很多行情服务器要求:如果客户端长时间不发数据,就会断开连接。
schedule.every(30).seconds.do(self.ping)
发送心跳:
ping_obj = {
"code": 10010,
"trace": "uuid"
}
为什么不能依赖 WebSocket 内建 ping?
- 有些行情系统使用业务层心跳
- 只认自定义 ping code
- 不认底层 TCP ping
八、第六步:改造成真正可用的量化行情模块
下面是推荐升级结构。
增加线程安全缓存
from collections import defaultdict
import threading
self.market_cache = defaultdict(dict)
self.lock = threading.Lock()
在 on_message 中:
with self.lock:
self.market_cache[symbol] = data
提供对策略的接口
def get_latest_price(self, symbol):
with self.lock:
return self.market_cache.get(symbol)
策略层调用:
price = ws_client.get_latest_price("XAGUSD")
改造成独立模块
目录结构建议:
quant_system/
│
├── data_feed/
│ ├── websocket_client.py
│
├── strategy/
│ ├── trend_strategy.py
│
├── engine/
│ ├── backtest_engine.py
│
├── main.py
九、生产环境必须注意的 8 个问题
1. 不能只开一个线程
推荐使用 asyncio 或多进程隔离策略计算。
2. 不要在 on_message 里做复杂计算
否则:
- 会阻塞消息接收
- 导致行情堆积
推荐:
- 使用队列(queue)
- 单独策略线程消费
3. 要做数据校验
if "code" not in data:
return
4. 做异常保护
防止 JSON 解析异常导致线程崩溃。
5. 做订阅恢复
重连成功后必须重新订阅。
6. 做流量控制
不要同时订阅几百个合约,或高频发订阅请求
7. 监控连接状态
建议Prometheus或日志告警
8. 做多数据源 冗余
专业量化系统通常两个行情源,主备切换
十、完整运行方式
if __name__ == "__main__":
ws_client = WebsocketExample()
ws_client.connect_all()
while True:
schedule.run_pending()
time.sleep(1)
十一、接入到完整量化交易系统的流程
1️⃣ 接入行情
2️⃣ 本地缓存
3️⃣ 触发策略
4️⃣ 生成信号
5️⃣ 调用交易 API
6️⃣ 风控检查
7️⃣ 下单
十二、进阶升级建议
如果你要做真正专业级系统,建议:
- 使用 asyncio 重写
- 使用 Redis 做行情缓存
- 使用 Kafka 做数据分发
- 做行情持久化
- 做回放系统
- 使用 Docker 部署
- 做高可用容灾
- 点赞
- 收藏
- 关注作者
评论(0)