Neural Architecture Search (NAS) in Practice with MindSpore
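Introduction
The performance of a deep learning model depends heavily on its network architecture. In a traditional workflow, architecture design relies on researcher experience and extensive trial and error, which is slow and gives no guarantee of finding a good design. Neural Architecture Search (NAS) automates this step: an algorithm explores a predefined search space and looks for a high-performing architecture.
Since NAS was introduced, the field has moved from early reinforcement learning methods that consumed on the order of thousands of GPU-days, to differentiable architecture search (DARTS), to a range of more efficient search strategies, and NAS has gone from academic exploration to industrial use. MindSpore, Huawei's all-scenario AI computing framework, offers flexible graph programming and a broad operator library, which makes it possible to implement a variety of NAS algorithms on it.
This article introduces the core concepts of NAS and the principles behind the mainstream algorithms, then uses MindSpore to implement two classic NAS methods from scratch: random search and differentiable architecture search (DARTS).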
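1. Core NAS Concepts
1.1 Search Space
The search space is the set of all candidate architectures that a NAS algorithm may explore. A well-designed search space balances expressiveness against search cost.
Chain-structured search space:
In a chain structure, the network is an ordered sequence of layers, and each layer picks one operation from the candidate set: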
Input → [Op₁ or Op₂ or ... or Opₙ] → [Op₁ or Op₂ or ... or Opₙ] → ... → Output
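To make the size of a chain-structured space concrete, the following is a minimal sketch in plain Python. The operation names and the depth of 3 are illustrative assumptions, not values taken from this article.

```python
from itertools import product

# Hypothetical candidate operations for each layer (names are illustrative).
CANDIDATE_OPS = ["conv3x3", "conv5x5", "max_pool"]


def enumerate_chain_space(num_layers, ops=CANDIDATE_OPS):
    """Return every chain architecture: one operation choice per layer."""
    return list(product(ops, repeat=num_layers))


archs = enumerate_chain_space(num_layers=3)
print(len(archs))  # 3 operations over 3 layers give 3 ** 3 = 27 candidates
print(archs[0])
```

The count grows as n^L for n operations and L layers, which is why exhaustive enumeration stops being practical very quickly.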
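Cell-based search space:
Cell-based search is currently the most common approach. The algorithm searches for one basic building block (a cell) and then stacks copies of that cell to form the full network. Because the same cell is reused at every layer, the number of decisions no longer grows with the network depth L: instead of choosing operations independently for each of the L layers, the search only has to fix the edges and operations inside a single cell with K nodes.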
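# Sketch of a cell structure
class SearchCell:
    """
    A search cell with N intermediate nodes.
    Each node i takes a weighted combination of the outputs of all earlier nodes,
    and the operation on each edge is chosen from the candidate operation set.
    """
    def __init__(self, num_nodes, num_ops):
        self.num_nodes = num_nodes  # number of intermediate nodes
        self.num_ops = num_ops      # number of candidate operations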
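Common candidate operations:
| Operation type | Operations | Notes |
|---|---|---|
| Convolution | 3x3, 5x5, 7x7 convolution | Feature extraction with different receptive fields |
| Pooling | 3x3 max pooling, 3x3 average pooling | Spatial aggregation (downsampling when strided) |
| Skip connection | Identity, zero operation | Identity gives a residual-style shortcut; zero removes the edge |
| Special | Separable convolution, dilated convolution | Parameter-efficient convolution variants |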
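As a rough illustration of how large a cell-based space still is, the sketch below counts the cells in which every intermediate node picks two distinct predecessors and one operation per chosen edge. This mirrors the sampling rule used later in this article; the function name and the default of two inputs per node are assumptions made for the example.

```python
from math import comb


def count_cell_architectures(num_nodes, num_ops, inputs_per_node=2):
    """Count cells where each intermediate node selects `inputs_per_node`
    distinct predecessors and one candidate operation per selected edge."""
    total = 1
    for node in range(num_nodes):
        num_predecessors = node + 2  # two cell inputs plus all earlier nodes
        k = min(inputs_per_node, num_predecessors)
        total *= comb(num_predecessors, k) * num_ops ** k
    return total


print(count_cell_architectures(num_nodes=4, num_ops=7))  # 1037664180
```

The count depends only on the cell, not on how many times the cell is stacked, which is the practical advantage of searching at the cell level.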
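1.2 Search Strategy
The search strategy determines how candidate architectures are explored and exploited within the search space. The main strategies are:
- Reinforcement learning (RL): treats architecture search as a sequential decision problem; an RNN controller generates architectures and the accuracy of the trained network serves as the reward signal
- Evolutionary algorithms (EA): evolve a population of architectures through selection, mutation, and crossover
- Gradient-based methods: relax the discrete choice of architecture into a continuous optimization problem and search with gradient descent
- Random search: samples architectures at random and evaluates them; simple, yet a surprisingly strong baseline in some settings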
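The mutation step that evolutionary search relies on can be sketched in a few lines. This is a minimal illustration, assuming an architecture is a list of (src, dst, op_idx) edges and a candidate set of 7 operations; `mutate` and `NUM_OPS` are hypothetical names introduced for the example.

```python
import random

NUM_OPS = 7  # assumed size of the candidate operation set


def mutate(edges, rng):
    """Return a copy of `edges` with the operation on one random edge replaced."""
    child = list(edges)
    pos = rng.randrange(len(child))
    src, dst, old_op = child[pos]
    # Exclude the current operation so the child always differs from the parent
    new_op = rng.choice([op for op in range(NUM_OPS) if op != old_op])
    child[pos] = (src, dst, new_op)
    return child


rng = random.Random(0)
parent = [(0, 2, 1), (1, 2, 3), (0, 3, 0), (2, 3, 5)]
child = mutate(parent, rng)
num_changed = sum(a != b for a, b in zip(parent, child))
print(num_changed)  # exactly one edge differs from the parent
```

A full evolutionary loop would evaluate each child, insert it into the population, and discard a weaker or older member; only the mutation operator is shown here.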
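1.3 Performance Estimation Strategy
Estimating how good an architecture is accounts for most of the cost of NAS. The main strategies are:
- Full training: train every candidate from scratch to convergence; the most accurate and the most expensive option
- Partial training: train for a few epochs and use the result as a proxy for final performance
- Weight sharing: let different architectures share part of their weights to avoid repeated training
- Supernet: train a single over-parameterized network that contains all candidate operations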
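2. Defining the NAS Search Space in MindSpore
Before implementing any NAS algorithm, we first define a flexible search space and the set of candidate operations.
2.1 Candidate Operations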
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore import Tensor
import mindspore.common.dtype as mstype


class SepConv(nn.Cell):
    """Separable convolution: a depthwise convolution followed by a pointwise convolution."""
    def __init__(self, C_in, C_out, kernel_size, stride=1, padding=0):
        super(SepConv, self).__init__()
        # group=C_in makes this a depthwise convolution (one filter per input channel)
        self.depthwise = nn.Conv2d(C_in, C_in, kernel_size=kernel_size,
                                   stride=stride, padding=padding,
                                   pad_mode='pad', group=C_in)
        # The 1x1 pointwise convolution mixes channels and sets the output width
        self.pointwise = nn.Conv2d(C_in, C_out, kernel_size=1, stride=1,
                                   padding=0, pad_mode='pad')
        self.bn = nn.BatchNorm2d(C_out)
        self.relu = nn.ReLU()

    def construct(self, x):
        x = self.depthwise(x)
        x = self.pointwise(x)
        x = self.bn(x)
        return self.relu(x)

class DilConv(nn.Cell):
    """Dilated convolution followed by batch normalization and ReLU."""
    def __init__(self, C_in, C_out, kernel_size, stride=1, padding=0, dilation=2):
        super(DilConv, self).__init__()
        self.conv = nn.Conv2d(C_in, C_out, kernel_size=kernel_size,
                              stride=stride, padding=padding, pad_mode='pad',
                              dilation=dilation)
        self.bn = nn.BatchNorm2d(C_out)
        self.relu = nn.ReLU()

    def construct(self, x):
        return self.relu(self.bn(self.conv(x)))

class ZeroOp(nn.Cell):
    """Zero operation: outputs zeros, which effectively removes the edge."""
    def __init__(self, stride):
        super(ZeroOp, self).__init__()
        self.stride = stride

    def construct(self, x):
        if self.stride == 1:
            return x * 0
        # Subsample spatially so the output shape matches a strided convolution
        return x[:, :, ::self.stride, ::self.stride] * 0


class Identity(nn.Cell):
    """Identity mapping (skip connection)."""
    def __init__(self):
        super(Identity, self).__init__()

    def construct(self, x):
        return x

class MaxPool(nn.Cell):
    """Max pooling. `padding` is accepted but unused: pad_mode='same' pads automatically."""
    def __init__(self, kernel_size=3, stride=1, padding=1):
        super(MaxPool, self).__init__()
        self.pool = nn.MaxPool2d(kernel_size=kernel_size, stride=stride, pad_mode='same')

    def construct(self, x):
        return self.pool(x)


class AvgPool(nn.Cell):
    """Average pooling. `padding` is accepted but unused: pad_mode='same' pads automatically."""
    def __init__(self, kernel_size=3, stride=1, padding=1):
        super(AvgPool, self).__init__()
        self.pool = nn.AvgPool2d(kernel_size=kernel_size, stride=stride, pad_mode='same')

    def construct(self, x):
        return self.pool(x)
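2.2 Mixed Operation
Differentiable architecture search relaxes the discrete choice of an operation into a continuous assignment of weights. A mixed operation is the weighted sum of all candidate operations: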
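class MixedOp(nn.Cell):
    """
    Mixed operation: a weighted sum of all candidate operations.
    The weights come from the architecture parameters after softmax normalization.
    """
    def __init__(self, C_in, C_out, stride=1):
        super(MixedOp, self).__init__()
        # Identity only preserves shapes when stride == 1 and C_in == C_out;
        # otherwise use a strided 1x1 convolution as a shape-matching shortcut.
        if stride == 1 and C_in == C_out:
            skip = Identity()
        else:
            skip = nn.Conv2d(C_in, C_out, kernel_size=1, stride=stride)
        self.ops = nn.CellList([
            SepConv(C_in, C_out, 3, stride, 1),     # 3x3 separable convolution
            SepConv(C_in, C_out, 5, stride, 2),     # 5x5 separable convolution
            DilConv(C_in, C_out, 3, stride, 2, 2),  # 3x3 dilated convolution
            skip,                                   # skip connection
            ZeroOp(stride),                         # zero operation
        ])

    def construct(self, x, weights):
        """
        x: input feature map
        weights: architecture weights of shape [num_ops], already normalized
        """
        # Weighted sum over the outputs of all candidate operations
        result = self.ops[0](x) * weights[0]
        for i in range(1, len(self.ops)):
            result = result + self.ops[i](x) * weights[i]
        return result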
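3. Method 1: Random Search NAS
Random search is the simplest NAS method, and several studies have reported it to be a competitive baseline. We start by implementing a NAS framework based on random search.
3.1 Random Architecture Sampler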
import numpy as np
import random


class RandomArchSampler:
    """
    Random architecture sampler.
    Generates network architectures at random from a predefined search space.
    """
    # Maps an operation index to its name
    OP_NAMES = ['sep_conv_3x3', 'sep_conv_5x5', 'dil_conv_3x3',
                'identity', 'zero', 'avg_pool', 'max_pool']

    def __init__(self, num_nodes=4, max_edges=6):
        self.num_nodes = num_nodes  # intermediate nodes per cell
        self.max_edges = max_edges  # intended edge cap; not enforced by sample_architecture
        self.num_ops = len(self.OP_NAMES)

    def sample_architecture(self):
        """
        Sample one architecture at random.
        Returns: a list of (src_node, dst_node, op_idx) tuples.
        """
        edges = []
        for dst in range(2, self.num_nodes + 2):
            # Nodes 0 and 1 are the two fixed cell inputs
            possible_sources = list(range(0, dst))
            # Pick 2 distinct input sources
            num_inputs = min(2, len(possible_sources))
            sources = random.sample(possible_sources, num_inputs)
            for src in sources:
                # Pick one operation uniformly at random
                op_idx = random.randint(0, self.num_ops - 1)
                edges.append((src, dst, op_idx))
        return edges

    def architecture_to_genotype(self, edges, num_nodes=None):
        """
        Convert an edge list into a readable genotype:
        one list of (src, op_idx) pairs per intermediate node.
        """
        if num_nodes is None:
            num_nodes = self.num_nodes  # default to the sampler's own cell size
        genotype = []
        for dst in range(2, num_nodes + 2):
            node_edges = [(src, op) for src, d, op in edges if d == dst]
            genotype.append(node_edges)
        return genotype

    def genotype_to_str(self, genotype):
        """Render a genotype as a multi-line string."""
        lines = []
        for i, node_edges in enumerate(genotype):
            parts = []
            for src, op_idx in node_edges:
                op_name = self.OP_NAMES[op_idx]
                parts.append(f"{op_name}(node_{src})")
            lines.append(f"  Node {i+2}: {' + '.join(parts)}")
        return "Genotype:\n" + "\n".join(lines)

# Demonstrate random sampling
sampler = RandomArchSampler(num_nodes=4)
for i in range(3):
    edges = sampler.sample_architecture()
    genotype = sampler.architecture_to_genotype(edges)
    print(f"\n--- Sample {i+1} ---")
    print(sampler.genotype_to_str(genotype))
3.2 Random Search on CIFAR-10
import mindspore
import mindspore.nn as nn
from mindspore import Tensor
import mindspore.ops as ops
import mindspore.common.dtype as mstype
from mindspore.train import Model, LossMonitor
from mindspore.dataset import vision, transforms
from mindspore.dataset import Cifar10Dataset as Cifar10
from mindspore.nn import SoftmaxCrossEntropyWithLogits
import mindspore.dataset as ds


def create_cifar10_dataset(data_path, batch_size=64, is_training=True):
    """Create the CIFAR-10 data pipeline (binary version of the dataset)."""
    dataset = Cifar10(dataset_dir=data_path, usage='train' if is_training else 'test',
                      shuffle=is_training)
    # Per-channel CIFAR-10 statistics for images scaled to [0, 1]
    mean = [0.4914, 0.4822, 0.4465]
    std = [0.2023, 0.1994, 0.2010]
    image_trans = []
    if is_training:
        # Augmentation is applied to the training split only
        image_trans += [
            vision.RandomCrop((32, 32), padding=4),
            vision.RandomHorizontalFlip(),
        ]
    image_trans += [
        vision.Rescale(1.0 / 255.0, 0.0),  # uint8 [0, 255] -> float [0, 1]
        vision.Normalize(mean, std),        # operates on HWC images
        vision.HWC2CHW(),                   # Conv2d expects NCHW input
    ]
    label_trans = [transforms.TypeCast(mstype.int32)]
    dataset = dataset.map(image_trans, 'image')
    dataset = dataset.map(label_trans, 'label')
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset

class RandomSearchedNet(nn.Cell):
    """
    Network built from a randomly sampled genotype.

    Simplification: the operations of a cell are applied one after another.
    The source node of each edge is ignored and `zero` edges add no layer.
    """
    def __init__(self, genotype, init_channels=16, num_classes=10, num_layers=3):
        super(RandomSearchedNet, self).__init__()
        self.num_layers = num_layers
        self.stem = nn.SequentialCell([
            nn.Conv2d(3, init_channels, kernel_size=3, padding=1, pad_mode='pad'),
            nn.BatchNorm2d(init_channels)
        ])
        # Stack num_layers copies of the same cell
        self.layers = nn.CellList()
        channels = init_channels
        for i in range(num_layers):
            layer = self._build_cell(genotype, channels, channels, stride=1)
            self.layers.append(layer)
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Dense(channels, num_classes)
        self.relu = nn.ReLU()
        # MindSpore 2.x takes the drop probability p (keep probability 0.9)
        self.dropout = nn.Dropout(p=0.1)

    def _build_cell(self, genotype, C_in, C_out, stride):
        """Build one cell from the genotype (sequential simplification, see class docstring)."""
        layers = []
        for node_edges in genotype:
            for src, op_idx in node_edges:
                if op_idx == 0:    # sep_conv_3x3
                    layers.append(SepConv(C_in, C_out, 3, stride, 1))
                elif op_idx == 1:  # sep_conv_5x5
                    layers.append(SepConv(C_in, C_out, 5, stride, 2))
                elif op_idx == 2:  # dil_conv_3x3
                    layers.append(DilConv(C_in, C_out, 3, stride, 2, 2))
                elif op_idx == 3:  # identity
                    layers.append(Identity())
                elif op_idx == 5:  # avg_pool
                    layers.append(AvgPool())
                elif op_idx == 6:  # max_pool
                    layers.append(MaxPool())
                # op_idx == 4 (zero) adds no layer: the edge is dropped
                C_in = C_out
        return nn.SequentialCell(layers) if layers else Identity()

    def construct(self, x):
        x = self.stem(x)
        for layer in self.layers:
            x = layer(x)
        x = self.global_pool(x)
        x = x.view(x.shape[0], -1)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.classifier(x)
        return x

def evaluate_architecture(genotype, data_path, epochs=5):
    """
    Evaluate one architecture, using a few training epochs as a proxy.
    Returns the accuracy on the evaluation split (here the CIFAR-10 test split).
    """
    net = RandomSearchedNet(genotype, init_channels=16, num_classes=10, num_layers=2)
    loss_fn = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    opt = nn.AdamWeightDecay(params=net.trainable_params(), learning_rate=0.001,
                             weight_decay=5e-4)
    # The built-in accuracy metric is selected by name
    model = Model(network=net, loss_fn=loss_fn, optimizer=opt,
                  metrics={'accuracy'})
    train_data = create_cifar10_dataset(data_path, batch_size=64, is_training=True)
    eval_data = create_cifar10_dataset(data_path, batch_size=64, is_training=False)
    # LossMonitor expects an integer number of steps between log lines
    model.train(epochs, train_data, callbacks=[LossMonitor(100)],
                dataset_sink_mode=False)
    metrics = model.eval(eval_data, dataset_sink_mode=False)
    return metrics['accuracy']
3.3 Running the Random Search
def run_random_search(num_samples=20, data_path='./cifar-10-batches-bin'):
    """
    Run the random search procedure:
    1. Sample several architectures at random
    2. Evaluate each one with the proxy task
    3. Keep the best architecture
    """
    sampler = RandomArchSampler(num_nodes=4)
    results = []
    print("=" * 60)
    print("Starting random search NAS")
    print(f"Search budget: {num_samples} sampled architectures")
    print("=" * 60)
    for i in range(num_samples):
        edges = sampler.sample_architecture()
        genotype = sampler.architecture_to_genotype(edges)
        print(f"\n[{i+1}/{num_samples}] Evaluating architecture...")
        print(sampler.genotype_to_str(genotype))
        try:
            accuracy = evaluate_architecture(genotype, data_path, epochs=3)
            results.append({
                'genotype': genotype,
                'accuracy': accuracy,
                'edges': edges
            })
            print(f"  Proxy accuracy: {accuracy:.4f}")
        except Exception as e:
            # A failed candidate is recorded with accuracy 0 so the search can continue
            print(f"  Evaluation failed: {str(e)[:50]}")
            results.append({
                'genotype': genotype,
                'accuracy': 0.0,
                'edges': edges
            })
    # Select the best architecture by proxy accuracy
    results.sort(key=lambda x: x['accuracy'], reverse=True)
    best = results[0]
    print("\n" + "=" * 60)
    print("Search finished. Best architecture:")
    print(sampler.genotype_to_str(best['genotype']))
    print(f"Proxy accuracy: {best['accuracy']:.4f}")
    print("=" * 60)
    return best


# Run the search
# best_arch = run_random_search(num_samples=20, data_path='./cifar-10-batches-bin')
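4. Method 2: Differentiable Architecture Search (DARTS)
DARTS (Differentiable Architecture Search) was proposed by Liu et al. and published at ICLR 2019. Its core idea is to relax the discrete architecture search problem into a continuous optimization problem, so that the search can be carried out efficiently with gradient descent.
4.1 How DARTS Works
Continuous relaxation:
A conventional NAS method selects one fixed operation σ(i,j) on each edge (i, j):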
x_j = σ(i,j)(x_i)
DARTS relaxes this choice into a softmax-weighted combination of all candidate operations:
x_j = Σ(o ∈ O) softmax(α_{i,j})_o · o(x_i)
Here α_{i,j} is the architecture parameter vector of edge (i, j), with one entry per candidate operation, and O is the set of candidate operations.
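The relaxation above can be checked numerically with a toy example. The sketch below uses three scalar "operations" in place of real layers; the operation names and the helper `mixed_output` are assumptions made for illustration only.

```python
import math


def softmax(values):
    """Numerically stable softmax over a list of floats."""
    m = max(values)
    exps = [math.exp(v - m) for v in values]
    total = sum(exps)
    return [e / total for e in exps]


# Toy scalar stand-ins for candidate operations (illustrative only)
CANDIDATES = {
    "identity": lambda x: x,
    "double": lambda x: 2.0 * x,
    "zero": lambda x: 0.0,
}


def mixed_output(x, alpha):
    """Softmax-weighted sum of all candidate operations applied to x."""
    weights = softmax(alpha)
    return sum(w * op(x) for w, op in zip(weights, CANDIDATES.values()))


uniform = mixed_output(3.0, [0.0, 0.0, 0.0])   # average of 3, 6 and 0
peaked = mixed_output(3.0, [0.0, 10.0, 0.0])   # dominated by "double"
print(uniform, peaked)
```

When one entry of α dominates, the mixed output approaches the output of that single operation, which is why the argmax of α can later be read as a discrete choice.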
Bilevel optimization:
DARTS minimizes the validation loss with respect to α, but the best weights w*(α) themselves depend on α through the training loss:
min_α L_val(w*(α), α)
s.t. w*(α) = argmin_w L_train(w, α)
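In practice this is approximated by alternating two steps (the first-order approximation, which treats the current w as if it were w*(α)):
- Step 1: fix the architecture parameters α and update the network weights w on the training set
- Step 2: fix the network weights w and update the architecture parameters α on the validation set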
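The two alternating steps above can be sketched on a toy problem in plain Python. Everything here is an assumption made for illustration: two scalar candidate operations (a linear and a quadratic term), a target function 2x^2, hand-derived gradients, and a learning rate of 0.1. It is not the MindSpore training loop from this article.

```python
import math


def softmax2(a):
    m = max(a)
    e = [math.exp(v - m) for v in a]
    s = sum(e)
    return [v / s for v in e]


def mixed(x, w, p):
    # Two toy candidate operations: linear w[0]*x and quadratic w[1]*x**2
    return p[0] * w[0] * x + p[1] * w[1] * x * x


def target(x):
    return 2.0 * x * x


TRAIN_X = [-1.0, -0.5, 0.5, 1.0]
VAL_X = [-0.75, -0.25, 0.25, 0.75]


def grad_w(w, alpha, xs):
    """Gradient of the mean squared error with respect to the weights w."""
    p = softmax2(alpha)
    g = [0.0, 0.0]
    for x in xs:
        err = mixed(x, w, p) - target(x)
        g[0] += 2 * err * p[0] * x / len(xs)
        g[1] += 2 * err * p[1] * x * x / len(xs)
    return g


def grad_alpha(w, alpha, xs):
    """Gradient with respect to alpha, chained through the softmax."""
    p = softmax2(alpha)
    gp = [0.0, 0.0]  # gradient with respect to the softmax outputs
    for x in xs:
        err = mixed(x, w, p) - target(x)
        gp[0] += 2 * err * w[0] * x / len(xs)
        gp[1] += 2 * err * w[1] * x * x / len(xs)
    dot = sum(pk * gk for pk, gk in zip(p, gp))
    return [p[k] * (gp[k] - dot) for k in range(2)]


w, alpha = [0.5, 0.5], [0.0, 0.0]
for step in range(300):
    # Step 1: update the weights on the training data, alpha fixed
    w = [wi - 0.1 * gi for wi, gi in zip(w, grad_w(w, alpha, TRAIN_X))]
    # Step 2: update alpha on the validation data, weights fixed
    alpha = [ai - 0.1 * gi for ai, gi in zip(alpha, grad_alpha(w, alpha, VAL_X))]

weights = softmax2(alpha)
print(weights)  # the quadratic operation should receive the larger weight
```

Because the target is purely quadratic, the architecture weight of the quadratic operation grows while the linear branch is driven toward zero, which is the behaviour the alternating scheme is meant to produce.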
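4.2 DARTS Cell Implementation
class DARTSCell(nn.Cell):
    """
    DARTS search cell.

    Holds `num_nodes` intermediate nodes. Every edge carries a MixedOp, a weighted
    combination of all candidate operations. Both inputs must have C channels and
    the same spatial size; the output also has C channels.
    """
    def __init__(self, num_nodes, C, stride=1):
        super(DARTSCell, self).__init__()
        self.num_nodes = num_nodes
        self.C = C
        self.stride = stride
        self.num_ops = 5
        # One MixedOp per candidate edge i -> j (edges with j >= i + 2)
        self.edges = nn.CellDict()
        for i in range(num_nodes):
            for j in range(i + 2, num_nodes + 2):
                # Only edges that leave the two input nodes downsample
                s = stride if i < 2 else 1
                self.edges[f'{i}_{j}'] = MixedOp(C, C, s)
        # Simplification: a 1x1 convolution maps the concatenated node outputs
        # back to C channels so that cells can be stacked directly.
        self.project = nn.Conv2d(num_nodes * C, C, kernel_size=1)
        self._arch_params = None

    def set_arch_params(self, arch_params):
        """Set the normalized architecture weights, one row per edge."""
        self._arch_params = arch_params

    def construct(self, s0, s1):
        """s0, s1: the two input states."""
        states = [s0, s1]
        for j in range(2, self.num_nodes + 2):
            node_sum = None
            for i in range(j):
                weight_idx = self._get_weight_idx(i, j)
                if weight_idx is None or self._arch_params is None:
                    continue
                # The weights are already softmax-normalized by the search network
                edge_out = self.edges[f'{i}_{j}'](states[i], self._arch_params[weight_idx])
                node_sum = edge_out if node_sum is None else node_sum + edge_out
            states.append(node_sum)
        # Concatenate all intermediate nodes, then project back to C channels
        out = ops.Concat(axis=1)(tuple(states[2:]))
        return self.project(out)

    def _get_weight_idx(self, i, j):
        """Return the row index of edge (i, j) in the architecture parameters."""
        idx = 0
        for ii in range(self.num_nodes):
            for jj in range(ii + 2, self.num_nodes + 2):
                if ii == i and jj == j:
                    return idx
                idx += 1
        return None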
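To see which edges the index computation in `_get_weight_idx` actually covers, the enumeration can be reproduced in plain Python. The helper names below are hypothetical; the comparison against a full DAG (every node connected to all of its predecessors, as described in the DARTS paper) is added for context.

```python
def enumerate_edges(num_nodes):
    """Edges (i, j) in the same order as the nested loops of _get_weight_idx."""
    return [(i, j) for i in range(num_nodes) for j in range(i + 2, num_nodes + 2)]


def full_dag_edges(num_nodes):
    """Every predecessor -> node edge for nodes 2 .. num_nodes + 1."""
    return [(i, j) for j in range(2, num_nodes + 2) for i in range(j)]


edges = enumerate_edges(4)
index_of = {edge: idx for idx, edge in enumerate(edges)}
missing = sorted(set(full_dag_edges(4)) - set(edges))

print(len(edges), len(full_dag_edges(4)))  # 10 14
print(index_of[(0, 2)], index_of[(1, 3)])  # 0 4
print(missing)                             # [(1, 2), (2, 3), (3, 4), (4, 5)]
```

With 4 intermediate nodes this enumeration yields 10 edges, while a full DAG has 14. The edges between consecutive nodes are not part of the search space defined here, which is worth keeping in mind when comparing results with other DARTS implementations.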
4.3 DARTS Search Network
class DARTSSearchNet(nn.Cell):
    """
    DARTS search network: stem + a stack of DARTS cells + classifier.
    All cells share one set of architecture parameters (log_alpha).
    """
    def __init__(self, init_channels, num_classes, num_layers, num_nodes=4, num_ops=5):
        super(DARTSSearchNet, self).__init__()
        self.init_channels = init_channels
        self.num_classes = num_classes
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        # Stem network
        self.stem = nn.SequentialCell([
            nn.Conv2d(3, init_channels, kernel_size=3, padding=1, pad_mode='pad'),
            nn.BatchNorm2d(init_channels)
        ])
        # Number of edges per cell; must match the enumeration in DARTSCell
        num_edges = 0
        for i in range(num_nodes):
            for j in range(i + 2, num_nodes + 2):
                num_edges += 1
        # Architecture parameters, initialized close to zero (near-uniform softmax)
        self.log_alpha = mindspore.Parameter(
            Tensor(np.random.randn(num_edges, num_ops).astype(np.float32) * 1e-3),
            name='log_alpha'
        )
        # Build the cells; layers at 1/3 and 2/3 of the depth are reduction layers
        self.cells = nn.CellList()
        self.expands = nn.CellList()  # 1x1 convs that widen the inputs of reduction cells
        is_reduction = []
        C_prev = init_channels
        for i in range(num_layers):
            reduction = i in [num_layers // 3, 2 * num_layers // 3]
            C = C_prev * 2 if reduction else C_prev
            self.expands.append(nn.Conv2d(C_prev, C, kernel_size=1) if reduction else Identity())
            self.cells.append(DARTSCell(num_nodes, C, 2 if reduction else 1))
            is_reduction.append(reduction)
            C_prev = C
        self.is_reduction = is_reduction
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Dense(C_prev, num_classes)

    def arch_weights(self):
        """Return the softmax-normalized architecture weights."""
        return ops.Softmax(axis=-1)(self.log_alpha)

    def construct(self, x):
        """Forward pass."""
        s0 = s1 = self.stem(x)
        arch_w = self.arch_weights()
        for idx in range(len(self.cells)):
            cell = self.cells[idx]
            # Every cell reads the same shared architecture weights
            cell.set_arch_params(arch_w)
            s0, s1 = self.expands[idx](s0), self.expands[idx](s1)
            out = cell(s0, s1)
            if self.is_reduction[idx]:
                # The older state now has a stale resolution, so it is dropped
                s0, s1 = out, out
            else:
                s0, s1 = s1, out
        out = self.global_pool(s1)
        out = out.view(out.shape[0], -1)
        logits = self.classifier(out)
        return logits
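The placement of the reduction layers follows directly from the rule `i in [num_layers // 3, 2 * num_layers // 3]`. The sketch below tabulates stride, channel count, and feature-map size per layer, assuming 32x32 inputs as in CIFAR-10; `layer_schedule` is a hypothetical helper, not part of the network code.

```python
def layer_schedule(num_layers, init_channels, input_size=32):
    """Return (layer, stride, channels, feature_map_size) for every layer."""
    schedule = []
    channels, size = init_channels, input_size
    reduction_layers = {num_layers // 3, 2 * num_layers // 3}
    for layer in range(num_layers):
        stride = 2 if layer in reduction_layers else 1
        if stride == 2:
            channels *= 2  # reduction layers double the channels
            size //= 2     # and halve the spatial resolution
        schedule.append((layer, stride, channels, size))
    return schedule


schedule = layer_schedule(num_layers=6, init_channels=16)
for row in schedule:
    print(row)
```

For 6 layers and 16 initial channels, layers 2 and 4 are reduction layers, and the classifier ends up with 64 input channels on 8x8 feature maps.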
4.4 DARTS Bilevel Optimization Training Loop
class DARTSTrainer:
    """
    DARTS trainer.
    Approximates the bilevel optimization by alternating between network weight
    updates and architecture parameter updates.
    """
    def __init__(self, net, train_data, val_data, w_lr=0.025, alpha_lr=3e-4):
        self.net = net
        self.train_data = train_data
        self.val_data = val_data
        # Optimizer for the network weights (everything except log_alpha)
        self.w_optimizer = nn.AdamWeightDecay(
            params=[p for p in net.trainable_params() if p.name != 'log_alpha'],
            learning_rate=w_lr * 0.5,
            weight_decay=5e-4
        )
        # Optimizer for the architecture parameters
        self.alpha_optimizer = nn.Adam(
            params=[net.log_alpha],
            learning_rate=alpha_lr
        )
        self.loss_fn = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')

        def forward_fn(x, label):
            logits = self.net(x)
            return self.loss_fn(logits, label), logits

        # Each gradient function differentiates only its own parameter group.
        # has_aux=True: only the first output (the loss) is differentiated.
        self.w_grad_fn = mindspore.value_and_grad(
            forward_fn, None, self.w_optimizer.parameters, has_aux=True)
        self.alpha_grad_fn = mindspore.value_and_grad(
            forward_fn, None, self.alpha_optimizer.parameters, has_aux=True)

    def train_weights(self, epoch):
        """Step 1: keep the architecture parameters fixed and train the network weights."""
        self.net.set_train(True)
        loss = None
        for batch in self.train_data.create_dict_iterator():
            (loss, _), grads = self.w_grad_fn(batch['image'], batch['label'])
            self.w_optimizer(grads)
        # Loss of the last batch, as a plain Python float
        return float(loss.asnumpy())

    def update_architecture(self):
        """Step 2: keep the network weights fixed and update the architecture parameters."""
        self.net.set_train(True)
        loss = None
        for batch in self.val_data.create_dict_iterator():
            (loss, _), grads = self.alpha_grad_fn(batch['image'], batch['label'])
            self.alpha_optimizer(grads)
        return float(loss.asnumpy())

    def search(self, num_epochs=50):
        """Run the full DARTS search loop."""
        print("=" * 60)
        print("Starting DARTS architecture search")
        print(f"Search epochs: {num_epochs}")
        print("=" * 60)
        for epoch in range(num_epochs):
            w_loss = self.train_weights(epoch)
            a_loss = self.update_architecture()
            if (epoch + 1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{num_epochs}] "
                      f"weight loss: {w_loss:.4f}, architecture loss: {a_loss:.4f}")
        return self.net.log_alpha.asnumpy()

    def extract_best_architecture(self):
        """Derive a discrete architecture: keep the highest-weight operation on every edge."""
        arch_weights = ops.Softmax(axis=-1)(self.net.log_alpha).asnumpy()
        best_ops = np.argmax(arch_weights, axis=1)
        op_names = ['sep_conv_3x3', 'sep_conv_5x5', 'dil_conv_3x3', 'identity', 'zero']
        edges = []
        idx = 0
        # Same edge order as DARTSCell._get_weight_idx
        for i in range(self.net.num_nodes):
            for j in range(i + 2, self.net.num_nodes + 2):
                op_idx = int(best_ops[idx])
                weight = arch_weights[idx][op_idx]
                print(f"  Edge ({i}→{j}): {op_names[op_idx]} (softmax weight: {weight:.4f})")
                edges.append((i, j, op_idx))
                idx += 1
        return edges
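The per-edge argmax above may select `zero` on an edge and keeps every edge. The DARTS paper describes a different derivation rule: ignore `zero`, then keep for each node the two strongest incoming edges. The sketch below illustrates that rule in plain Python on made-up weights; `derive_top2` and the example numbers are assumptions, not output of the search code in this article.

```python
OP_NAMES = ["sep_conv_3x3", "sep_conv_5x5", "dil_conv_3x3", "identity", "zero"]
ZERO_IDX = OP_NAMES.index("zero")


def derive_top2(edge_list, edge_weights, top_k=2):
    """edge_list: [(src, dst)]; edge_weights: softmax weights per edge.
    Returns [(src, dst, op_idx)] with at most top_k incoming edges per node."""
    per_node = {}
    for (src, dst), weights in zip(edge_list, edge_weights):
        # Strongest operation on this edge, with `zero` excluded
        candidates = [(w, op) for op, w in enumerate(weights) if op != ZERO_IDX]
        strength, best_op = max(candidates)
        per_node.setdefault(dst, []).append((strength, src, best_op))
    genotype = []
    for dst in sorted(per_node):
        strongest = sorted(per_node[dst], reverse=True)[:top_k]
        for _, src, op in sorted(strongest, key=lambda t: t[1]):
            genotype.append((src, dst, op))
    return genotype


# Made-up weights for a cell with two intermediate nodes (2 and 3)
edge_list = [(0, 2), (1, 2), (0, 3), (1, 3), (2, 3)]
edge_weights = [
    [0.50, 0.10, 0.10, 0.10, 0.20],
    [0.10, 0.15, 0.05, 0.10, 0.60],  # `zero` has the largest weight but is ignored
    [0.10, 0.40, 0.10, 0.10, 0.30],
    [0.05, 0.05, 0.10, 0.30, 0.50],
    [0.10, 0.10, 0.60, 0.10, 0.10],
]
genotype = derive_top2(edge_list, edge_weights)
print(genotype)  # [(0, 2, 0), (1, 2, 1), (0, 3, 1), (2, 3, 2)]
```

Node 3 has three incoming edges, so the weakest one, from node 1, is discarded. The result uses the same (src, dst, op_idx) format as `extract_best_architecture`.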
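5. Decoding the Architecture and Building the Final Network
Once the search has finished, the best architecture parameters are decoded into a concrete network structure, which is then trained and evaluated in full.
5.1 Architecture Decoding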
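class DecodedCell(nn.Cell):
    """
    Decoded cell: every edge uses only its selected operation (no mixed operations).

    `genotype` is a list of (src, dst, op_idx) edges. Every intermediate node must
    have at least one incoming edge. Nodes 0 and 1 both receive the cell input x,
    and the cell output is the sum of all intermediate nodes.
    """
    def __init__(self, genotype, C, stride=1):
        super(DecodedCell, self).__init__()
        self.ops = nn.CellList()
        self.edge_list = []  # (src, dst) of each operation, ordered by dst
        for src, dst, op_idx in sorted(genotype, key=lambda e: e[1]):
            # Only edges that leave the two input nodes downsample
            s = stride if src < 2 else 1
            self.ops.append(self._make_op(int(op_idx), C, s))
            self.edge_list.append((int(src), int(dst)))
        self.num_states = max(dst for _, dst in self.edge_list) + 1

    @staticmethod
    def _make_op(op_idx, C, stride):
        """Map an operation index to a layer (same indices as RandomArchSampler.OP_NAMES)."""
        if op_idx == 0:
            return SepConv(C, C, 3, stride, 1)
        if op_idx == 1:
            return SepConv(C, C, 5, stride, 2)
        if op_idx == 2:
            return DilConv(C, C, 3, stride, 2, 2)
        if op_idx == 3:
            # Identity cannot downsample, so use a strided 1x1 convolution instead
            return Identity() if stride == 1 else nn.Conv2d(C, C, kernel_size=1, stride=stride)
        if op_idx == 5:
            return AvgPool(stride=stride)
        if op_idx == 6:
            return MaxPool(stride=stride)
        return ZeroOp(stride)

    def construct(self, x):
        states = [x, x] + [None] * (self.num_states - 2)
        for k in range(len(self.edge_list)):
            src, dst = self.edge_list[k]
            y = self.ops[k](states[src])
            states[dst] = y if states[dst] is None else states[dst] + y
        # Sum the intermediate nodes so the output keeps C channels
        out = states[2]
        for s in states[3:]:
            out = out + s
        return out
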
class FinalNetwork(nn.Cell):
    """
    Final network built from the search result.
    Uses more channels and more layers than the search network.
    """
    def __init__(self, genotype, init_channels=36, num_classes=10, num_layers=6):
        super(FinalNetwork, self).__init__()
        self.stem = nn.SequentialCell([
            nn.Conv2d(3, init_channels, kernel_size=3, padding=1, pad_mode='pad'),
            nn.BatchNorm2d(init_channels)
        ])
        self.cells = nn.CellList()
        C_prev = init_channels
        for i in range(num_layers):
            if i in [num_layers // 3, 2 * num_layers // 3]:
                # Reduction stage: a strided 3x3 convolution halves the resolution and
                # doubles the channels before the cell, so every cell runs at stride 1.
                C = C_prev * 2
                self.cells.append(nn.SequentialCell([
                    nn.Conv2d(C_prev, C, kernel_size=3, stride=2, padding=1, pad_mode='pad'),
                    nn.BatchNorm2d(C),
                    DecodedCell(genotype, C, stride=1),
                ]))
            else:
                C = C_prev
                self.cells.append(DecodedCell(genotype, C, stride=1))
            C_prev = C
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Dense(C_prev, num_classes)
        # MindSpore 2.x takes the drop probability p (keep probability 0.8)
        self.dropout = nn.Dropout(p=0.2)
        self.relu = nn.ReLU()

    def construct(self, x):
        x = self.stem(x)
        for cell in self.cells:
            x = cell(x)
        x = self.global_pool(x)
        x = x.view(x.shape[0], -1)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.classifier(x)
        return x
5.2 Full Training and Evaluation
def train_final_model(best_edges, data_path, epochs=100):
    """Train the final model using the best architecture found by the search."""
    net = FinalNetwork(best_edges, init_channels=36, num_classes=10, num_layers=6)
    loss_fn = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    train_data = create_cifar10_dataset(data_path, batch_size=96, is_training=True)
    eval_data = create_cifar10_dataset(data_path, batch_size=96, is_training=False)
    # Cosine-annealed learning rate. CosineDecayLR takes (min_lr, max_lr, decay_steps)
    # and counts optimizer steps, not epochs.
    total_steps = epochs * train_data.get_dataset_size()
    lr = nn.CosineDecayLR(min_lr=1e-4, max_lr=0.025, decay_steps=total_steps)
    opt = nn.SGD(params=net.trainable_params(), learning_rate=lr,
                 momentum=0.9, weight_decay=3e-4)
    # The built-in accuracy metric is selected by name
    model = Model(network=net, loss_fn=loss_fn, optimizer=opt,
                  metrics={'accuracy'})
    print("Training the final model...")
    # LossMonitor expects an integer number of steps between log lines
    model.train(epochs, train_data, callbacks=[LossMonitor(100)],
                dataset_sink_mode=False)
    # Evaluate
    metrics = model.eval(eval_data, dataset_sink_mode=False)
    print(f"\nFinal test accuracy: {metrics['accuracy']:.4f}")
    return net, metrics['accuracy']


# Train with the architecture found by DARTS
# final_net, final_acc = train_final_model(best_edges, './cifar-10-batches-bin')
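The cosine annealing schedule used for the final training can be written out in a few lines of plain Python, which makes it easy to inspect before launching a long run. This is a sketch of the standard cosine formula with the learning rates from this article (0.025 down to 1e-4); `cosine_lr` is a hypothetical helper, not a MindSpore API.

```python
import math


def cosine_lr(step, total_steps, max_lr=0.025, min_lr=1e-4):
    """Cosine annealing from max_lr at step 0 to min_lr at total_steps."""
    progress = min(step, total_steps) / total_steps
    return min_lr + 0.5 * (max_lr - min_lr) * (1.0 + math.cos(math.pi * progress))


for step in (0, 25, 50, 75, 100):
    print(step, round(cosine_lr(step, 100), 5))
```

The schedule is defined over optimizer steps, so for a real run `total_steps` would be the number of epochs multiplied by the number of batches per epoch.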
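6. NAS Optimization Strategies and Best Practices
6.1 Search Space Design Tips
class OptimizedSearchSpace:
    """
    A reduced search space.
    Prior knowledge narrows the search range and lowers the search cost.
    """
    # Trimmed candidate operation set
    REDUCED_OPS = ['sep_conv_3x3', 'sep_conv_5x5', 'dil_conv_3x3',
                   'identity', 'avg_pool']
    # Architecture constraints chosen from experience
    MAX_NODES = 4           # at most 4 intermediate nodes per cell
    MAX_INPUTS = 2          # at most 2 inputs per node
    SKIP_PROBABILITY = 0.2  # probability of a skip connection


class ProgressiveNAS:
    """Progressive NAS: start from a small search space and enlarge it step by step."""
    def __init__(self, net, data_path):
        self.net = net
        self.data_path = data_path
        self.phase_configs = [
            {'num_nodes': 2, 'num_ops': 3, 'epochs': 10},
            {'num_nodes': 3, 'num_ops': 4, 'epochs': 15},
            {'num_nodes': 4, 'num_ops': 5, 'epochs': 25},
        ]

    def progressive_search(self):
        """Enlarge the search space phase by phase."""
        for phase, config in enumerate(self.phase_configs):
            print(f"\n{'='*40}")
            print(f"Phase {phase+1}: nodes={config['num_nodes']}, "
                  f"ops={config['num_ops']}, epochs={config['epochs']}")
            print(f"{'='*40}")
            # Search within the current search space. The space grows with each
            # phase, and each phase's result guides the next initialization.
            self._search_phase(config)
        print("\nProgressive search finished.")

    def _search_phase(self, config):
        """Placeholder for a single search phase; this article does not implement it."""
        raise NotImplementedError("plug in a search routine for one phase here")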
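6.2 Speeding Up Search with Performance Prediction
class PerformancePredictor(nn.Cell):
    """
    Architecture performance predictor.
    A lightweight model predicts the performance of an architecture so that
    full training can be avoided.
    """
    def __init__(self, feature_dim=252):
        # 252 = 6 * 6 * 7, the length produced by encode_architecture with its defaults
        super(PerformancePredictor, self).__init__()
        self.encoder = nn.SequentialCell([
            nn.Dense(feature_dim, 128),
            nn.ReLU(),
            nn.Dense(128, 64),
            nn.ReLU(),
        ])
        self.predictor = nn.Dense(64, 1)

    def construct(self, arch_features):
        """
        arch_features: encoded architecture features
        Returns: the predicted validation accuracy
        """
        h = self.encoder(arch_features)
        score = self.predictor(h)
        return score


def encode_architecture(edges, num_nodes=4, num_ops=7):
    """
    Encode an architecture as a fixed-length feature vector:
    an adjacency tensor with a one-hot operation type on every edge.
    """
    # Adjacency tensor of shape [nodes, nodes, ops]
    adj_size = num_nodes + 2  # includes the two input nodes
    adj_matrix = np.zeros((adj_size, adj_size, num_ops))
    for src, dst, op_idx in edges:
        adj_matrix[src, dst, op_idx] = 1
    # Flatten into a feature vector of length adj_size * adj_size * num_ops
    feature = adj_matrix.flatten()
    return feature.astype(np.float32)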
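The length of the encoded feature vector determines the input width of the predictor, so it is worth checking by hand. The sketch below reproduces the same encoding with plain Python lists; `encode_edges` is a hypothetical stand-in for `encode_architecture` that avoids the NumPy dependency.

```python
def encode_edges(edges, num_nodes=4, num_ops=7):
    """One-hot encode (src, dst, op_idx) edges into a flat list of floats."""
    size = num_nodes + 2  # includes the two input nodes
    feature = [0.0] * (size * size * num_ops)
    for src, dst, op_idx in edges:
        # Row-major position of element [src, dst, op_idx]
        feature[(src * size + dst) * num_ops + op_idx] = 1.0
    return feature


edges = [(0, 2, 1), (1, 2, 3), (0, 3, 0), (2, 3, 5)]
feature = encode_edges(edges)
print(len(feature), sum(feature))  # 252 4.0
```

With 4 intermediate nodes and 7 operations the vector has 6 * 6 * 7 = 252 entries, and the first `Dense` layer of the predictor has to accept exactly that many features.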
7. End-to-End Example
The following example ties the pieces together and runs one NAS search from start to finish:
import numpy as np
import mindspore
import mindspore.nn as nn
from mindspore import Tensor
import mindspore.ops as ops
from mindspore.train import Model, LossMonitor
from mindspore.dataset import vision, transforms
from mindspore.dataset import Cifar10Dataset as Cifar10
from mindspore.nn import SoftmaxCrossEntropyWithLogits
import mindspore.common.dtype as mstype


def main():
    """Full NAS search pipeline."""
    # ====== Configuration ======
    DATA_PATH = './cifar-10-batches-bin'
    INIT_CHANNELS = 16
    NUM_CLASSES = 10
    NUM_LAYERS = 6
    NUM_NODES = 4
    BATCH_SIZE = 64
    SEARCH_EPOCHS = 30
    TRAIN_EPOCHS = 100
    # ====== Step 1: prepare the data ======
    print("Step 1: preparing the dataset...")
    train_data = create_cifar10_dataset(DATA_PATH, BATCH_SIZE, is_training=True)
    # NOTE: DARTS normally holds out part of the training set for architecture updates.
    # This example reuses the test split, so the final test accuracy is optimistic.
    val_data = create_cifar10_dataset(DATA_PATH, BATCH_SIZE, is_training=False)
    # ====== Step 2: create the DARTS search model ======
    print("Step 2: creating the DARTS search model...")
    net = DARTSSearchNet(INIT_CHANNELS, NUM_CLASSES, NUM_LAYERS, NUM_NODES)
    # ====== Step 3: run the DARTS search ======
    print("Step 3: starting the architecture search...")
    trainer = DARTSTrainer(net, train_data, val_data, w_lr=0.025, alpha_lr=3e-4)
    trainer.search(num_epochs=SEARCH_EPOCHS)
    # ====== Step 4: extract the best architecture ======
    print("\nStep 4: extracting the best architecture...")
    best_edges = trainer.extract_best_architecture()
    # ====== Step 5: sample random architectures for comparison ======
    # The samples are only printed here; use evaluate_architecture to score them.
    print("\nStep 5: sampling random architectures for comparison...")
    sampler = RandomArchSampler(num_nodes=NUM_NODES)
    rs_results = []
    for i in range(10):
        edges = sampler.sample_architecture()
        genotype = sampler.architecture_to_genotype(edges)
        print(f"  Random sample {i+1}: {sampler.genotype_to_str(genotype)}")
        rs_results.append((genotype, edges))
    # ====== Step 6: build and train the final model ======
    print("\nStep 6: training the final model...")
    final_net = FinalNetwork(best_edges, init_channels=36,
                             num_classes=NUM_CLASSES, num_layers=NUM_LAYERS)
    loss_fn = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    opt = nn.SGD(params=final_net.trainable_params(), learning_rate=0.025,
                 momentum=0.9, weight_decay=3e-4)
    # The built-in accuracy metric is selected by name
    model = Model(network=final_net, loss_fn=loss_fn, optimizer=opt,
                  metrics={'accuracy'})
    train_data_full = create_cifar10_dataset(DATA_PATH, 96, is_training=True)
    eval_data_full = create_cifar10_dataset(DATA_PATH, 96, is_training=False)
    # LossMonitor expects an integer number of steps between log lines
    model.train(TRAIN_EPOCHS, train_data_full,
                callbacks=[LossMonitor(100)], dataset_sink_mode=False)
    metrics = model.eval(eval_data_full, dataset_sink_mode=False)
    print(f"\nFinal test accuracy: {metrics['accuracy']:.4f}")
    print("Search and training pipeline finished.")


if __name__ == '__main__':
    main()
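Running this code walks through the whole NAS pipeline: the gradual optimization of the architecture parameters, the extraction of the best architecture, and the training and evaluation of the final model. On CIFAR-10, architectures found this way can typically exceed 93% test accuracy once fully trained, although the exact figure depends on the search budget, the training schedule, and the random seed.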
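8. Comparison of NAS Methods and Summary
8.1 Comparison of Mainstream NAS Methods
| Method | Search strategy | Search cost | GPUs required | Final accuracy | Implementation complexity |
|---|---|---|---|---|---|
| Random search | Random sampling | Medium | 1-4 | Medium | Low |
| Reinforcement learning | RL agent | Very high | 200+ | High | High |
| Evolutionary algorithm | Genetic operators | High | 50+ | High | Medium |
| DARTS | Gradient-based optimization | Low | 1-4 | High | Medium |
| ENAS | Weight sharing | Low | 1 | Medium to high | Medium |
| P-DARTS | Progressive DARTS | Very low | 1 | High | Medium |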
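8.2 Practical Lessons
Search space design:
- Start with a small search space and expand it gradually. Four nodes and 5 to 7 operations are a reasonable starting point
- Look at established architectures for ImageNet and CIFAR-10 and include operations that have already proven effective
- Avoid an oversized search space, because the search cost rises sharply with its size
Notes on using DARTS:
- Number of architecture parameters: use few intermediate nodes (3 to 4) to keep the number of architecture parameters small
- Learning rates: the learning rate for the architecture parameters is usually much smaller than the one for the weights (3e-4 vs 0.025)
- Separate search and training: use a small channel count (16) during the search and a larger one (36 to 48) when training the final model
- Overfitting: the DARTS search tends to overfit the validation set, so apply regularization
Performance optimization:
- Borrow the idea from ProxylessNAS of handling discrete operation choices with a straight-through estimator (STE)
- Use FairNAS to make sure all operations are trained fairly and to avoid bias
- Consider Once-for-All training, which trains one network once and derives architectures of many sizes from it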
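8.3 Recap of the Core Code
This article implemented the following components:
- Candidate operation library: SepConv, DilConv, Identity, ZeroOp, MaxPool, AvgPool
- Mixed operation (MixedOp): the core of the continuous relaxation in DARTS
- Random search framework: architecture sampler plus proxy evaluation
- DARTS implementation: search cell, bilevel optimization trainer, and architecture decoding
- Final network construction: building the final model from the search result
All code is written against the MindSpore API and is meant as a starting point that readers can run and adapt. It is best to validate the search pipeline on CIFAR-10 first and then move on to larger datasets and more complex tasks.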
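Closing Remarks
Neural architecture search is an important direction in the automation of deep learning. From the simplicity of random search to the gradient-driven efficiency of DARTS, NAS techniques continue to evolve. This article used MindSpore implementations to walk through the core principles and engineering practice of NAS.
In real applications NAS rarely stands alone: it is usually combined with other parts of an AutoML pipeline, such as hyperparameter optimization and data augmentation search. As hardware improves and algorithms advance, NAS should become more efficient and more widely used, putting automated architecture design within reach of more developers. We hope this article serves as a useful reference for exploring NAS on MindSpore.
References: the official MindSpore documentation and the DARTS papers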