GPU ANT1 Bare Metal Server: PyTorch Multi-Node Multi-GPU Distributed Training Guide

Posted by modelarts-dev-server on 2023/11/13 15:10:39
Abstract: This guide walks through preparing a Huawei Cloud GPU ANT1 bare metal server (Ubuntu 18.04 server 64bit for V100 BareMetal, NVIDIA driver 470, CUDA 11.4), installing and verifying NCCL, and running PyTorch multi-node multi-GPU distributed training over InfiniBand.

1. Environment

Server: Huawei Cloud GPU ANT1 bare metal server

Operating system: Ubuntu 18.04 server 64bit for V100 BareMetal

GPU driver version: 470

CUDA version: 11.4

2. Install the NVIDIA driver

wget https://cn.download.nvidia.com/tesla/470.103.01/NVIDIA-Linux-x86_64-470.103.01.run
chmod +x NVIDIA-Linux-x86_64-470.103.01.run
./NVIDIA-Linux-x86_64-470.103.01.run   # when a selection dialog appears, just press Enter

3. Install CUDA

wget https://developer.download.nvidia.cn/compute/cuda/11.4.4/local_installers/cuda_11.4.4_470.82.01_linux.run
chmod +x cuda_11.4.4_470.82.01_linux.run
./cuda_11.4.4_470.82.01_linux.run --toolkit --samples --silent

4. Verify the installation

Verify that the driver and CUDA toolkit are working:

nvidia-smi -pm 1
nvidia-smi
/usr/local/cuda/bin/nvcc -V

5. Install and verify NCCL

(1) Install NCCL

wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/cuda-keyring_1.0-1_all.deb
sudo dpkg -i cuda-keyring_1.0-1_all.deb
sudo apt update
sudo apt install libnccl2=2.10.3-1+cuda11.4 libnccl-dev=2.10.3-1+cuda11.4
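
To confirm the packages were installed, you can query dpkg; the version listed should match the one pinned above:

dpkg -l | grep nccl   # expect libnccl2 and libnccl-dev at 2.10.3-1+cuda11.4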

(2) Configure environment variables

echo 'export LD_LIBRARY_PATH=/lib64/openmpi/lib:/usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/mpi/gcc/openmpi-4.1.5a1/lib:$LD_LIBRARY_PATH
export PATH=$PATH:/usr/local/cuda/bin:/usr/mpi/gcc/openmpi-4.1.5a1/bin'  >>  ~/.bashrc
source ~/.bashrc

(3) Build the NCCL test tools

cd /root
git clone https://github.com/NVIDIA/nccl-tests.git
cd ./nccl-tests
make  MPI=1 MPI_HOME=/usr/mpi/gcc/openmpi-4.1.5a1 -j 8

(4) Test NCCL

/root/nccl-tests/build/all_reduce_perf -b 8 -e 1024M -f 2 -g 8

If the command runs to completion and prints a bandwidth summary table for every message size, NCCL is installed and working.
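
To also exercise the IB link between nodes before launching training, the test binary (built with MPI=1 above) can be run across both machines with mpirun. This is an optional sketch only: it assumes nccl-tests is built at the same path on both servers, passwordless SSH is configured between them, and that node1/node2, mlx5_0 and bond0 are placeholders for your own host and interface names:

/usr/mpi/gcc/openmpi-4.1.5a1/bin/mpirun -np 16 -H node1:8,node2:8 --allow-run-as-root \
    -x LD_LIBRARY_PATH -x NCCL_DEBUG=INFO -x NCCL_IB_HCA=mlx5_0 -x NCCL_SOCKET_IFNAME=bond0 \
    /root/nccl-tests/build/all_reduce_perf -b 8 -e 1024M -f 2 -g 1   # one GPU per MPI rank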

6. PyTorch multi-node multi-GPU distributed training demo

Note: the demo below is a classification training task for resnet18 on the cifar10 dataset. Different models may require different Python packages; install dependencies according to your actual setup.

(1) Install dependency packages and Python packages

apt-get install libjpeg-dev zlib1g-dev
pip3 install torch -i http://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install torchvision -i http://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install numpy -i http://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
pip3 install scikit-learn -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host=mirrors.aliyun.com
# Note: the torchrun launcher is bundled with torch (>= 1.10); no separate "torchrun" package needs to be installed.
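
A quick sanity check (not part of the original guide) that the installed torch build can see the GPUs:

python3 -c "import torch; print(torch.__version__, torch.cuda.is_available(), torch.cuda.device_count())"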

(2) Run ibstatus to query the IB devices and record the IB device name

(3) Run ibdev2netdev to get the mapping between IB device names and network interface names

(4) Run ifconfig [IB network interface name] to get and record the IP address of the IB NIC (example commands for steps (2)-(4) are shown below)
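
Example commands for steps (2)-(4); the device name mlx5_0 and interface name ib0 are illustrative, use the names reported on your own nodes:

ibstatus                 # list IB devices and their link state; record the device name, e.g. mlx5_0
ibdev2netdev             # map IB device names to network interface names, e.g. mlx5_0 ==> ib0
ifconfig ib0             # record the IP address of the IB interface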

(5) Write the training launch script on each node

The launch script sets the NCCL environment variables and then runs train.py through torch.distributed.launch to start the distributed training. Taking multi-GPU training on two nodes as an example, the launch scripts for node 1 (master) and node 2 are as follows:

Node 1 (master):

#!/usr/bin/env bash
export NCCL_DEBUG=INFO                    # NCCL log level
export NCCL_IB_HCA=mlx5_0                 # name of the IB device
export NCCL_SOCKET_IFNAME=bond0           # host network interface, check with ifconfig

now=$(date +"%Y%m%d_%H%M%S")
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 python3 -m torch.distributed.launch --master_addr="169.254.142.62" --master_port 1234 --nproc_per_node=8 --nnodes=2 --node_rank=0 \
train.py --log_time $now

Node 2:

#!/usr/bin/env bash
export NCCL_DEBUG=INFO                    # NCCL log level
export NCCL_IB_HCA=mlx5_0                 # name of the IB device
export NCCL_SOCKET_IFNAME=bond0           # host network interface, check with ifconfig

now=$(date +"%Y%m%d_%H%M%S")
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 python3 -m torch.distributed.launch --master_addr="169.254.142.62" --master_port 1234 --nproc_per_node=8 --nnodes=2 --node_rank=1 \
train.py --log_time $now

Parameter description:

torch.distributed.launch parameters:

--master_addr="169.254.142.62"  the IB NIC IP address of the master node, recorded in step (4) above

--master_port 1234  port on the master node; choose as needed

--nproc_per_node=8  number of processes started on each node; it should normally equal the number of GPUs per node

--nnodes=2  total number of nodes

--node_rank=0  index of the node. This is the only difference between the launch scripts of the two nodes above; each node must use its own unique index.

train.py arguments: you can add your own training-script arguments and handle them inside the script; only one argument is demonstrated here:

--log_time  start time of the training job
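
Note that torch.distributed.launch is deprecated in recent PyTorch releases. If the installed torch version ships the torchrun launcher (torch >= 1.10), an equivalent node-1 launch would be the sketch below; torchrun sets the MASTER_ADDR, MASTER_PORT, LOCAL_RANK, RANK and WORLD_SIZE environment variables that train.py reads:

CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 torchrun --master_addr="169.254.142.62" --master_port 1234 --nproc_per_node=8 --nnodes=2 --node_rank=0 \
train.py --log_time $now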

(6) Write the train.py script

Note:

a. The script below is only meant to demonstrate distributed training; its parameters are not necessarily reasonable. Do not use it directly in production.

b. Pay attention to the comments starting with "### Distributed training adaptation": they mark the code changes needed to adapt the training to multiple nodes.

import datetime
import inspect
import os
import pickle
import random
import logging 

import argparse
import numpy as np
from sklearn.metrics import accuracy_score
import torch
from torch import nn, optim
import torch.distributed as dist
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

MASTER_ADDR = os.environ["MASTER_ADDR"]
MASTER_PORT = os.environ["MASTER_PORT"]
local_rank = int(os.environ["LOCAL_RANK"])
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])

print("MASTER_ADDR: {}\tMASTER_PORT: {}".format(MASTER_ADDR, MASTER_PORT))
print("LOCAL_RANK: {}\tRANK: {}\tWORLD_SIZE: {}".format(local_rank, rank, world_size))

file_dir = os.path.dirname(inspect.getframeinfo(inspect.currentframe()).filename)

def load_pickle_data(path):
    with open(path, 'rb') as file:
        data = pickle.load(file, encoding='bytes')
    return data


def _load_data(file_path):
    raw_data = load_pickle_data(file_path)
    labels = raw_data[b'labels']
    data = raw_data[b'data']
    filenames = raw_data[b'filenames']

    data = data.reshape(10000, 3, 32, 32) / 255
    return data, labels, filenames


def load_cifar_data(root_path):
    train_root_path = os.path.join(root_path, 'cifar-10-batches-py/data_batch_')
    train_data_record = []
    train_labels = []
    train_filenames = []
    for i in range(1, 6):
        train_file_path = train_root_path + str(i)
        data, labels, filenames = _load_data(train_file_path)
        train_data_record.append(data)
        train_labels += labels
        train_filenames += filenames
    train_data = np.concatenate(train_data_record, axis=0)
    train_labels = np.array(train_labels)

    val_file_path = os.path.join(root_path, 'cifar-10-batches-py/test_batch')
    val_data, val_labels, val_filenames = _load_data(val_file_path)
    val_labels = np.array(val_labels)

    tr_data = torch.from_numpy(train_data).float()
    tr_labels = torch.from_numpy(train_labels).long()
    val_data = torch.from_numpy(val_data).float()
    val_labels = torch.from_numpy(val_labels).long()
    return tr_data, tr_labels, val_data, val_labels


def get_data(root_path, custom_data=False):
    if custom_data:
        train_samples, test_samples, img_size = 5000, 1000, 32
        tr_label = [1] * int(train_samples / 2) + [0] * int(train_samples / 2)
        val_label = [1] * int(test_samples / 2) + [0] * int(test_samples / 2)
        random.seed(2021)
        random.shuffle(tr_label)
        random.shuffle(val_label)
        tr_data, tr_labels = torch.randn((train_samples, 3, img_size, img_size)).float(), torch.tensor(tr_label).long()
        val_data, val_labels = torch.randn((test_samples, 3, img_size, img_size)).float(), torch.tensor(
            val_label).long()
        tr_set = TensorDataset(tr_data, tr_labels)
        val_set = TensorDataset(val_data, val_labels)
        return tr_set, val_set
    elif os.path.exists(os.path.join(root_path, 'cifar-10-batches-py')):
        tr_data, tr_labels, val_data, val_labels = load_cifar_data(root_path)
        tr_set = TensorDataset(tr_data, tr_labels)
        val_set = TensorDataset(val_data, val_labels)
        return tr_set, val_set
    else:
        try:
            import torchvision
            from torchvision import transforms

            transform = transforms.Compose([
                transforms.RandomResizedCrop(224),
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
            ])

            tr_set = torchvision.datasets.CIFAR10(root='./data', train=True,
                                                  download=True, transform=transform)
            val_set = torchvision.datasets.CIFAR10(root='./data', train=False,
                                                   download=True, transform=transform)
            return tr_set, val_set
        except Exception as e:
            raise Exception(
                f"{e}, you can download and unzip cifar-10 dataset manually, "
                "the data url is http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz")


class Block(nn.Module):

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.residual_function(x) + self.shortcut(x)
        return nn.ReLU(inplace=True)(out)


class ResNet(nn.Module):

    def __init__(self, block, num_classes=10):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        self.conv2 = self.make_layer(block, 64, 64, 2, 1)
        self.conv3 = self.make_layer(block, 64, 128, 2, 2)
        self.conv4 = self.make_layer(block, 128, 256, 2, 2)
        self.conv5 = self.make_layer(block, 256, 512, 2, 2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dense_layer = nn.Linear(512, num_classes)

    def make_layer(self, block, in_channels, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(in_channels, out_channels, stride))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.conv5(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.dense_layer(out)
        return out


def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True


def obs_transfer(src_path, dst_path):
    import moxing as mox
    mox.file.copy_parallel(src_path, dst_path)
    logging.info(f"end copy data from {src_path} to {dst_path}")


def main():
    seed = datetime.datetime.now().year
    setup_seed(seed)

    parser = argparse.ArgumentParser(description='Pytorch distribute training',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--enable_gpu', default='true')
    parser.add_argument('--lr', default='0.01', help='learning rate')
    parser.add_argument('--epochs', default='10', help='training iteration')

    parser.add_argument('--init_method', default=None, help='tcp_port')
    parser.add_argument('--rank', type=int, default=0, help='index of current task')
    parser.add_argument('--world_size', type=int, default=1, help='total number of tasks')

    parser.add_argument('--custom_data', default='false')
    parser.add_argument('--data_url', type=str, default=os.path.join(file_dir, 'input_dir'))
    parser.add_argument('--output_dir', type=str, default=os.path.join(file_dir, 'output_dir'))

    args, unknown = parser.parse_known_args()

    args.enable_gpu = args.enable_gpu == 'true'
    args.custom_data = args.custom_data == 'true'
    args.lr = float(args.lr)
    args.epochs = int(args.epochs)

    if args.custom_data:
        logging.warning('you are training on custom random dataset, '
              'validation accuracy may range from 0.4 to 0.6.')

    ### Distributed training adaptation: initialize the DDP process group; init_method, rank and world_size are supplied automatically by the launcher ###
    dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=world_size, rank=rank)
    ### Distributed training adaptation: initialize the DDP process group; init_method, rank and world_size are supplied automatically by the launcher ###

    tr_set, val_set = get_data(args.data_url, custom_data=args.custom_data)

    batch_per_gpu = 16
    gpus_per_node = torch.cuda.device_count() if args.enable_gpu else 1
    batch = batch_per_gpu * gpus_per_node

    tr_loader = DataLoader(tr_set, batch_size=batch, shuffle=False)  # single-process loader, replaced by the DDP version below

    ### Distributed training adaptation: build a DistributedSampler so each process loads a different shard of the data ###
    tr_sampler = DistributedSampler(tr_set, num_replicas=world_size, rank=rank)
    tr_loader = DataLoader(tr_set, batch_size=batch, sampler=tr_sampler, shuffle=False, drop_last=True)
    ### Distributed training adaptation: build a DistributedSampler so each process loads a different shard of the data ###

    val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)

    lr = args.lr * gpus_per_node * args.world_size
    max_epoch = args.epochs
    torch.cuda.set_device(local_rank)
    model = ResNet(Block).cuda() if args.enable_gpu else ResNet(Block)
    torch.backends.cudnn.enabled = False

    ### Distributed training adaptation: wrap the model with DistributedDataParallel ###
    model = nn.parallel.DistributedDataParallel(model)
    ### Distributed training adaptation: wrap the model with DistributedDataParallel ###

    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_func = torch.nn.CrossEntropyLoss()

    os.makedirs(args.output_dir, exist_ok=True)

    for epoch in range(1, max_epoch + 1):
        model.train()
        train_loss = 0

        ### Distributed training adaptation: set the epoch on the sampler so each epoch uses a different shuffle and no data is duplicated ###
        tr_sampler.set_epoch(epoch)
        ### Distributed training adaptation: set the epoch on the sampler so each epoch uses a different shuffle and no data is duplicated ###

        for step, (tr_x, tr_y) in enumerate(tr_loader):
            if args.enable_gpu:
                tr_x, tr_y = tr_x.cuda(), tr_y.cuda()
            out = model(tr_x)
            loss = loss_func(out, tr_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        print('train | epoch: %d | loss: %.4f' % (epoch, train_loss / len(tr_loader)))

        val_loss = 0
        pred_record = []
        real_record = []
        model.eval()
        with torch.no_grad():
            for step, (val_x, val_y) in enumerate(val_loader):
                if args.enable_gpu:
                    val_x, val_y = val_x.cuda(), val_y.cuda()
                out = model(val_x)
                pred_record += list(np.argmax(out.cpu().numpy(), axis=1))
                real_record += list(val_y.cpu().numpy())
                val_loss += loss_func(out, val_y).item()
        val_accu = accuracy_score(real_record, pred_record)
        print('val | epoch: %d | loss: %.4f | accuracy: %.4f' % (epoch, val_loss / len(val_loader), val_accu), '\n')

        ### Distributed training adaptation: only rank 0 saves the model ###
        if rank == 0:
        ### Distributed training adaptation: only rank 0 saves the model ###
            # save ckpt every epoch
            torch.save(model.state_dict(), os.path.join(args.output_dir, f'epoch_{epoch}.pth'))

if __name__ == '__main__':
    main()

(7) Start training

Run the launch script written in step (5) on each node. If the NCCL debug output (NCCL_DEBUG=INFO) shows the IB device being used for communication (lines mentioning NET/IB and mlx5_0), NCCL is transferring data over the IB NIC.

(8) Check IB NIC traffic counters

You can read the IB NIC hardware counters at the paths below to estimate traffic. The counters are in units of 4 bytes, so multiply the values by 4 to get bytes:

/sys/class/infiniband/mlx5_0/ports/1/counters/port_xmit_data

/sys/class/infiniband/mlx5_0/ports/1/counters/port_rcv_data
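
A minimal sketch for estimating transmit throughput from these counters (assumes device mlx5_0, port 1, and a 10-second sampling window; the counters are in 4-byte units, hence the multiplication by 4):

C1=$(cat /sys/class/infiniband/mlx5_0/ports/1/counters/port_xmit_data)
sleep 10
C2=$(cat /sys/class/infiniband/mlx5_0/ports/1/counters/port_xmit_data)
echo "TX throughput: $(( (C2 - C1) * 4 / 10 )) bytes/s"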
