利用 WebSocket 和 Telegram Bot 监控特斯拉股价
【摘要】 今天和大家分享一个最近做的小项目,通过Websocket获取实时股票价格,并结合 Telegram Bot 实现实时价格预警。你可能会问,为什么监控特斯拉的股票?好吧,因为我套在里面了。。。当然这个脚本可以根据你自己的喜好来调整,只要上游接口支持的数据,都可以监控。下面是代码:import asyncioimport jsonimport websocketsimport requestsf...
今天和大家分享一个最近做的小项目,通过Websocket获取实时股票价格,并结合 Telegram Bot 实现实时价格预警。你可能会问,为什么监控特斯拉的股票?好吧,因为我套在里面了。。。当然这个脚本可以根据你自己的喜好来调整,只要上游接口支持的数据,都可以监控。
下面是代码:
import asyncio
import json
import websockets
import requests
from telegram import Bot
# WebSocket的订阅地址,这里使用的是infoway.io的数据,可以根据你自己的情况调整
WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
# Telegram Bot Token 和 Chat ID
TELEGRAM_TOKEN = 'your_telegram_bot_token'
CHAT_ID = 'your_telegram_chat_id'
# 设置预警价格阈值(根据你的实际情况调整)
ALERT_THRESHOLD = 700 # 当特斯拉股价超过 700 时发送预警
# 创建 Telegram Bot 实例
bot = Bot(token=TELEGRAM_TOKEN)
# 发送 Telegram 消息
def send_telegram_message(message):
bot.send_message(chat_id=CHAT_ID, text=message)
# WebSocket 连接和接收数据
async def connect_and_receive():
async with websockets.connect(WS_URL) as websocket:
# 发送初始订阅请求
init_message = {
"code": 10000,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "TSLA.US"} # 订阅特斯拉股票
}
await websocket.send(json.dumps(init_message))
# 设置 ping 心跳任务,防止ws断开
async def send_ping():
while True:
await asyncio.sleep(30)
ping_message = {
"code": 10010,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
}
await websocket.send(json.dumps(ping_message))
# 启动 ping 心跳任务
ping_task = asyncio.create_task(send_ping())
try:
# 持续接收股票数据
while True:
message = await websocket.recv()
data = json.loads(message)
# 从上游接口返回的数据中提取价格信息
if "data" in data and "price" in data["data"]:
price = data["data"]["price"]
print(f"Current TSLA Price: {price}")
# 检查是否超过预警阈值
if price > ALERT_THRESHOLD:
alert_message = f"⚠️ Pre-warning: TSLA stock price exceeded {ALERT_THRESHOLD}. Current price: ${price}."
send_telegram_message(alert_message)
else:
print("Error: Data format is not as expected")
except websockets.exceptions.ConnectionClosedOK:
print("Connection closed normally")
finally:
# 取消 ping 任务
ping_task.cancel()
# 运行主函数
asyncio.run(connect_and_receive())
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)