浅谈LLM之分布式训练
随着语言模型参数量和所需训练数据量的急速增长,单个机器上有限的资源已无法满足大语言模型训练的要求。需要设计分布式训练(Distributed Training)系统来解决海量的计算和内存资源要求问题。在分布式训练系统环境下需要将一个模型训练任务拆分成多个子任务,并将子任务分发给多个计算设备,从而解决资源瓶颈。但是如何才能利用包括数万计算加速芯片的集群,训练模型参数量千亿甚至是万亿的大规模语言模型?这其中涉及到集群架构、并行策略、模型架构、内存优化、计算优化等一系列的技术。本节将介绍分布式机器学习系统的基础概念、分布式训练集群架构、分布式训练并行策略,并以DeepSpeed 为例介绍如何在集群上训练大语言模型。
一、分布式训练概述
分布式训练(Distributed Training)是指将机器学习或深度学习模型训练任务分解成多个子任务,并在多个计算设备上并行地进行训练。图4.1给出了单个计算设备和多个计算设备的示例,这里计算设备可以是中央处理器(Central Processing Unit,CPU)、图形处理器(Graphics ProcessingUnit,GPU)、张量处理器(Tensor Processing Unit,TPU)也可以是神经网络处理器(Neural network Processing Unit,NPU)。由于同一个服务器内部的多个计算设备之间内存也可能并不共享,因此无论这些计算设备是否处于一个服务器还是多个服务器中,其系统架构都属于分布式系统范畴。一个模型训练任务往往会有大量的训练样本作为输入,可以利用一个计算设备完成,也可以将整个模型的训练任务拆分成子任务,分发给不同的计算设备,实现并行计算。此后,还需要对每个计算设备的输出进行合并,最终得到与单个计算设备等价的计算结果。由于每个计算设备只需要负责子任务,并且多个计算设备可以并行执行,因此其可以更快速地完成整体计算,并最终实现对整个计算过程的加速。
促使人们设计分布式训练系统的一个最重要的原因就是单个计算设备的算力已经不足以支撑模型训练。图4.2给出了机器学习模型对于算力的需求以及同期单个计算设备能够提供的算力。如图所示,机器学习模型快速发展,从2013 年AlexNet 开始,到2022 年拥有5400 亿参数的PalM 模型,机器学习模型以每18 个月增长56 倍的速度发展。模型参数规模增大的同时,对训练数据量
图1.1 单计算设备计算和多计算设备示例
的要求也指数级增长,这更加剧了对算力的需求。然而,近几年CPU 的算力增加已经远低于摩尔定律(Moore’s Law),虽然计算加速设备(如GPU、TPU 等)为机器学习模型提供了大量的算力,但是其增长速度仍然没有突破每18 个月翻倍的摩尔定律。为了能够满足机器学习模型的发展,只有通过分布式训练系统才可以匹配模型不断增长的算力需求。
图1.2 机器学习模型参数量增长和计算硬件的算力增长对比[128]
分布式训练的总体目标就是提升总的训练速度,减少模型训练的总体时间。总训练速度可以
用如下公式简略估计:
总训练速度∝ 单设备计算速度× 计算设备总量× 多设备加速比
其中,单设备计算速度主要由单块计算加速芯片的运算速度和数据I/O 能力来决定,对单设备训练效率进行优化,主要的技术手段有混合精度训练、算子融合、梯度累加等;分布式训练系统中计算设备数量越多,其理论峰值计算速度就会越高,但是受到通讯效率的影响,计算设备数量增大则会造成加速比急速降低;多设备加速比则是由计算和通讯效率决定,需要结合算法和网络拓扑结构进行优化,分布式训练并行策略主要目标就是提升分布式训练系统中的多设备加速比。大语言模型参数量和所使用的数据量都非常巨大,因此都采用了分布式训练架构完成训练。文献[5] 针对GPT-3 的训练过程仅介绍了训练过程全部使用NVIDIA V100 GPU,文献[31] 介绍了OPT 使用了992 块NVIDIA A100 80G GPU,采用全分片数据并行(Fully Shared Data Parallel)[129]以及Megatron-LM 张量并行(Tensor Parallelism)[130],整体训练时间将近2 个月。BLOOM[33] 模型的研究人员则公开了更多在硬件和所采用的系统架构方面的细节。该模型的训练一共花费3.5 个月,使用48 个计算节点。每个节点包含8 块NVIDIA A100 80G GPU(总计384 个GPU),并且使用4*NVLink 用于节点内部GPU 之间通信。节点之间采用四个Omni-Path 100 Gbps 网卡构建的增强8 维超立方体全局拓扑网络进行通信。文献[37] 并没有给出LLaMA 模型训练中所使用的集群的具体配置和网络拓扑结构,但是给出了不同参数规模的总GPU 小时数。LLaMA 模型训练采用
A100-80GB GPU,LLaMA-7B 模型训练需要82432 GPU 小时,LLaMA-13B 模型训练需要135168GPU 小时,LLaMA-33B 模型训练花费了530432 GPU 小时,而LLaMA-65B 模型训练花费则高达1022362 GPU 小时。由于LLaMA 所使用的训练数据量远超OPT 和BLOOM 模型,因此,虽然模型参数量远小于上述两个模型,但是其所需计算量仍然非常惊人。
通过使用分布式训练系统,大语言模型训练周期可以从单计算设备花费几十年,缩短到使用数千个计算设备花费几十天就可以完成。然而,分布式训练系统仍然需要克服计算墙、显存墙、通信墙等多种挑战,以确保集群内的所有资源得到充分利用,从而加速训练过程并缩短训练周期。
• 计算墙:单个计算设备所能提供的计算能力与大语言模型所需的总计算量之间存在巨大差异。2022 年3 年发布的NVIDIA H100 SXM 的单卡FP16 算力也只有2000 TFLOPs,而GPT-3则需要314 ZFLOPs 的总算力,两者相差 了8 个数量级。
• 显存墙:单个计算设备无法完整存储一个大语言模型的参数。GPT-3 包含1750 亿参数,如果采用FP16 格式进行存储,需要700GB 的计算设备内存空间,而NVIDIA H100 GPU 只有80 GB 显存。
• 通信墙:分布式训练系统中各计算设备之间需要频繁地进行参数传输和同步。由于通信的延迟和带宽限制,这可能成为训练过程的瓶颈。GPT-3 训练过程中,如果分布式系统中存在128个模型副本,那么在每次迭代过程中至少需要传输89.6TB 的梯度数据。而截止2023 年8 月,单个InfiniBand 链路仅能够提供不超过800Gb/s 带宽。
计算墙和显存墙源于单计算设备的计算和存储能力有限,与模型对庞大计算和存储需求之间存在矛盾。这个问题可以通过采用分布式训练方法来解决,但分布式训练又会面临通信墙的挑战。在多机多卡的训练中,这些问题逐渐显现。随着大模型参数的增大,对应的集群规模也随之增加,这些问题变得更加突出。同时,在大型集群进行长时间训练时,设备故障可能会影响或中断训练过程,对分布式系统的问题性也提出了很高要求。
二、分布式训练并行策略
分布式训练系统目标就是将单节点模型训练转换成等价的分布式并行模型训练。对于大语言模型来说,训练过程就是根据数据和损失函数,利用优化算法对神经网络模型参数进行更新的过程。单节点模型训练系统结构如图4.3所示,主要由数据和模型两个部分组成。训练过程会由多个数据小批次(Mini-batch)完成。图中数据表示一个数据小批次。训练系统会利用数据小批次根据损失函数和优化算法生成梯度,从而对模型参数进行修正。针对大语言模型多层神经网络的执行过程,可以由一个计算图(Computational Graph)表示。这个图有多个相互连接的算子(Operator),每个算子实现一个神经网络层(Neural Network Layer),而参数则代表了这个层在训练中所更新的的权重。
图2.3 单设备模型训练系统
计算图的执行过程可以分为前向计算和反向计算两个阶段。前向计算的过程是将数据读入第一个算子,计算出相应的输出结构,然后依此重复这个前向计算过程,直到最后一个算子结束。反向计算过程,是根据优化函数和损失,每个算子依次计算出梯度,并利用梯度更新本地的参数。在反向计算结束后,该数据小批次的计算完成,系统就会读取下一个数据小批次,继续下一轮的模型参数更新。
根据单设备模型训练系统的流程,可以看到如果进行并行加速,可以从数据和模型两个维度进行考虑。首先可以对数据进行切分(Partition),并将同一个模型复制到多个设备上,并行执行不同的数据分片,这种方式通常被称为数据并行(Data Parallelism,DP)。还可以对模型进行划分,将模型中的算子分发到多个设备分别完成,这种方式通常被称为模型并行(Model Parallelism,MP)。当训练超大规模语言模型时,往往需要同时对数据和模型进行切分,从而实现更高程度的并行,这种方式通常被称为混合并行(Hybrid Parallelism,HP)。
2.1 数据并行
在数据并行系统中,每个计算设备都有整个神经网络模型的完整副本(Model Replica),进行迭代时,每个计算设备只分配了一个批次数据样本的子集,并根据该批次样本子集的数据进行网络模型的前向计算。假设一个批次的训练样本数为N,使用M 个计算设备并行计算,每个计算设备会分配到N/M 个样本。前向计算完成后,每个计算设备都会根据本地样本计算损失误差得到梯度Gi(i 为加速卡编号),并将本地梯度Gi 进行广播。所有计算设备需要聚合其他加速度卡给出的梯度值,然后使用平均梯度(ΣNi=1Gi)/N 对模型进行更新,完成该批次训练。图4.4给出了由两个计算设备组成的数据并行训练系统样例。
图2.4 两节点数据并行训练系统样例
数据并行训练系统可以通过增加计算设备,有效提升整体训练吞吐量,每秒全局批次数(Global Batch Size Per Second) 。它和单计算设备训练相比,最主要的区别就在于反向计算中的梯度需要在所有计算设备中进行同步,以保证每个计算设备上最终得到的是所有进程上梯度的平均值。常见的神经网络框架中都有数据并行方式的具体实现,包括:TensorFlow DistributedStrategy、PyTorch Distributed、Horovod DistributedOptimizer 等。由于基于Transformer 架构的大语言模型中每个算子都是依赖单个数据而非批次数据,因此数据并行并不会影响其计算逻辑,一般情况下各训练设备中前向计算是独立的,不涉及同步问题。数据并行训练加速比最高,但要求每个设备上都备份一份模型,显存占用比较高。
使用PyTorch DistributedDataParallel 实现单个服务器多加速卡训练代码如下,首先构造DistributedSampler类,将数据集的样本随机打乱并分配到不同计算设备:
```python
class DistributedSampler(Sampler):
def __init__(self, dataset, num_replicas=None, rank=None, shuffle=True, seed=0):
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
num_replicas = dist.get_world_size()
if rank is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
rank = dist.get_rank()
self.dataset = dataset # 数据集
self.num_replicas = num_replicas # 进程个数默认等于 world_size(GPU个数)
self.rank = rank # 当前属于哪个进程/哪块GPU
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas)) # 每个进程的样本个数
self.total_size = self.num_samples * self.num_replicas # 数据集总样本的个数
self.shuffle = shuffle # 是否要打乱数据集
self.seed = seed
def __iter__(self):
# 1、Shuffle处理:打乱数据集顺序
if self.shuffle:
# 根据epoch和种子进行混淆
g = torch.Generator()
# 这里 self.seed是一个定值,通过 set_epoch改变 self.epoch可以改变我们的初始化种子
# 这就可以让每一个epoch中数据集的打乱顺序不同,使每一个epoch中,
# 每一块GPU拿到的数据都不一样,这样可以有利于更好的训练
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist()
else:
indices = list(range(len(self.dataset)))
# 数据补充
indices += indices[:(self.total_size - len(indices))]
assert len(indices) == self.total_size
# 分配数据
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
def __len__(self):
return self.num_samples
def set_epoch(self, epoch):
"""
Sets the epoch for this sampler. When `shuffle=True`, this ensures all replicas use a different random ordering for each epoch.
Otherwise, the next iteration of this sampler will yield the same ordering.
Arguments:
epoch (int): Epoch number.
"""
self.epoch = epoch
```
利用DistributedSampler 构造完整的训练程序样例main.py 如下:
```python
import argparse
import os
import shutil
import time
import warnings
import numpy as np
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.utils.data
from torch.utils.data.distributed import DistributedSampler
from models import DeepLab
from dataset import Cityscapes
warnings.filterwarnings('ignore')
parser = argparse.ArgumentParser(description='DeepLab')
parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
help='number of data loading workers (default:4)')
parser.add_argument('--epochs', default=100, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
help='manual epoch number (useful on restarts)')
parser.add_argument('-b', '--batch-size', default=3, type=int, metavar='N')
parser.add_argument('--local_rank', default=0, type=int, help='node rank for distributed training')
args = parser.parse_args()
torch.distributed.init_process_group(backend="nccl")
print("Use GPU:{} for training".format(args.local_rank))
# create model
model = DeepLab()
torch.cuda.set_device(args.local_rank)
model = model.cuda()
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank, find_unused_parameters=True)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
train_dataset = Cityscapes()
train_sampler = DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=False, num_workers=args.workers, pin_memory=True, sampler=train_sampler)
# Repeated code block below is intentionally omitted for brevity
```
通过以下命令行启动上述程序:
CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 main.py
2.3 分布式训练的集群架构
高性能计算集群硬件组成
典型的高性能计算集群的硬件组成如图4.18所示。整个计算集群包含大量带有计算加速设备的服务器。每个服务器中往往有多个计算加速设备(通常2-16 个)。多个服务器会被放置在一个机柜(Rack)中,服务器通过架顶交换机(Top of Rack Switch,ToR)连接网络。在架顶交换机满载的情况下,可以通过在架顶交换机间增加骨干交换机(Spine Switch)进一步接入新的机柜。这种连接服务器的拓扑结构往往是一个多层树(Multi-Level Tree)。
图2.3.1 典型用于分布式训练的计算集群硬件组成[
多层树结构集群中跨机柜通信(Cross-Rack Communication)往往会有网络瓶颈。以包含1750亿参数的GPT-3 模型为例,每一个参数使用32 位浮点数表示,那么每一轮训练迭代训练中,每个模型副本(Model Replica)会生成700GB(即175G×4 Bytes = 700GB)的本地梯度数据。假如采用包含1024 卡的计算集群,包含128 个模型副本,那么至少需要传输89.6TB(即700GB×128 =89.6TB)的梯度数据。这会造成严重的网络通信瓶颈。因此,针对大语言模型分布式训练,通常采用胖树[137](Fat-Tree)拓扑结构,试图实现网络带宽的无收敛。此外,采用InfiniBand(IB)技术搭建高速网络,单个InfiniBand 链路可以提供200Gb/s 或者400Gb/s 带宽。NVIDIA 的DGX 服务器提供单机1.6Tb(200Gb×8)网络带宽,HGX 服务器网络带宽更是可以达到3.2Tb(400Gb×8)。单个服务器内通常由2 到16 个计算加速设备组成,这些计算加速设备之间的通讯带宽也是影响分布式训练的重要因素。如果这些计算加速设备通过服务器PCI 总线互联,会造成服务器内部计算加速设备之间通讯瓶颈。PCIe 5.0 总线也只能提供128GB/s 的带宽,而NVIDIA H100 采用高带宽内存(High-Bandwidth Memory,HBM)可以提供3350GB/s 的带宽。因此,服务器内部通常也采用了异构网络架构。NVIDIA HGX H100 8 GPU 服务器,采用了NVLink 和NVSwitch(NVLink 交换机)技术,如图4.19所示。每个H100 GPU 都有多个NVLink 端口,并连接到所有四个NVSwitch上。每个NVSwitch 都是一个完全无阻塞的交换机,完全连接所有8 个H100 计算加速卡。NVSwitch的这种完全连接的拓扑结构,使得服务器内任何H100 加速卡之间都可以达到900GB/s 双向通信速度。
图2.19 NVIDIA HGX H100 8-GPU NVLink 和NVSwitch 连接框图
2.4 DeepSpeed 实践
DeepSpeed[133] 是一个由Microsoft 公司开发的开源深度学习优化库,旨在提高大规模模型训练的效率和可扩展性,使研究人员和工程师能够更快地迭代和探索新的深度学习模型和算法。它采用了多种技术手段来加速训练,包括模型并行化、梯度累积、动态精度缩放和本地模式混合精度等。此外,DeepSpeed 还提供了一些辅助工具,例如分布式训练管理、内存优化和模型压缩,以帮助开发者更好地管理和优化大规模深度学习训练任务。DeepSpeed 是基于PyTorch 构建的,因此将现有的PyTorch 训练代码迁移到DeepSpeed 上通常只需要进行简单的修改。这使得开发者可以快速利用DeepSpeed 的优化功能来加速他们的训练任务。DeepSpeed 已经在许多大规模深度学习项目中得到了应用,包括语言模型、图像分类、目标检测等领域。大语言模型BLOOM[33] 模型(1750 亿参数)和MT-NLG[104] 模型(5400 亿参数)都是采用DeepSpeed 框架完成训练。
DeepSpeed 主要优势在于支持大规模神经网络模型、提供了更多的优化策略和工具。DeepSpeed通过实现三种并行方法的灵活组合,即ZeRO 支持的数据并行、流水线并行和张量并行,可以应对不同工作负载的需求。特别是通过3D 并行性的支持,DeepSpeed 可以处理具有万亿参数的超大规模模型。DeepSpeed 还引入了ZeRO-Offload,使单个GPU 能够训练比其显存大小大10 倍的模型。此外,为了充分利用CPU 和GPU 内存来训练大型模型,DeepSpeed 还扩展了ZeRO-2。此外,DeepSpeed 还提供了稀疏注意力核(Sparse Attention Kernel),支持处理包括文本、图像和语音等长序列输入的模型。DeepSpeed 还集成了1 比特Adam 算法(1-bit Adam),它可以只使用原始Adam算法1/5 的通信量,同时达到与Adam 类似的收敛率,可以显著提高分布式训练的效率,并降低通信开销。
DeepSpeed 的3D 并行充分利用硬件架构特性,有效综合考虑了显存效率和计算效率两个方面。本章第2.3 节介绍了分布式集群的硬件架构,可以看到截止到2023 年9 月分布式训练集群通常采用NVIDIA DGX/HGX 节点,利用胖树网络拓扑结构构建计算集群。因此,每个节点内部8 个计算加速设备之间具有非常高的通信带宽,但是节点之间的通信带宽则相对较低。由于张量并行是分布式训练策略中通信开销最大的,因此优先考虑将张量并行计算组放置在节点内以利用更大的节点内带宽。如果张量并行组并不能占满节点内的所有计算节点时,选择将数据并行组放置在节点内,否则就使用跨节点进行数据并行。流水线并行的通信量最低,因此可以使用跨节点调度
流水线的各个阶段,降低通信带宽的要求。每个数据并行组需要通信的梯度量随着流水线和模型并行的规模线性减小,因此总通信量少于单纯使用数据并行。此外,每个数据并行组会在局部的一小部分计算节点内部独立进行通信,组间通信可以相互并行。通过减少通信量和增加局部性与并行性,数据并行通信的有效带宽有效增大。图2.29给出了DeepSpeed 3D 并行策略示意图。图中给出了包含32 个计算设备进行3D 并行的例子。神经网络的各层分为4 个流水线阶段。每个流水线阶段中的层在4 个张量并行计算设备之间进一步划分。最后,每个流水线阶段有两个数据并行实例,使用ZeRO 内存优化在这2 个副本之间划分优化器状态量。
DeepSpeed 软件架构如图2.30所示,主要包含三部分:
• APIs:DeepSpeed 提供了易于使用的API 接口,简化了训练模型和推断的过程。用户只需通过调用几个API 接口即可完成任务。通过“initialize”接口可以初始化引擎,并在参数中配置训练参数和优化技术等。这些配置参数通常保存在名为“ds_config.json”的文件中。。
• RunTime:DeepSpeed 的核心运行时组件,使用Python 语言实现,负责管理、执行和优化性能。它承担了将训练任务部署到分布式设备的功能,包括数据分区、模型分区、系统优化、微调、故障检测以及检查点的保存和加载等任务。
图2.29 DeepSpeed 3D 并行策略示意图
Ops:DeepSpeed 的底层内核组件,使用C++ 和CUDA 实现。它优化计算和通信过程,提供了一系列底层操作,包括Ultrafast Transformer Kernels、fuse LAN kernels、Customary Deals等。Ops 的目标是通过高效的计算和通信加速深度学习训练过程。
2.4.1 基础概念
DeepSpeed 提供了分布式计算框架,首先需要明确几个重要的基础的概念:节点编号、全局进程编号、局部进程编号、全局总进程数和主节点。DeepSpeed 主节点(master_ip+master_port)负责协调所有其他节点和进程的工作,由主节点所在服务器的IP 地址和主节点进程的端口号来确定主节点。主节点还负责监控系统状态、处理任务分配和结果汇总等任务,因此是整个系统的关键部分。节点编号(node_rank)是系统中每个节点的唯一标识符,用于区分不同计算机之间的通信。全局进程编号(rank)是整个系统中的每个进程的唯一标识符,用于区分不同进程之间的通信。局部进程编号(local_rank):是单个节点内的每个进程的唯一标识符,用于区分同一节点内的不同进程之间的通信。全局总进程数(word_size)是整个系统中运行的所有进程的总数,用于确定可以并行完成多少工作以及需要完成任务所需的资源数量。
在网络通信策略方面,DeepSpeed 提供了MPI、GLOO 和NCCL 等选项,可以根据具体情况进行选择和配置。DeepSpeed 配置文件中,在optimizer 部分配置通信策略,以下是使用OneBitAdam优化器的配置样例,配置中其中使用了nccl 通讯库:
图2.30 DeepSpeed 软件架构
```json
{
"optimizer": {
"type": "OneBitAdam",
"params": {
"lr": 0.001,
"betas": [
0.8,
0.999
],
"eps": 1e-8,
"weight_decay": 3e-7,
"freeze_step": 400,
"cuda_aware": false,
"comm_backend_name": "nccl"
}
}
}
```
DeepSpeed 中也支持各多种类型ZeRO 的分片机制,包括ZeRO-0、ZeRO-1、ZeRO-2、ZeRO-3 以及ZeRO-Infinity。ZeRO-0 禁用所有类型的分片,仅将DeepSpeed 当作分布式数据并行使用;ZeRO-1 对优化器状态都进行分片,占用内存为原始的1/4, 通信容量与数据并行性相同;ZeRO-2对优化器状态和梯度都进行分片,占用内存为原始的1/8,通信容量与数据并行性相同;ZeRO-3:对优化器状态、梯度以及模型参数都进行分片,内存减少与数据并行度和复杂度成线性关系,同时通信容量是数据并行性的1.5 倍;ZeRO-Infinity 是ZeRO-3 的拓展,允许通过使用NVMe 固态硬盘扩展GPU 和CPU 内存来训练大型模型。以下是DeepSpeed 使用ZeRO-3 配置参数样例:
```json
{
"zero_optimization": {
"stage": 3
},
"fp16": {
"enabled": true
},
"optimizer": {
"type": "AdamW",
"params": {
"lr": 0.001,
"betas": [
0.8,
0.999
],
"eps": 1e-8,
"weight_decay": 3e-7
}
}
}
```
如果希望在ZeRO-3 基础上继续使用ZeRO-Infinity 将优化器状态和计算转移到CPU 中,可以在配置文件中按照方式如下配置:
```json
{
"zero_optimization": {
"stage": 3,
"offload_optimizer": true,
"device": "cpu"
},
"device": "cpu"
}
2.4.2 LLaMA 分布式训练实践
LLaMA 模型是目前最流行和性能最强大的开源模型之一,基于LLaMA 所构造的模型生态可以覆盖绝大部分模型使用场景。在设置完必要的数据和环境配置后,本节将逐步演示如何使用DeepSpeed 框架训练LLaMA 模型。
Deepspeed 可以很好的兼容PyThorch 和CUDA 的大多数版本,其安装过程通常无需指定特殊配置选项,可以直接通过pip 命令完成。
pip install deepspeed
1. 训练数据配置
使用PyTorch 和Transformers 库来设置预训练模型的数据加载器,以实现在单机或多机分布式训练环境中对数据的加载和采样。需要导入的模块为:
• DataLoader: PyTorch 提供的工具,用于从数据集加载数据到模型进行训练或评估。
• RandomSampler 和SequentialSampler: 这是PyTorch 提供的两种采样器。
• RandomSampler 随机采样数据,而SequentialSampler 顺序采样数据。
• DistributedSampler:用于分布式训练的数据采样器。
• default_data_collator: Transformers 库提供的默认数据收集器,用于将多个样本整合为一个批量数据。
• create_pretrain_dataset: 一个自定义函数,用于创建预训练数据集。
通过检查args.local_rank 是否为−1,代码决定使用普通的采样器(单机)还是分布式采样器(多机)。DistributedSampler 确保在分布式训练环境中,每个进程或节点都能获得数据的一个不重复的子集,这使得分布式训练变得可能。而在单机环境中,使用常规的随机或顺序采样器即可。具体代码如下所示:
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torch.utils.data.distributed import DistributedSampler
from transformers import default_data_collator
from utils.data.data_utils import create_pretrain_dataset
### # Prepare the data
train_dataset, eval_dataset = create_pretrain_dataset(
args.local_rank,
args.data_path,
args.data_split,
args.data_output_path,
args.seed,
tokenizer,
args.max_seq_len
)
### # DataLoaders creation:
if args.local_rank == -1:
train_sampler = RandomSampler(train_dataset)
eval_sampler = SequentialSampler(eval_dataset)
else:
train_sampler = DistributedSampler(train_dataset)
eval_sampler = DistributedSampler(eval_dataset)
train_dataloader = DataLoader(
train_dataset,
collate_fn=default_data_collator,
sampler=train_sampler,
batch_size=args.per_device_train_batch_size
)
eval_dataloader = DataLoader(
eval_dataset,
collate_fn=default_data_collator,
sampler=eval_sampler,
batch_size=args.per_device_eval_batch_size
)
2. 模型载入
使用Transformers 库加载和配置LLaMA 模型及其相关的分词器。在从transformers 库中导入LLaMA 模型、相应的分词器和模型配置后,使用from_pretrained 方法来加载预训练的LLaMA 模型、分词器和配置。为了确保分词器可以处理各种文本长度,还需要进行了填充设置。如果分词器还没有指定填充符号,将其设置为[PAD],并确定填充行为发生在句子的右侧。此外,为了保证模型能够正确地处理句子结束和填充,还为模型配置设置了结束符号和填充符号的ID。最后,为了优化模型在硬件上的性能,还需要调整了模型的词汇表嵌入大小,使其成为8 的倍数。通过这些步骤,可以成功地加载并配置了LLaMA 模型,为后续的训练任务做好了准备。具体代码如下:
import math
from transformers import LlamaForCausalLM, LlamaTokenizer, LlamaConfig
# load_hf_tokenizer will get the correct tokenizer and set padding tokens based on the model family
tokenizer = LlamaTokenizer.from_pretrained(model_name_or_path, fast_tokenizer=True)
if tokenizer.pad_token is None:
tokenizer.add_special_tokens({'pad_token': '[PAD]'})
tokenizer.padding_side = 'right'
model_config = LlamaConfig.from_pretrained(model_name_or_path)
model = LlamaForCausalLM.from_pretrained(model_name_or_path, config=model_config)
model.config.end_token_id = tokenizer.eos_token_id
model.config.pad_token_id = model.config.eos_token_id
# make the vocab size multiple of 8
model.resize_token_embeddings(int(math.ceil(len(tokenizer) / 8.0)))
3. 优化器设置
DeepSpeed 库提供了高效的优化器算法,如DeepSpeedCPUAdam 和FusedAdam,这些算法经过特殊优化以提高在大规模数据和模型上的训练速度。优化器可以配置主要包含一下几个方面:
• 参数分组:通过get_optimizer_grouped_parameters 函数将模型参数分为两组:一组使用权重衰减,另一组则不使用。这种参数分组有助于正则化模型,防止过拟合,并允许对特定参数应用不同的学习设置。
• 优化器选择:根据训练设置(如是否在CPU 上进行模型参数卸载),我们可以选择使用Deep-SpeedCPUAdam 或FusedAdam 优化器。这两种优化器都是对经典的Adam 优化器进行优化和改进的版本,为大规模训练提供了高效性能。
• 学习率调度:不同于固定的学习率,学习率调度器在训练过程中动态调整学习率。例如,在训练初期快速提高学习率以加速收敛,然后在训练中后期逐渐降低学习率以获得更精细的优化。我们的配置考虑了预热步骤、训练的总步数以及其他关键因素。
具体代码如下所示:
from transformers import get_scheduler
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
import math
# Split weights in two groups, one with weight decay and the other not
optimizer_grouped_parameters = get_optimizer_grouped_parameters(
model,
args.weight_decay,
args.learning_rate
)
AdamOptimizer = DeepSpeedCPUAdam if args.offload else FusedAdam
optimizer = AdamOptimizer(
optimizer_grouped_parameters,
lr=args.learning_rate,
betas=(0.9, 0.95)
)
num_update_steps_per_epoch = math.ceil(
len(train_dataloader) / args.gradient_accumulation_steps
)
lr_scheduler = get_scheduler(
name=args.lr_scheduler_type,
optimizer=optimizer,
num_warmup_steps=args.num_warmup_steps,
num_training_steps=args.num_train_epochs * num_update_steps_per_epoch
)
def get_optimizer_grouped_parameters(model, weight_decay, no_decay_name_list=["bias", "LayerNorm.weight"]):
optimizer_grouped_parameters = [
{
"params": [p for n, p in model.named_parameters() if (not any(nd in n for nd in no_decay_name_list) and p.requires_grad)],
"weight_decay": weight_decay,
},
{
"params": [p for n, p in model.named_parameters() if (any(nd in n for nd in no_decay_name_list) and p.requires_grad)],
"weight_decay": 0.0,
},
]
return optimizer_grouped_parameters
4. DeepSpeed 设置
在配置代码的开始,定义了两个关键参数:GLOBAL_BATCH_SIZE: 定义了全局的批次大小。这通常是所有GPU 加起来的总批次大小。MICRO_BATCH_SIZE: 定义了每个GPU 上的微批次大小。微批次处理可以帮助大型模型在有限的GPU 内存中运行,因为每次只加载并处理一小部分数据。训练配置函数get_train_ds_config 主要包括以下内容:
• ZeRO 优化配置:ZeRO(Zero Redundancy Optimizer)是DeepSpeed 提供的一种优化策略,旨在减少训练中的冗余并加速模型的训练。其中的参数,如offload_param 和offload_optimizer,允许用户选择是否将模型参数或优化器状态卸载到CPU。
• 混合精度训练:通过设置fp16 字段,使得模型可以使用16 位浮点数进行训练,从而加速训练过程并减少内存使用。
• 梯度裁剪:通过gradient_clipping 字段,我们可以防止训练过程中的梯度爆炸问题。
• 混合引擎配置:hybrid_engine 部分允许用户配置更高级的优化选项,如输出分词的最大数量和推理张量的大小。
• TensorBoard 配置:使用DeepSpeed 时,可以通过配置选项直接集成TensorBoard,从而更方
便地跟踪训练过程。
验证集配置函数:get_eval_ds_config:此函数提供了DeepSpeed 的验证集。与训练配置相比,验证集配置更为简洁,只需要关注模型推理阶段即可。
具体代码如下所示:
import torch
import deepspeed.comm as dist
GLOBAL_BATCH_SIZE = 32
MICRO_BATCH_SIZE = 4
def get_train_ds_config(
offload,
stage=2,
enable_hybrid_engine=False,
inference_tp_size=1,
release_inference_cache=False,
pin_parameters=True,
tp_gather_partition_size=8,
max_out_tokens=512,
enable_tensorboard=False,
tb_path="",
tb_name=""
):
device = "cpu" if offload else "none"
zero_opt_dict = {
"stage": stage,
"offload_param": {
"device": device
},
# 此处代码未完整,可能还有其他配置项
}
{
"offload_optimizer": {
"device": device
},
"stage3_param_persistence_threshold": 1e4,
"stage3_max_live_parameters": 3e7,
"stage3_prefetch_bucket_size": 3e7,
"memory_efficient_linear": False
}
return {
"train_batch_size": GLOBAL_BATCH_SIZE,
"train_micro_batch_size_per_gpu": MICRO_BATCH_SIZE,
"steps_per_print": 10,
"zero_optimization": zero_opt_dict,
"fp16": {
"enabled": True,
"loss_scale_window": 100
},
"gradient_clipping": 1.0,
"prescale_gradients": False,
"wall_clock_breakdown": False,
"hybrid_engine": {
"enabled": enable_hybrid_engine,
"max_out_tokens": max_out_tokens,
"inference_tp_size": inference_tp_size,
"release_inference_cache": release_inference_cache,
"pin_parameters": pin_parameters,
"tp_gather_partition_size": tp_gather_partition_size,
},
"tensorboard": {
"enabled": enable_tensorboard,
"output_path": f"{tb_path}/ds_tensorboard_logs/",
"job_name": f"{tb_name}_tensorboard"
}
}
def get_eval_ds_config(offload, stage=0):
device = "cpu" if offload else "none"
zero_opt_dict = {
"stage": stage,
"stage3_param_persistence_threshold": 1e4,
"offload_param": {
"device": device
},
"memory_efficient_linear": False
}
return {
"train_batch_size": GLOBAL_BATCH_SIZE,
"train_micro_batch_size_per_gpu": MICRO_BATCH_SIZE,
"steps_per_print": 10,
"zero_optimization": zero_opt_dict,
"fp16": {
"enabled": True
},
"gradient_clipping": 1.0,
"prescale_gradients": False,
"wall_clock_breakdown": False
}
5. DeepSpeed 初始化
在设置DeepSpeed 配置参数后,可以利用DeepSpeed 进行模型训练的初始化,初始化流程包括:
• 确定运行的设备:首先,代码检查是否有指定的本地GPU(通过args.local_rank)。如果没有指定,程序默认使用CUDA 设备。否则,它会为进程设置指定的GPU。
• 初始化分布式后端:在分布式训练中,使用deepspeed.init_distributed() 函数实现每个进程与其他进程的同步,初始化分布式环境。
• 获取当前进程的全局排序:在分布式训练中,使用torch.distributed.get_rank() 获得每个进程的唯一排序或ID。
• 设置DeepSpeed 配置:根据用户参数(如是否进行offload、使用哪个zero stage 等),构建构建了一个DeepSpeed 配置字典,来决定训练设置。
• 同步所有工作进程:使用torch.distributed.barrier() 确保在进一步的初始化之前所有进程都已同步。
• DeepSpeed 初始化:这是最关键的一步。通过deepspeed.initialize,可以将模型、优化器、参数和先前构建的DeepSpeed 配置传递给库,进行初始化。这个函数会返回一个已经根据DeepSpeed配置进行了优化的模型和优化器。
• 梯度检查点:对于特别大的模型,梯度检查点是一种节省显存的技巧,即只在需要时计算模型的中间梯度。如果用户启用了这个选项,则会调用model.gradient_checkpointing_enable() 方法来实现相关功能。
具体代码如下所示:
import deepspeed
if args.local_rank == -1:
device = torch.device("cuda")
else:
torch.cuda.set_device(args.local_rank)
device = torch.device("cuda", args.local_rank)
# Initializes the distributed backend which will take care of sychronizing nodes/GPUs
# torch.distributed.init_process_group(backend='nccl')
deepspeed.init_distributed()
args.global_rank = torch.distributed.get_rank()
ds_config = get_train_ds_config(offload=args.offload,
stage=args.zero_stage,
enable_tensorboard=args.enable_tensorboard,
tb_path=args.tensorboard_path,
tb_name="step1_model")
ds_config['train_micro_batch_size_per_gpu'] = args.per_device_train_batch_size
config['train_batch_size'] = args.per_device_train_batch_size * torch.distributed.get_world_size() * args.gradient_accumulation_steps
# If passed along, set the training seed now.
set_random_seed(args.seed)
torch.distributed.barrier()
model, optimizer, _, lr_scheduler = deepspeed.initialize(
model=model,
optimizer=optimizer,
args=args,
config=ds_config,
lr_scheduler=lr_scheduler,
dist_init_required=True
)
if args.gradient_checkpointing:
model.gradient_checkpointing_enable()
6. 模型训练
借助DeepSpeed 框架实现对模型的训练,训练步骤大致分为以下几个阶段:
• 训练前的准备:使用print_rank_0 函数输出当前的训练状态。该函数确保只有指定的进程(通常是主进程)会打印消息,避免了多进程环境下的重复输出。在开始训练之前,对模型进行了一次评估,计算模型的困惑度。
• 训练循环:每个周期的开始,都会打印当前周期和总周期数。在每次迭代中,数据批次首先被移动到相应的GPU 设备,接着模型对这个批次进行前向传播计算损失。使用model.backward(loss)计算梯度,并使用model.step() 更新模型参数。对于主进程,还会使用print_throughput 函数打印吞吐量,这有助于了解模型的训练速度和效率。
• 保存模型:如果指定了输出目录,模型的状态和配置将被保存。模型可以在不同的格式中保存,例如Hugging Face 的模型格式或DeepSpeed 的Zero Stage 3 特定格式。save_hf_format函数用于保存模型为Hugging Face 格式,这意味着训练后的模型可以使用Hugging Face 的from_pretrained 方法直接加载。对于Zero Stage 3,save_zero_three_model 函数负责保存,因为在这个阶段,每个GPU 只保存了模型的一部分。
具体代码如下所示:
# Train!
print_rank_0("***** Running training *****", args.global_rank)
print_rank_0(f"***** Evaluating perplexity, Epoch {0}/{args.num_train_epochs} *****", args.global_rank)
perplexity = evaluation(model, eval_dataloader)
print_rank_0(f"ppl: {perplexity}", args.global_rank)
for epoch in range(args.num_train_epochs):
print_rank_0(f"Beginning of Epoch {epoch + 1}/{args.num_train_epochs}, Total Micro Batches {len(train_dataloader)}",
args.global_rank)
model.train()
import time
for step, batch in enumerate(train_dataloader):
start = time.time()
batch = to_device(batch, device)
outputs = model(**batch, use_cache=False)
loss = outputs.loss
if args.print_loss:
print(f"Epoch: {epoch}, Step: {step}, Rank: {torch.distributed.get_rank()}, loss: {loss}")
model.backward(loss)
model.step()
end = time.time()
if torch.distributed.get_rank() == 0:
print_throughput(model.model, args, end - start, args.global_rank)
if args.output_dir is not None:
print_rank_0('saving the final model...', args.global_rank)
model = convert_lora_to_linear_layer(model)
if args.global_rank == 0:
save_hf_format(model, tokenizer, args)
if args.zero_stage == 3:
# For zero stage 3, each gpu only has a part of the model, so we need a special save function
save_zero_three_model(model, args.global_rank, args.output_dir, zero_stage=args.zero_stage)
def print_rank_0(msg, rank=0):
if rank <= 0:
print(msg)
# This function can be used to print throughput for Step 1 and 2 only
def print_throughput(hf_model, args, e2e_time, rank=0):
# 函数实现未提供,可能包含计算和打印吞吐量的逻辑
# 假设以下函数和变量已经在其他地方定义
# hf_model, get_hf_configs, calculate_flops, args, model, tokenizer
hf_config = hf_model.config
num_layers, hidden_size, vocab_size = get_hf_configs(hf_config)
gpus_per_model = torch.distributed.get_world_size()
seq_length = args.max_seq_len
batch_size = args.per_device_train_batch_size
e2e_time = ... # 需要提供实际的值或计算方法
checkpoint_activations_factor = 4 if args.gradient_checkpointing else 3
if args.lora_dim > 0:
k = args.lora_dim * 2 / hidden_size
checkpoint_activations_factor -= (1 - k)
hf_model._num_params = sum([p.ds_numel() if hasattr(p, "ds_tensor") else p.numel() for p in hf_model.parameters()])
params_in_billions = hf_model._num_params / (1e9)
# Megatron paper's formula to calculate training flops
train_flops_per_iteration = calculate_flops(
checkpoint_activations_factor,
batch_size,
seq_length,
hf_config
)
train_tflops = train_flops_per_iteration / (e2e_time * gpus_per_model * (10 ** 12))
param_string = f"{params_in_billions:.3f} B" if params_in_billions != 0 else "NA"
print(
f"Model Parameters:{param_string}, Latency:{e2e_time:.2f}s, "
f"TFLOPs:{train_tflops:.2f}, Samples/sec:{samples_per_second:.2f}, "
f"Time/seq{e2e_time / batch_size:.2f}s, Batch Size:{batch_size}, Sequence Length:{seq_length}"
)
def save_hf_format(model, tokenizer, args, sub_folder=""):
# used to save huggingface format, so we can use it for hf.from_pretrained
model_to_save = model.module if hasattr(model, 'module') else model
CONFIG_NAME = "config.json"
WEIGHTS_NAME = "pytorch_model.bin"
output_dir = os.path.join(args.output_dir, sub_folder)
os.makedirs(output_dir, exist_ok=True)
output_model_file = os.path.join(output_dir, WEIGHTS_NAME)
output_config_file = os.path.join(output_dir, CONFIG_NAME)
save_dict = mod
- 点赞
- 收藏
- 关注作者
评论(0)