Shortcuts

Source code for mmflow.datasets.pipelines.transforms

# Copyright (c) OpenMMLab. All rights reserved.
import copy
import warnings
from math import ceil
from typing import List, Sequence, Tuple, Union

import cv2
import mmcv
import numpy as np
from mmcv.image import adjust_brightness, adjust_color, adjust_contrast
from numpy import random

from ..builder import PIPELINES
from ..utils import adjust_gamma, adjust_hue


def get_flow_keys(results: dict) -> List[str]:
    """Get keys of optical flow in results.

    Args:
        results (dict): The data that includes data and meta information and
            used in data augmentation pipeline.

    Returns:
        List: keys of optical flow in results.
    """
    flow_keys = []
    if 'ann_fields' in results:
        ann_keys = copy.deepcopy(results['ann_fields'])
        for k in ann_keys:
            if k.find('flow') > -1:
                flow_keys.append(k)
    return flow_keys


def get_img_keys(results: dict) -> List:
    """Get image keys in results.

    Args:
        results (dict): The data that includes data and meta information and
            used in data augmentation pipeline.

    Returns:
        List: Now it will return ['img1', 'img2'].
    """
    img_keys = copy.deepcopy(results['img_fields'])
    return img_keys


def get_map_keys(results: dict) -> list:
    """Get image and annotation keys in results.

    This annotation don't include 'valid' or 'valid_fw' and 'valid_bw'.

    Args:
        results (dict): The data that includes data and meta information and
            used in data augmentation pipeline.

    Returns:
        list: List of image keys and annotation keys.
    """
    img_fields = copy.deepcopy(results['img_fields'])
    if 'ann_fields' in results:
        return img_fields + results['ann_fields']
    else:
        return img_fields


def get_valid_keys(results: dict) -> list:
    """Get valid keys in results.

    Args:
        results (dict): The data that includes data and meta information and
            used in data augmentation pipeline.

    Returns:
        list: Now it will return ['valid'] or [] if there is not 'valid' in
            results.
    """
    flow_keys = get_flow_keys(results)
    if len(flow_keys) == 0:
        return []
    # -3 is for _gt
    valid_keys = [k.replace('flow', 'valid')[:-3] for k in flow_keys]

    if results.get(valid_keys[0], None) is None:
        return []
    else:
        return valid_keys


[docs]@PIPELINES.register_module() class SpacialTransform: """Spacial Transform API for RAFT Args: spacial_prob (float): probability to do spacial transform. stretch_prob (float): probability to do stretch. crop_size (tuple, list): the base size for resize. min_scale (float): the exponent for min scale. Defaults to -0.2. max_scale (float): the exponent for max scale. Defaults to 0.5. Returns: dict: Resized results, 'img_shape', """ def __init__(self, spacial_prob: float, stretch_prob: float, crop_size: Sequence, min_scale: float = -0.2, max_scale: float = 0.5, max_stretch: float = 0.2) -> None: super().__init__() assert spacial_prob >= 0. and spacial_prob <= 1. and isinstance( spacial_prob, float) assert stretch_prob >= 0. and stretch_prob <= 1. and isinstance( stretch_prob, float) assert isinstance( crop_size, (tuple, list)) and len(crop_size) == 2 and isinstance( crop_size[0], int) and isinstance(crop_size[1], int) assert isinstance(min_scale, float) assert isinstance(max_scale, float) assert isinstance(max_stretch, float) self.spacial_prob = spacial_prob self.stretch_prob = stretch_prob self.crop_size = crop_size self.min_scale = min_scale self.max_scale = max_scale self.max_stretch = max_stretch def __call__(self, results: dict) -> dict: """Call function to do spacial transform to images and annotation, including optical flow, occlusion mask and valid mask. Args: results (dict): Result dict from :obj:`mmflow.BaseDataset`. Returns: dict: The dict contains transformed data and transform information. """ if np.random.rand() < self.spacial_prob: img_keys = get_img_keys(results) flow_keys = get_flow_keys(results) if results.get('valid') is None: map_keys = get_map_keys(results) flow_inds = [map_keys.index(k) for k in flow_keys] maps = [results[k] for k in map_keys] maps, scale_x, scale_y, _, _ = self.spacial_transform(maps) for idx in flow_inds: maps[idx] *= [scale_x, scale_y] results['scale'] = (scale_x, scale_y) results['img_shape'] = maps[0].shape for i, k in enumerate(map_keys): results[k] = maps[i] else: # sparse spacial_transform imgs = [results[k] for k in img_keys] imgs, scale_x, scale_y, x0, y0 = self.spacial_transform(imgs) for i, k in enumerate(img_keys): results[k] = imgs[i] results['scale'] = (scale_x, scale_y) results['img_shape'] = imgs[0].shape flow, valid = self.resize_sparse_flow_map( results['flow_gt'], results['valid'], fx=scale_x, fy=scale_y, x0=x0, y0=y0) results['flow_gt'] = flow results['valid'] = valid.astype(np.float32) else: results['scale'] = (1., 1.) return results
[docs] def resize_sparse_flow_map(self, flow: np.ndarray, valid: np.ndarray, fx: float = 1.0, fy: float = 1.0, x0: int = 0, y0: int = 0) -> Sequence[np.ndarray]: """Resize sparse optical flow function. Args: flow (ndarray): optical flow data will be resized. valid (ndarray): valid mask for sparse optical flow. fx (float, optional): horizontal scale factor. Defaults to 1.0. fy (float, optional): vertical scale factor. Defaults to 1.0. x0 (int, optional): abscissa of left-top point where the flow map will be crop from. Defaults to 0. y0 (int, optional): ordinate of left-top point where the flow map will be crop from. Defaults to 0. Returns: Sequence[ndarray]: the transformed flow map and valid mask. """ ht, wd = flow.shape[:2] coords = np.meshgrid(np.arange(wd), np.arange(ht)) coords = np.stack(coords, axis=-1) coords = coords.reshape(-1, 2).astype(np.float32) flow = flow.reshape(-1, 2).astype(np.float32) valid = valid.reshape(-1).astype(np.float32) coords0 = coords[valid >= 1] flow0 = flow[valid >= 1] ht1 = int(round(ht * fy)) wd1 = int(round(wd * fx)) coords1 = coords0 * [fx, fy] flow1 = flow0 * [fx, fy] xx = np.round(coords1[:, 0]).astype(np.int32) yy = np.round(coords1[:, 1]).astype(np.int32) v = (xx > 0) & (xx < wd1) & (yy > 0) & (yy < ht1) xx = xx[v] yy = yy[v] flow1 = flow1[v] flow_img = np.zeros([ht1, wd1, 2], dtype=np.float32) valid_img = np.zeros([ht1, wd1], dtype=np.int32) flow_img[yy, xx] = flow1 valid_img[yy, xx] = 1. flow_img = flow_img[y0:y0 + self.crop_size[0], x0:x0 + self.crop_size[1]] valid_img = valid_img[y0:y0 + self.crop_size[0], x0:x0 + self.crop_size[1]] return flow_img, valid_img
[docs] def spacial_transform( self, imgs: np.ndarray) -> Tuple[np.ndarray, float, float, int, int]: """Spacial transform function. Args: imgs (ndarray): the images that will be transformed. Returns: Tuple[ndarray, float, float, int, int]: the transformed images, horizontal scale factor, vertical scale factor, coordinate of left-top point where the image maps will be crop from. """ H, W = imgs[0].shape[:2] min_scale = np.maximum((self.crop_size[0] + 8) / float(H), (self.crop_size[1] + 8) / float(W)) scale = 2**np.random.uniform(self.min_scale, self.max_scale) scale_x = scale scale_y = scale if np.random.rand() < self.stretch_prob: scale_x *= 2**np.random.uniform(-self.max_stretch, self.max_stretch) scale_y *= 2**np.random.uniform(-self.max_stretch, self.max_stretch) scale_x = np.clip(scale_x, min_scale, None) scale_y = np.clip(scale_y, min_scale, None) newW, newH = int(W * float(scale_x) + 0.5), int(H * float(scale_y) + 0.5) y0 = np.random.randint(0, newH - self.crop_size[0]) x0 = np.random.randint(0, newW - self.crop_size[1]) imgs_ = [] for img in imgs: img_, scale_x, scale_y = mmcv.imresize( img, (newW, newH), return_scale=True) img_ = img_[y0:y0 + self.crop_size[0], x0:x0 + self.crop_size[1]] imgs_.append(img_) return imgs_, scale_x, scale_y, x0, y0
def __repr__(self) -> str: repr_str = self.__class__.__name__ repr_str += (f'(spacial_prob={self.spacial_prob} ' f'stretch_prob={self.stretch_prob} ' f'crop_size={self.crop_size} ' f'min_scale={self.min_scale} ' f'max_scale = {self.max_scale}') return repr_str
[docs]@PIPELINES.register_module() class Validation: """This Validation transform from RAFT is for return a mask for the flow is less than max_flow. Args: max_flow (float, int): the max flow for validated flow. Returns: dict: Resized results, 'valid' and 'max_flow' keys are added into result dict. """ def __init__(self, max_flow: Union[float, int]) -> None: assert isinstance(max_flow, (float, int)) self.max_flow = max_flow def __call__(self, results: dict) -> dict: """Call function to get the valid mask. Args: results (dict): Result dict from :obj:`mmflow.BaseDataset`. Returns: dict: dict added 'valid' key and its value. """ flow_keys = get_flow_keys(results) for k in flow_keys: flow = results[k] valid = ((np.abs(flow[:, :, 0]) < self.max_flow) & (np.abs(flow[:, :, 1]) < self.max_flow)) valid_key = k.replace('flow', 'valid') # [:-3] is for '_gt'. results[valid_key[:-3]] = valid.astype(np.float32) results['max_flow'] = self.max_flow return results def __repr__(self) -> str: repr_str = self.__class__.__name__ repr_str += f'(max_flow={self.img_scale})' return repr_str
[docs]@PIPELINES.register_module() class Erase: """Erase transform from RAFT is randomly erasing rectangular regions in img2 to simulate occlusions. Args: prob (float): the probability for erase transform. bounds (list, tuple): the bounds for erase regions (bound_x, bound_y). max_num (int): the max number of erase regions. Returns: dict: revised results, 'img2' and 'erase_num' are added into results. """ def __init__(self, prob: float, bounds: Sequence = [50, 100], max_num: int = 3) -> None: assert isinstance(prob, float), ('Probability for erase transform must' f' be float, but got {type(prob)}') assert prob >= 0 and prob <= 1, ('The range of probability is [0.,1.],' f' but got {type(prob)}') assert isinstance(max_num, int), f'max_num must be int, but got {type(max_num)}' self.prob = prob self.bounds = bounds self.max_num = max_num def __call__(self, results: dict) -> dict: """Call function to do erase on images. Args: results (dict): Result dict from :obj:`mmflow.BaseDataset`. Returns: dict: the values of 'img1' and 'img2' is updated, and add 'erase_num' and 'erase_bounds' keys and their values. """ img2 = results['img2'] H, W, _ = img2.shape erase_bounds = [] num = 0 if np.random.rand() < self.prob: mean_color = np.mean(img2.reshape(-1, 3), axis=0) num = np.random.randint(1, self.max_num) for _ in range(num): x0 = np.random.randint(0, W) y0 = np.random.randint(0, H) dx = np.random.randint(self.bounds[0], self.bounds[1]) dy = np.random.randint(self.bounds[0], self.bounds[1]) img2[y0:y0 + dy, x0:x0 + dx, :] = mean_color erase_bounds.append((y0, x0, y0 + dy, x0 + dx)) results['img2'] = img2 results['erase_num'] = num results['erase_bounds'] = erase_bounds return results def __repr__(self) -> str: repr_str = self.__class__.__name__ repr_str += (f'(prob={self.prob} ' f'bounds={self.bounds} ' f'max_num={self.max_num})') return repr_str
[docs]@PIPELINES.register_module() class InputResize: """Resize images such that dimensions are divisible by 2^n Args: exponent(int): the exponent n of 2^n Returns: dict: Resized results, 'img_shape', 'scale_factor' keys are added into result dict. """ def __init__(self, exponent) -> None: super().__init__() assert isinstance(exponent, int) self.exponent = exponent def __call__(self, results): """Call function to resize images and flow map. Args: results (dict): Result dict from loading pipeline. Returns: dict: Resized results, 'img_shape', 'scale_factor' keys are added into result dict. """ img_keys = get_img_keys(results) imgs = [results[k] for k in img_keys] imgs, scale_factor = self._resize_img(imgs) for i, k in enumerate(img_keys): results[k] = imgs[i] results['scale_factor'] = scale_factor results['img_shape'] = imgs[0].shape return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(exponent={self.exponent})') return repr_str def _resize_img(self, imgs): """Resize images with ``results['scale']``.""" times = int(2**self.exponent) H, W = imgs[0].shape[:2] newH = int(ceil(H / times) * times) newW = int(ceil(W / times) * times) imgs_resize = [] for img in imgs: img_ = mmcv.imresize(img, (newW, newH), return_scale=False) imgs_resize.append(img_) w_scale = newW / W h_scale = newH / H scale_factor = np.array([w_scale, h_scale], dtype=np.float32) return imgs_resize, scale_factor
[docs]@PIPELINES.register_module() class InputPad: """Pad images such that dimensions are divisible by 2^n used in test. Args: exponent(int): the exponent n of 2^n mode(str): mode for numpy.pad(). Defaults to 'edge'. position(str) the position of origin image, and valid value is one of 'center', 'left', 'right', 'top' and 'down'. Defaults to 'center' """ def __init__(self, exponent, mode='edge', position='center', **kwargs): assert position in ('center', 'left', 'right', 'top', 'down') assert isinstance(exponent, int) self.exponent = exponent self.mode = mode self.position = position self.kwargs = kwargs def __call__(self, results): img_keys = get_img_keys(results) imgs = [results[k] for k in img_keys] imgs = self.pad(imgs) for i, k in enumerate(img_keys): results[k] = imgs[i] results['pad_shape'] = imgs[0].shape results['pad'] = self._pad[:2] return results def pad(self, imgs): times = int(2**self.exponent) H, W = imgs[0].shape[:2] pad_h = (((H // times) + 1) * times - H) % times pad_w = (((W // times) + 1) * times - W) % times if self.position == 'center': self._pad = [[pad_h // 2, pad_h - pad_h // 2], [pad_w // 2, pad_w - pad_w // 2]] elif self.position == 'left': self._pad = [[pad_h // 2, pad_h - pad_h // 2], [0, pad_w]] elif self.position == 'right': self._pad = [[pad_h // 2, pad_h - pad_h // 2], [pad_w, 0]] elif self.position == 'top': self._pad = [[0, pad_h, pad_w // 2], [pad_w - pad_w // 2]] elif self.position == 'down': self._pad = [[pad_h, 0], [pad_w // 2, pad_w - pad_w // 2]] if len(imgs[0].shape) > 2: self._pad.append([0, 0]) imgs = [ np.pad(img, self._pad, mode=self.mode, **self.kwargs) for img in imgs ] return imgs def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(exponent={self.exponent} ' f'mode={self.mode} ' f'position={self.position})') return repr_str
[docs]@PIPELINES.register_module() class RandomFlip: """Flip the image and flow map. Args: prob (float): The flipping probability. direction(str): The flipping direction. Options are 'horizontal' and 'vertical'. Default: 'horizontal'. """ def __init__(self, prob, direction='horizontal'): assert isinstance(prob, (float, int)) and prob >= 0 and prob <= 1 assert direction in ['horizontal', 'vertical'] self.prob = prob self.direction = direction def __call__(self, results): """Call function to flip optical flow map. Args: results (dict): Result dict from loading pipeline. Returns: dict: Flipped results, 'flip', 'flip_direction' keys are added into result dict. """ flip = True if np.random.rand() < self.prob else False if flip: # flip image map_keys = get_map_keys(results) flow_keys = get_flow_keys(results) valid_keys = get_valid_keys(results) for k in map_keys: results[k] = mmcv.imflip(results[k], direction=self.direction) for valid_key in valid_keys: results[valid_key] = mmcv.imflip( results[valid_key], direction=self.direction).copy() # flip flow if self.direction == 'horizontal': coeff = [-1, 1] else: coeff = [1, -1] for fk in flow_keys: results[fk] = results[fk] * coeff if 'flip' in results and 'flip_direction' in results: results['flip'].append(flip) results['flip_direction'].append(self.direction) else: results['flip'] = [flip] results['flip_direction'] = [self.direction] return results def __repr__(self): return self.__class__.__name__ + f'(prob={self.prob})'
[docs]@PIPELINES.register_module() class Normalize: """Normalize the image. Added key is "img_norm_cfg". Args: mean (sequence): Mean values of 3 channels. std (sequence): Std values of 3 channels. to_rgb (bool): Whether to convert the image from BGR to RGB, default is true. """ def __init__(self, mean, std, to_rgb=True): self.mean = np.array(mean, dtype=np.float32) self.std = np.array(std, dtype=np.float32) self.to_rgb = to_rgb def __call__(self, results): """Call function to normalize images. Args: results (dict): Result dict from loading pipeline. Returns: dict: Normalized results, 'img_norm_cfg' key is added into result dict. """ if results.get('channels_order') == 'RGB': self.to_rgb = False warnings.warn('The channels order is RBG, ' 'and image will not convert it again') img_keys = get_img_keys(results) for k in img_keys: results[k] = mmcv.imnormalize(results[k], self.mean, self.std, self.to_rgb) results['img_norm_cfg'] = dict( mean=self.mean, std=self.std, to_rgb=self.to_rgb) if self.to_rgb: results['channels_order'] = 'RGB' return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += f'(mean={self.mean}, std={self.std}, to_rgb=' \ f'{self.to_rgb})' return repr_str
@PIPELINES.register_module() class BGR2RGB: """Convert image channels from BGR to RGB order. Returns: dict: results contained converted images. """ def __init__(self): super().__init__() def __call__(self, results): img_keys = get_img_keys(results) for k in img_keys: results[k] = mmcv.bgr2rgb(results[k]) results['channels_order'] = 'RGB' return results
[docs]@PIPELINES.register_module() class Rerange: """Rerange the image pixel value. Args: min_value (float or int): Minimum value of the reranged image. Default: 0. max_value (float or int): Maximum value of the reranged image. Default: 255. """ def __init__(self, min_value=0, max_value=255): assert isinstance(min_value, float) or isinstance(min_value, int) assert isinstance(max_value, float) or isinstance(max_value, int) assert min_value < max_value self.min_value = min_value self.max_value = max_value def __call__(self, results): """Call function to rerange images. Args: results (dict): Result dict from loading pipeline. Returns: dict: Reranged results. """ img_keys = get_img_keys(results) for k in img_keys: img = results[k] img_min_value = np.min(img) img_max_value = np.max(img) assert img_min_value < img_max_value # rerange to [0, 1] img = (img - img_min_value) / (img_max_value - img_min_value) # rerange to [min_value, max_value] img = img * (self.max_value - self.min_value) + self.min_value results[k] = img return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += f'(min_value={self.min_value}, max_value={self.max_value})' return repr_str
[docs]@PIPELINES.register_module() class RandomCrop: """Random crop the image & flow. Args: crop_size (tuple): Expected size after cropping, (h, w). """ def __init__(self, crop_size): assert crop_size[0] > 0 and crop_size[1] > 0 self.crop_size = crop_size
[docs] def get_crop_bbox(self, img_shape): """Randomly get a crop bounding box.""" margin_h = max(img_shape[0] - self.crop_size[0], 0) margin_w = max(img_shape[1] - self.crop_size[1], 0) offset_h = np.random.randint(0, margin_h + 1) offset_w = np.random.randint(0, margin_w + 1) crop_y1, crop_y2 = offset_h, offset_h + self.crop_size[0] crop_x1, crop_x2 = offset_w, offset_w + self.crop_size[1] return crop_y1, crop_y2, crop_x1, crop_x2
[docs] def crop(self, img, crop_bbox): """Crop from ``img``""" crop_y1, crop_y2, crop_x1, crop_x2 = crop_bbox img = img[crop_y1:crop_y2, crop_x1:crop_x2, ...] return img
def __call__(self, results): """Call function to randomly crop images, flow maps. Args: results (dict): Result dict from loading pipeline. Returns: dict: Randomly cropped results, 'img_shape' key in result dict is updated according to crop size. """ map_keys = get_map_keys(results) img_shape = copy.deepcopy(results['img_shape']) valid_keys = get_valid_keys(results) crop_bbox = self.get_crop_bbox(img_shape) for k in map_keys: results[k] = self.crop(results[k], crop_bbox=crop_bbox) for k in valid_keys: results[k] = self.crop(results[k], crop_bbox=crop_bbox) results['img_shape'] = results['img1'].shape results['crop_bbox'] = crop_bbox return results def __repr__(self): return self.__class__.__name__ + f'(crop_size={self.crop_size})'
[docs]@PIPELINES.register_module() class ColorJitter: """Randomly change the brightness, contrast, saturation and hue of an image. Args: asymmetric_prob (float): the probability to do color jitter for two images asymmetrically. brightness (float, tuple): How much to jitter brightness. brightness_factor is chosen uniformly from [max(0, 1 - brightness), 1 + brightness] or the given [min, max]. Should be non negative numbers. contrast (float, tuple): How much to jitter contrast. contrast_factor is chosen uniformly from [max(0, 1 - contrast), 1 + contrast] or the given [min, max]. Should be non negative numbers. saturation (float, tuple): How much to jitter saturation. saturation_factor is chosen uniformly from [max(0, 1 - saturation), 1 + saturation] or the given [min, max]. Should be non negative numbers. hue (float, tuple): How much to jitter hue. hue_factor is chosen uniformly from [-hue, hue] or the given [min, max]. Should have 0<= hue <= 0.5 or -0.5 <= min <= max <= 0.5. """ def __init__(self, asymmetric_prob=0., brightness=0., contrast=0., saturation=0., hue=0.): assert isinstance( asymmetric_prob, float ), f'asymmetric_prob must be float, but got {type(asymmetric_prob)}' self.asymmetric_prob = asymmetric_prob self._brightness = self._check_input(brightness, 'brightness') self._contrast = self._check_input(contrast, 'contrast') self._saturation = self._check_input(saturation, 'saturation') self._hue = self._check_input( hue, 'hue', center=0., bound=(-0.5, 0.5), clip_first_on_zero=False) def _get_param(self): fn_idx = np.random.permutation(4) b = None if self._brightness is None else np.random.uniform( self._brightness[0], self._brightness[1]) c = None if self._contrast is None else np.random.uniform( self._contrast[0], self._contrast[1]) s = None if self._saturation is None else np.random.uniform( self._saturation[0], self._saturation[1]) h = None if self._hue is None else np.random.uniform( self._hue[0], self._hue[1]) return fn_idx, b, c, s, h def _check_input(self, value, name, center=1, bound=(0, float('inf')), clip_first_on_zero=True): if isinstance(value, (float, int)): if value < 0: raise ValueError( f'If {name} is a single number, it must be non negative.') value = [center - float(value), center + float(value)] if clip_first_on_zero: value[0] = max(value[0], 0.0) if not bound[0] <= value[0] <= value[1] <= bound[1]: raise ValueError(f'{name} values should be between {bound}') elif isinstance(value, (tuple, list)) and len(value) == 2: if not bound[0] <= value[0] <= value[1] <= bound[1]: raise ValueError(f'{name} values should be between {bound}') else: raise TypeError( f'{name} should be a single number or a list/tuple with ' f'length 2, but got {value}.') # if value is 0 or (1., 1.) for brightness/contrast/saturation # or (0., 0.) for hue, do nothing if value[0] == value[1] == center: value = None return value def color_jitter(self, img): fn_idx, brightness, contrast, saturation, hue = self._get_param() img = img if isinstance(img, (list, tuple)) else [img] length = len(img) for i in fn_idx: if i == 0 and brightness: img = [adjust_brightness(i, brightness) for i in img] if i == 1 and contrast: img = [adjust_contrast(i, contrast) for i in img] if i == 2 and saturation: img = [adjust_color(i, saturation) for i in img] if i == 3 and hue: img = [adjust_hue(i, hue) for i in img] if length == 1: return img[0] return img def __call__(self, results): """Call function to perform photometric distortion on images. Args: results (dict): Result dict from loading pipeline. Returns: dict: Result dict with images distorted. """ img_keys = get_img_keys(results) imgs = [] for k in img_keys: imgs.append(results[k]) asym = np.random.rand() # asymmetric if asym < self.asymmetric_prob: imgs_ = [] for i in imgs: i = self.color_jitter(i) imgs_.append(i) imgs = imgs_ else: # symmetric imgs = self.color_jitter(imgs) for i, k in enumerate(img_keys): results[k] = imgs[i] return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'asymmetric_prob={self.asymmetric_prob}, ' f'brightness_range={self._brightness}, ' f'contrast_range={self._contrast}, ' f'saturation_range={self._saturation}, ' f'hue_range={self._hue}') return repr_str
[docs]@PIPELINES.register_module() class PhotoMetricDistortion: """Apply photometric distortion to image sequentially, every transformation is applied with a probability of 0.5. The position of random contrast is in second or second to last. 1. random brightness 2. random contrast (mode 0) 3. convert color from BGR to HSV 4. random saturation 5. random hue 6. convert color from HSV to BGR 7. random contrast (mode 1) 8. randomly swap channels Args: brightness_delta (int): delta of brightness. contrast_range (tuple): range of contrast. saturation_range (tuple): range of saturation. hue_delta (int): delta of hue. """ def __init__(self, brightness_delta=32, contrast_range=(0.5, 1.5), saturation_range=(0.5, 1.5), hue_delta=18): self.brightness_delta = brightness_delta self.contrast_lower, self.contrast_upper = contrast_range self.saturation_lower, self.saturation_upper = saturation_range self.hue_delta = hue_delta
[docs] def convert(self, img, alpha=1, beta=0): """Multiple with alpha and add beat with clip.""" img = img.astype(np.float32) * alpha + beta img = np.clip(img, 0, 255) return img.astype(np.uint8)
[docs] def brightness(self, img): """Brightness distortion.""" if random.randint(2): beta = random.uniform(-self.brightness_delta, self.brightness_delta) img_ = [] for i_img in img: img_.append(self.convert(i_img, beta=beta)) return img_ else: return img
[docs] def contrast(self, img): """Contrast distortion.""" if random.randint(2): alpha = random.uniform(self.contrast_lower, self.contrast_upper) img_ = [] for i_img in img: img_.append(self.convert(i_img, alpha=alpha)) return img_ else: return img
[docs] def saturation(self, img): """Saturation distortion.""" if random.randint(2): alpha = random.uniform(self.saturation_lower, self.saturation_upper) img_ = [] for i_img in img: i_img = mmcv.bgr2hsv(i_img) i_img[:, :, 1] = self.convert(i_img[:, :, 1], alpha=alpha) i_img = mmcv.hsv2bgr(i_img) img_.append(i_img) return img_ else: return img
[docs] def hue(self, img): """Hue distortion.""" if random.randint(2): hue_val = random.randint(-self.hue_delta, self.hue_delta) img_ = [] for i_img in img: i_img = mmcv.bgr2hsv(i_img) i_img[:, :, 0] = (i_img[:, :, 0].astype(int) + hue_val) % 180 i_img = mmcv.hsv2bgr(i_img) img_.append(i_img) return img_ else: return img
def __call__(self, results): """Call function to perform photometric distortion on images. Args: results (dict): Result dict from loading pipeline. Returns: dict: Result dict with images distorted. """ img_keys = get_img_keys(results) img = [] for k in img_keys: img.append(results[k]) # random brightness img = self.brightness(img) # mode == 0 --> do random contrast first # mode == 1 --> do random contrast last mode = random.randint(2) if mode == 1: img = self.contrast(img) # random saturation img = self.saturation(img) # random hue img = self.hue(img) # random contrast if mode == 0: img = self.contrast(img) for i, k in enumerate(img_keys): results[k] = img[i] return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += (f'(brightness_delta={self.brightness_delta}, ' f'contrast_range=({self.contrast_lower}, ' f'{self.contrast_upper}), ' f'saturation_range=({self.saturation_lower}, ' f'{self.saturation_upper}), ' f'hue_delta={self.hue_delta})') return repr_str
[docs]@PIPELINES.register_module() class RandomRotation: """Random rotation of the image from -angle to angle (in degrees). .. note: This augmentation is for dense optical flow data, not for sparse optical flow data. Args: prob (float): The rotation probability. angle (float): max angle of the rotation in the range from -180 to 180. auto_bound (bool): Whether to adjust the image size to cover the whole rotated image. Default: False """ def __init__(self, prob, angle, auto_bound=False): assert isinstance(prob, (float)) and prob >= 0. and prob <= 1. assert isinstance(angle, float) and angle >= -180. and angle <= 180. self.prob = prob self.angle = angle self.auto_bound = auto_bound def __call__(self, results): """Call function to rotate the images and optical flow map. Args: results (dict): Result dict from loading pipeline. Returns: dict: Processed results. """ rotate = True if np.random.rand() < self.prob else False angle = random.uniform(-self.angle, self.angle) if rotate: angle_rad = angle * np.pi / 180. cos = np.cos(angle_rad) sin = np.sin(angle_rad) map_keys = get_map_keys(results) flow_keys = get_flow_keys(results) for k in map_keys: img = results[k] img = mmcv.imrotate(img, angle, auto_bound=self.auto_bound) results[k] = img # Rotation matrix in image coordinate (origin is assumed to be # the top-left corners) with the angle that positive values mean # clockwise rotation # |cos -sin| # |sin cos| for fk in flow_keys: flow_ = copy.deepcopy(results[fk]) results[fk][:, :, 0] = \ cos * flow_[:, :, 0] - sin * flow_[:, :, 1] results[fk][:, :, 1] = \ sin * flow_[:, :, 0] + cos * flow_[:, :, 1] results['rotate'] = rotate results['rotate_angle'] = angle if self.auto_bound: results['img_shape'] = results['img1'].shape else: pass return results def __repr__(self): return self.__class__.__name__ + f'(angle={self.angle})'
[docs]@PIPELINES.register_module() class GaussianNoise: """Add Gaussian Noise to images. Add Gaussian Noise, with mean 0 and std sigma uniformly sampled from sigma_range, to images. And then clamp the images to clamp_range. Args: sigma_range(list(float) | tuple(float)): Uniformly sample sigma of gaussian noise in sigma_range. Default: (0, 0.04) clamp_range(list(float) | tuple(float)): The min and max value to clamp the images after adding gaussian noise. Default: (float('-inf'), float('inf')). """ def __init__(self, sigma_range=(0, 0.04), clamp_range=(float('-inf'), float('inf'))): assert isinstance(sigma_range, (list, tuple)) assert len(sigma_range) == 2 assert 0 <= sigma_range[0] < sigma_range[1] self.sigma_range = sigma_range assert isinstance(clamp_range, (list, tuple)) assert len(clamp_range) == 2 assert clamp_range[0] < clamp_range[1] self.clamp_range = clamp_range def __call__(self, results): """Call function to add gaussian noise to images. And then clamp the images to clamp_range. Args: results (dict): Result dict from loading pipeline. Returns: dict: Processed results. """ img_keys = get_img_keys(results) # create new meta 'sigma' results['sigma'] = random.uniform(*self.sigma_range) for k in img_keys: assert results[k].dtype == np.float32, \ 'Before add Gaussian noise, it needs do normalize.' results[k] += np.random.randn( *results['img_shape']) * results['sigma'] results[k] = np.clip( results[k], a_min=self.clamp_range[0], a_max=self.clamp_range[1]).astype(np.float32) return results def __repr__(self): return self.__class__.__name__ + f'(sigma_range={self.sigma_range})'
[docs]@PIPELINES.register_module() class RandomTranslate: """Random translation of the images and flow map. .. note: This augmentation is for dense optical flow data, not for sparse optical flow data. Args: prob (float): the probability to do translation. x_offset (float | tuple): translate ratio on x axis, randomly choice [-x_offset, x_offset] or the given [min, max]. Default: 0. y_offset (float | tuple): translate ratio on y axis, randomly choice [-x_offset, x_offset] or the given [min, max]. Default: 0. """ def __init__(self, prob=0., x_offset=0., y_offset=0.): assert isinstance(prob, float) and prob <= 1. and prob >= 0. self.prob = prob self.x_offset = self._check_input(x_offset) self.y_offset = self._check_input(y_offset) def _check_input(self, v): value = [] if isinstance(v, float): assert v >= 0. and v <= 1. value = [-v, v] elif isinstance(v, tuple): assert v[0] >= -0.1 and v[1] <= 1. value[0] = [v[0], v[1]] else: raise TypeError('Translate offset should be a single number or a ' f'list/tuple with length 2, but got {v}.') return np.random.uniform(value[0], value[1]) def __call__(self, results): """Call function to translate the images and optical flow map. Args: results (dict): Result dict from loading pipeline. Returns: dict: Processed results. """ translate = np.random.uniform(0, 1) < self.prob if translate: h, w, _ = results['img_shape'] tw = w * self.x_offset th = h * self.y_offset M = np.float32([[1, 0, tw], [0, 1, th]]) map_keys = get_map_keys(results) for k in map_keys: results[k] = cv2.warpAffine(results[k], M, (w, h)) results['translate'] = translate results['translate_offset'] = (self.x_offset, self.y_offset) return results
@PIPELINES.register_module() class RandomGamma: """Random gamma correction of images. Note: gamma larger than 1 make the shadows darker, while gamma smaller than 1 make dark regions lighter. Args: gamma_range(list | tuple): A list or tuple of length 2. Uniformly sample gamma from gamma_range. Defaults to (0.7, 1.5). """ def __init__(self, gamma_range=(0.7, 1.5)): assert isinstance(gamma_range, (list, tuple)) assert len(gamma_range) == 2 assert 0 <= gamma_range[0] <= gamma_range[1] self.gamma_range = gamma_range def __call__(self, results): """Call function to process images using gamma correction. Args: results (dict): Result dict from loading pipeline. Returns: dict: Processed results. """ img_keys = get_img_keys(results) # create new meta 'gamma' results['gamma'] = random.uniform(*self.gamma_range) for k in img_keys: results[k] = adjust_gamma(results[k], results['gamma']) return results def __repr__(self): return self.__class__.__name__ + f'(gamma_range={self.gamma_range})'
Read the Docs v: stable
Versions
latest
stable
1.x
dev-1.x
Downloads
pdf
html
epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.