如何用MindSpore实现实现自定义单步训练
EMA是什么
深度学习模型使用EMA更新介绍
我们都知道,深度学习模型中权重的更新基本都是使用基于梯度的优化方法来更新的,但是也有一部分参数是使用特征的统计量来更新的,就比如BatchNormalization层。
这里展示出PyTorch的源码,来自_BatchNorm基类。可以看到_BatchNorm层的Bn层也会基于一定的单层Batch特征进行一个特征统计,用来对self.running_mean 和self.running_var 进行一个统计程度的更新。可以参考博客Pytorch的BatchNorm层使用中容易出现的问题,作者给出了详细的解释
if self.track_running_stats:
self.running_mean = UninitializedBuffer(**factory_kwargs)
self.running_var = UninitializedBuffer(**factory_kwargs)
self.num_batches_tracked = torch.tensor(
0, dtype=torch.long, **{k: v for k, v in factory_kwargs.items() if k != 'dtype'})
...
if self.training and self.track_running_stats:
# TODO: if statement only here to tell the jit to skip emitting this when it is None
if self.num_batches_tracked is not None: # type: ignore[has-type]
self.num_batches_tracked = self.num_batches_tracked + 1 # type: ignore[has-type]
if self.momentum is None: # use cumulative moving average
exponential_average_factor = 1.0 / float(self.num_batches_tracked)
else: # use exponential moving average
exponential_average_factor = self.momentum
基于PyTorch的EMA权重更新方法
深度学习PyTorch Image Models (timm)是一个图像模型(models)、层(layers)、实用程序(utilities)、优化器(optimizers)、调度器(schedulers)、数据加载/增强(data-loaders / augmentations)和参考训练/验证脚本(reference training / validation scripts)的集合,目的是将各种SOTA模型组合在一起,从而能够重现ImageNet的训练结果。
作者:Ross Wightman,来自加拿大温哥华。用timm必定要致敬大佬!
这里贴上大佬的github链接
timm库实现了最新的几乎所有的具有影响力的视觉模型、模型的权重、一个很棒的分布式训练和评估的代码框架,它几乎已经是目前从事深度学习模型设计工作者必备的一个工具包,并且这个仓库有一个非常可靠的团队保持稳定的更新。
def __init__(self, model, decay=0.9999, device=None):
super(ModelEmaV2, self).__init__()
# make a copy of the model for accumulating moving average of weights
self.module = deepcopy(model)
self.module.eval()
self.decay = decay
self.device = device # perform ema on different device from model if set
if self.device is not None:
self.module.to(device=device)
def _update(self, model, update_fn):
with torch.no_grad():
for ema_v, model_v in zip(self.module.state_dict().values(), model.state_dict().values()):
if self.device is not None:
model_v = model_v.to(device=self.device)
ema_v.copy_(update_fn(ema_v, model_v))
关于深度学习模型EMA的更新,可以简单联想BatchNormalization中的mean和var的更新方式。简而言之就是对于模型M,创建一个模型副本
并且给定一个动量系数,在timm中默认为0.9999
在训练的过程中
由于PyTorch的动态图属性,我们可以很简单的通过简单的python程序完成副本模型的创建和模型的更新。对于使用TensorFlow的同学,可以参考tf.train.ExponentialMovingAverage,这里就不过多叙述。
MindSpore介绍
MindSpore官网
MindSpore是端边云全场景按需协同的华为自研AI计算框架,提供全场景统一API,为全场景AI的模型开发、模型运行、模型部署提供端到端能力。MindSpore采用端-边-云按需协作分布式架构、微分原生编程新范式以及AI Native新执行模式,实现更好的资源效率、安全可信,同时降低行业AI开发门槛、释放昇腾芯片算力,助力普惠AI。
如何用MindSpore实现自定义单步训练
尽管MindSpore为我们提供了许多高阶的API接口,可以让我们快速定义模型完成训练,但是由于我们经常会有一切特殊的训练需求,这个时候MindSpore的model.train并不能给我们很多提供需求,因此我们需要自定义模型的训练的过程。
- 当模型容易梯度爆炸时,需要附加clip_grad_norm
- 当模型的梯度不够大时,需要使用gradient_accumulation
- 为模型创建ema副本
官网给我们提供了一部分自定义的范式,梯度裁剪、梯度累积,但是官网的梯度裁剪方案并不能应用到global_norm,如果对单层进行裁剪,我们很难估计要给定一个大概什么样子的数值,同时不同的层的梯度缩放也会产生不均匀的情况,比较考验使用者的经验,而梯度累积的写法太过于复杂了,非常不利于使用,模型的ema更新是目前MindSpore的官网缺失,这里非常感谢大佬给了我们一个可以参考的模板,因此这篇博客中,我将提供一个可以简单使用的模板,希望可以作为大家祖传的TrainWrapper使用。
本篇博客会从$$自定义训练\rightarrow附加梯度裁剪\rightarrow附加梯度累积\rightarrow附加EMA模型更新$$三个步骤对这次的代码历程进行介绍,博客会有些长,如果大家对整体已经十分了解,可以从我的github中获取代码,代码链接
自定义训练
在定义自定义训练的时候,我们主要需要考虑2件事情
- 针对混合精度,我们进可能在优化器外部完成梯度的缩放处理,避免在优化器内部不够清楚
- 针对并行计算,我们需要调用接口聚合不同进程的loss,完成对不同进程权重的同步更新
很感动我们在MindSpore官网的model仓库找到了TrainWrapper的主要模板,因此我们我们就主要基于这个模板进行开发,省却了很多找分布式训练功能的代码,我会在第一步详细介绍代码。
grad_scale = C.MultitypeFuncGraph("grad_scale")
""""
在MindSpore中,如果要对列表型的张量做一个统一的数学操作,我们需要先用C.MultitypeFuncGraph
创建一个图,然后在图里面完成操作的定义。一个图里面可以定义针对不同的入参数量定义若干个op,调用
的时候会自动选择的。
"""
@grad_scale.register("Tensor", "Tensor")
def tensor_grad_scale(scale, grad):
return grad * P.Reciprocal()(scale)
class TrainingWrapper(nn.Cell):
def __init__(self, network, optimizer, sens=1.0):
super(TrainingWrapper, self).__init__(auto_prefix=False)
self.network = network
self.network.set_grad()
"""
虽然使用两种方法都可以将对应的trainable_parameters传入Cell,但是强烈建议使用
optimizer.parameters,有一部分参数需要weight_decay另一部分不需要导致传入的参数
是字典形状的时候,第二种network.trainable_parameters()方法可能会导致梯度和对应
的权重不对齐,优化器报错
"""
self.weights = optimizer.parameters
# self.weights = network.trainable_parameters()
self.optimizer = optimizer
self.grad = C.GradOperation(get_by_list=True, sens_param=True)
self.sens = float(sens)
self.reducer_flag = False
self.grad_reducer = None
"""
这里是对模型分布式训练的时候起作用的代码,主要是为了聚合不同进程的损失进行梯度的求导
"""
self.parallel_mode = context.get_auto_parallel_context("parallel_mode")
if self.parallel_mode in [ParallelMode.DATA_PARALLEL, ParallelMode.HYBRID_PARALLEL]:
self.reducer_flag = True
if self.reducer_flag:
mean = context.get_auto_parallel_context("gradients_mean")
if auto_parallel_context().get_device_num_is_set():
degree = context.get_auto_parallel_context("device_num")
else:
degree = get_group_size()
self.grad_reducer = nn.DistributedGradReducer(optimizer.parameters, mean, degree)
self.hyper_map = C.HyperMap()
def construct(self, *args):
"""opt"""
weights = self.weights
loss = self.network(*args)
# 这里相当于是在混合精度的时候对loss完成缩放,这里我们统一使用fixedlossscale, sens == self.sens
sens = P.Fill()(P.DType()(loss), P.Shape()(loss), self.sens)
# 关于使用loss对weight求导的方法建议直接模仿这个写法,估计是Fill操作loss和sens已经创建了联系
grads = self.grad(self.network, weights)(*args, sens)
if self.reducer_flag:
# apply grad reducer on grads
grads = self.grad_reducer(grads)
if self.sens > 1:
"""
重点学习使用C.MultitypeFuncGraph进行操作的范式,这是我们后期添加梯度聚合、EMA的基础
self.sens作为不可迭代的对象,将会和grad_scale(函数)放在partial一个函数里面
grad作为可迭代的对象会放在外面,最外面套一个C.HyperMap()可以完成对单一梯度的更新
"""
grads = self.hyper_map(F.partial(grad_scale, F.scalar_to_array(self.sens)), grads)
self.optimizer(grads)
return loss
梯度裁剪
在上文中已经介绍,我们这个教程主要介绍如何使用全局梯度裁剪,如果要用单一的梯度裁剪,见官网。
我们再次明确,需要聚合全局的梯度,我们需要添加两个参数
- 是否进行去全局梯度裁剪 use_global_norm
- 裁剪的范数值 clip_global_norm_value
限于篇幅,这里主要粘贴不同的代码
class TrainingWrapper(nn.Cell):
def __init__(self, network, optimizer, sens=1.0, use_global_norm=True, clip_global_norm_value=1.0):
super(TrainingWrapper, self).__init__(auto_prefix=False)
...
self.use_global_norm = use_global_norm
self.clip_global_norm_value = clip_global_norm_value
...
def construct(self, *args):
...
if self.sens > 1:
grads = self.hyper_map(F.partial(grad_scale, F.scalar_to_array(self.sens)), grads)
"""
在完成梯度缩放之后,对梯度进行适当的裁剪
说来比较奇怪,本来这一步我是想应用在继承优化器,然后在grad传入optimizer之前进行的,但是那种情况
仅仅无法在model.train的原生静态图条件下跑通,可能是我违反了一些图的规则;上述方法可以使用单一的
梯度裁剪
"""
if self.use_global_norm:
grads = C.clip_by_global_norm(grads, clip_norm=self.clip_global_norm_value)
self.optimizer(grads)
return loss
梯度聚合
在mindspore官网的代码模板中,我们可以得知,mindspore完成梯度聚合的核心步骤为
- 创建_grad_sum变量用来存储梯度的聚合值
- 使用_zeros用来对梯度和进行清零
# 这里不做过多介绍两个功能图的使用方法,模仿上面的梯度缩放代码即可
_sum_op = C.MultitypeFuncGraph("grad_sum_op")
_clear_op = C.MultitypeFuncGraph("clear_op")
@_clear_op.register("Tensor", "Tensor")
def _clear_grad_sum(grad_sum, zero):
"""Apply zero to clear grad_sum."""
success = True
success = F.depend(success, F.assign(grad_sum, zero))
return success
@grad_scale.register("Tensor", "Tensor")
def tensor_grad_scale(scale, grad):
return grad * P.Reciprocal()(scale)
class TrainingWrapper(nn.Cell):
def __init__(self, network, optimizer, sens=1.0, use_global_norm=True, clip_global_norm_value=1.0,
accumulation_step=1):
...
self.clip_global_norm_value = clip_global_norm_value / accumulation_step
self.accumulation_step = int(accumulation_step)
# 用来记录当前的训练步数,用来做梯度累积的辅助参数
self.cur_step_num = mindspore.Parameter(Tensor(0, mstype.int64), requires_grad=False)
# 存储梯度和的变量,使用clone方法进行复制,初始化全为0
self._grad_sum = optimizer.parameters.clone(prefix="grad_sum", init='zeros')
self._zeros = optimizer.parameters.clone(prefix="zeros", init='zeros')
def construct(self, *args):
self.cur_step_num = self.cur_step_num + 1
weights = self.weights
loss = self.network(*args)
if self.accumulation_step == 1:
...
else:
# 因为是做累积,因此每次的loss还是得除以一下的
loss = loss / self.accumulation_step
sens = P.Fill()(P.DType()(loss), P.Shape()(loss), self.sens)
grads = self.grad(self.network, weights)(*args, sens)
if self.reducer_flag:
# apply grad reducer on grads
grads = self.grad_reducer(grads)
if self.sens > 1:
grads = self.hyper_map(F.partial(grad_scale, F.scalar_to_array(self.sens)), grads)
if self.use_global_norm:
grads = C.clip_by_global_norm(grads, clip_norm=self.clip_global_norm_value)
# assign to self._grad_sum 梯度累积
self.hyper_map(F.partial(_sum_op), self._grad_sum, grads)
if self.cur_step_num % self.accumulation_step == 0:
# optimizer
self.optimizer(self._grad_sum)
# clear grads 累积梯度->更新模型->梯度清零
self.hyper_map(F.partial(_clear_op), self._grad_sum, self._zeros)
EMA权重模型更新
针对EMA模型更新,我们的思路很简单,就是在一开始为模型创建一个Cell副本,和大佬的提供的模板具体实现步骤有一些不同
_ema_op = C.MultitypeFuncGraph("grad_ema_op")
@_ema_op.register("Tensor", "Tensor", "Tensor")
def _ema_weights(factor, ema_weight, weight):
"""Apply grad sum to cumulative gradient."""
add = P.Assign()
# 应用了动量更新
return add(ema_weight, ema_weight * factor + weight * (1 - factor))
class EMACell(nn.Cell):
def __init__(self, weights, ema_decay=0.9999):
super(EMACell, self).__init__()
self.ema_weights = weights.clone(prefix="ema_weights")
self.ema_decay = Tensor(ema_decay, mstype.float32)
self.hyper_map = C.HyperMap()
self.print = P.Print()
def construct(self, weights):
# 实现的思路很简单,传入的参数是M(n+1)我们更新到M'(n+1)中
success = self.hyper_map(F.partial(_ema_op, self.ema_decay), self.ema_weights, weights)
return success
很简单可以看到,我们在初始化的时候为EMA权重模型创建了一个副本,用来对齐进行更新,同时,这份权重也是算在模型里面的,当我们训练的时候,也将作为ckpt保存下来,可以全部存下来统一测试,看看是EMA的权重比较好还是原来的模型比较好
class TrainingWrapper(nn.Cell):
def __init__(self, network, optimizer, sens=1.0, use_global_norm=True, clip_global_norm_value=1.0,
accumulation_step=1, **kwargs):
...
# ema weight update
self.enable_ema = kwargs.get("enable_ema", False)
if self.enable_ema:
ema_decay = kwargs.get("ema_decay", 0.9999)
self._ema_cell = EMACell(self.weights, ema_decay=ema_decay)
def construct(self, *args):
...
if self.accumulation_step == 1:
...
self.optimizer(grads)
if self.enable_ema:
self._ema_cell(self.weights)
else:
...
if self.cur_step_num % self.accumulation_step == 0:
# optimizer
self.optimizer(self._grad_sum)
# clear grads
self.hyper_map(F.partial(_clear_op), self._grad_sum, self._zeros)
if self.enable_ema:
self._ema_cell(self.weights)
...
模型训练
"""
这份代码是作为分类任务的代码模板的
MindSpore定义完成TrainWrapper的流程完成后,要定义训练
1. 定义网络
2. 定义模型是输出输入到lossfuction然后输出loss的cell,封装
3. 将net_with_loss用Model封装
4. 如果要使用model.eval功能,就需要对eval_network进行封装,这里官网是WithEvalCell return的是loss, outputs, label,给 metrics={"acc", "loss"}即可
"""
net_with_loss = TrainingWrapper(net_with_loss, optimizer, sens=args.loss_scale,
clip_global_norm_value=args.clip_global_norm_value,
use_global_norm=True,
accumulation_step=args.accumulation_step,
enable_ema=args.enable_ema,
ema_decay=args.ema_decay)
eval_network = nn.WithEvalCell(net, criterion, args.amp_level in ["O2", "O3", "auto"])
eval_indexes = [0, 1, 2]
model = Model(net_with_loss, metrics={"acc", "loss"},
eval_network=eval_network,
eval_indexes=eval_indexes)
开始训练
在完成上述操作后,我们就可以开始训练了,记得开启dataset_sink_mode数据下沉,这是MindSpore对于我们普通玩家来说最好的功能之一,开启数据下沉之后可以大大提升模型的数据传输效率,充分利用硬件性能。
model.train(args.epochs, data.train_dataset, callbacks=[time_cb, ckpoint_cb, loss_cb, eval_cb],
dataset_sink_mode=True)
引用
- 点赞
- 收藏
- 关注作者
评论(0)