大语言模型底层架构你了解多少?大语言模型底层架构之一Transfomer
语言模型目标是建模自然语言的概率分布,在自然语言处理研究中具有重要的作用,是自然语言处理基础任务之一。大量的研究从n 元语言模型(n-gram Language Models)、神经语言模型(Neural Language Models,NLM)以及预训练语言模型(Pre-trained Language Models,PLM)等不同角度开展了系列工作。这些研究在不同阶段都对自然语言处理任务有着重要作用。随着基于Transformer 各类语言模型的发展以及预训练微调范式在自然语言处理各类任务中取得突破性进展,从2020 年OpenAI 发布GPT-3 开始,大语言模型研究也逐渐深入。虽然大语言模型的参数量巨大,通过有监督微调和强化学习能够完成非常多的任务,但是其基础理论也仍然离不开对语言的建模。
上篇文章介绍了大语言模型的发展史,本篇文章将首先介绍Transformer 结构,并在此基础上后面会介绍生成式预训练语言模型GPT、大语言模型网络结构和注意力机制优化以及相关实践。
一、Transformer 模型
Transformer 模型是由谷歌在2017 年提出并首先应用于机器翻译的神经网络模型结构。机器翻译的目标是从源语言(Source Language)转换到目标语言(Target Language)。Transformer 结构完全通过注意力机制完成对源语言序列和目标语言序列全局依赖的建模。当前几乎全部大语言模型都是基于Transformer 结构,本节以应用于机器翻译的基于Transformer 的编码器和解码器介绍该模型。
基于Transformer 结构的编码器和解码器结构如图1.1所示,左侧和右侧分别对应着编码器(Encoder)和解码器(Decoder)结构。它们均由若干个基本的Transformer 块(Block)组成(对应着图中的灰色框)。这里N× 表示进行了N 次堆叠。每个Transformer 块都接收一个向量序列 作为输入,并输出一个等长的向量序列作为输出。这里的xi 和yi 分别对应着文本序列中的一个单词的表示。而yi 是当前Transformer 块对输入xi 进一步整合其上下文语义后对应的输出。在从输入 到输出 的语义抽象过程中,主要涉及到如下几个模块:
• 注意力层:使用多头注意力(Multi-Head Attention)机制整合上下文语义,它使得序列中任意两个单词之间的依赖关系可以直接被建模而不基于传统的循环结构,从而更好地解决文本的长程依赖。
• 位置感知前馈层(Position-wise FFN):通过全连接层对输入文本序列中的每个单词表示进行更复杂的变换。
• 残差连接:对应图中的Add 部分。它是一条分别作用在上述两个子层当中的直连通路,被用于连接它们的输入与输出。从而使得信息流动更加高效,有利于模型的优化。
• 层归一化:对应图中的Norm 部分。作用于上述两个子层的输出表示序列中,对表示序列进行层归一化操作,同样起到稳定优化的作用。
图1.1 基于Transformer 的编码器和解码器结构
接下来将依次介绍各个模块的具体功能和实现方法。
1.1 嵌入表示层
对于输入文本序列,首先通过输入嵌入层(Input Embedding)将每个单词转换为其相对应的向量表示。通常直接对每个单词创建一个向量表示。由于Transfomer 模型不再使用基于循环的方式建模文本输入,序列中不再有任何信息能够提示模型单词之间的相对位置关系。在送入编码器端建模其上下文语义之前,一个非常重要的操作是在词嵌入中加入位置编码(Positional Encoding)这一特征。具体来说,序列中每一个单词所在的位置都对应一个向量。这一向量会与单词表示对应相加并送入到后续模块中做进一步处理。在训练的过程当中,模型会自动地学习到如何利用这部分位置信息。为了得到不同位置对应的编码,Transformer 模型使用不同频率的正余弦函数如下所示:
其中,pos 表示单词所在的位置,2i 和2i+1 表示位置编码向量中的对应维度,d 则对应位置编码的总维度。通过上面这种方式计算位置编码有这样几个好处:首先,正余弦函数的范围是在[-1,+1],导出的位置编码与原词嵌入相加不会使得结果偏离过远而破坏原有单词的语义信息。其次,依据三角函数的基本性质,可以得知第pos+k 个位置的编码是第pos 个位置的编码的线性组合,这就意味着位置编码中蕴含着单词之间的距离信息。使用Pytorch 实现的位置编码参考代码如下:
class PositionalEncoder(nn.Module):
def __init__(self, d_model, max_seq_len = 80):
super().__init__()
self.d_model = d_model
# 根据pos 和i 创建一个常量PE 矩阵
pe = torch.zeros(max_seq_len, d_model)
for pos in range(max_seq_len):
for i in range(0, d_model, 2):
pe[pos, i] = math.sin(pos / (10000 ** ((2 * i)/d_model)))
pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1))/d_model)))
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
# 使得单词嵌入表示相对大一些
x = x * math.sqrt(self.d_model)
# 增加位置常量到单词嵌入表示中
seq_len = x.size(1)
x = x + Variable(self.pe[:,:seq_len], requires_grad=False).cuda()
1.2 注意力层
自注意力(Self-Attention)操作是基于Transformer 的机器翻译模型的基本操作,在源语言的编码和目标语言的生成中频繁地被使用以建模源语言、目标语言任意两个单词之间的依赖关系。给定由单词语义嵌入及其位置编码叠加得到的输入表示{xi ∈ Rd}ti=1,为了实现对上下文语义依赖的建模,进一步引入在自注意力机制中涉及到的三个元素:查询qi(Query),键ki(Key),值vi(Value)。在编码输入序列中每一个单词的表示的过程中,这三个元素用于计算上下文单词所对应的权重得分。直观地说,这些权重反映了在编码当前单词的表示时,对于上下文不同部分所需要的关注程度。具体来说,如图1.2所示,通过三个线性变换WQ ∈ Rd×dq,WK ∈ Rd×dk,WV ∈ Rd×dv将输入序列中的每一个单词表示xi 转换为其对应的qi ∈ Rdk,ki ∈ Rdk,vi ∈ Rdv 向量。
1.2 自注意力机制中的查询、键、值向量
为了得到编码单词xi 时所需要关注的上下文信息,通过位置i 查询向量与其他位置的键向量做点积得到匹配分数qi · k1, qi · k2, ..., qi · kt。为了防止过大的匹配分数在后续Softmax 计算过程中导致的梯度爆炸以及收敛效率差的问题,这些得分会除放缩因子√d 以稳定优化。放缩后的得分经过Softmax 归一化为概率之后,与其他位置的值向量相乘来聚合希望关注的上下文信息,并最小化不相关信息的干扰。上述计算过程可以被形式化地表述如下:
其中Q ∈ RL×dq ,K ∈ RL×dk ,V ∈ Rd×dv 分别表示输入序列中的不同单词的q, k, v 向量拼接组成的矩阵,L 表示序列长度,Z ∈ RL×dv 表示自注意力操作的输出。为了进一步增强自注意力机制聚合上下文信息的能力,提出了多头自注意力(Multi-head Attention)的机制,以关注上下文的不同侧面。具体来说,上下文中每一个单词的表示xi 经过多组线性{WQj WKj WVj}Nj=1 映射到不同的表示子空间中。公式会在不同的子空间中分别计算并得到不同的上下文相关的单词序列表示{Zj}Nj=1。最终,线性变换WO ∈ R(Ndv)×d 用于综合不同子空间中的上下文表示并形成自注意力层最终的输出{xi ∈ Rd}ti=1。
使用Pytorch 实现的自注意力层参考代码如下:
class MultiHeadAttention(nn.Module):
def __init__(self, heads, d_model, dropout = 0.1):
super().__init__()
self.d_model = d_model
self.d_k = d_model // heads
self.h = heads
self.q_linear = nn.Linear(d_model, d_model)
self.v_linear = nn.Linear(d_model, d_model)
self.k_linear = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
self.out = nn.Linear(d_model, d_model)
def attention(q, k, v, d_k, mask=None, dropout=None):
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
# 掩盖掉那些为了填补长度增加的单元,使其通过softmax 计算后为0
if mask is not None:
mask = mask.unsqueeze(1)
scores = scores.masked_fill(mask == 0, -1e9)
scores = F.softmax(scores, dim=-1)
if dropout is not None:
scores = dropout(scores)
output = torch.matmul(scores, v)
return output
def forward(self, q, k, v, mask=None):
bs = q.size(0)
# 进行线性操作划分为成h 个头
k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
# 矩阵转置
k = k.transpose(1,2)
q = q.transpose(1,2)
v = v.transpose(1,2)
# 计算attention
scores = attention(q, k, v, self.d_k, mask, self.dropout)
# 连接多个头并输入到最后的线性层
concat = scores.transpose(1,2).contiguous().view(bs, -1, self.d_model)
output = self.out(concat)
return output
1.3 前馈层
前馈层接受自注意力子层的输出作为输入,并通过一个带有Relu 激活函数的两层全连接网络对输入进行更加复杂的非线性变换。实验证明,这一非线性变换会对模型最终的性能产生十分重要的影响。
其中W1, b1,W2, b2 表示前馈子层的参数。实验结果表明,增大前馈子层隐状态的维度有利于提升最终翻译结果的质量,因此,前馈子层隐状态的维度一般比自注意力子层要大。
使用Pytorch 实现的前馈层参考代码如下:
class FeedForward(nn.Module):
def __init__(self, d_model, d_ff=2048, dropout = 0.1):
super().__init__()
# d_ff 默认设置为2048
self.linear_1 = nn.Linear(d_model, d_ff)
self.dropout = nn.Dropout(dropout)
self.linear_2 = nn.Linear(d_ff, d_model)
def forward(self, x):
x = self.dropout(F.relu(self.linear_1(x)))
x = self.linear_2(x)
1.4 残差连接与层归一化
由Transformer 结构组成的网络结构通常都是非常庞大。编码器和解码器均由很多层基本的Transformer 块组成,每一层当中都包含复杂的非线性映射,这就导致模型的训练比较困难。因此,研究者们在Transformer 块中进一步引入了残差连接与层归一化技术以进一步提升训练的稳定性。具体来说,残差连接主要是指使用一条直连通道直接将对应子层的输入连接到输出上去,从而避免由于网络过深在优化过程中潜在的梯度消失问题:
其中表示第l 层的输入,f(·) 表示一个映射函数。此外,为了进一步使得每一层的输入输出范围稳定在一个合理的范围内,层归一化技术被进一步引入每个Transformer 块的当中:
其中μ 和σ 分别表示均值和方差,用于将数据平移缩放到均值为0,方差为1 的标准分布,α 和b是可学习的参数。层归一化技术可以有效地缓解优化过程中潜在的不稳定、收敛速度慢等问题。
使用Pytorch 实现的层归一化参考代码如下:
class NormLayer(nn.Module):
def __init__(self, d_model, eps = 1e-6):
super().__init__()
self.size = d_model
# 层归一化包含两个可以学习的参数
self.alpha = nn.Parameter(torch.ones(self.size))
self.bias = nn.Parameter(torch.zeros(self.size))
self.eps = eps
def forward(self, x):
norm = self.alpha * (x - x.mean(dim=-1, keepdim=True)) \
/ (x.std(dim=-1, keepdim=True) + self.eps) + self.bias
return norm
1.5 编码器和解码器结构
基于上述模块,根据图1.1所给出的网络架构,编码器端可以较为容易实现。相比于编码器端,解码器端要更复杂一些。具体来说,解码器的每个Transformer 块的第一个自注意力子层额外增加了注意力掩码,对应图中的掩码多头注意力(Masked Multi-Head Attention)部分。这主要是因为在翻译的过程中,编码器端主要用于编码源语言序列的信息,而这个序列是完全已知的,因而编码器仅需要考虑如何融合上下文语义信息即可。
而解码端则负责生成目标语言序列,这一生成过程是自回归的,即对于每一个单词的生成过程,仅有当前单词之前的目标语言序列是可以被观测的,因此这一额外增加的掩码是用来掩盖后续的文本信息,以防模型在训练阶段直接看到后续的文本序列进而无法得到有效地训练。此外,解码器端还额外增加了一个多头注意力(Multi-Head Attention)模块,使用交叉注意力(Cross-attention)方法,同时接收来自编码器端的输出以及当前Transformer 块的前一个掩码注意力层的输出。查询是通过解码器前一层的输出进行投影的,而键和值是使用编码器的输出进行投影的。
它的作用是在翻译的过程当中,为了生成合理的目标语言序列需要观测待翻译的源语言序列是什么。
基于上述的编码器和解码器结构,待翻译的源语言文本,首先经过编码器端的每个Transformer 块对其上下文语义的层层抽象,最终输出每一个源语言单词上下文相关的表示。解码器端以自回归的方式生成目标语言文本,即在每个时间步t,根据编码器端输出的源语言文本表示,以及前t − 1 个时刻生成的目标语言文本,生成当前时刻的目标语言单词。使用Pytorch 实现的编码器参考代码如下:
class EncoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout=0.1):
super().__init__()
self.norm_1 = Norm(d_model)
self.norm_2 = Norm(d_model)
self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)
self.ff = FeedForward(d_model, dropout=dropout)
self.dropout_1 = nn.Dropout(dropout)
self.dropout_2 = nn.Dropout(dropout)
def forward(self, x, mask):
x2 = self.norm_1(x)
x = x + self.dropout_1(self.attn(x2,x2,x2,mask))
x2 = self.norm_2(x)
x = x + self.dropout_2(self.ff(x2))
return x
class Encoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, dropout):
super().__init__()
self.N = N
self.embed = Embedder(vocab_size, d_model)
self.pe = PositionalEncoder(d_model, dropout=dropout)
self.layers = get_clones(EncoderLayer(d_model, heads, dropout), N)
self.norm = Norm(d_model)
def forward(self, src, mask):
x = self.embed(src)
x = self.pe(x)
for i in range(self.N):
x = self.layers[i](x, mask)
return self.norm(x)
使用Pytorch 实现的解码器参考代码如下:
class DecoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout=0.1):
super().__init__()
self.norm_1 = Norm(d_model)
self.norm_2 = Norm(d_model)
self.norm_3 = Norm(d_model)
self.dropout_1 = nn.Dropout(dropout)
self.dropout_2 = nn.Dropout(dropout)
self.dropout_3 = nn.Dropout(dropout)
self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)
self.ff = FeedForward(d_model, dropout=dropout)
def forward(self, x, e_outputs, src_mask, trg_mask):
x2 = self.norm_1(x)
x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
x2 = self.norm_2(x)
x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs, \
src_mask))
x2 = self.norm_3(x)
x = x + self.dropout_3(self.ff(x2))
return x
class Decoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, dropout):
super().__init__()
self.N = N
self.embed = Embedder(vocab_size, d_model)
self.pe = PositionalEncoder(d_model, dropout=dropout)
self.layers = get_clones(DecoderLayer(d_model, heads, dropout), N)
self.norm = Norm(d_model)
def forward(self, trg, e_outputs, src_mask, trg_mask):
x = self.embed(trg)
x = self.pe(x)
for i in range(self.N):
x = self.layers[i](x, e_outputs, src_mask, trg_mask)
return self.norm(x)
最终基于Transformer 的编码器和解码器结构整体实现参考代码如下:
class Transformer(nn.Module):
def __init__(self, src_vocab, trg_vocab, d_model, N, heads, dropout):
super().__init__()
self.encoder = Encoder(src_vocab, d_model, N, heads, dropout)
self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
self.out = nn.Linear(d_model, trg_vocab)
def forward(self, src, trg, src_mask, trg_mask):
e_outputs = self.encoder(src, src_mask)
d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
output = self.out(d_output)
return output
基于上述模型结构,可以使用如下代码进行模型训练和测试:
# 模型参数定义
d_model = 512
heads = 8
N = 6
src_vocab = len(EN_TEXT.vocab)
trg_vocab = len(FR_TEXT.vocab)
model = Transformer(src_vocab, trg_vocab, d_model, N, heads)
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
optim = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
# 模型训练
def train_model(epochs, print_every=100):
model.train()
start = time.time()
temp = start
total_loss = 0
for epoch in range(epochs):
for i, batch in enumerate(train_iter):
src = batch.English.transpose(0,1)
# the French sentence we input has all words except
# the last, as it is using each word to predict the next
trg_input = trg[:, :-1]
# the words we are trying to predict
targets = trg[:, 1:].contiguous().view(-1)
# create function to make masks using mask code above
src_mask, trg_mask = create_masks(src, trg_input)
preds = model(src, trg_input, src_mask, trg_mask)
optim.zero_grad()
loss = F.cross_entropy(preds.view(-1, preds.size(-1)),
results, ignore_index=target_pad)
loss.backward()
optim.step()
total_loss += loss.data[0]
if (i + 1) % print_every == 0:
loss_avg = total_loss / print_every
print("time = %dm, epoch %d, iter = %d, loss = %.3f,
%ds per %d iters" % ((time.time() - start) // 60,
epoch + 1, i + 1, loss_avg, time.time() - temp,
print_every))
total_loss = 0
temp = time.time()
# 模型测试
def translate(model, src, max_len = 80, custom_string=False):
model.eval()
if custom_sentence == True:
src = tokenize_en(src)
sentence=Variable(torch.LongTensor([[EN_TEXT.vocab.stoi[tok] for tok in sentence]])).cuda()
src_mask = (src != input_pad).unsqueeze(-2)
e_outputs = model.encoder(src, src_mask)
outputs = torch.zeros(max_len).type_as(src.data)
outputs[0] = torch.LongTensor([FR_TEXT.vocab.stoi['<sos>']])
for i in range(1, max_len):
trg_mask = np.triu(np.ones((1, i, i),k=1).astype('uint8')
trg_mask= Variable(torch.from_numpy(trg_mask) == 0).cuda()
out = model.out(model.decoder(outputs[:i].unsqueeze(0),e_outputs, src_mask, trg_mask))
out = F.softmax(out, dim=-1)
val, ix = out[:, -1].data.topk(1)
outputs[i] = ix[0][0]
if ix[0][0] == FR_TEXT.vocab.stoi['<eos>']:
break
return ' '.join(
[FR_TEXT.vocab.itos[ix] for ix in outputs[:i]]
)
接下来将重点介绍GPT 无监督预训练、有监督下游任务微调以及基于HuggingFace 的预训练语言模型实践。请持续关注,不要忘记点赞支持一下!!
文章参考链接:
https://blog.51cto.com/u_14439393/5751732
- 点赞
- 收藏
- 关注作者
评论(0)