【教程】使用加密货币行情接口 - 查询比特币实时价格
【摘要】 本文演示如何使用 Python 调用 AllTick 加密货币行情接口查询比特币(BTCUSD)实时价格:先通过 HTTP 接口请求最新一根 K 线,再通过 WebSocket 订阅实时盘口报价。官方文档地址:https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api
import time
import requests # pip3 install requests
import json
# Extra headers sent with the HTTP request.
test_headers = {
    'Content-Type': 'application/json'
}

# GitHub: https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api
# Apply for a free token: https://alltick.co/register
# Official site: https://alltick.co
# Backup address: https://alltick.io
#
# The API expects the JSON below, URL-encoded, in the `query` parameter of the
# HTTP query string:
# {"trace" : "python_http_test1","data" : {"code" : "BTCUSD","kline_type" : 1,"kline_timestamp_end" : 0,"query_kline_num" : 1,"adjust_type": 0}}

# Sample token published with the tutorial; replace it with your own token.
test_token = '3662a972-1a5d-4bb1-88b4-66ca0c402a03-1688712831841'

# Request the BTCUSD (Bitcoin to USD) real-time price as a single K-line.
test_query = {
    "trace": "python_http_test1",
    "data": {
        "code": "BTCUSD",
        "kline_type": 1,
        "kline_timestamp_end": 0,
        "query_kline_num": 1,
        "adjust_type": 0,
    },
}

# Endpoint only; the query string is built by requests from `params` so the
# JSON payload is percent-encoded automatically instead of by hand.
test_url = 'https://quote.aatest.online/quote-b-api/kline'
resp = requests.get(
    url=test_url,
    params={
        'token': test_token,
        # Compact separators keep the encoded query free of whitespace.
        'query': json.dumps(test_query, separators=(',', ':')),
    },
    headers=test_headers,
    timeout=10,  # without a timeout a stalled server blocks the script forever
)
if not resp.ok:
    # Surface HTTP-level failures; the body is still printed below because it
    # may carry the server's error description.
    print('Request failed with HTTP status %s' % resp.status_code)
# Decoded text returned by the request
text = resp.text
print(text)
WebSocket 订阅 BTC 价格
import json
import websocket # pip install websocket-client
'''
# Special Note:
# GitHub: https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api
# Token Application: https://alltick.co
# 备用地址: https://alltick.io
# Replace "testtoken" in the URL below with your own token
# API addresses for forex, cryptocurrencies, and precious metals:
# wss://quote.tradeswitcher.com/quote-b-ws-api
# Stock API address:
# wss://quote.tradeswitcher.com/quote-stock-b-ws-api
'''
class Feed(object):
    """WebSocket client that subscribes to BTCUSD depth quotes and prints them.

    The instance wires its ``on_*`` methods as callbacks of a
    ``websocket.WebSocketApp`` and blocks in ``start()`` until the
    connection ends.
    """

    def __init__(self, token='testtoken'):
        """Store the connection URL.

        Args:
            token: API token appended to the WebSocket URL. Defaults to the
                placeholder ``'testtoken'``; pass your own token.
        """
        # WebSocket URL for forex, cryptocurrencies, and precious metals.
        self.url = 'wss://quote.tradeswitcher.com/quote-b-ws-api?token=' + token
        self.ws = None

    def on_open(self, ws):
        """Send the BTCUSD depth subscription once the socket is open.

        Args:
            ws: the WebSocketApp object.
        """
        print('A new WebSocketApp is opened!')
        # Subscription request for BTCUSD (Bitcoin to USD).
        sub_param = {
            "cmd_id": 22002,
            "seq_id": 123,
            "trace": "3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806",
            "data": {
                "symbol_list": [
                    {
                        "code": "BTCUSD",
                        "depth_level": 5,
                    }
                ]
            }
        }
        # Send the subscription message as a JSON string.
        sub_str = json.dumps(sub_param)
        ws.send(sub_str)
        print("BTCUSD (Bitcoin to USD) depth quote subscribed!")

    def on_data(self, ws, string, type, continue_flag):
        """Low-level data callback; intentionally does nothing.

        Args:
            ws: the WebSocketApp object.
            string: data received from the server.
            type: data type of the frame (ABNF.OPCODE_TEXT or
                ABNF.OPCODE_BINARY per the websocket-client docs).
            continue_flag: continuation flag supplied by websocket-client.
        """

    def on_message(self, ws, message):
        """Decode a received message as JSON and print it.

        The payload comes from the network, so it is parsed with
        ``json.loads`` rather than ``eval``: ``eval`` would execute arbitrary
        expressions sent by the peer and fails on the JSON literals
        ``true``/``false``/``null``.

        Args:
            ws: the WebSocketApp object.
            message: utf-8 data received from the server.
        """
        try:
            result = json.loads(message)
        except ValueError as exc:
            # json.JSONDecodeError is a ValueError; report and keep the
            # connection alive instead of letting the callback raise.
            print('Failed to decode message as JSON: %s' % exc)
            return
        print(result)

    def on_error(self, ws, error):
        """Print an error reported by the WebSocketApp.

        Args:
            ws: the WebSocketApp object.
            error: exception object.
        """
        print(error)

    def on_close(self, ws, close_status_code, close_msg):
        """Report that the connection has been closed.

        Args:
            ws: the WebSocketApp object.
            close_status_code: status code passed by websocket-client.
            close_msg: close message passed by websocket-client.
        """
        print('The connection is closed!')

    def start(self):
        """Create the WebSocketApp with this object's callbacks and run it.

        Blocks in ``run_forever()`` until the connection terminates.
        """
        self.ws = websocket.WebSocketApp(
            self.url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_data=self.on_data,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        self.ws.run_forever()
if __name__ == "__main__":
    # Script entry point: build the subscriber and block until the
    # WebSocket connection ends.
    Feed().start()
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)