大模型基础--情感分析任务的演进(传统序列模型篇)

举报
剑指南天 发表于 2026/04/26 17:30:21 2026/04/26
【摘要】 基于传统序列模型(RNN,LSTM和GRU)构建文本情感分类模型

1.需求说明

本案例的目标是基于传统序列模型(RNN,LSTM和GRU)构建文本情感分类模型,对评论内容进行二分类判断(正面或负面)。

2.需求分析

数据来源:https://github.com/SophonPlus/ChineseNlpCorpus/blob/master/datasets/online_shopping_10_cats/online_shopping_10_cats.zip

模型结构设计:模型整体由Embedding层,序列模型和线性层构成。

训练方案:损失函数使用 BCEWithLogitsLoss,结合了sigmoid激活和二分类交叉熵计算,数值稳定且适合二分类任务。优化器使用Adam优化器进行参数更新,提升训练效率。

评估方案:模型训练完毕后,使用测试集统计正确率。

3.代码实现

3.1 构建词表和分词工具

import jieba

from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *


# Custom tokenizer built on top of jieba word segmentation.
class JiebaTokenizer:
    # Special tokens (values come from the project config module).
    unk_token = UNK_TOKEN
    pad_token = PAD_TOKEN

    def __init__(self, vocab_list):
        """Build word<->id lookup tables from a vocabulary list.

        The position of each word in ``vocab_list`` is used as its id.
        """
        self.vocab_list = vocab_list
        self.vocab_size = len(vocab_list)
        # Bidirectional word/id mappings.
        self.word2id = {word: idx for idx, word in enumerate(vocab_list)}
        self.id2word = {idx: word for idx, word in enumerate(vocab_list)}
        # Ids of the special tokens; raises KeyError if the vocab file is malformed.
        self.unk_id = self.word2id[UNK_TOKEN]
        self.pad_id = self.word2id[PAD_TOKEN]

    # Tokenization delegates to jieba's precise-mode segmentation.
    @staticmethod
    def tokenize(text):
        """Segment ``text`` into a list of words."""
        return jieba.lcut(text)

    def encode(self, text, seq_len):
        """Encode ``text`` into exactly ``seq_len`` token ids.

        Longer sequences are truncated; shorter ones are right-padded with
        the PAD token. Words missing from the vocabulary map to the UNK id.
        """
        tokens = self.tokenize(text)[:seq_len]
        tokens += [self.pad_token] * (seq_len - len(tokens))
        return [self.word2id.get(token, self.unk_id) for token in tokens]

    @classmethod
    def build_vocab(cls, sentences, vocab_file_path):
        """Build a vocabulary from ``sentences`` and persist it to disk.

        FIX: the vocabulary is sorted before writing so the word->id mapping
        is deterministic across runs. A raw ``set`` has no stable iteration
        order, so previously each rebuild produced a different id assignment,
        silently invalidating any model checkpoint saved with an older vocab.
        """
        vocab_set = set()
        for sentence in sentences:
            vocab_set.update(jieba.lcut(sentence))
        # PAD must stay at a fixed position (here index 0) ahead of real words.
        vocab_list = [cls.pad_token, cls.unk_token] + sorted(vocab_set)
        print(f"词表大小: {len(vocab_list)}")
        with open(vocab_file_path, mode='w', encoding='utf-8') as f:
            f.write('\n'.join(vocab_list))

    @classmethod
    def from_vocab(cls, vocab_file_path):
        """Load a vocabulary file (one token per line) and return a tokenizer."""
        with open(vocab_file_path, mode='r', encoding='utf-8') as f:
            vocab_list = [token.strip() for token in f.readlines()]
        return cls(vocab_list)

3.2 原始数据进行预处理(主要是构建词表,以及分词后按照词表映射为id)

import pandas as pd
from sklearn.model_selection import train_test_split

from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer


def preprocess():
    """Preprocess the raw review CSV into id-encoded train/test JSONL files."""
    print("开始数据预处理")
    df = pd.read_csv(RAW_DATA_DIR / RAW_DATA_FILE, usecols=['label', 'review'], encoding='utf-8',
                     keep_default_na=False)
    # Stratified split keeps the label ratio identical in both sets.
    # FIX: random_state makes the split reproducible across runs; without it
    # every execution trains/evaluates on a different partition.
    train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
    # FIX: materialize copies so the column assignments below do not trigger
    # pandas SettingWithCopyWarning on slices of the original frame.
    train_df, test_df = train_df.copy(), test_df.copy()
    # Build the vocabulary from the training split only (no test-set leakage).
    JiebaTokenizer.build_vocab(train_df['review'].to_list(), MODEL_DIR / VOCAB_FILE)
    # Reload the vocabulary from disk so training uses the exact persisted mapping.
    tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
    # Tokenize each review and map it to a fixed-length id sequence.
    train_df['review'] = train_df['review'].apply(lambda review: tokenizer.encode(review, SEQ_LEN))
    test_df['review'] = test_df['review'].apply(lambda review: tokenizer.encode(review, SEQ_LEN))
    # Persist both splits as line-delimited JSON for the Dataset to consume.
    train_df.to_json(PROCESSED_DATA_DIR / TRAIN_DATA_FILE, orient="records", lines=True)
    test_df.to_json(PROCESSED_DATA_DIR / TEST_DATA_FILE, orient="records", lines=True)
    print("数据处理结束")


if __name__ == '__main__':
    preprocess()

3.3 自定义DataLoader(DataLoader会自动分批,还可以在批次内处理数据)

import pandas as pd
import torch
from torch.utils.data import Dataset,DataLoader
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *

# 自定义Dataset(DataLoader依赖Dataset作为数据来源)
class ReviewAnalysisDataset(Dataset):
    """Dataset over a line-delimited JSON file of encoded reviews.

    Each record holds an id-encoded ``review`` sequence and a binary ``label``.
    """

    def __init__(self, path):
        # Load the whole jsonl file once into a list of record dicts.
        self.data = pd.read_json(path, lines=True, orient="records").to_dict(orient='records')

    def __len__(self):
        # Number of samples available to the DataLoader.
        return len(self.data)

    def __getitem__(self, index):
        record = self.data[index]
        # Token ids must be long for nn.Embedding; label is float for BCE loss.
        features = torch.tensor(record['review'], dtype=torch.long)
        label = torch.tensor(record['label'], dtype=torch.float)
        return features, label
# Factory for the train/test DataLoader.
def get_dataloader(train=True):
    """Return a shuffling DataLoader over the train or test split."""
    file_name = TRAIN_DATA_FILE if train else TEST_DATA_FILE
    dataset = ReviewAnalysisDataset(PROCESSED_DATA_DIR / file_name)
    # Shuffle so each epoch draws batches in a fresh random order.
    return DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

3.4 定义模型

import torch.nn as nn
import torch
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *


class RecurrentNeuralNetwork(nn.Module):
    """Holds three interchangeable recurrent backbones (RNN / LSTM / GRU)."""

    def __init__(self, input_size, hidden_size):
        super().__init__()
        # One single-layer, batch-first recurrent module per architecture name.
        backbones = {}
        for name, module_cls in (("RNN", nn.RNN), ("LSTM", nn.LSTM), ("GRU", nn.GRU)):
            backbones[name] = module_cls(input_size=input_size, hidden_size=hidden_size, batch_first=True)
        self.rnn = nn.ModuleDict(backbones)

    def forward(self, X, type="RNN"):
        # `type` selects which backbone processes this batch.
        assert type in self.rnn.keys(), "请检查序列模型的简写名称,注意必须大写"
        return self.rnn[type](X)


class ReviewAnalysisModel(nn.Module):
    """Sentiment classifier: Embedding -> recurrent backbone -> Linear head.

    Produces one raw logit per sequence (intended for BCEWithLogitsLoss).
    """

    def __init__(self, vocab_size, padding_idx):
        super().__init__()
        # padding_idx keeps the padding embedding fixed (no gradient updates).
        self.padding_idx = padding_idx
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=EMBEDDING_SIZE, padding_idx=padding_idx)
        self.rnn = RecurrentNeuralNetwork(input_size=EMBEDDING_SIZE, hidden_size=HIDDEN_SIZE)
        self.linear = nn.Linear(in_features=HIDDEN_SIZE, out_features=1)

    def forward(self, X, type="RNN"):
        embed = self.embedding(X)
        # The recurrent module returns (output, hidden_state); keep the outputs.
        output = self.rnn(embed, type)[0]
        # True (un-padded) length of each sequence in the batch.
        len_valid = (X != self.padding_idx).sum(dim=1)
        # FIX: an all-padding sequence (e.g. an empty review) has length 0,
        # so `len_valid - 1` would be -1 and silently select the LAST pad
        # timestep; clamp to at least 1 so the first timestep is used instead.
        len_valid = len_valid.clamp(min=1)
        batch_indices = torch.arange(output.shape[0])
        # Advanced indexing: pick the hidden state at each sequence's last
        # real timestep as the sequence representation.
        feature = output[batch_indices, len_valid - 1]
        return self.linear(feature).squeeze(-1)

3.5 配置信息(按照配置文件创建对应文件夹)

from pathlib import Path

# Project directory layout, resolved relative to this config file's parent.
ROOT_DIR = Path(__file__).parent.parent
RAW_DATA_DIR = ROOT_DIR / 'data' / 'raw'            # raw downloaded CSV
PROCESSED_DATA_DIR = ROOT_DIR / 'data' / 'processed'  # encoded train/test jsonl
MODEL_DIR = ROOT_DIR / 'models'                     # vocab + model checkpoints
LOG_DIR = ROOT_DIR / 'logs'                         # TensorBoard run logs

# File names used throughout the pipeline.
RAW_DATA_FILE = 'online_shopping_10_cats.csv'
TRAIN_DATA_FILE = 'train.jsonl'
TEST_DATA_FILE = 'test.jsonl'
VOCAB_FILE = 'vocab.txt'
BEST_MODEL = 'best_model.pt'

# Special vocabulary tokens: unknown word and sequence padding.
UNK_TOKEN = '<UNK>'
PAD_TOKEN = '<PAD>'

# Data / model hyperparameters.
SEQ_LEN = 128         # fixed encoded length of every review
BATCH_SIZE = 64       # samples per DataLoader batch
EMBEDDING_SIZE = 128  # word-embedding dimensionality
HIDDEN_SIZE = 256     # recurrent hidden-state dimensionality

# Training hyperparameters.
LEARN_RATE = 5e-4
EPOCHS = 20

# NOTE(review): not referenced anywhere in the visible code — confirm
# whether data subsampling was intended before removing.
SAMPLE_FRACTION = 1.0

3.6 定义训练过程

import time

import torch
from torch import nn, optim
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.b_dataset import get_dataloader
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.c_model import ReviewAnalysisModel
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *


def train_one_epoch(model, train_loader, loss, optimizer, device,model_base):
    """Run one training pass over ``train_loader``; return the mean batch loss."""
    model.train()
    total_loss = 0.0
    for batch_input, batch_target in tqdm(train_loader, desc='训练: '):
        # Move the batch onto the training device (CPU or GPU).
        batch_input = batch_input.to(device)
        batch_target = batch_target.to(device)
        # Forward pass through the selected recurrent backbone.
        logits = model(batch_input, type=model_base)
        batch_loss = loss(logits, batch_target)
        # Backward pass, parameter update, then clear gradients for next step.
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        total_loss += batch_loss.item()
    return total_loss / len(train_loader)


def train(model_base="RNN"):
    """Train the review classifier using the given recurrent backbone.

    Keeps the checkpoint with the lowest epoch loss under MODEL_DIR.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    train_loader = get_dataloader()
    # The persisted vocabulary fixes the embedding table size and pad id.
    tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
    model = ReviewAnalysisModel(tokenizer.vocab_size, tokenizer.pad_id).to(device)
    # BCEWithLogitsLoss fuses sigmoid + binary cross entropy (numerically stable).
    loss = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(lr=LEARN_RATE, params=model.parameters())
    min_loss = float('inf')
    # Timestamped TensorBoard run directory so separate runs never collide.
    with SummaryWriter(log_dir=LOG_DIR / time.strftime('%Y-%m-%d_%H-%M-%S')) as writer:
        for epoch in range(EPOCHS):
            print("=" * 10, f"EPOCH:{epoch + 1}", "=" * 10)
            # Brief pause so the progress bar renders cleanly after the print.
            time.sleep(0.1)
            this_loss = train_one_epoch(model=model, train_loader=train_loader,
                                        loss=loss, optimizer=optimizer,
                                        device=device, model_base=model_base)
            print("this loss: ", this_loss)
            writer.add_scalar('loss', this_loss, epoch + 1)
            # Save only when this epoch improved on the best loss so far.
            if this_loss < min_loss:
                min_loss = this_loss
                torch.save(model.state_dict(), f"{MODEL_DIR}/{model_base}_{BEST_MODEL}")
                print("模型保存成功!")
                print("模型保存成功!")


if __name__ == '__main__':
    # FIX: a list comprehension was used purely for its side effects; a plain
    # loop states the intent and avoids building a throwaway list of Nones.
    for model_base in ["RNN", "LSTM", "GRU"]:
        train(model_base=model_base)

3.7 模型评估

import torch
from tqdm import tqdm

from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.b_dataset import get_dataloader
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.c_model import ReviewAnalysisModel
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.d_config import *
from nlp_tutorial.ch03_tradtional_model.rnn_project_review_analysis.src.h_jieba_tokenizer import JiebaTokenizer


def predict_batch(model, input, model_base):
    """Return positive-class probabilities (a Python list) for one batch."""
    model.eval()
    # Inference only — no gradient bookkeeping needed.
    with torch.no_grad():
        logits = model(input, type=model_base)
        probabilities = torch.sigmoid(logits)
    return probabilities.tolist()


def evaluate(model, dataloader, device, model_base):
    """Compute accuracy of ``model`` over ``dataloader`` at a 0.5 threshold."""
    correct_count = 0.0
    total_count = 0.0
    for inputs, targets in tqdm(dataloader, "评估模型中: "):
        inputs, targets = inputs.to(device), targets.to(device)
        probabilities = predict_batch(model, inputs, model_base)
        for target, probability in zip(targets, probabilities):
            total_count += 1
            # Threshold the probability into a hard 0/1 prediction.
            prediction = 1 if probability > 0.5 else 0
            if prediction == target:
                correct_count += 1.0
    return correct_count / total_count


def run_evaluate(model_base="RNN"):
    """Load the best saved checkpoint for ``model_base`` and report test accuracy."""
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    tokenizer = JiebaTokenizer.from_vocab(MODEL_DIR / VOCAB_FILE)
    model = ReviewAnalysisModel(tokenizer.vocab_size, tokenizer.pad_id).to(device)
    # FIX: map_location remaps checkpoint tensors to the current device, so a
    # checkpoint trained on GPU still loads on a CPU-only machine (and vice
    # versa) instead of raising a device deserialization error.
    model.load_state_dict(torch.load(f"{MODEL_DIR}/{model_base}_{BEST_MODEL}", map_location=device))
    print("模型加载成功!")
    test_dataloader = get_dataloader(train=False)
    acc = evaluate(model, test_dataloader, device, model_base)
    print("评估结果:")
    print("Accuracy: ", acc)


if __name__ == '__main__':
    # FIX: a list comprehension was used purely for its side effects; a plain
    # loop states the intent and avoids building a throwaway list of Nones.
    for model_base in ["RNN", "LSTM", "GRU"]:
        run_evaluate(model_base=model_base)

4. 运行数据预处理代码

5. 运行训练代码

6. 运行评估代码


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。