Driving Behavior Detection [Exploring Huawei Cloud]
In this article we train a model from scratch with ModelArts and then use ModelBox to develop a driving behavior detection AI application that recognizes five behaviors: normal driving, eyes closed, yawning, phone use, and looking around. The final result looks like this:
All resources required for this case (code, models, test data, etc.) can be downloaded from the network drive link.
Model Training with ModelArts
We annotate the fatigue/distraction sample dataset and use it for model training:
First, train a lightweight face and phone detection model; the training steps can be found in the Notebook I published:
After training, the results can be viewed in TensorBoard:
Here I set the number of training steps to 60k, with the learning rate decaying to 95% of its previous value every 10k steps:
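The exact training configuration lives in the Notebook above; purely as an illustration (the initial learning rate and optimizer below are assumptions, not the actual config), an equivalent step-wise schedule written with Keras might look like this:
import tensorflow as tf

# Hypothetical sketch: decay the learning rate to 95% of its previous value
# every 10k steps over a 60k-step run (staircase=True gives discrete jumps).
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=4e-3,   # assumed starting value, not the real config
    decay_steps=10_000,
    decay_rate=0.95,
    staircase=True)
optimizer = tf.keras.optimizers.SGD(learning_rate=lr_schedule, momentum=0.9)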
Next, train a face keypoint detection model. The SSD object detection model trained above, together with the OpenCV face detection library, is used to automatically annotate the images; see the Notebook I published for details:
The result looks like this:
After manually cleaning the data and refining the mouth annotations with Mediapipe, we obtain V2 of the face keypoint detection dataset:
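As a rough sketch of that refinement step (the exact landmark subset used for the V2 dataset is in the Notebook; the indices below are illustrative outer-lip points from the FaceMesh topology), Mediapipe can be queried for mouth points like this:
import cv2
import mediapipe as mp

# Illustrative set of outer-lip landmark indices from Mediapipe FaceMesh
MOUTH_IDS = [61, 39, 37, 267, 269, 291, 405, 314, 17, 84, 181, 91]

img = cv2.imread('face.jpg')  # hypothetical sample image
with mp.solutions.face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1) as face_mesh:
    results = face_mesh.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    if results.multi_face_landmarks:
        lm = results.multi_face_landmarks[0].landmark
        # Normalized (x, y) mouth points that can be used to refine the manual labels
        mouth_points = [(lm[i].x, lm[i].y) for i in MOUTH_IDS]
        print(mouth_points)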
The face keypoint detection model uses MobileNetV2 as the backbone: it predicts whether a face is present and regresses the keypoint coordinates. The implementation can be found in the Notebook I published:
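A minimal sketch of such a network in Keras is shown below. The head sizes and hyper-parameters are assumptions made for illustration only; the trained model itself is the mbv2_224x224.onnx used later.
import tensorflow as tf

# At least 6 points per eye and 6 for the mouth are used downstream;
# the real model may regress more points than this.
NUM_POINTS = 18

backbone = tf.keras.applications.MobileNetV2(
    input_shape=(224, 224, 3), include_top=False, weights='imagenet')
x = tf.keras.layers.GlobalAveragePooling2D()(backbone.output)
score = tf.keras.layers.Dense(1, activation='sigmoid', name='Output1')(x)        # face present?
landmarks = tf.keras.layers.Dense(NUM_POINTS * 2, activation='sigmoid',
                                  name='Output2')(x)                              # normalized (x, y)
model = tf.keras.Model(backbone.input, [score, landmarks])
model.compile(optimizer='adam',
              loss={'Output1': 'binary_crossentropy', 'Output2': 'mse'})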
With native onnxruntime inference, video detection runs at roughly 9-10 fps:
The video detection code is as follows:
"""
OpenCV 读取摄像头视频视频流,使用原生的onnxruntime推理
"""
import cv2
import glob
import time
import numpy as np
import onnxruntime
from scipy.spatial import distance as dist
from PIL import Image, ImageDraw, ImageFont
video_path = glob.glob('videos/*.mp4')
video_names = [name.split('\\')[-1].split('.')[0] for name in video_path]
video_name = video_names[6]
print(video_name)
def image_preprocess_ssdlite(img):
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (300, 300))
return img
def image_preprocess_mbv2(img):
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (224, 224))
img = img.astype(np.float32)
img = img/255
return img
def get_max_roi_head(bboxes, img_data, scale=1):
"""找出roi最大的头部检测框"""
max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]))
# The raw boxes are normalized to [0, 1]; map them back to coordinates in the original image
img_h, img_w, _ = img_data.shape
x1, y1, x2, y2 = max_bbox
x1 = int(x1 * img_w)
y1 = int(y1 * img_h)
x2 = int(x2 * img_w)
y2 = int(y2 * img_h)
# Expand the box outward a bit
w_ = max(abs(x2 - x1), abs(y2 - y1))
w_ = w_ * scale
x_mid = (x1 + x2) / 2
y_mid = (y1 + y2) / 2
new_x1 = max(0, int(x_mid - w_ / 2))
new_y1 = max(0, int(y_mid - w_ / 2))
new_x2 = min(int(x_mid + w_ / 2), img_w)
new_y2 = min(int(y_mid + w_ / 2), img_h)
new_bbox = [new_x1, new_y1, new_x2, new_y2]
return new_bbox
def get_max_roi_phone(bboxes, img_data, scale=1):
"""找出roi最大的手机检测框"""
max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]))
# The raw boxes are normalized to [0, 1]; map them back to coordinates in the original image
img_h, img_w, _ = img_data.shape
x1, y1, x2, y2 = max_bbox
x1 = int(x1 * img_w)
y1 = int(y1 * img_h)
x2 = int(x2 * img_w)
y2 = int(y2 * img_h)
new_bbox = [x1, y1, x2, y2]
return new_bbox
def cover_ratio(box1, box2):
'''Ratio of the intersection area of the two boxes to the area of box2'''
def _overlap(x1, x2, x3, x4):
left = max(x1, x3)
right = min(x2, x4)
return right - left
w = _overlap(box1[0], box1[2], box2[0], box2[2])
h = _overlap(box1[1], box1[3], box2[1], box2[3])
if w <= 0 or h <= 0:
return 0
inter_area = w * h
small_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
return inter_area * 1.0 / small_area
def eye_aspect_ratio(eye):
'''Compute the eye aspect ratio (EAR)'''
# compute the euclidean distances between the two sets of
# vertical eye landmarks (x, y)-coordinates
A = dist.euclidean(eye[1], eye[5])
B = dist.euclidean(eye[2], eye[4])
# compute the euclidean distance between the horizontal
# eye landmark (x, y)-coordinates
C = dist.euclidean(eye[0], eye[3])
# compute the eye aspect ratio
ear = (A + B) / (2.0 * C)
# return the eye aspect ratio
return ear
# Draw Chinese text on the image
def cv2AddChineseText(img, text, position, textColor=(0, 255, 0), textSize=30):
img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img)
# Font settings
fontStyle = ImageFont.truetype(
"./font/simsun.ttc", textSize, encoding="utf-8")
# Draw the text
draw.text(position, text, textColor, font=fontStyle)
# Convert back to OpenCV format
return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
cap = cv2.VideoCapture('videos/'+video_name+'.mp4')
if not cap.isOpened():
print('文件不存在或编码错误')
else:
fps = 30
font = cv2.FONT_HERSHEY_PLAIN
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
mbv2 = onnxruntime.InferenceSession('mbv2_224x224.onnx')
ssdlite = onnxruntime.InferenceSession('ssdlite_mbv2.onnx')
writer = cv2.VideoWriter(video_name + '_result.mp4',cv2.VideoWriter_fourcc(*'X264'),fps,(width,height))
# -----------------------------------------------------------------------------------------------------------
EAR_THRESH = 0.2
queue_eye = [0] * 10
queue_phone = [0] * 10
queue_mouth = [0] * 10
queue_look_ground = [0] * 10
# -----------------------------------------------------------------------------------------------------------
start_time = time.time()
while cap.isOpened():
ret,frame = cap.read()
if ret:
# ---------------------------------------------------------------------------------------------------
queue_eye.pop(0)
queue_phone.pop(0)
queue_mouth.pop(0)
queue_look_ground.pop(0)
# ---------------------------------------------------------------------------------------------------
image = frame.copy()
img = image_preprocess_ssdlite(image)
data = np.expand_dims(img, axis=0)
ssdlite_input ={ssdlite.get_inputs()[0].name: data}
detection_boxes, detection_classes, _, detection_scores, _, _, _ = ssdlite.run(None, ssdlite_input)
head_bboxes = []
phone_bboxes = []
for i in range(100):
if detection_scores[0][i] > 0.5:
ymin, xmin, ymax, xmax = detection_boxes[0][i][0], detection_boxes[0][i][1], detection_boxes[0][i][2], detection_boxes[0][i][3]
if detection_classes[0][i] == 1:
head_bbox = [xmin, ymin, xmax, ymax]
head_bboxes.append(head_bbox)
else:
phone_bbox = [xmin, ymin, xmax, ymax]
phone_bboxes.append(phone_bbox)
max_roi_head = []
max_roi_phone = []
if head_bboxes:
max_roi_head = get_max_roi_head(head_bboxes, image)
xmin, ymin, xmax, ymax = max_roi_head[0], max_roi_head[1], max_roi_head[2], max_roi_head[3]
cv2.rectangle(img=frame,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,255,0),thickness=10)
if phone_bboxes:
max_roi_phone = get_max_roi_phone(phone_bboxes, image)
xmin, ymin, xmax, ymax = max_roi_phone[0], max_roi_phone[1], max_roi_phone[2], max_roi_phone[3]
cv2.rectangle(img=frame,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,0,255),thickness=10)
if max_roi_head:
xmin, ymin, xmax, ymax = max_roi_head[0], max_roi_head[1], max_roi_head[2], max_roi_head[3]
image_crop = image[ymin:ymax,xmin:xmax]
h, w = image_crop.shape[0], image_crop.shape[1]
img = image_preprocess_mbv2(image_crop)
data = np.expand_dims(img, axis=0)
onnx_input ={mbv2.get_inputs()[0].name: data}
score, label = mbv2.run(None, onnx_input)
confidence = (score[0][0]*100).round(2)
points = []
if score[0][0]==1:
queue_look_ground.append(0)
for x, y in label[0].reshape(-1, 2):
x = x * w
y = y * h
points.append(int(x)+xmin)
points.append(int(y)+ymin)
else:
queue_look_ground.append(1)
if points:
keypoint_features = np.array(points)
eye_l = keypoint_features[:12].reshape((-1,1,2))
eye_r = keypoint_features[12:24].reshape((-1,1,2))
mouth = keypoint_features[24:36].reshape((-1,1,2))
l_ear = eye_aspect_ratio(keypoint_features[:12].reshape(-1, 2))
r_ear = eye_aspect_ratio(keypoint_features[12:24].reshape(-1, 2))
m_ear = eye_aspect_ratio(keypoint_features[24:36].reshape(-1, 2))
if l_ear<EAR_THRESH and r_ear<EAR_THRESH:
queue_eye.append(1)
else:
queue_eye.append(0)
if m_ear>EAR_THRESH:
queue_mouth.append(1)
else:
queue_mouth.append(0)
cv2.polylines(img=frame,pts=[eye_l],isClosed=True,color=(255,255,0),thickness=2)
cv2.polylines(img=frame,pts=[eye_r],isClosed=True,color=(255,255,0),thickness=2)
cv2.polylines(img=frame,pts=[mouth],isClosed=True,color=(255,255,0),thickness=2)
cv2.putText(img=frame,text='eye_l_ear:'+str(l_ear.round(2)),org=(xmin+20,ymin+60),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
cv2.putText(img=frame,text='eye_r_ear:'+str(r_ear.round(2)),org=(xmin+20,ymin+120),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
cv2.putText(img=frame,text='mouth_ear:'+str(m_ear.round(2)),org=(xmin+20,ymin+180),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
index = 0
for x, y in keypoint_features.reshape(-1, 2):
index+=1
cv2.circle(frame,(x,y),3,(0,255,0),-1)
cv2.putText(img=frame,text=str(index),org=(x,y),fontFace=font,fontScale=1,color=(255,255,0),thickness=1,lineType=cv2.LINE_AA)
else:
queue_eye.append(0)
queue_mouth.append(0)
cv2.putText(img=frame,text='has_face:'+str(confidence)+'%',org=(xmin,ymin-10),fontFace=font,fontScale=5,color=(0,255,0),thickness=5,lineType=cv2.LINE_AA)
if max_roi_phone:
ratio = cover_ratio(max_roi_head, max_roi_phone)
if ratio>0.5:
queue_phone.append(1)
else:
queue_phone.append(0)
else:
queue_phone.append(0)
else:
# ---------------------------------------------------------------------------------------------------
queue_eye.append(0)
queue_phone.append(0)
queue_mouth.append(0)
queue_look_ground.append(0)
# ---------------------------------------------------------------------------------------------------
# Compute FPS
now = time.time()
fps_text = int(1 / ( now - start_time))
start_time = now
yawn_score = sum(queue_mouth)/len(queue_mouth)
phone_score = sum(queue_phone)/len(queue_phone)
sleeping_score = sum(queue_eye)/len(queue_eye)
look_ground_score = sum(queue_look_ground)/len(queue_look_ground)
name_list = ['打哈欠','打电话','打瞌睡','左顾右盼']
score_list = [yawn_score, phone_score, sleeping_score, look_ground_score]
queue_list = [queue_mouth, queue_phone, queue_eye, queue_look_ground]
sorted_id = sorted(range(len(score_list)), key=lambda k: score_list[k], reverse=True)
max_score_id = np.argmax(score_list)
frame = cv2AddChineseText(frame, 'ssdlite_mbv2使用原生onnxruntime推理:'+str(fps_text)+'fps', (20,50), textColor=(0, 255, 0), textSize=40)
frame = cv2AddChineseText(frame, 'AI练习生原创作品', (600,200), textColor=(0, 255, 255), textSize=50)
if score_list[max_score_id]>0.9:
frame = cv2AddChineseText(frame, '驾驶行为:'+name_list[max_score_id], (400,400), textColor=(0, 255, 255), textSize=50)
else:
frame = cv2AddChineseText(frame, '驾驶行为:'+'正常驾驶', (400,400), textColor=(0, 255, 255), textSize=50)
frame = cv2AddChineseText(frame, name_list[sorted_id[0]]+str(queue_list[sorted_id[0]])+':'+str(np.array(score_list[sorted_id[0]]*100).round(2))+'%', (20,600), textColor=(0, 255, 255), textSize=40)
frame = cv2AddChineseText(frame, name_list[sorted_id[1]]+str(queue_list[sorted_id[1]])+':'+str(np.array(score_list[sorted_id[1]]*100).round(2))+'%', (20,700), textColor=(0, 255, 0), textSize=40)
frame = cv2AddChineseText(frame, name_list[sorted_id[2]]+str(queue_list[sorted_id[2]])+':'+str(np.array(score_list[sorted_id[2]]*100).round(2))+'%', (20,800), textColor=(0, 255, 0), textSize=40)
frame = cv2AddChineseText(frame, name_list[sorted_id[3]]+str(queue_list[sorted_id[3]])+':'+str(np.array(score_list[sorted_id[3]]*100).round(2))+'%', (20,900), textColor=(0, 255, 0), textSize=40)
print('ssdlite_mbv2_post')
# cv2.imshow('demo', frame)
writer.write(frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
else:
break
cap.release()
cv2.destroyAllWindows()
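A quick note on the eye aspect ratio (EAR) used above: with the six landmarks of one eye ordered as in the usual EAR formulation (p1/p4 the corners, p2/p3 the upper lid, p5/p6 the lower lid), eye_aspect_ratio computes
EAR = (||p2 - p6|| + ||p3 - p5||) / (2 * ||p1 - p4||)
The value drops towards 0 as the eye closes, so the script counts a frame as eyes-closed when both eyes fall below EAR_THRESH = 0.2. The same ratio applied to the mouth landmarks rises above the threshold when the mouth is wide open, which is used as the yawning signal; the 10-frame queues then turn these per-frame flags into smoothed behavior scores.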
ModelBox Application Development
The following sections give an overview of the ModelBox AI application development workflow:
1. Review the flow graph
As you can see, the flow graph contains two inference flowunits, ssd_infer and mbv2_infer. ssd_infer uses native onnxruntime for inference, mainly because the ssdlite_mbv2.onnx object detection model was trained with and converted from TensorFlow 1.x, and the built-in engine has trouble parsing it. The multi-output model mbv2_224x224.onnx was trained and exported with TensorFlow 2.x and can be run with ModelBox's built-in inference flowunit.
2. Review the inference flowunits
Since ssdlite_mbv2.onnx is run with native onnxruntime, we need to write the model inference code ourselves. First, look at the model's configuration file:
# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
# Basic config
[base]
name = "ssd_infer" # The FlowUnit name
device = "cpu" # The flowunit runs on cpu
version = "1.0.0" # The version of the flowunit
type = "python" # Fixed value, do not change
description = "description" # The description of the flowunit
entry = "ssd_infer@ssd_inferFlowUnit" # Python flowunit entry function
group_type = "Generic" # flowunit group attribution, change as Input/Output/Image/Generic ...
# Flowunit Type
stream = false # Whether the flowunit is a stream flowunit
condition = false # Whether the flowunit is a condition flowunit
collapse = false # Whether the flowunit is a collapse flowunit
collapse_all = false # Whether the flowunit will collapse all the data
expand = false # Whether the flowunit is a expand flowunit
# The default Flowunit config
[config]
net_h = 300
net_w = 300
# Input ports description
[input]
[input.input1] # Input port number, the format is input.input[N]
name = "Input" # Input port name
type = "uint8" # Input port type
# Output ports description
[output]
[output.output1] # Output port number, the format is output.output[N]
name = "Output1" # Output port name
type = "float" # Output port type
[output.output2] # Output port number, the format is output.output[N]
name = "Output2" # Output port name
type = "float" # Output port type
[output.output3] # Output port number, the format is output.output[N]
name = "Output3" # Output port name
type = "float" # Output port type
Our model takes a 300x300 uint8 image as input and outputs 7 float tensors; we only need the detection boxes, confidences, and object classes among them. The model inference code is as follows:
# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import _flowunit as modelbox
import numpy as np
import onnxruntime
class ssd_inferFlowUnit(modelbox.FlowUnit):
# Derived from modelbox.FlowUnit
def __init__(self):
super().__init__()
self.ssdlite = onnxruntime.InferenceSession('model/ssdlite_mbv2.onnx')
def open(self, config):
# Open the flowunit to obtain configuration information
self.net_h = config.get_int('net_h', 300)
self.net_w = config.get_int('net_w', 300)
return modelbox.Status.StatusCode.STATUS_SUCCESS
def process(self, data_context):
# Process the data
in_data = data_context.input("Input")
out_data1 = data_context.output("Output1")
out_data2 = data_context.output("Output2")
out_data3 = data_context.output("Output3")
# ssd_infer process code.
# Remove the following code and add your own code here.
for buffer in in_data:
# Convert the input Buffer to a numpy array
buffer = np.array(buffer.as_object(), copy=False)
img = buffer.reshape((self.net_h, self.net_w, 3))
data = np.expand_dims(img, axis=0)
ssdlite_input ={self.ssdlite.get_inputs()[0].name: data}
detection_boxes, detection_classes, _, detection_scores, _, _, _ = self.ssdlite.run(None, ssdlite_input)
# Convert the processing results into Buffers
boxes = modelbox.Buffer(self.get_bind_device(), detection_boxes)
scores = modelbox.Buffer(self.get_bind_device(), detection_scores)
classes = modelbox.Buffer(self.get_bind_device(), detection_classes)
out_data1.push_back(boxes)
out_data2.push_back(classes)
out_data3.push_back(scores)
return modelbox.Status.StatusCode.STATUS_SUCCESS
def close(self):
# Close the flowunit
return modelbox.Status()
def data_pre(self, data_context):
# Before streaming data starts
return modelbox.Status()
def data_post(self, data_context):
# After streaming data ends
return modelbox.Status()
def data_group_pre(self, data_context):
# Before all streaming data starts
return modelbox.Status()
def data_group_post(self, data_context):
# After all streaming data ends
return modelbox.Status()
mbv2_224x224.onnx is run with ModelBox's built-in inference flowunit. It has 1 input and 2 outputs; the model configuration file is edited as follows, and no inference code needs to be written.
# Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved.
[base]
name = "mbv2_infer"
device = "cpu"
version = "1.0.0"
description = "your description"
entry = "./mbv2_224x224.onnx" # model file path, use relative path
type = "inference"
virtual_type = "onnx" # inference engine type: win10 now only support onnx
group_type = "Inference" # flowunit group attribution, do not change
# Input ports description
[input]
[input.input1] # input port number, Format is input.input[N]
name = "Input" # input port name
type = "float" # input port data type ,e.g. float or uint8
device = "cpu" # input buffer type: cpu, win10 now copy input from cpu
# Output ports description
[output]
[output.output1] # output port number, Format is output.output[N]
name = "Output1" # output port name
type = "float" # output port data type ,e.g. float or uint8
[output.output2] # output port number, Format is output.output[N]
name = "Output2" # output port name
type = "float" # output port data type ,e.g. float or uint8
3. Review the condition flowunit
a. Set the condition flowunit properties:
# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
# Basic config
[base]
name = "head_condition" # The FlowUnit name
device = "cpu" # The flowunit runs on cpu
version = "1.0.0" # The version of the flowunit
type = "python" # Fixed value, do not change
description = "description" # The description of the flowunit
entry = "head_condition@head_conditionFlowUnit" # Python flowunit entry function
group_type = "Generic" # flowunit group attribution, change as Input/Output/Image/Generic ...
# Flowunit Type
stream = false # Whether the flowunit is a stream flowunit
condition = true # Whether the flowunit is a condition flowunit
collapse = false # Whether the flowunit is a collapse flowunit
collapse_all = false # Whether the flowunit will collapse all the data
expand = false # Whether the flowunit is a expand flowunit
# The default Flowunit config
[config]
item = "value"
# Input ports description
[input]
[input.input1] # Input data 1
name = "in_image" # original image
type = "uint8" # original image data format is uint8
[input.input2] # Input data 2
name = "in_bbox" # head and phone detection boxes
type = "string" # boxes are encoded as a JSON string
# Output ports description
[output]
[output.output1] # Output when a head is detected
name = "has_head" # original image + head/phone boxes (boxes attached to the image as attributes)
type = "uint8" # original image data format is uint8
[output.output2] # Output when no head is detected
name = "no_head" # original image
type = "uint8" # original image data format is uint8
Unlike a generic flowunit, a condition flowunit has its condition property set to true and always has multiple output ports. The head_condition flowunit has two output ports, has_head and no_head, corresponding to the branches for "head detected" and "no head detected". The figure below shows the data flow of a condition flowunit:
b. Implement the condition flowunit interface:
The process interface of a condition flowunit follows basically the same logic as a generic flowunit; it only differs in how the output Buffer is emitted at the end. The code is as follows:
# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import _flowunit as modelbox
import numpy as np
import json
class head_conditionFlowUnit(modelbox.FlowUnit):
# Derived from modelbox.FlowUnit
def __init__(self):
super().__init__()
def open(self, config):
# Open the flowunit to obtain configuration information
return modelbox.Status.StatusCode.STATUS_SUCCESS
def process(self, data_context):
# Process the data
in_image = data_context.input("in_image")
in_bbox = data_context.input("in_bbox")
has_head = data_context.output("has_head")
no_head = data_context.output("no_head")
# face_condition process code.
# Remove the following code and add your own code here.
# Loop over each input Buffer (a condition flowunit actually has batch size 1, so there is only one item in this loop)
for buffer_img, buffer_bbox in zip(in_image, in_bbox):
# Get the attribute information of the input Buffer
width = buffer_img.get('width')
height = buffer_img.get('height')
channel = buffer_img.get('channel')
# Convert the input Buffer to a numpy array
img_data = np.array(buffer_img.as_object(), copy=False)
img_data = img_data.reshape((height, width, channel))
# String data can be converted directly with as_object
bbox_str = buffer_bbox.as_object()
# Decode the head detection boxes
head_bboxes = self.decode_head_bboxes(bbox_str)
# Decode the phone detection boxes
phone_bboxes = self.decode_phone_bboxes(bbox_str)
if phone_bboxes:
max_roi = self.get_max_roi_phone(phone_bboxes, img_data)
buffer_img.set("bbox_phone", max_roi)
else:
buffer_img.set("bbox_phone", 0)
# Here the input Buffer is passed downstream directly as the output Buffer,
# so its Data, Meta and all other contents are preserved; no new Buffer or Meta needs to be built
if head_bboxes: # output branch when a head is detected
max_roi = self.get_max_roi_head(head_bboxes, img_data)
buffer_img.set("bbox", max_roi) # 将头部检测框作为属性附在输出Buffer上
has_head.push_back(buffer_img)
else: # output branch when no head is detected
no_head.push_back(buffer_img)
# Return success; the ModelBox framework will send the data to the downstream flowunits
return modelbox.Status.StatusCode.STATUS_SUCCESS
def decode_head_bboxes(self, bbox_str):
"""从json字符串中解码出头部检测框"""
try:
hand_labels = [1] # the class id for head is 1
det_result = json.loads(bbox_str)['det_result']
if det_result == "None":
return []
bboxes = json.loads(det_result)
hand_bboxes = list(filter(lambda x: int(x[5]) in hand_labels, bboxes))
except Exception as ex:
modelbox.error(str(ex))
return []
else:
return hand_bboxes
def decode_phone_bboxes(self, bbox_str):
"""从json字符串中解码出头部检测框"""
try:
phone_labels = [2] # the class id for phone is 2
det_result = json.loads(bbox_str)['det_result']
if det_result == "None":
return []
bboxes = json.loads(det_result)
phone_bboxes = list(filter(lambda x: int(x[5]) in phone_labels, bboxes))
except Exception as ex:
modelbox.error(str(ex))
return []
else:
return phone_bboxes
def get_max_roi_head(self, bboxes, img_data, scale=1):
"""找出roi最大的头部检测框"""
max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]) * x[4])
# The raw boxes are normalized to [0, 1]; map them back to coordinates in the original image
img_h, img_w, _ = img_data.shape
x1, y1, x2, y2, _, _ = max_bbox
x1 = int(x1 * img_w)
y1 = int(y1 * img_h)
x2 = int(x2 * img_w)
y2 = int(y2 * img_h)
# Expand the box outward a bit
w_ = max(abs(x2 - x1), abs(y2 - y1))
w_ = w_ * scale
x_mid = (x1 + x2) / 2
y_mid = (y1 + y2) / 2
new_x1 = max(0, int(x_mid - w_ / 2))
new_y1 = max(0, int(y_mid - w_ / 2))
new_x2 = min(int(x_mid + w_ / 2), img_w)
new_y2 = min(int(y_mid + w_ / 2), img_h)
new_bbox = [new_x1, new_y1, new_x2, new_y2]
return new_bbox
def get_max_roi_phone(self, bboxes, img_data):
"""找出roi最大的手机检测框"""
max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]) * x[4])
# The raw boxes are normalized to [0, 1]; map them back to coordinates in the original image
img_h, img_w, _ = img_data.shape
x1, y1, x2, y2, _, _ = max_bbox
x1 = int(x1 * img_w)
y1 = int(y1 * img_h)
x2 = int(x2 * img_w)
y2 = int(y2 * img_h)
new_bbox = [x1, y1, x2, y2]
return new_bbox
def close(self):
# Close the flowunit
return modelbox.Status()
def data_pre(self, data_context):
# Before streaming data starts
return modelbox.Status()
def data_post(self, data_context):
# After streaming data ends
return modelbox.Status()
def data_group_pre(self, data_context):
# Before all streaming data starts
return modelbox.Status()
def data_group_post(self, data_context):
# After all streaming data ends
return modelbox.Status()
4. Review the drawing flowunit
The drawing flowunit is a generic flowunit. After processing data, a generic flowunit produces one or more outputs, and a downstream flowunit must receive all of its data (it may of course process only part of the outputs it receives). In addition, a generic flowunit can process multiple Buffers at a time (i.e. batch size > 1): the data is split into groups according to the batch size setting, the groups are processed concurrently, and the results correspond one to one with the inputs. The figure below shows the data flow of a generic flowunit:
The configuration file is as follows:
# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
# Basic config
[base]
name = "ssd_mbv2_draw" # The FlowUnit name
device = "cpu" # The flowunit runs on cpu
version = "1.0.0" # The version of the flowunit
type = "python" # Fixed value, do not change
description = "description" # The description of the flowunit
entry = "ssd_mbv2_draw@ssd_mbv2_drawFlowUnit" # Python flowunit entry function
group_type = "Generic" # flowunit group attribution, change as Input/Output/Image/Generic ...
# Flowunit Type
stream = false # Whether the flowunit is a stream flowunit
condition = false # Whether the flowunit is a condition flowunit
collapse = false # Whether the flowunit is a collapse flowunit
collapse_all = false # Whether the flowunit will collapse all the data
expand = false # Whether the flowunit is a expand flowunit
# The default Flowunit config
[config]
item = "value"
# Input ports description
[input]
[input.input1] # Input data 1: original image + head/phone boxes (boxes attached to the image as attributes)
name = "in_image"
type = "uint8"
[input.input2] # Input data 2: face keypoint data encoded as a JSON string
name = "in_landmarks"
type = "string"
# Output ports description
[output]
[output.output1] # Output port number, the format is output.output[N]
name = "out_image" # Output port name
type = "uint8" # Output port type
Fill in the flowunit logic code:
# Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import _flowunit as modelbox
from scipy.spatial import distance as dist
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import json
import cv2
class ssd_mbv2_drawFlowUnit(modelbox.FlowUnit):
# Derived from modelbox.FlowUnit
def __init__(self):
super().__init__()
self.EAR_THRESH = 0.2
self.queue_eye = [0] * 10
self.queue_phone = [0] * 10
self.queue_mouth = [0] * 10
self.queue_look_ground = [0] * 10
self.font = cv2.FONT_HERSHEY_PLAIN
def open(self, config):
# Open the flowunit to obtain configuration information
return modelbox.Status.StatusCode.STATUS_SUCCESS
def process(self, data_context):
# Process the data
# Get the input/output BufferList objects from the DataContext
in_image = data_context.input("in_image")
in_landmarks = data_context.input("in_landmarks")
out_image = data_context.output("out_image")
# ssd_mbv2_draw process code.
# Remove the following code and add your own code here.
# Loop over each input Buffer
for buffer_img, buffer_landmarks in zip(in_image, in_landmarks):
self.queue_eye.pop(0)
self.queue_phone.pop(0)
self.queue_mouth.pop(0)
self.queue_look_ground.pop(0)
# Get the width, height, channel count and other attributes of the input image Buffer
width = buffer_img.get('width')
height = buffer_img.get('height')
channel = buffer_img.get('channel')
# Convert the input Buffer to a numpy array
img_data = np.array(buffer_img.as_object(), dtype=np.uint8, copy=False)
img_data = img_data.reshape(height, width, channel)
max_bbox = buffer_img.get("bbox")
phone_bbox = buffer_img.get("bbox_phone")
# Convert the input Buffer to a string and decode the face keypoint data from the JSON string
landmarks_str = buffer_landmarks.as_object()
landmarks = self.decode_landmarks(landmarks_str)
confidence = landmarks[0]*100
# Business logic: draw the face keypoints of the largest head box onto the image
img_out = img_data.copy()
xmin, ymin, xmax, ymax = max_bbox[0], max_bbox[1], max_bbox[2], max_bbox[3]
cv2.rectangle(img_out,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,255,0),thickness=10)
cv2.putText(img_out,text='has_face:'+str(confidence)+'%',org=(xmin,ymin-10),fontFace=self.font,fontScale=5,color=(0,255,0),thickness=5,lineType=cv2.LINE_AA)
if landmarks[0]==1:
self.queue_look_ground.append(0)
points = np.array(landmarks[1:])
for x, y in points.reshape(-1, 2):
x = int(x*(xmax-xmin))
y = int(y*(ymax-ymin))
cv2.circle(img_out,(x+xmin,y+ymin),3,(0,255,0),-1)
l_ear = self.eye_aspect_ratio(points[:12].reshape(-1, 2))
r_ear = self.eye_aspect_ratio(points[12:24].reshape(-1, 2))
m_ear = self.eye_aspect_ratio(points[24:36].reshape(-1, 2))
if l_ear<self.EAR_THRESH and r_ear<self.EAR_THRESH:
self.queue_eye.append(1)
else:
self.queue_eye.append(0)
if m_ear>self.EAR_THRESH:
self.queue_mouth.append(1)
else:
self.queue_mouth.append(0)
cv2.putText(img_out,text='eye_l_ear:'+str(np.array(l_ear).round(2)),org=(xmin+20,ymin+60),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
cv2.putText(img_out,text='eye_r_ear:'+str(np.array(r_ear).round(2)),org=(xmin+20,ymin+120),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
cv2.putText(img_out,text='mouth_ear:'+str(np.array(m_ear).round(2)),org=(xmin+20,ymin+180),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
else:
self.queue_eye.append(0)
self.queue_mouth.append(0)
self.queue_look_ground.append(1)
if phone_bbox:
x1, y1, x2, y2 = phone_bbox[0], phone_bbox[1], phone_bbox[2], phone_bbox[3]
cv2.rectangle(img_out,pt1=(x1,y1),pt2=(x2,y2),color=(255,0,0),thickness=10)
ratio = self.cover_ratio(max_bbox, phone_bbox)
if ratio>0.5:
self.queue_phone.append(1)
else:
self.queue_phone.append(0)
else:
self.queue_phone.append(0)
yawn_score = sum(self.queue_mouth)/len(self.queue_mouth)
phone_score = sum(self.queue_phone)/len(self.queue_phone)
sleeping_score = sum(self.queue_eye)/len(self.queue_eye)
look_ground_score = sum(self.queue_look_ground)/len(self.queue_look_ground)
name_list = ['打哈欠','打电话','打瞌睡','左顾右盼']
score_list = [yawn_score, phone_score, sleeping_score, look_ground_score]
queue_list = [self.queue_mouth, self.queue_phone, self.queue_eye, self.queue_look_ground]
sorted_id = sorted(range(len(score_list)), key=lambda k: score_list[k], reverse=True)
max_score_id = np.argmax(score_list)
img_out = self.cv2AddChineseText(img_out, 'AI练习生原创作品', (600,50), textColor=(0, 255, 0), textSize=50)
if score_list[max_score_id]>0.9:
img_out = self.cv2AddChineseText(img_out, '驾驶行为:'+name_list[max_score_id], (400,400), textColor=(0, 255, 0), textSize=50)
else:
img_out = self.cv2AddChineseText(img_out, '驾驶行为:'+'正常驾驶', (400,400), textColor=(0, 255, 0), textSize=50)
img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[0]]+str(queue_list[sorted_id[0]])+':'+str(np.array(score_list[sorted_id[0]]*100).round(2))+'%', (20,600), textColor=(0, 255, 0), textSize=40)
img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[1]]+str(queue_list[sorted_id[1]])+':'+str(np.array(score_list[sorted_id[1]]*100).round(2))+'%', (20,700), textColor=(0, 255, 0), textSize=40)
img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[2]]+str(queue_list[sorted_id[2]])+':'+str(np.array(score_list[sorted_id[2]]*100).round(2))+'%', (20,800), textColor=(0, 255, 0), textSize=40)
img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[3]]+str(queue_list[sorted_id[3]])+':'+str(np.array(score_list[sorted_id[3]]*100).round(2))+'%', (20,900), textColor=(0, 255, 0), textSize=40)
# Convert the processing result into a Buffer
out_buffer = modelbox.Buffer(self.get_bind_device(), img_out)
# Set the Meta of the output Buffer; here we simply copy the Meta of the input Buffer
out_buffer.copy_meta(buffer_img)
# Put the output Buffer into the output BufferList
out_image.push_back(out_buffer)
# Return success; the ModelBox framework will send the data to the downstream flowunits
return modelbox.Status.StatusCode.STATUS_SUCCESS
def decode_landmarks(self, landmarks_str):
"""从json字符串中解码出人脸关键点数据"""
try:
landmarks_data = json.loads(landmarks_str)
landmarks_list = json.loads(landmarks_data['landmarks_result'])
except Exception as ex:
modelbox.error(str(ex))
return []
else:
return landmarks_list
def cover_ratio(self, box1, box2):
'''Ratio of the intersection area of the two boxes to the area of box2'''
def _overlap(x1, x2, x3, x4):
left = max(x1, x3)
right = min(x2, x4)
return right - left
w = _overlap(box1[0], box1[2], box2[0], box2[2])
h = _overlap(box1[1], box1[3], box2[1], box2[3])
if w <= 0 or h <= 0:
return 0
inter_area = w * h
small_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
return inter_area * 1.0 / small_area
def eye_aspect_ratio(self, eye):
'''Compute the eye aspect ratio (EAR)'''
# compute the euclidean distances between the two sets of
# vertical eye landmarks (x, y)-coordinates
A = dist.euclidean(eye[1], eye[5])
B = dist.euclidean(eye[2], eye[4])
# compute the euclidean distance between the horizontal
# eye landmark (x, y)-coordinates
C = dist.euclidean(eye[0], eye[3])
# compute the eye aspect ratio
ear = (A + B) / (2.0 * C)
# return the eye aspect ratio
return ear
def cv2AddChineseText(self, img, text, position, textColor=(0, 255, 0), textSize=30):
'''Draw Chinese text on the image'''
img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img)
# Font settings
fontStyle = ImageFont.truetype(
"./font/simsun.ttc", textSize, encoding="utf-8")
# Draw the text
draw.text(position, text, textColor, font=fontStyle)
# Convert back to OpenCV format
return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
def close(self):
# Close the flowunit
return modelbox.Status()
def data_pre(self, data_context):
# Before streaming data starts
return modelbox.Status()
def data_post(self, data_context):
# After streaming data ends
return modelbox.Status()
def data_group_pre(self, data_context):
# Before all streaming data starts
return modelbox.Status()
def data_group_post(self, data_context):
# After all streaming data ends
return modelbox.Status()
The remaining flowunits are implemented in much the same way as the ones above.
5. Review the ModelBox flow graph
# Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved.
[driver]
dir = ["${HILENS_APP_ROOT}/etc/flowunit",
"${HILENS_APP_ROOT}/etc/flowunit/cpp",
"${HILENS_APP_ROOT}/model",
"${HILENS_MB_SDK_PATH}/flowunit"]
skip-default = true
[profile]
profile=false
trace=false
dir="${HILENS_DATA_DIR}/mb_profile"
[graph]
format = "graphviz"
graphconf = """digraph Fatigue_driving_detection {
node [shape=Mrecord]
queue_size = 1
batch_size = 1
# Define the nodes, i.e. the flowunits and their properties
input1[type=input, flowunit=input, device=cpu, deviceid=0]
data_source_parser[type=flowunit, flowunit=data_source_parser, device=cpu, deviceid=0]
video_demuxer[type=flowunit, flowunit=video_demuxer, device=cpu, deviceid=0]
video_decoder[type=flowunit, flowunit=video_decoder, device=cpu, deviceid=0, pix_fmt=rgb]
image_resize[type=flowunit, flowunit=resize, device=cpu, deviceid=0, image_width=300, image_height=300]
ssd_infer[type=flowunit, flowunit=ssd_infer, device=cpu, deviceid=0]
ssd_post[type=flowunit, flowunit=ssd_post, device=cpu, deviceid=0]
head_condition[type=flowunit, flowunit=head_condition, device=cpu, deviceid=0]
crop_head_image[type=flowunit, flowunit=crop_head_image, device=cpu, deviceid=0]
image_resize2[type=flowunit, flowunit=resize, device=cpu, deviceid=0, image_width=224, image_height=224]
normalize[type=flowunit flowunit=normalize device=cpu deviceid="0" standard_deviation_inverse="0.003921568627450,0.003921568627450,0.003921568627450"]
mbv2_infer[type=flowunit, flowunit=mbv2_infer, device=cpu, deviceid=0]
landmarks_post[type=flowunit, flowunit=landmarks_post, device=cpu, deviceid=0]
ssd_mbv2_draw[type=flowunit, flowunit=ssd_mbv2_draw, device=cpu, deviceid=0]
video_out[type=flowunit, flowunit=video_out, device=cpu, deviceid=0]
# Define the edges, i.e. how data flows between flowunits
input1:input -> data_source_parser:in_data
data_source_parser:out_video_url -> video_demuxer:in_video_url
video_demuxer:out_video_packet -> video_decoder:in_video_packet
video_decoder:out_video_frame -> image_resize:in_image
image_resize:out_image -> ssd_infer:Input
ssd_infer:Output1 -> ssd_post:in_feat1
ssd_infer:Output2 -> ssd_post:in_feat2
ssd_infer:Output3 -> ssd_post:in_feat3
ssd_post:out_feat -> head_condition:in_bbox
video_decoder:out_video_frame -> head_condition:in_image
head_condition:no_head -> video_out:in_video_frame
head_condition:has_head -> crop_head_image:in_data
crop_head_image:roi_image -> image_resize2:in_image
image_resize2:out_image -> normalize:in_data
normalize:out_data -> mbv2_infer:Input
mbv2_infer:Output1 -> landmarks_post:in_feat1
mbv2_infer:Output2 -> landmarks_post:in_feat2
landmarks_post:out_data -> ssd_mbv2_draw:in_landmarks
head_condition:has_head -> ssd_mbv2_draw:in_image
ssd_mbv2_draw:out_image -> video_out:in_video_frame
}"""
[flow]
desc = "Fatigue_driving_detection run in modelbox-win10-x64"
As you can see, the two outputs of the head_condition flowunit connect to different downstream flowunits: when no head is detected, the no_head branch goes directly to video_out for video encoding; when a head is detected, the has_head branch goes to crop_head_image and the subsequent flowunits for face and keypoint detection.
6. Run the application
Before running, install onnxruntime, pillow, and scipy (e.g. pip install onnxruntime pillow scipy), then switch to the Fatigue_driving_detection directory and execute bin/main.bat to run the application. The result looks like this:
You can also test with other videos.
Case Summary
This case was designed end to end, from model training to application development, and achieves real-time recognition of 5 driving behaviors, so it has the potential for practical deployment. The keypoint detection uses direct regression, whose accuracy is limited; a heatmap-based approach will be adopted later to further improve recognition accuracy.
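As a rough illustration of the planned heatmap approach (not part of the current code), decoding keypoints from per-landmark heatmaps typically reduces to locating the peak in each channel:
import numpy as np

def decode_heatmaps(heatmaps):
    """Decode (H, W, K) heatmaps into K normalized (x, y) keypoints.

    A minimal sketch of the heatmap alternative mentioned above; the actual
    model and post-processing are future work, not part of this case.
    """
    h, w, k = heatmaps.shape
    points = []
    for i in range(k):
        idx = np.argmax(heatmaps[:, :, i])      # peak location of the i-th landmark
        y, x = np.unravel_index(idx, (h, w))
        points.append((x / w, y / h))           # normalize to [0, 1]
    return np.array(points)

# Example with random data:
print(decode_heatmaps(np.random.rand(56, 56, 18)).shape)  # (18, 2)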
Reference:
ModelBox AI Application Development — Condition Flowunits