Python量化数据仓库搭建3:数据落库代码封装
一、恒有数(UDATA)操作简介
1、获取Token
A、在恒有数官网(https://udata.hs.net)注册并登录,在订阅页面,下单免费的体验套餐;
B、在右上角,头像下拉菜单中,进入总览页面,复制Token;
C、在数据页面,查看数据接口文档,获取接口名称、请求参数、返回参数和Python代码示例;
2、安装hs_udata
pip install hs_udata
使用示例如下:
import hs_udata as hs
# 设置Token
hs.set_token(token = 'xxxxxxxxxxxxxxx') # 总览页面获取个人Token
# 以分钟线行情为例,获取000001.SZ在2021-05-01至2021-06-01期间的分钟线数据
# 接口文档见:https://udata.hs.net/datas/342/
df = hs.stock_quote_minutes(en_prod_code="000001.SZ",begin_date="20210501",end_date="20210601")
# 增加股票代码列
df['hs_code']='000001.SZ'
df.head()
其余接口使用过程与之类似;
二、数据落库示例
以股票列表(stock_list)为例,讲解建表、落库、查询等操作;全套代码见本文第三章;
1、准备工作
(1)在MySQL数据库中,创建数据库udata,创建过程见第一讲《Python量化数据仓库搭建系列1:数据库安装与操作》;
(2)在MySQL数据库中,创建数据更新记录表udata.tb_update_records,表结构如下:
建表SQL如下:
CREATE TABLE udata.tb_update_records (
table_name CHAR(40),
data_date CHAR(20),
update_type CHAR(20),
data_number INT(20),
elapsed_time INT(20),
updatetime CHAR(20)
)
(3)将Token与数据库参数写入配置文件DB_MySQL.config,文件内容如下:
[udata]
token='你的Token'
host='127.0.0.1'
port=3306
user='root'
passwd='密码'
db='udata'
(4)读取配置文件中的参数
import configparser
# 读取配置文件中,恒有数和数据库参数
configFilePath = 'DB_MySQL.config'
section = 'udata'
config = configparser.ConfigParser()
config.read(configFilePath)
# 读取 恒有数(UData) 的 token
token = eval(config.get(section=section, option='token'))
# MySQL连接参数读取
host = eval(config.get(section=section, option='host'))
port = int(config.get(section=section, option='port'))
db = eval(config.get(section=section, option='db'))
user = eval(config.get(section=section, option='user'))
passwd = eval(config.get(section=section, option='passwd'))
2、建表
在社区数据页面,查看数据表返回参数字段,以股票列表(stock_list,https://udata.hs.net/datas/202/)为例:
将建表、删除表格、清空数据的SQL,写入配置文件DB_Table.config,文件内容如下:
[tb_stock_list]
DROP_TABLE='DROP TABLE IF EXISTS tb_stock_list'
CREATE_TABLE='''CREATE TABLE tb_stock_list (
secu_code CHAR(20),
hs_code CHAR(20),
secu_abbr CHAR(20),
chi_name CHAR(40),
secu_market CHAR(20),
listed_state CHAR(20),
listed_sector CHAR(20),
updatetime CHAR(20)
)'''
DELETE_DATA = 'truncate table tb_stock_list'
建表代码如下:
# MySQL操作实例化,pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据
MySQL = MySQLOperation(host, port, db, user, passwd)
# 读取配置文件中,恒有数和数据库参数
configFilePath = 'DB_Table.config'
section = 'tb_stock_list'
config = configparser.ConfigParser()
config.read(configFilePath)
DROP_TABLE = eval(config.get(section=section, option='DROP_TABLE'))
CREATE_TABLE = eval(config.get(section=section, option='CREATE_TABLE'))
DELETE_DATA = eval(config.get(section=section, option='DELETE_DATA'))
# 删除表结构
MySQL.Execute_Code(DROP_TABLE)
# 创建表结构
MySQL.Execute_Code(CREATE_TABLE)
3、数据落库
import hs_udata as hs
from sqlalchemy import create_engine
from datetime import datetime
# 设置token
hs.set_token(token)
# 获取 股票列表 数据
df = hs.stock_list()
# 在最后一列增加系统时间戳
dt = datetime.now()
df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
# 由于股票列表数据为全量更新,数据插入之前,先清空表中数据
MySQL.Execute_Code(DELETE_DATA)
# sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql
engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user,passwd,host,port,db))
# 将数据写入到MySQL中的数据表
df.to_sql(name='tb_stock_list', con=engine, index=False, if_exists='append')
在数据库中查看结果如下:
4、读取数据
import pandas as pd
result = pd.read_sql('''SELECT * FROM tb_stock_list ''', con=engine)
print(result.head())
三、落库代码封装
将上述几步操作,封装到一起,定义并调用python中class类的属性和方法。代码中涉及主要技术点如下:
(1)使用pymysql、pandas.to_sql和pandas.read_sql操作MySQL数据库;
(2)使用class类的方法,集成建表、插入数据和查询数据的操作;
(3)使用配置文件的方式,从本地文件中,读取数据库参数与表操作的SQL代码;
(4)使用try容错机制,结合日志函数,将执行日志打印到本地的DB_MySQL_LOG.txt文件;
import pandas as pd
import hs_udata as hs
from MySQLOperation import *
from sqlalchemy import create_engine
from datetime import datetime
import time
import configparser
import logging
import traceback
import warnings
warnings.filterwarnings("ignore")
class TB_Stock_List:
def __init__(self,MySQL_Config,BD_Name,Table_Config,Table_Name):
# 创建日志
self.logging = logging
self.logging.basicConfig(filename='DB_MySQL_LOG.txt', level=self.logging.DEBUG
, format='%(asctime)s - %(levelname)s - %(message)s')
# 读取配置文件中,恒有数和数据库参数
configFilePath = MySQL_Config
self.section1 = BD_Name
config = configparser.ConfigParser()
config.read(configFilePath)
# 读取 恒有数(UData) 的 token
self.token = eval(config.get(section=self.section1, option='token'))
# MySQL连接参数读取
self.host = eval(config.get(section=self.section1, option='host'))
self.port = int(config.get(section=self.section1, option='port'))
self.db = eval(config.get(section=self.section1, option='db'))
self.user = eval(config.get(section=self.section1, option='user'))
self.passwd = eval(config.get(section=self.section1, option='passwd'))
# pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据
self.MySQL = MySQLOperation(self.host, self.port, self.db, self.user, self.passwd)
# sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql
self.engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(self.user
, self.passwd
, self.host
, self.port
, self.db))
# 读取配置文件中,恒有数和数据库参数
configFilePath = Table_Config
self.section2 = Table_Name
config = configparser.ConfigParser()
config.read(configFilePath)
self.DROP_TABLE_SQL = eval(config.get(section=self.section2, option='DROP_TABLE'))
self.CREATE_TABLE_SQL = eval(config.get(section=self.section2, option='CREATE_TABLE'))
self.DELETE_DATA_SQL = eval(config.get(section=self.section2, option='DELETE_DATA'))
self.logging.info('*********************{0}.{1}*********************'.format(self.section1, self.section2))
def CREATE_TABLE(self):
try:
# 删除表结构
self.MySQL.Execute_Code('SET FOREIGN_KEY_CHECKS = 0')
self.MySQL.Execute_Code(self.DROP_TABLE_SQL)
# 创建表结构
self.MySQL.Execute_Code(self.CREATE_TABLE_SQL)
self.logging.info('表{0}.{1},表格创建成功'.format(self.section1,self.section2))
except:
self.logging.info('表{0}.{1},表格创建失败'.format(self.section1,self.section2))
self.logging.debug(traceback.format_exc())
def UPDATE_DATA(self):
try:
# 设置token
hs.set_token(self.token)
time_start = time.time() # 计时
# 获取 股票列表 数据
df = hs.stock_list()
# 在最后一列增加系统时间戳
dt = datetime.now()
df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')
# 由于股票列表数据为全量更新,数据插入之前,先清空表中数据
self.MySQL.Execute_Code(self.DELETE_DATA_SQL)
# 将数据写入到MySQL中的数据表
df.to_sql(name='tb_stock_list', con=self.engine, index=False, if_exists='append')
time_end = time.time() # 计时
elapsed_time = round(time_end-time_start,2)
# 向mysql库中记录一条数据更新记录:表名,数据日期,更新方式,更新条数,更新耗时,系统时间
self.RECORDS_SQL = '''INSERT INTO udata.tb_update_records
VALUES ('{0}','{1}','全量',{2},{3}, SYSDATE())'''.format(self.section2
,dt.strftime('%Y-%m-%d'),len(df),elapsed_time)
self.MySQL.Execute_Code(self.RECORDS_SQL)
self.logging.info('表{0}.{1},数据更新成功'.format(self.section1,self.section2))
except:
self.logging.info('表{0}.{1},数据更新失败'.format(self.section1,self.section2))
self.logging.debug(traceback.format_exc())
def READ_DATA(self, WhereCondition=''):
try:
result = pd.read_sql('''SELECT * FROM {0}.{1} '''.format(self.section1,self.section2)
+ WhereCondition, con=self.engine)
self.logging.info('表{0}.{1},数据读取成功'.format(self.section1,self.section2))
return result
except:
self.logging.info('表{0}.{1},数据读取失败'.format(self.section1,self.section2))
self.logging.debug(traceback.format_exc())
return 0
if __name__ == '__main__':
MySQL_Config = 'DB_MySQL.config'
BD_Name = 'udata'
Table_Config = 'DB_Table.config'
Table_Name = 'tb_stock_list'
# 实例化
TB_Stock_List_Main = TB_Stock_List(MySQL_Config,BD_Name,Table_Config,Table_Name)
# 创建表结构
TB_Stock_List_Main.CREATE_TABLE()
# 更新数据
TB_Stock_List_Main.UPDATE_DATA()
# 读取数据
data = TB_Stock_List_Main.READ_DATA()
四、代码及配置文件下载
(1)附件文件目录:
DB_MySQL.config、DB_Table.config、MySQLOperation.py、TB_Stock_List.py
(2)下载附件文件后,将配置文件DB_MySQL.config中的Token与数据库参数,修改为自己的参数;将代码放置在同一目录下执行。
- 点赞
- 收藏
- 关注作者
评论(0)