Foundations of Large Models -- The Evolution of the Sentiment Analysis Task (Traditional Sequence Models)
[Abstract] Building text sentiment classification models with traditional sequence models (RNN, LSTM, and GRU)
1. Requirements
The goal of this case study is to build text sentiment classification models on top of traditional sequence models (RNN, LSTM, and GRU) and classify review text as positive or negative.
2. Analysis
Data source: https://github.com/SophonPlus/ChineseNlpCorpus/blob/master/datasets/online_shopping_10_cats/online_shopping_10_cats.zip
Model architecture: the model consists of an Embedding layer, a sequence model (RNN, LSTM, or GRU), and a linear layer.
Training scheme: the loss function is BCEWithLogitsLoss, which fuses the sigmoid activation with binary cross-entropy; it is numerically stable and well suited to binary classification (see the sketch after this list). The optimizer is Adam, for efficient parameter updates.
Evaluation scheme: after training, compute accuracy on the test set.
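As a quick illustration of why BCEWithLogitsLoss is applied to raw logits (a minimal, self-contained sketch, not part of the project code), the fused op matches sigmoid followed by BCELoss while avoiding the numerical issues of computing the two steps separately:

import torch
from torch import nn

logits = torch.tensor([2.0, -1.5, 0.3])   # raw model outputs, no sigmoid applied
targets = torch.tensor([1.0, 0.0, 1.0])   # binary labels as floats

# Fused, numerically stable: sigmoid + binary cross-entropy in one op
fused = nn.BCEWithLogitsLoss()(logits, targets)
# The two-step version it replaces
separate = nn.BCELoss()(torch.sigmoid(logits), targets)
print(fused.item(), separate.item())  # identical up to floating-point error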
3. Implementation
3.1 Building the vocabulary and the tokenizer
import jieba
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *


# Custom tokenizer built on jieba
class JiebaTokenizer:
    # Special tokens
    unk_token = UNK_TOKEN
    pad_token = PAD_TOKEN

    def __init__(self, vocab_list):
        # Vocabulary list; each word's index in the list serves as its id
        self.vocab_list = vocab_list
        # Vocabulary size
        self.vocab_size = len(vocab_list)
        self.word2id = {}
        self.id2word = {}
        # Mappings between ids and words
        for id, word in enumerate(vocab_list):
            self.word2id[word] = id
            self.id2word[id] = word
        # Ids of the special tokens
        self.unk_id = self.word2id[UNK_TOKEN]
        self.pad_id = self.word2id[PAD_TOKEN]

    # Tokenization: delegate directly to jieba's precise-mode segmentation
    @staticmethod
    def tokenize(text):
        return jieba.lcut(text)

    # Encode text against the vocabulary: keep the first seq_len tokens if the
    # text is longer, otherwise pad with PAD_TOKEN up to seq_len
    def encode(self, text, seq_len):
        tokens = self.tokenize(text)
        original_len_tokens = len(tokens)
        if original_len_tokens > seq_len:
            tokens = tokens[:seq_len]
        elif original_len_tokens < seq_len:
            tokens = tokens + [self.pad_token] * (seq_len - original_len_tokens)
        ids = [self.word2id.get(token, self.unk_id) for token in tokens]
        return ids

    # Build the vocabulary and write it to a local file
    @classmethod
    def build_vocab(cls, sentences, vocab_file_path):
        vocab_set = set()
        for sentence in sentences:
            vocab_set.update(jieba.lcut(sentence))
        vocab_list = [cls.pad_token, cls.unk_token] + list(vocab_set)
        print(f"Vocabulary size: {len(vocab_list)}")
        with open(vocab_file_path, mode='w', encoding='utf-8') as f:
            f.writelines('\n'.join(vocab_list))

    # Load the vocabulary file back into a tokenizer instance
    @classmethod
    def from_vocab(cls, vocab_file_path):
        with open(vocab_file_path, mode='r', encoding='utf-8') as f:
            vocab_list = [token.strip() for token in f.readlines()]
        tokenizer = cls(vocab_list)
        return tokenizer
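A quick usage sketch (assuming the vocabulary file was already written by build_vocab; the sample sentence is illustrative):

from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer

tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
ids = tokenizer.encode('这个手机质量很好', seq_len=10)
print(len(ids))  # always exactly 10: longer texts are truncated, shorter ones padded with pad_id
print(ids)       # out-of-vocabulary tokens map to unk_id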
3.2 Preprocessing the raw data (building the vocabulary, then tokenizing and mapping tokens to ids)
import pandas as pd
from sklearn.model_selection import train_test_split
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer


def preprocess():
    print("Starting data preprocessing")
    df = pd.read_csv(RAW_DATA_DIR / RAW_DATA_FILE, usecols=['label', 'review'], encoding='utf-8',
                     keep_default_na=False)
    # Stratified split on the label column to keep class ratios consistent
    train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['label'])
    # Build the vocabulary from the training set only
    JiebaTokenizer.build_vocab(train_df['review'].to_list(), MODEL_DIR / VOCAB_FILE)
    # Load the vocabulary back from file
    tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
    # Tokenize each review and map the tokens to ids
    train_df['review'] = train_df['review'].apply(lambda review: tokenizer.encode(review, SEQ_LEN))
    test_df['review'] = test_df['review'].apply(lambda review: tokenizer.encode(review, SEQ_LEN))
    # Save the train and test splits to local files
    train_df.to_json(PROCESSED_DATA_DIR / TRAIN_DATA_FILE, orient="records", lines=True)
    test_df.to_json(PROCESSED_DATA_DIR / TEST_DATA_FILE, orient="records", lines=True)
    print("Data preprocessing finished")


if __name__ == '__main__':
    preprocess()
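After preprocessing, a quick sanity check on the written files confirms both the split and the encoding (a sketch reusing the project's own config):

import pandas as pd
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *

train_df = pd.read_json(PROCESSED_DATA_DIR / TRAIN_DATA_FILE, orient="records", lines=True)
print(train_df['label'].value_counts(normalize=True))  # class ratios preserved by the stratified split
print(len(train_df['review'].iloc[0]))                 # every encoded review has length SEQ_LEN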
3.3 Custom DataLoader (the DataLoader handles batching automatically and can also transform data within each batch)
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *


# Custom Dataset; the DataLoader uses a Dataset as its data source
class ReviewAnalysisDataset(Dataset):
    def __init__(self, path):
        # Load the preprocessed data
        self.data = pd.read_json(path, lines=True, orient="records").to_dict(orient='records')

    # A custom Dataset must implement these two methods
    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        input = torch.tensor(self.data[index]['review'], dtype=torch.long)
        target = torch.tensor(self.data[index]['label'], dtype=torch.float)
        return input, target


# Build a DataLoader
def get_dataloader(train=True):
    path = PROCESSED_DATA_DIR / (TRAIN_DATA_FILE if train else TEST_DATA_FILE)
    dataset = ReviewAnalysisDataset(path)
    # Batch the dataset with random shuffling
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
    return dataloader
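To inspect what one batch looks like (a sketch; with the default config the shapes are [64, 128] and [64]):

from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.b_dataset import get_dataloader

loader = get_dataloader(train=True)
inputs, targets = next(iter(loader))
print(inputs.shape, inputs.dtype)    # torch.Size([BATCH_SIZE, SEQ_LEN]) torch.int64
print(targets.shape, targets.dtype) # torch.Size([BATCH_SIZE]) torch.float32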
3.4 Defining the model
import torch.nn as nn
import torch
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *


class RecurrentNeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.ModuleDict({
            "RNN": nn.RNN(input_size=input_size, hidden_size=hidden_size, batch_first=True),
            "LSTM": nn.LSTM(input_size=input_size, hidden_size=hidden_size, batch_first=True),
            "GRU": nn.GRU(input_size=input_size, hidden_size=hidden_size, batch_first=True),
        })

    def forward(self, X, type="RNN"):
        assert type in self.rnn.keys(), "Check the sequence-model name; it must be uppercase (RNN, LSTM, or GRU)"
        return self.rnn[type](X)


class ReviewAnalysisModel(nn.Module):
    def __init__(self, vocab_size, padding_idx):
        super().__init__()
        # padding_idx keeps the padding token's embedding out of gradient updates
        self.padding_idx = padding_idx
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=EMBEDDING_SIZE, padding_idx=padding_idx)
        self.rnn = RecurrentNeuralNetwork(input_size=EMBEDDING_SIZE, hidden_size=HIDDEN_SIZE)
        self.linear = nn.Linear(in_features=HIDDEN_SIZE, out_features=1)

    def forward(self, X, type="RNN"):
        embed = self.embedding(X)
        rnn = self.rnn(embed, type)
        output = rnn[0]
        # Compute each sequence's true length (excluding padding)
        len_valid = (X != self.padding_idx).sum(dim=1)
        indices = torch.arange(output.shape[0])
        # Advanced indexing: pick the hidden state at the last valid time step of each sequence
        feature = output[indices, len_valid - 1]
        return self.linear(feature).squeeze(-1)
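The advanced-indexing trick at the end deserves a closer look: pairing a batch index with each sequence's last valid position selects exactly one time step per sample. A self-contained toy example (independent of the model code):

import torch

output = torch.arange(24, dtype=torch.float).reshape(2, 4, 3)  # (batch=2, seq_len=4, hidden=3)
len_valid = torch.tensor([2, 4])          # sample 0 has 2 real tokens, sample 1 has 4
indices = torch.arange(output.shape[0])   # tensor([0, 1])
feature = output[indices, len_valid - 1]  # picks output[0, 1] and output[1, 3]
print(feature)  # tensor([[ 3.,  4.,  5.], [21., 22., 23.]])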
3.5 Configuration (the corresponding directories must be created to match this config)
from pathlib import Path

# Project directory layout
ROOT_DIR = Path(__file__).parent.parent
RAW_DATA_DIR = ROOT_DIR / 'data' / 'raw'
PROCESSED_DATA_DIR = ROOT_DIR / 'data' / 'processed'
MODEL_DIR = ROOT_DIR / 'models'
LOG_DIR = ROOT_DIR / 'logs'

# File names
RAW_DATA_FILE = 'online_shopping_10_cats.csv'
TRAIN_DATA_FILE = 'train.jsonl'
TEST_DATA_FILE = 'test.jsonl'
VOCAB_FILE = 'vocab.txt'
BEST_MODEL = 'best_model.pt'

# Special tokens
UNK_TOKEN = '<UNK>'
PAD_TOKEN = '<PAD>'

# Hyperparameters
SEQ_LEN = 128
BATCH_SIZE = 64
EMBEDDING_SIZE = 128
HIDDEN_SIZE = 256
LEARN_RATE = 5e-4
EPOCHS = 20
SAMPLE_FRACTION = 1.0
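As the section heading notes, the directories must exist before running; the config itself does not create them. If you prefer automatic creation, a small addition (not in the original project) could sit at the bottom of the config module:

# Ensure all data/model/log directories exist (added for convenience; the original assumes manual creation)
for _dir in (RAW_DATA_DIR, PROCESSED_DATA_DIR, MODEL_DIR, LOG_DIR):
    _dir.mkdir(parents=True, exist_ok=True)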
3.6 Defining the training loop
import time
import torch
from torch import nn, optim
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.b_dataset import get_dataloader
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.c_model import ReviewAnalysisModel
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *


def train_one_epoch(model, train_loader, loss, optimizer, device, model_base):
    model.train()
    total_loss = 0.0
    for input, target in tqdm(train_loader, desc='Training: '):
        # Move the batch to the target device
        input, target = input.to(device), target.to(device)
        # Forward pass
        output = model(input, type=model_base)
        # Compute the loss, backpropagate, and update the parameters
        loss_value = loss(output, target)
        loss_value.backward()
        optimizer.step()
        optimizer.zero_grad()
        total_loss += loss_value.item()
    return total_loss / len(train_loader)


def train(model_base="RNN"):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Training data
    train_loader = get_dataloader()
    # Load the vocabulary from file
    tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
    # Initialize the model and move it to the target device
    model = ReviewAnalysisModel(tokenizer.vocab_size, tokenizer.pad_id).to(device)
    # Loss function
    loss = nn.BCEWithLogitsLoss()
    # Optimizer
    optimizer = optim.Adam(lr=LEARN_RATE, params=model.parameters())
    # Track the lowest epoch loss seen so far; save the model whenever it improves
    min_loss = float('inf')
    # TensorBoard writer for visualizing the training loss
    with SummaryWriter(log_dir=LOG_DIR / time.strftime('%Y-%m-%d_%H-%M-%S')) as writer:
        for epoch in range(EPOCHS):
            print("=" * 10, f"EPOCH:{epoch + 1}", "=" * 10)
            time.sleep(0.1)  # let the print flush before tqdm starts drawing
            this_loss = train_one_epoch(model=model, train_loader=train_loader, loss=loss, optimizer=optimizer,
                                        device=device, model_base=model_base)
            print("this loss: ", this_loss)
            writer.add_scalar('loss', this_loss, epoch + 1)
            if this_loss < min_loss:
                min_loss = this_loss
                torch.save(model.state_dict(), f"{MODEL_DIR}/{model_base}_{BEST_MODEL}")
                print("Model saved!")


if __name__ == '__main__':
    [train(model_base=model_base) for model_base in ["RNN", "LSTM", "GRU"]]
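Each run writes its loss curve under logs/<timestamp>, so the three model variants can be compared side by side by pointing the standard TensorBoard CLI at that directory (tensorboard --logdir logs).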
3.7 Model evaluation
import torch
from tqdm import tqdm
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.b_dataset import get_dataloader
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.c_model import ReviewAnalysisModel
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer


def predict_batch(model, input, model_base):
    model.eval()
    with torch.no_grad():
        output = model(input, type=model_base)
        output = torch.sigmoid(output)
        return output.tolist()


def evaluate(model, dataloader, device, model_base):
    correct_count = 0.0
    total_count = 0.0
    for inputs, targets in tqdm(dataloader, "Evaluating: "):
        inputs, targets = inputs.to(device), targets.to(device)
        batch_result = predict_batch(model, inputs, model_base)
        for target, result in zip(targets, batch_result):
            total_count += 1
            # Threshold the sigmoid probability at 0.5
            result = 1 if result > 0.5 else 0
            if result == target:
                correct_count += 1.0
    return correct_count / total_count


def run_evaluate(model_base="RNN"):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
    model = ReviewAnalysisModel(tokenizer.vocab_size, tokenizer.pad_id).to(device)
    # map_location allows loading a GPU-trained checkpoint on a CPU-only machine
    model.load_state_dict(torch.load(f"{MODEL_DIR}/{model_base}_{BEST_MODEL}", map_location=device))
    print("Model loaded!")
    test_dataloader = get_dataloader(train=False)
    acc = evaluate(model, test_dataloader, device, model_base)
    print("Evaluation result:")
    print("Accuracy: ", acc)


if __name__ == '__main__':
    [run_evaluate(model_base=model_base) for model_base in ["RNN", "LSTM", "GRU"]]
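Once a checkpoint exists, single-review inference follows the same path as evaluation. A hedged sketch reusing the project's own modules (predict_review is a name introduced here, and the sample sentence is illustrative):

import torch
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.c_model import ReviewAnalysisModel
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer

def predict_review(text, model_base="LSTM"):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
    model = ReviewAnalysisModel(tokenizer.vocab_size, tokenizer.pad_id).to(device)
    model.load_state_dict(torch.load(f"{MODEL_DIR}/{model_base}_{BEST_MODEL}", map_location=device))
    model.eval()
    # Encode the text and add a batch dimension
    ids = torch.tensor([tokenizer.encode(text, SEQ_LEN)], dtype=torch.long).to(device)
    with torch.no_grad():
        prob = torch.sigmoid(model(ids, type=model_base)).item()
    return ("positive" if prob > 0.5 else "negative"), prob

print(predict_review('质量很好,物流也快'))  # label and probability; actual values depend on the trained checkpoint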
4. Running the data preprocessing code

5. Running the training code

6. Running the evaluation code