大模型LLM之分布式训练

举报
码上开花_Lancer 发表于 2024/05/06 15:02:03 2024/05/06
【摘要】     随着语言模型参数量和所需训练数据量的急速增长,单个机器上有限的资源已无法满足大语言模型训练的要求。需要设计分布式训练(Distributed Training)系统来解决海量的计算和内存资源要求问题。在分布式训练系统环境下需要将一个模型训练任务拆分成多个子任务,并将子任务分发给多个计算设备,从而解决资源瓶颈。但是如何才能利用包括数万计算加速芯片的集群,训练模型参数量千亿甚至是万亿的大...

    随着语言模型参数量和所需训练数据量的急速增长,单个机器上有限的资源已无法满足大语言模型训练的要求。需要设计分布式训练(Distributed Training)系统来解决海量的计算和内存资源要求问题。在分布式训练系统环境下需要将一个模型训练任务拆分成多个子任务,并将子任务分发给多个计算设备,从而解决资源瓶颈。但是如何才能利用包括数万计算加速芯片的集群,训练模型参数量千亿甚至是万亿的大规模语言模型?这其中涉及到集群架构、并行策略、模型架构、内存优化、计算优化等一系列的技术。

    我将详细介绍分布式机器学习系统的基础概念、分布式训练集群架构、分布式训练并行策略,并以DeepSpeed 为例介绍如何在集群上训练大语言模型。

一、分布式训练概述

    分布式训练(Distributed Training)是指将机器学习或深度学习模型训练任务分解成多个子任务,并在多个计算设备上并行地进行训练。图1给出了单个计算设备和多个计算设备的示例,这里计算设备可以是中央处理器(Central Processing Unit,CPU)图形处理器(Graphics Processing Unit,GPU)张量处理器(Tensor Processing Unit,TPU)也可以是神经网络处理器(Neural network Processing Unit,NPU)。由于同一个服务器内部的多个计算设备之间内存也可能并不共享,因此无论这些计算设备是否处于一个服务器还是多个服务器中,其系统架构都属于分布式系统范畴。一个模型训练任务往往会有大量的训练样本作为输入,可以利用一个计算设备完成,也可以将整个模型的训练任务拆分成子任务,分发给不同的计算设备,实现并行计算。此后,还需要对每个计算设备的输出进行合并,最终得到与单个计算设备等价的计算结果。由于每个计算设备只需要负责子任务,并且多个计算设备可以并行执行,因此其可以更快速地完成整体计算,并最终实现对整个计算过程的加速。

图1 单计算设备计算和多计算设备示例

    促使人们设计分布式训练系统的一个最重要的原因就是单个计算设备的算力已经不足以支撑模型训练。图2给出了机器学习模型对于算力的需求以及同期单个计算设备能够提供的算力。如图所示,机器学习模型快速发展,从2013 年AlexNet 开始,到2022 年拥有5400 亿参数的PalM 模型,机器学习模型以每18 个月增长56 倍的速度发展。模型参数规模增大的同时,对训练数据量的要求也指数级增长,这更加剧了对算力的需求。然而,近几年CPU 的算力增加已经远低于摩尔定律(Moore’s Law),虽然计算加速设备(如GPU、TPU 等)为机器学习模型提供了大量的算力,但是其增长速度仍然没有突破每18 个月翻倍的摩尔定律。为了能够满足机器学习模型的发展,只
有通过分布式训练系统才可以匹配模型不断增长的算力需求。

图2 机器学习模型参数量增长和计算硬件的算力增长对比

分布式训练的总体目标就是提升总的训练速度,减少模型训练的总体时间。总训练速度可以用如下公式简略估计:

总训练速度∝ 单设备计算速度× 计算设备总量× 多设备加速比

    其中,单设备计算速度主要由单块计算加速芯片的运算速度和数据I/O 能力来决定,对单设备训练效率进行优化,主要的技术手段有混合精度训练、算子融合、梯度累加等;分布式训练系统中计算设备数量越多,其理论峰值计算速度就会越高,但是受到通讯效率的影响,计算设备数量增大则会造成加速比急速降低;多设备加速比则是由计算和通讯效率决定,需要结合算法和网络拓扑结构进行优化,分布式训练并行策略主要目标就是提升分布式训练系统中的多设备加速比。

    大语言模型参数量和所使用的数据量都非常巨大,因此都采用了分布式训练架构完成训练。文献[5] 针对GPT-3 的训练过程仅介绍了训练过程全部使用NVIDIA V100 GPU,文献[31] 介绍了OPT 使用了992 块NVIDIA A100 80G GPU,采用全分片数据并行(Fully Shared Data Parallel)[129]以及Megatron-LM 张量并行(Tensor Parallelism)[130],整体训练时间将近2 个月。BLOOM[33] 模型的研究人员则公开了更多在硬件和所采用的系统架构方面的细节。该模型的训练一共花费3.5 个月,使用48 个计算节点。每个节点包含8 块NVIDIA A100 80G GPU(总计384 个GPU),并且使用4*NVLink 用于节点内部GPU 之间通信。节点之间采用四个Omni-Path 100 Gbps 网卡构建的增强8 维超立方体全局拓扑网络进行通信。文献[37] 并没有给出LLaMA 模型训练中所使用的集群的具体配置和网络拓扑结构,但是给出了不同参数规模的总GPU 小时数。LLaMA 模型训练采用A100-80GB GPU,LLaMA-7B 模型训练需要82432 GPU 小时,LLaMA-13B 模型训练需要135168GPU 小时,LLaMA-33B 模型训练花费了530432 GPU 小时,而LLaMA-65B 模型训练花费则高达1022362 GPU 小时。由于LLaMA 所使用的训练数据量远超OPT 和BLOOM 模型,因此,虽然模型参数量远小于上述两个模型,但是其所需计算量仍然非常惊人。

    通过使用分布式训练系统,大语言模型训练周期可以从单计算设备花费几十年,缩短到使用数千个计算设备花费几十天就可以完成。然而,分布式训练系统仍然需要克服计算墙、显存墙、通
信墙等多种挑战,以确保集群内的所有资源得到充分利用,从而加速训练过程并缩短训练周期。


• 计算墙:单个计算设备所能提供的计算能力与大语言模型所需的总计算量之间存在巨大差异。2022 年3 年发布的NVIDIA H100 SXM 的单卡FP16 算力也只有2000 TFLOPs,而GPT-3
  则需要314 ZFLOPs 的总算力,两者相差了8 个数量级。

• 显存墙:单个计算设备无法完整存储一个大语言模型的参数。GPT-3 包含1750 亿参数,如果采用FP16 格式进行存储,需要700GB 的计算设备内存空间,而NVIDIA H100 GPU 只有
  80 GB 显存。
• 通信墙:分布式训练系统中各计算设备之间需要频繁地进行参数传输和同步。由于通信的延迟和带宽限制,这可能成为训练过程的瓶颈。GPT-3 训练过程中,如果分布式系统中存在128
个模型副本,那么在每次迭代过程中至少需要传输89.6TB 的梯度数据。而截止2023 年8 月,单个InfiniBand 链路仅能够提供不超过800Gb/s 带宽。计算墙和显存墙源于单计算设备的计算和存储能力有限,与模型对庞大计算和存储需求之间存在矛盾。这个问题可以通过采用分布式训练方法来解决,但分布式训练又会面临通信墙的挑战。在多机多卡的训练中,这些问题逐渐显现。随着大模型参数的增大,对应的集群规模也随之增加,这些问题变得更加突出。同时,在大型集群进行长时间训练时,设备故障可能会影响或中断训练过程,对分布式系统的问题性也提出了很高要求。

二、分布式训练并行策略

    分布式训练系统目标就是将单节点模型训练转换成等价的分布式并行模型训练。对于大语言模型来说,训练过程就是根据数据和损失函数,利用优化算法对神经网络模型参数进行更新的过程。单节点模型训练系统结构如图3所示,主要由数据和模型两个部分组成。训练过程会由多个数据小批次(Mini-batch)完成。图中数据表示一个数据小批次。训练系统会利用数据小批次根据损失函数和优化算法生成梯度,从而对模型参数进行修正。针对大语言模型多层神经网络的执行过程,可以由一个计算图(Computational Graph)表示。这个图有多个相互连接的算子(Operator),每个算子实现一个神经网络层(Neural Network Layer),而参数则代表了这个层在训练中所更新的权重。

图3 单设备模型训练系统

    计算图的执行过程可以分为前向计算和反向计算两个阶段。前向计算的过程是将数据读入第一个算子,计算出相应的输出结构,然后依此重复这个前向计算过程,直到最后一个算子结束。反向计算过程,是根据优化函数和损失,每个算子依次计算出梯度,并利用梯度更新本地的参数。在反向计算结束后,该数据小批次的计算完成,系统就会读取下一个数据小批次,继续下一轮的模型参数更新。

根据单设备模型训练系统的流程,可以看到如果进行并行加速,可以从数据和模型两个维度进行考虑。首先可以对数据进行切分(Partition),并将同一个模型复制到多个设备上,并行执行不同的数据分片,这种方式通常被称为数据并行(Data Parallelism,DP)。还可以对模型进行划分,将模型中的算子分发到多个设备分别完成,这种方式通常被称为模型并行(Model Parallelism,MP)。当训练超大规模语言模型时,往往需要同时对数据和模型进行切分,从而实现更高程度的并行,这种方式通常被称为混合并行(Hybrid Parallelism,HP)

2.1、数据并行

    在数据并行系统中,每个计算设备都有整个神经网络模型的完整副本(Model Replica),进行迭代时,每个计算设备只分配了一个批次数据样本的子集,并根据该批次样本子集的数据进行网络模型的前向计算。假设一个批次的训练样本数为N,使用M 个计算设备并行计算,每个计算设备会分配到N/M 个样本。前向计算完成后,每个计算设备都会根据本地样本计算损失误差得到梯度Gi(i 为加速卡编号),并将本地梯度Gi 进行广播。所有计算设备需要聚合其他加速度卡给出的梯度值,然后使用平均梯度(ΣNi=1Gi)/N 对模型进行更新,完成该批次训练。图4给出了由两个计算设备组成的数据并行训练系统样例。

图4 两节点数据并行训练系统样例

    数据并行训练系统可以通过增加计算设备,有效提升整体训练吞吐量,每秒全局批次数(Global Batch Size Per Second) 。它和单计算设备训练相比,最主要的区别就在于反向计算中的梯度需要在所有计算设备中进行同步,以保证每个计算设备上最终得到的是所有进程上梯度的平均值。常见的神经网络框架中都有数据并行方式的具体实现,包括:TensorFlow DistributedStrategy、PyTorch Distributed、Horovod DistributedOptimizer 等。由于基于Transformer 架构的大语言模型中每个算子都是依赖单个数据而非批次数据,因此数据并行并不会影响其计算逻辑,一般情况下各训练设备中前向计算是独立的,不涉及同步问题。数据并行训练加速比最高,但要求每个设备上都备份一份模型,显存占用比较高。

使用PyTorch DistributedDataParallel 实现单个服务器多加速卡训练代码如下,首先构造DistributedSampler类,将数据集的样本随机打乱并分配到不同计算设备:

class DistributedSampler(Sampler):
  def __init__(self, dataset, num_replicas=None, rank=None, shuffle=True, seed=0):
    if num_replicas is None:
        if not dist.is_available():
            raise RuntimeError("Requires distributed package to be available")
        num_replicas = dist.get_world_size()
    if rank is None:
        if not dist.is_available():
            raise RuntimeError("Requires distributed package to be available")
        rank = dist.get_rank()
    self.dataset = dataset # 数据集
    self.num_replicas = num_replicas # 进程个数默认等于world_size(GPU 个数)
    self.rank = rank # 当前属于哪个进程/哪块GPU
    self.epoch = 0
    self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
    # 每个进程的样本个数
    self.total_size = self.num_samples * self.num_replicas # 数据集总样本的个数
    self.shuffle = shuffle # 是否要打乱数据集
    self.seed = seed

def __iter__(self):
# 1、Shuffle 处理:打乱数据集顺序
    if self.shuffle:
        # 根据epoch 和种子进行混淆
        g = torch.Generator()
        # 这里self.seed 是一个定值,通过set_epoch 改变self.epoch 可以改变我们的初始化种子
        # 这就可以让每一个epoch 中数据集的打乱顺序不同,使每一个epoch 中,
        # 每一块GPU 拿到的数据都不一样,这样可以有利于更好的训练
        g.manual_seed(self.seed + self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
    else:
        indices = list(range(len(self.dataset)))
    # 数据补充
    indices += indices[:(self.total_size - len(indices))]
    assert len(indices) == self.total_size
    # 分配数据
    indices = indices[self.rank:self.total_size:self.num_replicas]
    assert len(indices) == self.num_samples
    return iter(indices)
def __len__(self):
    return self.num_samples
def set_epoch(self, epoch):

    self.epoch = epoch

利用DistributedSampler 构造完整的训练程序样例main.py 如下:

import argparse
import os
import shutil
import time
import warnings
import numpy as np
warnings.filterwarnings('ignore')
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
from torch.utils.data.distributed import DistributedSampler
from models import DeepLab
from dataset import Cityscaples
parser = argparse.ArgumentParser(description='DeepLab')
parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
help='number of data loading workers (default: 4)')
parser.add_argument('--epochs', default=100, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
help='manual epoch number (useful on restarts)')
parser.add_argument('-b', '--batch-size', default=3, type=int,
metavar='N')
parser.add_argument('--local_rank', default=0, type=int, help='node rank for distributed training')
args = parser.parse_args()
torch.distributed.init_process_group(backend="nccl") # 初始化
print("Use GPU: {} for training".format(args.local_rank))
# create model
model = DeepLab()
torch.cuda.set_device(args.local_rank) # 当前显卡
model = model.cuda() # 模型放在显卡上
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank],
    output_device=args.local_rank, find_unused_parameters=True) # 数据并行
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), args.lr,
    momentum=args.momentum, weight_decay=args.weight_decay)
train_dataset = Cityscaples()
train_sampler = DistributedSampler(train_dataset) # 分配数据
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size,
    shuffle=False, num_workers=args.workers, pin_memory=True, sampler=train_sampler)

通过以下命令行启动上述程序:

CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 main.py

2.2 模型并行

    模型并行(Model Parallelism)往往用于解决单节点内存不足的问题。以包含1750 亿参数的GPT-3 模型为例,如果模型中每一个参数都使用32 位浮点数表示,那么模型需要占用700GB(即175G× 4 Bytes)内存,如果使用16 位浮点表示,每个模型副本需要也需要占用350GB 内存。以2022 年3 月NVIDIA 发布的H100 加速卡也仅支持80GB 显存,无法将整个模型完整放入其中。模型并行可以从计算图角度,以下两种形式进行切分:(1)按模型的层切分到不同设备,即层间并行或算子间并行(Inter-operator Parallelism),也称之为流水线并行(Pipeline Parallelism,PP);(2)将计算图层内的参数切分到不同设备,即层内并行或算子内并行(Intra-operator Parallelism),也称之为张量并行(Tensor Parallelism,TP)。两节点模型并行训练系统样例如图4.9所示,左边为流水线并行,模型的不同层被切分到不同的设备中;右边为张量并行,同一个层中的不同的参数被切分到不同的设备中进行计算。

1. 流水线并行

流水线并行(Pipeline Parallelism,PP)是一种并行计算策略,将模型的各个层分段处理,并将每个段分布在不同的计算设备上,使得前后阶段能够流水式、分批进行工作。流水线并行通常应用于大规模模型的并行系统中,以有效解决单个计算设备内存不足的问题。图4.6给出了一个由四个计算设备组成的流水线并行系统,包含了前向计算和后向计算。其中F1、F2、F3、F4 分别代表四个前向路径,位于不同的设备上;而B4、B3、B2、B1 则代表逆序的后向路径,也分别位于四个不同的设备上。然而,从图中可以看出,计算图中的下游设备(Downstream Device)需要长时间持续处于空闲状态,等待上游设备(Upstream Device)的计算完成,才能开始计算自身的任务。

图5 两节点模型并行训练系统样例


这种情况导致了设备的平均使用率大幅降低,形成了模型并行气泡(Model Parallelism Bubble),也称为流水线气泡(Pipeline Bubble)

图6 流水线并行样例


    朴素流水线策略所产生的并行气泡,使得系统无法充分利用计算资源,降低了系统整体的计算效率。为了能够减少并行气泡,文献[131] 提出了GPipe 方法,将小批次(Mini-batch)进一步划分成更小的微批次(Micro-batch),利用流水线并行方案,每次处理一个微批次的数据。在当前阶段计算完成得到结果后,将该微批次的结果发送给下游设备,同时开始处理后一个微批次的数据,这样可以在一定程度上减少并行气泡。图7GPipe 策略流水线并行样例。如图所示,前向F1计算被拆解为了F11,F12,F13,F14,在计算设备1 中计算完成F11 后,会在计算设备2 中开始进行F21 计算,同时计算设备1 中并行开始F12 的计算。相比于最原始的流水线并行方法,GPipe 流水线方法可以有效降低并行气泡。

图7 GPipe 策略流水线并行样例



    GPipe 策略虽然可以减少一定的并行气泡,但是只有当一个Mini-batch 中所有的前向计算完成后,才能开始执行后向计算。因此还是会产生很多并行气泡,从而降低了系统的并行效率。Megatron-
LM[132] 提出了1F1B 流水线策略,即一个前向通道和一个后向通道。1F1B 流水线策略引入了任务调度机制,使得下游设备能够在等待上游计算的同时执行其他可并行的任务,从而提高设备的利
用率。1F1B 给出了非交错式和交错式两种方式调度方式,如图8所示。

    1F1B 非交错式调度模式可分为三个阶段。首先是热身阶段,在该阶段中,计算设备中进行不同数量的前向计算。接下来的阶段是前向-后向阶段,计算设备按顺序执行一次前向计算,然后进行一次后向计算。最后一个阶段是后向阶段,计算设备在完成最后一次后向计算。相比于GPipe 策略,非交错式调度模式在节省内存方面表现更好。然而,它需要与GPipe 策略一样的时间来完成一轮计算。

    1F1B 交错式调度模式要求micro-batch 的数量是流水线阶段的整数倍。每个设备不再仅负责连续多个层的计算,而是可以处理多个层的子集,这些子集被称为模型块。具体而言,在之前的模式中,设备1 可能负责层1-4,设备2 负责层5-8,以此类推。然而,在新的模式下,设备1 可以处理层1、2、9、10,设备2 处理层3、4、11、12,以此类推。这种模式下,每个设备在流水线中被分配到多个阶段。例如,设备1 可能参与热身阶段、前向计算阶段和后向计算阶段的某些子集任务。每个设备可以并行执行不同阶段的计算任务,从而更好地利用流水线并行的优势。这种模式不仅在内存消耗方面表现出色,还能够提高计算效率,使得大型模型的并行系统能够更高效地
完成计算任务。


图8 1F1B 流水线并行策略样例

PyTorch 中也包含了实现流水线的API 函数Pipe,具体实现参考“torch.distributed.pipeline.sync.Pipe”类。可以使用这个API 构造一个包含两个线性层,分别放置在2 个不同计算设备中的样例如下:

{#
Step 0. Need to initialize RPC framework first.
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1)
# Step 1: build a model including two linear layers
fc1 = nn.Linear(16, 8).cuda(0)
fc2 = nn.Linear(8, 4).cuda(1)
# Step 2: wrap the two layers with nn.Sequential
model = nn.Sequential(fc1, fc2)
# Step 3: build Pipe (torch.distributed.pipeline.sync.Pipe)
model = Pipe(model, chunks=8)
# do training/inference
input = torch.rand(16, 16).cuda(0)
output_rref = model(input)
}

2. 张量并行

    张量并行(Tensor Parallelism,TP)需要根据模型的具体结构和算子类型,解决如何将参数切分到不同设备,以及如何保证切分后数学一致性两个问题。大语言模型都是以Transformer 结构为基础,Transformer 结构主要由以下三种算子构成:嵌入式表示(Embedding)、矩阵乘(MatMul)和交叉熵损失(Cross Entropy Loss)计算构成。这三种类型的算子有较大的差异,都需要设计对应的张量并行策略[130],才可以实现将参数切分到不同的设备。对于嵌入表示(Embedding)算子,如果总的词表数非常大,会导致单计算设备显存无法容纳Embedding 层参数。举例来说,如果词表数量是64000,嵌入表示维度为5120,类型采用32 位精度浮点数,那么整层参数需要的显存大约为64000 × 5120 × 4/1024/1024 = 1250MB,反向梯度同样需要1250MB,仅仅存储就需要将近2.5GB。对于嵌入表示层的参数,可以按照词维度切分,每个计算设备只存储部分词向量,然后通过汇总各个设备上的部分词向量,从而得到完整的词向量。图4.9给出了单节点Embedding 和两节点张量并行的示意图。在单节点上,执行Embedding 操作,bz 是批次大小(batch size),Embedding 的参数大小为[word_size, hidden_size],计算得到[bz,hidden_size] 张量。图4.9中Embedding 张量并行示例将Embedding 参数沿word_size 维度,切分为两块,每块大小为[word_size/2, hidden_size],分别存储在两个设备上。当每个节点查询各自的词表时,如果无法查到,则该词的表示为0,各自设备查询后得到[bz, hidden_size] 结果张量,最后通过AllReduce_Sum 通信¬,跨设备求和,得到完整的全量结果,可以看出,这里的输出结果和单计算设备执行的结果一致。

图9 两节点Embedding 算子张量并行示例

矩阵乘(MatMul)的张量并行要充分利用矩阵了分块乘法原理。举例来说,要实现如下矩阵乘法Y = X ×A,其中X 是维度为M × N 的输入矩阵,A 是维度为N ×K 的参数矩阵,Y 是结果矩阵,维度为M ×K。如果参数矩阵A 非常大,甚至超出单张卡的显存容量,那么可以把参数矩阵A 切分到多张卡上,并通过集合通信汇集结果,保证最终结果在数学计算上等价于单计算设备计算结果。参数矩阵A 存在两种切分方式:
(1) 参数矩阵A 按列切块,将矩阵A 按列切成:A = [A1,A2]

(2) 参数矩阵A 按行切块,将矩阵A 按行切成:

     

图10给出了参数矩阵按列切分的示例,参数矩阵A 分别将A1,A2 放置在两个计算设备上。两个计算设备分别计算Y1 = X ×A1 和Y2 = X ×A2。计算完成后,多计算设备间进行通信,从而获取其它计算设备上的计算结果,并拼接在一起得到最终的结果矩阵Y ,该结果在数学上与单计算设备计算结果上完全等价。

图10 两节点矩阵乘算子张量并行按列切分示例

    图11给出了参数矩阵按列行分的示例,为了满足矩阵乘法规则,输入矩阵X 需要按列切分X = [X1|X2]。同时,将矩阵分块,分别放置在两个计算设备上,每个计算设备分别计算Y1 =X1 ×A1 和Y2 = X2 ×A2。计算完成后,多个计算设备间通信获取归约其他卡上的计算结果,可以得到最终的结果矩阵Y 。同样,这种切分方式,既可以保证数学上的计算等价性,并解决单计算设备显存无法容纳,又可以保证单计算设备通过拆分方式可以装下参数A 的问题。

    Transformer 中的FFN 结构均包含两层全连接(FC)层,即存在两个矩阵乘,这两个矩阵乘分别采用上述两种切分方式,如图4.12所示。对第一个FC 层的参数矩阵按列切块,对第二个FC层参数矩阵按行切块。这样第一个FC 层的输出恰好满足第二个FC 层数据输入要求(按列切分),因此可以省去第一个FC 层后的汇总通信操作。多头自注意力机制的张量并行与FFN 类似,因为具有多个独立的头,因此相较于FFN 更容易实现并行,其矩阵切分方式如图4.13所示。具体可以参考文献[130]。

分类网络最后一层一般会选用Softmax 和Cross_entropy 算子来计算交叉熵损失(Cross Entropy Loss)。如果类别数量非常大,会导致单计算设备内存无法存储和计算logit 矩阵。针对这一类算子,可以按照类别维度切分,同时通过中间结果通信,得到最终的全局的交叉熵损失。首先计算


图11 两节点矩阵乘算子张量并行按行切分示例


图12 FNN 结构张量并行示意图

的是softmax 值,公式如下:

其中,p 表示张量并行的设备号。得到Softmax 计算结果之后,同时对标签Target 按类别切分,每个设备得到部分损失,最后再进行一次通信,得到所有类别的损失。整个过程,只需要进行三次小量的通信,就可以完成交叉熵损失的计算。PyTorch 提供了细粒度张量级别的并行API,DistributedTensor。也提供了粗粒度模型层面的API 对“nn.Module”进行张量并行。通过以下几行代码就可以实现对一个大的张量进行分片:

import torch
from torch.distributed._tensor import DTensor, DeviceMesh, Shard, distribute_tensor
# construct a device mesh with available devices (multi-host or single host)
device_mesh = DeviceMesh("cuda", [0, 1, 2, 3])
# if we want to do row-wise sharding
rowwise_placement=[Shard(0)]
# if we want to do col-wise sharding
colwise_placement=[Shard(1)]
big_tensor = torch.randn(888, 12)
# distributed tensor returned will be sharded across the dimension specified in placements
rowwise_tensor = distribute_tensor(big_tensor, device_mesh=device_mesh, placements=rowwise_placement)


对于像“nn.Linear”这样已经有“torch.Tensor”作为参数的模块,也提供了模块级API “distribute_module”在模型层面进行张量并行,参考代码如下:

import torch
from torch.distributed._tensor import DeviceMesh, Shard, distribute_tensor,distribute_module
class MyModule(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(8, 8)
        self.fc2 = nn.Linear(8, 8)
        self.relu = nn.ReLU()
        def forward(self, input):
            return self.relu(self.fc1(input) + self.fc2(input))
    mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1], [2, 3]])
    def shard_params(mod_name, mod, mesh):
        rowwise_placement = [Shard(0)]
        def to_dist_tensor(t): return distribute_tensor(t, mesh, rowwise_placement)
        mod._apply(to_dist_tensor)
    sharded_module = distribute_module(MyModule(), mesh, partition_fn=shard_params)
    def shard_fc(mod_name, mod, mesh):
        rowwise_placement = [Shard(0)]
        if mod_name == "fc1":
            mod.weight = torch.nn.Parameter(distribute_tensor(mod.weight, mesh, rowwise_placement))
    sharded_module = distribute_module(MyModule(), mesh, partition_fn=shard_fc)

2.3.混合并行

混合并行(Hybrid Parallelism,HP)是将多种并行策略如数据并行、流水线并行和张量并行等进行混合使用。通过结合不同的并行策略,混合并行可以充分发挥各种并行策略的优点,以最大程度地提高计算性能和效率。针对千亿规模的大语言模型,通常在每个服务器内部使用张量并行策略,由于该策略涉及的网络通信量较大,需要利用服务器内部的不同计算设备之间进行高速通信带宽。通过流水线并行,将模型的不同层划分为多个阶段,每个阶段由不同的机器负责计算。这样可以充分利用多台机器的计算能力,并通过机器之间的高速通信来传递计算结果和中间数据,以提高整体的计算速度和效率。最后,在外层叠加数据并行策略,以增加并发数量,提升整体训练速度。通过数据并行,将训练数据分发到多组服务器上进行并行处理,每组服务器处理不同的数据批次。这样可以充分利用多台服务器的计算资源,并增加训练的并发度,从而加快整体训练速度。BLOOM 使用了Megatron-DeepSpeed[104] 框架进行训练,主要包含两个部分:Megatron-LM 提供张量并行能力和数据加载原语;DeepSpeed提供ZeRO 优化器、模型流水线以及常规的分布式训练组件。通过这种方式可以实现数据、张量和流水线三维并行,BLOOM 模型训练时采用的并行计算结构如图14所示。BLOOM 模型训练使用了由48 个NVIDIA DGX-A100 服务器组成的集群,每个DGX-A100 服务器包含8 张NVIDIA A100 80GB GPU,总计包含384 张。BLOOM 训练采用的策略是首先将集群分为48 个一组,进行数据并行。接下来,模型整体被分为12 个阶段,进行流水线并行。每个阶段的模型被划分到4 张GPU 中,进行张量并行。同时BLOOM 也使用了ZeRO(零冗余优化器)[134] 进一步降低了模型对显存的占用。用了通过上述四个步骤可以实现数百个GPU 的高效并行计算。

图14 BLOOM 模型训练时采用的并行计算结构

2.4. 计算设备内存优化

当前大语言模型训练通常采用Adam 优化算法,除了需要每个参数梯度之外,还需要一阶动量(Momentum)和二阶动量(Variance)。虽然Adam 优化算法相较SGD 算法通常效果更好也更稳定,但是对计算设备内存的占用显著增大。为了降低内存占用,大多数系统已经采用了混合精度训练(Mixed Precision Training)方式,即同时存在FP16(16 位浮点数)或者BF16(Bfloat16)和FP32(32 位浮点数)两种格式的数值。FP32、FP16 和BF16 表示如图4.15所示。FP32 中第31 位为符号位,第30 到第23 位用于表示指数,第22 到第0 位用于表示尾数。FP16 中第15 位为符号位,第14 到第10 位用于表示指数,第9 到第用于表示尾数。BF16 中第15 位为符号位,第14 到第7 位用于表示指数,第6 到第0 位用于表示尾数。由于FP16 的值区间比FP32 的值区间小很多,所以在计算过程中很容易出现上溢出和下溢出。BF16 相较于FP16 以精度换取更大的值区间范围。但是,由于FP16 和BF16 相较FP32 精度低,训练过程中可能会出现梯度消失和模型不稳定的问题。
    因此,需要使用一些技术来解决这些问题,例如动态损失缩放(Dynamic Loss Scaling)和混合精度优化器(Mixed Precision Optimizer)等。混合精度优化的过程如图4.16所示。Adam 优化器状态包括采用FP32 保存的模型参数备份,一阶动量和二阶动量也都采用FP32 格式存储。假设模型参数量为Φ,模型参数和梯度都是用FP16格式存储,则共需要2Φ + 2Φ + (4Φ + 4Φ + 4Φ) = 16Φ 字节存储。其中Adam 状态占比75%。动态损失缩放反向传播前,将损失变化(dLoss)手动增大2K 倍,因此反向传播时得到的激活函数梯度则不会溢出;反向传播后,将权重梯度缩小2K 倍,恢复正常值。举例来说,对于包含75 亿个参数模型,如果用FP16 格式,只需要15GB 计算设备内存,但是在训练阶段模型状态实际上需要耗费120GB。计算卡内存占用中除了模型状态之外,还有剩余状态(Residual States),包括激活值(Activation)、各种临时缓冲区(Buffer)以及无法使用的显存碎片(Fragmentation)等。由于激活值可以用检查点(Activation Checkpointing)方式使得激活值内存占用大幅度减少,因此如何减少模型状态尤其是Adam 优化器状态是解决内存占用问题的关键。

图16 混合精度优化过程


以上是我简单介绍分布式机器学习系统的基础概念、分布式训练集群架构、分布式训练并行策略,DeepSpeed 为例如何在集群上训练大语言模型在下一次的文章给大家继续介绍,欢迎大家点赞关注支持,您的支持是我创作的动力。

参考内容:

(1) 收藏丨30个大语言模型训练相关的数据集分享 - 知乎. https://zhuanlan.zhihu.com/p/612243919.
(2) 大语言模型训练数据常见的4种处理方法 - 知乎. https://zhuanlan.zhihu.com/p/673045395.

(3)《大规模语言模型:从理论到实践》张奇等著. —北京:电子工业出版社
(4) 大语言模型综述 - Renmin University of China. http://ai.ruc.edu.cn/research/science/20230605100.html.

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。