Shortcuts

Source code for mmflow.datasets.builder

# Copyright (c) OpenMMLab. All rights reserved.
import platform
import random
from functools import partial
from typing import Optional, Sequence, Union

import mmcv
import numpy as np
import torch
from mmcv.parallel import collate
from mmcv.runner import get_dist_info
from mmcv.utils import Registry, build_from_cfg
from torch.utils.data import DataLoader, Dataset

from .samplers import DistributedSampler, MixedBatchDistributedSampler

if platform.system() != 'Windows':
    # Workaround for https://github.com/pytorch/pytorch/issues/973:
    # lift the soft limit on open file descriptors to at least 4096, but
    # never above the hard limit, which is left untouched.
    import resource
    rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
    base_soft_limit, hard_limit = rlimit[0], rlimit[1]
    soft_limit = max(4096, base_soft_limit)
    if soft_limit > hard_limit:
        soft_limit = hard_limit
    resource.setrlimit(resource.RLIMIT_NOFILE, (soft_limit, hard_limit))

# Registry that build_dataset() hands to build_from_cfg() to turn a config
# dict (keyed by its "type") into a dataset instance.
DATASETS = Registry('dataset')
# Registry named 'pipeline'; it is not referenced elsewhere in this module.
# NOTE(review): presumably populated by data-transform classes - confirm.
PIPELINES = Registry('pipeline')


def build_dataset(cfg: Union[mmcv.Config, Sequence[mmcv.Config]],
                  default_args: Optional[dict] = None) -> Dataset:
    """Build a PyTorch dataset from a config.

    Args:
        cfg (mmcv.Config | Sequence[mmcv.Config]): Config dict of a dataset,
            or a list/tuple of such config dicts. A single config must at
            least contain the key "type".
        default_args (dict, optional): Default initialization arguments,
            forwarded unchanged to every dataset that gets built.

    .. note::
        A list/tuple of configs, or a config whose type is
        ``'ConcatDataset'`` (sub-configs under ``cfg['datasets']``), is
        built element by element and wrapped in ``ConcatDataset``. A config
        whose type is ``'RepeatDataset'`` builds ``cfg['dataset']`` and
        wraps it in ``RepeatDataset`` together with ``cfg['times']``.

    Returns:
        dataset: The built dataset based on the input config.
    """
    # Imported lazily, as in the original implementation.
    from .dataset_wrappers import ConcatDataset, RepeatDataset

    def _build_each(sub_cfgs):
        # Recursively build every sub-config with the same default_args.
        return [build_dataset(sub_cfg, default_args) for sub_cfg in sub_cfgs]

    if isinstance(cfg, (list, tuple)):
        return ConcatDataset(_build_each(cfg))

    dataset_type = cfg['type']
    if dataset_type == 'ConcatDataset':
        return ConcatDataset(_build_each(cfg['datasets']))
    if dataset_type == 'RepeatDataset':
        wrapped = build_dataset(cfg['dataset'], default_args)
        return RepeatDataset(wrapped, cfg['times'])

    # Any other type is looked up in the DATASETS registry.
    return build_from_cfg(cfg, DATASETS, default_args)
def _torch_has_persistent_workers() -> bool:
    """Tell whether the installed PyTorch is at least version 1.7.

    The (major, minor) release numbers are compared as integers. Comparing
    plain version strings is lexicographic and would rank '1.10.0' below
    '1.7.0'.
    """
    version = str(torch.__version__)
    release = []
    # Drop a local build tag such as '+cu117', then read major and minor.
    for piece in version.split('+')[0].split('.')[:2]:
        digits = ''
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        if not digits:
            # Non-numeric version tag: keep the historical string
            # comparison so behaviour on such builds does not change.
            return version >= '1.7.0'
        release.append(int(digits))
    return tuple(release) >= (1, 7)


def build_dataloader(dataset: Dataset,
                     samples_per_gpu: int,
                     workers_per_gpu: int,
                     sample_ratio: Optional[Sequence] = None,
                     num_gpus: int = 1,
                     dist: bool = True,
                     shuffle: bool = True,
                     seed: Optional[int] = None,
                     persistent_workers: bool = False,
                     **kwargs) -> DataLoader:
    """Build PyTorch DataLoader.

    In distributed training, each GPU/process has a dataloader.
    In non-distributed training, there is only one dataloader for all GPUs.

    Args:
        dataset (Dataset): A PyTorch dataset. When ``sample_ratio`` is given
            in distributed mode, this is passed to the mixed-batch sampler as
            ``datasets`` and then wrapped in ``ConcatDataset``.
        samples_per_gpu (int): Number of training samples on each GPU, i.e.,
            batch size of each GPU.
        workers_per_gpu (int): How many subprocesses to use for data loading
            for each GPU.
        sample_ratio (list, optional): The ratio of samples taken from each
            dataset in a mixed batch. The sum of sample_ratio must be equal
            to 1 and its length must be equal to the number of datasets,
            e.g. with a batch of 8 and sample_ratio=(0.5, 0.25, 0.25), one
            batch holds 4 samples from dataset1, 2 samples from dataset2 and
            2 samples from dataset3. Only read when ``dist`` is True.
            Default: None.
        num_gpus (int): Number of GPUs. Only used in non-distributed training.
        dist (bool): Distributed training/test or not. Default: True.
        shuffle (bool): Whether to shuffle the data at every epoch.
            In distributed mode shuffling is delegated to the sampler.
            Default: True.
        seed (int, optional): The seed for generating random numbers for data
            workers. When None, no ``worker_init_fn`` is installed.
            Default: None.
        persistent_workers (bool): If True, the data loader will not shut
            down the worker processes after a dataset has been consumed once.
            This keeps the workers' Dataset instances alive. The argument is
            only forwarded to DataLoader when PyTorch>=1.7.0. Default: False.
        kwargs: Any keyword argument to be used to initialize DataLoader.

    Returns:
        DataLoader: A PyTorch dataloader.
    """
    rank, world_size = get_dist_info()

    if dist:
        batch_size = samples_per_gpu
        num_workers = workers_per_gpu
        if sample_ratio is None:
            sampler = DistributedSampler(
                dataset, world_size, rank, shuffle=shuffle, seed=seed)
        else:
            from .dataset_wrappers import ConcatDataset
            sampler = MixedBatchDistributedSampler(
                datasets=dataset,
                sample_ratio=sample_ratio,
                num_replicas=world_size,
                rank=rank,
                shuffle=shuffle,
                seed=seed)
            dataset = ConcatDataset(dataset)
        # The sampler already received the shuffle flag, so the DataLoader
        # itself must not shuffle.
        shuffle = False
    else:
        sampler = None
        batch_size = num_gpus * samples_per_gpu
        num_workers = num_gpus * workers_per_gpu

    init_fn = partial(
        worker_init_fn, num_workers=num_workers, rank=rank,
        seed=seed) if seed is not None else None

    # Older PyTorch releases do not accept ``persistent_workers``; pass it
    # only when the installed version supports it.
    if _torch_has_persistent_workers():
        kwargs['persistent_workers'] = persistent_workers

    return DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,
        num_workers=num_workers,
        collate_fn=partial(collate, samples_per_gpu=samples_per_gpu),
        shuffle=shuffle,
        worker_init_fn=init_fn,
        **kwargs)
def worker_init_fn(worker_id: int, num_workers: int, rank: int, seed: int):
    """Seed the random number generators of one data-loading worker.

    Args:
        worker_id (int): The worker id for each worker subprocess.
        num_workers (int): How many subprocesses to use for data loading.
        rank (int): The rank of current process group.
        seed (int): The seed for generating random numbers for data workers.
    """
    # The user seed is offset by ``num_workers * rank + worker_id``, which is
    # distinct per (rank, worker) pair as long as worker_id < num_workers.
    worker_seed = seed + worker_id + rank * num_workers
    # Seed NumPy, the stdlib ``random`` module and PyTorch, in that order.
    for reseed in (np.random.seed, random.seed, torch.manual_seed):
        reseed(worker_seed)
Read the Docs v: latest
Versions
latest
stable
Downloads
pdf
html
epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.