HCCL集合通信算法开发Hello World示例(超详细)
本文给读者介绍了HCCL算法开发所涉及的概念和流程,并通过一个样例将前文介绍的内容串联起来。本文定位为HCCL算法开发的入门介绍,读者读完后,可结合HCCL开放代码仓中的算法样例,做深入研究。
1、什么是集合通信
集合通信定义了一系列标准信息交换接口,解决并行计算时不同进程之间的通信问题。集合通信的概念和作用可参考文章《HCCL——昇腾高性能集合通信库》。
举例来说,AllGather将group内所有节点的输入按照Rank Id重新排序,然后拼接起来,再将结果发送到所有节点的输出。
HCCL为集合通信算子提供了对应的API供开发者调用,例如AllGather算子的API如下:
HcclResult HcclAllGather(void *sendBuf, void *recvBuf, uint64_t sendCount, HcclDataType dataType, HcclComm comm, aclrtStream stream);
- sendBuf表示输入内存缓冲区指针
- recvBuf表示输出内存缓冲区指针
- sendCount表示输入的数据个数
- dataType表示处理的数据类型
- comm表示算子执行所在的通信域
- stream表示算子执行所在的主流
开发者可以在PyTorch等AI框架中调用HcclAllGather接口,使能昇腾AI处理器(下文简称NPU)执行AllGather通信。
2、集合通信算法
针对同一个通信算子,随着网络拓扑、通信数据量、硬件资源等变化,往往会采用不同的通信算法,从而最大化集群通信性能。以AllGather算子举例,HCCL实现了Mesh、Ring、Recursive Halving-Doubling(RHD)、NHR(Nonuniform Hierarchical Ring)、NB(Nonuniform Bruck)等多种算法用于Server内和Server间的集合通信,通信操作执行时,HCCL会根据输入条件从算法仓中自动选择性能最佳的算法。
3、概念介绍
通信域
集合通信发生在一组通信对象上(比如一个NPU就是一个通信对象)。通信域是集合通信算子执行的上下文,管理对应的通信对象和通信所需的资源。
Rank
通信域中的每个通信对象称为一个Rank。
Memory
集合通信执行所需要的各种Buffer资源。
- Input Buffer:集合通信算子输入数据缓冲区。
- Output Buffer:集合通信算子输出数据缓冲区,存放算法计算结果。
- CCL Buffer:一组地址固定的Buffer,可被远端访问。单算子模式下,通信对象通过CCLBuffer来实现跨Rank的数据交换。CCL buffer和通信域绑定,通信域初始化的时候创建两块CCL buffer,分别称为CCL_In和CCL_Out。CCL_In和CCL_Out默认大小是200M Byte,可以通过环境变量HCCL_BUFFSIZE进行修改。同一个通信域内执行的集合通信算子都复用相同的CCL Buffer。
- Scratch Buffer:除了CCLBuffer,有些算法计算过程中需要额外的存储空间,这部分额外的存储空间,称为Scratch Buffer。
流
流(Stream)是NPU上的一种硬件资源,承载了待执行的Task序列。Task可以是一个DMA操作、一个同步操作或者一个NPU算子等。同一个流上的Task序列按顺序执行,不同流上的Task序列可并发执行。
- 主流:由Pytorch等训练框架调用集合通信API传入的stream对象称为主流。
- 从流:为了实现集合通信算法所需要的并行性而额外申请的stream对象称为从流。
上图展示了一条主流和一条从流,主从流之间通过Post/Wait这一组Task进行同步。主从流之间没有依赖关系时,Task可并行执行。如上图主流的TaskA、TaskB和从流的TaskX、TaskY可以是并行执行的。Post/Wait的具体含义会在后文给出。
Notify
Notify是NPU上的硬件资源,用来做同步。在集合通信中主要有两种作用:1)Rank内主从流之间的同步;2)跨Rank数据收发的同步。
和Notify有关的Task有两种:1)Post;2)Wait。
Post操作给对应的notify寄存器置1,并返回。如果对应的notify值已经是1,则不产生变化,直接返回。
Wait操作会等待对应的notify值变为1。条件满足后,将对应的notify值复位为0,并继续执行后续的Task。
- Rank内主从流同步示例:主流通知从流,实质是将notify1置位为1。从流通知主流,实质是将notify2置位为1。
- 跨Rank数据收发同步示例:与Rank内主从流同步类似,跨Rank的数据收发也需要同步。比如向远端Rank写数据前,得知道远端是否准备好接受数据的Buffer。关于跨Rank的同步,可参考下面关于Transport链路的章节。
Transport链路
要完成Rank间的数据通信需要先建立Transport链路,Transport链路分两种:1)SDMA链路,对应到HCCS/PCIE硬件连接;2)RDMA链路,对应到RoCE硬件连接。
- SDMA Transport链路的两端各有两种类型的notify,分别称为Ack、DataSignal;
- RDMA Transport链路的两端各有三种类型的notify,分别称为Ack、DataSiganl、DataAck;
每条Transport链路会申请各自的notify资源,不同的Transport之间不会复用notify。SDMA链路会申请四个notify,每端各两个;RDMA链路会申请六个notfy,每端各三个。
- SDMA数据收发同步:一次SDMA数据收发需要两组同步,如下图所示,分别使用了Ack和DataSignal两个notify。为了避免同一条链路上多次收发数据相互影响,同步需以Ack开始,以DataSignal结束。
- RDMA数据收发同步:一次RDMA数据收发需要三组同步信号,如下图所示。这是因为RDMA操作在流上是异步执行的,所以Rank 0执行完Write和Post DataSignal之后,并不知道数据什么时候写完,因此需要Rank1 Wait DataSignal满足条件后,再给Rank 0发送一个DataAck同步信号,通知Rank0数据已经写完了。为了避免同一条链路上多次收发数据相互影响,同步需以Ack开始,以DataAck结束。
4、算法实现样例解析
1. 算法原理
在了解完集合通信开发涉及的基本概念后,我们以单台Atlas 800T A2 训练服务器执行AllGather算子来举例,实现一个server内的mesh算法。
假定通信域中有4个NPU,每个NPU与另外三个NPU都有独立的HCCS链路,也就是说server内是mesh链接。硬件拓扑如下:
Buffer初始状态如下图,每个Rank只有UserIn(输入Buffer)中存在有效数据。Rank之间使用CCL内存交换数据,因为本算法只使用CCL_Out,所以没有把CCL_In画出来。
第一步,将数据从UserIn搬移到CCL_Out。
第二步,每个Rank同时从本Rank的CCL_Out和其他Rank的CCL_Out读取数据,并写到自己的UserOut(输出Buffer)的对应位置。这边只画了Rank0的数据搬运方向,其他Rank的搬运方式是类似的。
2. 算法执行流程
在开发集合通信算法之前,需要先了解通信算法的执行流程,如下图所示:
通信算法的主体执行流程可划分为4步:
- HCCL框架根据算子类型构造出对应的算子实例。
- 框架调用算子实例执行算法选择,算法选择会返回将要执行的算法名字和一个标志资源的新tag字符串。
- 框架根据新tag查询对应的资源(Scratch Buffer、从流、主从流同步的notify、Transport链路)是否存在,若不存在,则调用算子实例的资源计算接口获取算法执行需要的资源诉求,然后框架根据资源诉求将对应的资源创建出来。
- 框架传入算法执行需要的资源,并执行算法编排。算法编排执行的过程中,会通过平台层的接口提交要执行的Task。
上述主体流程的步骤2至步骤4,是执行一个算法的必经步骤。那么开发一个新的算法,也需要完成相应的步骤,即算法选择、资源计算和算法编排。
3. 算法选择
每个算法有自己的适用范围,在适用范围内,该算法往往是性能最佳的。本样例介绍的算法在以下三个条件满足时,为优选算法。具体条件为:1)server内是mesh拓扑;2)执行模式为单算子模式;3)通信域中只有一台server。
具体代码实现如下:
// isMeshTopo为true表示server内mesh拓扑
if (isMeshTopo) {
// 表示当前为单算子模式
if (workflowMode_ == HcclWorkflowMode::HCCL_WORKFLOW_MODE_OP_BASE) {
// 表示拓扑中只有一个server
if (isSingleMeshAggregation_) {
// 选择条件:1)server内mesh拓扑;2)单server场景;3)单算子模式
algName = "AllGatherMeshOpbaseExecutor";
}
}
}
4. 资源计算
接下来,我们从算法原理来分析该算法执行需要的资源,代码中以rankSize表示参与通信的实体个数,本样例中rankSize即为4。
- 算法执行不需要额外的ScratchBuffer,因此Scratch Buffer大小为0。
- 算法原理第二步中,Rank内有rankSize块数据并发搬运,一共需要rankSize条流,除去主流,还需要rankSize -1个从流。
- 需要的主从流同步的notify数量为(rankSize -1) * 2。Transport同步的notify随Transport链路自动申请,这边不用计算Transport需要的notify数量。
- 算法原理第二步中,每个Rank都需要和其他的Rank交互,因此需要建立mesh链路。
资源计算的参考代码实现:
// 计算从流的数量,deviceNumPerAggregation即为mesh内的Rank个数,即为RankSize
HcclResult CollAllGatherMeshOpbaseExecutor::CalcStreamNum(u32& streamNum)
{
u32 totalStreamNum = topoAttr_.deviceNumPerAggregation;
// 返回的从流数量为rankSize-1
streamNum = totalStreamNum - 1U;
return HCCL_SUCCESS;
}
// 计算建链诉求
HcclResult CollAllGatherMeshOpbaseExecutor::CalcCommInfo(std::vector<LevelNSubCommTransport>& opTransport)
{
TransportMemType inputType = TransportMemType::RESERVED;
TransportMemType outputType = TransportMemType::RESERVED;
CHK_RET(CalcTransportMemType(inputType, outputType));
CHK_RET(CalcLevel0CommInfo(inputType, outputType, opTransport));
return HCCL_SUCCESS;
}
// 获取建链的内存类型
HcclResult CollAllGatherMeshOpbaseExecutor::CalcTransportMemType(TransportMemType &inputType, TransportMemType &outputType)
{
if (GetWorkflowMode() == HcclWorkflowMode::HCCL_WORKFLOW_MODE_OP_BASE) {
inputType = TransportMemType::CCL_INPUT; // 单算子模式下使用CCL Buffer建链
outputType = TransportMemType::CCL_OUTPUT; // 单算子模式下使用CCL Buffer建链
}
return HCCL_SUCCESS;
}
// 调用公共函数完成server内mesh建链
HcclResult CollAllGatherMeshOpbaseExecutor::CalcLevel0CommInfo(TransportMemType inputType, TransportMemType outputType, std::vector<LevelNSubCommTransport>& opTransport)
{
CommParaInfo commParaLevel0(COMM_LEVEL0, CommType::COMM_TAG_MESH);
CHK_RET(CalcCommPlaneInfo(tag_, commParaLevel0, opTransport[COMM_LEVEL0], inputType, outputType));
return HCCL_SUCCESS;
}
5. 算法编排
以Rank0视角来举例说明算法编排。Rank0上一共有四条流,主流做Rank内的数据搬运,三条从流分别与另外三个Rank做Rank间的数据搬运。
算法原理中第一步对应了主流上的第一个蓝色Task,即将数据从输入Buffer搬到CCL_Out Buffer。在执行第二步之前,需要由主流唤醒从流开始工作,即第一组橙色块所示。然后执行算法原来的第二步,主流上做Rank内搬运,从流上做Rank间搬运,即中间蓝色和绿色块。最后,从流完成任务并通知主流,即图中最后一组橙色块。
图中主从流同步一共使用了6个Wait,对应到资源中的6个notify。主从流同步的写法如下:
// 主流通知从流,其中stream_表示主流,meshSignalAux_表示安排在从流上的notify
HcclResult AllgatherMeshDirect::MainRecordSub()
{
for (u32 signalIndex = 0; signalIndex < meshSignalAux_.size(); signalIndex++) {
CHK_RET(LocalNotify::Post(stream_, dispatcher_, meshSignalAux_[signalIndex],
profilerInput_.stage));
}
return HCCL_SUCCESS;
}
// 从流等待主流,meshStream_表示从流,meshSignalAux_表示安排在从流上的notify
HcclResult AllgatherMeshDirect::SubWaitMain()
{
for (u32 streamIndex = 0; streamIndex < meshSignalAux_.size(); streamIndex++) {
CHK_RET(LocalNotify::Wait(meshStreams_[streamIndex], dispatcher_, meshSignalAux_[streamIndex],
profilerInput_.stage));
}
return HCCL_SUCCESS;
}
// 从流通知主流,meshStream_表示从流,meshSignal_表示安排在主流上的notify
HcclResult AllgatherMeshDirect::SubRecordMain()
{
for (u32 streamIndex = 0; streamIndex < meshSignal_.size(); streamIndex++) {
CHK_RET(LocalNotify::Post(meshStreams_[streamIndex], dispatcher_, meshSignal_[streamIndex],
profilerInput_.stage));
}
return HCCL_SUCCESS;
}
// 主流等待从流,其中stream_表示主流,meshSignal_表示安排在主流上的notify
HcclResult AllgatherMeshDirect::MainWaitSub()
{
for (u32 signalIndex = 0; signalIndex < meshSignal_.size(); signalIndex++) {
CHK_RET(LocalNotify::Wait(stream_, dispatcher_, meshSignal_[signalIndex], profilerInput_.stage));
}
return HCCL_SUCCESS;
}
算法编排实现中使用TxAck表示Post远端的Ack notify,使用RxAck表示Wait本端的Ack notify;使用TxDataSignal表示Post远端的DataSignal notify,使用RxDataSignal表示Wait本端的DataSignal notify。编排逻辑代码如下:
HcclResult AllgatherMeshDirect::RunAsync(const u32 rank, const u32 rankSize, const std::vector<LINK> &links)
{
// 获取数据类型对应的字节数
u32 unitSize = DataUnitSize(dataType_);
u64 sdmaSize = count_ * unitSize; // 当前一次SDMA通信的字节数
u64 sliceSize = opInfo_->count * unitSize; // AllGather输入Buffer对应的字节数
// 获取输入Buffer指针
char* curUerMemInPtr = static_cast<char *>(opInfo_->inputAddr);
// 获取输出Buffer指针
char* curUerMemOutPtr = static_cast<char *>(opInfo_->outputAddr);
// 获取CCL_Out Buffer指针
char* curCommMemOutPtr = static_cast<char *>(outputMem_.ptr());
// 第一步,本地数据从输入Buffer搬移到CCL_Out
DeviceMem src;
DeviceMem dst;
src = DeviceMem::create(curUerMemInPtr, sdmaSize);
u64 localOffsetByte = (sliceSize * rank) % HCCL_MIN_SLICE_ALIGN;
dst = DeviceMem::create(curCommMemOutPtr + localOffsetByte, sdmaSize);
CHK_RET(HcclD2DMemcpyAsync(dispatcher_, dst, src, stream_));
// 主流通知从流开始干活
CHK_RET(MainRecordSub());
CHK_RET(SubWaitMain());
// 本Rank与远端Rank互发Ack信号
for (u32 round = 1; round < rankSize; round++) {
u32 dstRank = BackwardRank(rank, rankSize, round);
Stream& subStream = meshStreams_[round - 1];
// TxAck表示Post远端的Ack notify,RxAck表示Wait本端的Ack nofity
CHK_RET(links[dstRank]->TxAck(subStream));
CHK_RET(links[dstRank]->RxAck(subStream));
}
// 主流将数据从CCL_Out搬移到输出Buffer
src = dst;
dst = DeviceMem::create(curUerMemOutPtr + rank * sliceSize, sdmaSize);
CHK_RET(HcclD2DMemcpyAsync(dispatcher_, dst, src, stream_));
// 从流从其他Rank搬运数据;本Rank与远端Rank互发DataSignal信号
for (u32 round = 1; round < rankSize; round++) {
u32 dstRank = BackwardRank(rank, rankSize, round);
// 获取要下发Task的从流
Stream& subStream = meshStreams_[round - 1];
// 获取server内远端Rank在本Rank映射的CCL_Out Buffer的地址
void *remMemPtr = nullptr;
CHK_RET(links[dstRank]->GetRemoteMem(UserMemType::OUTPUT_MEM, &remMemPtr));
u64 remoteOffsetByte = (sliceSize * dstRank) % HCCL_MIN_SLICE_ALIGN;
src = DeviceMem::create(static_cast<char *>(remMemPtr) + remoteOffsetByte, sdmaSize);
dst = DeviceMem::create(curUerMemOutPtr + dstRank * sliceSize, sdmaSize);
// 将数据从远端Rank读取到本端输出Buffer
CHK_RET(HcclD2DMemcpyAsync(dispatcher_, dst, src, subStream,
links[dstRank]->GetRemoteRank(), links[dstRank]->GetLinkType()));
// TxDataSignal表示Post远端的DataSignal notify
// RxDataSignal表示Wait本端的DataSignal nofity
CHK_RET(links[dstRank]->TxDataSignal(subStream));
CHK_RET(links[dstRank]->RxDataSignal(subStream));
}
// 从流通知主流任务完成
CHK_RET(SubRecordMain());
CHK_RET(MainWaitSub());
return HCCL_SUCCESS;
}
6. 参考代码
本样例即为HCCL开放仓中的AllGatherMeshOpbaseExecutor算法实现,对应代码链接可参考以下链接。为了减少干扰,文档中贴出的代码删除了部分逻辑,不影响整体流程。
算法选择部分代码实现:
资源计算部分代码实现:
算法编排部分代码实现:
5、参考资料
- 点赞
- 收藏
- 关注作者
评论(0)