【RNN从入门到实战】GRU入门到实战——使用GRU预测股票。

举报
AI浩 发表于 2022/02/05 11:35:26 2022/02/05
【摘要】 摘要      GRU是LSTM网络的一种效果很好的变体,它较LSTM网络的结构更加简单,而且效果也很好,因此也是当前非常流形的一种网络。GRU既然是LSTM的变体,因此也是可以解决RNN网络中的长依赖问题。       在LSTM中引入了三个门函数:输入门、遗忘门和输出门来控制输入值、记忆值和输出值。而在GRU模型中只有两个门:分别是更新门和重置门。具体结构如下图所示:图中的update...

摘要

      GRU是LSTM网络的一种效果很好的变体,它较LSTM网络的结构更加简单,而且效果也很好,因此也是当前非常流形的一种网络。GRU既然是LSTM的变体,因此也是可以解决RNN网络中的长依赖问题。

       在LSTM中引入了三个门函数:输入门、遗忘门和输出门来控制输入值、记忆值和输出值。而在GRU模型中只有两个门:分别是更新门和重置门。具体结构如下图所示:
在这里插入图片描述
图中的update gate和reset gate分别表示更新门和重置门。
     更新门的作用类似于LSTM的遗忘和输入门。它决定前一时刻的状态信息哪些被丢弃和哪些被更新新信息,更新门的值越大说明前一时刻的状态信息带入越多。
    重置门控制前一状态有多少信息被写入到当前的候选集 h t ~ \tilde{h_{t}} 上,重置门越小,前一状态的信息被写入的越少。
详见下图:
在这里插入图片描述

GRU和LSTM的区别

在这里插入图片描述

GRU使用门控机制学习长期依赖关系的基本思想和 LSTM 一致,但还是有一些关键区别:

1、GRU 有两个门(重置门与更新门),而 LSTM 有三个门(输入门、遗忘门和输出门)。
2、GRU 并不会控制并保留内部记忆(c_t),且没有 LSTM 中的输出门。
3、LSTM 中的输入与遗忘门对应于 GRU 的更新门,重置门直接作用于前面的隐藏状态。
4、在计算输出时并不应用二阶非线性。

如何选择?

由于 GRU 参数更少,收敛速度更快,因此其实际花费时间要少很多,这可以大大加速了我们的迭代过程。 而从表现上讲,二者之间孰优孰劣并没有定论,这要依据具体的任务和数据集而定,而实际上,二者之间的 performance 差距往往并不大,远没有调参所带来的效果明显,我通常的做法,首先选择GRU作为基本的单元,因为其收敛速度快,可以加速试验进程,快速迭代,而我认为快速迭代这一特点很重要。如果实现没其余优化技巧,才会尝试将 GRU 换为 LSTM,看看有没有变化。

实战——使用GRU预测股票

制作数据集

数据的下载地址:http://quotes.money.163.com/trade/lsjysj_zhishu_000001.html
首先选择查询数据的范围,然后选择下载数据:
在这里插入图片描述
点击下载数据后,弹出个框,这个框里可以看到起止日期,数据的列,默认勾选全部,然后单击下载。
在这里插入图片描述
将下载后的数据放到工程的根目录下面,然后做一些处理。
第一步 读入csv文件,编码方式设置为gbk。
第二步 将日期列转为“年-月-日”这样的格式。
第三步 根据日期排序,数据默认是从降序,改为升序。
最后 保存到“sh.csv”中。
代码如下:

import pandas as pd

df = pd.read_csv("000001.csv", encoding='gbk')
df["日期"] = pd.to_datetime(df["日期"], format="%Y-%m-%d")
df.sort_values(by=["日期"], ascending=True)
df = df.sort_values(by=["日期"], ascending=True)
df.to_csv("sh.csv", index=False, sep=',')

到这里数据集制作完成了

读入数据集,构建时序数据。

generate_df_affect_by_n_days函数,通过一个序列来生成一个矩阵(用于处理时序的数据)。就是把当天的前n天作为参数,当天的数据作为label。
readData中的文件名为:sh.csv ,参数column,代表选用的数据,本次预测只用了一列数据,列名就是column,参数n就是之前模型中所说的n,代表column前n天的数据。train_end表示的是后面多少个数据作为测试集。

import pandas as pd
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader


def generate_df_affect_by_n_days(series, n, index=False):
    if len(series) <= n:
        raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n))
    df = pd.DataFrame()
    for i in range(n):
        df['c%d' % i] = series.tolist()[i:-(n - i)]
    df['y'] = series.tolist()[n:]
    if index:
        df.index = series.index[n:]
    return df


def readData(column='收盘价', n=30, all_too=True, index=False, train_end=-300):
    df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8')
    df.fillna(0, inplace=True)
    df.replace(to_replace="None", value=0)
    del df["股票代码"]
    del df["名称"]
    df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index))
    df_column = df[column].copy()
    df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:]
    df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index)
    print(df_generate_from_df_column_train)
    if all_too:
        return df_generate_from_df_column_train, df_column, df.index.tolist()
    return df_generate_from_df_column_train

构建模型

模型见RNN类,采用GRU+全连接。隐藏层设置64,GRU的输出是64维的,所以设置全连接也是64。
TrainSet是数据读取类,将输出数据的最后一列做标签,前面的列做输入,类的写法是Pytorch的固定模式。

class RNN(nn.Module):
    def __init__(self, input_size):
        super(RNN, self).__init__()
        self.rnn = nn.GRU(
            input_size=input_size,
            hidden_size=64,
            num_layers=1,
            batch_first=True,
        )
        self.out = nn.Sequential(
            nn.Linear(64, 1),
        )
        self.hidden = None
    def forward(self, x):
        r_out, self.hidden = self.rnn(x)  # None 表示 hidden state 会用全0的 state
        out = self.out(r_out)
        return out


class TrainSet(Dataset):
    def __init__(self, data):
        # 定义好 image 的路径
        self.data, self.label = data[:, :-1].float(), data[:, -1].float()

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return len(self.data)

训练与测试

n为模型中的n
LR是模型的学习率
EPOCH是多次循环
train_end这个在之前的数据集中有提到。(注意是负数)
n = 30
LR = 0.0001
EPOCH = 100
train_end = -500
loss选用mse
预测的数据选择“收盘价”

n = 10
LR = 0.0001
EPOCH = 100
train_end = -500
# 数据集建立
df, df_all, df_index = readData('收盘价', n=n, train_end=train_end)
df_all = np.array(df_all.tolist())
plt.plot(df_index, df_all, label='real-data')
df_numpy = np.array(df)
df_numpy_mean = np.mean(df_numpy)
df_numpy_std = np.std(df_numpy)
df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
df_tensor = torch.Tensor(df_numpy)
trainset = TrainSet(df_tensor)
trainloader = DataLoader(trainset, batch_size=16, shuffle=True)
rnn = RNN(n)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)  # optimize all cnn parameters
loss_func = nn.MSELoss()
for step in range(EPOCH):
    for tx, ty in trainloader:
        output = rnn(torch.unsqueeze(tx, dim=0))
        loss = loss_func(torch.squeeze(output), ty)
        optimizer.zero_grad()  # clear gradients for this training step
        loss.backward()  # back propagation, compute gradients
        optimizer.step()
    print(step, loss)
    if step % 10:
        torch.save(rnn, 'rnn.pkl')
torch.save(rnn, 'rnn.pkl')
generate_data_train = []
generate_data_test = []
test_index = len(df_all) + train_end
df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
df_all_normal_tensor = torch.Tensor(df_all_normal)
for i in range(n, len(df_all)):
    x = df_all_normal_tensor[i - n:i]
    x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0)
    y = rnn(x)
    if i < test_index:
        generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
    else:
        generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
plt.plot(df_index[train_end:], generate_data_test, label='generate_test')
plt.legend()
plt.show()
plt.cla()
plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data')
plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test')
plt.legend()
plt.show()

在这里插入图片描述
在这里插入图片描述

总结

本实例数据集使用上证的收盘价,构建时序数据集,使用GRU对数据预测,从结果上来看,走势有一定的滞后性,由于只是用了价格预测价格,还是太简单了,可以考虑加入更多的特征去优化这一算法。

完整代码

import pandas as pd
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'

def generate_df_affect_by_n_days(series, n, index=False):
    if len(series) <= n:
        raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n))
    df = pd.DataFrame()
    for i in range(n):
        df['c%d' % i] = series.tolist()[i:-(n - i)]
    df['y'] = series.tolist()[n:]
    if index:
        df.index = series.index[n:]
    return df


def readData(column='收盘价', n=30, all_too=True, index=False, train_end=-300):
    df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8')
    df.fillna(0, inplace=True)
    df.replace(to_replace="None", value=0)
    del df["股票代码"]
    del df["名称"]
    df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index))
    df_column = df[column].copy()
    df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:]
    df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index)
    print(df_generate_from_df_column_train)
    if all_too:
        return df_generate_from_df_column_train, df_column, df.index.tolist()
    return df_generate_from_df_column_train


class RNN(nn.Module):
    def __init__(self, input_size):
        super(RNN, self).__init__()
        self.rnn = nn.GRU(
            input_size=input_size,
            hidden_size=64,
            num_layers=1,
            batch_first=True,
        )
        self.out = nn.Sequential(
            nn.Linear(64, 1),
        )
        self.hidden = None
    def forward(self, x):
        r_out, self.hidden = self.rnn(x)  # None 表示 hidden state 会用全0的 state
        out = self.out(r_out)
        return out


class TrainSet(Dataset):
    def __init__(self, data):
        # 定义好 image 的路径
        self.data, self.label = data[:, :-1].float(), data[:, -1].float()

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return len(self.data)


n = 10
LR = 0.0001
EPOCH = 100
train_end = -500
# 数据集建立
df, df_all, df_index = readData('收盘价', n=n, train_end=train_end)
df_all = np.array(df_all.tolist())
plt.plot(df_index, df_all, label='real-data')
df_numpy = np.array(df)
df_numpy_mean = np.mean(df_numpy)
df_numpy_std = np.std(df_numpy)
df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
df_tensor = torch.Tensor(df_numpy)
trainset = TrainSet(df_tensor)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
rnn = RNN(n)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)  # optimize all cnn parameters
loss_func = nn.MSELoss()
for step in range(EPOCH):
    for tx, ty in trainloader:
        output = rnn(torch.unsqueeze(tx, dim=0))
        loss = loss_func(torch.squeeze(output), ty)
        optimizer.zero_grad()  # clear gradients for this training step
        loss.backward()  # back propagation, compute gradients
        optimizer.step()
    print(step, loss)
    if step % 10:
        torch.save(rnn, 'rnn.pkl')
torch.save(rnn, 'rnn.pkl')
generate_data_train = []
generate_data_test = []
test_index = len(df_all) + train_end
df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
df_all_normal_tensor = torch.Tensor(df_all_normal)
for i in range(n, len(df_all)):
    x = df_all_normal_tensor[i - n:i]
    x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0)
    y = rnn(x)
    if i < test_index:
        generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
    else:
        generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
plt.plot(df_index[train_end:], generate_data_test, label='generate_test')
plt.legend()
plt.show()
plt.cla()
plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data')
plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test')
plt.legend()
plt.show()
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。