RK3588部署CNN-LSTM驾驶行为识别模型

举报
HouYanSong 发表于 2025/12/02 19:13:35 2025/12/02
【摘要】 CNN(卷积神经网络)擅长提取图像的空间特征,LSTM(长短期记忆网络)则擅长处理序列数据的时间特征。首先使用CNN提取视频每一帧特征,之后将提取出的所有特征送入LSTM捕捉视频中的时空特征并对视频特征序列进行分类,实现正常驾驶、闭眼、打哈欠、打电话、左顾右盼5种驾驶行为的识别。

RK3588部署CNN-LSTM驾驶行为识别模型


CNN(卷积神经网络)擅长提取图像的空间特征,LSTM(长短期记忆网络)则擅长处理序列数据的时间特征。首先使用CNN提取视频每一帧特征,之后将提取出的所有特征送入LSTM捕捉视频中的时空特征并对视频特征序列进行分类,实现正常驾驶、闭眼、打哈欠、打电话、左顾右盼5种驾驶行为的识别。

一. 模型训练


我们在ModelArts创建Notebook完成模型的训练,使用规格是GPU: 1*Pnt1(16GB)|CPU: 8核 64GB,镜像为tensorflow_2.1.0-cuda_10.1-py_3.7-ubuntu_18.04,首先下载数据集:

import os
import moxing as mox
    
if not os.path.exists('fatigue_driving'):
    mox.file.copy_parallel('obs://modelbox-course/fatigue_driving', 'fatigue_driving')
    
if not os.path.exists('rknn_toolkit2-2.3.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl'):
    mox.file.copy_parallel('obs://modelbox-course/rknn_toolkit2-2.3.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl', 
                           'rknn_toolkit2-2.3.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl')

该数据集包含1525段视频,总共有5个类别:0:正常驾驶1:闭眼2:打哈欠3:打电话4:左顾右盼

我们从原视频中裁剪出主驾驶位画面,并将画面缩放到特征提取网络的输入大小:

def crop_driving_square(frame):
    h, w = frame.shape[:2]
    start_x = w // 2 
    end_x = w
    start_y = 0
    end_y = h
    return frame[start_y:end_y, start_x:end_x]


使用在imagenet上预训练的MobileNetV2网络作为卷积基创建并保存图像特征提取器:

def get_feature_extractor():
    feature_extractor = keras.applications.mobilenet_v2.MobileNetV2(
        weights = 'imagenet',
        include_top = False,
        pooling = 'avg',
        input_shape = (IMG_SIZE, IMG_SIZE, 3)
    )
    
    preprocess_input = keras.applications.mobilenet_v2.preprocess_input
    
    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)
    outputs = feature_extractor(preprocessed)
    
    model = keras.Model(inputs, outputs, name = 'feature_extractor')
    
    return model

feature_extractor = get_feature_extractor()
feature_extractor.save('feature_extractor')
feature_extractor.summary()
Model: "feature_extractor"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
input_2 (InputLayer)         [(None, 256, 256, 3)]     0         
_________________________________________________________________
tf_op_layer_truediv (TensorF [(None, 256, 256, 3)]     0         
_________________________________________________________________
tf_op_layer_sub (TensorFlowO [(None, 256, 256, 3)]     0         
_________________________________________________________________
mobilenetv2_1.00_224 (Model) (None, 1280)              2257984   
=================================================================
Total params: 2,257,984
Trainable params: 2,223,872
Non-trainable params: 34,112

设置网络的输入大小为256x256,每隔6帧截取一帧提取视频的图像特征,特征向量的大小为1280,最终得到每个视频的特征序列,序列的最大长度为40,不足用0补齐:

def load_video(file_name):
    cap = cv2.VideoCapture(file_name) 
    frame_interval = 6
    frames = []
    count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if count % frame_interval == 0:
            frame = crop_driving_square(frame)
            frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)
        count += 1
    return np.array(frames)  

def load_data(videos, labels):
    video_features = []
    for video in tqdm(videos):
        frames = load_video(video)
        counts = len(frames)
        # 如果帧数小于MAX_SEQUENCE_LENGTH
        if counts < MAX_SEQUENCE_LENGTH:
            # 补白
            diff = MAX_SEQUENCE_LENGTH - counts
            # 创建全0的numpy数组
            padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3))
            # 数组拼接
            frames = np.concatenate((frames, padding))
        # 获取前MAX_SEQUENCE_LENGTH帧画面
        frames = frames[:MAX_SEQUENCE_LENGTH, :]
        # 批量提取图像特征
        video_feature = feature_extractor.predict(frames)
        video_features.append(video_feature)
        
    return np.array(video_features), np.array(labels)

video_features, classes = load_data(videos, labels)
video_features.shape, classes.shape
((1525, 40, 1280), (1525,))

总共提取了1525个视频的特征序列,按照8:2的比例划分训练集和测试集(batchsize的大小设为16):

batch_size = 16

dataset = tf.data.Dataset.from_tensor_slices((video_features, classes))

dataset = dataset.shuffle(len(videos))

test_count = int(len(videos) * 0.2)
train_count = len(videos) - test_count

dataset_train = dataset.skip(test_count).cache().repeat()
dataset_test = dataset.take(test_count).cache().repeat()

train_dataset = dataset_train.shuffle(train_count).batch(batch_size)
test_dataset = dataset_test.shuffle(test_count).batch(batch_size)

train_dataset, train_count, test_dataset, test_count
(<BatchDataset shapes: ((None, 40, 1280), (None,)), types: (tf.float32, tf.int64)>,
 1220,
 <BatchDataset shapes: ((None, 40, 1280), (None,)), types: (tf.float32, tf.int64)>,
 305)

之后创建LSTM提取视频特征序列的时间信息送入Dense分类器,模型的定义如下:

def video_cls_model(class_vocab):
    # 类别数量
    classes_num = len(class_vocab)
    # 定义模型
    model = keras.Sequential([
        layers.Input(shape=(MAX_SEQUENCE_LENGTH, NUM_FEATURES)),
        layers.LSTM(64, return_sequences=True),
        layers.Flatten(),
        layers.Dense(classes_num, activation='softmax')
    ])
    # 编译模型
    model.compile(optimizer = keras.optimizers.Adam(1e-5), 
                  loss = keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics = ['accuracy']
    )
    
    return model


# 模型实例化
model = video_cls_model(np.unique(labels))
# 保存检查点
checkpoint = keras.callbacks.ModelCheckpoint(filepath='best.h5', monitor='val_loss', save_weights_only=True, save_best_only=True, verbose=1, mode='min')
# 模型结构
model.summary()

网络的输入大小为(N, 40, 1280),使用softmax进行激活,输出5个类别的概率:

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm (LSTM)                  (None, 40, 64)            344320    
_________________________________________________________________
flatten (Flatten)            (None, 2560)              0         
_________________________________________________________________
dense (Dense)                (None, 5)                 12805     
=================================================================
Total params: 357,125
Trainable params: 357,125
Non-trainable params: 0
_________________________________________________________________

实验表明模型训练300Epoch基本收敛:

history = model.fit(train_dataset,
                    epochs = 300,
                    steps_per_epoch = train_count // batch_size, 
                    validation_steps = test_count // batch_size, 
                    validation_data = test_dataset,
                    callbacks=[checkpoint])

plt.plot(history.epoch, history.history['loss'], 'r', label='loss')
plt.plot(history.epoch, history.history['val_loss'], 'g--', label='val_loss')
plt.title('LSTM')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.plot(history.epoch, history.history['accuracy'], 'r', label='acc')
plt.plot(history.epoch, history.history['val_accuracy'], 'g--', label='val_acc')
plt.title('LSTM')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

加载模型最优权重,模型在测试集上的分类准确率为95.8%,保存为saved_model格式:

model.load_weights('best.h5')
model.evaluate(dataset.batch(batch_size))
model.save('saved_model')
96/96 [==============================] - 0s 5ms/step - loss: 0.2169 - accuracy: 0.9580
[0.21687692414949802, 0.9580328]

二、模型转换

首先将图像特征提取器feature_extractor转为tflite格式,并开启模型量化:

import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_saved_model('feature_extractor')
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
converter.post_training_quantize = True  # 模型量化
tflite_model = converter.convert()

with open('mbv2.tflite', 'wb') as f:
  f.write(tflite_model)

再将视频序列分类模型转为onnx格式,由于lstm参数量较少,不需要进行量化:

python -m tf2onnx.convert --saved-model saved_model --output lstm.onnx --opset 12

最后导出RKNN格式的模型,可根据需要设置target_platformrk3568/rk3588

from rknn.api import RKNN

rknn = RKNN(verbose=False)
rknn.config(target_platform="rk3588")
rknn.load_tflite(model="mbv2.tflite")
rknn.build(do_quantization=False)  
rknn.export_rknn('mbv2.rknn')
rknn.release()

rknn = RKNN(verbose=False)
rknn.config(target_platform="rk3588")
rknn.load_onnx(
    model="lstm.onnx",
    inputs=['input_3'],              # 输入节点名称
    input_size_list=[[1, 40, 1280]]  # 固定输入尺寸
)
rknn.build(do_quantization=False)  
rknn.export_rknn('lstm.rknn')
rknn.release()

三、模型部署

我们在RK3588上部署MobileNetV2LSTM模型,以下是板侧的推理代码:

import os
import cv2
import glob
import shutil
import imageio
import numpy as np
from IPython.display import Image
from rknnlite.api import RKNNLite


MAX_SEQUENCE_LENGTH = 40

IMG_SIZE = 256

NUM_FEATURES = 1280


def crop_driving_square(img):
    h, w = img.shape[:2]
    
    start_x = w // 2 
    end_x = w
    
    start_y = 0
    end_y = h
    
    result = img[start_y:end_y, start_x:end_x]
    
    return result


def load_video(file_name):
    cap = cv2.VideoCapture(file_name) 
    # 每隔多少帧抽取一次
    frame_interval = 6
    frames = []
    count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # 每隔frame_interval帧保存一次
        if count % frame_interval == 0:
            # 中心裁剪    
            frame = crop_driving_square(frame)
            # 缩放
            frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
            # BGR -> RGB  [0,1,2] -> [2,1,0]
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)
        count += 1

    cap.release()
        
    return np.array(frames).astype(np.uint8)   


# 获取视频特征序列
def getVideoFeat(frames):
    
    frames_count = len(frames)
    
    # 如果帧数小于MAX_SEQUENCE_LENGTH
    if frames_count < MAX_SEQUENCE_LENGTH:
        # 补白
        diff = MAX_SEQUENCE_LENGTH - frames_count
        # 创建全0的numpy数组
        padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3))
        # 数组拼接
        frames = np.concatenate((frames, padding))

    # 取前MAX_SEQ_LENGTH帧
    frames = frames[:MAX_SEQUENCE_LENGTH,:]
    frames = frames.astype(np.float32)

    # 提取视频每一帧特征
    feats = []
    for frame in frames:
        frame = np.expand_dims(frame, axis=0)
        result = rknn_lite_mbv2.inference(inputs=[frame])
        feats.append(result[0])
        
    return feats


rknn_lite_mbv2 = RKNNLite() 
rknn_lite_lstm = RKNNLite()
rknn_lite_mbv2.load_rknn('model/mbv2.rknn')
rknn_lite_lstm.load_rknn('model/lstm.rknn')
rknn_lite_mbv2.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)
rknn_lite_lstm.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)

files = glob.glob("video/*.mp4")
for video_path in files:
    label_to_name = {0:'正常驾驶', 1:'闭眼', 2:'打哈欠', 3:'打电话', 4:'左顾右盼'}
    
    frames = load_video(video_path)
    frames = frames[:MAX_SEQUENCE_LENGTH]
    
    imageio.mimsave('test.gif', frames, durations=10, loop=0)
    display(Image(open('test.gif', 'rb').read()))

    feats = getVideoFeat(frames)
    
    feats = np.concatenate(feats, axis=0)
    feats = np.expand_dims(feats, axis=0)
    preds = rknn_lite_lstm.inference(inputs=[feats])[0][0]
    for i in np.argsort(preds)[::-1][:5]:
        print('{}: {}%'.format(label_to_name[i], round(preds[i]*100, 2)))

rknn_lite_mbv2.release()
rknn_lite_lstm.release()

最终的视频识别效果如下:🚀

四、本文小结

本文详细阐述了基于RK3588平台的CNN-LSTM驾驶行为识别模型全流程,利用MobileNetV2提取图像的空间特征、LSTM处理视频的时序特征完成对正常驾驶、闭眼、打哈欠、打电话和左顾右盼5类驾驶行为的精准识别,在ModelArts上训练达到95.8%分类准确率,并分别将mbv2.tflitelstm.onnx转换为RKNN格式实现板侧的高效推理部署。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。