多元线性回归模型选股应用(α策略)
内容介绍
本文重点在于如何利用python收集各类型因子并进行预处理最终用于构建量化选股模型。
工具介绍
本代码所需要调用的包如下所示:
import pandas as pd
import tushare as ts
# Tushare Pro API client used by get_all_factors and get_tag below.
# NOTE(review): no token is passed here, so this presumably relies on a token
# saved earlier via ts.set_token(...) — confirm before running.
pro = ts.pro_api()
import numpy as np
import time
import math
import statsmodels.api as sm
from sklearn import preprocessing
from sklearn.decomposition import PCA
import os
这里需要用到的是python中的pandas和statsmodels模块,分别用于数据处理和做多元回归。
另外,还需要获取股票和指数的各项数据,这里所用到的是tushare,tushare拥有丰富的数据内容,如股票、基金等行情数据,公司财务等基本面数据。通过tushare平台赚取一定的积分可以免费获取平台提供的数据。(个人ID:419382)
数据获取
本段代码展示了如何通过tushare获取相关数据并清洗成为所需因子,共展示了估值因子,成长因子,财务质量因子,杠杆因子,市值因子,动量反转因子,波动率因子,beta因子以及换手率因子的构建。其中beta因子是取个股与指数收益率线性回归的回归系数。
def _stack_frames(frames):
    """Row-wise concatenate per-stock frames; an empty list gives an empty DataFrame."""
    return pd.concat(frames) if frames else pd.DataFrame()


def get_all_factors(codes_list,date,start,end,one_m_start,beta_start,beta_end):
    """Download tushare data for every stock and assemble one factor table.

    Each factor group is fetched stock by stock (with a sleep between most
    requests and a progress message printed per stock), stacked into one
    frame, and inner-merged into the running result on the shared
    ``ts_code`` column.

    Parameters
    ----------
    codes_list : list
        Stock codes, passed as ``ts_code`` to every query.
    date :
        Cross-section date; passed as ``trade_date`` to ``daily_basic`` and
        as ``end_date`` to ``income`` / ``fina_indicator``.
    start, end :
        Bounds of the long look-back window (outputs suffixed ``_3m``).
        ``start`` is also compared against integer-cast ``trade_date``
        values, so it must be comparable with int64 (e.g. ``20201001``).
    one_m_start :
        Start of the short look-back window (outputs suffixed ``_1m``);
        same comparison requirement as ``start``.
    beta_start, beta_end :
        Date range of the daily returns used in the beta regression.

    Returns
    -------
    pandas.DataFrame
        Columns: ts_code, trade_date, EP, BP, SP, DP, sales_G, profit_G,
        ROE_G, roe, assets_turn, debt_to_assets, total_mv (natural log),
        return_1m, return_3m, w_return_1m, w_return_3m, std_1m, std_3m,
        beta, turn_1m, turn_3m. Stocks missing from any group are dropped
        by the inner merges.
    """
    # ---- valuation factors ----
    frames = []
    for i, code in enumerate(codes_list):
        # Valuation indicators on the given trade date.
        frames.append(pro.daily_basic(ts_code=code,trade_date=date,
                                      fields="ts_code,trade_date,pe_ttm,pb,ps_ttm,dv_ttm"))
        time.sleep(0.4)
        print("第%d支股票估值因子获取成功"%i)
    evaluate_factors = _stack_frames(frames)
    # Invert the price multiples so that a larger value means "cheaper".
    evaluate_factors["EP"] = 1/evaluate_factors["pe_ttm"]
    evaluate_factors["BP"] = 1/evaluate_factors["pb"]
    evaluate_factors["SP"] = 1/evaluate_factors["ps_ttm"]
    evaluate_factors["DP"] = evaluate_factors["dv_ttm"]
    evaluate_factors = evaluate_factors[["ts_code","trade_date","EP","BP","SP","DP"]]

    # ---- growth factors ----
    frames = []
    for i, code in enumerate(codes_list):
        income = pro.income(ts_code=code,end_date=date,
                            fields="ts_code,end_date,revenue,n_income")
        income = income.drop_duplicates(subset=["end_date"])
        # Row 0 is the first report returned, row 4 the report four
        # reporting periods after it in the returned order.
        income = income.iloc[[0,4],:]
        revenue = income["revenue"].tolist()
        n_income = income["n_income"].tolist()
        sales_G = revenue[0]/revenue[1] - 1
        profit_G = n_income[0]/n_income[1] - 1
        indicator = pro.fina_indicator(ts_code=code,end_date=date)
        indicator = indicator.drop_duplicates(subset=["end_date"])
        indicator = indicator.iloc[[0,4],:]
        roe = indicator["roe"].tolist()
        ROE_G = roe[0]/roe[1] - 1
        frames.append(pd.DataFrame({"ts_code":code,"sales_G":sales_G,
                                    "profit_G":profit_G,"ROE_G":ROE_G},index=[0]))
        time.sleep(1.2)
        print("第%d支股票成长因子获取成功"%i)
    all_factors = evaluate_factors.merge(_stack_frames(frames))

    # ---- financial quality factors ----
    frames = []
    for i, code in enumerate(codes_list):
        indicator = pro.fina_indicator(ts_code=code,end_date=date)
        # iloc[[0]] keeps a one-row DataFrame so the column dtypes survive.
        frames.append(indicator.iloc[[0]][["ts_code","roe","assets_turn"]])
        time.sleep(1)
        print("第%d支股票财务质量获取成功"%i)
    all_factors = all_factors.merge(_stack_frames(frames))

    # ---- leverage factor ----
    frames = []
    for i, code in enumerate(codes_list):
        indicator = pro.fina_indicator(ts_code=code,end_date=date)
        frames.append(indicator.iloc[[0]][["ts_code","debt_to_assets"]])
        time.sleep(0.8)
        print("第%d支股票杠杆因子获取成功"%i)
    all_factors = all_factors.merge(_stack_frames(frames))

    # ---- size factor (log of total market value) ----
    frames = []
    for i, code in enumerate(codes_list):
        cap = pro.daily_basic(ts_code=code,trade_date=date,fields="ts_code,total_mv")
        cap["total_mv"] = cap["total_mv"].apply(math.log)
        frames.append(cap)
        time.sleep(0.5)
        print("第%d支股票市值因子获取成功"%i)
    all_factors = all_factors.merge(_stack_frames(frames))

    # ---- momentum / reversal factors ----
    frames = []
    for i, code in enumerate(codes_list):
        monthly = ts.pro_bar(ts_code=code, freq='M', adj='hfq')
        close = monthly["close"].tolist()
        # NOTE: return_1m is the raw pct_chg value while return_3m is a
        # price ratio minus one; both are kept exactly as originally defined.
        return_1m = monthly["pct_chg"].tolist()[0]
        return_3m = close[0]/close[3] - 1
        daily = ts.pro_bar(ts_code=code, adj='hfq')
        daily["trade_date"] = daily["trade_date"].astype("int64")
        turnover = pro.daily_basic(ts_code=code,start_date=start,end_date=end,
                                   fields="ts_code,trade_date,turnover_rate")
        turnover["trade_date"] = turnover["trade_date"].astype("int64")
        # Pair return and turnover by trade date. Multiplying the two Series
        # directly would align on row labels, which only match by accident
        # because the rows come from two different API calls.
        paired = daily[["trade_date","pct_chg"]].merge(
            turnover[["trade_date","turnover_rate"]],on="trade_date")
        paired = paired[paired["trade_date"]>=start]
        w_return_3m = (paired["pct_chg"]*paired["turnover_rate"]).sum()/len(paired)
        recent = paired[paired["trade_date"]>=one_m_start]
        w_return_1m = (recent["pct_chg"]*recent["turnover_rate"]).sum()/len(recent)
        frames.append(pd.DataFrame({"ts_code":code,
                                    "return_1m":return_1m,"return_3m":return_3m,
                                    "w_return_1m":w_return_1m,"w_return_3m":w_return_3m},index=[0]))
        print("第%d支股票动量反转因子获取成功"%i)
    all_factors = all_factors.merge(_stack_frames(frames))

    # ---- volatility factors ----
    frames = []
    for i, code in enumerate(codes_list):
        daily = ts.pro_bar(ts_code=code, adj='hfq')
        daily["trade_date"] = daily["trade_date"].astype("int64")
        # `start` opens the long (3m) window and `one_m_start` the short (1m)
        # one, matching the momentum and turnover sections.
        std_3m = daily[daily["trade_date"]>=start]["pct_chg"].std()
        std_1m = daily[daily["trade_date"]>=one_m_start]["pct_chg"].std()
        frames.append(pd.DataFrame({"ts_code":code,"std_1m":std_1m,"std_3m":std_3m},index=[0]))
        print("第%d支股票波动率因子获取成功"%i)
    all_factors = all_factors.merge(_stack_frames(frames))

    # ---- beta: slope of stock excess return on index excess return ----
    frames = []
    rf = 1.03**(1/360) - 1  # daily rate derived from a 3% annual rate over 360 days
    hs300 = pro.index_daily(ts_code="399300.SZ",start_date=beta_start,end_date=beta_end)
    hs300["rm"] = hs300["pct_chg"]/100 - rf
    for i, code in enumerate(codes_list):
        stock = pro.daily(ts_code=code,start_date=beta_start,end_date=beta_end)
        stock["rp"] = stock["pct_chg"]/100 - rf
        df_model = pd.merge(hs300[["trade_date","rm"]],stock[["trade_date","rp"]],on="trade_date")
        # Index by the merged frame's own dates: the stock frame can have a
        # different number of rows than the merge result.
        df_model.index = pd.to_datetime(df_model["trade_date"])
        df_model.sort_index(inplace=True)
        result = sm.OLS(df_model["rp"],sm.add_constant(df_model["rm"])).fit()
        beta = result.params["rm"]
        frames.append(pd.DataFrame({"ts_code":code,"beta":beta},index=[0]))
        print("第%d支股票beta因子获取成功"%i)
    all_factors = all_factors.merge(_stack_frames(frames))

    # ---- turnover factors ----
    frames = []
    for i, code in enumerate(codes_list):
        turnover = pro.daily_basic(ts_code=code,start_date=start,end_date=end,
                                   fields="ts_code,trade_date,turnover_rate")
        turnover["trade_date"] = turnover["trade_date"].astype("int64")
        # Both factors are window sums of the daily turnover rate.
        turn_3m = turnover["turnover_rate"].sum()
        turn_1m = turnover[turnover["trade_date"]>=one_m_start]["turnover_rate"].sum()
        frames.append(pd.DataFrame({"ts_code":code,"turn_1m":turn_1m,"turn_3m":turn_3m},index=[0]))
        print("第%d支股票换手率因子获取成功"%i)
        time.sleep(0.3)
    all_factors = all_factors.merge(_stack_frames(frames))
    return all_factors
数据预处理
本段的数据预处理首先获取个股所属行业,这一部分需要先在网上下载一份上市公司所属中信一级行业的表格(即代码中的“ industry.xlsx ”)。之后再用merge函数合并进数据表中。之后可以利用所属行业进行行业市值中性化处理,其他预处理步骤包括中位数去极值,缺失值处理,标准化和PCA。这里的PCA主要是为了去除多重共线性的影响,不在于筛选因子。
#获取标签
def get_tag(data,date):
    """Add an ``ex_return`` column: stock monthly pct_chg minus the index's.

    For every ``ts_code`` in ``data`` the monthly ``pct_chg`` on ``date`` is
    fetched and the "399300.SZ" index monthly ``pct_chg`` on the same date is
    subtracted. A progress line is printed and the loop sleeps 0.5 s per
    stock. ``data`` is modified in place and also returned.
    """
    benchmark = pro.index_monthly(ts_code="399300.SZ",trade_date=date).pct_chg.tolist()
    excess = []
    for position, code in enumerate(data["ts_code"].tolist()):
        stock_chg = pro.monthly(ts_code=code, trade_date=date).pct_chg.tolist()
        # First returned record of the stock versus first record of the index.
        excess.append(stock_chg[0]-benchmark[0])
        print("第%d支股票超额收益计算完成"%position)
        time.sleep(0.5)
    data["ex_return"] = excess
    return data
#获取行业
def get_industry(data):
    """Attach the columns of ``industry.xlsx`` to ``data`` and return the result.

    The spreadsheet's ``code`` column is matched against the first six
    characters of ``data["ts_code"]``. The join uses a temporary key column,
    so the caller's DataFrame is left untouched and the full ``ts_code``
    values are kept in the result. Rows whose code is not present in the
    spreadsheet are dropped (inner join).
    """
    industry = pd.read_excel(r"industry.xlsx",dtype="object")
    industry = industry.rename(columns={"code":"_code6"})
    # assign() returns a copy, so the input frame is not mutated.
    keyed = data.assign(_code6=data["ts_code"].apply(lambda x:x[0:6]))
    merged = keyed.merge(industry,on="_code6")
    return merged.drop(columns="_code6")
#数据预处理
#中位数去极值
def MAD(data,n):
    """Clip each factor column to median +/- n * median absolute deviation.

    Factor columns are all columns except the first two and the last one.
    ``data`` is modified in place and also returned.
    """
    factor_cols = data.columns.values.tolist()[2:-1]
    for col in factor_cols:
        median = data[col].quantile(0.5)
        # Median of the absolute deviations from the median.
        deviation = (data[col] - median).abs().quantile(0.5)
        lower = median - n*deviation
        upper = median + n*deviation
        data[col] = np.clip(data[col],lower,upper)
    return data
#缺失值处理
def Miss_data(data):
    """Fill missing factor values with the mean of the same industry group.

    Factor columns are all columns except the first two and the last one;
    groups are defined by the ``industry`` column. ``data`` is modified in
    place and also returned.
    """
    factor_cols = data.columns.values.tolist()[2:-1]
    for col in factor_cols:
        grouped = data.groupby("industry")[col]
        data[col] = grouped.transform(lambda values: values.fillna(values.mean()))
    return data
#市值行业中性化
def Indifference(data):
    """Replace each factor column with its residual from an industry regression.

    A 0/1 indicator column is built for every distinct ``industry`` value
    (in order of first appearance). Each factor column (all columns except
    the first two and the last one) is regressed by OLS on a constant plus
    those indicators, and the column is overwritten with the residuals.
    Only industry indicators are used as regressors. ``data`` is modified in
    place and also returned.
    """
    labels = data["industry"].tolist()
    industry_names = data.drop_duplicates(subset=["industry"])["industry"].tolist()
    industry_matrix = pd.DataFrame({
        name: [1 if label == name else 0 for label in labels]
        for name in industry_names
    })
    factor_cols = data.columns.values.tolist()[2:-1]
    for col in factor_cols:
        fitted = sm.OLS(data[col],sm.add_constant(industry_matrix)).fit()
        data[col] = fitted.resid
    return data
#标准化
def Standardize(data):
    """Standardize the factor columns with ``sklearn.preprocessing.scale``.

    Factor columns are all columns except the first two and the last one.
    ``data`` is modified in place and also returned.
    """
    factor_cols = data.columns.values.tolist()[2:-1]
    scaled = preprocessing.scale(data[factor_cols])
    data[factor_cols] = scaled
    return data
#主成分分析
def PCA_data(data):
    """Replace the factor columns with their principal-component scores.

    Factor columns are all columns except the first two and the last one.
    The number of components equals the number of factor columns, so the
    result always fits back into those columns (with 20 factor columns this
    is the same ``n_components=20`` as before). The scores are written back
    positionally, so the row index of ``data`` does not need to be
    0..n-1. ``data`` is modified in place and also returned.
    """
    factor_cols = data.columns.values.tolist()[2:-1]
    if not factor_cols:
        # Nothing to transform.
        return data
    pca = PCA(n_components=len(factor_cols))
    scores = pca.fit_transform(data[factor_cols])
    # Assign the raw array rather than a RangeIndex DataFrame, which pandas
    # would align on index labels and turn into NaN for non-default indexes.
    data[factor_cols] = scores
    return data
模型构建
到这里之后用于回归模型的数据集就已经构建完成了,利用传统机器学习的步骤,将数据集分为训练集和测试集,训练集为T-2期至T-12期的数据,测试集为T-1期的数据,本模型最终目的是利用T-1期的数据预测T期个股的超额收益率。
- 点赞
- 收藏
- 关注作者
评论(0)