Detectron's data loader (loader.py) is a general-purpose design, independent of the minibatch implementation details, that performs multithreaded data loading.
A minibatch is a dictionary mapping blob name keys to the corresponding numpy (float32 or int32) ndarray values.
Design structure:
loader thread \
loader thread  \                    / GPU 1 enqueue thread -> feed -> EnqueueOp
...              -> minibatch queue
loader thread  /                    \ GPU N enqueue thread -> feed -> EnqueueOp
loader thread /

<---------------------------- CPU -----------------------------|---- GPU ---->
A pool of loader threads builds minibatches and feeds them into a shared minibatch queue.
Each GPU has one enqueue thread that pulls a minibatch from the minibatch queue, feeds the minibatch blobs into the workspace, and then runs an EnqueueBlobsOp to put the minibatch blobs into that GPU's blobs queue.
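A minimal lifecycle sketch of how this loader is typically driven (building the roidb itself is assumed to happen elsewhere and is not part of the code below):
loader = RoIDataLoader(roidb, num_loaders=4, minibatch_queue_size=64, blobs_queue_capacity=8)
loader.register_sigint_handler()  # optional: shut the threads down cleanly on Ctrl-C
loader.start(prefill=True)        # spawn the loader threads and one enqueue thread per GPU
# ... run the training loop; each GPU's net dequeues its inputs from its BlobsQueue ...
loader.shutdown()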
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from collections import deque
from collections import OrderedDict
import logging
import numpy as np
import Queue
import signal
import threading
import time
import uuid
from caffe2.python import core, workspace
from core.config import cfg
from roi_data.minibatch import get_minibatch
from roi_data.minibatch import get_minibatch_blob_names
from utils.coordinator import coordinated_get
from utils.coordinator import coordinated_put
from utils.coordinator import Coordinator
import utils.c2 as c2_utils
logger = logging.getLogger(__name__)
class RoIDataLoader(object):
def __init__(self, roidb, num_loaders=4, minibatch_queue_size=64, blobs_queue_capacity=8 ):
self._roidb = roidb
self._lock = threading.Lock()
self._perm = deque(range(len(self._roidb)))
self._cur = 0 # _perm cursor
# The minibatch queue holds prepared training data in host (CPU) memory.
# When training with N > 1 GPUs, each element in the minibatch queue is actually
# a partial minibatch that holds 1/N of the samples of a full minibatch.
self._minibatch_queue = Queue.Queue(maxsize=minibatch_queue_size)
self._blobs_queue_capacity = blobs_queue_capacity
# Random queue name in case one instantiates multiple RoIDataLoaders
self._loader_id = uuid.uuid4()
self._blobs_queue_name = 'roi_blobs_queue_{}'.format(self._loader_id)
# Loader threads build (partial) minibatches and put them on the minibatch queue
self._num_loaders = num_loaders
self._num_gpus = cfg.NUM_GPUS
self.coordinator = Coordinator()
self._output_names = get_minibatch_blob_names()  # blob names, in the order the data loader produces them
self._shuffle_roidb_inds()
self.create_threads()
def minibatch_loader_thread(self):
"""
加载 mini-batches,并送入 mini-batch 队列.
"""
with self.coordinator.stop_on_exception():
while not self.coordinator.should_stop():
blobs = self.get_next_minibatch()
# Blobs must be queued in the order specified by self.get_output_names
ordered_blobs = OrderedDict()
for key in self.get_output_names():
assert blobs[key].dtype in (np.int32, np.float32), \
'Blob {} of dtype {} must have dtype of ' \
'np.int32 or np.float32'.format(key, blobs[key].dtype)
ordered_blobs[key] = blobs[key]
coordinated_put(self.coordinator, self._minibatch_queue, ordered_blobs)
logger.info('Stopping mini-batch loading thread')
def enqueue_blobs_thread(self, gpu_id, blob_names):
"""
从 mini-batch 队列将 mini-batches 迁移到 BlobsQueue.
"""
with self.coordinator.stop_on_exception():
while not self.coordinator.should_stop():
if self._minibatch_queue.qsize() == 0:
logger.warning('Mini-batch queue is empty')
blobs = coordinated_get(self.coordinator, self._minibatch_queue)
self.enqueue_blobs(gpu_id, blob_names, blobs.values())
logger.debug('batch queue size {}'.format(self._minibatch_queue.qsize()))
logger.info('Stopping enqueue thread')
def get_next_minibatch(self):
"""
返回 next minibatch 所需要的 blobs.
Thread safe.
"""
valid = False
while not valid:
db_inds = self._get_next_minibatch_inds()
minibatch_db = [self._roidb[i] for i in db_inds]
blobs, valid = get_minibatch(minibatch_db)
return blobs
def _shuffle_roidb_inds(self):
"""
随机打乱训练 roidb.
Not thread safe.
"""
if cfg.TRAIN.ASPECT_GROUPING:
widths = np.array([r['width'] for r in self._roidb])
heights = np.array([r['height'] for r in self._roidb])
horz = (widths >= heights)
vert = np.logical_not(horz)
horz_inds = np.where(horz)[0]
vert_inds = np.where(vert)[0]
inds = np.hstack((np.random.permutation(horz_inds),
np.random.permutation(vert_inds) ))
inds = np.reshape(inds, (-1, 2))
row_perm = np.random.permutation(np.arange(inds.shape[0]))
inds = np.reshape(inds[row_perm, :], (-1, ))
self._perm = inds
else:
self._perm = np.random.permutation(np.arange(len(self._roidb)))
self._perm = deque(self._perm)
self._cur = 0
def _get_next_minibatch_inds(self):
"""
返回 next minibatch 的 roidb 索引indices.
Thread safe.
"""
with self._lock:
# We use a deque and always take the *first* IMS_PER_BATCH items
# followed by *rotating* the deque so that we see fresh items
# each time. If the length of _perm is not divisible by
# IMS_PER_BATCH, then we end up wrapping around the permutation.
db_inds = [self._perm[i] for i in range(cfg.TRAIN.IMS_PER_BATCH)]
self._perm.rotate(-cfg.TRAIN.IMS_PER_BATCH)
self._cur += cfg.TRAIN.IMS_PER_BATCH
if self._cur >= len(self._perm):
self._shuffle_roidb_inds()
return db_inds
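# A small worked example of the rotation above (hypothetical numbers): with
# _perm = deque([0, 1, 2, 3, 4]) and cfg.TRAIN.IMS_PER_BATCH = 2, the first call
# returns [0, 1] and rotate(-2) leaves deque([2, 3, 4, 0, 1]); the next call returns
# [2, 3], and so on. When len(_perm) is not divisible by IMS_PER_BATCH, the last
# batch wraps around the permutation before _shuffle_roidb_inds() reshuffles.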
def get_output_names(self):
return self._output_names
def enqueue_blobs(self, gpu_id, blob_names, blobs):
"""
将 mini-batch 放入 BlobsQueue.
"""
assert len(blob_names) == len(blobs)
t = time.time()
dev = c2_utils.CudaDevice(gpu_id)
queue_name = 'gpu_{}/{}'.format(gpu_id, self._blobs_queue_name)
blob_names = ['gpu_{}/{}'.format(gpu_id, b) for b in blob_names]
for (blob_name, blob) in zip(blob_names, blobs):
workspace.FeedBlob(blob_name, blob, device_option=dev)
logger.debug('enqueue_blobs {}: workspace.FeedBlob: {}'.
format(gpu_id, time.time() - t) )
t = time.time()
op = core.CreateOperator( 'SafeEnqueueBlobs', [queue_name] + blob_names,
blob_names + [queue_name + '_enqueue_status'],
device_option=dev )
workspace.RunOperatorOnce(op)
logger.debug( 'enqueue_blobs {}: workspace.RunOperatorOnce: {}'.
format(gpu_id, time.time() - t) )
def create_threads(self):
"""
创建 mini-batch loader 线程(threads),
每个线程构建 mini-batches,并送入 CPU 内存中的一个队列.
"""
self._workers = [threading.Thread(target=self.minibatch_loader_thread)
for _ in range(self._num_loaders) ]
# Create one BlobsQueue per GPU
# (enqueue_blob_names are unscoped)
enqueue_blob_names = self.create_blobs_queues()
# Create one enqueuer thread per GPU
self._enqueuers = [threading.Thread(target=self.enqueue_blobs_thread,
args=(gpu_id, enqueue_blob_names))
for gpu_id in range(self._num_gpus) ]
def start(self, prefill=False):
for w in self._workers + self._enqueuers:
w.start()
if prefill:
logger.info('Pre-filling mini-batch queue...')
while not self._minibatch_queue.full():
logger.info(' [{:d}/{:d}]'.format(self._minibatch_queue.qsize(),
self._minibatch_queue.maxsize))
time.sleep(0.1)
# Detect failure and shutdown
if self.coordinator.should_stop():
self.shutdown()
break
def shutdown(self):
self.coordinator.request_stop()
self.coordinator.wait_for_stop()
self.close_blobs_queues()
for w in self._workers + self._enqueuers:
w.join()
def create_blobs_queues(self):
"""
对每个 GPU 创建一个 BlobsQueue,以保存 mini-batches.
"""
for gpu_id in range(self._num_gpus):
with c2_utils.GpuNameScope(gpu_id):
workspace.RunOperatorOnce(core.CreateOperator('CreateBlobsQueue',
[],
[self._blobs_queue_name],
num_blobs=len(self.get_output_names()),
capacity=self._blobs_queue_capacity ) )
return self.create_enqueue_blobs()
def close_blobs_queues(self):
"""
关闭 BlobsQueue.
"""
for gpu_id in range(self._num_gpus):
with core.NameScope('gpu_{}'.format(gpu_id)):
workspace.RunOperatorOnce(core.CreateOperator('CloseBlobsQueue',
[self._blobs_queue_name],
[] ) )
def create_enqueue_blobs(self):
blob_names = self.get_output_names()
enqueue_blob_names = [ '{}_enqueue_{}'.format(b, self._loader_id) for b in blob_names ]
for gpu_id in range(self._num_gpus):
with c2_utils.NamedCudaScope(gpu_id):
for blob in enqueue_blob_names:
workspace.CreateBlob(core.ScopedName(blob))
return enqueue_blob_names
def register_sigint_handler(self):
def signal_handler(signal, frame):
logger.info('SIGINT: Shutting down RoIDataLoader threads and exiting...')
self.shutdown()
signal.signal(signal.SIGINT, signal_handler)
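The consumer side of these per-GPU queues lives outside this file. As a rough sketch of how a training net typically pulls its inputs back out (the model and loader variables below are assumed, not defined here), one dequeue op is added per GPU over the same scoped blob names:
# Sketch: one DequeueBlobs op per GPU, so each net run pops one (partial)
# minibatch from that GPU's BlobsQueue into the workspace blobs.
for gpu_id in range(cfg.NUM_GPUS):
    with c2_utils.NamedCudaScope(gpu_id):
        model.net.DequeueBlobs(loader._blobs_queue_name, loader.get_output_names())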
"""
创建 Detectron 网络的 minibatches.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import cv2
import logging
import numpy as np
from core.config import cfg
import roi_data.fast_rcnn
import roi_data.retinanet
import roi_data.rpn
import utils.blob as blob_utils
logger = logging.getLogger(__name__)
def get_minibatch_blob_names(is_training=True):
"""
根据 data loader 读取的顺序,返回 blob names.
"""
# data blob: holds a batch of N images, each with 3 channels
blob_names = ['data']
if cfg.RPN.RPN_ON:
# RPN-only or end-to-end Faster R-CNN
blob_names += roi_data.rpn.get_rpn_blob_names(is_training=is_training)
elif cfg.RETINANET.RETINANET_ON:
blob_names += roi_data.retinanet.get_retinanet_blob_names(is_training=is_training)
else:
# Fast R-CNN like models trained on precomputed proposals
blob_names += roi_data.fast_rcnn.get_fast_rcnn_blob_names(is_training=is_training)
return blob_names
def get_minibatch(roidb):
"""
给定 roidb,从中采样构建 minibatch.
"""
"""
根据图片生成 blobs,并连接为的单个 tensor.
因此,初始化每个 blob 为空列表.
"""
blobs = {k: [] for k in get_minibatch_blob_names()}
# Get the input image blob, formatted for caffe2
im_blob, im_scales = _get_image_blob(roidb)
blobs['data'] = im_blob
if cfg.RPN.RPN_ON:
# RPN-only or end-to-end Faster/Mask R-CNN
valid = roi_data.rpn.add_rpn_blobs(blobs, im_scales, roidb)
elif cfg.RETINANET.RETINANET_ON:
im_width, im_height = im_blob.shape[3], im_blob.shape[2]
# im_width, im_height corresponds to the network input: padded image
# (if needed) width and height. We pass it as input and slice the data
# accordingly so that we don't need to use SampleAsOp
valid = roi_data.retinanet.add_retinanet_blobs(blobs, im_scales, roidb,
im_width, im_height )
else:
# Fast R-CNN like models trained on precomputed proposals
valid = roi_data.fast_rcnn.add_fast_rcnn_blobs(blobs, im_scales, roidb)
return blobs, valid
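# A rough illustration of the returned structure (shapes are hypothetical):
#   blobs, valid = get_minibatch(roidb[:2])
#   blobs['data'] is a float32 ndarray of shape (2, 3, H, W) in NCHW layout;
#   the remaining keys depend on which branch (RPN / RetinaNet / Fast R-CNN)
#   filled them, and valid indicates whether the sampled minibatch is usable.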
def _get_image_blob(roidb):
"""
根据 roidb 中的图片和指定的 scales,构建输入 blob.
"""
num_images = len(roidb)
# Sample random scales to use for each image in this batch
scale_inds = np.random.randint(0, high=len(cfg.TRAIN.SCALES), size=num_images )
processed_ims = []
im_scales = []
for i in range(num_images):
im = cv2.imread(roidb[i]['image'])
if roidb[i]['flipped']:
im = im[:, ::-1, :]
target_size = cfg.TRAIN.SCALES[scale_inds[i]]
im, im_scale = blob_utils.prep_im_for_blob(im,
cfg.PIXEL_MEANS,
[target_size],
cfg.TRAIN.MAX_SIZE )
im_scales.append(im_scale[0])
processed_ims.append(im[0])
# Create a blob to hold the input images
blob = blob_utils.im_list_to_blob(processed_ims)
return blob, im_scales
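# A worked example of the resizing above (the config values here are assumptions,
# not read from this file): with target_size = 800 and cfg.TRAIN.MAX_SIZE = 1333,
# a 480 x 640 (H x W) image gets im_scale = 800 / 480 ~= 1.667; its longer side
# becomes 640 * 1.667 ~= 1067 <= 1333, so the MAX_SIZE cap does not trigger and the
# image is resized to roughly 800 x 1067 before im_list_to_blob pads it into the blob.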
"""
RPN 和 RetinaNet minibtach blobs 处理的通用辅助函数.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from collections import namedtuple
import logging
import numpy as np
import threading
from core.config import cfg
from modeling.generate_anchors import generate_anchors
import utils.boxes as box_utils
logger = logging.getLogger(__name__)
# The octave and aspect fields are only used by RetinaNet.
# Octave corresponds to the scale of the anchor;
# aspect denotes which aspect ratio is used, out of the configured range of aspect ratios.
FieldOfAnchors = namedtuple('FieldOfAnchors', ['field_of_anchors',
'num_cell_anchors',
'stride', 'field_size',
'octave', 'aspect'] )
# Cache for memoizing _get_field_of_anchors
_threadlocal_foa = threading.local()
def get_field_of_anchors(stride, anchor_sizes, anchor_aspect_ratios, octave=None, aspect=None):
global _threadlocal_foa
if not hasattr(_threadlocal_foa, 'cache'):
_threadlocal_foa.cache = {}
cache_key = str(stride) + str(anchor_sizes) + str(anchor_aspect_ratios)
if cache_key in _threadlocal_foa.cache:
return _threadlocal_foa.cache[cache_key]
# Anchors at a single feature cell
cell_anchors = generate_anchors(stride=stride,
sizes=anchor_sizes,
aspect_ratios=anchor_aspect_ratios)
num_cell_anchors = cell_anchors.shape[0]
# Generate canonical proposals from shifted anchors
# Enumerate all shifted positions on the (H, W) grid
fpn_max_size = cfg.FPN.COARSEST_STRIDE * np.ceil(cfg.TRAIN.MAX_SIZE / float(cfg.FPN.COARSEST_STRIDE))
field_size = int(np.ceil(fpn_max_size / float(stride)))
shifts = np.arange(0, field_size) * stride
shift_x, shift_y = np.meshgrid(shifts, shifts)
shift_x = shift_x.ravel()
shift_y = shift_y.ravel()
shifts = np.vstack((shift_x, shift_y, shift_x, shift_y)).transpose()
# Broacast anchors over shifts to enumerate all anchors at all positions
# in the (H, W) grid:
# - add A cell anchors of shape (1, A, 4) to
# - K shifts of shape (K, 1, 4) to get
# - all shifted anchors of shape (K, A, 4)
# - reshape to (K*A, 4) shifted anchors
A = num_cell_anchors
K = shifts.shape[0]
field_of_anchors = (cell_anchors.reshape((1, A, 4)) +
shifts.reshape((1, K, 4)).transpose((1, 0, 2)) )
field_of_anchors = field_of_anchors.reshape((K * A, 4))
foa = FieldOfAnchors(field_of_anchors=field_of_anchors.astype(np.float32),
num_cell_anchors=num_cell_anchors,
stride=stride,
field_size=field_size,
octave=octave,
aspect=aspect )
_threadlocal_foa.cache[cache_key] = foa
return foa
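# A worked shape example (the cfg values are assumptions): with cfg.TRAIN.MAX_SIZE = 1333
# and cfg.FPN.COARSEST_STRIDE = 32, fpn_max_size = 32 * ceil(1333 / 32) = 1344. For
# stride = 16, field_size = ceil(1344 / 16) = 84, so K = 84 * 84 = 7056 grid positions;
# with A = 3 cell anchors, field_of_anchors has shape (7056 * 3, 4) = (21168, 4).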
def unmap(data, count, inds, fill=0):
"""
Unmap a subset of item (data) back to the original set of items (of size count)
"""
if count == len(inds):
return data
if len(data.shape) == 1:
ret = np.empty((count, ), dtype=data.dtype)
ret.fill(fill)
ret[inds] = data
else:
ret = np.empty((count, ) + data.shape[1:], dtype=data.dtype)
ret.fill(fill)
ret[inds, :] = data
return ret
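# Example of unmap (hypothetical values): with data = np.array([7, 9]), count = 5,
# inds = [1, 3] and fill = 0, the result is np.array([0, 7, 0, 9, 0]); rows of a 2-D
# data array are scattered back to their original positions the same way.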
def compute_targets(ex_rois, gt_rois, weights=(1.0, 1.0, 1.0, 1.0)):
"""
计算图片的边界框回归目标值.
"""
return box_utils.bbox_transform_inv(ex_rois, gt_rois, weights).astype(np.float32, copy=False)
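# For reference, these targets follow the standard R-CNN box parameterization
# (ex_* are the example boxes, gt_* the ground-truth boxes, (wx, wy, ww, wh) the weights):
#   dx = wx * (gt_ctr_x - ex_ctr_x) / ex_width
#   dy = wy * (gt_ctr_y - ex_ctr_y) / ex_height
#   dw = ww * log(gt_width / ex_width)
#   dh = wh * log(gt_height / ex_height)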