"""
The CocoSampler is the ndsampler interface for efficiently sampling windowed
data from a :class:`kwcoco.CocoDataset`.
CommandLine:
xdoctest -m ndsampler.coco_sampler __doc__ --show
Example:
>>> # Imagine you have some images
>>> import kwimage
>>> image_paths = [
>>> kwimage.grab_test_image_fpath('astro'),
>>> kwimage.grab_test_image_fpath('carl'),
>>> kwimage.grab_test_image_fpath('airport'),
>>> ] # xdoctest: +IGNORE_WANT
['~/.cache/kwimage/demodata/KXhKM72.png',
'~/.cache/kwimage/demodata/flTHWFD.png',
'~/.cache/kwimage/demodata/Airport.jpg']
>>> # And you want to randomly load subregions of them in O(1) time
>>> import ndsampler
>>> import kwcoco
>>> # First make a COCO dataset that refers to your images
>>> dataset = {
>>> 'images': [{'id': i, 'file_name': fpath} for i, fpath in enumerate(image_paths)],
>>> 'annotations': [],
>>> 'categories': [],
>>> }
>>> coco_dset = kwcoco.CocoDataset(dataset)
>>> # (and possibly annotations)
>>> category_id = coco_dset.ensure_category('face')
>>> image_id = 0
>>> coco_dset.add_annotation(image_id=image_id, category_id=category_id, bbox=kwimage.Boxes([[140, 10, 180, 180]], 'xywh'))
>>> print(coco_dset)
<CocoDataset(tag=None, n_anns=1, n_imgs=3, ... n_cats=1...)>
>>> # Now pass the dataset to a sampler and tell it where it can store temporary files
>>> workdir = ub.Path.appdir('ndsampler/demo').ensuredir()
>>> sampler = ndsampler.CocoSampler(coco_dset, workdir=workdir)
>>> # Now you can load arbirary samples by specifing a target dictionary
>>> # with an image_id (gid) center location (cx, cy) and width, height.
>>> target = {'gid': 0, 'cx': 220, 'cy': 100, 'width': 300, 'height': 300}
>>> sample = sampler.load_sample(target)
>>> # The sample contains the image data, any visible annotations, a reference
>>> # to the original target, and params of the transform used to sample this
>>> # patch
...
>>> print(sorted(sample.keys()))
['annots', 'classes', 'im', 'kp_classes', 'params', 'target', 'tr']
>>> im = sample['im']
>>> print(f'im.shape={im.shape}')
im.shape=(300, 300, 3)
>>> dets = sample['annots']['frame_dets'][0]
>>> print(f'dets={dets}')
>>> print('dets.data = {}'.format(ub.urepr(dets.data, nl=1, sv=1, sort=1)))
dets=<Detections(1)>
dets.data = {
'aids': [1],
'boxes': <Boxes(xywh, array([[ 70., 60., 180., 180.]]))>,
'cids': [1],
'keypoints': <PointsList(n=1)>,
'segmentations': <SegmentationList(n=1)>,
}
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(im)
>>> dets.draw(labels=False)
>>> kwplot.show_if_requested()
>>> # The load sample function is at the core of what ndsampler does
>>> # There are other helper functions like load_positive / load_negative
>>> # which deal with annotations. See those for more details.
>>> # For random negative sampling see coco_regions.
"""
import ubelt as ub
import numpy as np
import kwimage
import kwcoco
import warnings
from ndsampler import coco_regions
from ndsampler import coco_frames
from ndsampler import abstract_sampler
from ndsampler.utils import util_misc
try:
from xdev import profile
except Exception:
profile = ub.identity
[docs]
class CocoSampler(abstract_sampler.AbstractSampler, util_misc.HashIdentifiable,
ub.NiceRepr):
"""
Samples patches of positives and negative detection windows from a COCO
dataset. Can be used for training FCN or RPN based classifiers / detectors.
Does data loading, padding, etc...
Args:
dset (kwcoco.CocoDataset): a coco-formatted dataset
backend (str | Dict):
Can be None, 'cog' or 'npy', or a dict.
In the case of a dict, it takes the format:
`{'type': str, 'config': Dict}`. See AbstractFrames for more
details. Defaults to None, which does not do anything fancy.
Example:
#print
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('photos')
...
>>> print(sorted(self.class_ids))
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> print(self.n_positives)
4
Example:
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo('photos')
>>> p_sample = self.load_positive()
>>> n_sample = self.load_negative()
>>> self = ndsampler.CocoSampler.demo('shapes')
>>> p_sample2 = self.load_positive()
>>> n_sample2 = self.load_negative()
>>> for sample in [p_sample, n_sample, p_sample2, n_sample2]:
>>> assert 'annots' in sample
>>> assert 'im' in sample
>>> assert 'rel_boxes' in sample['annots']
>>> assert 'rel_ssegs' in sample['annots']
>>> assert 'rel_kpts' in sample['annots']
>>> assert 'cids' in sample['annots']
>>> assert 'aids' in sample['annots']
"""
[docs]
@classmethod
def demo(cls, key='shapes', workdir=None, backend=None, **kw):
"""
Create a toy coco sampler for testing and demo puposes
SeeAlso:
* kwcoco.CocoDataset.demo
"""
dset = kwcoco.CocoDataset.demo(key=key, **kw)
if key == 'photos':
toremove = [ann for ann in dset.anns.values() if 'bbox' not in ann]
dset.remove_annotations(toremove)
dset.add_category('background', id=0)
if workdir is None:
workdir = ub.Path.appdir('ndsampler').ensuredir()
self = CocoSampler(dset, workdir=workdir, backend=backend)
return self
def __init__(self, dset, workdir=None, autoinit=True, backend=None,
verbose=0):
super(CocoSampler, self).__init__()
self.workdir = workdir
self.dset = dset
self.regions = None
self.frames = None
# save at least until we init the frames / regions
self._backend = backend
self.verbose = verbose
self.BACKGROUND_CLASS_ID = None
if autoinit:
self._init()
[docs]
@classmethod
def coerce(cls, data, **kwargs):
"""
Attempt to coerce the input data into a sampler. Generally this can be
anything that is already a sampler, or somthing that can be coerced
into a kwcoco dataset.
Args:
data (str | PathLike | CocoDataset | CocoSampler):
something that can be coerced into a CocoSampler.
Returns:
CocoSampler
"""
if isinstance(data, cls):
# Either it is already a sampler
self = data
if kwargs:
raise NotImplementedError(
'data is already a sampler, '
'cannot change kwargs')
else:
# Or if it can be coerce to a kwcoco dataset, then
# it is already a sampler
dset = kwcoco.CocoDataset.coerce(data)
self = cls(dset, **kwargs)
return self
def _init(self):
if hasattr(self.dset, '_ensure_imgsize'):
self.dset._ensure_imgsize()
if self.dset.anns is None:
self.dset._build_index()
self.regions = coco_regions.CocoRegions(self.dset,
workdir=self.workdir,
verbose=self.verbose)
self.frames = coco_frames.CocoFrames(
self.dset,
workdir=self.workdir,
backend=self._backend,
)
# === Hacked in attributes ===
self.kp_classes = self.dset.keypoint_categories()
self.BACKGROUND_CLASS_ID = self.regions.BACKGROUND_CLASS_ID # currently hacked in
@property
def classes(self):
if self.regions is None:
return None
return self.regions.classes
@property
def catgraph(self):
"""
DEPRICATED, use self.classes instead
"""
if self.regions is None:
return None
return self.regions.classes
def _depends(self):
hashid_parts = ub.odict()
hashid_parts['regions_hashid'] = self.regions.hashid
hashid_parts['frames_hashid'] = self.frames.hashid
return hashid_parts
[docs]
def lookup_class_name(self, class_id):
return self.regions.lookup_class_name(class_id)
[docs]
def lookup_class_id(self, class_name):
return self.regions.lookup_class_id(class_name)
@property
def n_positives(self):
return self.regions.n_positives
@property
def n_annots(self):
return self.regions.n_annots
@property
def n_samples(self):
return self.regions.n_samples
def __len__(self):
return self.n_samples
@property
def n_images(self):
return self.regions.n_images
@property
def n_categories(self):
return self.regions.n_categories
@property
def class_ids(self):
return self.regions.class_ids
@property
def image_ids(self):
return self.regions.image_ids
[docs]
def preselect(self, **kwargs):
return self.regions.preselect(**kwargs)
[docs]
def new_sample_grid(self, task, window_dims, window_overlap=0):
sample_grid = self.regions.new_sample_grid(
task, window_dims, window_overlap)
return sample_grid
[docs]
def load_image_with_annots(self, image_id, cache=True):
"""
Args:
image_id (int): the coco image id
cache (bool): if True returns the fast
subregion-indexable file reference. Otherwise, eagerly loads
the entire image. Defaults to True.
Returns:
Tuple[Dict, List[Dict]]:
img: the coco image dict augmented with imdata
anns: the coco annotations in this image
Example:
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo()
>>> img, anns = self.load_image_with_annots(1)
>>> dets = kwimage.Detections.from_coco_annots(anns, dset=self.dset)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(img['imdata'][:], doclf=1)
>>> dets.draw()
>>> kwplot.show_if_requested()
"""
full_image = self.load_image(image_id, cache=cache)
coco_dset = self.dset
img = coco_dset.imgs[image_id].copy()
anns = self.load_annotations(image_id)
img['imdata'] = full_image
return img, anns
[docs]
def load_annotations(self, image_id):
"""
Loads the annotations within an image
Args:
image_id (int): the coco image id
Returns:
List[Dict]: list of coco annotation dictionaries
"""
coco_dset = self.dset
aids = coco_dset.index.gid_to_aids[image_id]
anns = [coco_dset.anns[aid] for aid in aids]
return anns
[docs]
def load_image(self, image_id, cache=True):
"""
Loads the annotations within an image
Args:
image_id (int): the coco image id
cache (bool): if True returns the fast
subregion-indexable file reference. Otherwise, eagerly loads
the entire image. Defaults to True.
Returns:
ArrayLike: either ndarray data or a indexable reference
"""
full_image = self.frames.load_image(image_id, cache=cache)
return full_image
[docs]
def load_item(self, index, with_annots=True, target=None, rng=None, **kw):
"""
Loads item from either positive or negative regions pool.
Lower indexes will return positive regions and higher indexes will
return negative regions.
The main paradigm of the sampler is that sampler.regions maintains a
pool of target regions, you can influence what that pool is at any
point by calling sampler.regions.preselect (usually either at the start
of learning, or maybe after every epoch, etc..), and you use load_item
to load the index-th item from that preselected pool. Depending on how
you preselected the pool, the returned item might correspond to a
positive or negative region.
Args:
index (int): index of target region
with_annots (bool | str):
if True, also extracts information about any annotation that
overlaps the region of interest (subject to visibility_thresh).
Can also be a List[str] that specifies which specific subinfo
should be extracted. Valid strings in this list are: boxes,
keypoints, and segmenation. Defaults to True.
target (Dict): Extra target arguments that update the positive target,
like window_dims, pad, etc.... See :func:`load_sample` for
details on allowed keywords.
rng (None | int | RandomState):
a seed or seeded random number generator.
**kw : other arguments that can be passed to :func:`CocoSampler.load_sample`
Returns:
Dict: sample: dict containing keys
im (ndarray): image data
target (dict): contains the same input items as the input
target but additionally specifies inferred information like
rel_cx and rel_cy, which gives the center of the target
w.r.t the returned **padded** sample.
annots (dict): Dict of aids, cids, and rel/abs boxes
"""
if index < self.n_positives:
sample = self.load_positive(index, target=target,
with_annots=with_annots, rng=rng, **kw)
else:
index = index - self.n_positives
sample = self.load_negative(index, target=target,
with_annots=with_annots, rng=rng, **kw)
return sample
[docs]
def load_positive(self, index=None, with_annots=True, target=None, rng=None,
**kw):
"""
Load an item from the the positive pool of regions.
Args:
index (int): index of positive target
with_annots (bool | str):
if True, also extracts information about any annotation that
overlaps the region of interest (subject to visibility_thresh).
Can also be a List[str] that specifies which specific subinfo
should be extracted. Valid strings in this list are: boxes,
keypoints, and segmentation. Defaults to True.
target (Dict): Extra target arguments that update the positive target,
like window_dims, pad, etc.... See :func:`load_sample` for
details on allowed keywords.
rng (None | int | RandomState):
a seed or seeded random number generator.
**kw : other arguments that can be passed to :func:`CocoSampler.load_sample`
Returns:
Dict: sample: dict containing keys
im (ndarray): image data
tr (dict): contains the same input items as tr but additionally
specifies rel_cx and rel_cy, which gives the center
of the target w.r.t the returned **padded** sample.
annots (dict): Dict of aids, cids, and rel/abs boxes
Example:
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo()
>>> sample = self.load_positive(pad=(10, 10), tr=dict(window_dims=(3, 3)))
>>> assert sample['im'].shape[0] == 23
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(sample['im'], doclf=1)
>>> kwplot.show_if_requested()
"""
if 'tr' in kw:
ub.schedule_deprecation('ndsampler', 'tr', 'keyword arg of load_positive',
migration='use target instead',
deprecate='0.7.0', error='1.0.0', remove='1.1.0')
if target is not None:
warnings.warn('deprecated tr is overriding target')
target = kw.pop('tr')
target_ = self.regions.get_positive(index, rng=rng)
if target:
target_ = ub.dict_union(target_, target)
sample = self.load_sample(target_, with_annots=with_annots, **kw)
return sample
[docs]
def load_negative(self, index=None, with_annots=True, target=None,
rng=None, **kw):
"""
Load an item from the the negative pool of regions.
Args:
index (int): if specified loads a specific negative from the
presampled pool, otherwise the next negative in the pool is
returned.
with_annots (bool | str):
if True, also extracts information about any annotation that
overlaps the region of interest (subject to visibility_thresh).
Can also be a List[str] that specifies which specific subinfo
should be extracted. Valid strings in this list are: boxes,
keypoints, and segmentation. Defaults to True.
target (Dict): Extra target arguments that update the positive target,
like window_dims, pad, etc.... See :func:`load_sample` for
details on allowed keywords.
rng (None | int | RandomState):
a seed or seeded random number generator.
Returns:
Dict: sample: dict containing keys
im (ndarray): image data
tr (dict): contains the same input items as tr but additionally
specifies rel_cx and rel_cy, which gives the center
of the target w.r.t the returned **padded** sample.
annots (dict): Dict of aids, cids, and rel/abs boxes
Example:
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo()
>>> rng = None
>>> sample = self.load_negative(rng=rng, pad=(0, 0))
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> import kwimage
>>> kwplot.autompl()
>>> abs_sample_box = sample['params']['sample_tlbr']
>>> tf_rel_from_abs = kwimage.Affine.coerce(sample['params']['tf_rel_to_abs']).inv()
>>> wh, ww = sample['target']['window_dims']
>>> abs_window_box = kwimage.Boxes([[sample['target']['cx'], sample['target']['cy'], ww, wh]], 'cxywh')
>>> rel_window_box = abs_window_box.warp(tf_rel_from_abs)
>>> rel_sample_box = abs_sample_box.warp(tf_rel_from_abs)
>>> kwplot.imshow(sample['im'], fnum=1, doclf=True)
>>> rel_sample_box.draw(color='kw_green', lw=10)
>>> rel_window_box.draw(color='kw_blue', lw=8)
>>> kwplot.show_if_requested()
Example:
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo()
>>> rng = None
>>> sample = self.load_negative(rng=rng, pad=(10, 20), target=dict(window_dims=(64, 64)))
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> import kwimage
>>> kwplot.autompl()
>>> abs_sample_box = sample['params']['sample_tlbr']
>>> tf_rel_from_abs = kwimage.Affine.coerce(sample['params']['tf_rel_to_abs']).inv()
>>> wh, ww = sample['target']['window_dims']
>>> abs_window_box = kwimage.Boxes([[sample['target']['cx'], sample['target']['cy'], ww, wh]], 'cxywh')
>>> rel_window_box = abs_window_box.warp(tf_rel_from_abs)
>>> rel_sample_box = abs_sample_box.warp(tf_rel_from_abs)
>>> kwplot.imshow(sample['im'], fnum=1, doclf=True)
>>> rel_sample_box.draw(color='kw_green', lw=10)
>>> rel_window_box.draw(color='kw_blue', lw=8)
>>> kwplot.show_if_requested()
"""
if 'tr' in kw:
ub.schedule_deprecation('ndsampler', 'tr', 'keyword arg of load_negative',
migration='use target instead',
deprecate='0.7.0', error='1.0.0', remove='1.1.0')
if target is not None:
warnings.warn('deprecated tr is overriding target')
target = kw.pop('tr')
target_ = self.regions.get_negative(index, rng=rng)
if target:
target_ = ub.dict_union(target_, target)
sample = self.load_sample(target_, with_annots=with_annots, **kw)
return sample
[docs]
def load_sample(self, target=None, with_annots=True, annot_ids=None,
visible_thresh=0.0, **kwargs):
"""
Loads the volume data associated with the bbox and frame of a target
Args:
target (dict): target dictionary (often abbreviated as tr)
indicating an nd source object (e.g. image or video) and the
coordinate region to sample from. Unspecified coordinate
regions default to the extent of the source object.
For 2D image source objects, target must contain or be able to
infer the key `gid (int)`, to specify an image id.
For 3D video source objects, target must contain the key
`vidid (int)`, to specify a video id (NEW in 0.6.1) or
`gids List[int]`, as a list of images in a video (NEW in 0.6.2)
In general, coordinate regions can specified by the key
`slices`, a numpy-like "fancy index" over each of the n
dimensions. Usually this is a tuple of slices, e.g.
(y1:y2, x1:x2) for images and (t1:t2, y1:y2, x1:x2) for videos.
You may also specify:
`space_slice` as (y1:y2, x1:x2) for both 2D images and 3D
videos and `time_slice` as t1:t2 for 3D videos.
Spatial regions can be specified with keys:
* 'cx' and 'cy' as the center of the region in pixels.
* 'width' and 'height' are in pixels.
* 'window_dims' is a height, width tuple or can be a
special string key 'square', which overrides width and
height to both be the maximum of the two.
Temporal regions are specifiable by `slices`, `time_slice` or
an explicit list of `gids`.
The `aid` key can be specified to indicate a specific
annotation to load. This uses the annotation information to
infer 'gid', 'cx', 'cy', 'width', and 'height' if they are not
present. (NEW in 0.5.10)
The `channels` key can be specified as a channel code or
:class:`kwcoco.ChannelSpec` object. (NEW in 0.6.1)
as_xarray (bool):
if True, return the image data as an xarray object.
default=False
interpolation (str):
type of resample interpolation. Defaults to 'auto'.
antialias (str):
antialias sample or not. Defaults to 'auto'.
nodata: override function level nodata
use_native_scale (bool):
If True, the "im" field is returned as a jagged list of
data that are as close to native resolution as possible
while still maintaining alignment up to a scale factor.
Currently only available for video sampling.
scale (float | Tuple[float, float]):
if specified, the same window is sampled, but the data is
returned warped by the extra scale factor. This augments
the existing image or video scale factor. Any annotations
are also warped according to this factor such that they
align with the returned data. By default this scale is
applied to videospace, unless use_native_scale is given, in
which case it is applied to the native resolution
(generally you dont want to combine these).
pad (tuple): (height, width) extra context to add to window dims.
This helps prevent augmentation from producing boundary effects
padkw (dict): kwargs for `numpy.pad`.
Defaults to {'mode': 'constant'}.
dtype (type | None):
Cast the loaded data to this type. If unspecified returns the
data as-is.
nodata (int | None):
If specified, for integer data with nodata values, this is
passed to kwcoco delayed image finalize. The data is converted
to float32 and nodata values are replaced with nan. These
nan values are handled correctly in subsequent warping
operations. Defaults to None.
with_annots (bool | str):
if True, also extracts information about any annotation that
overlaps the region of interest (subject to visibility_thresh).
Can also be a List[str] that specifies which specific subinfo
should be extracted. Valid strings in this list are: boxes,
keypoints, and segmentation. Defaults to True.
annot_ids (List[int]):
if specified, assume the user has precomputed which annotations
should be loaded for the target region. Skip the spatial lookup
step and just load the data for these annotations instead.
visible_thresh (float): does not return annotations with visibility
less than this threshold.
**kwargs : handles deprecated arguments which are now specified
in the target dictionary itself.
Returns:
Dict: sample: dict containing keys
im (ndarray | DataArray): image / video data
target (dict): contains the same input items as the input
target but additionally specifies inferred information like
rel_cx and rel_cy, which gives the center of the target
w.r.t the returned **padded** sample.
annots (dict): containing items:
frame_dets (List[kwimage.Detections]): a list of detection
objects containing the requested annotation info for each
frame.
aids (list): annotation ids DEPRECATED
cids (list): category ids DEPRECATED
rel_ssegs (ndarray): segmentations relative to the sample DEPRECATED
rel_kpts (ndarray): keypoints relative to the sample DEPRECATED
CommandLine:
xdoctest -m ndsampler.coco_sampler CocoSampler.load_sample:2 --show
xdoctest -m ndsampler.coco_sampler CocoSampler.load_sample:1 --show
xdoctest -m ndsampler.coco_sampler CocoSampler.load_sample:3 --show
Ignore:
globals().update(xdev.get_func_kwargs(ndsampler.CocoSampler.load_sample))
Example:
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo()
>>> # The target (target) lets you specify an arbitrary window
>>> target = {'gid': 1, 'cx': 5, 'cy': 2, 'width': 6, 'height': 6}
>>> sample = self.load_sample(target)
...
>>> print('sample.shape = {!r}'.format(sample['im'].shape))
sample.shape = (6, 6, 3)
Example:
>>> # Access direct annotation information
>>> import ndsampler
>>> sampler = ndsampler.CocoSampler.demo()
>>> # Sample a region that contains at least one annotation
>>> target = {'gid': 1, 'cx': 5, 'cy': 2, 'width': 600, 'height': 600}
>>> sample = sampler.load_sample(target)
>>> annotation_ids = sample['annots']['aids']
>>> aid = annotation_ids[0]
>>> # Method1: Access ann dict directly via the coco index
>>> ann = sampler.dset.anns[aid]
>>> # Method2: Access ann objects via annots method
>>> dets = sampler.dset.annots(annotation_ids).detections
>>> print('dets.data = {}'.format(ub.urepr(dets.data, nl=1)))
Example:
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo()
>>> target = self.regions.get_positive(0)
>>> target['window_dims'] = 'square'
>>> target['pad'] = (25, 25)
>>> sample = self.load_sample(target)
>>> print('im.shape = {!r}'.format(sample['im'].shape))
im.shape = (135, 135, 3)
>>> target['window_dims'] = None
>>> target['pad'] = (0, 0)
>>> sample = self.load_sample(target)
>>> print('im.shape = {!r}'.format(sample['im'].shape))
im.shape = (52, 85, 3)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(sample['im'])
>>> kwplot.show_if_requested()
Example:
>>> # sample an out of bounds target
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('vidshapes8')
>>> test_vidspace = 1
>>> target = self.regions.get_positive(0)
>>> # Toggle to see if this test works in both cases
>>> space = 'image'
>>> if test_vidspace:
>>> space = 'video'
>>> target = target.copy()
>>> target['gids'] = [target.pop('gid')]
>>> target['scale'] = 1.3
>>> #target['scale'] = 0.8
>>> #target['use_native_scale'] = True
>>> #target['realign_native'] = 'largest'
>>> target['window_dims'] = (364, 364)
>>> sample = self.load_sample(target)
>>> annots = sample['annots']
>>> assert len(annots['aids']) > 0
>>> #assert len(annots['rel_cxywh']) == len(annots['aids'])
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> tf_rel_to_abs = sample['params']['tf_rel_to_abs']
>>> rel_dets = annots['frame_dets'][0]
>>> abs_dets = rel_dets.warp(tf_rel_to_abs)
>>> # Draw box in original image context
>>> #abs_frame = self.frames.load_image(sample['target']['gid'], space=space)[:]
>>> abs_frame = self.dset.coco_image(sample['target']['gid']).delay(space=space).finalize()
>>> kwplot.imshow(abs_frame, pnum=(1, 2, 1), fnum=1)
>>> abs_dets.data['boxes'].translate([-.5, -.5]).draw()
>>> abs_dets.data['keypoints'].draw(color='green', radius=10)
>>> abs_dets.data['segmentations'].draw(color='red', alpha=.5)
>>> # Draw box in relative sample context
>>> if test_vidspace:
>>> kwplot.imshow(sample['im'][0], pnum=(1, 2, 2), fnum=1)
>>> else:
>>> kwplot.imshow(sample['im'], pnum=(1, 2, 2), fnum=1)
>>> rel_dets.data['boxes'].translate([-.5, -.5]).draw()
>>> rel_dets.data['segmentations'].draw(color='red', alpha=.6)
>>> rel_dets.data['keypoints'].draw(color='green', alpha=.4, radius=10)
>>> kwplot.show_if_requested()
Example:
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('photos')
>>> target = self.regions.get_positive(1)
>>> target['window_dims'] = (300, 150)
>>> target['pad'] = None
>>> sample = self.load_sample(target)
>>> assert sample['im'].shape[0:2] == target['window_dims']
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(sample['im'], colorspace='rgb')
>>> kwplot.show_if_requested()
Example:
>>> # Multispectral video sample example
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('vidshapes1-multispectral', num_frames=5)
>>> sample_grid = self.new_sample_grid('video_detection', (3, 128, 128))
>>> target = sample_grid['positives'][0]
>>> target['channels'] = 'B1|B8'
>>> target['as_xarray'] = False
>>> sample = self.load_sample(target)
>>> print(ub.urepr(sample['target'], nl=1))
>>> print(sample['im'].shape)
>>> assert sample['im'].shape == (3, 128, 128, 2)
>>> target['channels'] = '<all>'
>>> sample = self.load_sample(target)
>>> assert sample['im'].shape == (3, 128, 128, 5)
Example:
>>> # Multispectral-multisensor jagged video sample example
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('vidshapes1-msi-multisensor', num_frames=5)
>>> sample_grid = self.new_sample_grid('video_detection', (3, 128, 128))
>>> target = sample_grid['positives'][0]
>>> target['channels'] = 'B1|B8'
>>> target['as_xarray'] = False
>>> sample1 = self.load_sample(target)
>>> target['scale'] = 2
>>> sample2 = self.load_sample(target)
>>> target['use_native_scale'] = True
>>> sample3 = self.load_sample(target)
>>> ####
>>> assert sample1['im'].shape == (3, 128, 128, 2)
>>> assert sample2['im'].shape == (3, 256, 256, 2)
>>> box1 = sample1['annots']['frame_dets'][0].boxes
>>> box2 = sample2['annots']['frame_dets'][0].boxes
>>> box3 = sample3['annots']['frame_dets'][0].boxes
>>> assert np.allclose((box2.width / box1.width), 2)
>>> # Jagged annotations are still in video space
>>> assert np.allclose((box3.width / box1.width), 2)
>>> jagged_shape = [[p.shape for p in f] for f in sample3['im']]
>>> jagged_align = [[a for a in m['align']] for m in sample3['params']['jagged_meta']]
"""
if target is None:
if 'tr' in kwargs:
ub.schedule_deprecation(
'ndsampler', 'tr', 'keyword arg to load_sample',
migration='the name is now "target", use that instead',
deprecate='0.7.3', error='1.0.0', remove='1.1.0')
target = kwargs.pop('tr', None)
if target is None:
raise ValueError('The target dictionary must be specified')
verbose_ndsample = target.get('verbose_ndsample', False)
target_ = self._infer_target_attributes(target, **kwargs)
if verbose_ndsample:
print('Load Sample:')
print('target_ = {}'.format(ub.urepr(target_, nl=1)))
print('Load Sample Inferred:')
inferred_lines = []
for k in target_:
v1 = target.get(k, None)
v2 = target_.get(k, None)
if v1 != v2:
inferred_lines.append(f'* {k}: {v1} -> {v2}')
print('\n'.join(inferred_lines))
sample = self._load_slice(target_)
if with_annots or ub.iterable(with_annots):
self._populate_overlap(sample, visible_thresh, with_annots,
annot_ids)
sample['classes'] = self.classes
sample['kp_classes'] = self.kp_classes
return sample
@profile
def _infer_target_attributes(self, target, **kwargs):
"""
Infer unpopulated target attribues
Example:
>>> # sample using only an annotation id
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo()
>>> target = {'aid': 1, 'as_xarray': True}
>>> target_ = self._infer_target_attributes(target)
>>> print('target_ = {}'.format(ub.urepr(target_, nl=1)))
>>> assert target_['gid'] == 1
>>> assert all(k in target_ for k in ['cx', 'cy', 'width', 'height'])
>>> self = CocoSampler.demo('vidshapes8-multispectral')
>>> target = {'aid': 1, 'as_xarray': True}
>>> target_ = self._infer_target_attributes(target)
>>> assert target_['gid'] == 1
>>> assert all(k in target_ for k in ['cx', 'cy', 'width', 'height'])
>>> target = {'vidid': 1, 'as_xarray': True}
>>> target_ = self._infer_target_attributes(target)
>>> print('target_ = {}'.format(ub.urepr(target_, nl=1)))
>>> assert 'gids' in target_
>>> target = {'gids': [1, 2], 'as_xarray': True}
>>> target_ = self._infer_target_attributes(target)
>>> print('target_ = {}'.format(ub.urepr(target_, nl=1)))
"""
# we might modify the target
target_ = target.copy()
# Handle moving old keyword arguments into the target dictionary
deprecated_kwargs_defaults = ub.udict(
pad=None, padkw={'mode': 'constant'}, dtype=None, nodata=None)
for key, default in deprecated_kwargs_defaults.items():
if key in kwargs:
ub.schedule_deprecation(
'ndsampler', key, 'keyword arg to load_slice',
migration=f'specify {key} as an item in the target dictionary',
deprecate='0.7.0', error='1.0.0', remove='1.1.0')
if key in target_:
warnings.warn(
f'{key} was specified in both kwargs and the target dictionary, '
'the deprecated kwarg will take precedence')
target_[key] = kwargs[key]
if 'aid' in target_:
# If the annotation id is specified, infer other unspecified fields
aid = target_['aid']
try:
ann = self.dset.anns[aid]
except KeyError:
pass
else:
if 'gid' not in target_:
target_['gid'] = ann['image_id']
if len({'cx', 'cy', 'width', 'height'} & set(target_)) != 4:
box = kwimage.Boxes([ann['bbox']], 'xywh')
cx, cy, width, height = box.to_cxywh().data[0]
if 'cx' not in target_:
target_['cx'] = cx
if 'cy' not in target_:
target_['cy'] = cy
if 'width' not in target_:
target_['width'] = width
if 'height' not in target_:
target_['height'] = height
if 'category_id' not in target_:
target_['category_id'] = ann['category_id']
gid = target_.get('gid', None)
vidid = target_.get('vidid', None)
gids = target_.get('gids', None)
slices = target_.get('slices', None)
time_slice = target_.get('time_slice', None)
space_slice = target_.get('space_slice', None)
window_dims = target_.get('window_dims', None)
vid_gids = None
ndim = None
if vidid is not None or gids is not None:
# Video sample
if vidid is None:
if gids is None:
raise ValueError('ambiguous image or video object id(s)')
_vidids = self.dset.images(gids).lookup('video_id', None)
if __debug__:
if not ub.allsame(_vidids):
warnings.warn('sampled gids from different videos')
vidid = ub.peek(_vidids)
target_['vidid'] = vidid
# assert vidid == target_['vidid']
ndim = 3
elif gid is not None:
# Image sample
ndim = 2
else:
raise ValueError('no source object id(s)')
# Fix non-determined bounds
if ndim == 2:
img = self.dset.index.imgs[gid]
space_dims = (img['height'], img['width'])
data_dims = space_dims
elif ndim == 3:
if vidid is not None:
video = self.dset.index.videos[vidid]
space_dims = (video['height'], video['width'])
vid_gids = self.dset.index.vidid_to_gids[vidid]
data_dims = (len(vid_gids),) + space_dims
else:
space_dims = None
data_dims = None
else:
raise NotImplementedError
target_['space_dims'] = space_dims
target_['data_dims'] = data_dims
# other spatial specifiers allowed if slices is not given
alternate_keys = {'cx', 'cy', 'height', 'width'}
has_alternate = bool(set(target_) & alternate_keys)
if slices is not None:
if space_slice is None:
if ndim == 3:
space_slice = target_['space_slice'] = slices[1:3]
elif ndim == 2:
space_slice = target_['space_slice'] = slices[0:2]
else:
raise NotImplementedError
if ndim == 3 and gids is None and time_slice is None:
time_slice = target_['time_slice'] = slices[0]
if space_slice is None:
if has_alternate:
# A center / width / height was specified
center = (target_['cy'], target_['cx'])
# Determine the requested window size
if window_dims is None:
window_dims = 'extent'
if isinstance(window_dims, str):
if window_dims == 'extent':
window_dims = (target_['height'], target_['width'])
window_dims = np.ceil(np.array(window_dims)).astype(int)
window_dims = tuple(window_dims.tolist())
elif window_dims == 'square':
window_dims = (target_['height'], target_['width'])
window_dims = np.ceil(np.array(window_dims)).astype(int)
window_dims = tuple(window_dims.tolist())
maxdim = max(window_dims)
window_dims = (maxdim, maxdim)
else:
raise KeyError(window_dims)
target_['window_dims'] = window_dims
space_slice = _center_extent_to_slice(center, window_dims)
else:
height, width = space_dims
space_slice = (slice(0, height), slice(0, width))
target_['space_slice'] = space_slice
if ndim == 2:
target_['slices'] = slices = space_slice
elif ndim == 3:
if gids is None:
if time_slice is None:
if vid_gids is None:
raise ValueError('no gids or ability to infer them')
else:
gids = target_['gids'] = vid_gids
else:
gids = target_['gids'] = vid_gids[time_slice]
if time_slice is None:
time_slice = target_['time_slice'] = slice(0, len(gids))
target_['slices'] = slices = (time_slice,) + space_slice
else:
raise NotImplementedError(ndim)
return target_
@profile
def _load_slice(self, target_):
"""
Called by load_sample after the target dictionary has been resolved.
CommandLine:
xdoctest -m ndsampler.coco_sampler CocoSampler._load_slice --profile
Example:
>>> # sample an out of bounds target
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo()
>>> target = self.regions.get_positive(0)
>>> target = self._infer_target_attributes(target)
>>> target['as_xarray'] = True
>>> sample = self._load_slice(target)
>>> print('sample = {!r}'.format(ub.map_vals(type, sample)))
>>> # sample an out of bounds target
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('vidshapes2')
>>> target = self._infer_target_attributes({'vidid': 1})
>>> target = self._infer_target_attributes(target)
>>> target['as_xarray'] = True
>>> sample = self._load_slice(target)
>>> print('sample = {!r}'.format(ub.map_vals(type, sample)))
>>> target = self._infer_target_attributes({'gids': [1, 2]})
>>> target['as_xarray'] = True
>>> sample = self._load_slice(target)
>>> print('sample = {!r}'.format(ub.map_vals(type, sample)))
Ignore:
import ndsampler
import xdev
globals().update(xdev.get_func_kwargs(ndsampler.CocoSampler._load_slice))
Example:
>>> # Multispectral video sample example
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('vidshapes1-multispectral', num_frames=5)
>>> sample_grid = self.new_sample_grid('video_detection', (3, 128, 128))
>>> target = sample_grid['positives'][0]
>>> target = self._infer_target_attributes(target)
>>> target['channels'] = 'B1|B8'
>>> target['as_xarray'] = False
>>> sample = self.load_sample(target)
>>> print(ub.urepr(sample['target'], nl=1))
>>> print(sample['im'].shape)
>>> assert sample['im'].shape == (3, 128, 128, 2)
>>> target['channels'] = '<all>'
>>> sample = self.load_sample(target)
>>> assert sample['im'].shape == (3, 128, 128, 5)
Example:
>>> # Multispectral video sample example
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('vidshapes1-multisensor-msi', num_frames=5)
>>> sample_grid = self.new_sample_grid('video_detection', (3, 128, 128))
>>> target = sample_grid['positives'][0]
>>> target = self._infer_target_attributes(target)
>>> target['channels'] = 'B1|B8'
>>> target['as_xarray'] = False
>>> target['space_slice'] = (slice(-64, 64), slice(-64, 64))
>>> sample = self.load_sample(target)
>>> print(ub.urepr(sample['target'], nl=1))
>>> print(sample['im'].shape)
>>> assert sample['im'].shape == (3, 128, 128, 2)
>>> target['channels'] = '<all>'
>>> sample = self.load_sample(target)
>>> assert sample['im'].shape[2] > 5 # probably 16
>>> # Test jagged native scale sampling
>>> target['use_native_scale'] = True
>>> target['as_xarray'] = True
>>> target['channels'] = 'B1|B8|r|g|b|disparity|gauss'
>>> sample = self.load_sample(target)
>>> jagged_meta = sample['params']['jagged_meta']
>>> frames = sample['im']
>>> jagged_shape = [[p.shape for p in f] for f in frames]
>>> jagged_chans = [[p.coords['c'].values.tolist() for p in f] for f in frames]
>>> jagged_chans2 = [m['chans'] for m in jagged_meta]
>>> jagged_align = [[a.concise() for a in m['align']] for m in jagged_meta]
>>> # all frames should have the same number of channels
>>> assert len(frames) == 3
>>> assert all(sum(p.shape[2] for p in f) == 7 for f in frames)
>>> frames[0] == 3
>>> print('jagged_chans = {}'.format(ub.urepr(jagged_chans, nl=1)))
>>> print('jagged_shape = {}'.format(ub.urepr(jagged_shape, nl=1)))
>>> print('jagged_chans2 = {}'.format(ub.urepr(jagged_chans2, nl=1)))
>>> print('jagged_align = {}'.format(ub.urepr(jagged_align, nl=1)))
>>> # Test realigned native scale sampling
>>> target['use_native_scale'] = True
>>> target['realign_native'] = 'largest'
>>> target['as_xarray'] = True
>>> target = self._infer_target_attributes(target)
>>> gid = None
>>> for coco_img in self.dset.images().coco_images:
>>> if coco_img.channels & 'r|g|b':
>>> gid = coco_img.img['id']
>>> break
>>> assert gid is not None, 'need specific image'
>>> target['gids'] = [gid]
>>> # Test channels that are good early fused groups
>>> target['channels'] = 'r|g|b'
>>> sample1 = self.load_sample(target)
>>> target['channels'] = 'B8|B11'
>>> sample2 = self.load_sample(target)
>>> target['channels'] = 'r|g|b|B11'
>>> sample3 = self.load_sample(target)
>>> shape1 = sample1['im'].shape[1:3]
>>> shape2 = sample2['im'].shape[1:3]
>>> shape3 = sample3['im'].shape[1:3]
>>> print(f'shape1={shape1}')
>>> print(f'shape2={shape2}')
>>> print(f'shape3={shape3}')
>>> assert shape1 != shape2
>>> assert shape2 == shape3
"""
gids = target_.get('gids', None)
if gids is not None:
sample = self._load_slice_3d(target_)
else:
sample = self._load_slice_2d(target_)
legacy_target = target_.get('legacy_target', None)
if legacy_target is None:
legacy_target = True
if legacy_target:
ub.schedule_deprecation(
'ndsampler', 'tr', 'key in the returned sample dictionary',
migration=ub.paragraph(
'''
Opt-in to new behavior by specifying legacy_target=False in
the target dictionary.
'''),
deprecate='0.7.0', error='1.0.0', remove='1.1.0',
)
sample['tr'] = sample['target']
return sample
@profile
def _load_slice_3d(self, target_):
"""
Breakout the 2d vs 3d logic so they can evolve somewhat independently.
TODO: the 2D logic needs to be updated to be more consistent with 3d
logic
Or at least the differences between them are more clear.
Example:
>>> # Test time padding case
>>> # xdoctest: +SKIP('not implemented')
>>> from ndsampler.coco_sampler import *
>>> self = CocoSampler.demo('vidshapes-multisensor-msi', num_frames=1, num_videos=1, image_size=(32, 32))
>>> sample_grid = self.new_sample_grid('video_detection', (2, 32, 32))
>>> target = sample_grid['positives'][0]
>>> target_ = self._infer_target_attributes(target)
>>> sample = self.load_sample(target_)
"""
pad = target_.get('pad', None)
dtype = target_.get('dtype', None)
nodata = target_.get('nodata', None)
verbose_ndsample = target_.get('verbose_ndsample', False)
assert 'space_slice' in target_
data_dims = target_['data_dims']
requested_slice = target_['slices']
# Resolve any Nones in the requested slice (without the padding)
# Probably could do this more efficienty
import kwarray
if data_dims is None:
_req_real_slice = requested_slice
_req_extra_pad = [(0, 0), (0, 0)]
resolved_slice = requested_slice
else:
_req_real_slice, _req_extra_pad = kwarray.embed_slice(requested_slice, data_dims)
resolved_slice = tuple([
slice(s.start - p_lo, s.stop + p_hi, s.step)
for s, (p_lo, p_hi) in zip(_req_real_slice, _req_extra_pad)
])
channels = target_.get('channels', ub.NoParam)
if channels is ub.NoParam or isinstance(channels, str) and channels == '<all>':
# Do something special
request_chanspec = None
else:
request_chanspec = kwcoco.ChannelSpec.coerce(channels)
# TODO: disable function level nodata param
interpolation = target_.get('interpolation', 'auto')
antialias = target_.get('antialias', 'auto')
# The user can force disable optimization
optimize = target_.get('optimize', True)
if interpolation == 'auto':
interpolation = 'linear'
if antialias == 'auto':
antialias = interpolation not in {'nearest'}
# Special arg that will let us use "native resolution" or as
# close to it.
extra_scale = target_.get('scale', None) # extra scaling
use_native_scale = target_.get('use_native_scale', False)
realign_native = target_.get('realign_native', False)
# vidid = target_.get('vidid', None)
# if vidid is None:
# raise AssertionError
ndim = 3 # number of space-time dimensions (ignore channel)
# padkw = target_.get('padkw', {'mode': 'constant'}) # TODO
pad_slice = _coerce_pad(pad, ndim)
data_time_dim = None if data_dims is None else data_dims[0]
# data_space_dims = data_dims[1:]
requested_time_slice = resolved_slice[0]
requested_space_slice = resolved_slice[1:]
space_pad = pad_slice[1:]
time_pad = pad_slice[0]
# Determine if we need to pad out temporal samples on the end
if (data_time_dim is not None and requested_time_slice.stop > data_time_dim):
end_time_padding = requested_time_slice.stop - data_time_dim
else:
end_time_padding = 0
if end_time_padding > 0:
raise NotImplementedError(ub.paragraph(
'''
Requested a time slice that is larger the the number of
available time samples. Padding in time is not yet supported
in all cases yet.
'''))
if time_pad[0] != 0 or time_pad[1] != 0 or requested_time_slice.start < 0:
raise NotImplementedError(ub.paragraph(
'''
Padding in time is not yet supported in all cases yet.
'''))
# TODO: we may want to build a efficient temporal sampling
# data structure when there are a lot of timesteps
# As of kwcoco 0.2.1 gids are ordered by frame index
# TODO: frames should have better nd-support, hack it for now to
# just load the 2d data for each image
time_gids = target_['gids']
space_frames = []
jagged_meta = []
as_xarray = target_.get('as_xarray', False)
space = 'video'
# space = 'asset'
coco_img_list = self.dset.images(time_gids).coco_images
if request_chanspec is None:
# Hobble together channels if they aren't given
unique_chan = list(ub.unique([g.channels for g in coco_img_list], key=lambda c: c.spec))
if len(unique_chan):
request_chanspec = kwcoco.FusedChannelSpec.coerce(sorted(sum(unique_chan).fuse().unique()))
# New: for each frame, we need to keep track of the transform from the
# grid space to sample space, possibly for each channel
frame_sample_from_grid_warps = []
# Start off by making the transform assuming no special scaling.
request_space_box = kwimage.Boxes.from_slice(
requested_space_slice, clip=False, wrap=False).to_ltrb()
hpad, wpad = space_pad
final_space_box = request_space_box.pad(
x_left=wpad[0], y_top=hpad[0], x_right=wpad[1], y_bot=hpad[1])
sample_ltrb = final_space_box
st_dims = [(s.start, s.stop) for s in final_space_box.to_slices()[0]]
x_start = sample_ltrb.tl_x.ravel()[0]
y_start = sample_ltrb.tl_y.ravel()[0]
offset = np.array([-x_start, -y_start])
tf_abs_from_rel = kwimage.Affine.affine(offset=-offset)
tf_rel_from_abs = tf_abs_from_rel.inv()
if verbose_ndsample:
print('[ndsampler] Sampling')
print('target_ = {}'.format(ub.urepr(target_, nl=1)))
print('request_chanspec = {}'.format(ub.urepr(request_chanspec, nl=1)))
print('tf_abs_from_rel = {}'.format(ub.urepr(tf_abs_from_rel, nl=1)))
print('tf_rel_from_abs = {}'.format(ub.urepr(tf_rel_from_abs, nl=1)))
for time_idx, coco_img in enumerate(coco_img_list):
# Build up the transform from the grid to the final sample
# TODO: We should be able to use delayed image to do this.
# It may require a reference image, or reference delayed operation
# points (which could be a load). For now just do it here.
warp_sample_from_grid = tf_rel_from_abs
delayed_frame = coco_img.imdelay(
channels=request_chanspec, space=space,
interpolation=interpolation,
nodata_method=nodata,
antialias=antialias
)
# TODO: Use the self.frames mechanism to load sampling friendly
# image representations. May need to extend self.frames for MSI.
# data_clipped = self.frames.load_region(
# image_id=gid, region=data_slice, channels=channels)
if self.frames is not None and self._backend is not None:
# HACK, replace the paths in delayed frames with the old
# self.frames cache stuff. TODO: cleaner integration.
pathinfo = self.frames._lookup_pathinfo(coco_img['id'])
norm_to_chan_lut = pathinfo['norm_to_chan_lut']
for path in list(delayed_frame._leaf_paths()):
leaf = path[0]
spec = leaf.meta['channels'].normalize().spec
# Find the info section that corresponds
chan_key = norm_to_chan_lut[spec]
info = pathinfo['channels'][chan_key]
# Hack to overwrite the path if an efficient cache exists
if info['cache_type'] is not None:
leaf.meta['fpath'] = info['cache']
# assert leaf['lazy_ref'] is None
delayed_crop = delayed_frame.crop(requested_space_slice,
clip=False, wrap=False,
pad=space_pad)
delayed_crop = delayed_crop.prepare()
if optimize:
delayed_crop = delayed_crop.optimize()
# This alt should work, but unsure if the extra floating point
# error from inverses will matter.
# alt_tf_rel_from_abs = delayed_crop.get_transform_from(delayed_frame)
frame_use_native_scale = use_native_scale
undone_parts = None
if frame_use_native_scale:
non_native_crop = delayed_crop
undone_parts, jagged_align = delayed_crop.undo_warps(
remove=['scale'], squash_nans=True,
return_warps=True)
if realign_native:
# User requested to realign all parts back up to a
# comparable scale.
if realign_native == 'largest':
old_dsize = delayed_crop.dsize
dsizes = []
for part in undone_parts:
dsizes.append(part.dsize)
max_dsize = max(dsizes, key=lambda t: t[0] * t[1])
rescaled_parts = []
for part in undone_parts:
rescale_factor = np.array(max_dsize) / np.array(part.dsize)
rescaled = part.warp({'scale': rescale_factor}, dsize=max_dsize)
rescaled_parts.append(rescaled)
from kwcoco.util.delayed_ops import DelayedChannelConcat
delayed_crop = DelayedChannelConcat(rescaled_parts, dsize=max_dsize)
if optimize:
delayed_crop = delayed_crop.optimize()
# Find the scale factor to add into the sample_from_grid
# transform
# TODO: can get a finer-grained scale factor here if we
# avoid dsize divides and instead track the scale
# factors used in the delayed tree. For now just get
# something working.
FIND_SCALE_VIA_DSIZE = 0
if FIND_SCALE_VIA_DSIZE:
new_dsize = delayed_crop.dsize
realign_scale = np.array(new_dsize) / np.array(old_dsize)
realign_tf = kwimage.Affine.scale(realign_scale)
else:
# Determine the transform from the original space
# sample, to the rescaled sample.
realign_tf = delayed_crop.get_transform_from(non_native_crop)
warp_sample_from_grid = realign_tf @ warp_sample_from_grid
else:
raise NotImplementedError
# disable this for the rest of the frame
frame_use_native_scale = False
finalizekw = dict(
# We shouldn't need to pass these args, but
# optimize is bugged and clobbers them, so
# we add them in as a fix.
prepare=False,
optimize=False,
interpolation=interpolation,
nodata_method=nodata,
antialias=antialias
)
if verbose_ndsample:
print('Sample Single Frame')
print(f'frame_use_native_scale={frame_use_native_scale}')
print(f'realign_native={realign_native}')
print('time_idx = {}'.format(ub.urepr(time_idx, nl=1)))
print('coco_img = {}'.format(ub.urepr(coco_img, nl=1)))
print(f'frame_use_native_scale={frame_use_native_scale}')
print(f'warp_sample_from_grid={warp_sample_from_grid}')
if frame_use_native_scale:
# print(undone_parts)
jagged_parts = []
jagged_chans = []
jagged_warps = []
for part in undone_parts:
if extra_scale is not None:
part = part.warp({'scale': extra_scale})
warp_grid_to_part = kwimage.Affine.scale(extra_scale) @ warp_sample_from_grid
else:
warp_grid_to_part = warp_sample_from_grid
if verbose_ndsample:
print('* Native sampler part')
print(f'warp_grid_to_part={warp_grid_to_part}')
part.print_graph()
part2 = part.optimize() if optimize else part
if as_xarray:
frame = part2.as_xarray().finalize(**finalizekw)
else:
frame = part2.finalize(**finalizekw)
jagged_parts.append(frame)
jagged_chans.append(part.channels)
jagged_warps.append(warp_grid_to_part)
space_frames.append(jagged_parts)
jagged_meta.append({
'align': jagged_align,
'chans': jagged_chans,
'grid_to_part_warps': jagged_warps,
})
else:
if extra_scale is not None:
tf_scale = kwimage.Affine.scale(extra_scale)
delayed_crop = delayed_crop.warp({'scale': extra_scale})
warp_sample_from_grid = tf_scale @ warp_sample_from_grid
# warp_sample_from_grid = warp_sample_from_grid @ tf_scale
if verbose_ndsample:
print('* Aligned sample')
print(f'warp_sample_from_grid={warp_sample_from_grid}')
delayed_crop.print_graph()
if optimize:
delayed_crop = delayed_crop.optimize()
print("Optimized:")
delayed_crop.print_graph()
if as_xarray:
to_finalize = delayed_crop.as_xarray()
else:
if optimize:
delayed_crop = delayed_crop.optimize()
to_finalize = delayed_crop
# to_finalize._set_nested_params(**kwargs)
try:
frame = to_finalize.finalize(**finalizekw)
except Exception:
print('ERROR in finalize')
to_finalize.print_graph()
raise
if dtype is not None:
frame = frame.astype(dtype)
space_frames.append(frame)
# warp_sample_from_grid_alt = delayed_crop.get_transform_from(delayed_frame)
# print('warp_sample_from_grid_alt = {}'.format(ub.urepr(warp_sample_from_grid_alt, nl=1)))
# print('warp_sample_from_grid = {}'.format(ub.urepr(warp_sample_from_grid, nl=1)))
frame_sample_from_grid_warps.append(warp_sample_from_grid)
# Concat aligned frames together (add nans for non-existing channels)
if frame_use_native_scale:
data_sliced = space_frames
else:
if as_xarray:
import xarray as xr
data_sliced = xr.concat(space_frames, dim='t')
else:
data_sliced = np.stack(space_frames, axis=0)
# TODO: gids should be padded if it goes oob.
# target_['_data_gids'] = time_gids
if frame_sample_from_grid_warps:
# only works when there is one image
tf_abs_from_rel = frame_sample_from_grid_warps[0].inv()
sample = {
'im': data_sliced,
'target': target_,
'params': {
# TODO: some of these params need to be deprecated or have
# their behavior changed to respect the "window/grid space" and
# "input/sample space".
'offset': offset,
'tf_rel_to_abs': tf_abs_from_rel.matrix, # doesnt make sense here
'sample_tlbr': sample_ltrb,
'st_dims': st_dims,
'data_dims': data_dims,
'pad': pad,
'request_chanspec': request_chanspec,
'frame_sample_from_grid_warps': frame_sample_from_grid_warps,
},
}
if use_native_scale:
sample['params']['jagged_meta'] = jagged_meta
return sample
@profile
def _load_slice_2d(self, target):
"""
Breakout the 2d vs 3d logic so they can evolve somewhat independently.
TODO: the 2D logic needs to be updated to be more consistent with 3d
logic
Or at least the differences between them are more clear.
"""
import skimage
import kwarray
target_ = target
pad = target_.get('pad', None)
padkw = target_.get('padkw', {'mode': 'constant'})
nodata = target_.get('nodata', None)
if pad is None:
pad = 0
assert 'space_slice' in target_
data_dims = target_['data_dims']
requested_slice = target_['slices']
channels = target_.get('channels', ub.NoParam)
# TODO: disable function level nodata param
nodata = target_.get('nodata', nodata)
interpolation = target_.get('interpolation', 'auto')
antialias = target_.get('antialias', 'auto')
if interpolation == 'auto':
interpolation = 'linear'
if antialias == 'auto':
antialias = interpolation not in {'nearest'}
# Special arg that will let us use "native resolution" or as
# close to it.
scale = target_.get('scale', None) # extra scaling
use_native_scale = target_.get('use_native_scale', False)
assert not use_native_scale
vidid = target_.get('vidid', None)
if vidid is not None:
raise Exception
gid = target_['gid']
ndim = 2 # number of space-time dimensions (ignore channel)
pad = tuple(_ensure_iterablen(pad, ndim))
data_slice, extra_padding = kwarray.embed_slice(
requested_slice, data_dims, pad)
if scale is not None:
raise NotImplementedError
# TODO: ensure we are using the kwcoco mechanisms here
data_clipped = self.frames.load_region(
image_id=gid, region=data_slice, channels=channels)
if target_.get('as_xarray', False):
import xarray as xr
# TODO: respect the channels arg in target_
if len(data_clipped.shape) == 1:
num_bands = 1
else:
num_bands = data_clipped.shape[2]
xrkw = {}
if num_bands == 1:
xrkw['c'] = ['gray']
elif num_bands == 3:
xrkw['c'] = ['r', 'g', 'b']
# hack to respect xarray
data_clipped = xr.DataArray(
data_clipped, dims=('y', 'x', 'c'), coords=xrkw)
#####
# TODO: the 2D logic needs to be updated to be more consistent with
# 3d logic
# Apply the padding
if sum(map(sum, extra_padding)) == 0:
# No padding was requested
data_sliced = data_clipped
else:
trailing_dims = len(data_clipped.shape) - len(extra_padding)
if trailing_dims > 0:
extra_padding = extra_padding + ([(0, 0)] * trailing_dims)
if target_.get('as_xarray', False):
coord_pad = dict(zip(data_clipped.dims, extra_padding))
# print('data_clipped.dims = {!r}'.format(data_clipped.dims))
if 'constant_values' not in padkw:
if nodata in {'float', 'auto'}:
padkw['constant_values'] = np.nan
else:
# hack for consistency
padkw['constant_values'] = 0
data_sliced = data_clipped.pad(coord_pad, **padkw)
else:
# print('data_clipped.dims = {!r}'.format(data_clipped.dims))
if 'constant_values' not in padkw:
if nodata in {'float', 'auto'}:
padkw['constant_values'] = np.nan
else:
# hack for consistency
padkw['constant_values'] = 0
# TODO: if the data out of kwcoco is masked, mask the padded
# value.
data_sliced = np.pad(data_clipped, extra_padding, **padkw)
st_dims = [(sl.start - pad_[0], sl.stop + pad_[1])
for sl, pad_ in zip(data_slice, extra_padding)]
(y_start, y_stop), (x_start, x_stop) = st_dims[-2:]
sample_ltrb = kwimage.Boxes([x_start, y_start, x_stop, y_stop], 'ltrb')
offset = np.array([-x_start, -y_start])
tf_rel_to_abs = skimage.transform.AffineTransform(
translation=-offset
).params
sample = {
'im': data_sliced,
'target': target_,
'params': {
'offset': offset,
'tf_rel_to_abs': tf_rel_to_abs,
'sample_tlbr': sample_ltrb,
'st_dims': st_dims,
'data_dims': data_dims,
'pad': pad,
},
}
return sample
@profile
def _populate_overlap(self, sample, visible_thresh=0.1, with_annots=True,
annot_ids=None):
"""
Add information about annotations overlapping the sample.
with_annots can be a + separated string or list of the the special keys:
'segmentation' and 'keypoints'.
Example:
>>> # sample an out of bounds target
>>> import ndsampler
>>> self = ndsampler.CocoSampler.demo()
>>> target = self.regions.get_item(0)
>>> target = self._infer_target_attributes(target)
>>> sample = self._load_slice(target)
>>> sample = self._populate_overlap(sample)
>>> print('sample = {}'.format(ub.urepr(ub.util_dict.dict_diff(sample, ['im']), nl=-1)))
"""
if isinstance(with_annots, int) and with_annots:
with_annots = ['segmentation', 'keypoints', 'boxes']
elif isinstance(with_annots, str):
with_annots = with_annots.split('+')
if __debug__:
for k in with_annots:
assert k in ['segmentation', 'keypoints', 'boxes'], 'k={!r}'.format(k)
target = sample['target']
if 'gids' in target:
gids = target['gids']
else:
gids = [target['gid']]
params = sample['params']
# The sample box is in "absolute sample space",
# which is either video or image space.
sample_box: kwimage.Boxes = params['sample_tlbr']
offset = params['offset']
data_dims = params['data_dims']
if data_dims is not None:
space_dims = data_dims[-2:]
else:
space_dims = None
coco_dset = self.dset
kp_classes = self.kp_classes
classes = self.classes
# accumulate information over all frames
frame_dets = []
scale = sample['target'].get('scale', None) # extra scaling
frame_sample_from_grid_warps = params.get('frame_sample_from_grid_warps', [])
for rel_frame_idx, gid in enumerate(gids):
# Check to see if there is a transform between the image-space and
# the sampling-space (currently this can only be done by a video,
# but in the future the user might be able to adjust sample scale)
if target.get('vidid', None) is not None:
in_video_space = True
# hack to align annots from image space to video space
coco_img = coco_dset.coco_image(gid)
tf_abs_from_img = coco_img.warp_vid_from_img
tf_img_from_abs = tf_abs_from_img.inv()
else:
in_video_space = False
tf_abs_from_img = None
# check overlap in image space
if tf_abs_from_img is None:
imgspace_sample_box = sample_box
else:
imgspace_sample_box = sample_box.warp(tf_img_from_abs.matrix).quantize()
if annot_ids is None:
# Find which bounding boxes are visible in this region
overlap_aids = self.regions.overlapping_aids(
gid, imgspace_sample_box, visible_thresh=visible_thresh)
else:
# If the user gave us a set of annotations, load those instead.
# But only consider the subset that actually belong to this
# image.
overlap_aids = annot_ids
flags = [coco_dset.anns[aid]['image_id'] == gid for aid in annot_ids]
overlap_aids = list(ub.compress(annot_ids, flags))
# Get info about all annotations inside this window
overlap_anns = [coco_dset.anns[aid] for aid in overlap_aids]
overlap_cids = [ann['category_id'] for ann in overlap_anns]
abs_boxes = kwimage.Boxes(
[ann['bbox'] for ann in overlap_anns], 'xywh')
# Handle segmentations and keypoints if they exist
sseg_list = []
kpts_list = []
for ann in overlap_anns:
# TODO: it should probably be the regions's responsibilty to load
# and return these kwimage data structures.
abs_points = None
abs_sseg = None
if 'keypoints' in with_annots:
coco_kpts = ann.get('keypoints', None)
if coco_kpts is not None and len(coco_kpts) > 0:
if isinstance(ub.peek(coco_kpts), dict):
# new style coco keypoint encoding
abs_points = kwimage.Points.from_coco(
coco_kpts, classes=kp_classes)
else:
# using old style coco keypoint encoding, we need look up
# keypoint class from object classes and then pass in the
# relevant info
kpnames = coco_dset._lookup_kpnames(ann['category_id'])
kp_class_idxs = np.array([kp_classes.index(n) for n in kpnames])
abs_points = kwimage.Points.from_coco(
coco_kpts, kp_class_idxs, kp_classes)
if 'segmentation' in with_annots:
coco_sseg = ann.get('segmentation', None)
if coco_sseg is not None:
abs_sseg = kwimage.MultiPolygon.coerce(coco_sseg, dims=space_dims)
sseg_list.append(abs_sseg)
kpts_list.append(abs_points)
abs_ssegs = kwimage.PolygonList(sseg_list)
abs_kpts = kwimage.PointsList(kpts_list)
abs_kpts.meta['classes'] = self.kp_classes
# Construct a detections object containing absolute annotation
# positions
imgspace_dets = kwimage.Detections(
aids=np.array(overlap_aids),
cids=np.array(overlap_cids),
boxes=abs_boxes,
segmentations=abs_ssegs,
keypoints=abs_kpts,
classes=classes,
rel_frame_index=rel_frame_idx,
gid=gid,
datakeys=['aids', 'cids'],
metakeys=['gid', 'rel_frame_index']
)
# Translate the absolute detections to relative sample coordinates
if scale is None:
scale = 1.0
if in_video_space:
# hack to align annots from image space to video space
tf_vid_from_img = tf_abs_from_img
if frame_sample_from_grid_warps:
tf_rel_from_vid = frame_sample_from_grid_warps[rel_frame_idx]
else:
# FIXME: the only case where frame_sample_from_grid_warps
# would be zero is the jagged case, in which case we should
# probably align these to something different, right now it
# just aligns it to the scaled window (but does not account
# for jagged scaling)
tf_rel_from_vid = kwimage.Affine.affine(scale=scale) @ kwimage.Affine.affine(offset=offset)
tf_rel_from_img = tf_rel_from_vid @ tf_vid_from_img
rel_dets = imgspace_dets.warp(tf_rel_from_img.matrix)
else:
rel_dets = imgspace_dets.translate(offset)
if scale != 1.0:
rel_dets = rel_dets.scale(scale)
frame_dets.append(rel_dets)
annots = {}
legacy_annots = target.get('legacy_annots', None)
if legacy_annots is None:
legacy_annots = True
if legacy_annots:
ub.schedule_deprecation(
'ndsampler', 'non-frame_dets', 'return information in sample["annots"]',
migration=ub.paragraph(
'''
Opt-in to new behavior by specifying legacy_annots=False in
the target dictionary.
'''),
deprecate='0.7.0', error='1.0.0', remove='1.1.0',
)
annots.update({
###
# TODO: deprecate everything except for frame dets
'aids': np.hstack([x.data['aids'] for x in frame_dets]),
'cids': np.hstack([x.data['cids'] for x in frame_dets]),
'rel_frame_index': np.hstack([[x.meta['rel_frame_index']] * len(x) for x in frame_dets]),
'rel_boxes': kwimage.Boxes.concatenate([x.data['boxes'] for x in frame_dets]),
'rel_ssegs': kwimage.PolygonList(list(ub.flatten([x.data['segmentations'].data for x in frame_dets]))),
'rel_kpts': kwimage.PointsList(list(ub.flatten([x.data['keypoints'].data for x in frame_dets]))),
###
})
annots['rel_kpts'].meta['classes'] = self.kp_classes
main_aid = target.get('aid', None)
if main_aid is not None:
# Determine which (if any) index in "annots" corresponds to the
# main aid (if we even have a main aid)
cand_idxs = np.where(annots['aids'] == main_aid)[0]
if len(cand_idxs) == 0:
target['annot_idx'] = -1
elif len(cand_idxs) == 1:
target['annot_idx'] = cand_idxs[0]
else:
raise AssertionError('impossible state: len(cand_idxs)={}'.format(len(cand_idxs)))
else:
target['annot_idx'] = -1
annots.update({
'frame_dets': frame_dets,
})
sample['annots'] = annots
return sample
def _center_extent_to_slice(center, window_dims):
"""
Transforms a center and window dimensions into a start/stop slice
Args:
center (Tuple[float]): center location (cy, cx)
window_dims (Tuple[int]): window size (height, width)
Returns:
Tuple[slice, ...]: the slice corresponding to the centered window
Example:
>>> center = (2, 5)
>>> window_dims = (6, 6)
>>> slices = _center_extent_to_slice(center, window_dims)
>>> assert slices == (slice(-1, 5), slice(2, 8))
Example:
>>> center = (2, 5)
>>> window_dims = (64, 64)
>>> slices = _center_extent_to_slice(center, window_dims)
>>> assert slices == (slice(-30, 34, None), slice(-27, 37, None))
Example:
>>> # Test floating point error case
>>> center = (500.5, 974.9999999999999)
>>> window_dims = (100, 100)
>>> slices = _center_extent_to_slice(center, window_dims)
>>> assert slices == (slice(450, 550, None), slice(924, 1024, None))
"""
# Compute lower and upper coordinates of the window bounding box
low_dims = [int(np.floor(c - d_win / 2.0))
for c, d_win in zip(center, window_dims)]
high_dims = [int(np.floor(c + d_win / 2.0))
for c, d_win in zip(center, window_dims)]
# Floating point errors can cause the slice window size to be different
# from the requested one. We check and correct for this.
for idx, tup in enumerate(zip(window_dims, low_dims, high_dims)):
d_win, d_low, d_high = tup
d_win_got = d_high - d_low
delta = d_win - d_win_got
if delta:
high_dims[idx] += delta
if __debug__:
for d_win, d_low, d_high in zip(window_dims, low_dims, high_dims):
d_win_got = d_high - d_low
assert d_win_got == d_win, 'slice has incorrect window size'
slices = tuple([slice(s, t) for s, t in zip(low_dims, high_dims)])
return slices
def _ensure_iterablen(scalar, n):
try:
iter(scalar)
except TypeError:
return [scalar] * n
return scalar
def _coerce_pad(pad, ndims):
if pad is None:
pad_slice = [(0, 0)] * ndims
elif isinstance(pad, int):
pad_slice = [(pad, pad)] * ndims
else:
# Normalize to left/right pad value for each dim
pad_slice = [p if ub.iterable(p) else [p, p] for p in pad]
if len(pad_slice) != ndims:
# We could "fix" it, but the user probably made a mistake
# n_trailing = ndims - len(pad)
# if n_trailing > 0:
# pad = list(pad) + [(0, 0)] * n_trailing
raise ValueError('pad and data_dims must have the same length')
return pad_slice
if __name__ == '__main__':
"""
CommandLine:
xdoctest -m ndsampler.coco_sampler
"""
import xdoctest
xdoctest.doctest_module(__file__)