基于200I DK A2的GRU股票预测器-预测‘赛力斯’走势
PS:巨量时间,不断踩坑,悲凉血泪史,总结成的小小见解。
如果各位觉得写的还行,劳请各位老师们点赞收藏一波!!各位老师的鼓励是我坚持写作的动力!!
我玩的就是真实!!!我玩的就是真实!!!我玩的就是真实!!!
以下解决方案仅代表个人见解,如果有更好的方案,希望大家不吝提出,共同学习
问:什么是GRU?
见下图,引用自网图
GRU是循环神经网络(Recurrent Neural Network, RNN)的一种。
和LSTM(Long-Short Term Memory)一样,
为了解决长期记忆和反向传播中的梯度等问题而提出来的。
具备两个门控,更新的门控,重置的门控(reset gate)
和LSTM效果差不多,但是所需参数更少
本次使用10天日线数据预测1天未来数据
问:数据源是什么?
使用tushare的数据库,下载赛力斯自上市以来的日线数据并保存于CSV中备用
问:达到预测目标需要哪几个步骤?
1.在PC端使用GPU加速计算并验证算法正确性
2.生成对应的Onnx,并且设定好输入输出数据定义
3.使用atc进行Onnx转换,转换成om
4.使用AscendCL Python语法读取om模型,并进行推理
5.使用matplotlib进行图形拼接以及输出
问:训练的结果是怎么样的?
有以下几个结论
1. 设备进行推理耗时短,只用了1秒左右的时间,但是画图却用了约40秒
2. 该模型虽然使用了dropout但是仍然对输入数据非常敏感
3. 用预测数据当成是实际发生的数据输入Model中,会出现单边的上行或者单边下行的情况
问:200I DK A2代码是怎么样的?
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
import os
import tushare as ts
import acl
import struct
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
def generate_df_affect_by_n_days(series, n):
    """Build a supervised-learning frame from a 1-D series.

    Each row holds a sliding window of n consecutive values (columns
    c0..c{n-1}) plus the value that immediately follows them (column 'y').

    Args:
        series: pandas Series of values (e.g. daily close prices).
        n: window length; must satisfy 0 < n < len(series).

    Returns:
        DataFrame with n feature columns 'c0'..'c{n-1}' and target 'y'.

    Raises:
        ValueError: if n is out of the valid range (the original code
            would silently build a broken frame in that case).
    """
    if n < 1 or n >= len(series):
        raise ValueError("n must satisfy 0 < n < len(series)")
    values = series.tolist()  # hoisted: original re-converted n+1 times
    df = pd.DataFrame()
    for i in range(n):
        df['c%d' % i] = values[i:-(n - i)]
    df['y'] = values[n:]
    return df
def readData(column='close', n=10, train_end=-180):
    """Load price data from sh.csv and prepare the training frame.

    Args:
        column: CSV column to model (default: daily close).
        n: look-back window length in days.
        train_end: negative index marking the train/test split.

    Returns:
        (train_frame, full_column, index_list): the sliding-window frame
        built from the training slice, the raw column (for plotting /
        later inference), and the CSV index values as a list.
    """
    df = pd.read_csv("sh.csv", index_col=1, encoding='utf-8')
    df_column = df[column].copy()
    df_index = df.index.tolist()
    # Only the training slice is windowed here; the test tail is evaluated
    # later directly from the full column. (The original also computed an
    # unused test slice.)
    df_column_train = df_column[:train_end]
    df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n)
    return df_generate_from_df_column_train, df_column, df_index
class RNN(nn.Module):
    """GRU regressor: maps a window of normalized prices to one value."""

    def __init__(self, input_size):
        super(RNN, self).__init__()
        # Submodules are constructed in a fixed order so that seeded
        # weight initialization stays reproducible.
        self.rnn = nn.GRU(
            input_size=input_size,
            hidden_size=256,
            num_layers=6,
            batch_first=True,
        )
        self.out = nn.Sequential(nn.Linear(256, 1))
        self.hidden = None  # last hidden state of the most recent pass
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        # Dropout hits the raw input window (active only in train mode);
        # the GRU output is then projected down to a scalar per step.
        dropped = self.dropout(x)
        gru_out, self.hidden = self.rnn(dropped)
        return self.out(gru_out)
class TrainSet(Dataset):
    """Dataset over a 2-D tensor: last column is the label, the rest are features."""

    def __init__(self, data):
        self.data = data[:, :-1].float()
        self.label = data[:, -1].float()

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return len(self.data)
def data_upgrade():
    """Download daily bars via tushare and cache them to sh.csv.

    Side effects: one network call to the tushare pro API and an
    overwrite of ./sh.csv. Requires a valid tushare token.
    """
    ts.set_token('')  # TODO: supply a real tushare token before running
    pro = ts.pro_api()
    df = pro.query('daily', ts_code='603288.SH', start_date='20140211', end_date='20231106')
    df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")
    df = df.sort_values(by=["trade_date"], ascending=True)
    df.fillna(0, inplace=True)
    # BUG FIX: replace() returns a new frame; the original discarded the
    # result, so "None" strings were never replaced.
    df = df.replace(to_replace="None", value=0)
    df.to_csv("sh.csv", index=False, sep=',')
def data_process():
    """Run the GRU stock predictor on the Atlas 200I DK A2 via pyACL.

    Loads the pre-converted ./model.om, replays every historical n-day
    window through it, then rolls out 100 autoregressive future steps by
    feeding each prediction back in, and plots real vs. generated curves.

    Side effects: reads sh.csv and ./model.om, allocates and frees NPU
    resources, and opens a matplotlib window.
    """
    device = 'cuda'  # unused here; inference runs on the Ascend NPU via pyACL
    n = 10  # look-back window length (days)
    LR = 0.0001  # unused in this inference-only path
    EPOCH = 100  # unused in this inference-only path
    train_end = -180  # last 180 rows form the test segment
    df, df_all,df_index = readData('close', n=n, train_end=train_end)
    df_all = np.array(df_all.tolist())
    plt.figure(1)
    plt.plot(df_index, df_all, label='real-data')
    df_numpy = np.array(df)
    print(df_numpy)
    # Normalization statistics come from the training frame and are reused
    # to scale inputs / de-normalize outputs below.
    df_numpy_mean = np.mean(df_numpy)
    df_numpy_std = np.std(df_numpy)
    df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
    df_tensor = torch.Tensor(df_numpy)
    trainset = TrainSet(df_tensor)  # built but unused in this inference path
    trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
    # pyACL initialization.
    ret = acl.init()
    # Acquire runtime resources.
    device_id = 0
    # Select the device to compute on.
    ret = acl.rt.set_device(device_id)
    # Explicitly create a context to manage stream objects.
    context, ret = acl.rt.create_context(device_id)
    # Load the model and fetch its description.
    model_path = './model.om'
    # Load the offline model file; returns the model's ID.
    model_id, ret = acl.mdl.load_from_file(model_path)
    # Fetch the description of the successfully loaded model.
    model_desc = acl.mdl.create_desc()
    ret = acl.mdl.get_desc(model_desc, model_id)
    # Memory-allocation policy constant (prefer huge pages).
    ACL_MEM_MALLOC_HUGE_FIRST = 0
    # 1. Prepare the input dataset for model inference.
    # Create an aclmdlDataset describing the model's inputs.
    load_input_dataset = acl.mdl.create_dataset()
    # Number of model inputs.
    input_size = acl.mdl.get_num_inputs(model_desc)
    input_data = []
    # Allocate device memory for each input and attach it to the dataset.
    for i in range(input_size):
        buffer_size = acl.mdl.get_input_size_by_index(model_desc, i)
        # Allocate input memory.
        buffer, ret = acl.rt.malloc(buffer_size, ACL_MEM_MALLOC_HUGE_FIRST)
        data = acl.create_data_buffer(buffer, buffer_size)
        _, ret = acl.mdl.add_dataset_buffer(load_input_dataset, data)
        input_data.append({"buffer": buffer, "size": buffer_size})
    # 2. Prepare the output dataset for model inference.
    # Create an aclmdlDataset describing the model's outputs.
    load_output_dataset = acl.mdl.create_dataset()
    # Number of model outputs.
    output_size = acl.mdl.get_num_outputs(model_desc)
    output_data = []
    # Allocate device memory for each output and attach it to the dataset.
    for i in range(output_size):
        buffer_size = acl.mdl.get_output_size_by_index(model_desc, i)
        # Allocate output memory.
        buffer, ret = acl.rt.malloc(buffer_size, ACL_MEM_MALLOC_HUGE_FIRST)
        data = acl.create_data_buffer(buffer, buffer_size)
        _, ret = acl.mdl.add_dataset_buffer(load_output_dataset, data)
        output_data.append({"buffer": buffer, "size": buffer_size})
    print(input_data,input_size)
    print(output_data,output_size)
    # memcpy direction constants.
    ACL_MEMCPY_HOST_TO_DEVICE = 1
    ACL_MEMCPY_DEVICE_TO_HOST = 2
    generate_data_train = []
    generate_data_test = []
    test_index = len(df_all) + train_end
    df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
    df_all_normal_tensor = (df_all_normal)  # plain ndarray despite the name
    # Host staging buffer used to copy model output back from the device.
    buffer_host, ret = acl.rt.malloc_host(output_data[0]["size"])
    # Replay every historical n-day window through the model once.
    for i in range(n, len(df_all)):
        x = df_all_normal_tensor[i - n:i]
        # Pack the window as little-endian float32 bytes for the NPU input.
        x_all = b''
        for j in x:
            x_b = struct.pack('<f',j)
            x_all = x_all + x_b
        np_ptr = acl.util.bytes_to_ptr(x_all)
        ret = acl.rt.memcpy(input_data[0]["buffer"], input_data[0]["size"], np_ptr,
                            input_data[0]["size"], ACL_MEMCPY_HOST_TO_DEVICE)
        ret = acl.mdl.execute(model_id, load_input_dataset, load_output_dataset)
        ret = acl.rt.memcpy(buffer_host,output_data[0]["size"], output_data[0]["buffer"],
                            output_data[0]["size"], ACL_MEMCPY_DEVICE_TO_HOST)
        bytes_out = acl.util.ptr_to_bytes(buffer_host, output_data[0]["size"])
        y = np.frombuffer(bytes_out, dtype=np.float32)
        # print(y)
        # De-normalize and bucket into the train vs. test segment.
        if i < test_index:
            generate_data_train.append(y * df_numpy_std + df_numpy_mean)
        else:
            generate_data_test.append(y * df_numpy_std + df_numpy_mean)
    # NOTE(review): buffer_host was allocated with acl.rt.malloc_host but is
    # freed here with acl.rt.free, then reused in the prediction loop below
    # (and freed again after it) — looks like a bug; confirm against the
    # pyACL memory API (free_host is the documented counterpart).
    ret = acl.rt.free(buffer_host)
    # Autoregressive forecast: feed each prediction back in as input.
    predict_num = 100
    predict_dict = []
    predict_list = []
    x = df_all_normal_tensor[len(df_all)-n:len(df_all)]
    x_all = b''
    for j in x:
        x_b = struct.pack('<f',j)
        x_all = x_all + x_b
    input = x  # shadows the builtin; rolling window of the last n values
    for i in range(predict_num):
        np_ptr = acl.util.bytes_to_ptr(x_all)
        ret = acl.rt.memcpy(input_data[0]["buffer"], input_data[0]["size"], np_ptr,
                            input_data[0]["size"], ACL_MEMCPY_HOST_TO_DEVICE)
        ret = acl.mdl.execute(model_id, load_input_dataset, load_output_dataset)
        ret = acl.rt.memcpy(buffer_host,output_data[0]["size"], output_data[0]["buffer"],
                            output_data[0]["size"], ACL_MEMCPY_DEVICE_TO_HOST)
        bytes_out = acl.util.ptr_to_bytes(buffer_host, output_data[0]["size"])
        y = np.frombuffer(bytes_out, dtype=np.float32)
        # Slide the window: drop the oldest value, append the prediction.
        input = np.append(input[1:],y)
        x_all = b''
        for j in input:
            x_b = struct.pack('<f',j)
            x_all = x_all + x_b
        m= y * df_numpy_std + df_numpy_mean
        predict_dict.append(m)
        predict_list.append(i)
    ret = acl.rt.free(buffer_host)
    plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
    plt.plot(df_index[train_end:] + predict_list, generate_data_test + predict_dict, label='generate_test')
    plt.axvline(x=len(df_all)-1, color='r', linestyle='--')
    plt.legend()
    # Release input resources: device memory, then data buffers and dataset.
    while input_data:
        item = input_data.pop()
        ret = acl.rt.free(item["buffer"])
    input_number = acl.mdl.get_dataset_num_buffers(load_input_dataset)
    for i in range(input_number):
        data_buf = acl.mdl.get_dataset_buffer(load_input_dataset, i)
        if data_buf:
            ret = acl.destroy_data_buffer(data_buf)
    ret = acl.mdl.destroy_dataset(load_input_dataset)
    # Release output resources: device memory, then data buffers and dataset.
    while output_data:
        item = output_data.pop()
        ret = acl.rt.free(item["buffer"])
    output_number = acl.mdl.get_dataset_num_buffers(load_output_dataset)
    for i in range(output_number):
        data_buf = acl.mdl.get_dataset_buffer(load_output_dataset, i)
        if data_buf:
            ret = acl.destroy_data_buffer(data_buf)
    ret = acl.mdl.destroy_dataset(load_output_dataset)
    # Unload the model and release its description.
    # Unload the model.
    ret = acl.mdl.unload(model_id)
    # Release the model description.
    if model_desc:
        ret = acl.mdl.destroy_desc(model_desc)
        model_desc = None
    # Release runtime resources.
    # Release the context.
    if context:
        ret = acl.rt.destroy_context(context)
        context = None
    # Release the device.
    ret = acl.rt.reset_device(device_id)
    # pyACL finalization.
    ret = acl.finalize()
    plt.show()
复制
问:PC端训练代码是怎么样的?
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
import os
import tushare as ts
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
def generate_df_affect_by_n_days(series, n):
    """Turn a series into an n-step sliding-window frame with target column 'y'."""
    frame = pd.DataFrame()
    for offset in range(n):
        frame['c%d' % offset] = series.tolist()[offset:-(n - offset)]
    frame['y'] = series.tolist()[n:]
    return frame
def readData(column='close', n=10, train_end=-180):
    """Load price data from sh.csv and prepare the training frame.

    Args:
        column: CSV column to model (default: daily close).
        n: look-back window length in days.
        train_end: negative index marking the train/test split.

    Returns:
        (train_frame, full_column, index_list): the sliding-window frame
        built from the training slice, the raw column (for plotting /
        later inference), and the CSV index values as a list.
    """
    df = pd.read_csv("sh.csv", index_col=1, encoding='utf-8')
    df_column = df[column].copy()
    df_index = df.index.tolist()
    # Only the training slice is windowed here; the test tail is evaluated
    # later directly from the full column. (The original also computed an
    # unused test slice.)
    df_column_train = df_column[:train_end]
    df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n)
    return df_generate_from_df_column_train, df_column, df_index
class RNN(nn.Module):
    """GRU regressor mapping a window of input_size normalized prices to one value."""
    def __init__(self, input_size):
        super(RNN, self).__init__()
        # 6-layer GRU over sequences of input_size features, batch first.
        self.rnn = nn.GRU(
            input_size=input_size,
            hidden_size=256,
            num_layers=6,
            batch_first=True,
        )
        # Project the 256-dim GRU output down to a single predicted value.
        self.out = nn.Sequential(
            nn.Linear(256, 1),
        )
        # Last hidden state of the most recent forward pass.
        self.hidden = None
        # Applied to the raw input window (active in train mode only).
        self.dropout = nn.Dropout(0.2)
    def forward(self, x):
        """Return per-step predictions of shape (..., 1) for input x."""
        x = self.dropout(x)
        r_out, self.hidden = self.rnn(x)
        out = self.out(r_out)
        return out
class TrainSet(Dataset):
    """Dataset over a 2-D tensor; the last column is the regression target."""

    def __init__(self, data):
        features, targets = data[:, :-1], data[:, -1]
        self.data = features.float()
        self.label = targets.float()

    def __getitem__(self, index):
        return (self.data[index], self.label[index])

    def __len__(self):
        return self.data.shape[0]
def data_upgrade():
    """Download daily bars via tushare and cache them to sh.csv.

    Side effects: one network call to the tushare pro API and an
    overwrite of ./sh.csv. Requires a valid tushare token.
    """
    ts.set_token('')  # TODO: supply a real tushare token before running
    pro = ts.pro_api()
    df = pro.query('daily', ts_code='603288.SH', start_date='20140211', end_date='20231106')
    df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")
    df = df.sort_values(by=["trade_date"], ascending=True)
    df.fillna(0, inplace=True)
    # BUG FIX: replace() returns a new frame; the original discarded the
    # result, so "None" strings were never replaced.
    df = df.replace(to_replace="None", value=0)
    df.to_csv("sh.csv", index=False, sep=',')
def data_process():
    """Train the GRU on GPU, export it to ONNX, and plot predictions.

    Trains only if model.pth is absent and exports ONNX only if
    model.onnx is absent, then replays every historical window and rolls
    out 100 autoregressive future steps, plotting real vs. generated
    curves with matplotlib.

    Side effects: reads sh.csv, writes model.pth / model.onnx, opens a
    matplotlib window. Requires a CUDA device.
    """
    device = 'cuda'
    n = 10  # look-back window length (days)
    LR = 0.0001
    EPOCH = 100
    train_end = -180  # last 180 rows form the test segment
    df, df_all,df_index = readData('close', n=n, train_end=train_end)
    df_all = np.array(df_all.tolist())
    plt.figure(1)
    plt.plot(df_index, df_all, label='real-data')
    df_numpy = np.array(df)
    print(df_numpy)
    # Normalize with training-frame statistics; the same mean/std are used
    # to de-normalize predictions below.
    df_numpy_mean = np.mean(df_numpy)
    df_numpy_std = np.std(df_numpy)
    df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
    df_tensor = torch.Tensor(df_numpy)
    print(df_tensor.shape)
    trainset = TrainSet(df_tensor)
    trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
    rnn = RNN(n).to(device)
    # Train only when no checkpoint exists yet.
    if not os.path.exists('model.pth'):
        rnn.train()
        optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters
        loss_func = nn.MSELoss()
        # Checkpoint whenever a batch beats the best loss seen so far.
        # NOTE(review): compares a CUDA tensor against a Python float and
        # saves mid-epoch on single-batch loss — works but is noisy; confirm
        # this is intended rather than per-epoch validation loss.
        mini_loss = 100;
        for step in (range(EPOCH)):
            for tx, ty in trainloader:
                # The whole batch is wrapped as one sequence: (1, batch, n).
                output = rnn(torch.unsqueeze(tx, dim=0).to(device))
                loss = loss_func(torch.squeeze(output).to(device), ty.to(device))#torch.unsqueeze(ty, dim=0)) #torch.squeeze(output)
                if loss < mini_loss:
                    mini_loss = loss
                    torch.save(rnn.state_dict(), 'model.pth')
                optimizer.zero_grad() # clear gradients for this training step
                loss.backward() # back propagation, compute gradients
                optimizer.step()
            print(step, loss)
    generate_data_train = []
    generate_data_test = []
    # Evaluate with the best checkpoint, not the final weights.
    rnn.load_state_dict(torch.load('model.pth'))
    # One-time ONNX export, for the later atc conversion to .om.
    if not os.path.exists('model.onnx'):
        input = torch.randn(1, 1, 10, requires_grad=True)  # dummy (1, 1, n) input for tracing
        input_name = 'input'
        output_name = 'output'
        # NOTE(review): hard-coded absolute Windows path — adjust per machine.
        torch.onnx.export(rnn.to('cpu'),
                          input,
                          "M:\\R5-Python\\R19_SZ_Altas_200I_DK_A2_20231014\\R2_FILE\\GRU预测股票\\code\\model.onnx",
                          opset_version=11,
                          input_names = [input_name],
                          output_names = [output_name],
                          )
    # Move back to the GPU (export above may have moved the model to CPU).
    rnn.to(device)
    rnn.eval()  # disable dropout for inference
    test_index = len(df_all) + train_end
    df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
    df_all_normal_tensor = torch.Tensor(df_all_normal)
    # Replay every historical n-day window through the trained model.
    for i in range(n, len(df_all)):
        x = df_all_normal_tensor[i - n:i]
        x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0).to(device)
        y = rnn(x).to('cpu')
        # De-normalize and bucket into the train vs. test segment.
        if i < test_index:
            generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
        else:
            generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
    # Autoregressive forecast: feed each prediction back in as input.
    predict_num = 100
    predict_dict = []
    predict_list = []
    x = df_all_normal_tensor[len(df_all)-n:len(df_all)]
    x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0).to(device)
    # y = rnn(x)
    # print(x,y)
    for i in range(predict_num):
        y = rnn(x).to('cpu')
        # Slide the window: drop the oldest value, append the prediction.
        input = [x[:,:,1:].to('cpu'),y.detach()]
        x = torch.cat(input,dim=2).to(device)
        m=torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean
        predict_dict.append(m)
        predict_list.append(i)
    plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
    plt.plot(df_index[train_end:] + predict_list, generate_data_test + predict_dict, label='generate_test')
    plt.axvline(x=len(df_all)-1, color='r', linestyle='--')
    plt.legend()
    plt.show()
if __name__ == '__main__':
    # data_upgrade()  # uncomment to (re)download sh.csv via tushare first
    data_process()
    pass
- 点赞
- 收藏
- 关注作者
评论(0)