利用 WebSocket 和 Telegram Bot 监控特斯拉股价

举报
yd_254103451 发表于 2025/06/19 10:06:36 2025/06/19
【摘要】 今天和大家分享一个最近做的小项目,通过Websocket获取实时股票价格,并结合 Telegram Bot 实现实时价格预警。你可能会问,为什么监控特斯拉的股票?好吧,因为我套在里面了。。。当然这个脚本可以根据你自己的喜好来调整,只要上游接口支持的数据,都可以监控。下面是代码:import asyncioimport jsonimport websocketsimport requestsf...

今天和大家分享一个最近做的小项目,通过Websocket获取实时股票价格,并结合 Telegram Bot 实现实时价格预警。你可能会问,为什么监控特斯拉的股票?好吧,因为我套在里面了。。。当然这个脚本可以根据你自己的喜好来调整,只要上游接口支持的数据,都可以监控。

下面是代码:

import asyncio
import json
import websockets
import requests
from telegram import Bot
 
# WebSocket的订阅地址,这里使用的是infoway.io的数据,可以根据你自己的情况调整
WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
 
# Telegram Bot Token 和 Chat ID
TELEGRAM_TOKEN = 'your_telegram_bot_token'
CHAT_ID = 'your_telegram_chat_id'
 
# 设置预警价格阈值(根据你的实际情况调整)
ALERT_THRESHOLD = 700  # 当特斯拉股价超过 700 时发送预警
 
# 创建 Telegram Bot 实例
bot = Bot(token=TELEGRAM_TOKEN)
 
# 发送 Telegram 消息
def send_telegram_message(message):
    bot.send_message(chat_id=CHAT_ID, text=message)
 
# WebSocket 连接和接收数据
async def connect_and_receive():
    async with websockets.connect(WS_URL) as websocket:
        # 发送初始订阅请求
        init_message = {
            "code": 10000,
            "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
            "data": {"codes": "TSLA.US"}  # 订阅特斯拉股票
        }
        await websocket.send(json.dumps(init_message))
 
        # 设置 ping 心跳任务,防止ws断开
        async def send_ping():
            while True:
                await asyncio.sleep(30)
                ping_message = {
                    "code": 10010,
                    "trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
                }
                await websocket.send(json.dumps(ping_message))
 
        # 启动 ping 心跳任务
        ping_task = asyncio.create_task(send_ping())
 
        try:
            # 持续接收股票数据
            while True:
                message = await websocket.recv()
                data = json.loads(message)
 
                # 从上游接口返回的数据中提取价格信息
                if "data" in data and "price" in data["data"]:
                    price = data["data"]["price"]
                    print(f"Current TSLA Price: {price}")
 
                    # 检查是否超过预警阈值
                    if price > ALERT_THRESHOLD:
                        alert_message = f"⚠️ Pre-warning: TSLA stock price exceeded {ALERT_THRESHOLD}. Current price: ${price}."
                        send_telegram_message(alert_message)
                else:
                    print("Error: Data format is not as expected")
 
        except websockets.exceptions.ConnectionClosedOK:
            print("Connection closed normally")
        finally:
            # 取消 ping 任务
            ping_task.cancel()
 
# 运行主函数
asyncio.run(connect_and_receive())
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。