MindSpore社区活动:基于MindSpore框架实现one-stage目标检测模型SSD-个人体验
""""Args:""""""""""# 定位损失# 类别损失。

1 模型简介
SSD,全称Single Shot MultiBox Detector,是作者Wei Liu在ECCV 2016上发表的论文。对于输入尺寸300x300的网络,使用Nvidia Titan X在VOC 2007测试集上达到74.3%mAP以及59FPS,对于512x512的网络,达到了76.9%mAP。超越当时最强的Faster RCNN(73.2%mAP)。
SSD是一种目标检测算法,目标检测主流算法分成可以两个类型:
(1). two-stage方法
(2). one-stage方法
SSD是单阶段的目标检测算法,通过卷积神经网络进行特征提取,取不同的特征层进行检测输出,所以SSD是一种多尺度的检测方法。在需要检测的特征层,直接使用一个3x3卷积,进行通道的变换。SSD采用了anchor的策略,预设不同长宽比例的anchor,每一个输出特征层基于anchor预测多个检测框(4或者6)。采用了多尺度检测方法,浅层用于检测小目标,深层用于检测大目标。
1.1 模型结构
SSD采用VGG16作为基础模型,然后在VGG16的基础上新增了卷积层来获得更多的特征图以用于检测。SSD的网络结构如图所示。上面是SSD模型,下面是Yolo模型,可以明显看到SSD利用了多尺度的特征图做检测。
SSD对比了YOLO系列目标检测方法,不同的是SSD通过卷积得到最后的边界框,而YOLO对最后的输出采用全连接的形式得到一维向量,对向量进行拆解得到最终的检测框。
1.2 模型特点
a)多尺度检测
在SSD的网络结构图中我们可以看到,SSD使用了多个特征层,一共6种不同的特征图尺寸。大尺度特征图(较靠前的特征图)可以用来检测小物体,而小尺度特征图(较靠后的特征图)用来检测大物体。多尺度检测的方式,可以使得检测更加充分(SSD属于密集检测),更能检测出小目标。
b)采用卷积进行检测
与Yolo最后采用全连接层不同,SSD直接采用卷积对不同的特征图来进行提取检测结果。
c)预设anchor
在yolov1中,直接由网络预测目标的尺寸,这种方式使得预测框的长宽比和尺寸没有限制,难以训练。在SSD中,采用预设边界框,我们习惯称它为anchor(在SSD论文中叫default bounding boxes),预测框的尺寸在anchor的指导下进行微调。
2.案例实现
其中数据集组织如下

数据集处理

数据预处理

数据集创建

构建模型

代码:
import mindspore as ms
import mindspore.nn as nn
from src.vgg16 import vgg16
import mindspore.ops as ops
import ml_collections
from src.config import get_config
config = get_config()
def _make_divisible(v, divisor, min_value=None):
"""ensures that all layers have a channel number that is divisible by 8."""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
def _conv2d(in_channel, out_channel, kernel_size=3, stride=1, pad_mod='same'):
return nn.Conv2d(in_channel, out_channel, kernel_size=kernel_size, stride=stride,
padding=0, pad_mode=pad_mod, has_bias=True)
def _bn(channel):
return nn.BatchNorm2d(channel, eps=1e-3, momentum=0.97,
gamma_init=1, beta_init=0, moving_mean_init=0, moving_var_init=1)
def _last_conv2d(in_channel, out_channel, kernel_size=3, stride=1, pad_mod='same', pad=0):
in_channels = in_channel
out_channels = in_channel
depthwise_conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, pad_mode='same',
padding=pad, group=in_channels)
conv = _conv2d(in_channel, out_channel, kernel_size=1)
return nn.SequentialCell([depthwise_conv, _bn(in_channel), nn.ReLU6(), conv])
class FlattenConcat(nn.Cell):
def __init__(self, config):
super(FlattenConcat, self).__init__()
self.num_ssd_boxes = config.num_ssd_boxes
self.concat = ops.Concat(axis=1)
self.transpose = ops.Transpose()
def construct(self, inputs):
output = ()
batch_size = ops.shape(inputs[0])[0]
for x in inputs:
x = self.transpose(x, (0, 2, 3, 1))
output += (ops.reshape(x, (batch_size, -1)),)
res = self.concat(output)
return ops.reshape(res, (batch_size, self.num_ssd_boxes, -1))
class GridAnchorGenerator:
"""
Anchor Generator
"""
def __init__(self, image_shape, scale, scales_per_octave, aspect_ratios):
super(GridAnchorGenerator, self).__init__()
self.scale = scale
self.scales_per_octave = scales_per_octave
self.aspect_ratios = aspect_ratios
self.image_shape = image_shape
def generate(self, step):
scales = np.array([2**(float(scale) / self.scales_per_octave)
for scale in range(self.scales_per_octave)]).astype(np.float32)
aspects = np.array(list(self.aspect_ratios)).astype(np.float32)
scales_grid, aspect_ratios_grid = np.meshgrid(scales, aspects)
scales_grid = scales_grid.reshape([-1])
aspect_ratios_grid = aspect_ratios_grid.reshape([-1])
feature_size = [self.image_shape[0] / step, self.image_shape[1] / step]
grid_height, grid_width = feature_size
base_size = np.array([self.scale * step, self.scale * step]).astype(np.float32)
anchor_offset = step / 2.0
ratio_sqrt = np.sqrt(aspect_ratios_grid)
heights = scales_grid / ratio_sqrt * base_size[0]
widths = scales_grid * ratio_sqrt * base_size[1]
y_centers = np.arange(grid_height).astype(np.float32)
y_centers = y_centers * step + anchor_offset
x_centers = np.arange(grid_width).astype(np.float32)
x_centers = x_centers * step + anchor_offset
x_centers, y_centers = np.meshgrid(x_centers, y_centers)
x_centers_shape = x_centers.shape
y_centers_shape = y_centers.shape
widths_grid, x_centers_grid = np.meshgrid(widths, x_centers.reshape([-1]))
heights_grid, y_centers_grid = np.meshgrid(heights, y_centers.reshape([-1]))
x_centers_grid = x_centers_grid.reshape(*x_centers_shape, -1)
y_centers_grid = y_centers_grid.reshape(*y_centers_shape, -1)
widths_grid = widths_grid.reshape(-1, *x_centers_shape)
heights_grid = heights_grid.reshape(-1, *y_centers_shape)
bbox_centers = np.stack([y_centers_grid, x_centers_grid], axis=3)
bbox_sizes = np.stack([heights_grid, widths_grid], axis=3)
bbox_centers = bbox_centers.reshape([-1, 2])
bbox_sizes = bbox_sizes.reshape([-1, 2])
bbox_corners = np.concatenate([bbox_centers - 0.5 * bbox_sizes, bbox_centers + 0.5 * bbox_sizes], axis=1)
self.bbox_corners = bbox_corners / np.array([*self.image_shape, *self.image_shape]).astype(np.float32)
self.bbox_centers = np.concatenate([bbox_centers, bbox_sizes], axis=1)
self.bbox_centers = self.bbox_centers / np.array([*self.image_shape, *self.image_shape]).astype(np.float32)
print(self.bbox_centers.shape)
return self.bbox_centers, self.bbox_corners
def generate_multi_levels(self, steps):
bbox_centers_list = []
bbox_corners_list = []
for step in steps:
bbox_centers, bbox_corners = self.generate(step)
bbox_centers_list.append(bbox_centers)
bbox_corners_list.append(bbox_corners)
self.bbox_centers = np.concatenate(bbox_centers_list, axis=0)
self.bbox_corners = np.concatenate(bbox_corners_list, axis=0)
return self.bbox_centers, self.bbox_corners
class MultiBox(nn.Cell):
"""
Multibox conv layers. Each multibox layer contains class conf scores and localization predictions.
"""
def __init__(self, config):
super(MultiBox, self).__init__()
num_classes = 81
out_channels = [512, 1024, 512, 256, 256, 256]
num_default = config.num_default
loc_layers = []
cls_layers = []
for k, out_channel in enumerate(out_channels):
loc_layers += [_last_conv2d(out_channel, 4 * num_default[k],
kernel_size=3, stride=1, pad_mod='same', pad=0)]
cls_layers += [_last_conv2d(out_channel, num_classes * num_default[k],
kernel_size=3, stride=1, pad_mod='same', pad=0)]
self.multi_loc_layers = nn.layer.CellList(loc_layers)
self.multi_cls_layers = nn.layer.CellList(cls_layers)
self.flatten_concat = FlattenConcat(config)
def construct(self, inputs):
loc_outputs = ()
cls_outputs = ()
for i in range(len(self.multi_loc_layers)):
loc_outputs += (self.multi_loc_layers[i](inputs[i]),)
cls_outputs += (self.multi_cls_layers[i](inputs[i]),)
return self.flatten_concat(loc_outputs), self.flatten_concat(cls_outputs)
class SSD300VGG16(nn.Cell):
def __init__(self, config):
super(SSD300VGG16, self).__init__()
# VGG16 backbone: block1~5
self.backbone = vgg16()
# SSD blocks: block6~7
self.b6_1 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, padding=6, dilation=6, pad_mode='pad')
self.b6_2 = nn.Dropout(0.5)
self.b7_1 = nn.Conv2d(in_channels=1024, out_channels=1024, kernel_size=1)
self.b7_2 = nn.Dropout(0.5)
# Extra Feature Layers: block8~11
self.b8_1 = nn.Conv2d(in_channels=1024, out_channels=256, kernel_size=1, padding=1, pad_mode='pad')
self.b8_2 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, pad_mode='valid')
self.b9_1 = nn.Conv2d(in_channels=512, out_channels=128, kernel_size=1, padding=1, pad_mode='pad')
self.b9_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, pad_mode='valid')
self.b10_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)
self.b10_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')
self.b11_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)
self.b11_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')
# boxes
self.multi_box = MultiBox(config)
if not self.training:
self.activation = ops.Sigmoid()
def construct(self, x):
# VGG16 backbone: block1~5
block4, x = self.backbone(x)
# SSD blocks: block6~7
x = self.b6_1(x) # 1024
x = self.b6_2(x)
x = self.b7_1(x) # 1024
x = self.b7_2(x)
block7 = x
# Extra Feature Layers: block8~11
x = self.b8_1(x) # 256
x = self.b8_2(x) # 512
block8 = x
x = self.b9_1(x) # 128
x = self.b9_2(x) # 256
block9 = x
x = self.b10_1(x) # 128
x = self.b10_2(x) # 256
block10 = x
x = self.b11_1(x) # 128
x = self.b11_2(x) # 256
block11 = x
# boxes
multi_feature = (block4, block7, block8, block9, block10, block11)
pred_loc, pred_label = self.multi_box(multi_feature)
if not self.training:
pred_label = self.activation(pred_label)
pred_loc = ops.cast(pred_loc, ms.float32)
pred_label = ops.cast(pred_label, ms.float32)
return pred_loc, pred_label
def ssd_vgg16(**kwargs):
return SSD300VGG16(**kwargs)
定义损失函数

代码:
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore import Tensor
grad_scale = ops.MultitypeFuncGraph("grad_scale")
class SigmoidFocalClassificationLoss(nn.Cell):
""""
Sigmoid focal-loss for classification.
Args:
gamma (float): Hyper-parameter to balance the easy and hard examples. Default: 2.0
alpha (float): Hyper-parameter to balance the positive and negative example. Default: 0.25
"""
def __init__(self, gamma=2.0, alpha=0.25):
super(SigmoidFocalClassificationLoss, self).__init__()
self.sigmiod_cross_entropy = ops.SigmoidCrossEntropyWithLogits()
self.sigmoid = ops.Sigmoid()
self.pow = ops.Pow()
self.onehot = ops.OneHot()
self.on_value = Tensor(1.0, ms.float32)
self.off_value = Tensor(0.0, ms.float32)
self.gamma = gamma
self.alpha = alpha
def construct(self, logits, label):
label = self.onehot(label, ops.shape(logits)[-1], self.on_value, self.off_value)
sigmiod_cross_entropy = self.sigmiod_cross_entropy(logits, label)
sigmoid = self.sigmoid(logits)
label = ops.cast(label, ms.float32)
p_t = label * sigmoid + (1 - label) * (1 - sigmoid)
modulating_factor = self.pow(1 - p_t, self.gamma)
alpha_weight_factor = label * self.alpha + (1 - label) * (1 - self.alpha)
focal_loss = modulating_factor * alpha_weight_factor * sigmiod_cross_entropy
return focal_loss
class SSDWithLossCell(nn.Cell):
""""
Provide SSD training loss through network.
"""
def __init__(self, network, config):
super(SSDWithLossCell, self).__init__()
self.network = network
self.less = ops.Less()
self.tile = ops.Tile()
self.reduce_sum = ops.ReduceSum()
self.expand_dims = ops.ExpandDims()
self.class_loss = SigmoidFocalClassificationLoss(config.gamma, config.alpha)
self.loc_loss = nn.SmoothL1Loss()
def construct(self, x, gt_loc, gt_label, num_matched_boxes):
pred_loc, pred_label = self.network(x)
mask = ops.cast(self.less(0, gt_label), ms.float32)
num_matched_boxes = self.reduce_sum(ops.cast(num_matched_boxes, ms.float32))
# 定位损失
mask_loc = self.tile(self.expand_dims(mask, -1), (1, 1, 4))
smooth_l1 = self.loc_loss(pred_loc, gt_loc) * mask_loc
loss_loc = self.reduce_sum(self.reduce_sum(smooth_l1, -1), -1)
# 类别损失
loss_cls = self.class_loss(pred_label, gt_label)
loss_cls = self.reduce_sum(loss_cls, (1, 2))
return self.reduce_sum((loss_cls + loss_loc) / num_matched_boxes)
# net = SSDWithLossCell(ssd, config)
非极大值抑制设置

训练代码:
import os
import math
import itertools as it
from mindspore.train import Model
from src.config import get_config
import mindspore as ms
from mindspore.train.callback import CheckpointConfig, ModelCheckpoint, LossMonitor, TimeMonitor
from src.init_params import init_net_param
from src.lr_schedule import get_lr
from mindspore.common import set_seed
class TrainingWrapper(nn.Cell):
def __init__(self, network, optimizer, sens=1.0):
super(TrainingWrapper, self).__init__(auto_prefix=False)
self.network = network
self.network.set_grad()
self.weights = ms.ParameterTuple(network.trainable_params())
self.optimizer = optimizer
self.grad = ops.GradOperation(get_by_list=True, sens_param=True)
self.sens = sens
self.hyper_map = ops.HyperMap()
def construct(self, *args):
weights = self.weights
loss = self.network(*args)
sens = ops.Fill()(ops.DType()(loss), ops.Shape()(loss), self.sens)
grads = self.grad(self.network, weights)(*args, sens)
self.optimizer(grads)
return loss
def generate_multi_levels(self, steps):
bbox_centers_list = []
bbox_corners_list = []
for step in steps:
bbox_centers, bbox_corners = self.generate(step)
bbox_centers_list.append(bbox_centers)
bbox_corners_list.append(bbox_corners)
self.bbox_centers = np.concatenate(bbox_centers_list, axis=0)
self.bbox_corners = np.concatenate(bbox_corners_list, axis=0)
return self.bbox_centers, self.bbox_corners
class GeneratDefaultBoxes():
"""
Generate Default boxes for SSD, follows the order of (W, H, archor_sizes).
`self.default_boxes` has a shape of [archor_sizes, H, W, 4], the last dimension is [y, x, h, w].
`self.default_boxes_tlbr` has a shape as `self.default_boxes`, the last dimension is [y1, x1, y2, x2].
"""
def __init__(self):
# print(config)
fk = config.img_shape[0] / np.array(config.steps)
scale_rate = (config.max_scale - config.min_scale) / (len(config.num_default) - 1)
scales = [config.min_scale + scale_rate * i for i in range(len(config.num_default))] + [1.0]
self.default_boxes = []
for idex, feature_size in enumerate(config.feature_size):
sk1 = scales[idex]
sk2 = scales[idex + 1]
sk3 = math.sqrt(sk1 * sk2)
if idex == 0 and not config.aspect_ratios[idex]:
w, h = sk1 * math.sqrt(2), sk1 / math.sqrt(2)
all_sizes = [(0.1, 0.1), (w, h), (h, w)]
else:
all_sizes = [(sk1, sk1)]
for aspect_ratio in config.aspect_ratios[idex]:
w, h = sk1 * math.sqrt(aspect_ratio), sk1 / math.sqrt(aspect_ratio)
all_sizes.append((w, h))
all_sizes.append((h, w))
all_sizes.append((sk3, sk3))
assert len(all_sizes) == config.num_default[idex]
for i, j in it.product(range(feature_size), repeat=2):
for w, h in all_sizes:
cx, cy = (j + 0.5) / fk[idex], (i + 0.5) / fk[idex]
self.default_boxes.append([cy, cx, h, w])
def to_tlbr(cy, cx, h, w):
return cy - h / 2, cx - w / 2, cy + h / 2, cx + w / 2
# For IoU calculation
self.default_boxes_tlbr = np.array(tuple(to_tlbr(*i) for i in self.default_boxes), dtype='float32')
self.default_boxes = np.array(self.default_boxes, dtype='float32')
if hasattr(config, 'use_anchor_generator') and config.use_anchor_generator:
generator = GridAnchorGenerator(config.img_shape, 4, 2, [1.0, 2.0, 0.5])
default_boxes, default_boxes_tlbr = generator.generate_multi_levels(config.steps)
else:
default_boxes_tlbr = GeneratDefaultBoxes().default_boxes_tlbr
default_boxes = GeneratDefaultBoxes().default_boxes
y1, x1, y2, x2 = np.split(default_boxes_tlbr[:, :4], 4, axis=-1)
vol_anchors = (x2 - x1) * (y2 - y1)
matching_threshold = config.match_threshold
set_seed(1)
# 自定义参数获取
config = get_config()
ms.set_context(mode=ms.GRAPH_MODE, device_target= "CPU")
# 数据加载
mindrecord_dir = os.path.join(config.data_path, config.mindrecord_dir)
mindrecord_file = os.path.join(mindrecord_dir, "ssd.mindrecord"+ "0")
dataset = create_ssd_dataset(mindrecord_file, batch_size=config.batch_size,rank=0, use_multiprocessing=False)
dataset_size = dataset.get_dataset_size()
# checkpoint
ckpt_config = CheckpointConfig(save_checkpoint_steps=dataset_size * config.save_checkpoint_epochs)
ckpt_save_dir = config.output_path + '/ckpt_{}/'.format(0)
ckpoint_cb = ModelCheckpoint(prefix="ssd", directory=ckpt_save_dir, config=ckpt_config)
# 网络定义与初始化
ssd = ssd_vgg16(config=config)
init_net_param(ssd)
# print(ssd)
net = SSDWithLossCell(ssd, config)
# print(net)
lr = Tensor(get_lr(global_step=config.pre_trained_epoch_size * dataset_size,
lr_init=config.lr_init, lr_end=config.lr_end_rate * config.lr, lr_max=config.lr,
warmup_epochs=config.warmup_epochs,total_epochs=config.epoch_size,steps_per_epoch=dataset_size))
opt = nn.Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), lr,
config.momentum, config.weight_decay,float(config.loss_scale))
net = TrainingWrapper(net, opt, float(config.loss_scale))
callback = [TimeMonitor(data_size=dataset_size), LossMonitor(), ckpoint_cb]
model = Model(net)
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
model.train(config.epoch_size, dataset, callbacks=callback)
训练后进行评估
代码:
import os
import mindspore as ms
from mindspore import Tensor
from src.config import get_config
config = get_config()
#print(config)
def ssd_eval(dataset_path, ckpt_path, anno_json):
"""SSD evaluation."""
batch_size = 1
ds = create_ssd_dataset(dataset_path, batch_size=batch_size,
is_training=False, use_multiprocessing=False)
net = ssd_vgg16(config=config)
net = SsdInferWithDecoder(net, Tensor(default_boxes), config)
print("Load Checkpoint!")
param_dict = ms.load_checkpoint(ckpt_path)
net.init_parameters_data()
ms.load_param_into_net(net, param_dict)
net.set_train(False)
total = ds.get_dataset_size() * batch_size
print("\n========================================\n")
print("total images num: ", total)
print("Processing, please wait a moment.")
eval_param_dict = {"net": net, "dataset": ds, "anno_json": anno_json}
mAP = apply_eval(eval_param_dict)
print("\n========================================\n")
print(f"mAP: {mAP}")
def eval_net():
if hasattr(config, 'num_ssd_boxes') and config.num_ssd_boxes == -1:
num = 0
h, w = config.img_shape
for i in range(len(config.steps)):
num += (h // config.steps[i]) * (w // config.steps[i]) * config.num_default[i]
config.num_ssd_boxes = num
coco_root = config.coco_root
json_path = os.path.join(coco_root, config.instances_set.format(config.val_data_type))
ms.set_context(mode=ms.GRAPH_MODE, device_target=config.device_target)
mindrecord_dir = os.path.join(config.data_path, config.mindrecord_dir)
mindrecord_file = os.path.join(mindrecord_dir, "ssd_eval.mindrecord"+ "0")
#mindrecord_file = create_mindrecord(config.dataset, "ssd_eval.mindrecord", False)
print("Start Eval!")
ssd_eval(mindrecord_file, config.checkpoint_file_path, json_path)
eval_net()
最终评估效果(本地cpu环境下)

参考
鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。
更多推荐


所有评论(0)