MindSpore 人脸识别系统实战
【摘要】 前言人脸识别是计算机视觉领域最成熟、应用最广泛的技术之一,从手机解锁到安防监控,从支付验证到门禁系统,无处不在。一个完整的人脸识别系统包含三个核心模块:人脸检测、人脸对齐、人脸识别,以及工业级应用必需的活体检测。本文将使用 MindSpore 框架,从零构建一个完整的人脸识别系统,涵盖:人脸检测(MTCNN 算法)人脸关键点对齐人脸识别(FaceNet/InsightFace 风格)活体检测...
前言
人脸识别是计算机视觉领域最成熟、应用最广泛的技术之一,从手机解锁到安防监控,从支付验证到门禁系统,无处不在。一个完整的人脸识别系统包含三个核心模块:人脸检测、人脸对齐、人脸识别,以及工业级应用必需的活体检测。
本文将使用 MindSpore 框架,从零构建一个完整的人脸识别系统,涵盖:
- 人脸检测(MTCNN 算法)
- 人脸关键点对齐
- 人脸识别(FaceNet/InsightFace 风格)
- 活体检测(静默活体)
- 完整工程化部署
一、人脸识别系统架构
1.1 系统流程
输入图像 → 人脸检测 → 人脸对齐 → 特征提取 → 活体检测 → 身份识别
1.2 各模块功能
| 模块 | 功能 | 输出 |
|---|---|---|
| 人脸检测 | 定位图像中所有人脸位置 | 边界框 (x, y, w, h) |
| 人脸对齐 | 根据关键点矫正人脸姿态 | 对齐后的人脸图像 |
| 特征提取 | 提取高维人脸特征向量 | 128/512 维特征 |
| 活体检测 | 判断是否为真人 | 真人/假体概率 |
| 身份识别 | 与数据库比对确定身份 | 身份 ID + 相似度 |
二、人脸检测 - MTCNN 实现
2.1 MTCNN 原理
MTCNN(Multi-task Cascaded Convolutional Networks)是一种级联的多任务卷积网络,包含三个阶段:
P-Net (Proposal Network):快速生成候选框
R-Net (Refine Network):精修候选框位置
O-Net (Output Network):输出最终检测结果和关键点
2.2 数据集准备
人脸识别系统涉及多种数据集,分别用于不同模块的训练和评估:
| 数据集 | 用途 | 规模 | 下载地址 |
|---|---|---|---|
| WIDER FACE | 人脸检测训练 | 32,203 张图像 / 393,703 张人脸 | 见官方渠道获取 |
| CASIA-WebFace | 人脸识别训练 | 10,575 人 / 494,414 张图像 | 见官方渠道获取 |
| LFW | 人脸识别评估 | 5,749 人 / 13,233 张图像 | 见官方渠道获取 |
| CASIA-SURF | 活体检测训练 | 1,000 人 / 21,000 段视频 | 见官方渠道获取 |
| CelebA | 人脸关键点/属性 | 200,000 张 / 5 个关键点 | 见官方渠道获取 |
2.2.1 WIDER FACE 数据集加载(人脸检测)
import json
import os
from datetime import datetime

import cv2
import numpy as np

import mindspore.dataset as ds
class WIDERFACEDataset:
    """WIDER FACE dataset loader for face-detection training.

    Expected directory layout::

        WIDER_FACE/
        ├── wider_face_train/
        │   ├── 0--Parade/
        │   │   ├── 0_Parade_marchingband_1_101.jpg
        │   │   └── ...
        │   └── ...
        ├── wider_face_val/
        └── wider_face_test/

    Args:
        root_dir: dataset root containing the image folders and the
            ``wider_face_<split>_bbx_gt.txt`` annotation file.
        split: one of ``'train'`` / ``'val'`` / ``'test'``.
        target_size: longest side of the returned image in pixels.
    """

    def __init__(self, root_dir, split='train', target_size=640):
        self.root_dir = root_dir
        self.split = split
        self.target_size = target_size
        self.image_dir = os.path.join(root_dir, f'wider_face_{split}')
        self.anno_path = os.path.join(root_dir, f'wider_face_{split}_bbx_gt.txt')
        self.samples = self._parse_annotations()
        print(f"📂 WIDER FACE {split}: {len(self.samples)} 张图像, "
              f"{sum(len(s['boxes']) for s in self.samples)} 个人脸")

    def _parse_annotations(self):
        """Parse the WIDER FACE annotation file.

        Returns a list of dicts with ``image_path`` and an ``[N, 4]``
        float32 array of ``(x1, y1, x2, y2)`` boxes.  Images without a
        valid face, or missing on disk, are skipped.
        """
        samples = []
        with open(self.anno_path, 'r') as f:
            lines = f.readlines()
        idx = 0
        while idx < len(lines):
            # Relative image path line.
            img_path = lines[idx].strip()
            idx += 1
            # Declared number of faces for this image.
            num_faces = int(lines[idx].strip())
            idx += 1
            # Official format quirk: an image with 0 faces still carries
            # one all-zero placeholder bbox line that must be consumed,
            # otherwise parsing desynchronizes from here on.
            if num_faces == 0:
                idx += 1
                continue
            boxes = []
            for _ in range(num_faces):
                parts = lines[idx].strip().split()
                x, y, w, h = map(float, parts[:4])
                # Skip degenerate annotations (zero width or height).
                if w > 0 and h > 0:
                    boxes.append([x, y, x + w, y + h])
                idx += 1
            # Keep only images with at least one valid face and an
            # existing file on disk.
            if len(boxes) > 0:
                full_path = os.path.join(self.image_dir, img_path)
                if os.path.exists(full_path):
                    samples.append({
                        'image_path': full_path,
                        'boxes': np.array(boxes, dtype=np.float32)
                    })
        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (CHW float32 image, [N, 4] float32 boxes) for one sample."""
        sample = self.samples[idx]
        # Read the image as RGB.
        image = cv2.imread(sample['image_path'])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        h, w = image.shape[:2]
        # Scale so the longest side fits target_size; boxes follow the image.
        scale = min(self.target_size / w, self.target_size / h)
        if scale != 1.0:
            new_w, new_h = int(w * scale), int(h * scale)
            image = cv2.resize(image, (new_w, new_h))
            boxes = sample['boxes'].copy() * scale
        else:
            boxes = sample['boxes'].copy()
        # ImageNet normalization; subtracting the float64 mean/std arrays
        # silently promotes the image to float64, so cast back to float32.
        image = image.astype(np.float32) / 255.0
        image = (image - np.array([0.485, 0.456, 0.406])) / np.array([0.229, 0.224, 0.225])
        image = image.astype(np.float32).transpose(2, 0, 1)  # HWC -> CHW
        return image, boxes
# 创建 MindSpore 数据集
def create_detection_dataset(root_dir, split='train', batch_size=8, shuffle=True):
    """Build the MindSpore data pipeline for face-detection training."""
    source_ds = WIDERFACEDataset(root_dir, split=split)
    # Wrap the Python loader in a GeneratorDataset.
    pipeline = ds.GeneratorDataset(
        source=source_ds,
        column_names=["image", "boxes"],
        shuffle=shuffle,
        num_parallel_workers=4,
    )
    # Batching assumes samples are padded to a uniform size upstream.
    return pipeline.batch(batch_size, drop_remainder=True)
2.2.2 CASIA-WebFace 数据集加载(人脸识别)
class CASIAWebFaceDataset:
    """CASIA-WebFace loader for face-recognition training.

    Expected layout::

        CASIA-WebFace/
        ├── 000001/
        │   ├── 000001_0.jpg
        │   ├── 000001_1.jpg
        │   └── ...
        ├── 000002/
        └── ...
    """

    def __init__(self, root_dir, target_size=(112, 112)):
        self.root_dir = root_dir
        self.target_size = target_size
        self.samples = []
        self.class_to_idx = {}
        self._scan_directory()
        print(f"📂 CASIA-WebFace: {len(self.samples)} 张图像, "
              f"{len(self.class_to_idx)} 个人")

    def _scan_directory(self):
        """Walk the per-person folders and index every image file."""
        for idx, person_dir in enumerate(sorted(os.listdir(self.root_dir))):
            person_path = os.path.join(self.root_dir, person_dir)
            if not os.path.isdir(person_path):
                continue
            self.class_to_idx[person_dir] = idx
            image_names = (n for n in os.listdir(person_path)
                           if n.lower().endswith(('.jpg', '.jpeg', '.png')))
            for img_name in image_names:
                self.samples.append({
                    'image_path': os.path.join(person_path, img_name),
                    'label': idx,
                })

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (CHW float32 image in [-1, 1], integer identity label)."""
        sample = self.samples[idx]
        # Load, convert to RGB and resize to the network input size.
        bgr = cv2.imread(sample['image_path'])
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        rgb = cv2.resize(rgb, self.target_size)
        # Map pixel values into [-1, 1] and switch to CHW layout.
        scaled = rgb.astype(np.float32) / 255.0
        normalized = (scaled - 0.5) / 0.5
        return normalized.transpose(2, 0, 1), sample['label']
def create_recognition_dataset(root_dir, batch_size=64, shuffle=True):
    """Build the MindSpore pipeline for face-recognition training.

    Returns the batched dataset together with the number of identities.
    """
    source_ds = CASIAWebFaceDataset(root_dir)
    pipeline = ds.GeneratorDataset(
        source=source_ds,
        column_names=["image", "label"],
        shuffle=shuffle,
        num_parallel_workers=4,
    ).batch(batch_size, drop_remainder=True)
    return pipeline, len(source_ds.class_to_idx)
2.2.3 活体检测数据集加载
class CASIASURFDataset:
    """CASIA-SURF anti-spoofing loader with live (label 1) and fake (label 0) samples."""

    def __init__(self, root_dir, split='train', target_size=(112, 112)):
        self.root_dir = root_dir
        self.split = split
        self.target_size = target_size
        self.samples = self._load_samples()
        live_count = sum(1 for s in self.samples if s['label'] == 1)
        fake_count = sum(1 for s in self.samples if s['label'] == 0)
        print(f"📂 CASIA-SURF {split}: {len(self.samples)} 张图像 "
              f"(真人: {live_count}, 攻击: {fake_count})")

    def _load_samples(self):
        """Collect image paths from ``<root>/<split>/live`` and every
        attack-type subfolder under ``<root>/<split>/fake``."""
        samples = []
        # Genuine samples.
        live_dir = os.path.join(self.root_dir, self.split, 'live')
        if os.path.exists(live_dir):
            for img_name in os.listdir(live_dir):
                if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                    samples.append({
                        'image_path': os.path.join(live_dir, img_name),
                        'label': 1  # genuine face
                    })
        # Attack samples (print attack, video replay, 3D mask, ...).
        fake_dir = os.path.join(self.root_dir, self.split, 'fake')
        if os.path.exists(fake_dir):
            for attack_type in os.listdir(fake_dir):
                attack_path = os.path.join(fake_dir, attack_type)
                if os.path.isdir(attack_path):
                    for img_name in os.listdir(attack_path):
                        if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                            samples.append({
                                'image_path': os.path.join(attack_path, img_name),
                                'label': 0  # spoof
                            })
        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (CHW float32 image, label) for one sample."""
        sample = self.samples[idx]
        image = cv2.imread(sample['image_path'])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, self.target_size)
        image = image.astype(np.float32) / 255.0
        # ImageNet normalization; the float64 mean/std arrays would
        # promote the image to float64, so cast back to float32.
        image = (image - np.array([0.485, 0.456, 0.406])) / np.array([0.229, 0.224, 0.225])
        image = image.astype(np.float32).transpose(2, 0, 1)
        return image, sample['label']
def create_liveness_dataset(root_dir, split='train', batch_size=32, shuffle=True):
    """Build the MindSpore pipeline for liveness (anti-spoofing) training."""
    source_ds = CASIASURFDataset(root_dir, split=split)
    pipeline = ds.GeneratorDataset(
        source=source_ds,
        column_names=["image", "label"],
        shuffle=shuffle,
        num_parallel_workers=4,
    )
    return pipeline.batch(batch_size, drop_remainder=True)
2.2.4 数据增强
import mindspore.dataset.vision as vision
import mindspore.dataset.transforms as transforms
class FaceDataAugmenter:
    """Random photometric / geometric augmentation for face images.

    Expects float images in [0, 1]; in any mode other than ``'train'``
    the inputs are returned untouched.
    """

    def __init__(self, mode='train'):
        self.mode = mode

    def augment(self, image, boxes=None, landmarks=None):
        """Apply random augmentations.

        Args:
            image: RGB image (H, W, C), float values in [0, 1].
            boxes: optional face boxes [N, 4] as (x1, y1, x2, y2).
            landmarks: optional 5-point landmarks [N, 10] as
                (x1, y1, ..., x5, y5) in the order left-eye, right-eye,
                nose, left-mouth, right-mouth.

        Returns:
            (image, boxes, landmarks) with the same shapes as the inputs.
        """
        if self.mode != 'train':
            return image, boxes, landmarks
        h, w = image.shape[:2]
        # 1. Random horizontal flip.
        if np.random.random() > 0.5:
            image = image[:, ::-1, :].copy()
            if boxes is not None:
                boxes[:, [0, 2]] = w - boxes[:, [2, 0]]
            if landmarks is not None:
                landmarks[:, 0::2] = w - landmarks[:, 0::2]
                # Mirroring swaps the left/right semantics of the points
                # (left-eye <-> right-eye, left-mouth <-> right-mouth);
                # without this reorder the landmark labels are wrong.
                landmarks[:] = landmarks[:, [2, 3, 0, 1, 4, 5, 8, 9, 6, 7]]
        # 2. Random brightness / contrast.
        if np.random.random() > 0.5:
            alpha = np.random.uniform(0.8, 1.2)   # contrast
            beta = np.random.uniform(-0.1, 0.1)   # brightness
            image = np.clip(image * alpha + beta, 0, 1)
        # 3. Random per-channel color jitter (out of place, so the
        # caller's array is never mutated as a side effect).
        if np.random.random() > 0.5:
            factors = np.random.uniform(0.8, 1.2, size=3)
            image = np.clip(image * factors, 0, 1)
        # 4. Random Gaussian noise.
        if np.random.random() > 0.7:
            noise = np.random.normal(0, 0.01, image.shape).astype(np.float32)
            image = np.clip(image + noise, 0, 1)
        return image, boxes, landmarks
2.3 MindSpore 实现
import mindspore
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore import Tensor
import numpy as np
class PNet(nn.Cell):
    """Proposal Network — first MTCNN stage.

    Fully convolutional, so it can slide over an image pyramid and emit a
    face-probability map plus a bounding-box regression map.
    """

    def __init__(self):
        super(PNet, self).__init__()
        # Feature-extraction trunk.
        self.conv1 = nn.Conv2d(3, 10, 3, stride=1, pad_mode='valid')
        self.prelu1 = nn.PReLU(10)
        self.pool1 = nn.MaxPool2d(2, stride=2)
        self.conv2 = nn.Conv2d(10, 16, 3, stride=1, pad_mode='valid')
        self.prelu2 = nn.PReLU(16)
        self.conv3 = nn.Conv2d(16, 32, 3, stride=1, pad_mode='valid')
        self.prelu3 = nn.PReLU(32)
        # Multi-task heads: 2-way face/non-face, 4-value box regression.
        self.conv4_1 = nn.Conv2d(32, 2, 1, stride=1, pad_mode='valid')  # classification
        self.conv4_2 = nn.Conv2d(32, 4, 1, stride=1, pad_mode='valid')  # box regression

    def construct(self, x):
        """Return (cls, bbox): softmaxed face-probability map and box-offset map."""
        x = self.prelu1(self.conv1(x))
        x = self.pool1(x)
        x = self.prelu2(self.conv2(x))
        x = self.prelu3(self.conv3(x))
        cls = ops.Softmax(axis=1)(self.conv4_1(x))
        bbox = self.conv4_2(x)
        return cls, bbox
class RNet(nn.Cell):
    """Refine Network — second MTCNN stage (24x24 input crops).

    Re-scores P-Net candidates and refines their boxes.
    """

    def __init__(self):
        super(RNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 28, 3, stride=1, pad_mode='valid')
        self.prelu1 = nn.PReLU(28)
        self.pool1 = nn.MaxPool2d(3, stride=2, pad_mode='same')
        self.conv2 = nn.Conv2d(28, 48, 3, stride=1, pad_mode='valid')
        self.prelu2 = nn.PReLU(48)
        self.pool2 = nn.MaxPool2d(3, stride=2)
        self.conv3 = nn.Conv2d(48, 64, 2, stride=1, pad_mode='valid')
        self.prelu3 = nn.PReLU(64)
        self.flatten = nn.Flatten()
        # For a 24x24 input the conv stack yields a 64x3x3 map
        # (24 → 22 → 11 → 9 → 4 → 3), so the dense layer must take
        # 64*3*3 features — the original 64*2*2 would fail at runtime.
        self.fc4 = nn.Dense(64 * 3 * 3, 128)
        self.prelu4 = nn.PReLU(128)
        self.fc5_1 = nn.Dense(128, 2)   # face / non-face
        self.fc5_2 = nn.Dense(128, 4)   # box regression

    def construct(self, x):
        """Return (cls, bbox) for a batch of 24x24 candidate crops."""
        x = self.prelu1(self.conv1(x))
        x = self.pool1(x)
        x = self.prelu2(self.conv2(x))
        x = self.pool2(x)
        x = self.prelu3(self.conv3(x))
        x = self.flatten(x)
        x = self.prelu4(self.fc4(x))
        cls = ops.Softmax(axis=1)(self.fc5_1(x))
        bbox = self.fc5_2(x)
        return cls, bbox
class ONet(nn.Cell):
    """Output Network — final MTCNN stage (48x48 input crops).

    Produces the final face score, refined box and 5 facial landmarks.
    """

    def __init__(self):
        super(ONet, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, stride=1, pad_mode='valid')
        self.prelu1 = nn.PReLU(32)
        self.pool1 = nn.MaxPool2d(3, stride=2, pad_mode='same')
        self.conv2 = nn.Conv2d(32, 64, 3, stride=1, pad_mode='valid')
        self.prelu2 = nn.PReLU(64)
        self.pool2 = nn.MaxPool2d(3, stride=2)
        self.conv3 = nn.Conv2d(64, 64, 3, stride=1, pad_mode='valid')
        self.prelu3 = nn.PReLU(64)
        self.pool3 = nn.MaxPool2d(2, stride=2)
        self.conv4 = nn.Conv2d(64, 128, 2, stride=1, pad_mode='valid')
        self.prelu4 = nn.PReLU(128)
        self.flatten = nn.Flatten()
        # For a 48x48 input the conv stack yields a 128x3x3 map
        # (48 → 46 → 23 → 21 → 10 → 8 → 4 → 3), so the dense layer must
        # take 128*3*3 features — the original 128*2*2 would fail at runtime.
        self.fc5 = nn.Dense(128 * 3 * 3, 256)
        self.prelu5 = nn.PReLU(256)
        self.fc6_1 = nn.Dense(256, 2)    # face / non-face
        self.fc6_2 = nn.Dense(256, 4)    # box regression
        self.fc6_3 = nn.Dense(256, 10)   # 5 landmarks x (x, y)

    def construct(self, x):
        """Return (cls, bbox, landmarks) for 48x48 candidate crops."""
        x = self.prelu1(self.conv1(x))
        x = self.pool1(x)
        x = self.prelu2(self.conv2(x))
        x = self.pool2(x)
        x = self.prelu3(self.conv3(x))
        x = self.pool3(x)
        x = self.prelu4(self.conv4(x))
        x = self.flatten(x)
        x = self.prelu5(self.fc5(x))
        cls = ops.Softmax(axis=1)(self.fc6_1(x))
        bbox = self.fc6_2(x)
        landmarks = self.fc6_3(x)
        return cls, bbox, landmarks
2.4 检测流程整合
class MTCNNDetector:
    """Cascaded MTCNN face detector (P-Net → R-Net → O-Net).

    NOTE(review): ``_stage2``, ``_stage3`` and ``_resize_image`` are
    referenced below but not defined in this file — the original article
    omits them; they must be provided alongside.
    """

    def __init__(self, pnet_path=None, rnet_path=None, onet_path=None):
        self.pnet = PNet()
        self.rnet = RNet()
        self.onet = ONet()
        # Optionally restore pretrained weights for each stage.
        if pnet_path:
            mindspore.load_checkpoint(pnet_path, self.pnet)
        if rnet_path:
            mindspore.load_checkpoint(rnet_path, self.rnet)
        if onet_path:
            mindspore.load_checkpoint(onet_path, self.onet)
        self.pnet.set_train(False)
        self.rnet.set_train(False)
        self.onet.set_train(False)

    def detect(self, image, min_face_size=20, thresholds=(0.6, 0.7, 0.8)):
        """Detect faces in an image.

        Args:
            image: input image (H, W, C).
            min_face_size: smallest face (in pixels) to look for.
            thresholds: per-stage score thresholds (tuple avoids the
                original mutable-list default).

        Returns:
            boxes: [N, 5] array of (x1, y1, x2, y2, score).
            landmarks: [N, 10] array of 5 (x, y) keypoints.
        """
        # Stage 1: P-Net proposals.
        boxes = self._stage1(image, min_face_size, thresholds[0])
        if boxes.shape[0] == 0:
            return np.array([]), np.array([])
        # Stage 2: R-Net refinement.
        boxes = self._stage2(image, boxes, thresholds[1])
        if boxes.shape[0] == 0:
            return np.array([]), np.array([])
        # Stage 3: O-Net output + landmarks.
        boxes, landmarks = self._stage3(image, boxes, thresholds[2])
        return boxes, landmarks

    def _stage1(self, image, min_face_size, threshold):
        """Run P-Net over an image pyramid and NMS the proposals."""
        scales = self._compute_scale_pyramid(image.shape, min_face_size)
        all_boxes = []
        for scale in scales:
            resized = self._resize_image(image, scale)
            img_tensor = Tensor(resized.transpose(2, 0, 1)[np.newaxis, ...], mindspore.float32)
            cls, bbox = self.pnet(img_tensor)
            cls = cls.asnumpy()[0, 1, :, :]  # face-probability channel
            bbox = bbox.asnumpy()[0]
            boxes = self._generate_boxes(cls, bbox, scale, threshold)
            # Only collect non-empty results: vstack-ing (0,)-shaped
            # arrays with (N, 5) arrays raises in the original code.
            if boxes.shape[0] > 0:
                all_boxes.append(boxes)
        if not all_boxes:
            # No proposal survived at any scale.
            return np.zeros((0, 5), dtype=np.float32)
        all_boxes = np.vstack(all_boxes)
        # Cross-scale NMS.
        keep = self._nms(all_boxes[:, :4], all_boxes[:, 4], 0.5)
        return all_boxes[keep]

    def _compute_scale_pyramid(self, img_shape, min_face_size):
        """Return the pyramid scales so a ``min_face_size`` face maps to
        P-Net's 12x12 receptive field at some level."""
        factor = 0.709  # halves the area per pyramid level
        min_length = min(img_shape[:2])
        m = 12 / min_face_size
        min_length *= m
        scales = []
        while min_length >= 12:
            scales.append(m)
            m *= factor
            min_length *= factor
        return scales

    def _generate_boxes(self, cls_map, bbox_map, scale, threshold):
        """Turn a P-Net score/offset map into candidate boxes in original
        image coordinates; returns a [K, 5] array (possibly K == 0)."""
        stride = 2     # P-Net output stride (one pooling layer)
        cell_size = 12  # P-Net receptive field
        # Cells whose face probability exceeds the threshold.
        indices = np.where(cls_map > threshold)
        if len(indices[0]) == 0:
            # Keep the column count consistent for downstream vstack.
            return np.zeros((0, 5), dtype=np.float32)
        # Regression offsets at the selected cells.
        dx1 = bbox_map[0, indices[0], indices[1]]
        dy1 = bbox_map[1, indices[0], indices[1]]
        dx2 = bbox_map[2, indices[0], indices[1]]
        dy2 = bbox_map[3, indices[0], indices[1]]
        score = cls_map[indices[0], indices[1]]
        # Map cell coordinates back to the original image.
        x1 = (stride * indices[1] + 1) / scale
        y1 = (stride * indices[0] + 1) / scale
        x2 = (stride * indices[1] + 1 + cell_size) / scale
        y2 = (stride * indices[0] + 1 + cell_size) / scale
        # Apply the regression offsets.
        x1 += dx1 * cell_size / scale
        y1 += dy1 * cell_size / scale
        x2 += dx2 * cell_size / scale
        y2 += dy2 * cell_size / scale
        boxes = np.stack([x1, y1, x2, y2, score], axis=1)
        return boxes

    def _nms(self, boxes, scores, threshold):
        """Greedy non-maximum suppression; returns kept indices."""
        x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        order = scores.argsort()[::-1]  # highest score first
        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            # IoU of the top box against the remaining ones.
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])
            w = np.maximum(0.0, xx2 - xx1 + 1)
            h = np.maximum(0.0, yy2 - yy1 + 1)
            inter = w * h
            iou = inter / (areas[i] + areas[order[1:]] - inter)
            inds = np.where(iou <= threshold)[0]
            order = order[inds + 1]
        return keep
三、人脸对齐
3.1 基于关键点的对齐
class FaceAligner:
    """Warp a face crop so its 5 landmarks hit canonical template positions."""

    def __init__(self, target_size=(112, 112)):
        self.target_size = target_size
        # Canonical 5-point template for a 112x112 aligned face
        # (ArcFace convention).
        self.dst_pts = np.array([
            [38.2946, 51.6963],  # left eye
            [73.5318, 51.5014],  # right eye
            [56.0252, 71.7366],  # nose tip
            [41.5493, 92.3655],  # left mouth corner
            [70.7299, 92.2041]   # right mouth corner
        ], dtype=np.float32)

    def align(self, image, landmarks):
        """Align a face.

        Args:
            image: source image containing the face.
            landmarks: flat array of 10 values (x1, y1, ..., x5, y5).

        Returns:
            The aligned face image of size ``target_size``.
        """
        src_pts = landmarks.reshape(5, 2).astype(np.float32)
        M = self._estimate_similarity_transform(src_pts, self.dst_pts)
        aligned = cv2.warpAffine(image, M, self.target_size)
        return aligned

    def _estimate_similarity_transform(self, src, dst):
        """Least-squares similarity transform (Umeyama) mapping src -> dst.

        The original version used the raw cross-covariance matrix as the
        linear part, which is not a similarity transform (it introduces
        shear and mis-scales).  This computes the proper uniform-scale
        rotation + translation and returns a 2x3 matrix for
        ``cv2.warpAffine``.
        """
        src = np.asarray(src, dtype=np.float64)
        dst = np.asarray(dst, dtype=np.float64)
        n = src.shape[0]
        src_mean = src.mean(axis=0)
        dst_mean = dst.mean(axis=0)
        src_demean = src - src_mean
        dst_demean = dst - dst_mean
        # Cross-covariance of the centered point sets.
        cov = dst_demean.T @ src_demean / n
        U, S, Vt = np.linalg.svd(cov)
        # Reflection guard: force a proper rotation (det = +1).
        d = np.ones(2)
        if np.linalg.det(U) * np.linalg.det(Vt) < 0:
            d[1] = -1
        R = U @ np.diag(d) @ Vt
        var_src = (src_demean ** 2).sum() / n
        scale = (S * d).sum() / var_src
        t = dst_mean - scale * R @ src_mean
        M = np.zeros((2, 3), dtype=np.float32)
        M[:, :2] = scale * R
        M[:, 2] = t
        return M
# Usage helper
def detect_and_align(detector, aligner, image):
    """Detect every face in ``image`` and return the aligned crops.

    Returns:
        (aligned_faces, boxes): list of aligned face images plus the
        corresponding detection boxes.
    """
    boxes, landmarks = detector.detect(image)
    aligned_faces = []
    for box, landmark in zip(boxes, landmarks):
        left, top, right, bottom = (int(v) for v in box[:4])
        crop = image[top:bottom, left:right]
        # Shift the landmark coordinates into the crop's local frame.
        local_pts = landmark.copy()
        local_pts[0::2] -= left
        local_pts[1::2] -= top
        aligned_faces.append(aligner.align(crop, local_pts))
    return aligned_faces, boxes
四、人脸识别网络
4.1 基于 MobileFaceNet 的特征提取
class MobileFaceNet(nn.Cell):
    """MobileFaceNet — lightweight face-embedding network (112x112 input).

    Suited to mobile and embedded deployment.  Produces an L2-normalized
    embedding; when ``num_classes`` is given a softmax classification
    head is appended for pretraining.
    """

    def __init__(self, embedding_size=128, num_classes=None):
        super(MobileFaceNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, stride=2, pad_mode='pad', padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.prelu1 = nn.PReLU(64)
        # Inverted-residual trunk: (in, out, stride, repeats) per group.
        self.blocks = nn.SequentialCell([
            self._make_block(64, 64, 2, 1),
            self._make_block(64, 64, 2, 2),
            self._make_block(64, 128, 2, 2),
            self._make_block(128, 128, 2, 4),
            self._make_block(128, 128, 2, 2),
            self._make_block(128, 256, 2, 1),
        ])
        self.conv2 = nn.Conv2d(256, 512, 1, stride=1)
        self.bn2 = nn.BatchNorm2d(512)
        self.prelu2 = nn.PReLU(512)
        # Linear global depthwise conv (GDConv) collapses the 7x7 map.
        self.linear = nn.Conv2d(512, 512, 7, stride=1, pad_mode='valid', group=512)
        self.bn_linear = nn.BatchNorm2d(512)
        self.flatten = nn.Flatten()
        self.embedding = nn.Dense(512, embedding_size)
        self.bn_embedding = nn.BatchNorm1d(embedding_size)
        # Optional classification head (training only).
        if num_classes:
            self.classifier = nn.Dense(embedding_size, num_classes)
        else:
            self.classifier = None

    def _make_block(self, in_channels, out_channels, stride, num_blocks):
        """Stack ``num_blocks`` inverted residuals; only the first strides."""
        layers = [InvertedResidual(in_channels, out_channels, stride)]
        for _ in range(1, num_blocks):
            layers.append(InvertedResidual(out_channels, out_channels, 1))
        return nn.SequentialCell(layers)

    def construct(self, x):
        x = self.prelu1(self.bn1(self.conv1(x)))
        x = self.blocks(x)
        x = self.prelu2(self.bn2(self.conv2(x)))
        x = self.bn_linear(self.linear(x))
        x = self.flatten(x)
        x = self.bn_embedding(self.embedding(x))
        # L2-normalize the embedding; the epsilon guards against a
        # zero vector causing division by zero.
        x = x / ops.Sqrt()(ops.ReduceSum(keep_dims=True)(x ** 2, 1) + 1e-12)
        if self.classifier is not None:
            # NOTE(review): classifying the *normalized* embedding is
            # unusual; ArcFace-style training consumes the embedding
            # directly (see train_recognizer).
            return self.classifier(x)
        return x
class InvertedResidual(nn.Cell):
    """MobileNetV2-style inverted residual block (expand → depthwise → project)."""

    def __init__(self, in_channels, out_channels, stride, expand_ratio=2):
        super(InvertedResidual, self).__init__()
        hidden_dim = in_channels * expand_ratio
        # Residual shortcut only when spatial size and width are preserved.
        self.use_res_connect = stride == 1 and in_channels == out_channels
        layers = []
        # Pointwise expansion.
        if expand_ratio != 1:
            layers.append(nn.Conv2d(in_channels, hidden_dim, 1, stride=1))
            layers.append(nn.BatchNorm2d(hidden_dim))
            layers.append(nn.PReLU(hidden_dim))
        # Depthwise 3x3 convolution.
        layers.extend([
            nn.Conv2d(hidden_dim, hidden_dim, 3, stride, pad_mode='pad', padding=1, group=hidden_dim),
            nn.BatchNorm2d(hidden_dim),
            nn.PReLU(hidden_dim),
            # Linear pointwise projection (no activation: linear bottleneck).
            nn.Conv2d(hidden_dim, out_channels, 1, stride=1),
            nn.BatchNorm2d(out_channels),
        ])
        self.conv = nn.SequentialCell(layers)

    def construct(self, x):
        if self.use_res_connect:
            return x + self.conv(x)
        else:
            return self.conv(x)
4.2 ArcFace 损失函数
class ArcFaceLoss(nn.Cell):
    """ArcFace — additive angular margin loss.

    Keeps its own class-center weight matrix, computes cos(theta) between
    the L2-normalized embedding and each class center, adds the angular
    margin ``m`` to the target class, scales by ``s`` and applies
    cross-entropy, increasing inter-class separation.
    """

    def __init__(self, num_classes, embedding_size=128, margin=0.5, scale=64):
        super(ArcFaceLoss, self).__init__()
        self.num_classes = num_classes
        self.margin = margin
        self.scale = scale
        # Class-center weight matrix [E, C], small random init.
        self.weight = mindspore.Parameter(
            mindspore.Tensor(np.random.randn(embedding_size, num_classes).astype(np.float32) * 0.01)
        )
        # Precomputed margin constants.
        self.cos_m = np.cos(margin)
        self.sin_m = np.sin(margin)
        self.th = np.cos(np.pi - margin)           # boundary where theta + m would pass pi
        self.mm = np.sin(np.pi - margin) * margin  # linear fallback penalty past the boundary

    def construct(self, embedding, label):
        """Compute the loss.

        Args:
            embedding: L2-normalized features [B, E].
            label: integer class labels [B].
        """
        # Column-normalize weights so the matmul yields cos(theta).
        weight_norm = self.weight / ops.Sqrt()(ops.ReduceSum(keep_dims=True)(self.weight ** 2, 0))
        # cos(theta) against every class center.
        cos_t = ops.MatMul()(embedding, weight_norm)
        # sin(theta), clipped for numerical safety.
        sin_t = ops.Sqrt()(ops.clip_by_value(1.0 - cos_t ** 2, 0.0, 1.0))
        # cos(theta + m) = cos(theta)cos(m) - sin(theta)sin(m)
        cos_t_plus_m = cos_t * self.cos_m - sin_t * self.sin_m
        # Past the valid range, fall back to a linear penalty.
        cos_t_plus_m = ops.where(cos_t > self.th, cos_t_plus_m, cos_t - self.mm)
        one_hot = ops.OneHot()(label, self.num_classes, mindspore.Tensor(1.0, mindspore.float32),
                               mindspore.Tensor(0.0, mindspore.float32))
        # Margin only on the target class; others keep plain cos(theta).
        output = one_hot * cos_t_plus_m + (1.0 - one_hot) * cos_t
        output *= self.scale
        # NOTE(review): ``ops.CrossEntropyLoss`` may not exist in all
        # MindSpore versions — ``nn.CrossEntropyLoss`` is the documented
        # API; verify against the installed version.
        loss = ops.CrossEntropyLoss()(output, label)
        return loss
五、活体检测
5.1 静默活体检测
class SilentLiveNet(nn.Cell):
    """Silent (no user cooperation) liveness-detection CNN.

    A small convolutional backbone with global average pooling feeding a
    2-way classifier; index 1 is the "live" class as consumed by
    ``LiveDetection.predict``.
    """

    def __init__(self):
        super(SilentLiveNet, self).__init__()
        # Convolutional backbone.
        self.backbone = nn.SequentialCell([
            nn.Conv2d(3, 32, 3, stride=2, pad_mode='pad', padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=1, pad_mode='pad', padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(64, 128, 3, stride=1, pad_mode='pad', padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(128, 256, 3, stride=1, pad_mode='pad', padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
        ])
        self.flatten = nn.Flatten()
        self.fc = nn.Dense(256, 2)  # logits: index 0 = spoof, index 1 = live

    def construct(self, x):
        """Return raw 2-class logits for a batch of face crops."""
        x = self.backbone(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x
class LiveDetection:
    """Wraps SilentLiveNet for single-image liveness prediction."""

    def __init__(self, model_path=None):
        self.model = SilentLiveNet()
        if model_path:
            mindspore.load_checkpoint(model_path, self.model)
        self.model.set_train(False)
        # ImageNet statistics used for preprocessing.
        self.mean = np.array([0.485, 0.456, 0.406])
        self.std = np.array([0.229, 0.224, 0.225])

    def predict(self, face_image):
        """Classify one face crop as live or spoof.

        Returns:
            (is_live, confidence): boolean verdict plus the probability
            of the winning class.
        """
        # Resize, normalize, and convert to an NCHW batch of one.
        resized = cv2.resize(face_image, (112, 112))
        normalized = (resized.astype(np.float32) / 255.0 - self.mean) / self.std
        batch = normalized.transpose(2, 0, 1)[np.newaxis, ...]
        # Run the network and softmax the logits.
        logits = self.model(Tensor(batch, mindspore.float32))
        probs = ops.Softmax(axis=1)(logits).asnumpy()[0]
        fake_prob, live_prob = probs[0], probs[1]
        is_live = live_prob > fake_prob
        winning = live_prob if is_live else fake_prob
        return is_live, float(winning)
六、模型训练
6.1 人脸检测训练(MTCNN)
from mindspore import Model, LossMonitor, TimeMonitor, CheckpointConfig, ModelCheckpoint
# ============ P-Net training ============
def train_pnet(data_dir, epochs=30, lr=0.001, batch_size=64):
    """Train P-Net.

    NOTE(review): ``create_detection_dataset`` yields (image, boxes)
    columns, while the loss below expects per-sample classification and
    regression labels — a 12x12 pos/neg/part crop-and-label generation
    step is presumably required in between; confirm against the full
    training code.
    """
    # Load data.
    dataset = create_detection_dataset(data_dir, split='train', batch_size=batch_size)
    # Build the network.
    pnet = PNet()
    # Multi-task loss: classification + box regression.
    class PNetLoss(nn.Cell):
        def __init__(self):
            super(PNetLoss, self).__init__()
            self.cls_loss = nn.CrossEntropyLoss()
            self.bbox_loss = nn.SmoothL1Loss()
        def construct(self, cls_pred, bbox_pred, cls_label, bbox_label):
            # Classification on positives/negatives only; label -1
            # marks "part" samples that are ignored.
            cls_mask = (cls_label != -1)
            cls_loss = self.cls_loss(cls_pred[cls_mask], cls_label[cls_mask])
            # Box regression on positive samples only.
            pos_mask = (cls_label == 1)
            bbox_loss = self.bbox_loss(bbox_pred[pos_mask], bbox_label[pos_mask])
            return cls_loss + 0.5 * bbox_loss
    loss_fn = PNetLoss()
    optimizer = nn.Adam(pnet.trainable_params(), learning_rate=lr)
    model = Model(pnet, loss_fn, optimizer)
    # Periodic checkpointing.
    config = CheckpointConfig(save_checkpoint_steps=500, keep_checkpoint_max=3)
    ckpt_cb = ModelCheckpoint(prefix='pnet', directory='checkpoints/', config=config)
    model.train(epochs, dataset, callbacks=[LossMonitor(50), TimeMonitor(), ckpt_cb])
    print("✅ P-Net 训练完成")
# ============ R-Net training ============
def train_rnet(data_dir, epochs=30, lr=0.001, batch_size=64):
    """Train R-Net.

    NOTE(review): as with P-Net, the detection pipeline yields
    (image, boxes), while this loss expects per-crop labels — a 24x24
    crop/label generation step is presumably required; confirm against
    the full training code.
    """
    dataset = create_detection_dataset(data_dir, split='train', batch_size=batch_size)
    rnet = RNet()
    # Joint classification + box-regression loss.
    class RNetLoss(nn.Cell):
        def __init__(self):
            super(RNetLoss, self).__init__()
            self.cls_loss = nn.CrossEntropyLoss()
            self.bbox_loss = nn.SmoothL1Loss()
        def construct(self, cls_pred, bbox_pred, cls_label, bbox_label):
            # Label -1 marks "part" samples ignored by the classifier.
            cls_mask = (cls_label != -1)
            cls_loss = self.cls_loss(cls_pred[cls_mask], cls_label[cls_mask])
            # Regression on positive samples only.
            pos_mask = (cls_label == 1)
            bbox_loss = self.bbox_loss(bbox_pred[pos_mask], bbox_label[pos_mask])
            return cls_loss + 0.5 * bbox_loss
    loss_fn = RNetLoss()
    optimizer = nn.Adam(rnet.trainable_params(), learning_rate=lr)
    model = Model(rnet, loss_fn, optimizer)
    config = CheckpointConfig(save_checkpoint_steps=500, keep_checkpoint_max=3)
    ckpt_cb = ModelCheckpoint(prefix='rnet', directory='checkpoints/', config=config)
    model.train(epochs, dataset, callbacks=[LossMonitor(50), TimeMonitor(), ckpt_cb])
    print("✅ R-Net 训练完成")
# ============ O-Net training ============
def train_onet(data_dir, epochs=30, lr=0.001, batch_size=64):
    """Train O-Net (adds a landmark-regression loss term).

    NOTE(review): the detection pipeline yields (image, boxes), while
    this loss expects per-crop classification, box and landmark labels —
    a 48x48 crop/label generation step is presumably required; confirm
    against the full training code.
    """
    dataset = create_detection_dataset(data_dir, split='train', batch_size=batch_size)
    onet = ONet()
    # Classification + box regression + landmark regression.
    class ONetLoss(nn.Cell):
        def __init__(self):
            super(ONetLoss, self).__init__()
            self.cls_loss = nn.CrossEntropyLoss()
            self.bbox_loss = nn.SmoothL1Loss()
            self.landmark_loss = nn.SmoothL1Loss()
        def construct(self, cls_pred, bbox_pred, landmark_pred,
                      cls_label, bbox_label, landmark_label):
            # Label -1 marks "part" samples ignored by the classifier.
            cls_mask = (cls_label != -1)
            cls_loss = self.cls_loss(cls_pred[cls_mask], cls_label[cls_mask])
            # Box and landmark regression on positive samples only.
            pos_mask = (cls_label == 1)
            bbox_loss = self.bbox_loss(bbox_pred[pos_mask], bbox_label[pos_mask])
            landmark_loss = self.landmark_loss(landmark_pred[pos_mask],
                                               landmark_label[pos_mask])
            return cls_loss + 0.5 * bbox_loss + 0.5 * landmark_loss
    loss_fn = ONetLoss()
    optimizer = nn.Adam(onet.trainable_params(), learning_rate=lr)
    model = Model(onet, loss_fn, optimizer)
    config = CheckpointConfig(save_checkpoint_steps=500, keep_checkpoint_max=3)
    ckpt_cb = ModelCheckpoint(prefix='onet', directory='checkpoints/', config=config)
    model.train(epochs, dataset, callbacks=[LossMonitor(50), TimeMonitor(), ckpt_cb])
    print("✅ O-Net 训练完成")
6.2 人脸识别训练(ArcFace)
def train_recognizer(data_dir, epochs=50, lr=0.01, batch_size=128):
    """Train MobileFaceNet with the ArcFace loss on CASIA-WebFace.

    The network is built WITHOUT its own classification head: ArcFace
    keeps the class-weight matrix itself and expects the raw [B, 128]
    embedding.  (The original passed ``num_classes`` to the network,
    feeding [B, num_classes] logits into ArcFace's [128, num_classes]
    matmul — a shape mismatch.)
    """
    # Data pipeline plus the number of identities.
    dataset, num_classes = create_recognition_dataset(data_dir, batch_size=batch_size)
    # Embedding network only — the ArcFace head does the classification.
    network = MobileFaceNet(embedding_size=128)
    arcface_loss = ArcFaceLoss(num_classes=num_classes,
                               embedding_size=128,
                               margin=0.5,
                               scale=64)

    class TrainOneStep(nn.Cell):
        """Forward + backward + optimizer update for one batch."""
        def __init__(self, network, loss_fn, optimizer):
            super(TrainOneStep, self).__init__()
            self.network = network
            self.loss_fn = loss_fn
            self.optimizer = optimizer
            self.grad_fn = ops.value_and_grad(self.forward, None,
                                              self.optimizer.parameters)

        def forward(self, image, label):
            embedding = self.network(image)
            loss = self.loss_fn(embedding, label)
            return loss

        def construct(self, image, label):
            loss, grads = self.grad_fn(image, label)
            self.optimizer(grads)
            return loss

    # nn.CosineDecayLR signature is (min_lr, max_lr, decay_steps) —
    # the original call had min and max swapped.  Dataset size comes
    # from get_dataset_size(), not len().
    steps_per_epoch = dataset.get_dataset_size()
    lr_schedule = nn.CosineDecayLR(lr * 0.01, lr, epochs * steps_per_epoch)
    optimizer = nn.SGD(network.trainable_params() + arcface_loss.trainable_params(),
                       learning_rate=lr_schedule,
                       momentum=0.9, weight_decay=5e-4)
    train_step = TrainOneStep(network, arcface_loss, optimizer)
    # Training loop.
    for epoch in range(epochs):
        epoch_loss = 0
        step = 0
        for image, label in dataset.create_tuple_iterator():
            loss = train_step(image, label)
            epoch_loss += float(loss.asnumpy())
            step += 1
        avg_loss = epoch_loss / step
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
        # Periodic checkpoint of the embedding network.
        if (epoch + 1) % 10 == 0:
            mindspore.save_checkpoint(network,
                                      f'checkpoints/mobilefacenet_epoch{epoch+1}.ckpt')
    print("✅ 人脸识别网络训练完成")
6.3 活体检测训练
def train_liveness(data_dir, epochs=30, lr=0.001, batch_size=32):
    """Train the silent-liveness classifier on CASIA-SURF."""
    # Training data.
    dataset = create_liveness_dataset(data_dir, split='train', batch_size=batch_size)
    network = SilentLiveNet()
    # Weighted cross-entropy to counter class imbalance
    # (spoof class gets double weight).
    class_weights = Tensor([1.0, 2.0], mindspore.float32)
    loss_fn = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = nn.Adam(network.trainable_params(), learning_rate=lr)
    model = Model(network, loss_fn, optimizer, metrics={'accuracy'})
    # Validation data.
    val_dataset = create_liveness_dataset(data_dir, split='val', batch_size=batch_size,
                                          shuffle=False)
    config = CheckpointConfig(save_checkpoint_steps=200, keep_checkpoint_max=3)
    ckpt_cb = ModelCheckpoint(prefix='silent_live', directory='checkpoints/', config=config)
    # Model.train has no validation arguments — periodic evaluation
    # during training is provided by Model.fit.
    model.fit(epochs, dataset, valid_dataset=val_dataset, valid_frequency=5,
              callbacks=[LossMonitor(50), TimeMonitor(), ckpt_cb])
    print("✅ 活体检测模型训练完成")
6.4 一键训练脚本
def train_all(wider_dir, casia_dir, surf_dir):
    """Train every module of the face-recognition system in sequence.

    Stage banners are numbered x/5 consistently (the original counted
    "1/4 … 4/4" and then printed a bare "Stage 5").
    """
    print("=" * 60)
    print("🚀 开始训练人脸识别系统所有模块")
    print("=" * 60)
    # Cascaded MTCNN detector stages.
    print("\n📍 Stage 1/5: 训练 P-Net...")
    train_pnet(wider_dir, epochs=30, lr=0.001)
    print("\n📍 Stage 2/5: 训练 R-Net...")
    train_rnet(wider_dir, epochs=30, lr=0.001)
    print("\n📍 Stage 3/5: 训练 O-Net...")
    train_onet(wider_dir, epochs=30, lr=0.001)
    # Face-recognition embedding network.
    print("\n📍 Stage 4/5: 训练 MobileFaceNet + ArcFace...")
    train_recognizer(casia_dir, epochs=50, lr=0.01)
    # Liveness detector.
    print("\n📍 Stage 5/5: 训练活体检测模型...")
    train_liveness(surf_dir, epochs=30, lr=0.001)
    print("\n" + "=" * 60)
    print("🎉 所有模块训练完成!")
    print("📁 权重文件保存在: checkpoints/")
    print("=" * 60)
# Entry point: train every module with the default dataset locations.
if __name__ == '__main__':
    train_all(
        wider_dir='data/WIDER_FACE',
        casia_dir='data/CASIA-WebFace',
        surf_dir='data/CASIA-SURF'
    )
七、完整系统整合
7.1 人脸识别系统类
class FaceRecognitionSystem:
    """End-to-end face recognition: detect → align → liveness → identify."""

    def __init__(self,
                 detector_path=None,
                 recognizer_path=None,
                 liveness_path=None,
                 database_path=None):
        """Initialize all modules.

        Args:
            detector_path: MTCNN detector checkpoint path.
            recognizer_path: embedding-network checkpoint path.
            liveness_path: liveness-model checkpoint path.
            database_path: saved face-database path (.npy).
        """
        # Pipeline components.
        self.detector = MTCNNDetector()
        self.aligner = FaceAligner()
        self.recognizer = MobileFaceNet(embedding_size=128)
        self.liveness_detector = LiveDetection()
        # Restore weights where provided.
        if detector_path:
            # TODO: load the three cascade checkpoints into self.detector.
            pass
        if recognizer_path:
            mindspore.load_checkpoint(recognizer_path, self.recognizer)
        if liveness_path:
            mindspore.load_checkpoint(liveness_path, self.liveness_detector.model)
        self.recognizer.set_train(False)
        # In-memory face database: person_id -> {name, feature, registered_at}.
        self.database = {}
        if database_path and os.path.exists(database_path):
            self.load_database(database_path)

    def register(self, image, person_id, name):
        """Register a new face.

        Args:
            image: image containing exactly one face.
            person_id: unique person identifier.
            name: display name.

        Raises:
            ValueError: no face found, multiple faces, or liveness failure.
        """
        # Detect and align in one pass (the original detected twice).
        aligned_faces, boxes = detect_and_align(self.detector, self.aligner, image)
        if len(boxes) == 0:
            raise ValueError("未检测到人脸")
        if len(boxes) > 1:
            raise ValueError("检测到多张人脸,请确保只有一人")
        face = aligned_faces[0]
        # Reject spoof attempts at enrollment time.
        is_live, conf = self.liveness_detector.predict(face)
        if not is_live:
            raise ValueError("活体检测失败,请确保是真人")
        feature = self._extract_feature(face)
        # Persist into the in-memory database (`datetime` comes from the
        # module-level `from datetime import datetime` import).
        self.database[person_id] = {
            'name': name,
            'feature': feature,
            'registered_at': datetime.now().isoformat()
        }
        print(f"✅ 注册成功: {name} (ID: {person_id})")
        return True

    def recognize(self, image, threshold=0.6):
        """Recognize all faces in an image.

        Args:
            image: input image.
            threshold: cosine-similarity threshold for a database match.

        Returns:
            List of per-face result dicts with bbox, liveness and identity.
        """
        # Single detect+align pass (the original ran detection twice).
        aligned_faces, boxes = detect_and_align(self.detector, self.aligner, image)
        if len(boxes) == 0:
            return []
        results = []
        for face, box in zip(aligned_faces, boxes):
            result = {
                'bbox': box[:4].tolist(),
                'confidence': float(box[4])
            }
            # Liveness gate before any identity lookup.
            is_live, live_conf = self.liveness_detector.predict(face)
            result['is_live'] = bool(is_live)
            result['live_confidence'] = live_conf
            if not is_live:
                result['identity'] = 'FAKE'
                result['name'] = '活体检测失败'
                results.append(result)
                continue
            # Embed and compare against the database.
            feature = self._extract_feature(face)
            best_match, similarity = self._find_best_match(feature)
            if similarity > threshold:
                result['identity'] = best_match
                result['name'] = self.database[best_match]['name']
            else:
                result['identity'] = 'UNKNOWN'
                result['name'] = '未知人员'
            result['similarity'] = float(similarity)
            results.append(result)
        return results

    def _extract_feature(self, face_image):
        """Embed an aligned face crop into a 128-d feature vector."""
        img = cv2.resize(face_image, (112, 112))
        img = img.astype(np.float32) / 255.0
        img = (img - 0.5) / 0.5  # map to [-1, 1], matching training
        img = img.transpose(2, 0, 1)[np.newaxis, ...]
        tensor = Tensor(img, mindspore.float32)
        feature = self.recognizer(tensor).asnumpy()
        return feature[0]

    def _find_best_match(self, feature):
        """Return (person_id, cosine similarity) of the closest database
        entry, or (None, 0.0) when the database is empty."""
        if len(self.database) == 0:
            return None, 0.0
        best_id = None
        best_sim = -1.0
        for person_id, data in self.database.items():
            db_feature = data['feature']
            similarity = np.dot(feature, db_feature) / (
                np.linalg.norm(feature) * np.linalg.norm(db_feature)
            )
            if similarity > best_sim:
                best_sim = similarity
                best_id = person_id
        return best_id, best_sim

    def save_database(self, path):
        """Persist the face database with np.save (pickled dict)."""
        np.save(path, self.database)
        print(f"💾 数据库已保存: {path}")

    def load_database(self, path):
        """Load a face database previously written by ``save_database``."""
        self.database = np.load(path, allow_pickle=True).item()
        print(f"📂 数据库已加载: {len(self.database)} 人")
7.2 使用示例
import cv2
from face_recognition_system import FaceRecognitionSystem

# Bring up the full pipeline with pretrained weights and an existing DB.
system = FaceRecognitionSystem(
    detector_path='mtcnn.ckpt',
    recognizer_path='mobilefacenet.ckpt',
    liveness_path='silent_live.ckpt',
    database_path='face_db.npy'
)

# ========== Enrollment ==========
print("=== 注册模式 ===")
image = cv2.imread('zhangsan.jpg')
system.register(image, person_id='001', name='张三')
image = cv2.imread('lisi.jpg')
system.register(image, person_id='002', name='李四')
# Persist the database.
system.save_database('face_db.npy')

# ========== Recognition ==========
print("\n=== 识别模式 ===")
test_image = cv2.imread('test_group.jpg')
results = system.recognize(test_image, threshold=0.6)

# Visualize: green box for live faces, red for spoofs.
for result in results:
    x1, y1, x2, y2 = map(int, result['bbox'])
    is_live = result['is_live']
    color = (0, 255, 0) if is_live else (0, 0, 255)
    cv2.rectangle(test_image, (x1, y1), (x2, y2), color, 2)
    # FAKE results carry no 'similarity' key, so the label must be
    # chosen per branch (the original read result['similarity']
    # unconditionally and raised KeyError on spoofed faces).
    if is_live:
        label = f"{result['name']} ({result['similarity']:.2f})"
    else:
        label = "FAKE!"
    cv2.putText(test_image, label, (x1, y1-10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
cv2.imwrite('result.jpg', test_image)
print("✅ 结果已保存: result.jpg")
八、性能优化与部署
8.1 模型量化
# MindSpore Lite quantized export.
# NOTE(review): the mindspore_lite converter API differs across versions —
# confirm that `converter.convert` accepts these keyword names (newer
# releases expose a `Converter` class instead), and that a raw .ckpt is a
# valid input here; conversion usually starts from an exported MindIR.
from mindspore_lite import converter
config = {
    "model_file": "mobilefacenet.ckpt",
    "output_file": "mobilefacenet_int8.mindir",
    "quant_type": "QUANT_WEIGHT",   # weight-only quantization
    "weight_quant_bits": 8          # 8-bit weights (~4x smaller model)
}
converter.convert(fmk_type=converter.FmkType.MS, **config)
8.2 批量推理优化
class BatchRecognizer:
    """Run the embedding network over many face crops in fixed-size batches."""

    def __init__(self, model, batch_size=32):
        self.model = model
        self.batch_size = batch_size

    def recognize_batch(self, face_images):
        """Embed every image in ``face_images``, one feature per input."""
        features = []
        total = len(face_images)
        for start in range(0, total, self.batch_size):
            chunk = face_images[start:start + self.batch_size]
            valid = len(chunk)
            # Pad the tail chunk by repeating its last element so the
            # network always sees a full batch.
            while len(chunk) < self.batch_size:
                chunk.append(chunk[-1])
            batch_tensor = Tensor(np.stack(chunk, axis=0), mindspore.float32)
            outputs = self.model(batch_tensor).asnumpy()
            # Drop the padded rows before collecting.
            features.extend(outputs[:valid])
        return features
九、总结
本文实现了一个完整的 MindSpore 人脸识别系统,包括:
| 模块 | 技术方案 | 特点 |
|---|---|---|
| 人脸检测 | MTCNN | 级联结构,检测 + 关键点 |
| 人脸对齐 | 相似变换 | 5 点对齐,标准化姿态 |
| 特征提取 | MobileFaceNet + ArcFace | 轻量级,高精度 |
| 活体检测 | 静默活体 | 无需用户配合 |
| 系统整合 | 端到端流程 | 注册 → 识别 → 防攻击 |
工业级优化建议:
- 模型压缩:INT8 量化可将模型缩小 4 倍,推理速度提升 2-3 倍
- GPU 加速:使用 MindSpore GPU 后端,批量推理提升吞吐量
- 数据库优化:使用向量数据库(如 Milvus)支持百万级人脸检索
- 多帧融合:视频场景下多帧特征融合提升识别稳定性
参考资源:
- MTCNN 论文:见官方渠道获取
- MobileFaceNet 论文:见官方渠道获取
- ArcFace 论文:见官方渠道获取
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)