用 AI 方式来重读 Transformer 论文

举报
deli007 发表于 2026/01/07 10:26:08 2026/01/07
【摘要】   如果我要学习大模型技术,按照原来的学习方式,我需要先读谷歌的论文《Attention is all you need》,然后学习Transformer原理,然后上网搜demo,运行,到最后理解,整个过程大约需要2周,前提是还得懂得机器学习、深度学习的基本原理和实操代码。    我尝试用AI来学习大模型,大约前后用了两天时间。首先直接问通义千问,让其中英文对照并详细解释论文《Attenti...

  如果我要学习大模型技术,按照原来的学习方式,我需要先读谷歌的论文《Attention is all you need》,然后学习Transformer原理,然后上网搜demo,运行,到最后理解,整个过程大约需要2周,前提是还得懂得机器学习、深度学习的基本原理和实操代码。


    我尝试用AI来学习大模型,大约前后用了两天时间。首先直接问通义千问,让其中英文对照并详细解释论文《Attention is all you need》,这样对于英语不太好的我来说,既能看到英文原话,也能及时看到中文翻译和解释,保持了英文原汁原味,又能借助中文翻译和解释吸收其理论要点。这大概需要2~3个小时,读完论文对原理就有了一个大概的概念了。然后再来理解这张图,它浓缩了论文的所有关键步骤,再把这张图也丢给千问,让它详细解释每一步,借助千问对这张图的解读,对tansformer的关键步骤就有了理解了,它不仅告诉我们关键顺序,还把每个环节起什么作用告诉我们了。

    

tf.png

    至此,我们如果对里面具体某个概念不清楚,例如 no recurrence 为什么就能并行计算,训练更快?那再把这个问题丢给千问,就能进一步理解原来用RNN处理长序列的不足点在哪,Transformer的设计相比RNN高明之处在哪了。再例如,为什么decoder要把output右移一位?看看千问怎么说.

    掌握了理论后,接下来就是要动手实践了,传统学习动手时,得去网上搜示例代码,费时间筛选不说,复制下来还不一定能正常运行。现在直接用AI Coding工具可以生成一个大语言模型训练demo示例,它会直接生成一个用pytorch高阶api实现的transfomer模型结构,这个代码结合论文图来看,可以看到大概的过程,先是对token和pos进行编码,然后把他们相加,加完之后传给多头注意力层,再线性化,完成了Encoder过程。但这个过程,pytorch用了不少高阶api,屏蔽了论文里的一些模型结构细节,还是很难跟模型完全对应上。但能够快速理解整体流程。

class MiniGPT(nn.Module):
def __init__(self, vocab_size, d_model, nhead, num_layers, max_seq_len=512):
super().__init__()
self.token_emb = nn.Embedding(vocab_size, d_model)
self.pos_emb = nn.Embedding(max_seq_len, d_model)
self.dropout = nn.Dropout(0.1)

decoder_layer = nn.TransformerDecoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=d_model * 4,
batch_first=True,
dropout=0.1,
activation="gelu"
)

self.transformer = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
self.lm_head.weight = self.token_emb.weight # 权重绑定

self.apply(self._init_weights)


def _init_weights(self, module):
if isinstance(module, nn.Linear):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

def forward(self, x):
B, L = x.shape
tok_emb = self.token_emb(x)
pos_ids = torch.arange(L, device=x.device).unsqueeze(0)
x = self.dropout(tok_emb + self.pos_emb(pos_ids))

# Causal mask
mask = torch.triu(torch.ones(L, L), diagonal=1).bool().to(x.device)

output = self.transformer(x, x, tgt_mask=mask)
return self.lm_head(output)


    接下来,我们可以让AI Coding工具生成一个完全展开实现的tansformer模型结构,不用高阶api,这样它可以所有关键环节全部展开,让我们能够完全与论文细节对应上,阅读完论文,再理解和运行完这个代码,实际训练一个大模型demo(虽然是完全虚拟的数据集),但对transformer模型结构和训练过程会有了清晰的认知。

import torch
import torch.nn as nn
import math
import numpy as np
from torch.utils.data import Dataset, DataLoader
import copy

# 设置随机种子
torch.manual_seed(42)

np.random.seed(42)

# 定义Transformer模型的组件
class PositionalEncoding(nn.Module):

"""位置编码层"""
def __init__(self, d_model, max_len=5000):

super(PositionalEncoding, self).__init__()

# 创建位置编码矩阵
pe = torch.zeros(max_len, d_model)

position = torch.arange(0, max_len).unsqueeze(1).float()

# 使用sincos函数计算位置编码
div_term = torch.exp(torch.arange(0, d_model, 2).float() *

-(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term) # 偶数位置
pe[:, 1::2] = torch.cos(position * div_term) # 奇数位置

# 注册为buffer,这样它不会被优化器更新
self.register_buffer('pe', pe.unsqueeze(0))


def forward(self, x):
# 将位置编码添加到输入中
return x + self.pe[:, :x.size(1)]


class MultiHeadAttention(nn.Module):
"""多头注意力机制"""
def __init__(self, d_model, num_heads):

super(MultiHeadAttention, self).__init__()
assert d_model % num_heads == 0

self.d_model = d_model

self.num_heads = num_heads
self.d_k = d_model // num_heads

# 定义线性变换层
self.W_q = nn.Linear(d_model, d_model)

self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)

def forward(self, query, key, value, mask=None):
batch_size = query.size(0)

# 线性变换
Q = self.W_q(query)

K = self.W_k(key)
V = self.W_v(value)

# Q, K, V分割成多头
Q = Q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

K = K.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
V = V.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

# 计算注意力分数
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)


# 应用掩码(如果提供)
if mask is not None:

scores = scores.masked_fill(mask == 0, -1e9)

# 应用softmax
attention_weights = torch.softmax(scores, dim=-1)


# 计算输出
output = torch.matmul(attention_weights, V)


# 重新组合多头
output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)


# 最后的线性变换
return self.W_o(output)


class FeedForward(nn.Module):
"""前馈网络"""
def __init__(self, d_model, d_ff):

super(FeedForward, self).__init__()
self.linear1 = nn.Linear(d_model, d_ff)
self.linear2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()

def forward(self, x):
return self.linear2(self.relu(self.linear1(x)))

class EncoderLayer(nn.Module):
"""编码器层"""
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):

super(EncoderLayer, self).__init__()

self.self_attention = MultiHeadAttention(d_model, num_heads)
self.feed_forward = FeedForward(d_model, d_ff)

self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)

self.dropout = nn.Dropout(dropout)

def forward(self, x, mask=None):
# 自注意力机制
attended = self.self_attention(x, x, x, mask)

x = self.norm1(x + self.dropout(attended))

# 前馈网络
ff_out = self.feed_forward(x)

x = self.norm2(x + self.dropout(ff_out))

return x

class DecoderLayer(nn.Module):
"""解码器层"""
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):

super(DecoderLayer, self).__init__()

self.self_attention = MultiHeadAttention(d_model, num_heads)
self.cross_attention = MultiHeadAttention(d_model, num_heads)
self.feed_forward = FeedForward(d_model, d_ff)

self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)

self.dropout = nn.Dropout(dropout)

def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
# 自注意力机制
attended = self.self_attention(x, x, x, tgt_mask)

x = self.norm1(x + self.dropout(attended))

# 交叉注意力机制
attended = self.cross_attention(x, encoder_output, encoder_output, src_mask)

x = self.norm2(x + self.dropout(attended))

# 前馈网络
ff_out = self.feed_forward(x)

x = self.norm3(x + self.dropout(ff_out))

return x

class Transformer(nn.Module):
"""完整的Transformer模型"""
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout=0.1):

super(Transformer, self).__init__()

self.d_model = d_model

# 词嵌入层
self.src_embedding = nn.Embedding(src_vocab_size, d_model)

self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)

# 位置编码
self.positional_encoding = PositionalEncoding(d_model, max_seq_length)


# 编码器和解码器层
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

# 输出线性层
self.fc_out = nn.Linear(d_model, tgt_vocab_size)

self.dropout = nn.Dropout(dropout)

def make_src_mask(self, src):
# 为源序列创建掩码(通常在机器翻译中用于忽略填充标记)
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)

return src_mask

def make_tgt_mask(self, tgt):
# 为目标序列创建掩码(用于防止模型看到未来的信息)
tgt_len = tgt.size(1)

tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
subsequent_mask = torch.tril(torch.ones((tgt_len, tgt_len), device=tgt.device)).unsqueeze(0).unsqueeze(1)
tgt_mask = tgt_mask & subsequent_mask.bool()
return tgt_mask

def forward(self, src, tgt):
# 获取掩码
src_mask = self.make_src_mask(src)

tgt_mask = self.make_tgt_mask(tgt)

# 词嵌入
src_embedded = self.dropout(self.positional_encoding(self.src_embedding(src) * math.sqrt(self.d_model)))

tgt_embedded = self.dropout(self.positional_encoding(self.tgt_embedding(tgt) * math.sqrt(self.d_model)))

# 编码器处理
enc_output = src_embedded

for layer in self.encoder_layers:
enc_output = layer(enc_output, src_mask)

# 解码器处理
dec_output = tgt_embedded

for layer in self.decoder_layers:
dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)

# 输出层
output = self.fc_out(dec_output)


return output

# 创建一个简单的数据集
class SimpleDataset(Dataset):

def __init__(self, data_size=1000, seq_length=10):
# 创建简单的数字序列数据
self.data = []

for i in range(data_size):
# 创建一个简单的序列: 输入是[1,2,3,4,5],输出是[6,7,8,9,10]
src_seq = [j for j in range(1, seq_length//2 + 1)]

tgt_seq = [j for j in range(seq_length//2 + 1, seq_length + 1)]
self.data.append((src_seq, tgt_seq))

def __len__(self):
return len(self.data)

def __getitem__(self, idx):
src, tgt = self.data[idx]
return torch.tensor(src), torch.tensor(tgt)

def train_transformer():
"""训练Transformer模型的函数"""
print("开始训练Transformer模型...")


# 模型参数
src_vocab_size = 100 # 源词汇表大小
tgt_vocab_size = 100 # 目标词汇表大小
d_model = 512 # 模型维度
num_heads = 8 # 注意力头数
num_layers = 6 # 编码器和解码器层数
d_ff = 2048 # 前馈网络维度
max_seq_length = 20 # 最大序列长度
dropout = 0.1 # Dropout

# 创建模型
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)


# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss(ignore_index=0) # 忽略填充标记
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)


# 创建数据集和数据加载器
dataset = SimpleDataset(data_size=1000, seq_length=max_seq_length//2)

dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# 训练循环
model.train()

for epoch in range(10): # 训练10epoch
total_loss = 0
for batch_idx, (src, tgt) in enumerate(dataloader):

# 准备目标序列(输入是tgt[:-1],标签是tgt[1:]
tgt_input = tgt[:, :-1] # 不包括最后一个元素
tgt_output = tgt[:, 1:] # 不包括第一个元素

# 前向传播
output = model(src, tgt_input)


# 计算损失
loss = criterion(output.reshape(-1, tgt_vocab_size), tgt_output.reshape(-1))


# 反向传播
optimizer.zero_grad()

loss.backward()
optimizer.step()

total_loss += loss.item()

# 10个批次打印一次损失
if batch_idx % 10 == 0:

print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')

avg_loss = total_loss / len(dataloader)
print(f'Epoch {epoch} completed, Average Loss: {avg_loss:.4f}')

print("模型训练完成!")

# 保存模型
torch.save(model.state_dict(), 'transformer_model.pth')

print("模型已保存为 'transformer_model.pth'")

return model

def inference_example(model):
"""推理示例"""
print("\n开始推理示例...")


# 使用训练好的模型进行推理
model.


# 创建一个简单的测试输入
with torch.no_grad():

test_src = torch.tensor([[1, 2, 3, 4, 5]]) # 简单的输入序列
test_tgt = torch.tensor([[1]]) # 初始目标序列(通常是起始标记)

# 自回归生成
for i in range(5): # 生成5个词
output = model(test_src, test_tgt)

# 获取最后一个位置的预测结果
next_word_logits = output[0, -1, :]

next_word = torch.argmax(next_word_logits, dim=-1).unsqueeze(0).unsqueeze(0)
test_tgt = torch.cat([test_tgt, next_word], dim=1)

print(f"输入序列: {test_src.tolist()}")
print(f"生成序列: {test_tgt.tolist()}")

def main():
"""主函数"""
print("=== Transformer论文模型实现演示 ===")

print("此代码实现了论文《Attention Is All You Need》中的完整Transformer架构")
print("包含以下关键组件:")
print("1. 位置编码 (Positional Encoding)")
print("2. 多头自注意力机制 (Multi-Head Self-Attention)")
print("3. 编码器和解码器层 (Encoder and Decoder Layers)")
print("4. 残差连接和层归一化 (Residual Connections & Layer Normalization)")
print("5. 前馈网络 (Feed-Forward Networks)")
print("6. 掩码机制 (Masking for Causal Decoding)")
print()

# 训练模型
model = train_transformer()


# 运行推理示例
inference_example(model)


print("\n=== 论文要点总结 ===")
print("1. 使用自注意力机制替代RNN/CNN")
print("2. 并行化处理提高训练效率")
print("3. 残差连接和层归一化稳定训练")
print("4. 位置编码保留序列顺序信息")
print("5. 多头注意力捕获不同子空间的依赖关系")

if __name__ == "__main__":
main()

  

     跑完这个demo后,也可以借助AI Coding去跑一些中译英真实数据集(前提是有GPU/NPU卡),会更有成就感。当然,基于AI再去学习大模型微调、蒸馏、推理,也就很容易了。


     学习完这一套组合拳后,会发现在AI时代,很多技术一旦公布,就没有秘密可言了。站在现在再去看2017年那几位作者,为何能写出这篇论文,我认为不是他们编程能力有多强,而是他们高等数学很厉害,且有各种实验调参的尝试,最后能够设计和验证出transformer这种新的模型架构。站在今天看未来,有了AI加持后,个人编程能力高低不重要了,但重要的是基础科学(数学、物理等知识)是否扎实,是否有创新思维,是否敢于不断尝试和验证,这些能力和品质更为重要了。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。