Source code for ndsampler.abstract_frames

"""
Fast access to subregions of images.

This implements the core convert-and-cache-as-cog logic, which enables us to
read from subregions of images quickly.


TODO:
    - [X] Implement npy memmap backend
    - [X] Implement gdal COG.TIFF backend
        - [X] Use as COG if input file is a COG
        - [X] Convert to COG if needed
"""
import numpy as np
import ubelt as ub
import copy
import warnings
from os.path import exists, join
from ndsampler.utils import util_gdal
from ndsampler.utils import util_lru
from ndsampler.frame_cache import (_ensure_image_cog, _ensure_image_npy)


try:
    from xdev import profile
except Exception:
    profile = ub.identity

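# The ``backend`` argument accepted by ``Frames`` below can be a plain string
# ('cog' or 'npy') or a nested config dictionary. A minimal sketch of the
# dictionary form (values mirror ``Frames.DEFAULT_COG_CONFIG``; assumes GDAL
# is available for the 'cog' backend):
#
#     backend = {
#         'type': 'cog',
#         'config': {'compress': 'ZSTD'},
#     }
#     frames = SimpleFrames.demo(backend=backend)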

class Frames(object):
    """
    Abstract implementation of Frames.

    While this is an abstract class, it contains most of the ``Frames``
    functionality. The inheriting class needs to overload the constructor and
    ``_lookup_gpath``, which maps an image-id to its path on disk.

    Args:
        hashid_mode (str, default='PATH'):
            The method used to compute a unique identifier for every image.
            This can be PATH, PIXELS, or GIVEN.
            TODO: Add DVC as a method (where it uses the name of the symlink)?

        workdir (PathLike):
            This is the directory where `Frames` can store cached results.
            This SHOULD be specified.

        backend (str | Dict):
            Determine the backend to use for fast subimage region lookups.
            This can either be a string 'cog' or 'npy'. This can also be a
            config dictionary for fine-grained backend control. In that case,
            'type' specifies cog or npy, and only COG has additional options,
            which are:

                {
                    'type': 'cog',
                    'config': {
                        'compress': <'LZW' | 'JPEG' | 'DEFLATE' | 'ZSTD' | 'auto'>,
                    }
                }

    Example:
        >>> from ndsampler.abstract_frames import *
        >>> self = SimpleFrames.demo(backend='npy')
        >>> file = self.load_image(1)
        >>> print('file = {!r}'.format(file))
        >>> assert self.load_image(1).shape == (512, 512, 3)
        >>> assert self.load_region(1, (slice(-20), slice(-10))).shape == (492, 502, 3)
        >>> # xdoctest: +REQUIRES(module:osgeo)
        >>> self = SimpleFrames.demo(backend='cog')
        >>> assert self.load_image(1).shape == (512, 512, 3)
        >>> assert self.load_region(1, (slice(-20), slice(-10))).shape == (492, 502, 3)

    Benchmark:
        >>> from ndsampler.abstract_frames import *  # NOQA
        >>> import ubelt as ub
        >>> #
        >>> ti = ub.Timerit(100, bestof=3, verbose=2)
        >>> #
        >>> self = SimpleFrames.demo(backend='cog')
        >>> for timer in ti.reset('cog-small-subregion'):
        >>>     self.load_image(1)[10:42, 10:42]
        >>> #
        >>> self = SimpleFrames.demo(backend='npy')
        >>> for timer in ti.reset('npy-small-subregion'):
        >>>     self.load_image(1)[10:42, 10:42]
        >>> print('----')
        >>> #
        >>> self = SimpleFrames.demo(backend='cog')
        >>> for timer in ti.reset('cog-large-subregion'):
        >>>     self.load_image(1)[3:-3, 3:-3]
        >>> #
        >>> self = SimpleFrames.demo(backend='npy')
        >>> for timer in ti.reset('npy-large-subregion'):
        >>>     self.load_image(1)[3:-3, 3:-3]
        >>> print('----')
        >>> #
        >>> self = SimpleFrames.demo(backend='cog')
        >>> for timer in ti.reset('cog-loadimage'):
        >>>     self.load_image(1)
        >>> #
        >>> self = SimpleFrames.demo(backend='npy')
        >>> for timer in ti.reset('npy-loadimage'):
        >>>     self.load_image(1)
    """

    DEFAULT_NPY_CONFIG = {
        'type': 'npy',
        'config': {},
    }

    DEFAULT_COG_CONFIG = {
        'type': 'cog',
        'config': {
            'compress': 'auto',
        },
        # Uses the gdal CLI to create cogs, which frustratingly seems to be
        # faster
        '_hack_use_cli': True,
    }

    def __init__(self, hashid_mode='PATH', workdir=None, backend=None):
        self._backend = None
        self._backend_hashid = None  # hash of backend config parameters
        self._cache_dpath = None

        # keep an lru cache for repeated access to the same data
        self._lru = util_lru.LRUDict.new(max_size=1, impl='auto')

        self._update_backend(backend)

        # This is a cache that will be populated on the fly
        self._id_to_pathinfo = {}

        if workdir is None:
            workdir = ub.Path.appdir('ndsampler')
            if self._backend['type'] is not None:
                warnings.warn('Frames workdir not specified. '
                              'Defaulting to {!r}'.format(workdir))
        self.workdir = workdir

        self.hashid_mode = hashid_mode

        if self.hashid_mode not in ['PATH', 'PIXELS', 'GIVEN']:  # TODO: DVC
            raise KeyError(self.hashid_mode)

    def __getstate__(self):
        # don't carry the LRU cache through pickle operations
        state = self.__dict__.copy()
        if state['_lru'] is not None:
            state['_lru'] = True
        return state

    def __setstate__(self, state):
        if state['_lru'] is True:
            state['_lru'] = util_lru.LRUDict.new(max_size=1, impl='auto')
        self.__dict__.update(state)

    def _update_backend(self, backend):
        """ change the backend and update internals accordingly """
        self._backend = self._coerce_backend_config(backend)
        self._cache_dpath = None
        self._backend_hashid = ub.hash_data(
            sorted(self._backend['config'].items()))[0:8]
        self._id_to_pathinfo = {}

    @classmethod
    def _coerce_backend_config(cls, backend=None):
        """
        Coerce a backend argument into a valid configuration dictionary.

        Returns:
            Dict: a dictionary with two items: 'type', which is a string,
                and 'config', which is a dictionary of parameters for the
                specific type.
        """
        if backend is None:
            return {'type': None, 'config': {}}

        # TODO: allow for heterogeneous backends
        if backend == 'auto':
            # Use defaults that work on the system
            if util_gdal.have_gdal():
                backend = 'cog'
            else:
                backend = 'npy'

        if isinstance(backend, str):
            backend = {'type': backend, 'config': {}}

        backend_type = backend.get('type', None)
        if backend_type == 'cog':
            util_gdal._fix_conda_gdal_hack()
            final = copy.deepcopy(cls.DEFAULT_COG_CONFIG)
        elif backend_type == 'npy':
            final = copy.deepcopy(cls.DEFAULT_NPY_CONFIG)
        else:
            raise ValueError(
                'Backend dictionary must specify type as either npy or cog,'
                ' but we got {}'.format(backend_type)
            )

        # Check the top-level dictionary has no unknown keys
        unknown_keys = set(backend) - set(final)
        if unknown_keys:
            raise ValueError('Backend got unknown keys: {}'.format(unknown_keys))

        # Check the config-level dictionary has no unknown keys
        inner_kw = backend.get('config', {})
        default_kw = final['config']
        unknown_config_keys = set(inner_kw) - set(default_kw)
        if unknown_config_keys:
            raise ValueError('Backend config got unknown keys: {}'.format(
                unknown_config_keys))

        # Only update expected values in the subconfig
        # Update the outer config
        outer_kw = backend.copy()
        outer_kw.pop('config', None)
        final.update(outer_kw)
        # Update the inner config
        final['config'].update(inner_kw)
        return final

    @property
    def cache_dpath(self):
        """
        Returns the path where cached frame representations will be stored.

        This will be None if there is no backend.
        """
        if self._cache_dpath is None:
            backend_type = self._backend['type']
            if backend_type is None:
                return None
            elif backend_type == 'cog':
                dpath = join(self.workdir, '_cache', 'frames', backend_type,
                             self._backend_hashid)
            elif backend_type == 'npy':
                dpath = join(self.workdir, '_cache', 'frames', backend_type)
            else:
                raise KeyError('backend_type = {}'.format(backend_type))
            self._cache_dpath = ub.ensuredir(dpath)
        return self._cache_dpath

    def _build_pathinfo(self, image_id):
        """
        A user specified function that maps an image id to paths to relevant
        resources on disk. These resources are also indexed by channel.

        SeeAlso:
            ``_populate_chan_info`` for helping populate cache info in each
            channel.

        Args:
            image_id: the image id (usually an integer)

        Returns:
            Dict: with the following structure:
                {
                    <NotFinalized>
                    'channels': {
                        <channel_spec>: {'path': <abspath>, ...},
                        ...
                    }
                }
        """
        raise NotImplementedError

    def _lookup_pathinfo(self, image_id):
        if image_id in self._id_to_pathinfo:
            pathinfo = self._id_to_pathinfo[image_id]
        else:
            pathinfo = self._build_pathinfo(image_id)
            self._id_to_pathinfo[image_id] = pathinfo
        return pathinfo

    def _populate_chan_info(self, chan, root=''):
        """
        Helper to construct a path dictionary in the ``_build_pathinfo``
        method based on the current hashing and caching settings.
        """
        backend_type = self._backend['type']

        if 'file_name' in chan:
            fname = chan['file_name']
            gpath = join(root, fname)
            chan['path'] = gpath
        elif 'path' in chan:
            gpath = chan['path']
            fname = gpath
        else:
            raise Exception('no file_name or path info')

        if backend_type is not None:
            if backend_type == 'cog':
                ext = '.cog.tiff'
            elif backend_type == 'npy':
                ext = '.npy'
            else:
                raise KeyError('backend_type = {}'.format(backend_type))
            hashid_mode = self.hashid_mode
            # hash directory structure
            cache_dpath = self.cache_dpath
            hashid = self._build_file_hashid(root, fname, hashid_mode)
            cache_gpath = join(cache_dpath, hashid[0:2], hashid[2:] + ext)
            chan['hashid'] = hashid
            chan['hashid_mode'] = hashid_mode
            chan['cache'] = cache_gpath
            chan['cache_type'] = backend_type

    @staticmethod
    def _build_file_hashid(root, suffix, hashid_mode):
        """
        Build a hashid for a specific file given as a path root and suffix.
        """
        gpath = join(root, suffix)
        if hashid_mode == 'PATH':
            # Hash the full path to the image data
            # NOTE: this logic is not machine independent
            hashid = ub.hash_data(suffix, hasher='sha1', base='hex')
        elif hashid_mode == 'PIXELS':
            # Hash the pixels in the image
            hashid = ub.hash_file(gpath, hasher='sha1', base='hex')
        elif hashid_mode == 'DVC':
            raise NotImplementedError('todo')
        elif hashid_mode == 'GIVEN':
            raise Exception('given mode no longer supported')
        else:
            raise KeyError(hashid_mode)
        return hashid

    @property
    def image_ids(self):
        raise NotImplementedError

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, index):
        image_id = self.image_ids[index]
        return self.load_image(image_id)
    @profile
    def load_region(self, image_id, region=None, channels=ub.NoParam,
                    width=None, height=None):
        """
        Amortized O(1) image subregion loading (assuming constant region
        size). If region size is varied, then sampling time scales with the
        number of tiles needed to overlap the requested region.

        Args:
            image_id (int): image identifier
            region (Tuple[slice, ...]): space-time region within an image
            channels (str): NotImplemented
            width (int): if the width of the entire image is known, specify it
            height (int): if the height of the entire image is known, specify it
        """
        if region is not None:
            if len(region) < 2:
                # Add empty dimensions
                tail = tuple([slice(None)] * (2 - len(region)))
                region = tuple(region) + tail
            if all(r.start is None and r.stop is None for r in region):
                # Avoid forcing a cache computation when loading the full image
                flag = (
                    region[0].stop in [None, height] and
                    region[1].stop in [None, width]
                )
                if flag:
                    region = None
            # FIXME: above rectification code is duplicated and we
            # could do more with passed width / height

        # setting region to None disables memmap/geotiff caching
        cache = region is not None
        imgdata = self._load_alignable(image_id, cache=cache)
        # Note: a None region is passed through; AlignableImageData treats it
        # as a full-image crop (see the doctest in AlignableImageData).
        im = imgdata.load_region(region, channels=channels, fused=True)
        return im
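    # A minimal usage sketch for ``load_region`` (assumes the 512x512 demo
    # image provided by ``SimpleFrames.demo``):
    #
    #     frames = SimpleFrames.demo(backend='npy')
    #     patch = frames.load_region(1, (slice(10, 42), slice(10, 42)))
    #     assert patch.shape == (32, 32, 3)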
    def _load_alignable(self, image_id, cache=True):
        _lru = self._lru
        if cache and _lru is not None and image_id in _lru:
            return _lru[image_id]

        pathinfo = self._lookup_pathinfo(image_id)
        imgdata = AlignableImageData(pathinfo, cache_backend=self._backend)

        if cache and _lru is not None:
            _lru[image_id] = imgdata
        return imgdata
    @profile
    def load_image(self, image_id, channels=ub.NoParam, cache=True,
                   noreturn=False):
        """
        Load the image data for a particular image id

        Args:
            image_id (int): the id of the image to load

            cache (bool, default=True): ensure and return the efficient
                backend cached representation.

            channels: NotImplemented

            noreturn (bool, default=False): if True, nothing is returned.
                This is useful if you simply want to ensure the cached
                representation.

            CAREFUL: THIS NEEDS TO MAINTAIN A STABLE API.
            OTHER PROJECTS DEPEND ON IT.

        Returns:
            ArrayLike: an indexable array-like representation, possibly
                memmapped.
        """
        alignable = self._load_alignable(image_id, cache=cache)
        if channels is ub.NoParam:
            # chan_name = channels
            default_chan = alignable.pathinfo['default']
            chan_name = default_chan
        else:
            if ub.iterable(channels):
                raise NotImplementedError
            else:
                chan_name = channels
        data = alignable._load_delayed_channel(chan_name, cache=cache)
        # this probably breaks things that want the delayed data
        if hasattr(data, 'finalize'):
            data = data.finalize()
        # data = alignable._load_native_channel(chan_name, cache=cache)
        if noreturn:
            data = None
        return data
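    # Usage sketch: the object returned by ``load_image`` is array-like and
    # possibly memmapped, so slicing it after the fact is cheap:
    #
    #     imdata = frames.load_image(1)
    #     chip = imdata[0:8, 0:8]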
    def load_frame(self, image_id):
        """
        TODO: FINISHME or rename to lazy frame?

        Returns a frame object that lazily loads on slice
        """
        class LazyFrame(object):
            def __init__(self, frames, image_id):
                self._data = None
                self._frames = frames
                self._image_id = image_id

            def __getitem__(self, region):
                if self._data is None:
                    if all(r.start is None and r.stop is None for r in region):
                        # Avoid forcing a cache computation when loading the
                        # full image
                        self._data = self._frames.load_image(
                            self._image_id, cache=False)
                    else:
                        self._data = self._frames.load_image(
                            self._image_id, cache=True)
                return self._data[region]

        im = LazyFrame(self, image_id)
        return im
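    # A hedged sketch of the intended ``load_frame`` usage (the method is
    # marked TODO above, so treat this as aspirational rather than stable
    # API):
    #
    #     frame = frames.load_frame(1)
    #     patch = frame[10:42, 10:42]  # data is only read on the first slice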
    def prepare(self, gids=None, workers=0, use_stamp=True):
        """
        Precompute the cached frame conversions

        Args:
            gids (List[int] | None): specific image ids to prepare.
                If None, prepare all images.

            workers (int, default=0): number of parallel threads for this
                io-bound task

        Example:
            >>> from ndsampler.abstract_frames import *
            >>> workdir = ub.Path.appdir('ndsampler/tests/test_cog_precomp').ensuredir()
            >>> print('workdir = {!r}'.format(workdir))
            >>> ub.delete(workdir)
            >>> ub.ensuredir(workdir)
            >>> self = SimpleFrames.demo(backend='npy', workdir=workdir)
            >>> print('self = {!r}'.format(self))
            >>> print('self.cache_dpath = {!r}'.format(self.cache_dpath))
            >>> #_ = ub.cmd('tree ' + workdir, verbose=3)
            >>> self.prepare()
            >>> self.prepare()
            >>> #_ = ub.cmd('tree ' + workdir, verbose=3)
            >>> _ = ub.cmd('ls ' + self.cache_dpath, verbose=3)

        Example:
            >>> from ndsampler.abstract_frames import *
            >>> import ndsampler
            >>> workdir = ub.Path.appdir('ndsampler/tests/test_cog_precomp2')
            >>> workdir.delete()
            >>> # TEST NPY
            >>> #
            >>> sampler = ndsampler.CocoSampler.demo(workdir=workdir, backend='npy')
            >>> self = sampler.frames
            >>> ub.delete(self.cache_dpath)  # reset
            >>> self.prepare()  # serial, miss
            >>> self.prepare()  # serial, hit
            >>> ub.delete(self.cache_dpath)  # reset
            >>> self.prepare(workers=3)  # parallel, miss
            >>> self.prepare(workers=3)  # parallel, hit
            >>> #
            >>> ## TEST COG
            >>> # xdoctest: +REQUIRES(module:osgeo)
            >>> sampler = ndsampler.CocoSampler.demo(workdir=workdir, backend='cog')
            >>> self = sampler.frames
            >>> ub.delete(self.cache_dpath)  # reset
            >>> self.prepare()  # serial, miss
            >>> self.prepare()  # serial, hit
            >>> ub.delete(self.cache_dpath)  # reset
            >>> self.prepare(workers=3)  # parallel, miss
            >>> self.prepare(workers=3)  # parallel, hit
        """
        if self.cache_dpath is None:
            print('Frames backend is None, skip prepare')
            return

        ub.ensuredir(self.cache_dpath)
        # Note: this usually accesses the hashid attribute of
        # util.HashIdentifiable
        hashid = getattr(self, 'hashid', None)

        # TODO:
        # Add some image preprocessing ability here?
        stamp = ub.CacheStamp('prepare_frames_stamp_v2',
                              dpath=self.cache_dpath,
                              depends=hashid, verbose=3)
        stamp.cacher.enabled = bool(hashid) and bool(use_stamp) and gids is None

        if stamp.expired() or hashid is None:
            from concurrent import futures
            # Use thread mode, because we are mostly doing io.
            executor = ub.Executor(mode='thread', max_workers=workers)
            with executor as executor:
                if gids is None:
                    gids = self.image_ids

                missing_cache_infos = []
                for gid in ub.ProgIter(gids, desc='lookup missing cache paths'):
                    pathinfo = self._lookup_pathinfo(gid)
                    for chan in pathinfo['channels'].values():
                        if not exists(chan['cache']):
                            missing_cache_infos.append((gid, chan['channels']))

                prog = ub.ProgIter(missing_cache_infos,
                                   desc='Frames: submit prepare jobs')
                job_list = [
                    executor.submit(
                        self.load_image, image_id, channels=channels,
                        cache=True, noreturn=True)
                    for image_id, channels in prog]

                for job in ub.ProgIter(futures.as_completed(job_list),
                                       total=len(job_list), adjust=False,
                                       freq=1,
                                       desc='Frames: collect prepare jobs'):
                    job.result()
            stamp.renew()
class SimpleFrames(Frames):
    """
    Basic concrete implementation of frames objects for images where there is
    a strict one-file-to-one-image mapping (i.e. no auxiliary images).

    Args:
        id_to_path (Dict): mapping from image-id to image path

    Example:
        >>> from ndsampler.abstract_frames import *
        >>> self = SimpleFrames.demo(backend='npy')
        >>> pathinfo = self._build_pathinfo(1)
        >>> print('pathinfo = {}'.format(ub.urepr(pathinfo, nl=3)))
        >>> assert self.load_image(1).shape == (512, 512, 3)
        >>> assert self.load_region(1, (slice(-20), slice(-10))).shape == (492, 502, 3)
    """
    def __init__(self, id_to_path, workdir=None, backend=None):
        super(SimpleFrames, self).__init__(
            hashid_mode='PATH', workdir=workdir, backend=backend)
        self.id_to_path = id_to_path

    def _lookup_gpath(self, image_id):
        return self.id_to_path[image_id]

    @ub.memoize_property
    def image_ids(self):
        return list(self.id_to_path.keys())
    @classmethod
    def demo(cls, **kw):
        """
        Get a sample frames object
        """
        import kwcoco
        dset = kwcoco.CocoDataset.demo()
        id_to_path = {
            gid: dset.get_image_fpath(gid)
            for gid in dset.imgs.keys()
        }
        if kw.get('workdir', None) is None:
            kw['workdir'] = ub.Path.appdir('ndsampler').ensuredir()
        self = cls(id_to_path, **kw)
        return self
    def _build_pathinfo(self, image_id):
        default_channel = {
            'path': self.id_to_path[image_id],
            'channels': None,
            'warp_aux_to_img': None,
        }
        pathinfo = {
            'id': image_id,
            'default': None,
            'channels': {
                None: default_channel,
            }
        }
        for chan in pathinfo['channels'].values():
            self._populate_chan_info(chan, root='')
        return pathinfo
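    # For the demo data, ``_build_pathinfo`` produces roughly this structure
    # once ``_populate_chan_info`` has run (paths and hashes abbreviated;
    # a sketch, not verbatim output):
    #
    #     {
    #         'id': 1,
    #         'default': None,
    #         'channels': {
    #             None: {
    #                 'path': '/.../demo.png',
    #                 'channels': None,
    #                 'warp_aux_to_img': None,
    #                 'hashid': 'ab12...',
    #                 'hashid_mode': 'PATH',
    #                 'cache': '/.../_cache/frames/npy/ab/12....npy',
    #                 'cache_type': 'npy',
    #             },
    #         },
    #     }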
class AlignableImageData(object):
    """
    Class for sampling channels / frames that are aligned with each other

    TODO:
        - [ ] This is more general than the older way of accessing image data,
              however, there is a lot more logic that hasn't been profiled, so
              we may be able to find meaningful optimizations.
        - [ ] Make sure adding this didn't significantly hurt performance
        - [ ] DEPRECATE THIS IN FAVOR OF NEW KWCOCO DELAYED LOGIC

    Example:
        >>> from ndsampler.abstract_frames import *
        >>> frames = SimpleFrames.demo(backend='npy')
        >>> pathinfo = frames._build_pathinfo(1)
        >>> cache_backend = frames._backend
        >>> print('pathinfo = {}'.format(ub.urepr(pathinfo, nl=3)))
        >>> self = AlignableImageData(pathinfo, cache_backend)
        >>> img_region = None
        >>> prefused = self._load_prefused_region(img_region)
        >>> print('prefused = {!r}'.format(prefused))
        >>> img_region = (slice(0, 10), slice(0, 10))
        >>> prefused = self._load_prefused_region(img_region)
        >>> print('prefused = {!r}'.format(prefused))
    """
    def __init__(self, pathinfo, cache_backend):
        self.pathinfo = pathinfo
        self.cache_backend = cache_backend
        self._channel_memcache = {}

    @profile
    def _load_native_channel(self, chan_name, cache=True):
        """
        Load a specific auxiliary channel, optionally caching it
        """
        chan = self.pathinfo['channels'][chan_name]
        if not cache:
            import kwimage
            gpath = chan['path']
            data = kwimage.imread(gpath)
        else:
            cache_key = chan_name
            _channel_memcache = self._channel_memcache
            if _channel_memcache is not None and cache_key in _channel_memcache:
                return _channel_memcache[cache_key]
            gpath = chan['path']
            cache_type = chan['cache_type']
            if cache_type is None:
                import kwimage
                data = kwimage.imread(gpath)
            elif cache_type == 'cog':
                cache_gpath = chan['cache']
                config = self.cache_backend['config']
                hack_use_cli = self.cache_backend['_hack_use_cli']
                data = _ensure_image_cog(gpath, cache_gpath, config,
                                         hack_use_cli)
            elif cache_type == 'npy':
                cache_gpath = chan['cache']
                data = _ensure_image_npy(gpath, cache_gpath)
            else:
                raise KeyError(cache_type)
            if _channel_memcache is not None:
                _channel_memcache[cache_key] = data
        return data

    @profile
    def _load_delayed_channel(self, chan_name, cache=True):
        height = self.pathinfo.get('height', None)
        width = self.pathinfo.get('width', None)
        aux = self.pathinfo['channels'][chan_name]
        warp_aux_to_img = aux.get('warp_aux_to_img', None)
        data = self._load_native_channel(chan_name, cache=cache)
        if width is None:
            width = data.shape[1]
        if height is None:
            height = data.shape[0]
        img_dsize = (width, height)
        from kwcoco.util.delayed_ops import DelayedIdentity
        data_dsize = (data.shape[1], data.shape[0])
        chan = DelayedIdentity(data, dsize=data_dsize)
        chan = chan.warp(warp_aux_to_img, dsize=img_dsize)
        return chan

    def _coerce_channels(self, channels=ub.NoParam):
        if isinstance(channels, str):
            # TODO: document this special key for all channels
            if channels == '<all>':
                channels = list(self.pathinfo['channels'].keys())
        if channels is ub.NoParam:
            default_chan = self.pathinfo.get('default', None)
            if default_chan not in self.pathinfo['channels']:
                raise Exception(
                    'Channels is not specified and the image metadata does '
                    'not have a default')
            channels = [default_chan]
        return channels

    @profile
    def _load_prefused_region(self, img_region, channels=ub.NoParam):
        """
        Loads crops from multiple channels in their native coordinate system
        packaged with transformation info on how to align them.
        """
        channels = self._coerce_channels(channels)

        # height = self.pathinfo.get('height', None)
        # width = self.pathinfo.get('width', None)
        # if img_region is not None:
        #     height = self.pathinfo.get('height', None)
        #     width = self.pathinfo.get('width', None)
        #     if len(img_region) < 2:
        #         # Add empty dimensions
        #         tail = tuple([slice(None)] * (2 - len(img_region)))
        #         img_region = tuple(img_region) + tail
        #     if all(r.start is None and r.stop is None for r in img_region):
        #         # Avoid forcing a cache computation when loading the full image
        #         flag = (
        #             img_region[0].stop in [None, height] and
        #             img_region[1].stop in [None, width]
        #         )
        #         if flag:
        #             img_region = None

        subregions = []
        for chan_name in channels:
            # Load full image in "virtual" image space
            im = self._load_delayed_channel(chan_name)
            # if img_region is not None:
            im = im.crop(img_region)
            mode = 1
            if mode == 1:
                im = im.optimize()
            subregion = {
                'im': im,
                'channels': channels,
            }
            subregions.append(subregion)
        prefused = {
            'subregions': subregions,
        }
        return prefused

    @profile
    def _load_fused_region(self, img_region, channels=ub.NoParam):
        """
        Loads crops from multiple channels in aligned base coordinates.
        """
        import kwarray
        prefused = self._load_prefused_region(img_region, channels)
        subregions = prefused['subregions']
        parts = []
        for subregion in subregions:
            chan_crop = subregion['im']
            aligned_chan = chan_crop.finalize()
            aligned_chan = kwarray.atleast_nd(aligned_chan, 3, front=False)
            parts.append(aligned_chan)
        if len(parts) > 1:
            fused = np.concatenate(parts, axis=2)
        else:
            fused = parts[0]
        return fused
    def load_region(self, img_region, channels=ub.NoParam, fused=True):
        """
        Args:
            img_region (Tuple[slice, ...]): slice into the base image
                (will be warped into the auxiliary image's frames)
        """
        if fused:
            return self._load_fused_region(img_region, channels=channels)
        else:
            return self._load_prefused_region(img_region, channels=channels)
    def __getitem__(self, img_region):
        return self._load_fused_region(img_region)
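# An end-to-end sketch tying the pieces together (mirrors the
# ``AlignableImageData`` doctest above; the image id follows the demo data):
#
#     frames = SimpleFrames.demo(backend='npy')
#     data = AlignableImageData(frames._build_pathinfo(1), frames._backend)
#     chip = data[(slice(0, 10), slice(0, 10))]  # fused crop via __getitem__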