ModelArts/ModelBox开发案例 - 驾驶行为检测【玩转华为云】

举报
HouYanSong 发表于 2023/07/26 16:59:55 2023/07/26
【摘要】 本文将使用ModelArts从零开始训练模型,并使用ModelBox开发一个驾驶行为的检测的AI应用,实现正常驾驶、闭眼、打哈欠、打电话、左顾右盼等5种行为的识别。

驾驶行为检测【玩转华为云】

本文将使用ModelArts从零开始训练模型,并使用ModelBox开发一个驾驶行为的检测的AI应用,实现正常驾驶、闭眼、打哈欠、打电话、左顾右盼等5种行为的识别,最终效果如下:

 

本案例所需资源(代码、模型、测试数据等)均可从网盘链接下载。

ModelArts模型训练

我们使用疲劳/分神样例数据集进行标注用于模型训练:

首先训练一个轻量化的人脸手机检测模型,训练步骤可以参考我发布的Notebook

训练结束后可以在Tensorboard中查看训练结果:

这里我设置模型训练步数为60k,每10k学习率衰减至原来的95%:

之后再训练一个人脸关键点检测模型,使用之前训练好的SSD目标检测模型以及OpenCV人脸检测库进行图像的自动标注,具体可以参考我发布的Notebook

运行效果如下:

手工进行数据清洗并使用Mediapipe对嘴部的标注进行优化,最终得到人脸关键点检测数据集V2

人脸关键点检测模型采用MobileNetV2作为backbone,检测人脸并对关键点进行回归,代码实现可以参考我发布的Notebook

使用原生onnxruntime进行推理,视频检测的帧率在9~10左右:

视频检测代码如下:

"""
OpenCV 读取摄像头视频视频流,使用原生的onnxruntime推理
"""

import cv2
import glob
import time
import numpy as np
import onnxruntime
from scipy.spatial import distance as dist
from PIL import Image, ImageDraw, ImageFont

video_path = glob.glob('videos/*.mp4')
video_names = [name.split('\\')[-1].split('.')[0] for name in video_path]

video_name = video_names[6]
print(video_name)

def image_preprocess_ssdlite(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (300, 300))
    return img

def image_preprocess_mbv2(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (224, 224))
    img = img.astype(np.float32)
    img = img/255
    return img

def get_max_roi_head(bboxes, img_data, scale=1):
        """找出roi最大的头部检测框"""
        max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]))

        # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
        img_h, img_w, _ = img_data.shape
        x1, y1, x2, y2 = max_bbox    
        x1 = int(x1 * img_w)
        y1 = int(y1 * img_h)
        x2 = int(x2 * img_w)
        y2 = int(y2 * img_h)
       
        # 检测框进行一定的外扩
        w_ = max(abs(x2 - x1), abs(y2 - y1))
        w_ = w_ * scale
        x_mid = (x1 + x2) / 2
        y_mid = (y1 + y2) / 2
        new_x1 = max(0, int(x_mid - w_ / 2))
        new_y1 = max(0, int(y_mid - w_ / 2))
        new_x2 = min(int(x_mid + w_ / 2), img_w)
        new_y2 = min(int(y_mid + w_ / 2), img_h)
        new_bbox = [new_x1, new_y1, new_x2, new_y2]
        return new_bbox

def get_max_roi_phone(bboxes, img_data, scale=1):
        """找出roi最大的手机检测框"""
        max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]))

        # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
        img_h, img_w, _ = img_data.shape
        x1, y1, x2, y2 = max_bbox    
        x1 = int(x1 * img_w)
        y1 = int(y1 * img_h)
        x2 = int(x2 * img_w)
        y2 = int(y2 * img_h)
        new_bbox = [x1, y1, x2, y2]
        return new_bbox

def cover_ratio(box1, box2):
    '''计算两个矩形框的IOU与box2区域的比值'''
    def _overlap(x1, x2, x3, x4):
        left = max(x1, x3)
        right = min(x2, x4)
        return right - left

    w = _overlap(box1[0], box1[2], box2[0], box2[2])
    h = _overlap(box1[1], box1[3], box2[1], box2[3])
    if w <= 0 or h <= 0:
        return 0
    inter_area = w * h
    small_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    return inter_area * 1.0 / small_area

def eye_aspect_ratio(eye):
    '''计算出EAR'''
    # compute the euclidean distances between the two sets of
    # vertical eye landmarks (x, y)-coordinates
    A = dist.euclidean(eye[1], eye[5])
    B = dist.euclidean(eye[2], eye[4])

    # compute the euclidean distance between the horizontal
    # eye landmark (x, y)-coordinates
    C = dist.euclidean(eye[0], eye[3])

    # compute the eye aspect ratio
    ear = (A + B) / (2.0 * C)

    # return the eye aspect ratio
    return ear

# 绘制中文
def cv2AddChineseText(img, text, position, textColor=(0, 255, 0), textSize=30):
    img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img)
    # 字体的格式
    fontStyle = ImageFont.truetype(
        "./font/simsun.ttc", textSize, encoding="utf-8")
    # 绘制文本
    draw.text(position, text, textColor, font=fontStyle)
    # 转换回OpenCV格式
    return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)

cap = cv2.VideoCapture('videos/'+video_name+'.mp4')

if not cap.isOpened():
    print('文件不存在或编码错误')
else:
    fps = 30
    font = cv2.FONT_HERSHEY_PLAIN
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    mbv2 = onnxruntime.InferenceSession('mbv2_224x224.onnx')
    ssdlite = onnxruntime.InferenceSession('ssdlite_mbv2.onnx')
    writer = cv2.VideoWriter(video_name + '_result.mp4',cv2.VideoWriter_fourcc(*'X264'),fps,(width,height))
# -----------------------------------------------------------------------------------------------------------
EAR_THRESH = 0.2
queue_eye = [0] * 10
queue_phone = [0] * 10
queue_mouth = [0] * 10
queue_look_ground = [0] * 10
# -----------------------------------------------------------------------------------------------------------
start_time = time.time()  
while cap.isOpened():
    ret,frame = cap.read()
    if ret:
        # ---------------------------------------------------------------------------------------------------
        queue_eye.pop(0)
        queue_phone.pop(0)
        queue_mouth.pop(0)
        queue_look_ground.pop(0)
        # ---------------------------------------------------------------------------------------------------
        image = frame.copy()
        img = image_preprocess_ssdlite(image)
        data = np.expand_dims(img, axis=0)
        ssdlite_input ={ssdlite.get_inputs()[0].name: data}
        detection_boxes, detection_classes, _, detection_scores, _, _, _ = ssdlite.run(None, ssdlite_input)
        head_bboxes = []
        phone_bboxes = []
        for i in range(100):
            if detection_scores[0][i] > 0.5:
                ymin, xmin, ymax, xmax = detection_boxes[0][i][0], detection_boxes[0][i][1], detection_boxes[0][i][2], detection_boxes[0][i][3]
                if detection_classes[0][i] == 1:
                    head_bbox = [xmin, ymin, xmax, ymax]
                    head_bboxes.append(head_bbox)
                else:
                    phone_bbox = [xmin, ymin, xmax, ymax]
                    phone_bboxes.append(phone_bbox)
        max_roi_head = []
        max_roi_phone = []
        if head_bboxes:
            max_roi_head = get_max_roi_head(head_bboxes, image)
            xmin, ymin, xmax, ymax = max_roi_head[0], max_roi_head[1], max_roi_head[2], max_roi_head[3]
            cv2.rectangle(img=frame,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,255,0),thickness=10)
        if phone_bboxes:
            max_roi_phone = get_max_roi_phone(phone_bboxes, image)
            xmin, ymin, xmax, ymax = max_roi_phone[0], max_roi_phone[1], max_roi_phone[2], max_roi_phone[3]
            cv2.rectangle(img=frame,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,0,255),thickness=10)
        if max_roi_head:
            xmin, ymin, xmax, ymax = max_roi_head[0], max_roi_head[1], max_roi_head[2], max_roi_head[3]
            image_crop = image[ymin:ymax,xmin:xmax]
            h, w = image_crop.shape[0], image_crop.shape[1]
            img = image_preprocess_mbv2(image_crop)
            data = np.expand_dims(img, axis=0)
            onnx_input ={mbv2.get_inputs()[0].name: data}
            score, label = mbv2.run(None, onnx_input)
            confidence = (score[0][0]*100).round(2)
            points = []
            if score[0][0]==1:
                queue_look_ground.append(0)
                for x, y in label[0].reshape(-1, 2):
                    x = x * w
                    y = y * h
                    points.append(int(x)+xmin)
                    points.append(int(y)+ymin)
            else:
                queue_look_ground.append(1)
            if points:
                keypoint_features = np.array(points)
                eye_l = keypoint_features[:12].reshape((-1,1,2))
                eye_r = keypoint_features[12:24].reshape((-1,1,2))
                mouth = keypoint_features[24:36].reshape((-1,1,2))
                l_ear = eye_aspect_ratio(keypoint_features[:12].reshape(-1, 2))
                r_ear = eye_aspect_ratio(keypoint_features[12:24].reshape(-1, 2))
                m_ear = eye_aspect_ratio(keypoint_features[24:36].reshape(-1, 2))
                if l_ear<EAR_THRESH and l_ear<EAR_THRESH:
                    queue_eye.append(1)
                else:
                    queue_eye.append(0)
                if m_ear>EAR_THRESH:
                    queue_mouth.append(1)
                else:
                    queue_mouth.append(0)
                cv2.polylines(img=frame,pts=[eye_l],isClosed=True,color=(255,255,0),thickness=2)
                cv2.polylines(img=frame,pts=[eye_r],isClosed=True,color=(255,255,0),thickness=2)
                cv2.polylines(img=frame,pts=[mouth],isClosed=True,color=(255,255,0),thickness=2)
                cv2.putText(img=frame,text='eye_l_ear:'+str(l_ear.round(2)),org=(xmin+20,ymin+60),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
                cv2.putText(img=frame,text='eye_r_ear:'+str(r_ear.round(2)),org=(xmin+20,ymin+120),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
                cv2.putText(img=frame,text='mouth_ear:'+str(m_ear.round(2)),org=(xmin+20,ymin+180),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
                index = 0
                for x, y in keypoint_features.reshape(-1, 2):
                    index+=1
                    cv2.circle(frame,(x,y),3,(0,255,0),-1)
                    cv2.putText(img=frame,text=str(index),org=(x,y),fontFace=font,fontScale=1,color=(255,255,0),thickness=1,lineType=cv2.LINE_AA)
            else:
                queue_eye.append(0)
                queue_mouth.append(0)
            cv2.putText(img=frame,text='has_face:'+str(confidence)+'%',org=(xmin,ymin-10),fontFace=font,fontScale=5,color=(0,255,0),thickness=5,lineType=cv2.LINE_AA)
            if max_roi_phone:
                ratio = cover_ratio(max_roi_head, max_roi_phone)
                if ratio>0.5:
                    queue_phone.append(1)
                else:
                    queue_phone.append(0)
            else:
                queue_phone.append(0)
        else:
        # ---------------------------------------------------------------------------------------------------
            queue_eye.append(0)
            queue_phone.append(0)
            queue_mouth.append(0)
            queue_look_ground.append(0)
        # ---------------------------------------------------------------------------------------------------
        # 计算FPS
        now = time.time()
        fps_text  = int(1 / ( now - start_time))
        start_time = now
        yawn_score = sum(queue_mouth)/len(queue_mouth)
        phone_score = sum(queue_phone)/len(queue_phone)
        sleeping_score = sum(queue_eye)/len(queue_eye)
        look_ground_score = sum(queue_look_ground)/len(queue_look_ground)
        name_list = ['打哈欠','打电话','打瞌睡','左顾右盼']
        score_list = [yawn_score, phone_score, sleeping_score, look_ground_score]
        queue_list = [queue_mouth, queue_phone, queue_eye, queue_look_ground]
        sorted_id = sorted(range(len(score_list)), key=lambda k: score_list[k], reverse=True)
        max_score_id = np.argmax(score_list)
        frame = cv2AddChineseText(frame, 'ssdlite_mbv2使用原生onnxruntime推理:'+str(fps_text)+'fps', (20,50), textColor=(0, 255, 0), textSize=40)
        frame = cv2AddChineseText(frame, 'AI练习生原创作品', (600,200), textColor=(0, 255, 255), textSize=50)
        if score_list[max_score_id]>0.9:
            frame = cv2AddChineseText(frame, '驾驶行为:'+name_list[max_score_id], (400,400), textColor=(0, 255, 255), textSize=50)
        else:
            frame = cv2AddChineseText(frame, '驾驶行为:'+'正常驾驶', (400,400), textColor=(0, 255, 255), textSize=50)
       
        frame = cv2AddChineseText(frame, name_list[sorted_id[0]]+str(queue_list[sorted_id[0]])+':'+str(np.array(score_list[sorted_id[0]]*100).round(2))+'%', (20,600), textColor=(0, 255, 255), textSize=40)
        frame = cv2AddChineseText(frame, name_list[sorted_id[1]]+str(queue_list[sorted_id[1]])+':'+str(np.array(score_list[sorted_id[1]]*100).round(2))+'%', (20,700), textColor=(0, 255, 0), textSize=40)
        frame = cv2AddChineseText(frame, name_list[sorted_id[2]]+str(queue_list[sorted_id[2]])+':'+str(np.array(score_list[sorted_id[2]]*100).round(2))+'%', (20,800), textColor=(0, 255, 0), textSize=40)
        frame = cv2AddChineseText(frame, name_list[sorted_id[3]]+str(queue_list[sorted_id[3]])+':'+str(np.array(score_list[sorted_id[3]]*100).round(2))+'%', (20,900), textColor=(0, 255, 0), textSize=40)
        print('ssdlite_mbv2_post')
        # cv2.imshow('demo', frame)
        writer.write(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
               break
    else:
        break
cap.release()
cv2.destroyAllWindows()

ModelBox应用开发

通过以下章节您将大致了解ModelBox AI应用的开发流程:

1、查看流程图

test.jpg

可以看到我们的流程图有两个推理功能单元,分别是ssd_infer和mbv2_infer。其中ssd_infer使用原生onnxruntime进行推理,主要是由于ssdlite_mbv2.onnx目标检测模型是由TensorFlow 1.x训练转换导出,模型解析存在问题。mbv2_224x224.onnx多输出模型是由TensorFlow 2.x训练导出,可以使用ModelBox内置的推理功能单元进行推理。

2、查看推理功能单元

由于ssdlite_mbv2.onnx使用原生onnxruntime进行推理,需要编写模型的推理代码,首先查看模型的配置文件:

# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.

# Basic config
[base]
name = "ssd_infer" # The FlowUnit name
device = "cpu" # The flowunit runs on cpu
version = "1.0.0" # The version of the flowunit
type = "python" # Fixed value, do not change
description = "description" # The description of the flowunit
entry = "ssd_infer@ssd_inferFlowUnit" # Python flowunit entry function
group_type = "Generic"  # flowunit group attribution, change as Input/Output/Image/Generic ...

# Flowunit Type
stream = false # Whether the flowunit is a stream flowunit
condition = false # Whether the flowunit is a condition flowunit
collapse = false # Whether the flowunit is a collapse flowunit
collapse_all = false # Whether the flowunit will collapse all the data
expand = false #  Whether the flowunit is a expand flowunit

# The default Flowunit config
[config]
net_h = 300
net_w = 300
# Input ports description
[input]
[input.input1] # Input port number, the format is input.input[N]
name = "Input" # Input port name
type = "uint8" # Input port type

# Output ports description
[output]
[output.output1] # Output port number, the format is output.output[N]
name = "Output1" # Output port name
type = "float" # Output port type

[output.output2] # Output port number, the format is output.output[N]
name = "Output2" # Output port name
type = "float" # Output port type

[output.output3] # Output port number, the format is output.output[N]
name = "Output3" # Output port name
type = "float" # Output port type

我们的模型输入300x300大小的unit8图像,输出7个float类型的张量,我们只需要其中的检测框、置信度和物体类别,模型推理代码如下:

# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import _flowunit as modelbox
import numpy as np
import onnxruntime

class ssd_inferFlowUnit(modelbox.FlowUnit):
    # Derived from modelbox.FlowUnit
    def __init__(self):
        super().__init__()
        self.ssdlite = onnxruntime.InferenceSession('model/ssdlite_mbv2.onnx')

    def open(self, config):
        # Open the flowunit to obtain configuration information
        self.net_h = config.get_int('net_h', 300)
        self.net_w = config.get_int('net_w', 300)
        return modelbox.Status.StatusCode.STATUS_SUCCESS

    def process(self, data_context):
        # Process the data
        in_data = data_context.input("Input")
        out_data1 = data_context.output("Output1")
        out_data2 = data_context.output("Output2")
        out_data3 = data_context.output("Output3")

        # ssd_infer process code.
        # Remove the following code and add your own code here.
        for buffer in in_data:
            # 将输入Buffer转换为numpy对象
            buffer = np.array(buffer.as_object(), copy=False)
            img = buffer.reshape((self.net_h, self.net_w, 3))
            data = np.expand_dims(img, axis=0)
            ssdlite_input ={self.ssdlite.get_inputs()[0].name: data}
            detection_boxes, detection_classes, _, detection_scores, _, _, _ = self.ssdlite.run(None, ssdlite_input)
            # 将业务处理返回的结果数据转换为Buffer
            boxes = modelbox.Buffer(self.get_bind_device(), detection_boxes)
            scores = modelbox.Buffer(self.get_bind_device(), detection_scores)
            classes = modelbox.Buffer(self.get_bind_device(), detection_classes)
            out_data1.push_back(boxes)
            out_data2.push_back(classes)
            out_data3.push_back(scores)

        return modelbox.Status.StatusCode.STATUS_SUCCESS

    def close(self):
        # Close the flowunit
        return modelbox.Status()

    def data_pre(self, data_context):
        # Before streaming data starts
        return modelbox.Status()

    def data_post(self, data_context):
        # After streaming data ends
        return modelbox.Status()

    def data_group_pre(self, data_context):
        # Before all streaming data starts
        return modelbox.Status()

    def data_group_post(self, data_context):
        # After all streaming data ends
        return modelbox.Status()

mbv2_224x224.onnx使用ModelBox内置推理功能单元进行推理,有1个输入和2个输出,模型配置文件编辑如下,无需编写推理代码。

# Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved.

[base]
name = "mbv2_infer"
device = "cpu"
version = "1.0.0"
description = "your description"
entry = "./mbv2_224x224.onnx"  # model file path, use relative path
type = "inference"
virtual_type = "onnx" # inference engine type: win10 now only support onnx
group_type = "Inference"  # flowunit group attribution, do not change

# Input ports description
[input]
[input.input1]  # input port number, Format is input.input[N]
name = "Input"  # input port name
type = "float"  # input port data type ,e.g. float or uint8
device = "cpu"  # input buffer type: cpu, win10 now copy input from cpu

# Output ports description
[output]
[output.output1] # output port number, Format is output.output[N]
name = "Output1"  # output port name
type = "float"   # output port data type ,e.g. float or uint8

[output.output2] # output port number, Format is output.output[N]
name = "Output2"  # output port name
type = "float"   # output port data type ,e.g. float or uint8

3、查看条件功能单元

a、设置条件功能单元属性:

# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.

# Basic config
[base]
name = "head_condition" # The FlowUnit name
device = "cpu" # The flowunit runs on cpu
version = "1.0.0" # The version of the flowunit
type = "python" # Fixed value, do not change
description = "description" # The description of the flowunit
entry = "head_condition@head_conditionFlowUnit" # Python flowunit entry function
group_type = "Generic"  # flowunit group attribution, change as Input/Output/Image/Generic ...

# Flowunit Type
stream = false # Whether the flowunit is a stream flowunit
condition = true # Whether the flowunit is a condition flowunit
collapse = false # Whether the flowunit is a collapse flowunit
collapse_all = false # Whether the flowunit will collapse all the data
expand = false #  Whether the flowunit is a expand flowunit

# The default Flowunit config
[config]
item = "value"

# Input ports description
[input]
[input.input1] # 输入数据1
name = "in_image" # 原图
type = "uint8"    # 原图数据格式为 uint8

[input.input2] # 输入数据2
name = "in_bbox" # 头部和手机检测框
type = "string"  # 检测框数据格式为json字符串

# Output ports description
[output]
[output.output1] # 检测到头部时的输出数据
name = "has_head" # 原图 + 头部和手机检测框(检测框以属性方式附加在原图上)
type = "uint8"    # 原图数据格式为 uint8

[output.output2] # 未检测到头部时的输出数据
name = "no_head" # 原图
type = "uint8"   # 原图数据格式为 uint8

与通用功能单元不同的是,condition属性设置为true,且一定有多个输出端口,head_condition条件功能单元有两个输出端口has_head和no_head,分别代表检测到头部和未检测到头部的两个分支,下图展示了条件功能单元的数据流图:

b、实现条件功能单元接口:

条件功能单元process接口逻辑与通用功能单元基本一致,只是在最后输出Buffer上有所不同,逻辑代码如下:

# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import _flowunit as modelbox
import numpy as np
import json

class head_conditionFlowUnit(modelbox.FlowUnit):
    # Derived from modelbox.FlowUnit
    def __init__(self):
        super().__init__()

    def open(self, config):
        # Open the flowunit to obtain configuration information
        return modelbox.Status.StatusCode.STATUS_SUCCESS

    def process(self, data_context):
        # Process the data
        in_image = data_context.input("in_image")
        in_bbox = data_context.input("in_bbox")

        has_head = data_context.output("has_head")
        no_head = data_context.output("no_head")

        # face_condition process code.
        # Remove the following code and add your own code here.
        # 循环处理每一个输入Buffer数据(实际上条件功能单元的batch size为1,此处循环中只有1条数据)
        for buffer_img, buffer_bbox in zip(in_image, in_bbox):
            # 获取输入Buffer的属性信息
            width = buffer_img.get('width')
            height = buffer_img.get('height')
            channel = buffer_img.get('channel')

            # 将输入Buffer转换为numpy对象
            img_data = np.array(buffer_img.as_object(), copy=False)
            img_data = img_data.reshape((height, width, channel))

            # 字符串数据可以直接用as_object函数转换
            bbox_str = buffer_bbox.as_object()

            # 解码出头部检测框数据
            head_bboxes = self.decode_head_bboxes(bbox_str)
            # 解码出手机检测框数据
            phone_bboxes = self.decode_phone_bboxes(bbox_str)
            if phone_bboxes:
                max_roi = self.get_max_roi_phone(phone_bboxes, img_data)
                buffer_img.set("bbox_phone", max_roi)
            else:
                buffer_img.set("bbox_phone", 0)

            # 此处是将输入Buffer直接作为输出Buffer向后传递
            # 此时Buffer的Data、Meta等全部内容都将保留,无需构建Buffer、设置Meta
            if head_bboxes:  # 检测到头部时的输出分支
                max_roi = self.get_max_roi_head(head_bboxes, img_data)
                buffer_img.set("bbox", max_roi)   # 将头部检测框作为属性附在输出Buffer上
                has_head.push_back(buffer_img)
            else:            # 未检测到头部时的输出分支
                no_head.push_back(buffer_img)

        # 返回成功标志,ModelBox框架会将数据发送到后续的功能单元
        return modelbox.Status.StatusCode.STATUS_SUCCESS

    def decode_head_bboxes(self, bbox_str):
        """从json字符串中解码出头部检测框"""
        try:
            hand_labels = [1]  # 头部对应的类别号是 1
            det_result = json.loads(bbox_str)['det_result']
            if det_result == "None":
                return []
            bboxes = json.loads(det_result)
            hand_bboxes = list(filter(lambda x: int(x[5]) in hand_labels, bboxes))
        except Exception as ex:
            modelbox.error(str(ex))
            return []
        else:
            return hand_bboxes
       
    def decode_phone_bboxes(self, bbox_str):
        """从json字符串中解码出头部检测框"""
        try:
            phone_labels = [2]  # 手机对应的类别号是 1
            det_result = json.loads(bbox_str)['det_result']
            if det_result == "None":
                return []
            bboxes = json.loads(det_result)
            phone_bboxes = list(filter(lambda x: int(x[5]) in phone_labels, bboxes))
        except Exception as ex:
            modelbox.error(str(ex))
            return []
        else:
            return phone_bboxes
       
    def get_max_roi_head(self, bboxes, img_data, scale=1):
        """找出roi最大的头部检测框"""
        max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]) * x[4])

        # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
        img_h, img_w, _ = img_data.shape
        x1, y1, x2, y2, _, _ = max_bbox
        x1 = int(x1 * img_w)
        y1 = int(y1 * img_h)
        x2 = int(x2 * img_w)
        y2 = int(y2 * img_h)
       
        # 检测框进行一定的外扩
        w_ = max(abs(x2 - x1), abs(y2 - y1))
        w_ = w_ * scale
        x_mid = (x1 + x2) / 2
        y_mid = (y1 + y2) / 2
        new_x1 = max(0, int(x_mid - w_ / 2))
        new_y1 = max(0, int(y_mid - w_ / 2))
        new_x2 = min(int(x_mid + w_ / 2), img_w)
        new_y2 = min(int(y_mid + w_ / 2), img_h)
        new_bbox = [new_x1, new_y1, new_x2, new_y2]
        return new_bbox
   
    def get_max_roi_phone(self, bboxes, img_data):
        """找出roi最大的手机检测框"""
        max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]) * x[4])

        # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
        img_h, img_w, _ = img_data.shape
        x1, y1, x2, y2, _, _ = max_bbox
        x1 = int(x1 * img_w)
        y1 = int(y1 * img_h)
        x2 = int(x2 * img_w)
        y2 = int(y2 * img_h)
        new_bbox = [x1, y1, x2, y2]
        return new_bbox
   
    def close(self):
        # Close the flowunit
        return modelbox.Status()

    def data_pre(self, data_context):
        # Before streaming data starts
        return modelbox.Status()

    def data_post(self, data_context):
        # After streaming data ends
        return modelbox.Status()

    def data_group_pre(self, data_context):
        # Before all streaming data starts
        return modelbox.Status()

    def data_group_post(self, data_context):
        # After all streaming data ends
        return modelbox.Status()

4、查看绘图功能单元

绘图功能单元是一个通用功能单元,通用功能单元在处理完数据后,会产生一个或多个输出,后续功能单元在承接时,必须接收它的所有数据(当然可以接收后只处理其中一部分输出);另外,通用功能单元一次可以处理多个Buffer(即batch size>1),数据按batch size设置分成多组,多组数据并发处理,生成结果与输入一致,下图展示了通用功能单元的数据流图:

配置文件内容如下:

# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.

# Basic config
[base]
name = "ssd_mbv2_draw" # The FlowUnit name
device = "cpu" # The flowunit runs on cpu
version = "1.0.0" # The version of the flowunit
type = "python" # Fixed value, do not change
description = "description" # The description of the flowunit
entry = "ssd_mbv2_draw@ssd_mbv2_drawFlowUnit" # Python flowunit entry function
group_type = "Generic"  # flowunit group attribution, change as Input/Output/Image/Generic ...

# Flowunit Type
stream = false # Whether the flowunit is a stream flowunit
condition = false # Whether the flowunit is a condition flowunit
collapse = false # Whether the flowunit is a collapse flowunit
collapse_all = false # Whether the flowunit will collapse all the data
expand = false #  Whether the flowunit is a expand flowunit

# The default Flowunit config
[config]
item = "value"

# Input ports description
# 输入端口描述
[input]
[input.input1] # 输入数据1:原图 + 头部和手机检测框(检测框以属性方式附加在原图上)
name = "in_image"
type = "uint8"

[input.input2] # 输入数据2:转为json字符串格式的手部关键点数据
name = "in_landmarks"
type = "string"

# Output ports description
[output]
[output.output1] # Output port number, the format is output.output[N]
name = "out_image" # Output port name
type = "uint8" # Output port type

补充功能单元逻辑代码:

# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import _flowunit as modelbox
from scipy.spatial import distance as dist
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import json
import cv2

class ssd_mbv2_drawFlowUnit(modelbox.FlowUnit):
    # Derived from modelbox.FlowUnit
    def __init__(self):
        super().__init__()
        self.EAR_THRESH = 0.2
        self.queue_eye = [0] * 10
        self.queue_phone = [0] * 10
        self.queue_mouth = [0] * 10
        self.queue_look_ground = [0] * 10
        self.font = cv2.FONT_HERSHEY_PLAIN

    def open(self, config):
        # Open the flowunit to obtain configuration information
        return modelbox.Status.StatusCode.STATUS_SUCCESS

    def process(self, data_context):
        # Process the data
        # 从DataContext中获取输入输出BufferList对象
        in_image = data_context.input("in_image")
        in_landmarks = data_context.input("in_landmarks")
        out_image = data_context.output("out_image")

        # ssd_mbv2_draw process code.
        # Remove the following code and add your own code here.
        # 循环处理每一个输入Buffer数据
        for buffer_img, buffer_landmarks in zip(in_image, in_landmarks):
            self.queue_eye.pop(0)
            self.queue_phone.pop(0)
            self.queue_mouth.pop(0)
            self.queue_look_ground.pop(0)
            # 获取输入图像Buffer的宽、高、通道数等属性信息
            width = buffer_img.get('width')
            height = buffer_img.get('height')
            channel = buffer_img.get('channel')

            # 将输入Buffer转换为numpy对象
            img_data = np.array(buffer_img.as_object(), dtype=np.uint8, copy=False)
            img_data = img_data.reshape(height, width, channel)

            max_bbox = buffer_img.get("bbox")
            phone_bbox = buffer_img.get("bbox_phone")
           
            # 将输入Buffer转换为字符串,从json字符串中解码出手部关键点数据
            landmarks_str = buffer_landmarks.as_object()
            landmarks = self.decode_landmarks(landmarks_str)
            confidence = landmarks[0]*100
            # 业务处理:将最大的人形检测框对应的手部关键点数据画在图上
            img_out = img_data.copy()
            xmin, ymin, xmax, ymax = max_bbox[0], max_bbox[1], max_bbox[2], max_bbox[3]
            cv2.rectangle(img_out,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,255,0),thickness=10)
            cv2.putText(img_out,text='has_face:'+str(confidence)+'%',org=(xmin,ymin-10),fontFace=self.font,fontScale=5,color=(0,255,0),thickness=5,lineType=cv2.LINE_AA)

            if landmarks[0]==1:
                self.queue_look_ground.append(0)
                points = np.array(landmarks[1:])
                for x, y in points.reshape(-1, 2):
                    x = int(x*(xmax-xmin))
                    y = int(y*(ymax-ymin))
                    cv2.circle(img_out,(x+xmin,y+ymin),3,(0,255,0),-1)
                l_ear = self.eye_aspect_ratio(points[:12].reshape(-1, 2))
                r_ear = self.eye_aspect_ratio(points[12:24].reshape(-1, 2))
                m_ear = self.eye_aspect_ratio(points[24:36].reshape(-1, 2))
                if l_ear<self.EAR_THRESH and l_ear<self.EAR_THRESH:
                    self.queue_eye.append(1)
                else:
                    self.queue_eye.append(0)
                if m_ear>self.EAR_THRESH:
                    self.queue_mouth.append(1)
                else:
                    self.queue_mouth.append(0)
                cv2.putText(img_out,text='eye_l_ear:'+str(np.array(l_ear).round(2)),org=(xmin+20,ymin+60),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
                cv2.putText(img_out,text='eye_r_ear:'+str(np.array(r_ear).round(2)),org=(xmin+20,ymin+120),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
                cv2.putText(img_out,text='mouth_ear:'+str(np.array(m_ear).round(2)),org=(xmin+20,ymin+180),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
            else:
                self.queue_eye.append(0)
                self.queue_mouth.append(0)
                self.queue_look_ground.append(1)
           
            if phone_bbox:
                x1, y1, x2, y2 = phone_bbox[0], phone_bbox[1], phone_bbox[2], phone_bbox[3]
                cv2.rectangle(img_out,pt1=(x1,y1),pt2=(x2,y2),color=(255,0,0),thickness=10)
                ratio = self.cover_ratio(max_bbox, phone_bbox)
                if ratio>0.5:
                    self.queue_phone.append(1)
                else:
                    self.queue_phone.append(0)
            else:
                self.queue_phone.append(0)

            yawn_score = sum(self.queue_mouth)/len(self.queue_mouth)
            phone_score = sum(self.queue_phone)/len(self.queue_phone)
            sleeping_score = sum(self.queue_eye)/len(self.queue_eye)
            look_ground_score = sum(self.queue_look_ground)/len(self.queue_look_ground)
            name_list = ['打哈欠','打电话','打瞌睡','左顾右盼']
            score_list = [yawn_score, phone_score, sleeping_score, look_ground_score]
            queue_list = [self.queue_mouth, self.queue_phone, self.queue_eye, self.queue_look_ground]
            sorted_id = sorted(range(len(score_list)), key=lambda k: score_list[k], reverse=True)
            max_score_id = np.argmax(score_list)

            img_out = self.cv2AddChineseText(img_out, 'AI练习生原创作品', (600,50), textColor=(0, 255, 0), textSize=50)
            if score_list[max_score_id]>0.9:
                img_out = self.cv2AddChineseText(img_out, '驾驶行为:'+name_list[max_score_id], (400,400), textColor=(0, 255, 0), textSize=50)
            else:
                img_out = self.cv2AddChineseText(img_out, '驾驶行为:'+'正常驾驶', (400,400), textColor=(0, 255, 0), textSize=50)

            img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[0]]+str(queue_list[sorted_id[0]])+':'+str(np.array(score_list[sorted_id[0]]*100).round(2))+'%', (20,600), textColor=(0, 255, 0), textSize=40)
            img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[1]]+str(queue_list[sorted_id[1]])+':'+str(np.array(score_list[sorted_id[1]]*100).round(2))+'%', (20,700), textColor=(0, 255, 0), textSize=40)
            img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[2]]+str(queue_list[sorted_id[2]])+':'+str(np.array(score_list[sorted_id[2]]*100).round(2))+'%', (20,800), textColor=(0, 255, 0), textSize=40)
            img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[3]]+str(queue_list[sorted_id[3]])+':'+str(np.array(score_list[sorted_id[3]]*100).round(2))+'%', (20,900), textColor=(0, 255, 0), textSize=40)

            # 将业务处理返回的结果数据转换为Buffer
            out_buffer = modelbox.Buffer(self.get_bind_device(), img_out)

            # 设置输出Buffer的Meta信息,此处直接拷贝输入Buffer的Meta信息
            out_buffer.copy_meta(buffer_img)

            # 将输出Buffer放入输出BufferList中
            out_image.push_back(out_buffer)

        # 返回成功标志,ModelBox框架会将数据发送到后续的功能单元
        return modelbox.Status.StatusCode.STATUS_SUCCESS

    def decode_landmarks(self, landmarks_str):
        """从json字符串中解码出人脸关键点数据"""
        try:
            landmarks_data = json.loads(landmarks_str)
            landmarks_list = json.loads(landmarks_data['landmarks_result'])
        except Exception as ex:
            modelbox.error(str(ex))
            return []
        else:
            return landmarks_list
       
    def cover_ratio(self, box1, box2):
        '''计算两个矩形框的IOU与box2区域的比值'''
        def _overlap(x1, x2, x3, x4):
            left = max(x1, x3)
            right = min(x2, x4)
            return right - left

        w = _overlap(box1[0], box1[2], box2[0], box2[2])
        h = _overlap(box1[1], box1[3], box2[1], box2[3])
        if w <= 0 or h <= 0:
            return 0
        inter_area = w * h
        small_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        return inter_area * 1.0 / small_area
       
    def eye_aspect_ratio(self, eye):
        '''计算出EAR'''
        # compute the euclidean distances between the two sets of
        # vertical eye landmarks (x, y)-coordinates
        A = dist.euclidean(eye[1], eye[5])
        B = dist.euclidean(eye[2], eye[4])

        # compute the euclidean distance between the horizontal
        # eye landmark (x, y)-coordinates
        C = dist.euclidean(eye[0], eye[3])

        # compute the eye aspect ratio
        ear = (A + B) / (2.0 * C)

        # return the eye aspect ratio
        return ear
   
    def cv2AddChineseText(self, img, text, position, textColor=(0, 255, 0), textSize=30):
        '''绘制中文'''
        img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img)
        # 字体的格式
        fontStyle = ImageFont.truetype(
            "./font/simsun.ttc", textSize, encoding="utf-8")
        # 绘制文本
        draw.text(position, text, textColor, font=fontStyle)
        # 转换回OpenCV格式
        return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)

    def close(self):
        # Close the flowunit
        return modelbox.Status()

    def data_pre(self, data_context):
        # Before streaming data starts
        return modelbox.Status()

    def data_post(self, data_context):
        # After streaming data ends
        return modelbox.Status()

    def data_group_pre(self, data_context):
        # Before all streaming data starts
        return modelbox.Status()

    def data_group_post(self, data_context):
        # After all streaming data ends
        return modelbox.Status()

其余功能单元实现与前面类似。

5、查看ModelBox流程图

# Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved.

[driver]
dir = ["${HILENS_APP_ROOT}/etc/flowunit",
"${HILENS_APP_ROOT}/etc/flowunit/cpp",
"${HILENS_APP_ROOT}/model",
"${HILENS_MB_SDK_PATH}/flowunit"]
skip-default = true
[profile]
profile=false
trace=false
dir="${HILENS_DATA_DIR}/mb_profile"
[graph]
format = "graphviz"
graphconf = """digraph Fatigue_driving_detection {
    node [shape=Mrecord]
    queue_size = 1
    batch_size = 1

    # 定义节点,即功能单元及其属性
    input1[type=input, flowunit=input, device=cpu, deviceid=0]
    data_source_parser[type=flowunit, flowunit=data_source_parser, device=cpu, deviceid=0]
    video_demuxer[type=flowunit, flowunit=video_demuxer, device=cpu, deviceid=0]
    video_decoder[type=flowunit, flowunit=video_decoder, device=cpu, deviceid=0, pix_fmt=rgb]
    image_resize[type=flowunit, flowunit=resize, device=cpu, deviceid=0, image_width=300, image_height=300]
    ssd_infer[type=flowunit, flowunit=ssd_infer, device=cpu, deviceid=0]
    ssd_post[type=flowunit, flowunit=ssd_post, device=cpu, deviceid=0]
    head_condition[type=flowunit, flowunit=head_condition, device=cpu, deviceid=0]
    crop_head_image[type=flowunit, flowunit=crop_head_image, device=cpu, deviceid=0]
    image_resize2[type=flowunit, flowunit=resize, device=cpu, deviceid=0, image_width=224, image_height=224]
    normalize[type=flowunit flowunit=normalize device=cpu deviceid="0" standard_deviation_inverse="0.003921568627450,0.003921568627450,0.003921568627450"]
    mbv2_infer[type=flowunit, flowunit=mbv2_infer, device=cpu, deviceid=0]
    landmarks_post[type=flowunit, flowunit=landmarks_post, device=cpu, deviceid=0]
    ssd_mbv2_draw[type=flowunit, flowunit=ssd_mbv2_draw, device=cpu, deviceid=0]
    video_out[type=flowunit, flowunit=video_out, device=cpu, deviceid=0]

    # 定义边,即功能间的数据传递关系
    input1:input -> data_source_parser:in_data
    data_source_parser:out_video_url -> video_demuxer:in_video_url
    video_demuxer:out_video_packet -> video_decoder:in_video_packet
    video_decoder:out_video_frame -> image_resize:in_image
    image_resize:out_image -> ssd_infer:Input
    ssd_infer:Output1 -> ssd_post:in_feat1
    ssd_infer:Output2 -> ssd_post:in_feat2
    ssd_infer:Output3 -> ssd_post:in_feat3
    ssd_post:out_feat -> head_condition:in_bbox
    video_decoder:out_video_frame -> head_condition:in_image
    head_condition:no_head -> video_out:in_video_frame
    head_condition:has_head -> crop_head_image:in_data
    crop_head_image:roi_image -> image_resize2:in_image
    image_resize2:out_image -> normalize:in_data
    normalize:out_data -> mbv2_infer:Input
    mbv2_infer:Output1 -> landmarks_post:in_feat1
    mbv2_infer:Output2 -> landmarks_post:in_feat2
    landmarks_post:out_data -> ssd_mbv2_draw:in_landmarks
    head_condition:has_head -> ssd_mbv2_draw:in_image
    ssd_mbv2_draw:out_image -> video_out:in_video_frame
}"""
[flow]
desc = "Fatigue_driving_detection run in modelbox-win10-x64"

可以看到条件功能单元head_condition的两个输出分别对接到不同的功能单元,在未检测到头部时,no_head分支直接对接到video_out进行视频编码;检测到头部时,has_head分支对接到crop_head_image以及之后的功能单元做人脸识别以及关键点检测。

6、运行应用

运行前需要安装onnxruntime、pillow以及scipy,切换到Fatigue_driving_detection目录下执行bin/main.bat运行应用,效果如下:

也可以更换其它视频进行测试。

案例小结

本案例从模型训练到应用开发全流程自主设计,实现了5种驾驶行为的实时识别,具备实际部署落地的可行性,关键点检测采用直接回归的方式精确度不高,后续将采用Heatmap的方式进一步提高识别的准确性。

参考文章:

ModelBox AI应用开发——条件功能单元

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。