import numpy as np
import ubelt as ub
import itertools as it
import warnings
import kwarray
import kwimage
import os
try:
USE_RTREE = os.environ.get('USE_RTREE', '').lower()
USE_RTREE = USE_RTREE in {'0', 'false'} or USE_RTREE
if not USE_RTREE:
raise ImportError
import rtree
pyqtree = None
except ImportError:
import pyqtree
rtree = None
try:
from xdev import profile
except Exception:
profile = ub.identity
[docs]
class FrameIntersectionIndex(ub.NiceRepr):
"""
Build spatial tree for each frame so we can quickly determine if a random
negative is too close to a positive. For each frame/image we built a qtree.
Example:
>>> from ndsampler.isect_indexer import *
>>> import kwcoco
>>> import ubelt as ub
>>> dset = kwcoco.CocoDataset.demo()
>>> dset._ensure_imgsize()
>>> dset.remove_annotations([ann for ann in dset.anns.values()
>>> if 'bbox' not in ann])
>>> # Build intersection index aroung coco dataset
>>> self = FrameIntersectionIndex.from_coco(dset)
>>> gid = 1
>>> box = kwimage.Boxes([0, 10, 100, 100], 'xywh')
>>> isect_aids, ious = self.ious(gid, box)
>>> print(ub.urepr(ious.tolist(), nl=0, precision=4))
[0.0507]
"""
def __init__(self):
self.qtrees = None
self.all_gids = None
def __nice__(self):
if self.all_gids is None:
return 'None'
else:
return len(self.all_gids)
[docs]
@classmethod
def from_coco(cls, dset, verbose=0):
"""
Args:
dset (kwcoco.CocoDataset): positive annotation data
Returns:
FrameIntersectionIndex
"""
self = cls()
self.qtrees = self._build_index(dset, verbose=verbose)
self.all_gids = sorted(self.qtrees.keys())
return self
[docs]
@classmethod
def demo(cls, *args, **kwargs):
"""
Create a demo intersection index.
Args:
*args: see kwcoco.CocoDataset.demo
**kwargs: see kwcoco.CocoDataset.demo
Returns:
FrameIntersectionIndex
"""
import kwcoco
dset = kwcoco.CocoDataset.demo(*args, **kwargs)
dset._ensure_imgsize()
dset.remove_annotations([ann for ann in dset.anns.values()
if 'bbox' not in ann])
self = cls.from_coco(dset)
return self
@staticmethod
@profile
def _build_index(dset, verbose=0):
"""
"""
if verbose:
print('Building isect index')
if rtree is not None:
# Try using trees first
qtrees = {}
for img in ub.ProgIter(dset.dataset['images'], desc='init rtrees', verbose=verbose):
gid = img['id']
qtree = rtree.Index()
qtree.width = img['width']
qtree.height = img['height']
qtrees[gid] = qtree
# qtrees[gid].bounds = [0, 0, img['width'], img['height']]
for qtree in qtrees.values():
qtree.aid_to_ltrb = {} # Add extra index to track boxes
if dset.index is not None:
for gid, aids in ub.ProgIter(dset.index.gid_to_aids.items(),
total=len(dset.index.gid_to_aids),
desc='populate qtrees',
verbose=verbose):
annots = dset.annots(aids)
qtree = qtrees[gid]
ltrb_boxes = annots.boxes.to_ltrb().data
qtree.aid_to_ltrb = ub.dzip(aids, ltrb_boxes)
for aid, ltrb_box in qtree.aid_to_ltrb.items():
qtree.insert(aid, ltrb_box)
else:
for ann in ub.ProgIter(dset.dataset['annotations'],
desc='populate qtrees', verbose=verbose):
bbox = ann.get('bbox', None)
if bbox is not None:
aid = ann['id']
qtree = qtrees[ann['image_id']]
xywh_box = kwimage.Boxes(bbox, 'xywh')
ltrb_box = xywh_box.to_ltrb().data
qtree.insert(aid, ltrb_box)
qtree.aid_to_ltrb[aid] = ltrb_box
else:
qtrees = {
img['id']: pyqtree.Index((0, 0, img['width'], img['height']))
for img in ub.ProgIter(dset.dataset['images'],
desc='init qtrees', verbose=verbose)
}
for qtree in qtrees.values():
qtree.aid_to_ltrb = {} # Add extra index to track boxes
for ann in ub.ProgIter(dset.dataset['annotations'],
desc='populate qtrees', verbose=verbose):
bbox = ann.get('bbox', None)
if bbox is not None:
aid = ann['id']
qtree = qtrees[ann['image_id']]
xywh_box = kwimage.Boxes(bbox, 'xywh')
ltrb_box = xywh_box.to_ltrb().data
qtree.insert(aid, ltrb_box)
qtree.aid_to_ltrb[aid] = ltrb_box
return qtrees
[docs]
@profile
def overlapping_aids(self, gid, box):
"""
Find all annotation-ids within an image that have some overlap with a
bounding box.
Args:
gid (int): an image id
box (kwimage.Boxes): the specified region
Returns:
List[int]: list of annotation ids
CommandLine:
USE_RTREE=0 xdoctest -m ndsampler.isect_indexer FrameIntersectionIndex.overlapping_aids
USE_RTREE=1 xdoctest -m ndsampler.isect_indexer FrameIntersectionIndex.overlapping_aids
Example:
>>> from ndsampler.isect_indexer import * # NOQA
>>> self = FrameIntersectionIndex.demo('shapes128')
>>> for gid, qtree in self.qtrees.items():
>>> box = kwimage.Boxes([0, 0, qtree.width, qtree.height], 'xywh')
>>> print(self.overlapping_aids(gid, box))
"""
boxes1 = box[None, :] if len(box.shape) == 1 else box
qtree = self.qtrees[gid]
query = boxes1.to_ltrb().data[0]
if rtree is None:
isect_aids = sorted(set(qtree.intersect(query)))
else:
isect_aids = sorted(set(qtree.intersection(query)))
return isect_aids
[docs]
def ious(self, gid, box):
"""
Find overlaping annotations in a specific image and their intersection
over union with a a query box.
Args:
gid (int): an image id
box (kwimage.Boxes): the specified region
Returns:
Tuple[List[int], ndarray]:
isect_aids: list of annotation ids
ious: jaccard score for each returned annotation id
"""
boxes1 = box[None, :] if len(box.shape) == 1 else box
isect_aids = self.overlapping_aids(gid, box)
if len(isect_aids):
boxes1 = box[None, :]
boxes2 = [self.qtrees[gid].aid_to_ltrb[aid] for aid in isect_aids]
boxes2 = kwimage.Boxes(np.array(boxes2), 'ltrb')
ious = boxes1.ious(boxes2)[0]
else:
ious = np.empty(0)
return isect_aids, ious
[docs]
def iooas(self, gid, box):
"""
Intersection over other's area
Args:
gid (int): an image id
box (kwimage.Boxes): the specified region
Like iou, but non-symetric, returned number is a percentage of the
other's (groundtruth) area. This means we dont care how big the
(negative) `box` is.
"""
boxes1 = box[None, :] if len(box.shape) == 1 else box
isect_aids = self.overlapping_aids(gid, box)
if len(isect_aids):
boxes2 = [self.qtrees[gid].aid_to_ltrb[aid] for aid in isect_aids]
boxes2 = kwimage.Boxes(np.array(boxes2), 'ltrb')
isect = boxes1.isect_area(boxes2)
denom = boxes2.area.T
eps = 1e-6
iomas = isect / (denom[0] + eps)
else:
iomas = np.empty(0)
return isect_aids, iomas
[docs]
def random_negatives(self, num, anchors=None, window_size=None, gids=None,
thresh=0.0, exact=True, rng=None, patience=None):
"""
Finds random boxes that don't have a large overlap with positive
instances.
Args:
num (int): number of negative boxes to generate (actual number of
boxes returned may be less unless `exact=True`)
anchors (ndarray): prior normalized aspect ratios for negative
boxes. Mutually exclusive with `window_size`.
window_size (ndarray): absolute (W, H) sizes to use for negative
boxes. Mutually exclusive with `anchors`.
gids (List[int]): image-ids to generate negatives for,
if not specified generates for all images.
thresh (float): overlap area threshold as a percentage of
the negative box size. When thresh=0.0, that means
negatives cannot overlap any positive, when threh=1.0, there
are no constrains on negative placement.
exact (bool): if True, ensure that we generate exactly `num` boxes
rng (RandomState): random number generator
Example:
>>> from ndsampler.isect_indexer import *
>>> import ndsampler
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('shapes8')
>>> self = FrameIntersectionIndex.from_coco(dset)
>>> anchors = np.array([[.35, .15], [.2, .2], [.1, .1]])
>>> #num = 25
>>> num = 5
>>> rng = kwarray.ensure_rng(None)
>>> neg_gids, neg_boxes = self.random_negatives(
>>> num, anchors, gids=[1], rng=rng, thresh=0.01, exact=1)
>>> # xdoc: +REQUIRES(--show)
>>> gid = sorted(set(neg_gids))[0]
>>> boxes = neg_boxes.compress(neg_gids == gid)
>>> import kwplot
>>> kwplot.autompl()
>>> img = kwimage.imread(dset.imgs[gid]['file_name'])
>>> kwplot.imshow(img, doclf=True, fnum=1, colorspace='bgr')
>>> support = self._support(gid)
>>> kwplot.draw_boxes(support, color='blue')
>>> kwplot.draw_boxes(boxes, color='orange')
Example:
>>> from ndsampler.isect_indexer import *
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('shapes8')
>>> self = FrameIntersectionIndex.from_coco(dset)
>>> #num = 25
>>> num = 5
>>> rng = kwarray.ensure_rng(None)
>>> window_size = (50, 50)
>>> neg_gids, neg_boxes = self.random_negatives(
>>> num, window_size=window_size, gids=[1], rng=rng,
>>> thresh=0.01, exact=1)
>>> # xdoc: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> gid = sorted(set(neg_gids))[0]
>>> boxes = neg_boxes.compress(neg_gids == gid)
>>> img = kwimage.imread(dset.imgs[gid]['file_name'])
>>> kwplot.imshow(img, doclf=True, fnum=1, colorspace='bgr')
>>> support = self._support(gid)
>>> support.draw(color='blue')
>>> boxes.draw(color='orange')
"""
if not ((window_size is None) ^ (anchors is None)):
raise ValueError('window_size and anchors are mutually exclusive')
rng = kwarray.ensure_rng(rng)
all_gids = self.all_gids if gids is None else gids
def _generate_rel(n):
# Generate n candidate boxes in the normalized 0-1 domain
cand_boxes = kwimage.Boxes.random(num=n, scale=1.0, format='ltrb',
anchors=anchors, anchor_std=0,
rng=rng)
chosen_gids = np.array(sorted(rng.choice(all_gids, size=n)))
gid_to_boxes = kwarray.group_items(cand_boxes, chosen_gids, axis=0)
neg_gids = []
neg_boxes = []
for gid, img_boxes in gid_to_boxes.items():
qtree = self.qtrees[gid]
# scale from normalized coordinates to image coordinates
img_boxes = img_boxes.scale((qtree.width, qtree.height))
for box in img_boxes:
# isect_aids, overlaps = self.ious(gid, box)
isect_aids, overlaps = self.iooas(gid, box)
if len(overlaps) == 0 or overlaps.max() < thresh:
neg_gids.append(gid)
neg_boxes.append(box.data)
return neg_gids, neg_boxes
def _generate_abs(n):
# Randomly choose images to generate boxes for
chosen_gids = np.array(sorted(rng.choice(all_gids, size=n)))
gid_to_nboxes = ub.dict_hist(chosen_gids)
neg_gids = []
neg_boxes = []
for gid, nboxes in gid_to_nboxes.items():
qtree = self.qtrees[gid]
scale = (qtree.width, qtree.height)
anchors_ = np.array([window_size]) / np.array(scale)
if np.any(anchors_ > 1.0):
continue
img_boxes = kwimage.Boxes.random(
num=nboxes, scale=1.0, format='ltrb', anchors=anchors_,
anchor_std=0, rng=rng)
img_boxes = img_boxes.scale(scale)
for box in img_boxes:
# isect_aids, overlaps = self.ious(gid, box)
isect_aids, overlaps = self.iooas(gid, box)
if len(overlaps) == 0 or overlaps.max() < thresh:
neg_gids.append(gid)
neg_boxes.append(box.data)
return neg_gids, neg_boxes
if window_size is not None:
_generate = _generate_abs
elif anchors is not None:
_generate = _generate_rel
else:
raise ValueError('must specify at least one window_size or anchors')
if exact:
# TODO: Dont attempt to sample negatives from images where the
# positives cover more than a threshold percent. (Handle the case
# of chip detections)
factor = 2 # oversample factor
if patience is None:
patience = int(np.sqrt(num * 10) + 1)
remaining_patience = patience
timer = ub.Timer().tic()
# Generate boxes until we have enough
neg_gids, neg_boxes = _generate(n=int(num * factor))
n_tries = 1
for n_tries in it.count(n_tries):
want = num - len(neg_boxes)
if want <= 0:
break
extra_gids, extra_boxes = _generate(n=int(want * factor))
neg_gids.extend(extra_gids)
neg_boxes.extend(extra_boxes)
if len(neg_boxes) < num:
# If we haven't found a significant number of boxes our
# patience decreases (if the wall time is getting large)
if len(extra_boxes) <= (num // 10) and timer.toc() > 1.0:
remaining_patience -= 1
if remaining_patience == 0:
break
if len(neg_boxes) < num:
# but throw an error if we don't make any progress
message = (
'Cannot make a negative sample with thresh={} '
'in under {} tries. Found {} but need {}'.format(
thresh, n_tries, len(neg_boxes), num))
if exact == 'warn':
warnings.warn(message)
else:
raise Exception(message)
print('n_tries = {!r}'.format(n_tries))
neg_gids = neg_gids[:num]
neg_boxes = neg_boxes[:num]
else:
neg_gids, neg_boxes = _generate(n=num)
neg_gids = np.array(neg_gids)
neg_boxes = kwimage.Boxes(np.array(neg_boxes), 'ltrb')
return neg_gids, neg_boxes
def _debug_index(self):
from shapely.ops import cascaded_union
def _to_shapely(boxes):
from shapely.geometry import Polygon
from kwimage.structs.boxes import _cat
x1, y1, x2, y2 = boxes.to_ltrb(copy=False).components
a = _cat([x1, y1]).tolist()
b = _cat([x1, y2]).tolist()
c = _cat([x2, y2]).tolist()
d = _cat([x2, y1]).tolist()
polygons = [Polygon(points) for points in zip(a, b, c, d, a)]
return polygons
for gid, qtree in self.qtrees.items():
boxes = kwimage.Boxes(np.array(list(qtree.aid_to_ltrb.values())), 'ltrb')
polygons = _to_shapely(boxes)
bounds = kwimage.Boxes([[0, 0, qtree.width, qtree.height]], 'ltrb')
bounds = _to_shapely(bounds)[0]
merged_polygon = cascaded_union(polygons)
uncovered = (bounds - merged_polygon)
print('uncovered.area = {!r}'.format(uncovered.area))
# plot these two polygons separately
if 1:
from descartes import PolygonPatch
from matplotlib import pyplot as plt
import kwplot
kwplot.autompl()
fig = plt.figure(gid)
ax = fig.add_subplot(111)
ax.cla()
# ax.add_patch(
# PolygonPatch(bounds, alpha=0.5, zorder=2, fc='blue')
# )
# ax.add_patch(
# PolygonPatch(merged_polygon, alpha=0.5, zorder=2, fc='red')
# )
ax.add_patch(
PolygonPatch(uncovered, alpha=0.5, zorder=2, fc='green')
)
ax.set_xlim(0, qtree.width)
ax.set_ylim(0, qtree.height)
ax.set_aspect(1)
def _support(self, gid):
qtree = self.qtrees[gid]
support_boxes = kwimage.Boxes(list(qtree.aid_to_ltrb.values()), 'ltrb')
return support_boxes