驾驶行为检测【玩转华为云】
本文将使用ModelArts从零开始训练模型,并使用ModelBox开发一个驾驶行为的检测的AI应用,实现正常驾驶、闭眼、打哈欠、打电话、左顾右盼等5种行为的识别,最终效果如下:
 
本案例所需资源(代码、模型、测试数据等)均可从网盘链接下载。
ModelArts模型训练
我们使用疲劳/分神样例数据集进行标注用于模型训练:

首先训练一个轻量化的人脸手机检测模型,训练步骤可以参考我发布的Notebook:

训练结束后可以在Tensorboard中查看训练结果:

这里我设置模型训练步数为60k,每10k学习率衰减至原来的95%:

之后再训练一个人脸关键点检测模型,使用之前训练好的SSD目标检测模型以及OpenCV人脸检测库进行图像的自动标注,具体可以参考我发布的Notebook:

运行效果如下:

手工进行数据清洗并使用Mediapipe对嘴部的标注进行优化,最终得到人脸关键点检测数据集V2:

人脸关键点检测模型采用MobileNetV2作为backbone,检测人脸并对关键点进行回归,代码实现可以参考我发布的Notebook:

使用原生onnxruntime进行推理,视频检测的帧率在9~10左右:
视频检测代码如下:
 """
 
 OpenCV 读取摄像头视频视频流,使用原生的onnxruntime推理
 
 """
 
 
 import cv2
 
 import glob
 
 import time
 
 import numpy as np
 
 import onnxruntime
 
 from scipy.spatial import distance as dist
 
 from PIL import Image, ImageDraw, ImageFont
 
 
 video_path = glob.glob('videos/*.mp4')
 
 video_names = [name.split('\\')[-1].split('.')[0] for name in video_path]
 
 
 video_name = video_names[6]
 
 print(video_name)
 
 
 def image_preprocess_ssdlite(img):
 
 
      img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
 
 
      img = cv2.resize(img, (300, 300))
 
 
      return img
 
 
 def image_preprocess_mbv2(img):
 
 
      img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
 
 
      img = cv2.resize(img, (224, 224))
 
 
      img = img.astype(np.float32)
 
 
      img = img/255
 
 
      return img
 
 
 def get_max_roi_head(bboxes, img_data, scale=1):
 
 
          """找出roi最大的头部检测框"""
 
 
          max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]))
 
 
 
          # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
 
 
          img_h, img_w, _ = img_data.shape
 
 
          x1, y1, x2, y2 = max_bbox    
 
 
          x1 = int(x1 * img_w)
 
 
          y1 = int(y1 * img_h)
 
 
          x2 = int(x2 * img_w)
 
 
          y2 = int(y2 * img_h)
 
 
         
 
 
          # 检测框进行一定的外扩
 
 
          w_ = max(abs(x2 - x1), abs(y2 - y1))
 
 
          w_ = w_ * scale
 
 
          x_mid = (x1 + x2) / 2
 
 
          y_mid = (y1 + y2) / 2
 
 
          new_x1 = max(0, int(x_mid - w_ / 2))
 
 
          new_y1 = max(0, int(y_mid - w_ / 2))
 
 
          new_x2 = min(int(x_mid + w_ / 2), img_w)
 
 
          new_y2 = min(int(y_mid + w_ / 2), img_h)
 
 
          new_bbox = [new_x1, new_y1, new_x2, new_y2]
 
 
          return new_bbox
 
 
 def get_max_roi_phone(bboxes, img_data, scale=1):
 
 
          """找出roi最大的手机检测框"""
 
 
          max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]))
 
 
 
          # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
 
 
          img_h, img_w, _ = img_data.shape
 
 
          x1, y1, x2, y2 = max_bbox    
 
 
          x1 = int(x1 * img_w)
 
 
          y1 = int(y1 * img_h)
 
 
          x2 = int(x2 * img_w)
 
 
          y2 = int(y2 * img_h)
 
 
          new_bbox = [x1, y1, x2, y2]
 
 
          return new_bbox
 
 
 def cover_ratio(box1, box2):
 
 
      '''计算两个矩形框的IOU与box2区域的比值'''
 
 
      def _overlap(x1, x2, x3, x4):
 
 
          left = max(x1, x3)
 
 
          right = min(x2, x4)
 
 
          return right - left
 
 
 
      w = _overlap(box1[0], box1[2], box2[0], box2[2])
 
 
      h = _overlap(box1[1], box1[3], box2[1], box2[3])
 
 
      if w <= 0 or h <= 0:
 
 
          return 0
 
 
      inter_area = w * h
 
 
      small_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
 
 
      return inter_area * 1.0 / small_area
 
 
 def eye_aspect_ratio(eye):
 
 
      '''计算出EAR'''
 
 
      # compute the euclidean distances between the two sets of
 
 
      # vertical eye landmarks (x, y)-coordinates
 
 
      A = dist.euclidean(eye[1], eye[5])
 
 
      B = dist.euclidean(eye[2], eye[4])
 
 
 
      # compute the euclidean distance between the horizontal
 
 
      # eye landmark (x, y)-coordinates
 
 
      C = dist.euclidean(eye[0], eye[3])
 
 
 
      # compute the eye aspect ratio
 
 
      ear = (A + B) / (2.0 * C)
 
 
 
      # return the eye aspect ratio
 
 
      return ear
 
 
 # 绘制中文
 
 def cv2AddChineseText(img, text, position, textColor=(0, 255, 0), textSize=30):
 
 
      img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
 
 
      draw = ImageDraw.Draw(img)
 
 
      # 字体的格式
 
 
      fontStyle = ImageFont.truetype(
 
 
          "./font/simsun.ttc", textSize, encoding="utf-8")
 
 
      # 绘制文本
 
 
      draw.text(position, text, textColor, font=fontStyle)
 
 
      # 转换回OpenCV格式
 
 
      return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
 
 
 cap = cv2.VideoCapture('videos/'+video_name+'.mp4')
 
 
 if not cap.isOpened():
 
 
      print('文件不存在或编码错误')
 
 else:
 
 
      fps = 30
 
 
      font = cv2.FONT_HERSHEY_PLAIN
 
 
      width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
 
 
      height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
 
 
      mbv2 = onnxruntime.InferenceSession('mbv2_224x224.onnx')
 
 
      ssdlite = onnxruntime.InferenceSession('ssdlite_mbv2.onnx')
 
 
      writer = cv2.VideoWriter(video_name + '_result.mp4',cv2.VideoWriter_fourcc(*'X264'),fps,(width,height))
 
 # -----------------------------------------------------------------------------------------------------------
 
 EAR_THRESH = 0.2
 
 queue_eye = [0] * 10
 
 queue_phone = [0] * 10
 
 queue_mouth = [0] * 10
 
 queue_look_ground = [0] * 10
 
 # -----------------------------------------------------------------------------------------------------------
 
 start_time = time.time()  
 
 while cap.isOpened():
 
 
      ret,frame = cap.read()
 
 
      if ret:
 
 
          # ---------------------------------------------------------------------------------------------------
 
 
          queue_eye.pop(0)
 
 
          queue_phone.pop(0)
 
 
          queue_mouth.pop(0)
 
 
          queue_look_ground.pop(0)
 
 
          # ---------------------------------------------------------------------------------------------------
 
 
          image = frame.copy()
 
 
          img = image_preprocess_ssdlite(image)
 
 
          data = np.expand_dims(img, axis=0)
 
 
          ssdlite_input ={ssdlite.get_inputs()[0].name: data}
 
 
          detection_boxes, detection_classes, _, detection_scores, _, _, _ = ssdlite.run(None, ssdlite_input)
 
 
          head_bboxes = []
 
 
          phone_bboxes = []
 
 
          for i in range(100):
 
 
              if detection_scores[0][i] > 0.5:
 
 
                  ymin, xmin, ymax, xmax = detection_boxes[0][i][0], detection_boxes[0][i][1], detection_boxes[0][i][2], detection_boxes[0][i][3]
 
 
                  if detection_classes[0][i] == 1:
 
 
                      head_bbox = [xmin, ymin, xmax, ymax]
 
 
                      head_bboxes.append(head_bbox)
 
 
                  else:
 
 
                      phone_bbox = [xmin, ymin, xmax, ymax]
 
 
                      phone_bboxes.append(phone_bbox)
 
 
          max_roi_head = []
 
 
          max_roi_phone = []
 
 
          if head_bboxes:
 
 
              max_roi_head = get_max_roi_head(head_bboxes, image)
 
 
              xmin, ymin, xmax, ymax = max_roi_head[0], max_roi_head[1], max_roi_head[2], max_roi_head[3]
 
 
              cv2.rectangle(img=frame,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,255,0),thickness=10)
 
 
          if phone_bboxes:
 
 
              max_roi_phone = get_max_roi_phone(phone_bboxes, image)
 
 
              xmin, ymin, xmax, ymax = max_roi_phone[0], max_roi_phone[1], max_roi_phone[2], max_roi_phone[3]
 
 
              cv2.rectangle(img=frame,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,0,255),thickness=10)
 
 
          if max_roi_head:
 
 
              xmin, ymin, xmax, ymax = max_roi_head[0], max_roi_head[1], max_roi_head[2], max_roi_head[3]
 
 
              image_crop = image[ymin:ymax,xmin:xmax]
 
 
              h, w = image_crop.shape[0], image_crop.shape[1]
 
 
              img = image_preprocess_mbv2(image_crop)
 
 
              data = np.expand_dims(img, axis=0)
 
 
              onnx_input ={mbv2.get_inputs()[0].name: data}
 
 
              score, label = mbv2.run(None, onnx_input)
 
 
              confidence = (score[0][0]*100).round(2)
 
 
              points = []
 
 
              if score[0][0]==1:
 
 
                  queue_look_ground.append(0)
 
 
                  for x, y in label[0].reshape(-1, 2):
 
 
                      x = x * w
 
 
                      y = y * h
 
 
                      points.append(int(x)+xmin)
 
 
                      points.append(int(y)+ymin)
 
 
              else:
 
 
                  queue_look_ground.append(1)
 
 
              if points:
 
 
                  keypoint_features = np.array(points)
 
 
                  eye_l = keypoint_features[:12].reshape((-1,1,2))
 
 
                  eye_r = keypoint_features[12:24].reshape((-1,1,2))
 
 
                  mouth = keypoint_features[24:36].reshape((-1,1,2))
 
 
                  l_ear = eye_aspect_ratio(keypoint_features[:12].reshape(-1, 2))
 
 
                  r_ear = eye_aspect_ratio(keypoint_features[12:24].reshape(-1, 2))
 
 
                  m_ear = eye_aspect_ratio(keypoint_features[24:36].reshape(-1, 2))
 
 
                  if l_ear<EAR_THRESH and l_ear<EAR_THRESH:
 
 
                      queue_eye.append(1)
 
 
                  else:
 
 
                      queue_eye.append(0)
 
 
                  if m_ear>EAR_THRESH:
 
 
                      queue_mouth.append(1)
 
 
                  else:
 
 
                      queue_mouth.append(0)
 
 
                  cv2.polylines(img=frame,pts=[eye_l],isClosed=True,color=(255,255,0),thickness=2)
 
 
                  cv2.polylines(img=frame,pts=[eye_r],isClosed=True,color=(255,255,0),thickness=2)
 
 
                  cv2.polylines(img=frame,pts=[mouth],isClosed=True,color=(255,255,0),thickness=2)
 
 
                  cv2.putText(img=frame,text='eye_l_ear:'+str(l_ear.round(2)),org=(xmin+20,ymin+60),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
 
 
                  cv2.putText(img=frame,text='eye_r_ear:'+str(r_ear.round(2)),org=(xmin+20,ymin+120),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
 
 
                  cv2.putText(img=frame,text='mouth_ear:'+str(m_ear.round(2)),org=(xmin+20,ymin+180),fontFace=font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
 
 
                  index = 0
 
 
                  for x, y in keypoint_features.reshape(-1, 2):
 
 
                      index+=1
 
 
                      cv2.circle(frame,(x,y),3,(0,255,0),-1)
 
 
                      cv2.putText(img=frame,text=str(index),org=(x,y),fontFace=font,fontScale=1,color=(255,255,0),thickness=1,lineType=cv2.LINE_AA)
 
 
              else:
 
 
                  queue_eye.append(0)
 
 
                  queue_mouth.append(0)
 
 
              cv2.putText(img=frame,text='has_face:'+str(confidence)+'%',org=(xmin,ymin-10),fontFace=font,fontScale=5,color=(0,255,0),thickness=5,lineType=cv2.LINE_AA)
 
 
              if max_roi_phone:
 
 
                  ratio = cover_ratio(max_roi_head, max_roi_phone)
 
 
                  if ratio>0.5:
 
 
                      queue_phone.append(1)
 
 
                  else:
 
 
                      queue_phone.append(0)
 
 
              else:
 
 
                  queue_phone.append(0)
 
 
          else:
 
 
          # ---------------------------------------------------------------------------------------------------
 
 
              queue_eye.append(0)
 
 
              queue_phone.append(0)
 
 
              queue_mouth.append(0)
 
 
              queue_look_ground.append(0)
 
 
          # ---------------------------------------------------------------------------------------------------
 
 
          # 计算FPS
 
 
          now = time.time()
 
 
          fps_text  = int(1 / ( now - start_time))
 
 
          start_time = now
 
 
          yawn_score = sum(queue_mouth)/len(queue_mouth)
 
 
          phone_score = sum(queue_phone)/len(queue_phone)
 
 
          sleeping_score = sum(queue_eye)/len(queue_eye)
 
 
          look_ground_score = sum(queue_look_ground)/len(queue_look_ground)
 
 
          name_list = ['打哈欠','打电话','打瞌睡','左顾右盼']
 
 
          score_list = [yawn_score, phone_score, sleeping_score, look_ground_score]
 
 
          queue_list = [queue_mouth, queue_phone, queue_eye, queue_look_ground]
 
 
          sorted_id = sorted(range(len(score_list)), key=lambda k: score_list[k], reverse=True)
 
 
          max_score_id = np.argmax(score_list)
 
 
          frame = cv2AddChineseText(frame, 'ssdlite_mbv2使用原生onnxruntime推理:'+str(fps_text)+'fps', (20,50), textColor=(0, 255, 0), textSize=40)
 
 
          frame = cv2AddChineseText(frame, 'AI练习生原创作品', (600,200), textColor=(0, 255, 255), textSize=50)
 
 
          if score_list[max_score_id]>0.9:
 
 
              frame = cv2AddChineseText(frame, '驾驶行为:'+name_list[max_score_id], (400,400), textColor=(0, 255, 255), textSize=50)
 
 
          else:
 
 
              frame = cv2AddChineseText(frame, '驾驶行为:'+'正常驾驶', (400,400), textColor=(0, 255, 255), textSize=50)
 
 
         
 
 
          frame = cv2AddChineseText(frame, name_list[sorted_id[0]]+str(queue_list[sorted_id[0]])+':'+str(np.array(score_list[sorted_id[0]]*100).round(2))+'%', (20,600), textColor=(0, 255, 255), textSize=40)
 
 
          frame = cv2AddChineseText(frame, name_list[sorted_id[1]]+str(queue_list[sorted_id[1]])+':'+str(np.array(score_list[sorted_id[1]]*100).round(2))+'%', (20,700), textColor=(0, 255, 0), textSize=40)
 
 
          frame = cv2AddChineseText(frame, name_list[sorted_id[2]]+str(queue_list[sorted_id[2]])+':'+str(np.array(score_list[sorted_id[2]]*100).round(2))+'%', (20,800), textColor=(0, 255, 0), textSize=40)
 
 
          frame = cv2AddChineseText(frame, name_list[sorted_id[3]]+str(queue_list[sorted_id[3]])+':'+str(np.array(score_list[sorted_id[3]]*100).round(2))+'%', (20,900), textColor=(0, 255, 0), textSize=40)
 
 
          print('ssdlite_mbv2_post')
 
 
          # cv2.imshow('demo', frame)
 
 
          writer.write(frame)
 
 
          if cv2.waitKey(1) & 0xFF == ord('q'):
 
 
                 break
 
 
      else:
 
 
          break
 
 cap.release()
 
 cv2.destroyAllWindows()
 
 
ModelBox应用开发
通过以下章节您将大致了解ModelBox AI应用的开发流程:
1、查看流程图

可以看到我们的流程图有两个推理功能单元,分别是ssd_infer和mbv2_infer。其中ssd_infer使用原生onnxruntime进行推理,主要是由于ssdlite_mbv2.onnx目标检测模型是由TensorFlow 1.x训练转换导出,模型解析存在问题。mbv2_224x224.onnx多输出模型是由TensorFlow 2.x训练导出,可以使用ModelBox内置的推理功能单元进行推理。
2、查看推理功能单元
由于ssdlite_mbv2.onnx使用原生onnxruntime进行推理,需要编写模型的推理代码,首先查看模型的配置文件:
 # Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 
 
 # Basic config
 
 
  [base]
 
 name = "ssd_infer" # The FlowUnit name
 
 device = "cpu" # The flowunit runs on cpu
 
 version = "1.0.0" # The version of the flowunit
 
 type = "python" # Fixed value, do not change
 
 description = "description" # The description of the flowunit
 
 entry = "ssd_infer@ssd_inferFlowUnit" # Python flowunit entry function
 
 group_type = "Generic"  # flowunit group attribution, change as Input/Output/Image/Generic ...
 
 
 # Flowunit Type
 
 stream = false # Whether the flowunit is a stream flowunit
 
 condition = false # Whether the flowunit is a condition flowunit
 
 collapse = false # Whether the flowunit is a collapse flowunit
 
 collapse_all = false # Whether the flowunit will collapse all the data
 
 expand = false #  Whether the flowunit is a expand flowunit
 
 
 # The default Flowunit config
 
 
  [config]
 
 net_h = 300
 
 net_w = 300
 
 # Input ports description
 
 
  [input]
 
 
  [input.input1] # Input port number, the format is input.input[N]
 
 name = "Input" # Input port name
 
 type = "uint8" # Input port type
 
 
 # Output ports description
 
 
  [output]
 
 
  [output.output1] # Output port number, the format is output.output[N]
 
 name = "Output1" # Output port name
 
 type = "float" # Output port type
 
 
 
  [output.output2] # Output port number, the format is output.output[N]
 
 name = "Output2" # Output port name
 
 type = "float" # Output port type
 
 
 
  [output.output3] # Output port number, the format is output.output[N]
 
 name = "Output3" # Output port name
 
 type = "float" # Output port type
 
 
我们的模型输入300x300大小的unit8图像,输出7个float类型的张量,我们只需要其中的检测框、置信度和物体类别,模型推理代码如下:
 # Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 
 
 #!/usr/bin/env python
 
 # -*- coding: utf-8 -*-
 
 import _flowunit as modelbox
 
 import numpy as np
 
 import onnxruntime
 
 
 class ssd_inferFlowUnit(modelbox.FlowUnit):
 
 
      # Derived from modelbox.FlowUnit
 
 
      def __init__(self):
 
 
          super().__init__()
 
 
          self.ssdlite = onnxruntime.InferenceSession('model/ssdlite_mbv2.onnx')
 
 
 
      def open(self, config):
 
 
          # Open the flowunit to obtain configuration information
 
 
          self.net_h = config.get_int('net_h', 300)
 
 
          self.net_w = config.get_int('net_w', 300)
 
 
          return modelbox.Status.StatusCode.STATUS_SUCCESS
 
 
 
      def process(self, data_context):
 
 
          # Process the data
 
 
          in_data = data_context.input("Input")
 
 
          out_data1 = data_context.output("Output1")
 
 
          out_data2 = data_context.output("Output2")
 
 
          out_data3 = data_context.output("Output3")
 
 
 
          # ssd_infer process code.
 
 
          # Remove the following code and add your own code here.
 
 
          for buffer in in_data:
 
 
              # 将输入Buffer转换为numpy对象
 
 
              buffer = np.array(buffer.as_object(), copy=False)
 
 
              img = buffer.reshape((self.net_h, self.net_w, 3))
 
 
              data = np.expand_dims(img, axis=0)
 
 
              ssdlite_input ={self.ssdlite.get_inputs()[0].name: data}
 
 
              detection_boxes, detection_classes, _, detection_scores, _, _, _ = self.ssdlite.run(None, ssdlite_input)
 
 
              # 将业务处理返回的结果数据转换为Buffer
 
 
              boxes = modelbox.Buffer(self.get_bind_device(), detection_boxes)
 
 
              scores = modelbox.Buffer(self.get_bind_device(), detection_scores)
 
 
              classes = modelbox.Buffer(self.get_bind_device(), detection_classes)
 
 
              out_data1.push_back(boxes)
 
 
              out_data2.push_back(classes)
 
 
              out_data3.push_back(scores)
 
 
 
          return modelbox.Status.StatusCode.STATUS_SUCCESS
 
 
 
      def close(self):
 
 
          # Close the flowunit
 
 
          return modelbox.Status()
 
 
 
      def data_pre(self, data_context):
 
 
          # Before streaming data starts
 
 
          return modelbox.Status()
 
 
 
      def data_post(self, data_context):
 
 
          # After streaming data ends
 
 
          return modelbox.Status()
 
 
 
      def data_group_pre(self, data_context):
 
 
          # Before all streaming data starts
 
 
          return modelbox.Status()
 
 
 
      def data_group_post(self, data_context):
 
 
          # After all streaming data ends
 
 
          return modelbox.Status()
 
 
mbv2_224x224.onnx使用ModelBox内置推理功能单元进行推理,有1个输入和2个输出,模型配置文件编辑如下,无需编写推理代码。
 # Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved.
 
 
 [base]
 
 name = "mbv2_infer"
 
 device = "cpu"
 
 version = "1.0.0"
 
 description = "your description"
 
 entry = "./mbv2_224x224.onnx"  # model file path, use relative path
 
 type = "inference" 
 
 virtual_type = "onnx" # inference engine type: win10 now only support onnx
 
 group_type = "Inference"  # flowunit group attribution, do not change
 
 
 # Input ports description
 
 [input]
 
 [input.input1]  # input port number, Format is input.input[N]
 
 name = "Input"  # input port name
 
 type = "float"  # input port data type ,e.g. float or uint8
 
 device = "cpu"  # input buffer type: cpu, win10 now copy input from cpu
 
 
 # Output ports description
 
 [output]
 
 [output.output1] # output port number, Format is output.output[N]
 
 name = "Output1"  # output port name
 
 type = "float"   # output port data type ,e.g. float or uint8
 
 
 [output.output2] # output port number, Format is output.output[N]
 
 name = "Output2"  # output port name
 
 type = "float"   # output port data type ,e.g. float or uint8
 
 
3、查看条件功能单元
a、设置条件功能单元属性:
 # Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 
 
 # Basic config
 
 
  [base]
 
 name = "head_condition" # The FlowUnit name
 
 device = "cpu" # The flowunit runs on cpu
 
 version = "1.0.0" # The version of the flowunit
 
 type = "python" # Fixed value, do not change
 
 description = "description" # The description of the flowunit
 
 entry = "head_condition@head_conditionFlowUnit" # Python flowunit entry function
 
 group_type = "Generic"  # flowunit group attribution, change as Input/Output/Image/Generic ...
 
 
 # Flowunit Type
 
 stream = false # Whether the flowunit is a stream flowunit
 
 condition = true # Whether the flowunit is a condition flowunit
 
 collapse = false # Whether the flowunit is a collapse flowunit
 
 collapse_all = false # Whether the flowunit will collapse all the data
 
 expand = false #  Whether the flowunit is a expand flowunit
 
 
 # The default Flowunit config
 
 
  [config]
 
 item = "value"
 
 
 # Input ports description
 
 
  [input]
 
 
  [input.input1] # 输入数据1
 
 name = "in_image" # 原图
 
 type = "uint8"    # 原图数据格式为 uint8
 
 
 
  [input.input2] # 输入数据2
 
 name = "in_bbox" # 头部和手机检测框
 
 type = "string"  # 检测框数据格式为json字符串
 
 
 # Output ports description
 
 
  [output]
 
 
  [output.output1] # 检测到头部时的输出数据
 
 name = "has_head" # 原图 + 头部和手机检测框(检测框以属性方式附加在原图上)
 
 type = "uint8"    # 原图数据格式为 uint8
 
 
 
  [output.output2] # 未检测到头部时的输出数据
 
 name = "no_head" # 原图
 
 type = "uint8"   # 原图数据格式为 uint8
 
 
与通用功能单元不同的是,condition属性设置为true,且一定有多个输出端口,head_condition条件功能单元有两个输出端口has_head和no_head,分别代表检测到头部和未检测到头部的两个分支,下图展示了条件功能单元的数据流图:

b、实现条件功能单元接口:
条件功能单元process接口逻辑与通用功能单元基本一致,只是在最后输出Buffer上有所不同,逻辑代码如下:
 # Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 
 
 #!/usr/bin/env python
 
 # -*- coding: utf-8 -*-
 
 import _flowunit as modelbox
 
 import numpy as np
 
 import json
 
 
 class head_conditionFlowUnit(modelbox.FlowUnit):
 
 
      # Derived from modelbox.FlowUnit
 
 
      def __init__(self):
 
 
          super().__init__()
 
 
 
      def open(self, config):
 
 
          # Open the flowunit to obtain configuration information
 
 
          return modelbox.Status.StatusCode.STATUS_SUCCESS
 
 
 
      def process(self, data_context):
 
 
          # Process the data
 
 
          in_image = data_context.input("in_image")
 
 
          in_bbox = data_context.input("in_bbox")
 
 
 
          has_head = data_context.output("has_head")
 
 
          no_head = data_context.output("no_head")
 
 
 
          # face_condition process code.
 
 
          # Remove the following code and add your own code here.
 
 
          # 循环处理每一个输入Buffer数据(实际上条件功能单元的batch size为1,此处循环中只有1条数据)
 
 
          for buffer_img, buffer_bbox in zip(in_image, in_bbox):
 
 
              # 获取输入Buffer的属性信息
 
 
              width = buffer_img.get('width')
 
 
              height = buffer_img.get('height')
 
 
              channel = buffer_img.get('channel')
 
 
 
              # 将输入Buffer转换为numpy对象
 
 
              img_data = np.array(buffer_img.as_object(), copy=False)
 
 
              img_data = img_data.reshape((height, width, channel))
 
 
 
              # 字符串数据可以直接用as_object函数转换
 
 
              bbox_str = buffer_bbox.as_object()
 
 
 
              # 解码出头部检测框数据
 
 
              head_bboxes = self.decode_head_bboxes(bbox_str)
 
 
              # 解码出手机检测框数据
 
 
              phone_bboxes = self.decode_phone_bboxes(bbox_str)
 
 
              if phone_bboxes:
 
 
                  max_roi = self.get_max_roi_phone(phone_bboxes, img_data)
 
 
                  buffer_img.set("bbox_phone", max_roi)
 
 
              else:
 
 
                  buffer_img.set("bbox_phone", 0)
 
 
 
              # 此处是将输入Buffer直接作为输出Buffer向后传递
 
 
              # 此时Buffer的Data、Meta等全部内容都将保留,无需构建Buffer、设置Meta
 
 
              if head_bboxes:  # 检测到头部时的输出分支
 
 
                  max_roi = self.get_max_roi_head(head_bboxes, img_data)
 
 
                  buffer_img.set("bbox", max_roi)   # 将头部检测框作为属性附在输出Buffer上
 
 
                  has_head.push_back(buffer_img)
 
 
              else:            # 未检测到头部时的输出分支
 
 
                  no_head.push_back(buffer_img)
 
 
 
          # 返回成功标志,ModelBox框架会将数据发送到后续的功能单元
 
 
          return modelbox.Status.StatusCode.STATUS_SUCCESS
 
 
 
      def decode_head_bboxes(self, bbox_str):
 
 
          """从json字符串中解码出头部检测框"""
 
 
          try:
 
 
              hand_labels = [1]  # 头部对应的类别号是 1
 
 
              det_result = json.loads(bbox_str)['det_result']
 
 
              if det_result == "None":
 
 
                  return []
 
 
              bboxes = json.loads(det_result)
 
 
              hand_bboxes = list(filter(lambda x: int(x[5]) in hand_labels, bboxes))
 
 
          except Exception as ex:
 
 
              modelbox.error(str(ex))
 
 
              return []
 
 
          else:
 
 
              return hand_bboxes
 
 
         
 
 
      def decode_phone_bboxes(self, bbox_str):
 
 
          """从json字符串中解码出头部检测框"""
 
 
          try:
 
 
              phone_labels = [2]  # 手机对应的类别号是 1
 
 
              det_result = json.loads(bbox_str)['det_result']
 
 
              if det_result == "None":
 
 
                  return []
 
 
              bboxes = json.loads(det_result)
 
 
              phone_bboxes = list(filter(lambda x: int(x[5]) in phone_labels, bboxes))
 
 
          except Exception as ex:
 
 
              modelbox.error(str(ex))
 
 
              return []
 
 
          else:
 
 
              return phone_bboxes
 
 
         
 
 
      def get_max_roi_head(self, bboxes, img_data, scale=1):
 
 
          """找出roi最大的头部检测框"""
 
 
          max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]) * x[4])
 
 
 
          # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
 
 
          img_h, img_w, _ = img_data.shape
 
 
          x1, y1, x2, y2, _, _ = max_bbox
 
 
          x1 = int(x1 * img_w)
 
 
          y1 = int(y1 * img_h)
 
 
          x2 = int(x2 * img_w)
 
 
          y2 = int(y2 * img_h)
 
 
         
 
 
          # 检测框进行一定的外扩
 
 
          w_ = max(abs(x2 - x1), abs(y2 - y1))
 
 
          w_ = w_ * scale
 
 
          x_mid = (x1 + x2) / 2
 
 
          y_mid = (y1 + y2) / 2
 
 
          new_x1 = max(0, int(x_mid - w_ / 2))
 
 
          new_y1 = max(0, int(y_mid - w_ / 2))
 
 
          new_x2 = min(int(x_mid + w_ / 2), img_w)
 
 
          new_y2 = min(int(y_mid + w_ / 2), img_h)
 
 
          new_bbox = [new_x1, new_y1, new_x2, new_y2]
 
 
          return new_bbox
 
 
     
 
 
      def get_max_roi_phone(self, bboxes, img_data):
 
 
          """找出roi最大的手机检测框"""
 
 
          max_bbox = max(bboxes, key = lambda x: (x[2] - x[0]) * (x[3] - x[1]) * x[4])
 
 
 
          # 原始检测框数据归一化到[0,1],此处需还原到原图中的坐标
 
 
          img_h, img_w, _ = img_data.shape
 
 
          x1, y1, x2, y2, _, _ = max_bbox
 
 
          x1 = int(x1 * img_w)
 
 
          y1 = int(y1 * img_h)
 
 
          x2 = int(x2 * img_w)
 
 
          y2 = int(y2 * img_h)
 
 
          new_bbox = [x1, y1, x2, y2]
 
 
          return new_bbox
 
 
     
 
 
      def close(self):
 
 
          # Close the flowunit
 
 
          return modelbox.Status()
 
 
 
      def data_pre(self, data_context):
 
 
          # Before streaming data starts
 
 
          return modelbox.Status()
 
 
 
      def data_post(self, data_context):
 
 
          # After streaming data ends
 
 
          return modelbox.Status()
 
 
 
      def data_group_pre(self, data_context):
 
 
          # Before all streaming data starts
 
 
          return modelbox.Status()
 
 
 
      def data_group_post(self, data_context):
 
 
          # After all streaming data ends
 
 
          return modelbox.Status()
 
 
4、查看绘图功能单元
绘图功能单元是一个通用功能单元,通用功能单元在处理完数据后,会产生一个或多个输出,后续功能单元在承接时,必须接收它的所有数据(当然可以接收后只处理其中一部分输出);另外,通用功能单元一次可以处理多个Buffer(即batch size>1),数据按batch size设置分成多组,多组数据并发处理,生成结果与输入一致,下图展示了通用功能单元的数据流图:

配置文件内容如下:
 # Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 
 
 # Basic config
 
 
  [base]
 
 name = "ssd_mbv2_draw" # The FlowUnit name
 
 device = "cpu" # The flowunit runs on cpu
 
 version = "1.0.0" # The version of the flowunit
 
 type = "python" # Fixed value, do not change
 
 description = "description" # The description of the flowunit
 
 entry = "ssd_mbv2_draw@ssd_mbv2_drawFlowUnit" # Python flowunit entry function
 
 group_type = "Generic"  # flowunit group attribution, change as Input/Output/Image/Generic ...
 
 
 # Flowunit Type
 
 stream = false # Whether the flowunit is a stream flowunit
 
 condition = false # Whether the flowunit is a condition flowunit
 
 collapse = false # Whether the flowunit is a collapse flowunit
 
 collapse_all = false # Whether the flowunit will collapse all the data
 
 expand = false #  Whether the flowunit is a expand flowunit
 
 
 # The default Flowunit config
 
 
  [config]
 
 item = "value"
 
 
 # Input ports description
 
 # 输入端口描述
 
 
  [input]
 
 
  [input.input1] # 输入数据1:原图 + 头部和手机检测框(检测框以属性方式附加在原图上)
 
 name = "in_image"
 
 type = "uint8"
 
 
 
  [input.input2] # 输入数据2:转为json字符串格式的手部关键点数据
 
 name = "in_landmarks"
 
 type = "string"
 
 
 # Output ports description
 
 
  [output]
 
 
  [output.output1] # Output port number, the format is output.output[N]
 
 name = "out_image" # Output port name
 
 type = "uint8" # Output port type
 
 
补充功能单元逻辑代码:
 # Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 
 
 #!/usr/bin/env python
 
 # -*- coding: utf-8 -*-
 
 import _flowunit as modelbox
 
 from scipy.spatial import distance as dist
 
 from PIL import Image, ImageDraw, ImageFont
 
 import numpy as np
 
 import json
 
 import cv2
 
 
 class ssd_mbv2_drawFlowUnit(modelbox.FlowUnit):
 
 
      # Derived from modelbox.FlowUnit
 
 
      def __init__(self):
 
 
          super().__init__()
 
 
          self.EAR_THRESH = 0.2
 
 
          self.queue_eye = [0] * 10
 
 
          self.queue_phone = [0] * 10
 
 
          self.queue_mouth = [0] * 10
 
 
          self.queue_look_ground = [0] * 10
 
 
          self.font = cv2.FONT_HERSHEY_PLAIN
 
 
 
      def open(self, config):
 
 
          # Open the flowunit to obtain configuration information
 
 
          return modelbox.Status.StatusCode.STATUS_SUCCESS
 
 
 
      def process(self, data_context):
 
 
          # Process the data
 
 
          # 从DataContext中获取输入输出BufferList对象
 
 
          in_image = data_context.input("in_image")
 
 
          in_landmarks = data_context.input("in_landmarks")
 
 
          out_image = data_context.output("out_image")
 
 
 
          # ssd_mbv2_draw process code.
 
 
          # Remove the following code and add your own code here.
 
 
          # 循环处理每一个输入Buffer数据
 
 
          for buffer_img, buffer_landmarks in zip(in_image, in_landmarks):
 
 
              self.queue_eye.pop(0)
 
 
              self.queue_phone.pop(0)
 
 
              self.queue_mouth.pop(0)
 
 
              self.queue_look_ground.pop(0)
 
 
              # 获取输入图像Buffer的宽、高、通道数等属性信息
 
 
              width = buffer_img.get('width')
 
 
              height = buffer_img.get('height')
 
 
              channel = buffer_img.get('channel')
 
 
 
              # 将输入Buffer转换为numpy对象
 
 
              img_data = np.array(buffer_img.as_object(), dtype=np.uint8, copy=False)
 
 
              img_data = img_data.reshape(height, width, channel)
 
 
 
              max_bbox = buffer_img.get("bbox")
 
 
              phone_bbox = buffer_img.get("bbox_phone")
 
 
             
 
 
              # 将输入Buffer转换为字符串,从json字符串中解码出手部关键点数据
 
 
              landmarks_str = buffer_landmarks.as_object()
 
 
              landmarks = self.decode_landmarks(landmarks_str)
 
 
              confidence = landmarks[0]*100
 
 
              # 业务处理:将最大的人形检测框对应的手部关键点数据画在图上
 
 
              img_out = img_data.copy()
 
 
              xmin, ymin, xmax, ymax = max_bbox[0], max_bbox[1], max_bbox[2], max_bbox[3]
 
 
              cv2.rectangle(img_out,pt1=(xmin,ymin),pt2=(xmax,ymax),color=(0,255,0),thickness=10)
 
 
              cv2.putText(img_out,text='has_face:'+str(confidence)+'%',org=(xmin,ymin-10),fontFace=self.font,fontScale=5,color=(0,255,0),thickness=5,lineType=cv2.LINE_AA)
 
 
 
              if landmarks[0]==1:
 
 
                  self.queue_look_ground.append(0)
 
 
                  points = np.array(landmarks[1:])
 
 
                  for x, y in points.reshape(-1, 2):
 
 
                      x = int(x*(xmax-xmin))
 
 
                      y = int(y*(ymax-ymin))
 
 
                      cv2.circle(img_out,(x+xmin,y+ymin),3,(0,255,0),-1)
 
 
                  l_ear = self.eye_aspect_ratio(points[:12].reshape(-1, 2))
 
 
                  r_ear = self.eye_aspect_ratio(points[12:24].reshape(-1, 2))
 
 
                  m_ear = self.eye_aspect_ratio(points[24:36].reshape(-1, 2))
 
 
                  if l_ear<self.EAR_THRESH and l_ear<self.EAR_THRESH:
 
 
                      self.queue_eye.append(1)
 
 
                  else:
 
 
                      self.queue_eye.append(0)
 
 
                  if m_ear>self.EAR_THRESH:
 
 
                      self.queue_mouth.append(1)
 
 
                  else:
 
 
                      self.queue_mouth.append(0)
 
 
                  cv2.putText(img_out,text='eye_l_ear:'+str(np.array(l_ear).round(2)),org=(xmin+20,ymin+60),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
 
 
                  cv2.putText(img_out,text='eye_r_ear:'+str(np.array(r_ear).round(2)),org=(xmin+20,ymin+120),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
 
 
                  cv2.putText(img_out,text='mouth_ear:'+str(np.array(m_ear).round(2)),org=(xmin+20,ymin+180),fontFace=self.font,fontScale=4,color=(0,255,0),thickness=4,lineType=cv2.LINE_AA)
 
 
              else:
 
 
                  self.queue_eye.append(0)
 
 
                  self.queue_mouth.append(0)
 
 
                  self.queue_look_ground.append(1)
 
 
             
 
 
              if phone_bbox:
 
 
                  x1, y1, x2, y2 = phone_bbox[0], phone_bbox[1], phone_bbox[2], phone_bbox[3]
 
 
                  cv2.rectangle(img_out,pt1=(x1,y1),pt2=(x2,y2),color=(255,0,0),thickness=10)
 
 
                  ratio = self.cover_ratio(max_bbox, phone_bbox)
 
 
                  if ratio>0.5:
 
 
                      self.queue_phone.append(1)
 
 
                  else:
 
 
                      self.queue_phone.append(0)
 
 
              else:
 
 
                  self.queue_phone.append(0)
 
 
 
              yawn_score = sum(self.queue_mouth)/len(self.queue_mouth)
 
 
              phone_score = sum(self.queue_phone)/len(self.queue_phone)
 
 
              sleeping_score = sum(self.queue_eye)/len(self.queue_eye)
 
 
              look_ground_score = sum(self.queue_look_ground)/len(self.queue_look_ground)
 
 
              name_list = ['打哈欠','打电话','打瞌睡','左顾右盼']
 
 
              score_list = [yawn_score, phone_score, sleeping_score, look_ground_score]
 
 
              queue_list = [self.queue_mouth, self.queue_phone, self.queue_eye, self.queue_look_ground]
 
 
              sorted_id = sorted(range(len(score_list)), key=lambda k: score_list[k], reverse=True)
 
 
              max_score_id = np.argmax(score_list)
 
 
 
              img_out = self.cv2AddChineseText(img_out, 'AI练习生原创作品', (600,50), textColor=(0, 255, 0), textSize=50)
 
 
              if score_list[max_score_id]>0.9:
 
 
                  img_out = self.cv2AddChineseText(img_out, '驾驶行为:'+name_list[max_score_id], (400,400), textColor=(0, 255, 0), textSize=50)
 
 
              else:
 
 
                  img_out = self.cv2AddChineseText(img_out, '驾驶行为:'+'正常驾驶', (400,400), textColor=(0, 255, 0), textSize=50)
 
 
 
              img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[0]]+str(queue_list[sorted_id[0]])+':'+str(np.array(score_list[sorted_id[0]]*100).round(2))+'%', (20,600), textColor=(0, 255, 0), textSize=40)
 
 
              img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[1]]+str(queue_list[sorted_id[1]])+':'+str(np.array(score_list[sorted_id[1]]*100).round(2))+'%', (20,700), textColor=(0, 255, 0), textSize=40)
 
 
              img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[2]]+str(queue_list[sorted_id[2]])+':'+str(np.array(score_list[sorted_id[2]]*100).round(2))+'%', (20,800), textColor=(0, 255, 0), textSize=40)
 
 
              img_out = self.cv2AddChineseText(img_out, name_list[sorted_id[3]]+str(queue_list[sorted_id[3]])+':'+str(np.array(score_list[sorted_id[3]]*100).round(2))+'%', (20,900), textColor=(0, 255, 0), textSize=40)
 
 
 
              # 将业务处理返回的结果数据转换为Buffer
 
 
              out_buffer = modelbox.Buffer(self.get_bind_device(), img_out)
 
 
 
              # 设置输出Buffer的Meta信息,此处直接拷贝输入Buffer的Meta信息
 
 
              out_buffer.copy_meta(buffer_img)
 
 
 
              # 将输出Buffer放入输出BufferList中
 
 
              out_image.push_back(out_buffer)
 
 
 
          # 返回成功标志,ModelBox框架会将数据发送到后续的功能单元
 
 
          return modelbox.Status.StatusCode.STATUS_SUCCESS
 
 
 
      def decode_landmarks(self, landmarks_str):
 
 
          """从json字符串中解码出人脸关键点数据"""
 
 
          try:
 
 
              landmarks_data = json.loads(landmarks_str)
 
 
              landmarks_list = json.loads(landmarks_data['landmarks_result'])
 
 
          except Exception as ex:
 
 
              modelbox.error(str(ex))
 
 
              return []
 
 
          else:
 
 
              return landmarks_list
 
 
         
 
 
      def cover_ratio(self, box1, box2):
 
 
          '''计算两个矩形框的IOU与box2区域的比值'''
 
 
          def _overlap(x1, x2, x3, x4):
 
 
              left = max(x1, x3)
 
 
              right = min(x2, x4)
 
 
              return right - left
 
 
 
          w = _overlap(box1[0], box1[2], box2[0], box2[2])
 
 
          h = _overlap(box1[1], box1[3], box2[1], box2[3])
 
 
          if w <= 0 or h <= 0:
 
 
              return 0
 
 
          inter_area = w * h
 
 
          small_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
 
 
          return inter_area * 1.0 / small_area
 
 
         
 
 
      def eye_aspect_ratio(self, eye):
 
 
          '''计算出EAR'''
 
 
          # compute the euclidean distances between the two sets of
 
 
          # vertical eye landmarks (x, y)-coordinates
 
 
          A = dist.euclidean(eye[1], eye[5])
 
 
          B = dist.euclidean(eye[2], eye[4])
 
 
 
          # compute the euclidean distance between the horizontal
 
 
          # eye landmark (x, y)-coordinates
 
 
          C = dist.euclidean(eye[0], eye[3])
 
 
 
          # compute the eye aspect ratio
 
 
          ear = (A + B) / (2.0 * C)
 
 
 
          # return the eye aspect ratio
 
 
          return ear
 
 
     
 
 
      def cv2AddChineseText(self, img, text, position, textColor=(0, 255, 0), textSize=30):
 
 
          '''绘制中文'''
 
 
          img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
 
 
          draw = ImageDraw.Draw(img)
 
 
          # 字体的格式
 
 
          fontStyle = ImageFont.truetype(
 
 
              "./font/simsun.ttc", textSize, encoding="utf-8")
 
 
          # 绘制文本
 
 
          draw.text(position, text, textColor, font=fontStyle)
 
 
          # 转换回OpenCV格式
 
 
          return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
 
 
 
      def close(self):
 
 
          # Close the flowunit
 
 
          return modelbox.Status()
 
 
 
      def data_pre(self, data_context):
 
 
          # Before streaming data starts
 
 
          return modelbox.Status()
 
 
 
      def data_post(self, data_context):
 
 
          # After streaming data ends
 
 
          return modelbox.Status()
 
 
 
      def data_group_pre(self, data_context):
 
 
          # Before all streaming data starts
 
 
          return modelbox.Status()
 
 
 
      def data_group_post(self, data_context):
 
 
          # After all streaming data ends
 
 
          return modelbox.Status()
 
 
其余功能单元实现与前面类似。
5、查看ModelBox流程图
 # Copyright (C) 2020 Huawei Technologies Co., Ltd. All rights reserved.
 
 
 
  [driver]
 
 dir = ["${HILENS_APP_ROOT}/etc/flowunit",
 
 "${HILENS_APP_ROOT}/etc/flowunit/cpp",
 
 "${HILENS_APP_ROOT}/model",
 
 "${HILENS_MB_SDK_PATH}/flowunit"]
 
 skip-default = true
 
 
  [profile]
 
 profile=false
 
 trace=false
 
 dir="${HILENS_DATA_DIR}/mb_profile"
 
 
  [graph]
 
 format = "graphviz"
 
 graphconf = """digraph Fatigue_driving_detection {
 
     node [shape=Mrecord]
 
     queue_size = 1
 
     batch_size = 1
 
 
     # 定义节点,即功能单元及其属性
 
     input1[type=input, flowunit=input, device=cpu, deviceid=0]
 
     data_source_parser[type=flowunit, flowunit=data_source_parser, device=cpu, deviceid=0]
 
     video_demuxer[type=flowunit, flowunit=video_demuxer, device=cpu, deviceid=0]
 
     video_decoder[type=flowunit, flowunit=video_decoder, device=cpu, deviceid=0, pix_fmt=rgb]
 
     image_resize[type=flowunit, flowunit=resize, device=cpu, deviceid=0, image_width=300, image_height=300]
 
     ssd_infer[type=flowunit, flowunit=ssd_infer, device=cpu, deviceid=0]
 
     ssd_post[type=flowunit, flowunit=ssd_post, device=cpu, deviceid=0]
 
     head_condition[type=flowunit, flowunit=head_condition, device=cpu, deviceid=0]
 
     crop_head_image[type=flowunit, flowunit=crop_head_image, device=cpu, deviceid=0]
 
     image_resize2[type=flowunit, flowunit=resize, device=cpu, deviceid=0, image_width=224, image_height=224]
 
     normalize[type=flowunit flowunit=normalize device=cpu deviceid="0" standard_deviation_inverse="0.003921568627450,0.003921568627450,0.003921568627450"]
 
     mbv2_infer[type=flowunit, flowunit=mbv2_infer, device=cpu, deviceid=0]
 
     landmarks_post[type=flowunit, flowunit=landmarks_post, device=cpu, deviceid=0]
 
     ssd_mbv2_draw[type=flowunit, flowunit=ssd_mbv2_draw, device=cpu, deviceid=0]
 
     video_out[type=flowunit, flowunit=video_out, device=cpu, deviceid=0]
 
 
     # 定义边,即功能间的数据传递关系
 
     input1:input -> data_source_parser:in_data
 
     data_source_parser:out_video_url -> video_demuxer:in_video_url
 
     video_demuxer:out_video_packet -> video_decoder:in_video_packet
 
     video_decoder:out_video_frame -> image_resize:in_image
 
     image_resize:out_image -> ssd_infer:Input
 
     ssd_infer:Output1 -> ssd_post:in_feat1
 
     ssd_infer:Output2 -> ssd_post:in_feat2
 
     ssd_infer:Output3 -> ssd_post:in_feat3
 
     ssd_post:out_feat -> head_condition:in_bbox
 
     video_decoder:out_video_frame -> head_condition:in_image
 
     head_condition:no_head -> video_out:in_video_frame
 
     head_condition:has_head -> crop_head_image:in_data
 
     crop_head_image:roi_image -> image_resize2:in_image
 
     image_resize2:out_image -> normalize:in_data
 
     normalize:out_data -> mbv2_infer:Input
 
     mbv2_infer:Output1 -> landmarks_post:in_feat1
 
     mbv2_infer:Output2 -> landmarks_post:in_feat2
 
     landmarks_post:out_data -> ssd_mbv2_draw:in_landmarks
 
     head_condition:has_head -> ssd_mbv2_draw:in_image
 
     ssd_mbv2_draw:out_image -> video_out:in_video_frame
 
 }"""
 
 
  [flow]
 
 desc = "Fatigue_driving_detection run in modelbox-win10-x64"
 
 
可以看到条件功能单元head_condition的两个输出分别对接到不同的功能单元,在未检测到头部时,no_head分支直接对接到video_out进行视频编码;检测到头部时,has_head分支对接到crop_head_image以及之后的功能单元做人脸识别以及关键点检测。
6、运行应用
运行前需要安装onnxruntime、pillow以及scipy,切换到Fatigue_driving_detection目录下执行bin/main.bat运行应用,效果如下:

也可以更换其它视频进行测试。
案例小结
本案例从模型训练到应用开发全流程自主设计,实现了5种驾驶行为的实时识别,具备实际部署落地的可行性,关键点检测采用直接回归的方式精确度不高,后续将采用Heatmap的方式进一步提高识别的准确性。
参考文章:
ModelBox AI应用开发——条件功能单元
         
        
评论(0)