oneflow.utils

Utils


class oneflow.utils.data.BatchSampler(sampler: oneflow.utils.data.sampler.Sampler[int], batch_size: int, drop_last: bool)

Wraps another sampler to yield a mini-batch of indices.

Parameters
  • sampler (Sampler or Iterable) – Base sampler. Can be any iterable object

  • batch_size (int) – Size of mini-batch.

  • drop_last (bool) – If True, the sampler will drop the last batch if its size would be less than batch_size

Example

>>> list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=False))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=True))
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
class oneflow.utils.data.ConcatDataset(datasets: Iterable[oneflow.utils.data.dataset.Dataset])

Dataset as a concatenation of multiple datasets.

This class is useful for assembling different existing datasets.

Parameters

datasets (sequence) – List of datasets to be concatenated

static cumsum(sequence)
cumulative_sizes: List[int]
datasets: List[oneflow.utils.data.dataset.Dataset[T_co]]
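
For example, a minimal sketch of concatenation, built from TensorDataset (documented below); the names and values are illustrative:

import oneflow as flow
from oneflow.utils.data import ConcatDataset, TensorDataset

# Two small map-style datasets; ds_a has 2 samples, ds_b has 3.
ds_a = TensorDataset(flow.tensor([[1.0], [2.0]]))
ds_b = TensorDataset(flow.tensor([[3.0], [4.0], [5.0]]))

combined = ConcatDataset([ds_a, ds_b])
print(len(combined))  # 5
print(combined[2])    # the first sample of ds_b
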
class oneflow.utils.data.DataLoader(dataset: oneflow.utils.data.dataset.Dataset[T_co], batch_size: Optional[int] = 1, shuffle: bool = False, sampler: Optional[oneflow.utils.data.sampler.Sampler[int]] = None, batch_sampler: Optional[oneflow.utils.data.sampler.Sampler[Sequence[int]]] = None, num_workers: int = 0, collate_fn: Optional[Callable[[List[T]], Any]] = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Optional[Callable[[int], None]] = None, multiprocessing_context=None, generator=<oneflow._oneflow_internal.Generator object>, *, prefetch_factor: int = 2, persistent_workers: bool = False)

Data loader. Combines a dataset and a sampler, and provides an iterable over the given dataset.

The DataLoader supports both map-style and iterable-style datasets with single- or multi-process loading, customizing loading order and optional automatic batching (collation) and memory pinning.

See the flow.utils.data documentation page for more details.

For compatibility, the design of our DataLoader is consistent with PyTorch's; see https://github.com/pytorch/pytorch/tree/v1.7.0 for reference.

Parameters
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.

  • batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading (default: 0). 0 means that the data will be loaded in the main process.

  • collate_fn (callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into CUDA pinned memory before returning them. Note that the default pinning logic only recognizes Tensors and containers of Tensors, so batches of a custom type returned by your collate_fn will not be pinned automatically. (default: False)

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of samples loaded in advance by each worker. 2 means there will be a total of 2 * num_workers samples prefetched across all workers. (default: 2)

  • persistent_workers (bool, optional) – If True, the data loader will initialize the worker processes immediately and will not shut them down after the dataset has been consumed once. This keeps the worker Dataset instances alive. If you are using OneFlow with RDMA support in distributed training, persistent_workers must be True; otherwise a segmentation fault will occur. (default: False)

Warning

If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function.

Warning

The len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess OneFlow can make, because OneFlow trusts user dataset code to correctly handle multi-process loading and avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, OneFlow cannot detect such cases in general.

batch_size: Optional[int]
check_worker_number_rationality()
dataset: oneflow.utils.data.dataset.Dataset[T_co]
drop_last: bool
num_workers: int
pin_memory: bool
prefetch_factor: int
sampler: oneflow.utils.data.sampler.Sampler
timeout: float
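
For example, batched loading from a map-style dataset might look like this (a minimal sketch; the tensors and sizes are illustrative):

import oneflow as flow
from oneflow.utils.data import DataLoader, TensorDataset

# Ten (feature, label) pairs wrapped as a map-style dataset.
features = flow.randn(10, 3)
labels = flow.arange(10)
ds = TensorDataset(features, labels)

# Shuffle each epoch; the final batch holds the 2 leftover samples
# because drop_last defaults to False.
loader = DataLoader(ds, batch_size=4, shuffle=True)
for x, y in loader:
    print(x.shape, y.shape)
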
class oneflow.utils.data.Dataset(*args, **kwds)

An abstract class representing a Dataset.

All datasets that represent a map from keys to data samples should subclass it. All subclasses should overwrite __getitem__(), supporting fetching a data sample for a given key. Subclasses could also optionally overwrite __len__(), which is expected to return the size of the dataset by many Sampler implementations and the default options of DataLoader.

Note

DataLoader by default constructs an index sampler that yields integral indices. To make it work with a map-style dataset with non-integral indices/keys, a custom sampler must be provided.
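
A minimal map-style subclass might look like this (the class is illustrative, not part of the library):

import oneflow as flow
from oneflow.utils.data import Dataset

class SquaresDataset(Dataset):
    # Map-style dataset: index i -> (i, i**2) as tensors.
    def __init__(self, n):
        self.n = n

    def __getitem__(self, idx):
        return flow.tensor(idx), flow.tensor(idx ** 2)

    def __len__(self):
        return self.n

ds = SquaresDataset(4)
print(len(ds))  # 4
print(ds[3])    # index 3 -> values 3 and 9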

class oneflow.utils.data.IterableDataset(*args, **kwds)

An iterable Dataset.

All datasets that represent an iterable of data samples should subclass it. Such form of datasets is particularly useful when data come from a stream.

All subclasses should overwrite __iter__(), which would return an iterator of samples in this dataset.

When a subclass is used with DataLoader, each item in the dataset will be yielded from the DataLoader iterator. When num_workers > 0, each worker process will have a different copy of the dataset object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers.

Example 1: computing the iteration bounds in __iter__():

>>> class MyIterableDataset(flow.utils.data.IterableDataset):
...     def __init__(self, start, end):
...         super().__init__()
...         assert end > start, "this example code only works with end > start"
...         self.start = start
...         self.end = end
...
...     def __iter__(self):
...         iter_start = self.start
...         iter_end = self.end
...         return iter(range(iter_start, iter_end))
...
>>> # should give same set of data as range(3, 7), i.e., [3, 4, 5, 6].
>>> ds = MyIterableDataset(start=3, end=7)

>>> # Single-process loading
>>> print(list(flow.utils.data.DataLoader(ds, num_workers=0)))
[3, 4, 5, 6]

Example 2: returning an iterator directly from __iter__():

>>> class MyIterableDataset(flow.utils.data.IterableDataset):
...     def __init__(self, start, end):
...         super().__init__()
...         assert end > start, "this example code only works with end > start"
...         self.start = start
...         self.end = end
...
...     def __iter__(self):
...         return iter(range(self.start, self.end))
...
>>> # should give same set of data as range(3, 7), i.e., [3, 4, 5, 6].
>>> ds = MyIterableDataset(start=3, end=7)

>>> # Single-process loading
>>> print(list(flow.utils.data.DataLoader(ds, num_workers=0)))
[3, 4, 5, 6]
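
Both examples above load in a single process. To actually split the workload, __iter__() can compute a per-worker shard from the worker context. The sketch below assumes a get_worker_info() helper mirroring PyTorch's API of the same name (this DataLoader is documented as consistent with PyTorch v1.7.0); treat that helper as an assumption, not a guaranteed API:

import math
import oneflow as flow

class MyShardedIterableDataset(flow.utils.data.IterableDataset):
    def __init__(self, start, end):
        super().__init__()
        assert end > start, "this example code only works with end > start"
        self.start = start
        self.end = end

    def __iter__(self):
        # Assumed to mirror torch.utils.data.get_worker_info().
        worker_info = flow.utils.data.get_worker_info()
        if worker_info is None:
            # Single-process loading: yield the full range.
            iter_start, iter_end = self.start, self.end
        else:
            # In a worker process: carve out this worker's slice.
            per_worker = int(math.ceil((self.end - self.start) / float(worker_info.num_workers)))
            iter_start = self.start + worker_info.id * per_worker
            iter_end = min(iter_start + per_worker, self.end)
        return iter(range(iter_start, iter_end))
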
functions: Dict[str, Callable] = {}
reduce_ex_hook: Optional[Callable] = None
classmethod register_datapipe_as_function(function_name, cls_to_register)
classmethod register_function(function_name, function)
classmethod set_reduce_ex_hook(hook_fn)
class oneflow.utils.data.RandomSampler(data_source: Sized, replacement: bool = False, num_samples: Optional[int] = None, generator=None)

Samples elements randomly. Without replacement, samples are drawn from a shuffled dataset. With replacement, the user can specify num_samples to draw.

Parameters
  • data_source (Dataset) – dataset to sample from

  • replacement (bool) – samples are drawn on-demand with replacement if True (default: False)

  • num_samples (int) – number of samples to draw (default: len(dataset)). This argument should be specified only when replacement is True.

  • generator (Generator) – Generator used in sampling.

data_source: Sized
property num_samples
replacement: bool
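
A minimal sketch of both modes (the printed orders vary from run to run; the seeded-generator pattern matches the random_split example below):

import oneflow as flow
from oneflow.utils.data import RandomSampler

# Without replacement: a random permutation of all indices.
print(list(RandomSampler(range(5))))

# With replacement: exactly num_samples draws; repeats are possible.
gen = flow.Generator().manual_seed(0)
print(list(RandomSampler(range(5), replacement=True, num_samples=8, generator=gen)))
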
class oneflow.utils.data.Sampler(data_source: Optional[Sized])

Base class for all Samplers.

Every Sampler subclass has to provide an __iter__() method, providing a way to iterate over indices of dataset elements, and a __len__() method that returns the length of the returned iterator.

Note

The __len__() method isn’t strictly required by DataLoader, but is expected in any calculation involving the length of a DataLoader.
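
As an illustration, a custom sampler that visits only the even indices (a hypothetical class, not part of the library):

from oneflow.utils.data import Sampler

class EvenIndexSampler(Sampler):
    # Yields the even indices of a dataset, in order.
    def __init__(self, data_source):
        super().__init__(data_source)
        self.data_source = data_source

    def __iter__(self):
        return iter(range(0, len(self.data_source), 2))

    def __len__(self):
        return (len(self.data_source) + 1) // 2

print(list(EvenIndexSampler(range(10))))  # [0, 2, 4, 6, 8]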

class oneflow.utils.data.SequentialSampler(data_source)

Samples elements sequentially, always in the same order.

Parameters

data_source (Dataset) – dataset to sample from

data_source: Sized
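
A short example (consistent with the BatchSampler example above, which wraps this sampler):

>>> list(SequentialSampler(range(5)))
[0, 1, 2, 3, 4]
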
class oneflow.utils.data.Subset(dataset: oneflow.utils.data.dataset.Dataset[T_co], indices: Sequence[int])

Subset of a dataset at specified indices.

Parameters
  • dataset (Dataset) – The whole Dataset

  • indices (sequence) – Indices in the whole set selected for subset

dataset: oneflow.utils.data.dataset.Dataset[T_co]
indices: Sequence[int]
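
For example (building on TensorDataset, documented below; the values are illustrative):

import oneflow as flow
from oneflow.utils.data import Subset, TensorDataset

ds = TensorDataset(flow.arange(10))
sub = Subset(ds, indices=[2, 5, 7])
print(len(sub))  # 3
print(sub[0])    # the same sample as ds[2]
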
class oneflow.utils.data.SubsetRandomSampler(indices: Sequence[int], generator=None)

Samples elements randomly from a given list of indices, without replacement.

Parameters
  • indices (sequence) – a sequence of indices

  • generator (Generator) – Generator used in sampling.

indices: Sequence[int]
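
A short sketch (the order is random, so no fixed output is shown):

from oneflow.utils.data import SubsetRandomSampler

# Yields 2, 5, and 7 exactly once each, in a random order.
print(list(SubsetRandomSampler([2, 5, 7])))
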
class oneflow.utils.data.TensorDataset(*tensors: oneflow.Tensor)

Dataset wrapping tensors.

Each sample will be retrieved by indexing tensors along the first dimension.

Parameters

*tensors (Tensor) – tensors that have the same size in the first dimension.
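
For example (a minimal sketch; the sizes are illustrative):

import oneflow as flow
from oneflow.utils.data import TensorDataset

x = flow.randn(6, 2)  # 6 samples of 2 features
y = flow.arange(6)    # 6 labels
ds = TensorDataset(x, y)

sample_x, sample_y = ds[0]  # row 0 of x paired with y[0]
print(len(ds))              # 6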

oneflow.utils.data.random_split(dataset: oneflow.utils.data.dataset.Dataset[T], lengths: Sequence[int], generator: Optional[object] = <built-in method default_generator of PyCapsule object>) → List[oneflow.utils.data.dataset.Subset[T]]

Randomly split a dataset into non-overlapping new datasets of given lengths. Optionally fix the generator for reproducible results, e.g.:

>>> random_split(range(10), [3, 7], generator=flow.Generator().manual_seed(42))
Parameters
  • dataset (Dataset) – Dataset to be split

  • lengths (sequence) – lengths of splits to be produced

  • generator (Generator) – Generator used for the random permutation.


class oneflow.utils.data.distributed.DistributedSampler(dataset: oneflow.utils.data.dataset.Dataset, num_replicas: Optional[int] = None, rank: Optional[int] = None, shuffle: bool = True, seed: int = 0, drop_last: bool = False)

Sampler that restricts data loading to a subset of the dataset.

It is especially useful in conjunction with flow.nn.parallel.DistributedDataParallel. In such a case, each process can pass a DistributedSampler instance as a DataLoader sampler, and load a subset of the original dataset that is exclusive to it.

Note

Dataset is assumed to be of constant size.

Parameters
  • dataset – Dataset used for sampling.

  • num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.

  • rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.

  • shuffle (bool, optional) – If True (default), sampler will shuffle the indices.

  • seed (int, optional) – random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.

  • drop_last (bool, optional) – if True, then the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.

Warning

In distributed mode, calling the set_epoch() method at the beginning of each epoch, before creating the DataLoader iterator, is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.

For example:

>>> sampler = DistributedSampler(dataset) if is_distributed else None
>>> loader = DataLoader(dataset, shuffle=(sampler is None), sampler=sampler)
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
set_epoch(epoch: int) → None

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters

epoch (int) – Epoch number.

oneflow.utils.from_torch(torch_tensor) → Tensor

Create a OneFlow tensor from a torch tensor.

The returned tensor and the torch tensor share the same memory.

Note

Currently, only CPU local tensors are supported.

This function is useful in data preprocessing stages where some of torch's CPU ops are needed.

Parameters

torch_tensor (torch.Tensor) – the input torch tensor

Returns

oneflow.Tensor

For example:

import oneflow as flow
import torch

torch_t = torch.tensor([[1, 2, 3], [4, 5, 6]])
flow_t = flow.utils.from_torch(torch_t)

The from_torch feature is in the alpha stage.

oneflow.utils.to_torch(flow_tensor) → Tensor

Create a torch tensor from a OneFlow tensor.

The returned tensor and the OneFlow tensor share the same memory.

Note

Currently, only CPU local tensors are supported.

Parameters

flow_tensor (oneflow.Tensor) – the input OneFlow tensor

Returns

torch.Tensor

For example:

import oneflow as flow
import torch

flow_t = flow.tensor([[1, 2, 3], [4, 5, 6]])
torch_t = flow.utils.to_torch(flow_t)

The to_torch feature is in the alpha stage.