An Overview of Model Compression and Acceleration Author:Jet Date:2023/07
综合现有的深度模型压缩方法,它们主要分为四类:
image-20230727150746346
一般来说,参数修剪和共享,低秩分解和知识蒸馏方法可以用于全连接层和卷积层的 CNN,但另一方面,使用转移/紧凑型卷积核的方法仅支持卷积层。低秩因子分解和基于转换/紧凑型卷积核的方法提供了一个端到端的流水线,可以很容易地在 CPU/GPU 环境中实现。相反参数修剪和共享使用不同的方法,如矢量量化,二进制编码和稀疏约束来执行任务,这导致常需要几个步骤才能达到目标。
基于参数修剪/共享、低秩分解的模型可以从预训练模型或者从头开始训练,因此灵活而有效。然而转移/紧凑的卷积核和知识蒸馏模型只能支持从零开始训练。
三类:模型量化和二进制化、参数共享和结构化矩阵(structural matrix)
image-20230727145531991
典型的 CNN 卷积核是一个 4D 张量,而全连接层也可以当成一个 2D 矩阵,低秩分解同样可行。这些张量中可能存在大量的冗余。所有近似过程都是逐层进行的,在一个层经过低秩滤波器近似之后,该层的参数就被固定了,而之前的层已经用一种重构误差标准(reconstruction error criterion)微调过。
在 Inception 结构中使用了将 3×3 卷积分解成两个 1×1 的卷积;SqueezeNet 提出用 1×1 卷积来代替 3×3 卷积,与 AlexNet 相比,SqueezeNet 创建了一个紧凑的神经网络,参数少了 50 倍,准确度相当。
“学生-教师”的范式,即通过软化“教师”的输出而惩罚“学生”
只能用于具有 Softmax 损失函数的分类任务
TensorFlow 1.x 默认采用的是静态图,当计算图构建完成之后,便无法继续修改(TensorFlow 2.x 已默认启用动态图/Eager 执行)。静态图对于逐阶段、分层的训练带来了一定的困难。相比之下,Pytorch 使用了动态图,在定义完模型之后还可以边训练边修改其参数,具有很高的灵活性,这也是各主流深度学习框架的发展方向
image-20230731114838134
image-20230731103229622
image-20230727171635419
重参数化优化器, 针对VGG这种结构进行优化
将先验信息用于修改梯度数值,称为梯度重参数化,对应的优化器称为RepOptimizer。我们着重关注VGG式的直筒模型,训练得到RepOptVGG模型,他有着高训练效率,简单直接的结构和极快的推理速度。
https://github.com/meituan/YOLOv6/blob/main/docs/tutorial_repopt.md
def quant_sensitivity_analyse(model_ptq, evaler):
    """Measure per-layer quantization sensitivity by the control-variable method.

    All quantizable layers are disabled first; then each one is enabled in
    isolation, the model is evaluated, and the layer is disabled again before
    moving on. The mAP drop relative to full precision indicates how sensitive
    that layer is to quantization.

    Args:
        model_ptq: PTQ-prepared model containing quant_nn modules.
        evaler: evaluator whose .eval(model) returns a sequence where
            index 0 is mAP@0.5 and index 1 is mAP@0.5:0.95
            (grounded by the indexing below — confirm against the evaluator).

    Returns:
        List of (layer_name, mAP@0.5, mAP@0.5:0.95) tuples, one per
        quantizable layer.
    """
    # disable all quantable layers so exactly one is active per iteration
    model_quant_disable(model_ptq)
    # analyse each quantable layer
    quant_sensitivity = list()
    for k, m in model_ptq.named_modules():
        if isinstance(m, quant_nn.QuantConv2d) or \
           isinstance(m, quant_nn.QuantConvTranspose2d) or \
           isinstance(m, quant_nn.MaxPool2d):
            module_quant_enable(model_ptq, k)
        else:
            # module can not be quantized, continue
            continue
        eval_result = evaler.eval(model_ptq)
        print(eval_result)
        print("Quantize Layer {}, result mAP0.5 = {:0.4f}, mAP0.5:0.95 = {:0.4f}".format(k,
                                                                                         eval_result[0],
                                                                                         eval_result[1]))
        quant_sensitivity.append((k, eval_result[0], eval_result[1]))
        # disable this module again, analyse next module
        module_quant_disable(model_ptq, k)
    return quant_sensitivity
EfficientRep:An Efficient Repvgg-style ConvNets with Hardware-aware Neural Network Design
# YOLOv6 model-config fragment: EfficientRep backbone, Rep-PAN neck and an
# efficient decoupled head. This is a keyword-argument fragment of a larger
# model dict defined outside this excerpt.
backbone=dict(
    type='EfficientRep',                      # RepVGG-style, hardware-aware backbone
    num_repeats=[1, 6, 12, 18, 6],            # block repeats per stage
    out_channels=[64, 128, 256, 512, 1024],   # stage output widths
),
neck=dict(
    type='RepPANNeck',
    num_repeats=[12, 12, 12, 12],
    out_channels=[256, 128, 128, 256, 256, 512],
),
head=dict(
    type='EffiDeHead',
    in_channels=[128, 256, 512],   # feature widths taken from the neck
    num_layers=3,                  # number of detection scales
    begin_indices=24,
    anchors=1,
    out_indices=[17, 20, 23],      # graph nodes feeding the head
    strides=[8, 16, 32],           # one stride per detection scale
    atss_warmup_epoch=0,
    iou_type='giou',
    use_dfl=False,                 # distribution-focal-loss disabled
    reg_max=0
)
用了RepVGGOptimizer SGD针对VGGBlock进行优化
from ..layers.common import RealVGGBlock, LinearAddBlock
def extract_blocks_into_list(model, blocks):
    """Recursively walk *model* and append every RealVGGBlock / LinearAddBlock
    child to *blocks* (in-place, depth-first, in child order)."""
    for child in model.children():
        if isinstance(child, (LinearAddBlock, RealVGGBlock)):
            blocks.append(child)
        else:
            # not a target block itself — descend into its children
            extract_blocks_into_list(child, blocks)
class RepVGGOptimizer(SGD):
    '''SGD with Gradient Re-parameterization (RepOpt) for VGG-style blocks.

    scales is a list, scales[i] is a triple (scale_identity.weight,
    scale_1x1.weight, scale_conv.weight) or a two-tuple
    (scale_1x1.weight, scale_conv.weight) (if the block has no scale_identity).
    '''
    def __init__(self, model, scales,
                 args, cfg, momentum=0, dampening=0,
                 weight_decay=0, nesterov=True,
                 reinit=True, use_identity_scales_for_reinit=True,
                 cpu_mode=False):
        # lr/momentum come from the training config, not from the arguments above.
        defaults = dict(lr=cfg.solver.lr0, momentum=cfg.solver.momentum, dampening=dampening, weight_decay=weight_decay, nesterov=nesterov)
        if nesterov and (cfg.solver.momentum <= 0 or dampening != 0):
            raise ValueError("Nesterov momentum requires a momentum and zero dampening")
        # parameters = set_weight_decay(model)
        parameters = get_optimizer_param(args, cfg, model)
        # Deliberately bypasses SGD.__init__ and calls Optimizer.__init__
        # directly; this class does its own setup below.
        super(SGD, self).__init__(parameters, defaults)
        self.num_layers = len(scales)
        # Collect one target conv per RealVGGBlock/LinearAddBlock; scales must
        # line up one-to-one with those convs.
        blocks = []
        extract_blocks_into_list(model, blocks)
        convs = [b.conv for b in blocks]
        assert len(scales) == len(convs)
        if reinit:
            # Re-init is only valid when training from scratch; a BN gamma mean
            # different from 1.0 suggests a pretrained checkpoint was loaded.
            for m in model.modules():
                if isinstance(m, nn.BatchNorm2d):
                    gamma_init = m.weight.mean()
                    if gamma_init == 1.0:
                        LOGGER.info('Checked. This is training from scratch.')
                    else:
                        LOGGER.warning('========================== Warning! Is this really training from scratch ? =================')
            LOGGER.info('##################### Re-initialize #############')
            self.reinitialize(scales, convs, use_identity_scales_for_reinit)
        self.generate_gradient_masks(scales, convs, cpu_mode)

    def reinitialize(self, scales_by_idx, conv3x3_by_idx, use_identity_scales):
        """Fold the scaled 1x1 (and identity) branches of the imaginary CSLA
        block into each 3x3 kernel so training starts from the equivalent point."""
        for scales, conv3x3 in zip(scales_by_idx, conv3x3_by_idx):
            in_channels = conv3x3.in_channels
            out_channels = conv3x3.out_channels
            # Freshly initialized 1x1 kernel standing in for the 1x1 branch.
            kernel_1x1 = nn.Conv2d(in_channels, out_channels, 1, device=conv3x3.weight.device)
            if len(scales) == 2:
                # (scale_1x1, scale_conv): block without an identity branch.
                conv3x3.weight.data = conv3x3.weight * scales[1].view(-1, 1, 1, 1) \
                    + F.pad(kernel_1x1.weight, [1, 1, 1, 1]) * scales[0].view(-1, 1, 1, 1)
            else:
                assert len(scales) == 3
                assert in_channels == out_channels
                # Identity branch expressed as a 1x1 conv: channel-wise eye kernel.
                identity = torch.from_numpy(np.eye(out_channels, dtype=np.float32).reshape(out_channels, out_channels, 1, 1)).to(conv3x3.weight.device)
                conv3x3.weight.data = conv3x3.weight * scales[2].view(-1, 1, 1, 1) + F.pad(kernel_1x1.weight, [1, 1, 1, 1]) * scales[1].view(-1, 1, 1, 1)
                if use_identity_scales:  # You may initialize the imaginary CSLA block with the trained identity_scale values. Makes almost no difference.
                    identity_scale_weight = scales[0]
                    conv3x3.weight.data += F.pad(identity * identity_scale_weight.view(-1, 1, 1, 1), [1, 1, 1, 1])
                else:
                    conv3x3.weight.data += F.pad(identity, [1, 1, 1, 1])

    def generate_gradient_masks(self, scales_by_idx, conv3x3_by_idx, cpu_mode=False):
        """Precompute the per-parameter "Grad Mult" masks applied in step():
        squared branch scales over the 3x3 kernel, with extra terms on the
        center tap (1x1 branch) and on the diagonal center taps (identity)."""
        self.grad_mask_map = {}
        for scales, conv3x3 in zip(scales_by_idx, conv3x3_by_idx):
            para = conv3x3.weight
            if len(scales) == 2:
                # Whole kernel scaled by scale_conv**2; center tap adds scale_1x1**2.
                mask = torch.ones_like(para, device=scales[0].device) * (scales[1] ** 2).view(-1, 1, 1, 1)
                mask[:, :, 1:2, 1:2] += torch.ones(para.shape[0], para.shape[1], 1, 1, device=scales[0].device) * (scales[0] ** 2).view(-1, 1, 1, 1)
            else:
                mask = torch.ones_like(para, device=scales[0].device) * (scales[2] ** 2).view(-1, 1, 1, 1)
                mask[:, :, 1:2, 1:2] += torch.ones(para.shape[0], para.shape[1], 1, 1, device=scales[0].device) * (scales[1] ** 2).view(-1, 1, 1, 1)
                # Identity branch contributes a constant 1.0 on diagonal center taps.
                ids = np.arange(para.shape[1])
                assert para.shape[1] == para.shape[0]
                mask[ids, ids, 1:2, 1:2] += 1.0
            if cpu_mode:
                self.grad_mask_map[para] = mask
            else:
                self.grad_mask_map[para] = mask.cuda()

    def __setstate__(self, state):
        # Mirrors torch.optim.SGD.__setstate__ while again bypassing SGD itself.
        super(SGD, self).__setstate__(state)
        for group in self.param_groups:
            group.setdefault('nesterov', False)

    def step(self, closure=None):
        """Perform one optimization step: classic SGD, except that the gradient
        of every masked parameter is first multiplied element-wise by its
        Grad Mult mask.

        Args:
            closure: optional callable re-evaluating the model and returning
                the loss.

        Returns:
            The loss from *closure*, or None.
        """
        loss = None
        if closure is not None:
            loss = closure()
        for group in self.param_groups:
            weight_decay = group['weight_decay']
            momentum = group['momentum']
            dampening = group['dampening']
            nesterov = group['nesterov']
            for p in group['params']:
                if p.grad is None:
                    continue
                if p in self.grad_mask_map:
                    d_p = p.grad.data * self.grad_mask_map[p]  # Note: multiply the mask here
                else:
                    d_p = p.grad.data
                # NOTE(review): the two-positional-argument Tensor.add_(scalar,
                # tensor) overload used below is deprecated and removed in
                # recent PyTorch (newer versions require add_(tensor,
                # alpha=scalar)) — confirm the pinned torch version supports it.
                if weight_decay != 0:
                    d_p.add_(weight_decay, p.data)
                if momentum != 0:
                    param_state = self.state[p]
                    if 'momentum_buffer' not in param_state:
                        buf = param_state['momentum_buffer'] = torch.clone(d_p).detach()
                    else:
                        buf = param_state['momentum_buffer']
                        buf.mul_(momentum).add_(1 - dampening, d_p)
                    if nesterov:
                        d_p = d_p.add(momentum, buf)
                    else:
                        d_p = buf
                p.data.add_(-group['lr'], d_p)
        return loss
step
function, RepOptimizers will use the Grad Mults properly. For SGD, please see here. For AdamW, please see here and here.先做参数敏感性分析,再对不敏感的参数进行量化 所谓参数敏感性,就是判断参数变化对性能是否影响,如果影响很大,则敏感,否则不敏感, 采用控制变量法进行逐层分析 评价指标是在数据集上测试mAP,mAP相对全精度变化大则认为敏感 最后做敏感度排序
def quant_sensitivity_analyse(model_ptq, evaler):
    """Rank layers by quantization sensitivity.

    Disables every quantizable layer, then re-enables them one at a time,
    evaluating the model after each switch so that the measured mAP reflects
    quantizing that single layer in isolation.

    Returns a list of (layer_name, mAP@0.5, mAP@0.5:0.95) tuples.
    """
    # Start from a fully float model: all quantizers off.
    model_quant_disable(model_ptq)
    quant_sensitivity = list()
    quantable_types = (quant_nn.QuantConv2d,
                       quant_nn.QuantConvTranspose2d,
                       quant_nn.MaxPool2d)
    for name, module in model_ptq.named_modules():
        # module can not be quantized, continue
        if not isinstance(module, quantable_types):
            continue
        module_quant_enable(model_ptq, name)
        eval_result = evaler.eval(model_ptq)
        print(eval_result)
        print("Quantize Layer {}, result mAP0.5 = {:0.4f}, mAP0.5:0.95 = {:0.4f}".format(name,
                                                                                         eval_result[0],
                                                                                         eval_result[1]))
        quant_sensitivity.append((name, eval_result[0], eval_result[1]))
        # Switch this layer back off before analysing the next one.
        module_quant_disable(model_ptq, name)
    return quant_sensitivity
def module_quant_disable(model, k):
    """Disable the input/weight quantizers of the module named *k*, if present."""
    target = get_module(model, k)
    for quantizer_attr in ('_input_quantizer', '_weight_quantizer'):
        if hasattr(target, quantizer_attr):
            getattr(target, quantizer_attr).disable()
def module_quant_enable(model, k):
    """Enable the input/weight quantizers of the module named *k*, if present."""
    target = get_module(model, k)
    for quantizer_attr in ('_input_quantizer', '_weight_quantizer'):
        if hasattr(target, quantizer_attr):
            getattr(target, quantizer_attr).enable()
With partial quantization, we finally reach 42.1%, only 0.3% loss in accuracy, while the throughput of the partially quantized model is about 1.56 times that of the FP16 model at a batch size of 32. This method achieves a nice tradeoff between accuracy and throughput.
TensorQuantizer
进行控制image-20230728141622388
# Stem convolution of a ResNet-style network (7x7, stride 2, no bias):
# pick the quantized or the plain float implementation depending on the
# `quantize` flag. Fragment of an __init__ defined outside this excerpt.
if quantize:
    self.conv1 = quant_nn.QuantConv2d(3,
                                      self.inplanes,
                                      kernel_size=7,
                                      stride=2,
                                      padding=3,
                                      bias=False)
else:
    self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
https://github.com/Jermmy/pytorch-quantization-demo/blob/master/model.py#L34
QAT需要对需要量化的层进行替换,比如QConv2d QReLU QMaxPooling2d QLinear,相当于用Q重构模型进行训练和推理,其中Qxx参考 https://github.com/Jermmy/pytorch-quantization-demo/blob/master/module.py (最好用官方的进行替换QuantConv2d,或者源码替换QConv2d)
class Net(nn.Module):
    """Small CNN used to demonstrate manual quantization-aware training.

    Float graph: conv1 -> relu -> maxpool -> conv2 -> relu -> maxpool -> fc.
    quantize() builds quantized wrappers (QConv2d/QReLU/QMaxPooling2d/QLinear)
    around the float layers; quantize_forward() trains them,
    freeze() fixes the scales, and quantize_inference() runs pure int inference.
    """

    def __init__(self, num_channels=1):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(num_channels, 40, 3, 1)
        self.conv2 = nn.Conv2d(40, 40, 3, 1, groups=20)
        self.fc = nn.Linear(5*5*40, 10)

    def forward(self, x):
        feat = F.max_pool2d(F.relu(self.conv1(x)), 2, 2)
        feat = F.max_pool2d(F.relu(self.conv2(feat)), 2, 2)
        return self.fc(feat.view(-1, 5*5*40))

    def quantize(self, num_bits=8):
        # Only the first layer owns an input quantizer (qi=True); every later
        # layer reuses the previous layer's output scale at freeze() time.
        self.qconv1 = QConv2d(self.conv1, qi=True, qo=True, num_bits=num_bits)
        self.qrelu1 = QReLU()
        self.qmaxpool2d_1 = QMaxPooling2d(kernel_size=2, stride=2, padding=0)
        self.qconv2 = QConv2d(self.conv2, qi=False, qo=True, num_bits=num_bits)
        self.qrelu2 = QReLU()
        self.qmaxpool2d_2 = QMaxPooling2d(kernel_size=2, stride=2, padding=0)
        self.qfc = QLinear(self.fc, qi=False, qo=True, num_bits=num_bits)

    def quantize_forward(self, x):
        # Same graph as forward(), routed through the quantized wrappers.
        out = x
        for stage in (self.qconv1, self.qrelu1, self.qmaxpool2d_1,
                      self.qconv2, self.qrelu2, self.qmaxpool2d_2):
            out = stage(out)
        return self.qfc(out.view(-1, 5*5*40))

    def freeze(self):
        # Propagate each layer's output quantizer as the next layer's input.
        self.qconv1.freeze()
        self.qrelu1.freeze(self.qconv1.qo)
        self.qmaxpool2d_1.freeze(self.qconv1.qo)
        self.qconv2.freeze(qi=self.qconv1.qo)
        self.qrelu2.freeze(self.qconv2.qo)
        self.qmaxpool2d_2.freeze(self.qconv2.qo)
        self.qfc.freeze(qi=self.qconv2.qo)

    def quantize_inference(self, x):
        # Quantize the input once, run the whole graph in integers, then
        # dequantize the final logits.
        qx = self.qconv1.qi.quantize_tensor(x)
        for stage in (self.qconv1, self.qrelu1, self.qmaxpool2d_1,
                      self.qconv2, self.qrelu2, self.qmaxpool2d_2):
            qx = stage.quantize_inference(qx)
        qx = self.qfc.quantize_inference(qx.view(-1, 5*5*40))
        return self.qfc.qo.dequantize_tensor(qx)
QuantDescriptor 定义量化方法直方图,量化位数8bit,作为量化输入描述子 conv2d_weight_default_desc 作为权重量化描述子 针对Conv2d ConvTranspose2d MaxPool2d分别用相应的量化算子替代即可 PTQ只需迭代1-2epoch即可,而且是推理阶段(前向传播 with torch.no_grad()) 考虑敏感性分析,非敏感层可以用PTQ,敏感层不变,可以减少精度损失
def quant_model_init(model, device):
    """Return a deep copy of *model* prepared for PTQ: every Conv2d,
    ConvTranspose2d and MaxPool2d (except 'proj_conv' layers) is replaced by
    its pytorch-quantization counterpart, with weights/biases copied over.

    Args:
        model: float model to prepare (left untouched; a deepcopy is modified).
        device: target device for the prepared model.

    Returns:
        The quantization-ready copy, in eval mode, moved to *device*.
    """
    model_ptq = copy.deepcopy(model)
    model_ptq.eval()
    model_ptq.to(device)
    # 8-bit per-channel weight quantization; histogram-calibrated activations.
    conv2d_weight_default_desc = tensor_quant.QUANT_DESC_8BIT_CONV2D_WEIGHT_PER_CHANNEL
    conv2d_input_default_desc = QuantDescriptor(num_bits=8, calib_method='histogram')
    convtrans2d_weight_default_desc = tensor_quant.QUANT_DESC_8BIT_CONVTRANSPOSE2D_WEIGHT_PER_CHANNEL
    convtrans2d_input_default_desc = QuantDescriptor(num_bits=8, calib_method='histogram')
    for k, m in model_ptq.named_modules():
        if 'proj_conv' in k:
            # Keep projection convs in float — presumably accuracy-sensitive;
            # confirm against the detection head design.
            print("Skip Layer {}".format(k))
            continue
        if isinstance(m, nn.Conv2d):
            in_channels = m.in_channels
            out_channels = m.out_channels
            kernel_size = m.kernel_size
            stride = m.stride
            padding = m.padding
            quant_conv = quant_nn.QuantConv2d(in_channels,
                                              out_channels,
                                              kernel_size,
                                              stride,
                                              padding,
                                              quant_desc_input = conv2d_input_default_desc,
                                              quant_desc_weight = conv2d_weight_default_desc)
            # Carry the trained parameters over to the quantized replacement.
            quant_conv.weight.data.copy_(m.weight.detach())
            if m.bias is not None:
                quant_conv.bias.data.copy_(m.bias.detach())
            else:
                quant_conv.bias = None
            set_module(model_ptq, k, quant_conv)
        elif isinstance(m, nn.ConvTranspose2d):
            in_channels = m.in_channels
            out_channels = m.out_channels
            kernel_size = m.kernel_size
            stride = m.stride
            padding = m.padding
            quant_convtrans = quant_nn.QuantConvTranspose2d(in_channels,
                                                            out_channels,
                                                            kernel_size,
                                                            stride,
                                                            padding,
                                                            quant_desc_input = convtrans2d_input_default_desc,
                                                            quant_desc_weight = convtrans2d_weight_default_desc)
            quant_convtrans.weight.data.copy_(m.weight.detach())
            if m.bias is not None:
                quant_convtrans.bias.data.copy_(m.bias.detach())
            else:
                quant_convtrans.bias = None
            set_module(model_ptq, k, quant_convtrans)
        elif isinstance(m, nn.MaxPool2d):
            kernel_size = m.kernel_size
            stride = m.stride
            padding = m.padding
            dilation = m.dilation
            ceil_mode = m.ceil_mode
            # MaxPool has no weights; only its input gets a quantizer.
            quant_maxpool2d = quant_nn.QuantMaxPool2d(kernel_size,
                                                      stride,
                                                      padding,
                                                      dilation,
                                                      ceil_mode,
                                                      quant_desc_input = conv2d_input_default_desc)
            set_module(model_ptq, k, quant_maxpool2d)
        else:
            # module can not be quantized, continue
            continue
    return model_ptq.to(device)
def collect_stats(model, data_loader, num_batches):
    """Feed data to the network and collect activation statistics for calibration.

    Switches every TensorQuantizer into calibration mode (statistics only, no
    fake quantization), runs `num_batches` forward passes, then switches the
    quantizers back into quantization mode.

    Args:
        model: model whose quant_nn.TensorQuantizer submodules get calibrated.
        data_loader: iterable yielding (image, label) batches.
        num_batches: number of batches to feed through the model.
    """
    # Enable calibrators: collect statistics instead of quantizing.
    for name, module in model.named_modules():
        if isinstance(module, quant_nn.TensorQuantizer):
            if module._calibrator is not None:
                module.disable_quant()
                module.enable_calib()
            else:
                module.disable()
    for i, (image, _) in tqdm(enumerate(data_loader), total=num_batches):
        # BUGFIX: stop BEFORE the forward pass. The original checked `i >=
        # num_batches` after calling the model, processing num_batches + 1
        # batches despite tqdm's total=num_batches.
        if i >= num_batches:
            break
        model(image.cuda())
    # Disable calibrators: restore normal (fake-)quantized behavior.
    for name, module in model.named_modules():
        if isinstance(module, quant_nn.TensorQuantizer):
            if module._calibrator is not None:
                module.enable_quant()
                module.disable_calib()
            else:
                module.enable()
def compute_amax(model, **kwargs):
    """Load the collected calibration results (amax) into every TensorQuantizer.

    MaxCalibrator instances take no arguments; any other calibrator (e.g.
    histogram-based) receives **kwargs such as method/percentile.
    """
    for name, module in model.named_modules():
        if not isinstance(module, quant_nn.TensorQuantizer):
            continue
        calibrator = module._calibrator
        if calibrator is not None:
            if isinstance(calibrator, calib.MaxCalibrator):
                module.load_calib_amax()
            else:
                module.load_calib_amax(**kwargs)
        print(F"{name:40}: {module}")
# Calibration driver: collect statistics on a couple of batches, then load
# the percentile amax values. Calibration is pure inference, so gradients
# are disabled.
model.cuda()
# It is a bit slow since we collect histograms on CPU
with torch.no_grad():
    collect_stats(model, data_loader, num_batches=2)
    compute_amax(model, method="percentile", percentile=99.99)
1 As of v0.2.0 release, traditional post-training quantization (PTQ) produces a degraded performance of
YOLOv6-S
from 43.4% to 41.2%. 直接使用PTQ,精度降低了2.2% 2 We apply post-training quantization toYOLOv6-S-RepOpt
, and its mAP slightly drops by 0.5%. 使用了RepOPT、敏感性分析、PTQ之后,精度只降低了0.5% 3 Besides, we involve channel-wise distillation to accelerate the convergence. We finally reach a quantized model at 43.0% mAP. The performance arrives at 43.3% mAP, only 0.1% left to match the fully float precision ofYOLOv6-S
. 再使用QAT中通道蒸馏加快收敛,精度提高了0.4%,最后只损失了0.1%的精度
1 Quantization Aware Training is based on Straight Through Estimator (STE) derivative approximation. 量化感知训练基于直通式估算器(STE)的导数近似 2 After calibration is done, Quantization Aware Training is simply select a training schedule and continue training the calibrated model. Usually, it doesn’t need to fine tune very long. We usually use around 10% of the original training schedule, starting at 1% of the initial training learning rate, and a cosine annealing learning rate schedule that follows the decreasing half of a cosine period, down to 1% of the initial fine tuning learning rate (0.01% of the initial training learning rate). 先做 int8 标定,再做 QAT,二者需要互斥;训练时长约为原始训练的 10% epochs,学习率要小,从初始训练学习率的 1% 开始,用 cosine annealing 衰减,直到衰减至初始训练学习率的 0.01% 3 Do not change quantization representation (scale) during training, at least not too frequently. Changing scale every step 训练过程中不要改变量化表示(scale),至少不要太频繁,尤其不要每个 step 都改变 scale
image-20230731114708858
# Distillation branch of the training step: run the teacher without gradients
# and combine its predictions / feature maps with the student's in the
# distillation loss. Fragment of a trainer method defined outside this excerpt.
if self.args.distill:
    with torch.no_grad():
        t_preds, t_featmaps = self.teacher_model(images)
    temperature = self.args.temperature
    total_loss, loss_items = self.compute_loss_distill(preds, t_preds, s_featmaps, t_featmaps, targets, \
                                                       epoch_num, self.max_epoch, temperature, step_num,
                                                       batch_height, batch_width)
def distill_loss_cw(self, s_feats, t_feats, temperature=1):
    """Channel-wise distillation loss over the first three feature-map scales.

    For each scale, every channel's H*W activations are treated as a
    distribution (softmax over spatial positions, softened by `temperature`);
    the KL divergence between student and teacher distributions is summed and
    normalized by N*C, then rescaled by temperature**2. Teacher features are
    detached so no gradient flows into the teacher.

    Args:
        s_feats: student feature maps; entries 0..2 are used, each (N, C, H, W).
        t_feats: teacher feature maps matching s_feats shapes.
        temperature: softmax temperature (default 1).

    Returns:
        Scalar tensor: accumulated channel-wise KL distillation loss.
    """
    loss_cw = 0.0
    # The original implementation repeated this body verbatim for indices
    # 0, 1 and 2; iterate over exactly those three scales to preserve behavior.
    for idx in range(3):
        N, C, H, W = s_feats[idx].shape
        s_logp = F.log_softmax(s_feats[idx].view(N, C, H * W) / temperature, dim=2)
        t_logp = F.log_softmax(t_feats[idx].view(N, C, H * W).detach() / temperature, dim=2)
        loss_cw += F.kl_div(s_logp, t_logp,
                            reduction='sum',
                            log_target=True) * (temperature * temperature) / (N * C)
    return loss_cw
# First set static member of TensorQuantizer to use Pytorch's own fake
# quantization functions (class-level switch affecting all instances;
# typically required before ONNX export — see pytorch-quantization docs).
from pytorch_quantization import nn as quant_nn
quant_nn.TensorQuantizer.use_fb_fake_quant = True
最好用docker-TensorRT-8.5版本
image-20230728144808883
image-20230728150155064
image-20230731102922901
本文分享自 iResearch666 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!