Guide to PyTorch Multi-Node Multi-GPU Distributed Training on GPU VNT1 Bare Metal Servers
1. Environment
Server: Huawei Cloud GPU VNT1 bare metal server
OS: Ubuntu 18.04 server 64bit for V100 BareMetal
GPU driver version: 470
CUDA version: 11.4
2. Install the NVIDIA driver
wget https://cn.download.nvidia.com/tesla/470.103.01/NVIDIA-Linux-x86_64-470.103.01.run
chmod +x NVIDIA-Linux-x86_64-470.103.01.run
./NVIDIA-Linux-x86_64-470.103.01.run # press Enter when a selection dialog appears
3. Install CUDA
wget https://developer.download.nvidia.cn/compute/cuda/11.4.4/local_installers/cuda_11.4.4_470.82.01_linux.run
chmod +x cuda_11.4.4_470.82.01_linux.run
./cuda_11.4.4_470.82.01_linux.run --toolkit --samples --silent
4. Verify the installation
Verify the driver and CUDA installation:
nvidia-smi -pm 1   # enable persistence mode
nvidia-smi
/usr/local/cuda/bin/nvcc -V
5. Install and verify NCCL
(1) Install NCCL
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/cuda-keyring_1.0-1_all.deb
sudo dpkg -i cuda-keyring_1.0-1_all.deb
sudo apt update
sudo apt install libnccl2=2.10.3-1+cuda11.4 libnccl-dev=2.10.3-1+cuda11.4
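You can optionally confirm the installed NCCL version before moving on (a quick sanity check, not strictly required):
dpkg -l libnccl2 libnccl-dev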
(2) Configure environment variables
echo 'export LD_LIBRARY_PATH=/lib64/openmpi/lib:/usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/mpi/gcc/openmpi-4.1.5a1/lib:$LD_LIBRARY_PATH
export PATH=$PATH:/usr/local/cuda/bin:/usr/mpi/gcc/openmpi-4.1.5a1/bin' >> ~/.bashrc
source ~/.bashrc
(3) Build the NCCL test tools
cd /root
git clone https://github.com/NVIDIA/nccl-tests.git
cd ./nccl-tests
make MPI=1 MPI_HOME=/usr/mpi/gcc/openmpi-4.1.5a1 -j 8
(4) Test NCCL
/root/nccl-tests/build/all_reduce_perf -b 8 -e 1024M -f 2 -g 8   # message sizes from 8 B to 1024 MB, doubling each step, on 8 GPUs
If the test completes and prints bandwidth results for each message size, NCCL is installed and working correctly.
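The command above exercises the 8 GPUs within a single node. Because nccl-tests was built with MPI=1, an optional cross-node test can also be run with mpirun. The following is only a sketch: it assumes passwordless SSH between the two nodes, and node1/node2, mlx5_0 and bond0 are placeholders for your own host names (or IB NIC IP addresses), IB device and host NIC.
/usr/mpi/gcc/openmpi-4.1.5a1/bin/mpirun --allow-run-as-root -np 2 -H node1:1,node2:1 \
    -x NCCL_DEBUG=INFO -x NCCL_IB_HCA=mlx5_0 -x NCCL_SOCKET_IFNAME=bond0 \
    -x LD_LIBRARY_PATH -x PATH \
    /root/nccl-tests/build/all_reduce_perf -b 8 -e 1024M -f 2 -g 8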
6. PyTorch multi-node multi-GPU distributed training demo
Note: the demo below is a classification training task using ResNet-18 on the CIFAR-10 dataset. Different models may require different Python dependencies; install packages according to your actual situation.
(1) Install dependency packages and Python packages
apt-get install libjpeg-dev zlib1g-dev
pip3 install torch -i http://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install torchvision -i http://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install numpy -i http://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install scikit-learn -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host=mirrors.aliyun.com
Note: the distributed launcher (torch.distributed.launch / torchrun) ships with the torch package itself, so no separate "torchrun" package needs to be installed.
(2) Run ibstatus to query the IB devices and record the IB device name
(3) Run ibdev2netdev to get the mapping between IB device names and network interface names
(4) Run ifconfig [IB network interface name] to get the IP address of the IB NIC and record it; example commands for steps (2) to (4) are sketched below
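For example (a sketch; the device name mlx5_0 and the interface name are whatever your own environment reports):
ibstatus              # step (2): list IB devices, e.g. mlx5_0, and their link state and rate
ibdev2netdev          # step (3): prints lines such as "mlx5_0 port 1 ==> <ifname> (Up)"
ifconfig <ifname>     # step (4): note the IP address of the IB interface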
(5) Write the training launch script on each node
The launch script sets the NCCL environment variables and then starts distributed training by running train.py through torch.distributed.launch. Taking two-node multi-GPU training as an example, the launch scripts on node 1 (master) and node 2 are as follows:
Node 1 (master):
#!/usr/bin/env bash
export NCCL_DEBUG=INFO          # NCCL log level
export NCCL_IB_HCA=mlx5_0       # name of the IB device
export NCCL_SOCKET_IFNAME=bond0 # host NIC of the node; check with ifconfig
now=$(date +"%Y%m%d_%H%M%S")
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 python3 -m torch.distributed.launch --master_addr="169.254.142.62" --master_port 1234 --nproc_per_node=8 --nnodes=2 --node_rank=0 \
train.py --log_time $now
Node 2:
#!/usr/bin/env bash
export NCCL_DEBUG=INFO          # NCCL log level
export NCCL_IB_HCA=mlx5_0       # name of the IB device
export NCCL_SOCKET_IFNAME=bond0 # host NIC of the node; check with ifconfig
now=$(date +"%Y%m%d_%H%M%S")
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 python3 -m torch.distributed.launch --master_addr="169.254.142.62" --master_port 1234 --nproc_per_node=8 --nnodes=2 --node_rank=1 \
train.py --log_time $now
Parameter descriptions:
torch.distributed.launch arguments:
--master_addr="169.254.142.62"  the IB NIC IP address of the master node, recorded in step (4) above
--master_port 1234  port on the master node; choose as needed
--nproc_per_node=8  number of processes started on each node; it should normally match the number of GPUs
--nnodes=2  total number of nodes
--node_rank=0  rank of this node; note that this is the only difference between the two launch scripts above, and each node must have its own unique rank
train.py arguments: you can add your own training-script arguments and handle them inside the script; only one argument is demonstrated here
--log_time  start time of the training job
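If your PyTorch version includes the torchrun launcher (PyTorch 1.10 and later), the master-node launch above can equivalently be written as follows (a sketch, not a required change; torchrun exports the MASTER_ADDR, MASTER_PORT, LOCAL_RANK, RANK and WORLD_SIZE environment variables that train.py reads):
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 torchrun --master_addr="169.254.142.62" --master_port=1234 \
    --nproc_per_node=8 --nnodes=2 --node_rank=0 train.py --log_time $now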
(6) Write the train.py script
Notes:
a. The script below is only for demonstrating distributed training; its parameters are not necessarily reasonable, so do not use it in production as-is.
b. Pay attention to the comments that start with "### Distributed adaptation"; they mark the code changes needed for multi-node distributed training.
import datetime
import inspect
import os
import pickle
import random
import logging
import argparse
import numpy as np
from sklearn.metrics import accuracy_score
import torch
from torch import nn, optim
import torch.distributed as dist
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
MASTER_ADDR = os.environ["MASTER_ADDR"]
MASTER_PORT = os.environ["MASTER_PORT"]
local_rank = int(os.environ["LOCAL_RANK"])
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
print("MASTER_ADDR: {}\tMASTER_PORT: {}".format(MASTER_ADDR, MASTER_PORT))
print("LOCAL_RANK: {}\tRANK: {}\tWORLD_SIZE: {}".format(local_rank, rank, world_size))
file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)
def load_pickle_data(path):
with open(path, 'rb') as file:
data = pickle.load(file, encoding='bytes')
return data
def _load_data(file_path):
raw_data = load_pickle_data(file_path)
labels = raw_data[b'labels']
data = raw_data[b'data']
filenames = raw_data[b'filenames']
data = data.reshape(10000, 3, 32, 32) / 255
return data, labels, filenames
def load_cifar_data(root_path):
train_root_path = os.path.join(root_path, 'cifar-10-batches-py/data_batch_')
train_data_record = []
train_labels = []
train_filenames = []
for i in range(1, 6):
train_file_path = train_root_path + str(i)
data, labels, filenames = _load_data(train_file_path)
train_data_record.append(data)
train_labels += labels
train_filenames += filenames
train_data = np.concatenate(train_data_record, axis=0)
train_labels = np.array(train_labels)
val_file_path = os.path.join(root_path, 'cifar-10-batches-py/test_batch')
val_data, val_labels, val_filenames = _load_data(val_file_path)
val_labels = np.array(val_labels)
tr_data = torch.from_numpy(train_data).float()
tr_labels = torch.from_numpy(train_labels).long()
val_data = torch.from_numpy(val_data).float()
val_labels = torch.from_numpy(val_labels).long()
return tr_data, tr_labels, val_data, val_labels
def get_data(root_path, custom_data=False):
if custom_data:
train_samples, test_samples, img_size = 5000, 1000, 32
tr_label = [1] * int(train_samples / 2) + [0] * int(train_samples / 2)
val_label = [1] * int(test_samples / 2) + [0] * int(test_samples / 2)
random.seed(2021)
random.shuffle(tr_label)
random.shuffle(val_label)
tr_data, tr_labels = torch.randn((train_samples, 3, img_size, img_size)).float(), torch.tensor(tr_label).long()
val_data, val_labels = torch.randn((test_samples, 3, img_size, img_size)).float(), torch.tensor(
val_label).long()
tr_set = TensorDataset(tr_data, tr_labels)
val_set = TensorDataset(val_data, val_labels)
return tr_set, val_set
elif os.path.exists(os.path.join(root_path, 'cifar-10-batches-py')):
tr_data, tr_labels, val_data, val_labels = load_cifar_data(root_path)
tr_set = TensorDataset(tr_data, tr_labels)
val_set = TensorDataset(val_data, val_labels)
return tr_set, val_set
else:
try:
import torchvision
from torchvision import transforms
transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
tr_set = torchvision.datasets.CIFAR10(root='./data', train=True,
download=True, transform=transform)
val_set = torchvision.datasets.CIFAR10(root='./data', train=False,
download=True, transform=transform)
return tr_set, val_set
except Exception as e:
raise Exception(
f"{e}, you can download and unzip cifar-10 dataset manually, "
"the data url is http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz")
class Block(nn.Module):
def __init__(self, in_channels, out_channels, stride=1):
super().__init__()
self.residual_function = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
nn.BatchNorm2d(out_channels)
)
self.shortcut = nn.Sequential()
if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
out = self.residual_function(x) + self.shortcut(x)
return nn.ReLU(inplace=True)(out)
class ResNet(nn.Module):
def __init__(self, block, num_classes=10):
super().__init__()
self.conv1 = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True))
self.conv2 = self.make_layer(block, 64, 64, 2, 1)
self.conv3 = self.make_layer(block, 64, 128, 2, 2)
self.conv4 = self.make_layer(block, 128, 256, 2, 2)
self.conv5 = self.make_layer(block, 256, 512, 2, 2)
self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
self.dense_layer = nn.Linear(512, num_classes)
def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
strides = [stride] + [1] * (num_blocks - 1)
layers = []
for stride in strides:
layers.append(block(in_channels, out_channels, stride))
in_channels = out_channels
return nn.Sequential(*layers)
def forward(self, x):
out = self.conv1(x)
out = self.conv2(out)
out = self.conv3(out)
out = self.conv4(out)
out = self.conv5(out)
out = self.avg_pool(out)
out = out.view(out.size(0), -1)
out = self.dense_layer(out)
return out
def setup_seed(seed):
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True
def obs_transfer(src_path, dst_path):
import moxing as mox
mox.file.copy_parallel(src_path, dst_path)
logging.info(f"end copy data from {src_path} to {dst_path}")
def main():
seed = datetime.datetime.now().year
setup_seed(seed)
parser = argparse.ArgumentParser(description='Pytorch distribute training',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--enable_gpu', default='true')
parser.add_argument('--lr', default='0.01', help='learning rate')
parser.add_argument('--epochs', default='10', help='training iteration')
parser.add_argument('--init_method', default=None, help='tcp_port')
parser.add_argument('--rank', type=int, default=0, help='index of current task')
parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')
parser.add_argument('--custom_data', default='false')
parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))
args, unknown = parser.parse_known_args()
args.enable_gpu = args.enable_gpu == 'true'
args.custom_data = args.custom_data == 'true'
args.lr = float(args.lr)
args.epochs = int(args.epochs)
if args.custom_data:
logging.warning('you are training on custom random dataset, '
'validation accuracy may range from 0.4 to 0.6.')
    ### Distributed adaptation: initialize the DDP process group; init_method, rank and world_size are provided automatically (here from the launcher's environment variables) ###
dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=world_size, rank=rank)
    ### Distributed adaptation: initialize the DDP process group; init_method, rank and world_size are provided automatically (here from the launcher's environment variables) ###
tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)
batch_per_gpu = 16
gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
batch = batch_per_gpu * gpus_per_node
tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)
    ### Distributed adaptation: build a DistributedSampler so that different processes load different parts of the data ###
tr_sampler = DistributedSampler(tr_set, num_replicas=world_size, rank=rank)
tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
    ### Distributed adaptation: build a DistributedSampler so that different processes load different parts of the data ###
val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)
lr = args.lr * gpus_per_node * args.world_size
max_epoch = args.epochs
torch.cuda.set_device(local_rank)
model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)
torch.backends.cudnn.enabled = False
    ### Distributed adaptation: wrap the model with DistributedDataParallel ###
model = nn.parallel.DistributedDataParallel(model)
    ### Distributed adaptation: wrap the model with DistributedDataParallel ###
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_func = torch.nn.CrossEntropyLoss()
os.makedirs(args.output_dir, exist_ok=True)
for epoch in range(1, max_epoch + 1):
model.train()
train_loss = 0
        ### Distributed adaptation: set the sampler's epoch so that shuffling differs each epoch and no duplicate data is loaded ###
tr_sampler.set_epoch(epoch)
        ### Distributed adaptation: set the sampler's epoch so that shuffling differs each epoch and no duplicate data is loaded ###
for step, (tr_x, tr_y) in enumerate(tr_loader):
if args.enable_gpu:
tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
out = model(tr_x)
loss = loss_func(out, tr_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))
val_loss = 0
pred_record = []
real_record = []
model.eval()
with torch.no_grad():
for step, (val_x, val_y) in enumerate(val_loader):
if args.enable_gpu:
val_x, val_y = val_x.cuda(), val_y.cuda()
out = model(val_x)
pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
real_record += list(val_y.cpu().numpy())
val_loss += loss_func(out, val_y).item()
val_accu = accuracy_score(real_record, pred_record)
print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')
        ### Distributed adaptation: only rank 0 saves the model ###
if rank == 0:
            ### Distributed adaptation: only rank 0 saves the model ###
# save ckpt every epoch
torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))
if __name__ == '__main__':
main()
(7) Start training
Run the corresponding launch script on each node (e.g. bash train.sh, started on node 1 and node 2 at roughly the same time). With NCCL_DEBUG=INFO set, the NCCL log output will show whether the IB NIC is being used for data transfer.
(8) Check the IB NIC traffic counters
You can read the IB NIC hardware counters at the paths below to estimate traffic. The counters are in units of 4-byte (double) words, so multiply by 4 to get bytes:
/sys/class/infiniband/mlx5_0/ports/1/counters/port_xmit_data
/sys/class/infiniband/mlx5_0/ports/1/counters/port_rcv_data
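For example, a minimal sketch that samples the transmit counter twice to estimate throughput (mlx5_0, port 1 and the 10-second interval are just examples):
c1=$(cat /sys/class/infiniband/mlx5_0/ports/1/counters/port_xmit_data)
sleep 10
c2=$(cat /sys/class/infiniband/mlx5_0/ports/1/counters/port_xmit_data)
echo "TX: $(( (c2 - c1) * 4 / 10 )) bytes/s"   # counters are in 4-byte words, hence the *4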