华为昇腾CANN——AscendCL特性之同步异步(C++)

举报
Tianyi_Li 发表于 2022/06/06 20:00:00 2022/06/06
【摘要】 本次介绍到的AscendCL接口系列旨在了解AscendCL提供的多种特性,其中每种特性都包含了若干个可调用的接口。

前情提要

本次介绍到的AscendCL接口系列旨在了解AscendCL提供的多种特性,其中每种特性都包含了若干个可调用的接口。

简介:

这里介绍AscendCL提供的多种特性,其中每种特性都包含了若干个可调用的接口。这些特性分别是:

  1. 运行资源管理
  2. 内存管理与数据传输
  3. 模型推理基础场景
  4. 同步&异步
  5. DVPP图像编解码
  6. DVPP图形图像增强
  7. 单算子计算
  8. 模型推理扩展场景

在优化应用性能的漫漫长路上,同步等待方面的优化通常是个绕不开的问题。一张推理卡上有多个NPU,一个进程内有多个线程可用,这些资源能否充分利用,很大程度上决定了应用的最终性能。这里聚焦于理解AscendCL的同步等待场景、业务流程以及相关接口的使用方法。

这里将会提到AscendCL提供的3类同步等待机制,不像模型推理实验中提到的3类接口那样要顺序调用,同步等待的3类接口,在使用的时候,可以根据业务场景自由组合。这3类场景分别是:

  1. 多Device场景
  2. 多Stream场景
  3. Callback场景

下面请容余一一道来。

开始编写代码之前,记得在你的.h或.cpp文件中包含AscendCL的头文件:

#include "acl/acl.h"
// For x86
#pragma cling add_include_path("/usr/local/Ascend/ascend-toolkit/latest/x86_64-linux/acllib/include/")
#pragma cling add_library_path("/usr/local/Ascend/ascend-toolkit/latest/x86_64-linux/acllib/lib64/")
#pragma cling load("libascendcl.so")

#define INFO_LOG(fmt, args...) fprintf(stdout, "[INFO]  " fmt "\n", ##args)
#define WARN_LOG(fmt, args...) fprintf(stdout, "[WARN]  " fmt "\n", ##args)
#define ERROR_LOG(fmt, args...) fprintf(stdout, "[ERROR] " fmt "\n", ##args)

#include <iostream>
#include "acl/acl.h"
#include <stdio.h>
#include <fstream>
#include <cstring>
#include <sys/stat.h>
#include <map>
#include <sstream>
#include <thread>
#include <mutex>
using namespace std;

多Device场景

关于Device概念,我们之前聊过,不了解的同学可以看看之前的博文或到查看昇腾文档,一个设备上(比如一张Atlas300I推理卡上),是可以有多个芯片的,我们在开始计算之前,首先要指定芯片编号来执行进一步计算动作(还记得aclrtSetDevice吗)。既然设备上有多个芯片可用,那么很自然地我们就会想到,可不可以在应用中同时调用这些芯片做计算呢?

答案是肯定的,但是需要一点技巧,首先看下边这张图:

image.png

上图是一个常见的多线程推理应用的执行流程示意图,忽略图中对于Stream的描述,我们只看Device和Context的部分。大矩形框中,每一列代表的是一个线程。其中:

  • 线程A申请了Device0,在Device0上创建了ctx1,并执行后续计算;
  • 线程B同样申请了Device0,在Device0上创建了ctx2,并执行后续计算;
  • 线程C申请了Device1,在Device1上创建了ctx3,执行后续计算;执行一定量的任务后,又将当前Context切换到了线程2创建的ctx2上,执行后续的计算。

通过上边的描述,我们能得到如下信息:

  1. 线程A和线程B都指定了Device0作为计算设备,意味着Device不是线程独占的,多个线程可以同时指定某个Device作为计算设备,大家可以一起往同一个Device上堆任务,而任务的先后顺序,就由AscendCL内部任务调度来决定了;
  2. 线程C首先指定了Device1作为计算设备,并在Device1上创建了个“ctx3”,在这上边执行了一定量的计算之后,又将当前context切换到了“ctx2”。从图中可以看出,“ctx2”是由线程B在Device0上创建的,而线程C这样一切换,就让线程C接下来的执行也在“ctx2”中了,好处在于,线程C接下来的计算都会被分发到Device0上了。换句话说,线程C通过切换context,间接实现了Device的切换。实际上,这种操作在AscendCL中是被鼓励的,用切换当前context的方式来切换Device甚至比直接调用“aclrtSetDevice”接口来切换Device效率更高。
  3. 说回运行资源的管理,当涉及多种运行资源的申请和释放时,申请流程通常是(暂时忽略什么是Stream和Event,下文会进行说明):

SetDevice->CreateContext->CreateStream->CreateEvent

而与之相对应的,释放流程应该是:

DestroyEvent->DestroyStream->DestroyContext->ResetDevice

但是当我们面对多线程推理应用时,直接执行DestroyEvent/Stream/Context可能导致其上的任务被强行断开,产生未知的后果。这个时候,为了保证在执行各种销毁接口之前,该Device上的任务都已经执行完毕,我们需要下边这个接口:

aclError aclrtSynchronizeDevice(void)

这个接口会阻塞当前线程的执行,直到对应Device上的所有任务都执行完毕。但是在接口的参数表中并没有看到指定DeviceId的参数,是因为当前线程一定有且只有一个“currentcontext”,而currentContext是会绑定一个Device的,所以调用aclrtSynchronizeDevice接口,等待的就是currentContext绑定的Device。

调用完这个接口之后,就可以放心地销毁Event/Stream/Context,进而ResetDevice了。

多Stream场景

多Stream场景是AscendCL的一个核心知识点,也是后续很多接口调用的不可或缺的一部分。在开始讲解多Stream场景之前,先对齐一下对“同步&异步”概念的理解。

在AscendCL中当我们提及“同步&异步”的时候,都是站在调用者、执行者的角度来看的。

  • 同步:

image.png

方法/函数的同步调用是比较符合我们平时的编程习惯的,即在当前线程发起一个方法调用,然后阻塞在调用点等待被调用方法执行完毕返回,再继续向下走:

// 伪码表达
print("before")
foo()  // 当前线程会阻塞在调用foo()的这个点,等待foo()执行完毕返回再继续向下走
print("after")

在AscendCL中,所谓“同步”是指调用者(当前线程)在向执行者(某个执行任务的线程,经常在Device侧)下发一个任务后,阻塞在调用点,等待这个任务执行完毕后拿回控制权,再继续向下执行的过程。

  • 异步:

image.png

异步与同步一个很明显的区别就是,调用者向执行者下发一个任务之后,不再等待任务执行完毕,而是立即返回往下走,暂且不管这个任务是否执行完毕,以后再说。

但是这种下发任务之后让任务放飞自我不管了的方式,在后续想得到任务执行结果的时候可就伤脑筋了,没法知道任务是否已经执行完成,因为与任务之间的联系都断掉了。我们当然不会允许这种事情发生。观察下边两个接口:

aclError aclrtMemcpy(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind)

aclError aclrtMemcpyAsync(void *dst, size_t destMax, const void *src, size_t count, aclrtMemcpyKind kind, aclrtStream stream)

上边的接口“aclrtMemcpy”在关于内存管理与数据传输的,具体可参考官方文档,是内存拷贝专用的接口。内存拷贝这个动作有些时候是很耗时的,而“aclrtMemcpy”作为一个同步接口,调用者调用之后要一直阻塞在调用点等待拷贝完成才能进行下一步操作,对调用者而言浪费了很多(本可以做一些其它事情的)时间,于是有了下边的接口,即“aclrtMemcpyAsync”,内存拷贝接口的异步版本。

玄机都在这个“Async”上,多了这个字眼的接口,观察其参数表,通常比其同步版本多一个参数,“aclrtStream stream”,要在调用异步接口的时候传一个Stream对象进去。

在AscendCL中,Stream用于维护一些异步操作的执行顺序,确保按照应用程序中的代码调用顺序在Device上执行。可以将Stream简单理解为一个异步任务的队列,主线程在调用异步接口时,指定了某个stream,本质上就是把异步任务送进了这个任务队列中,既然只是给任务队列下发一个任务,当然不必等待任务执行完毕再返回,而是立即就可以返回。而在AscendCL的后台,运行时环境会自动从stream中依次取出一个个任务来执行。

image.png

把Stream理解为“任务队列”,其实还有另一层含义,就是在队列中任务的执行是保序的,即运行时环境会根据任务下发的顺序来依次执行,不会出现乱序执行的情况。所以,如果放进同一个Stream的任务之间前后存在依赖关系,那么在编程上确保被依赖的接口调用更早放入Stream即可。

任务丢给Stream了,主线程跑去做别的事儿了,当别的事情都完成了,总会有一个时刻我们需要知道(或者说确保)Stream中的任务都已经执行完毕了,才能继续向下走(比如把数据预处理任务丢给了Stream,但接下来马上要推理了,需要数据预处理的结果),这个时候我们需要一个确认Stream中任务全部执行完毕的接口

aclError aclrtSynchronizeStream(aclrtStream stream)

这个接口就是专门为上述场景准备的,调用这个接口的时候,线程会阻塞在调用点上,等待指定stream中所有任务全部执行完毕才会继续向下进行。
阅读下边的代码,执行一下,观察效果:

void test()
{
    /* 1-初始化 */
    const char *aclConfigPath = "";
    aclError ret = aclInit(aclConfigPath);
    INFO_LOG("AscendCL init success.");
        
    /* 2-申请运行资源 */
    ret = aclrtSetDevice(0);  
    INFO_LOG("Set device %d success.",0);
    
    aclrtContext context;
    ret = aclrtCreateContext(&context, 0);
    INFO_LOG("Create context success.");
    
    /* 3-申请stream */
    aclrtStream stream;
    ret = aclrtCreateStream(&stream);
    INFO_LOG("Create stream success.");
    
    /* 4-创建host内存 */
    void *hostInput = nullptr;
    int64_t size_input = 32;
    ret = aclrtMallocHost(&hostInput, size_input);
    ret = aclrtMemset(hostInput, size_input, 0, size_input);   
    INFO_LOG("AscendCL HostMem Malloc success .");
    
    /* 5-创建device内存 */
    void *devInput = nullptr;
    ret = aclrtMalloc(&devInput, size_input, ACL_MEM_MALLOC_HUGE_FIRST);
    INFO_LOG("AscendCL DeviceMem Malloc success.");
        
    /* 6-device内存初始化*/
    ret = aclrtMemset(devInput, size_input, 10, size_input);    
    INFO_LOG("Set All Device Memory to 10");
    
    /* 7-显示host内存*/
    char *p = (char *)hostInput;  
    INFO_LOG("Display: host data :");
    for(int i = 0 ;i< size_input;i++)        
        printf("%d ", p[i]); 
    printf("\n"); 
        
    /* 8-异步内存拷贝device->host*/
    ret =aclrtMemcpyAsync(hostInput, size_input,devInput,size_input, ACL_MEMCPY_DEVICE_TO_HOST, stream);
    INFO_LOG("Copy Device Data to Host Async.");

    /* 9-同步等待*/
    ret = aclrtSynchronizeStream(stream);
    INFO_LOG("AclrtSynchronizeStream success.");

    /* 10-显示host内存*/
    INFO_LOG("Display: host data :");
    for(int i = 0 ;i< size_input;i++)
        printf("%d ", p[i]);
    printf("\n");

    /* 11-销毁资源 */
    ret = aclrtDestroyStream(stream);
    INFO_LOG("End to destroy stream.");

    ret = aclrtFree(devInput);
    ret = aclrtFree(hostInput);

    ret = aclrtDestroyContext(context);
    INFO_LOG("Context is destroyed.");

    ret = aclrtResetDevice(0); 
    ret = aclFinalize();
    INFO_LOG("End to finalize acl.");
    return 0;
}

test();

image.png

现在您应该了解什么是Stream了,但到现在为止,我们只讨论了单Stream的场景,下面我们更进一步,讨论一下多Stream要如何配合。

前边的aclrtSynchronizeStream,是等待某个Stream内的任务全部执行完毕。但是有些时候,我们只想知道某个特定的任务是什么时候执行完毕的,而不是等待整个Stream结束,如下图所示:

image.png

上图中“Stream1->任务4”的执行依赖“Stream2->任务6”执行完毕,而如果还按照之前的方式,任务4执行前等待整个Stream2全部执行完毕,其实是多等了“任务7”、“任务8”的执行时间的。为了对Stream间任务依赖进行精细化管理,我们需要一个新的运行资源:Event。

Event通常用于在Stream之间执行事件同步操作,在两个Stream之间存在任务级别的依赖时尤其有用,如下图所示:

image.png

注意区分与前一个图的区别,“Stream1->任务4”的确是依赖“Stream2->任务6”的完成,但这两个任务之间是无法直接产生依赖关系的,要使用Event机制来同步:

  • Stream2->任务6:在执行完毕后,声明“event1”这一事件已发生
  • Stream1->任务4:在执行之前,等待“event1”这一事件的发生

两个无法产生直接依赖关系的任务,通过Event实现了依赖机制。Event这个运行资源的其创建和销毁接口请参见官方文档,声明和等待Event发生需要下边这两个接口:

aclError aclrtRecordEvent(aclrtEvent event, aclrtStream stream)

aclError aclrtStreamWaitEvent(aclrtStream stream, aclrtEvent event)

从接口名字上应该能很容易地猜到其功能,第一个接口就是在stream上声明event已经发生,第二个接口则是让stream等待event的发生。

image.png

可以把“aclrtRecordEvent”和“aclrtStreamWaitEvent”这两个接口当做两种特殊的送进Stream的任务,和其他送进Stream的任务一样,这两个接口调用完都是可以立即返回的,主线程不必跟着Stream一起等着事件发生。

执行一下下边的小例子,思考一下为什么这么写:

void EventTest()
{
    /*1-初始化*/
    const char *aclConfigPath = "";
    aclError ret = aclInit(aclConfigPath);
    INFO_LOG("AscendCL init success.");    
        
    /*2-申请运行资源*/
    ret = aclrtSetDevice(0);  
    INFO_LOG("Set device %d success.",0);  
    aclrtContext context;
    ret = aclrtCreateContext(&context, 0);
    INFO_LOG("Create context success.");       
     
    /* 3-创建一个Event,显示event状态 */
    aclrtEvent event;
    aclrtEventStatus status;
    /*
    typedef enum aclrtEventStatus {
    ACL_EVENT_STATUS_COMPLETE  = 0, //完成
    ACL_EVENT_STATUS_NOT_READY = 1, //未完成
    ACL_EVENT_STATUS_RESERVED  = 2, //预留
    } aclrtEventStatus;
    */    
    ret = aclrtCreateEvent(&event);
    ret = aclrtQueryEvent(event, &status);
    INFO_LOG("Create event success, event status is %d (0:COMPLETE, 1:NOT_READY).", status);
    
    /* 4-创建两个Stream */
    aclrtStream stream1, stream2;
    ret = aclrtCreateStream(&stream1);
    ret = aclrtCreateStream(&stream2);
    INFO_LOG("Create stream1&stream2 success.");    
        
    /* 5-创建host内存 */
    void *hostInput = nullptr;
    int64_t size_input = 32;
    ret = aclrtMallocHost(&hostInput, size_input);
    ret = aclrtMemset(hostInput, size_input, 0, size_input);  
    INFO_LOG("AscendCL Host Mem Malloc success.");
    
    /* 6-创建device内存 */
    void *devInput = nullptr;
    ret = aclrtMalloc(&devInput, size_input, ACL_MEM_MALLOC_HUGE_FIRST);
    INFO_LOG("AscendCL Device Mem Malloc success.");
        
    /* 7-device内存初始化*/
    ret = aclrtMemset(devInput, size_input, 10, size_input);    
    INFO_LOG("Set all device mem 10.");
    
    /* 8- 在Stream中记录一个Event*/
    ret =  aclrtRecordEvent(event, stream1);
    INFO_LOG("AclrtRecordEvent stream1 success.");
    
    /*9-阻塞应用程序运行,等待event发生,也就是stream执行完成*/
    ret =  aclrtStreamWaitEvent(stream2, event);
    INFO_LOG("AclrtStreamWaitEvent stream2 success.");
    
    /* 10-显示event状态、host内存内容*/    
    ret = aclrtQueryEvent(event, &status);
    INFO_LOG("AclrtQueryEvent success, event status is %d (0:COMPLETE, 1:NOT_READY).", status);
    char *p = (char *)hostInput;          
    INFO_LOG("Display host data:");
    for(int i = 0; i< size_input; i++)        
        printf("%d ", p[i]); 
    printf("\n"); 
        
    /* 11-异步内存拷贝device->host*/
    ret = aclrtMemcpyAsync(hostInput, size_input,devInput,size_input, ACL_MEMCPY_DEVICE_TO_HOST, stream2);
    INFO_LOG("Copy Device Data to Host Async.");    
    
    /* 12-显示event状态、host内存内容*/
    ret = aclrtQueryEvent(event, &status);
    INFO_LOG("AclrtQueryEvent success, event status is %d (0:COMPLETE, 1:NOT_READY).", status);
    INFO_LOG("Display host data:");
    for(int i = 0; i < size_input; i++)            
        printf("%d ", p[i]);     
    printf("\n");                   
        
    /* 13- aclrtResetEvent */
    aclrtResetEvent(stream2, event);
    INFO_LOG("AclrtResetEvent  stream2 success.");    
    ret = aclrtSynchronizeStream(stream2);
    /* 14- 查看event状态、host内存内容*/
    ret = aclrtQueryEvent(event, &status);
    INFO_LOG("AclrtQueryEvent success, event status is %d (0:COMPLETE, 1:NOT_READY).", status);   
    INFO_LOG("Display host data:");
    for(int i = 0; i < size_input; i++)            
        printf("%d ", p[i]);     
    printf("\n"); 
    
    /* 15-销毁资源 */
    ret = aclrtDestroyStream(stream1);
    ret = aclrtDestroyStream(stream2);
    INFO_LOG("End to destroy stream.");
    ret = aclrtFree(devInput);
    ret = aclrtFree(hostInput);    
    ret =  aclrtDestroyEvent(event);
    INFO_LOG("End to destroy event.");    

    ret = aclrtDestroyContext(context);
    INFO_LOG("End to destroy context.");
    
    ret = aclrtResetDevice(0); 
    ret = aclFinalize();
    INFO_LOG("End to finalize acl.");
    return 0;
}

EventTest();

输出结果:

[INFO]  AscendCL init success.
[INFO]  Set device 0 success.
[INFO]  Create context success.
[INFO]  Create event success, event status is 0 (0:COMPLETE, 1:NOT_READY).
[INFO]  Create stream1&stream2 success.
[INFO]  AscendCL Host Mem Malloc success.
[INFO]  AscendCL Device Mem Malloc success.
[INFO]  Set all device mem 10.
[INFO]  AclrtRecordEvent stream1 success.
[INFO]  AclrtStreamWaitEvent stream2 success.
[INFO]  AclrtQueryEvent success, event status is 1 (0:COMPLETE, 1:NOT_READY).
[INFO]  Display host data:
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 
[INFO]  Copy Device Data to Host Async.
[INFO]  AclrtQueryEvent success, event status is 0 (0:COMPLETE, 1:NOT_READY).
[INFO]  Display host data:
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 
[INFO]  AclrtResetEvent  stream2 success.
[INFO]  AclrtQueryEvent success, event status is 0 (0:COMPLETE, 1:NOT_READY).
[INFO]  Display host data:
10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 10 
[INFO]  End to destroy stream.
[INFO]  End to destroy event.
[INFO]  End to destroy context.
[INFO]  End to finalize acl.

前边介绍Device和Stream的时候,都有对应的Synchronize接口,Event当然也有:

aclError aclrtSynchronizeEvent(aclrtEvent event)

功能是阻塞主线程执行,直到event发生(即某个stream跑到了aclrtRecordEvent)。到现在为止我们见过的三个Synchronize接口:

  • aclrtSynchronizeDevice
  • aclrtSynchronizeStream
  • aclrtSynchronizeEvent

都是阻塞主线程等待对应的运行资源内任务全部完成,只是资源的粒度不一样,请同学们注意区分。

Callback场景

下面我们来研究 AscendCL同步异步机制中最复杂的场景————Callback场景。

关于Callback(回调),可以先读一读这篇分享

选用Callback场景,其目的总结下来就是一句话:

另一个线程 去处理 异步接口 计算的 结果

这句话听起来有点抽象,乍一听不太好理解,我们这样去理解:
把一个推理应用极简化抽象,只剩2个步骤:

  1. 模型推理
  2. 推理结果后处理

在同步推理场景下,我们是这么做的:

image.png

  • 在当前线程调用同步推理接口
  • 等待推理结束
  • 在当前线程处理推理结果

而在Stream场景下,我们是这么做的:

image.png

  • 在当前线程向某个Stream下发异步推理任务,并立即返回
  • 当前线程可以做一些(或者不做任何事)事情
  • 需要推理结果时,在当前线程调用aclrtSynchronizeStream等待Stream中任务完成
  • 在当前线程处理推理结果

在Callback场景下,我们将会这么做:

image.png

  • 在当前线程向Stream下发异步推理任务,立即返回
  • 在当前线程向Stream下发Callback任务(处理推理结果),立即返回
  • 当前线程自由啦

从同步场景,到Stream场景,再到Callback场景,我们见证了主线程一步一步被释放。在同步场景,所有场景都在主线程完成;在Stream场景,推理送到了Stream来做,而后处理还要主线程完成;而在Callback场景下,推理和后处理就都放到Stream中了。

讲完了Callback场景的基本概念,下面我们看一下使用Callback场景所需的编程要件。

要件一:Callback函数

既然是Callback场景,Callback函数本体肯定是不可或缺的。这个函数是完全由我们自己编程定义的,只需要遵循下边这种函数签名即可:

typedef void (*aclrtCallback)(void *userData)

其中,“aclrtCallback”这个名字可以你来起,遵循基本编程规范即可;而参数表中的userData,则是软件栈在调用Callback时传递给我们的用户数据,这部分数据也可以由我们自己来定义。以前边的例子为例,我们可以在Callback函数体中定义后处理过程。

要件二:定义一个用于执行Callback函数的线程

每个Callback调用都会被当做一个任务送进Stream队列中(就跟异步推理、异步内存拷贝类似),但是跟一般的异步接口不同的是,一般的异步接口会有Stream上系统默认的线程来处理任务,但是Callback任务要由用户自定义的线程来处理。想要让另一个线程去执行Callback,首先要定义一下这个线程。C++定义线程的编程操作不在这里过多赘述,这里我们强调一下线程主函数的编写:

aclrtSetCurrentContext
while(退出标志)
{
    aclrtProcessReport(等待时长) 
}

在这个Callback执行线程的线程主函数中,我们做了2件事情:

  1. aclrtSetCurrentContext,为这个线程指定一个Context。
  2. while一个无限循环,循环内调用aclrtProcessReport接口。

aclrtProcessReport(int32_t timeout):等待指定时间后,触发回调处理

虽说我们用这个额外定义的线程去执行Callback,但为了保持其灵活性,我们并不在线程方法中直接调用Callback,而是调用aclrtProcessReport去等待Stream送Callback过来,一旦等到,就执行它,等不到,就下个while接着等。这样做的好处是线程不必和某个特定的Callback绑定,Stream送什么Callback过来,线程就执行什么。现在看不懂这段描述没关系,读完整个实验及代码再回来读几遍。

要件三:把Callback的执行线程注册到Stream上

把上一步中创建的线程注册到Stream上。一个Stream可能会执行多种多样的Callback,同一个Stream上的所有Callback都由同一个自定义线程来执行。

aclError aclrtSubscribeReport(uint64_t threadId, aclrtStream stream)

这个接口就是用于给特定Stream注册回调函数处理线程的,参数表应该很好理解,这里就不赘述了,但是要注意以下几个要点:

  • 支持多次调用aclrtSubscribeReport接口给多个Stream(仅支持同一Device内的多个Stream)注册同一个处理回调函数的线程;
  • 为确保Stream内的任务按调用顺序执行,不支持调用aclrtSubscribeReport接口给同一个Stream注册多个处理回调函数的线程;
  • 单进程内调用aclrtSubscribeReport接口注册的线程数量如果超过128个,则接口返回失败;
  • 考虑操作系统的线程切换性能开销,建议调用aclrtSubscribeReport接口注册的线程数量控制在32个以下(包括32);
  • 同一个进程内,在不同的Device上注册回调函数的线程时,不能指定同一个线程ID;
要件四:向Stream提交Callback任务

前边的三个编程要件都是准备工作,第四个要件才是执行Callback任务。Callback函数也可以像异步内存拷贝、异步推理一样,被当做任务送进Stream中。接口如下:

aclError aclrtLaunchCallback(aclrtCallback fn, void *userData, aclrtCallbackBlockType blockType, aclrtStream stream)

再强调一次,这个接口是“把Callback当做一个任务送进Stream”,而不是在launchCallback这个点就直接执行Callback。
解释一下这个函数的参数表:

  • fn:就是我们想要执行的Callback函数了,函数指针,指向“要件一”中创建的那个Callback
  • userData:顾名思义,用户定义的数据,想传啥传啥,这个userData在将来Callback真的被调用的时候,是会被传进Callback中的。举个典型的场景,把异步推理的output当做userData传进Callback,由Callback来处理推理结果
  • blockType:阻塞类型,即执行Callback的时候是否阻塞Stream的执行。当前只有“ACL_CALLBACK_BLOCK”选项可用,执行Callback会阻塞Stream。
  • stream:这个应该不用过多解释了,想把Callback送进哪个Stream?

总结一下Callback场景涉及的4个编程要件:

  • 准备一个Callback函数
  • 准备一个用于执行Callback函数的线程
  • 把这个线程注册到Stream上
  • 向Stream提交Callback任务

观察下边的代码,运行一下看看效果:

int32_t deviceId_ = 0;
uint32_t modelId = 0;
aclrtStream stream_;
pthread_t threadId_;
static aclrtContext context_;
size_t pictureDataSize = 0;
void *pictureHostData = nullptr;
void *pictureDeviceData = nullptr;
aclmdlDataset *inputDataSet = nullptr;
aclDataBuffer *inputDataBuffer = nullptr;
aclmdlDataset *outputDataSet = nullptr;
aclDataBuffer *outputDataBuffer = nullptr;
aclmdlDesc *modelDesc = nullptr;
size_t outputDataSize = 0;
void *outputDeviceData = nullptr;
void *outputHostData = nullptr;
static bool g_isExit = false;
void ReadPictureTotHost(const char *picturePath)
{
    string fileName = picturePath;
    ifstream binFile(fileName, ifstream::binary);
    binFile.seekg(0, binFile.end);
    pictureDataSize = binFile.tellg();
    binFile.seekg(0, binFile.beg);
    aclError ret = aclrtMallocHost(&pictureHostData, pictureDataSize);
    binFile.read((char*)pictureHostData, pictureDataSize);
    binFile.close();
    
    INFO_LOG("ReadPictureTotHost !");
}
void PrintResult()
{
    aclError ret = aclrtMallocHost(&outputHostData, outputDataSize);
    ret = aclrtMemcpy(outputHostData, outputDataSize, outputDeviceData, outputDataSize, ACL_MEMCPY_DEVICE_TO_HOST);
    float* outFloatData = reinterpret_cast<float *>(outputHostData);
    
    map<float, unsigned int, greater<float>> resultMap;
    
    for (unsigned int j = 0; j < outputDataSize / sizeof(float);++j)
    {
        resultMap[*outFloatData] = j;
        outFloatData++;
    }
    
    int cnt = 0;
    for (auto it = resultMap.begin();it != resultMap.end();++it)
    {
        if(++cnt > 5)
        {
            break;
        }
        INFO_LOG("Top %d: index[%d] value[%lf] ", cnt, it->second, it->first);
    }
}
void CopyDataFromHostToDevice()
{
    aclError ret = aclrtMalloc(&pictureDeviceData, pictureDataSize, ACL_MEM_MALLOC_HUGE_FIRST);
    ret = aclrtMemcpy(pictureDeviceData, pictureDataSize, pictureHostData, pictureDataSize, ACL_MEMCPY_HOST_TO_DEVICE);
    
    INFO_LOG("CopyDataFromHostToDevice!");
}
void CreateModelInput()
{
    inputDataSet = aclmdlCreateDataset();
    inputDataBuffer = aclCreateDataBuffer(pictureDeviceData, pictureDataSize);
    aclError ret = aclmdlAddDatasetBuffer(inputDataSet, inputDataBuffer);
    
    INFO_LOG("CreateModelInput!");
}
void CreateModelOutput()
{
    modelDesc =  aclmdlCreateDesc();
    aclError ret = aclmdlGetDesc(modelDesc, modelId);
    outputDataSet = aclmdlCreateDataset();
    outputDataSize = aclmdlGetOutputSizeByIndex(modelDesc, 0);
    ret = aclrtMalloc(&outputDeviceData, outputDataSize, ACL_MEM_MALLOC_HUGE_FIRST);
    outputDataBuffer = aclCreateDataBuffer(outputDeviceData, outputDataSize);
    ret = aclmdlAddDatasetBuffer(outputDataSet, outputDataBuffer);
    
    INFO_LOG("CreateModelOutput!");
}
void LoadPicture(const char* picturePath)
{
    ReadPictureTotHost(picturePath);
    CopyDataFromHostToDevice();
    CreateModelInput();
    CreateModelOutput();
    INFO_LOG("LoadPicture!");
}
void CallBackFunc(void *arg)
{
    aclmdlDataset *output = (aclmdlDataset *)arg;
    INFO_LOG("CallBackFunc!");
    PrintResult();

}
void ExecuteAsync()
{
    aclError ret = aclmdlExecuteAsync(modelId, inputDataSet, outputDataSet, stream_);

    /* launch callback is to process all output data of model async execute */
    ret = aclrtLaunchCallback(CallBackFunc, (void *)outputDataSet, ACL_CALLBACK_BLOCK, stream_);    
    INFO_LOG("AclmdlExecuteAsync success!");
    return ;
}
void UnloadPicture()
{
    aclError ret = aclrtFreeHost(pictureHostData);
    pictureHostData = nullptr;
    ret = aclrtFree(pictureDeviceData);
    pictureDeviceData = nullptr;
    aclDestroyDataBuffer(inputDataBuffer);
    inputDataBuffer = nullptr;
    aclmdlDestroyDataset(inputDataSet);
    inputDataSet = nullptr;

    ret = aclrtFreeHost(outputHostData);
    outputHostData = nullptr;
    ret = aclrtFree(outputDeviceData);
    outputDeviceData = nullptr;
    aclDestroyDataBuffer(outputDataBuffer);
    outputDataBuffer = nullptr;
    aclmdlDestroyDataset(outputDataSet);
    outputDataSet = nullptr;
    INFO_LOG("UnloadPicture success!");
}
void DestroyResource()
{
    aclError ret = aclrtResetDevice(deviceId_);
    aclFinalize();
    INFO_LOG("DestroyResource success!");
}
void *ProcessCallback(void *arg)
{
    aclrtSetCurrentContext(context_);
    while (1) {
        // timeout value is 100ms
        (void)aclrtProcessReport(100);
        if(*(static_cast<bool *>(arg)) == true) {
            return nullptr;
        }
    }
}
int testmain()
{    
    const char *picturePath = "./src/dog1_1024_683.bin";
    const char *modelPath = "./src/resnet50.om";
 
    /* 1-Init Resource */    
    aclError ret = aclInit(nullptr);
    ret = aclrtSetDevice(deviceId_);    
    ret = aclrtCreateContext(&context_, deviceId_);
    INFO_LOG("Create context success!");    
    ret = aclrtCreateStream(&stream_);
    INFO_LOG("Create stream success!");
    
    /* 2-Load Model */
    ret = aclmdlLoadFromFile(modelPath, &modelId);
    INFO_LOG("LoadModel success!");
    
    /* 3-Load Picture */
    LoadPicture(picturePath);
    
    /* 4-aclrtSubscribeReport */
    g_isExit = false;    
    pthread_create(&threadId_, nullptr, ProcessCallback, &g_isExit);
    (void)aclrtSubscribeReport(static_cast<uint64_t>(threadId_), stream_);
    INFO_LOG("Subscribe report success!");
    
    /* 5-ExecuteAsync */
    ExecuteAsync();
    ret = aclrtSynchronizeStream(stream_);
    INFO_LOG("Model execute success !");
    g_isExit = true;

    /* 6-unsubscribe report */
    aclrtUnSubscribeReport(static_cast<uint64_t>(threadId_), stream_);
    INFO_LOG("Unsubscribe report success !");
   
    /* 7-release resource*/       
    aclmdlDestroyDesc(modelDesc);
    aclmdlUnload(modelId);    
    INFO_LOG("UnloadModel success!");
    UnloadPicture();
    DestroyResource();
    return 0;
}
testmain();

最终结果展示:

image.png

结语

再来回顾一下:

同步异步一共涉及三个典型场景:

  1. 多Device场景
  2. 多Stream场景
  3. Callback场景

其中,最复杂的是Callback场景,涉及的编程要件比较多,但是灵活使用会很好的增强你的推理应用的性能和灵活性、可扩展性,对性能有追求的同学建议仔细阅读学习。如有意见和建议可以到昇腾社区提问。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。