# Based on: Fast R-CNN
# Written by Ross Girshick
# --------------------------------------------------------
"""Caffe2 blob helper functions."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import cPickle as pickle
import cv2
import numpy as np
from caffe2.proto import caffe2_pb2
from core.config import cfg
def im_list_to_blob(ims):
    """Pack a list of images into a single network input blob.

    The images are assumed to have been prepared by prep_im_for_blob or an
    equivalent routine, i.e. they are:
      - in BGR channel order
      - mean-subtracted
      - resized to the desired input size
      - float32 numpy ndarrays

    Every image is copied into the top-left corner of a zero-filled canvas
    that is large enough for the biggest image. When cfg.FPN.FPN_ON is set,
    the canvas height and width are rounded up to a multiple of
    cfg.FPN.COARSEST_STRIDE.

    Input: a list of images.
    Output: a 4D float32 array with axes (batch, channel, height, width),
    built by stacking the images along axis 0.
    """
    largest = np.array([im.shape for im in ims]).max(axis=0)
    canvas_h, canvas_w = largest[0], largest[1]
    if cfg.FPN.FPN_ON:
        # Zero-pad so that both spatial sizes are divisible by the stride.
        stride = float(cfg.FPN.COARSEST_STRIDE)
        canvas_h = int(np.ceil(canvas_h / stride) * stride)
        canvas_w = int(np.ceil(canvas_w / stride) * stride)

    blob = np.zeros((len(ims), canvas_h, canvas_w, 3), dtype=np.float32)
    for idx, im in enumerate(ims):
        rows, cols = im.shape[0], im.shape[1]
        blob[idx, 0:rows, 0:cols, :] = im

    # Move channels (axis 3) to axis 1:
    # (batch, height, width, channel) -> (batch, channel, height, width)
    return blob.transpose((0, 3, 1, 2))
def prep_im_for_blob(im, pixel_means, target_sizes, max_size):
    """Prepare an image for use as a network input at one or more scales.

    Processing steps:
      - convert the data to float32
      - subtract pixel_means
      - for every entry of target_sizes, rescale the image so that its
        shorter side equals the target size, unless that would make the
        longer side exceed max_size, in which case the longer side is scaled
        to max_size instead

    Every scale is computed from, and applied to, the same mean-subtracted
    image, so the outputs do not depend on the order of target_sizes.

    Note: the conversion uses astype(..., copy=False), so an input that is
    already float32 is mean-subtracted in place.

    Returns:
        ims: list of resized images, one per target size.
        im_scales: list of the scale factors that were applied (used to map
            results back to the original image).
    """
    im = im.astype(np.float32, copy=False)
    im -= pixel_means
    im_shape = im.shape
    im_size_min = np.min(im_shape[0:2])
    im_size_max = np.max(im_shape[0:2])

    ims = []
    im_scales = []
    for target_size in target_sizes:
        im_scale = float(target_size) / float(im_size_min)
        # Keep the longer side from exceeding max_size.
        if np.round(im_scale * im_size_max) > max_size:
            im_scale = float(max_size) / float(im_size_max)
        # Resize into a separate variable: rebinding `im` here would make
        # every later target size operate on an already-rescaled image even
        # though its scale factor is derived from the original size.
        im_resized = cv2.resize(
            im, None, None, fx=im_scale, fy=im_scale,
            interpolation=cv2.INTER_LINEAR
        )
        ims.append(im_resized)
        im_scales.append(im_scale)
    return ims, im_scales
def zeros(shape, int32=False):
    """Return an all-zero blob of the given shape.

    The dtype is int32 when int32 is true, otherwise float32.
    """
    dtype = np.float32
    if int32:
        dtype = np.int32
    return np.zeros(shape, dtype=dtype)
def ones(shape, int32=False):
    """Return an all-one blob of the given shape.

    The dtype is int32 when int32 is true, otherwise float32.
    """
    dtype = np.float32
    if int32:
        dtype = np.int32
    return np.ones(shape, dtype=dtype)
def py_op_copy_blob(blob_in, blob_out):
    """Copy the numpy ndarray blob_in into the Caffe2 CPUTensor blob_out.

    Intended for moving numpy data into a Caffe2 blob from within PythonOps.
    An int32 input makes blob_out be initialised as an INT32 tensor with the
    input's shape; any other dtype only reshapes blob_out. The values are
    then copied into blob_out.data.
    """
    # The earlier version probed `blob.data.dtype` inside a try/except, but
    # `blob` is not a name in this scope, so the probe always raised (and
    # swallowed) NameError and the decision always came down to the dtype
    # test below. The test is now stated directly; behaviour is unchanged.
    if blob_in.dtype == np.int32:
        # init() only accepts a list for the shape (it failed on a tuple).
        blob_out.init(list(blob_in.shape), caffe2_pb2.TensorProto.INT32)
    else:
        blob_out.reshape(blob_in.shape)
    blob_out.data[...] = blob_in
def get_loss_gradients(model, loss_blobs):
    """Create a constant gradient of 1 for every loss in loss_blobs.

    For each loss blob, a ConstantFill op with value 1.0 and output name
    '<loss>_grad' is added to model.net.

    Returns a dict mapping str(loss blob) to str(gradient blob).
    """
    grads_by_loss = {}
    for loss in loss_blobs:
        grad_name = loss + '_grad'
        grads_by_loss[str(loss)] = str(
            model.net.ConstantFill(loss, [grad_name], value=1.0)
        )
    return grads_by_loss
def serialize(obj):
    """Pickle a Python object and encode the bytes as a float32 array.

    Each byte of the pickle becomes one float32 element, so that the result
    can be fed into the Caffe2 workspace. Use deserialize() to reverse this.
    """
    payload = pickle.dumps(obj)
    # np.frombuffer replaces the deprecated binary mode of np.fromstring.
    # It yields a read-only view of the bytes; astype() then produces an
    # independent, writable float32 copy.
    return np.frombuffer(payload, dtype=np.uint8).astype(np.float32)
def deserialize(arr):
    """Rebuild a Python object from a float32 array fetched from the Caffe2 workspace.

    This is the inverse of serialize(): every element is converted back to
    one byte and the resulting byte string is unpickled.
    """
    raw_bytes = arr.astype(np.uint8).tobytes()
    # NOTE(review): unpickling runs whatever the payload encodes; this
    # presumably only ever sees arrays produced by serialize() - confirm.
    return pickle.loads(raw_bytes)
# Based on: Fast/er R-CNN
# Written by Ross Girshick
# --------------------------------------------------------
"""
Box 处理的相关函数.
Detectron 默认的 box 格式为:[x1, y1, x2, y2]
- (x1, y1): box 的左上角(top-left) 坐标
- (x2, y2): box 的右下角(bottom-right) 坐标
如果是其它格式 box ,如 [x, y, w, h],需要进行转换.
boxes.py 提供了一些转换函数(刚开始看可能觉得奇怪的函数):
- box 的 width 计算为: x2 - x1 + 1
- box 的 height 计算为: y2 - y1 + 1
这里 +1 可以追溯到早期目标检测,即坐标是整数像素值,而不是亚像素坐标(subpixel coordinate) 中的浮点数点坐标. x2=x1 和 y2=y1 时的 box 用于包括单个像素,width=1,因此需要 +1.
现在,大部分数据集提供的 boxes 格式一般都是浮点数坐标,因此,width 的计算为 x2 -x1 更合理.
实际上,只要模型训练和测试采用的变换函数是一致的,则结果都是 OK的(至少在 COCO 上已经被验证.)
由于在训练模型时,已经长期都是采用 +1 转换,因此,即使不喜欢使用这种方式,这里也不愿进行改变.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import numpy as np
from core.config import cfg
import utils.cython_bbox as cython_bbox
import utils.cython_nms as cython_nms
# Module-level alias for cython_bbox.bbox_overlaps. Per the note kept
# verbatim below:
#   - boxes: (N, 4) float ndarray
#   - query_boxes: (K, 4) float ndarray
#   - returns overlaps: (N, K) ndarray of overlaps between boxes and
#     query_boxes
bbox_overlaps = cython_bbox.bbox_overlaps
"""
bbox_overlaps:
输入:
- boxes: (N, 4) 的 float ndarray
- query_boxes: (K, 4) 的 float ndarray
输出:
- overlaps: (N, K) ndarray, boxes 和 query_boxes 间的重叠overlap.
"""
def boxes_area(boxes):
    """Return the area of every box in an array of [x1, y1, x2, y2] boxes.

    Width and height follow the "+ 1" convention (x2 - x1 + 1, y2 - y1 + 1).
    """
    # Column 0 holds the widths, column 1 the heights.
    spans = boxes[:, 2:4] - boxes[:, 0:2] + 1
    box_areas = spans[:, 0] * spans[:, 1]
    assert np.all(box_areas >= 0), 'Negative areas founds'
    return box_areas
def unique_boxes(boxes, scale=1.0):
    """Return the sorted indices of the first occurrence of each unique box.

    Boxes are multiplied by scale, rounded, and collapsed into a single
    number per box; boxes that produce the same number count as duplicates.
    """
    mixing_weights = np.array([1, 1e3, 1e6, 1e9])
    quantized = np.round(boxes * scale)
    keys = quantized.dot(mixing_weights)
    first_seen = np.unique(keys, return_index=True)[1]
    return np.sort(first_seen)
def xywh_to_xyxy(xywh):
    """Convert [x1 y1 w h] box format to [x1 y1 x2 y2] format.

    Accepts a single box as a list/tuple of four values (returns a tuple)
    or several boxes as a 2D ndarray (returns a 2D ndarray). Widths and
    heights below 1 are treated as 1, so x2 >= x1 and y2 >= y1.

    Raises TypeError for any other argument type.
    """
    if isinstance(xywh, np.ndarray):
        # Multiple boxes given as a 2D ndarray.
        top_left = xywh[:, 0:2]
        extent = np.maximum(0, xywh[:, 2:4] - 1)
        return np.hstack((top_left, top_left + extent))
    if not isinstance(xywh, (list, tuple)):
        raise TypeError('Argument xywh must be a list, tuple, or numpy array.')
    # Single box given as a sequence of coordinates in xywh order.
    assert len(xywh) == 4
    left, top = xywh[0], xywh[1]
    right = left + np.maximum(0., xywh[2] - 1.)
    bottom = top + np.maximum(0., xywh[3] - 1.)
    return (left, top, right, bottom)
def xyxy_to_xywh(xyxy):
    """Convert [x1 y1 x2 y2] box format to [x1 y1 w h] format.

    Accepts a single box as a list/tuple of four values (returns a tuple)
    or several boxes as a 2D ndarray (returns a 2D ndarray). Width and
    height use the "+ 1" convention.

    Raises TypeError for any other argument type.
    """
    if isinstance(xyxy, np.ndarray):
        # Multiple boxes given as a 2D ndarray.
        top_left = xyxy[:, 0:2]
        return np.hstack((top_left, xyxy[:, 2:4] - top_left + 1))
    if not isinstance(xyxy, (list, tuple)):
        raise TypeError('Argument xyxy must be a list, tuple, or numpy array.')
    # Single box given as a sequence of coordinates in xyxy order.
    assert len(xyxy) == 4
    left, top = xyxy[0], xyxy[1]
    width = xyxy[2] - left + 1
    height = xyxy[3] - top + 1
    return (left, top, width, height)
def filter_small_boxes(boxes, min_size):
    """Return the indices of boxes that are not too small.

    A box is kept only when both its width and its height (computed with
    the "+ 1" convention) are strictly greater than min_size.
    """
    # Column 0 holds the widths, column 1 the heights.
    sizes = boxes[:, 2:4] - boxes[:, 0:2] + 1
    wide_enough = sizes[:, 0] > min_size
    tall_enough = sizes[:, 1] > min_size
    return np.where(wide_enough & tall_enough)[0]
def clip_boxes_to_image(boxes, height, width):
    """Clip an array of boxes to an image of the given height and width.

    x coordinates are limited to [0, width - 1] and y coordinates to
    [0, height - 1]. The boxes array is modified in place and also returned.
    """
    for columns, extent in (([0, 2], width), ([1, 3], height)):
        boxes[:, columns] = np.minimum(
            extent - 1., np.maximum(0., boxes[:, columns])
        )
    return boxes
def clip_xyxy_to_image(x1, y1, x2, y2, height, width):
    """Clip box coordinates to an image of the given height and width.

    x values are limited to [0, width - 1] and y values to [0, height - 1].
    Returns the clipped (x1, y1, x2, y2).
    """
    max_x = width - 1.
    max_y = height - 1.
    clipped_x1, clipped_x2 = [
        np.minimum(max_x, np.maximum(0., x)) for x in (x1, x2)
    ]
    clipped_y1, clipped_y2 = [
        np.minimum(max_y, np.maximum(0., y)) for y in (y1, y2)
    ]
    return clipped_x1, clipped_y1, clipped_x2, clipped_y2
def clip_tiled_boxes(boxes, im_shape):
    """Clip tiled boxes to the image boundaries.

    - im_shape: [height, width]
    - boxes: (N, 4 * num_tiled_boxes), i.e. repeated x1, y1, x2, y2 columns

    x coordinates are limited to [0, width - 1] and y coordinates to
    [0, height - 1]. The boxes array is modified in place and also returned.
    """
    assert boxes.shape[1] % 4 == 0, 'boxes.shape[1] is {:d}, but must be divisible by 4.'.format(boxes.shape[1])
    max_x = im_shape[1] - 1
    max_y = im_shape[0] - 1
    # Column offsets 0..3 within each group of four are x1, y1, x2, y2.
    for offset, upper in enumerate((max_x, max_y, max_x, max_y)):
        boxes[:, offset::4] = np.maximum(
            np.minimum(boxes[:, offset::4], upper), 0
        )
    return boxes
def bbox_transform(boxes, deltas, weights=(1.0, 1.0, 1.0, 1.0)):
    """Forward transform: map proposal boxes to predicted boxes.

    Applies bounding-box regression deltas to the proposal boxes. deltas
    holds repeating (dx, dy, dw, dh) columns; each delta is divided by the
    matching entry of weights before use. See bbox_transform_inv for a
    description of the weights argument.

    Returns an array with the shape and dtype of deltas holding repeating
    (x1, y1, x2, y2) columns. With zero input boxes, an empty
    (0, deltas.shape[1]) array is returned.
    """
    if boxes.shape[0] == 0:
        return np.zeros((0, deltas.shape[1]), dtype=deltas.dtype)

    boxes = boxes.astype(deltas.dtype, copy=False)

    # Proposal geometry ("+ 1" width/height convention).
    src_w = boxes[:, 2] - boxes[:, 0] + 1.0
    src_h = boxes[:, 3] - boxes[:, 1] + 1.0
    src_ctr_x = boxes[:, 0] + 0.5 * src_w
    src_ctr_y = boxes[:, 1] + 0.5 * src_h

    weight_x, weight_y, weight_w, weight_h = weights
    shift_x = deltas[:, 0::4] / weight_x
    shift_y = deltas[:, 1::4] / weight_y
    # Cap the log-scale deltas so np.exp() is not fed overly large values.
    log_scale_w = np.minimum(deltas[:, 2::4] / weight_w, cfg.BBOX_XFORM_CLIP)
    log_scale_h = np.minimum(deltas[:, 3::4] / weight_h, cfg.BBOX_XFORM_CLIP)

    out_ctr_x = shift_x * src_w[:, np.newaxis] + src_ctr_x[:, np.newaxis]
    out_ctr_y = shift_y * src_h[:, np.newaxis] + src_ctr_y[:, np.newaxis]
    out_w = np.exp(log_scale_w) * src_w[:, np.newaxis]
    out_h = np.exp(log_scale_h) * src_h[:, np.newaxis]

    predicted = np.zeros(deltas.shape, dtype=deltas.dtype)
    predicted[:, 0::4] = out_ctr_x - 0.5 * out_w  # x1
    predicted[:, 1::4] = out_ctr_y - 0.5 * out_h  # y1
    # The "- 1" on x2 / y2 is intentional; it mirrors the "+ 1" used when
    # computing widths and heights.
    predicted[:, 2::4] = out_ctr_x + 0.5 * out_w - 1  # x2
    predicted[:, 3::4] = out_ctr_y + 0.5 * out_h - 1  # y2
    return predicted
def bbox_transform_inv(boxes, gt_boxes, weights=(1.0, 1.0, 1.0, 1.0)):
    """Inverse transform: compute regression targets for proposal boxes.

    Given proposal boxes and their ground-truth boxes, computes the target
    bounding-box regression deltas (dx, dy, dw, dh), one row per box.

    weights is a 4-tuple of multiplicative weights applied to the regression
    targets. In older versions of this code (including py-faster-rcnn) the
    weights were set so that the regression deltas had unit standard
    deviation on the training set. They are now usually a fixed set such as
    (10., 10., 5., 5.), an approximation obtained on COCO with that earlier
    unit-standard-deviation heuristic.
    """
    def _size_and_center(rects):
        # Width/height ("+ 1" convention) and centre of each box.
        w = rects[:, 2] - rects[:, 0] + 1.0
        h = rects[:, 3] - rects[:, 1] + 1.0
        return w, h, rects[:, 0] + 0.5 * w, rects[:, 1] + 0.5 * h

    ex_w, ex_h, ex_cx, ex_cy = _size_and_center(boxes)
    gt_w, gt_h, gt_cx, gt_cy = _size_and_center(gt_boxes)

    weight_x, weight_y, weight_w, weight_h = weights
    rows = (
        weight_x * (gt_cx - ex_cx) / ex_w,
        weight_y * (gt_cy - ex_cy) / ex_h,
        weight_w * np.log(gt_w / ex_w),
        weight_h * np.log(gt_h / ex_h),
    )
    return np.vstack(rows).transpose()
def expand_boxes(boxes, scale):
    """Expand an array of boxes about their centres by the given scale.

    Returns a new array with the shape of boxes; only the first four
    columns (x1, y1, x2, y2) are filled in.
    """
    center_x = (boxes[:, 2] + boxes[:, 0]) * .5
    center_y = (boxes[:, 3] + boxes[:, 1]) * .5
    half_w = (boxes[:, 2] - boxes[:, 0]) * .5
    half_h = (boxes[:, 3] - boxes[:, 1]) * .5
    half_w *= scale
    half_h *= scale

    expanded = np.zeros(boxes.shape)
    expanded[:, 0] = center_x - half_w
    expanded[:, 1] = center_y - half_h
    expanded[:, 2] = center_x + half_w
    expanded[:, 3] = center_y + half_h
    return expanded
def flip_boxes(boxes, im_width):
    """Flip boxes horizontally within an image of width im_width.

    Returns a flipped copy; the input array is left untouched.
    """
    old_x1 = boxes[:, 0::4]
    old_x2 = boxes[:, 2::4]
    flipped = boxes.copy()
    # The mirrored right edge becomes the new left edge and vice versa.
    flipped[:, 0::4] = im_width - old_x2 - 1
    flipped[:, 2::4] = im_width - old_x1 - 1
    return flipped
def aspect_ratio(boxes, aspect_ratio):
    """Apply a width-relative aspect ratio transform to boxes.

    The x coordinates (x1 and x2 columns) are multiplied by aspect_ratio;
    y coordinates are unchanged. Returns a transformed copy.
    """
    scaled = boxes.copy()
    for x_offset in (0, 2):
        scaled[:, x_offset::4] = aspect_ratio * boxes[:, x_offset::4]
    return scaled
def box_voting(top_dets, all_dets, thresh, scoring_method='ID', beta=1.0):
    """Refine top_dets by bounding-box voting over all_dets.

    See: https://arxiv.org/abs/1505.01749. Optional score averaging methods
    (not part of the referenced paper) are selected with scoring_method.

    Both top_dets and all_dets hold one detection per row as
    [x1, y1, x2, y2, score]. For each top detection, the boxes in all_dets
    whose overlap with it is at least thresh are averaged, weighted by their
    scores, to give the refined box. The score is then updated according to
    scoring_method ('ID' keeps the original score).

    Returns a refined copy of top_dets.
    Raises NotImplementedError for an unknown scoring_method.
    """
    refined = top_dets.copy()
    candidate_boxes = all_dets[:, :4]
    candidate_scores = all_dets[:, 4]
    overlaps = bbox_overlaps(top_dets[:, :4], candidate_boxes)

    for k in range(refined.shape[0]):
        voters = np.where(overlaps[k] >= thresh)[0]
        scores = candidate_scores[voters]
        refined[k, :4] = np.average(
            candidate_boxes[voters, :], axis=0, weights=scores
        )

        if scoring_method == 'ID':
            # Identity: the score is left as it is.
            continue

        if scoring_method == 'TEMP_AVG':
            # Average probabilities (treated as P(detected class) vs.
            # P(not the detected class)) after smoothing with a temperature
            # hyperparameter.
            probs = np.vstack((scores, 1.0 - scores))
            log_ratio = np.log(probs / np.max(probs, axis=0))
            tempered = np.exp(log_ratio / beta)
            normalized = tempered / np.sum(tempered, axis=0)
            new_score = normalized[0].mean()
        elif scoring_method == 'AVG':
            # Plain mean of the scores of the overlapping boxes.
            new_score = scores.mean()
        elif scoring_method == 'IOU_AVG':
            # Mean of the scores weighted by overlap with the top detection.
            new_score = np.average(scores, weights=overlaps[k, voters])
        elif scoring_method == 'GENERALIZED_AVG':
            new_score = np.mean(scores**beta)**(1.0 / beta)
        elif scoring_method == 'QUASI_SUM':
            new_score = scores.sum() / float(len(scores))**beta
        else:
            raise NotImplementedError('Unknown scoring method {}'.format(scoring_method) )
        refined[k, 4] = new_score

    return refined
def nms(dets, thresh):
    """Apply classic DPM-style greedy NMS.

    Returns an empty list when there are no detections; otherwise returns
    whatever cython_nms.nms returns for (dets, thresh).
    """
    if dets.shape[0] > 0:
        return cython_nms.nms(dets, thresh)
    return []
def soft_nms(dets, sigma=0.5, overlap_thresh=0.3, score_thresh=0.001, method='linear'):
    """Apply the soft NMS algorithm from https://arxiv.org/abs/1704.04503.

    method must be one of 'hard', 'linear' or 'gaussian'. With no
    detections, the input is returned unchanged together with an empty list.

    Returns the pair (dets, keep) produced by cython_nms.soft_nms.
    """
    if dets.shape[0] == 0:
        return dets, []

    method_codes = {'hard': 0, 'linear': 1, 'gaussian': 2}
    assert method in method_codes, 'Unknown soft_nms method: {}'.format(method)

    # The Cython routine is called with a contiguous float32 array and
    # explicitly typed scalars.
    contiguous_dets = np.ascontiguousarray(dets, dtype=np.float32)
    new_dets, kept = cython_nms.soft_nms(
        contiguous_dets,
        np.float32(sigma),
        np.float32(overlap_thresh),
        np.float32(score_thresh),
        np.uint8(method_codes[method]),
    )
    return new_dets, kept
"""Image helper functions."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import cv2
import numpy as np
def aspect_ratio_rel(im, aspect_ratio):
    """Apply a width-relative aspect ratio transform to an image.

    The width is multiplied by aspect_ratio (rounded to the nearest integer)
    while the height is kept; the image is resized accordingly.
    """
    height, width = im.shape[:2]
    new_width = int(round(aspect_ratio * width))
    return cv2.resize(im, dsize=(new_width, height))
def aspect_ratio_abs(im, aspect_ratio):
    """Apply an absolute aspect ratio transform to an image.

    The image is resized to a width/height ratio of aspect_ratio while
    keeping the pixel area the same, up to truncation of the new width and
    height to integers.
    """
    height, width = im.shape[:2]
    area = height * width
    target_w = np.sqrt(area * aspect_ratio)
    target_h = np.sqrt(area / aspect_ratio)
    assert np.isclose(target_w / target_h, aspect_ratio)
    new_size = (int(target_w), int(target_h))
    return cv2.resize(im, dsize=new_size)
c