oneflow.comm

OneFlow communication functions.

Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

oneflow.comm.all_gather(tensor_list, tensor)

Gathers tensors from the whole group in a list.

Parameters
  • tensor_list (list[Tensor]) – Output list. It should contain correctly-sized tensors to be used for output of the collective.

  • tensor (Tensor) – Tensor to be broadcast from current process.

For example:

>>> # We have 1 process group with 2 ranks.
>>> import oneflow as flow

>>> input = flow.tensor([[1, 2], [3, 4]], device="cuda") + flow.env.get_local_rank()
>>> # input on rank0
>>> input 
tensor([[1, 2],
        [3, 4]], device='cuda:0', dtype=oneflow.int64)
>>> # input on rank1
>>> input 
tensor([[2, 3],
        [4, 5]], device='cuda:1', dtype=oneflow.int64)
>>> tensor_list = [flow.zeros(2, 2, dtype=flow.int64) for _ in range(2)]
>>> flow.comm.all_gather(tensor_list, input)
>>> # result on rank0
>>> tensor_list 
[tensor([[1, 2],
        [3, 4]], device='cuda:0', dtype=oneflow.int64), tensor([[2, 3],
        [4, 5]], device='cuda:0', dtype=oneflow.int64)]
>>> # result on rank1
>>> tensor_list 
[tensor([[1, 2],
        [3, 4]], device='cuda:1', dtype=oneflow.int64), tensor([[2, 3],
        [4, 5]], device='cuda:1', dtype=oneflow.int64)]

oneflow.comm.all_reduce(tensor)

Reduces the tensor data across all machines so that every process gets the final result. After the call, tensor is bitwise identical in all processes.

Parameters
  • tensor (Tensor) – Input and output of the collective. The function operates in-place.

For example:

>>> # We have 1 process group with 2 ranks.
>>> import oneflow as flow

>>> tensor = flow.tensor([[1, 2], [3, 4]], device="cuda") + flow.env.get_local_rank()
>>> # tensor on rank0
>>> tensor 
tensor([[1, 2],
        [3, 4]], device='cuda:0', dtype=oneflow.int64)
>>> # tensor on rank1
>>> tensor 
tensor([[2, 3],
        [4, 5]], device='cuda:1', dtype=oneflow.int64)
>>> flow.comm.all_reduce(tensor)
>>> tensor.numpy()
array([[3, 5],
       [7, 9]], dtype=int64)

oneflow.comm.all_to_all(output_tensor_list, input_tensor_list)

Each process scatters a list of input tensors to all processes in the group and returns the gathered tensors in the output list.

Parameters
  • output_tensor_list (list[Tensor]) – List of tensors to be gathered, one per rank.

  • input_tensor_list (list[Tensor]) – List of tensors to scatter, one per rank.
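The exchange pattern can be pictured as a transpose of a rank-by-rank grid. The following is a minimal plain-Python sketch of that reading, not OneFlow code: `simulate_all_to_all` is a hypothetical helper, strings stand in for tensors, and the indexing convention (entry `d` of a rank's input list goes to rank `d`) is an assumption based on the usual meaning of this collective.

```python
def simulate_all_to_all(inputs_per_rank):
    """Model all_to_all with plain lists (illustration only).

    inputs_per_rank[r][d] is the item rank r sends to rank d.
    The result holds, at [r][s], the item rank r received from rank s.
    """
    world_size = len(inputs_per_rank)
    for row in inputs_per_rank:
        if len(row) != world_size:
            raise ValueError("each rank needs exactly one input per rank")
    # Receiving rank dst collects entry dst from every sender's list.
    return [
        [inputs_per_rank[src][dst] for src in range(world_size)]
        for dst in range(world_size)
    ]


# Two ranks; the strings record sender and receiver.
inputs = [["r0->r0", "r0->r1"], ["r1->r0", "r1->r1"]]
outputs = simulate_all_to_all(inputs)
```

Here `outputs[0]` is what rank 0 would hold in its output list and `outputs[1]` is what rank 1 would hold.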

oneflow.comm.barrier()

Synchronizes all processes.
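To illustrate what the synchronization guarantees, here is a small sketch that models ranks as threads and uses Python's `threading.Barrier` in place of `flow.comm.barrier()`. The thread-based setup and the `WORLD_SIZE` constant are assumptions made for illustration; real OneFlow ranks are separate processes.

```python
import threading

WORLD_SIZE = 3  # hypothetical number of ranks, modeled here as threads
barrier = threading.Barrier(WORLD_SIZE)
events = []  # list.append is atomic in CPython, so no extra lock is needed


def worker(rank):
    events.append(("before", rank))
    barrier.wait()  # stands in for flow.comm.barrier()
    events.append(("after", rank))


threads = [threading.Thread(target=worker, args=(r,)) for r in range(WORLD_SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# No rank records "after" until every rank has recorded "before".
phases = [phase for phase, _ in events]
```

The order of ranks within each phase can vary between runs, but the phases themselves never interleave.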

oneflow.comm.broadcast(tensor, src)

Broadcasts the tensor to the whole group. tensor must have the same number of elements in all processes participating in the collective.

Parameters
  • tensor (Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

  • src (int) – Source rank.

For example:

>>> # We have 1 process group with 2 ranks.
>>> import oneflow as flow
>>> tensor = flow.tensor([[1, 2], [3, 4]], device="cuda") + flow.env.get_local_rank()
>>> # input on rank0
>>> tensor 
tensor([[1, 2],
        [3, 4]], device='cuda:0', dtype=oneflow.int64)
>>> # input on rank1
>>> tensor 
tensor([[2, 3],
        [4, 5]], device='cuda:1', dtype=oneflow.int64)
>>> flow.comm.broadcast(tensor, 0)
>>> # result on rank0
>>> tensor 
tensor([[1, 2],
        [3, 4]], device='cuda:0', dtype=oneflow.int64)

oneflow.comm.gather(tensor, gather_list=None, dst=0)

Gathers a list of tensors in a single process.

Parameters
  • tensor (Tensor) – Input tensor.

  • gather_list (list[Tensor], optional) – List of appropriately-sized tensors to use for gathered data (default is None, must be specified on the destination rank)

  • dst (int, optional) – Destination rank (default is 0)
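The data movement of a gather can be sketched in plain Python as follows. `simulate_gather` is a hypothetical helper, not part of OneFlow, and nested lists stand in for tensors; it only shows which rank ends up holding what.

```python
def simulate_gather(tensors_per_rank, dst=0):
    """Model gather with plain lists (illustration only).

    tensors_per_rank[r] is the input tensor on rank r.
    Returns, per rank, the contents of gather_list after the call:
    the destination rank holds every input, all other ranks hold None.
    """
    world_size = len(tensors_per_rank)
    if not 0 <= dst < world_size:
        raise ValueError("dst must be a valid rank")
    return [
        list(tensors_per_rank) if rank == dst else None
        for rank in range(world_size)
    ]


per_rank = [[1, 2], [3, 4], [5, 6]]
gathered = simulate_gather(per_rank, dst=1)
```

Only index 1 of `gathered` is populated, mirroring the requirement that gather_list be specified on the destination rank.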

oneflow.comm.recv()

Receives a tensor synchronously.

Either all of shape, dtype and device must be given (for use when the sender passes send_meta=False), or none of them.

Parameters
  • src (int, optional) – Source rank. Will receive from any process if unspecified.

  • shape (optional) – Output tensor shape.

  • dtype (optional) – Output tensor data type.

  • device (optional) – Output tensor device.

  • out (Tensor, optional) – Tensor to fill with received data.

Returns

If out is None, the received tensor is returned. Otherwise, the received data is written into out and nothing is returned.
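The all-or-none rule for shape, dtype and device can be expressed as a small check. The helper below, `check_recv_meta`, is hypothetical and written in plain Python; it is a sketch of the rule as stated above, not a reproduction of OneFlow's own validation.

```python
def check_recv_meta(shape=None, dtype=None, device=None):
    """Return True if the receiver depends on meta information from the sender.

    Raises ValueError when only some of the three arguments are supplied.
    """
    given = [arg is not None for arg in (shape, dtype, device)]
    if all(given):
        # The receiver already knows the layout, so the sender may skip meta.
        return False
    if not any(given):
        # The receiver relies on meta information sent with the tensor.
        return True
    raise ValueError("pass all of shape, dtype and device, or none of them")


needs_meta_default = check_recv_meta()
needs_meta_explicit = check_recv_meta(shape=(2, 2), dtype="int64", device="cuda")
```

Passing only one or two of the three arguments is the case this rule is meant to reject.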

oneflow.comm.reduce(tensor, dst)

Reduces the tensor data across all machines.

Only the process with rank dst is going to receive the final result.

Parameters
  • tensor (Tensor) – Input and output of the collective. The function operates in-place.

  • dst (int) – Destination rank
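As a rough illustration, the sketch below models reduce with flat Python lists. Two points are assumptions rather than documented behavior: the reduction is taken to be an element-wise sum (matching the all_reduce example), and buffers on non-destination ranks are left unchanged. `simulate_reduce` is a hypothetical helper.

```python
def simulate_reduce(values_per_rank, dst):
    """Model reduce with flat lists (illustration only).

    values_per_rank[r] is the flattened tensor on rank r.
    Returns the per-rank buffers after the call.
    """
    world_size = len(values_per_rank)
    if not 0 <= dst < world_size:
        raise ValueError("dst must be a valid rank")
    # Assumed reduction: element-wise sum across ranks.
    total = [sum(column) for column in zip(*values_per_rank)]
    # Assumption: only the destination buffer is overwritten.
    return [
        total if rank == dst else list(values_per_rank[rank])
        for rank in range(world_size)
    ]


per_rank = [[1, 2, 3, 4], [2, 3, 4, 5]]
after = simulate_reduce(per_rank, dst=0)
```

With these inputs, rank 0 ends up with the sum and rank 1 keeps its original values in this model.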

oneflow.comm.reduce_scatter(output, input_list)

Reduces, then scatters a list of tensors to all processes in a group.

Parameters
  • output (Tensor) – Output tensor.

  • input_list (list[Tensor]) – List of tensors to reduce and scatter.
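The two phases can be sketched together in plain Python. In this model, entry `k` of each rank's input list is the chunk destined for rank `k`, and the reduction is assumed to be an element-wise sum. Both are assumptions for illustration, and `simulate_reduce_scatter` is a hypothetical helper rather than OneFlow code.

```python
def simulate_reduce_scatter(input_lists_per_rank):
    """Model reduce_scatter with flat lists (illustration only).

    input_lists_per_rank[r][k] is rank r's contribution destined for rank k.
    Returns the output per rank: rank k receives the element-wise sum of
    every rank's k-th chunk.
    """
    world_size = len(input_lists_per_rank)
    for chunks in input_lists_per_rank:
        if len(chunks) != world_size:
            raise ValueError("each rank needs one chunk per rank")
    return [
        [
            sum(column)
            for column in zip(*(input_lists_per_rank[r][k] for r in range(world_size)))
        ]
        for k in range(world_size)
    ]


inputs = [
    [[1, 2], [3, 4]],      # rank 0: chunk for rank 0, chunk for rank 1
    [[10, 20], [30, 40]],  # rank 1: chunk for rank 0, chunk for rank 1
]
outputs = simulate_reduce_scatter(inputs)
```

Each rank ends up with one reduced chunk instead of the full reduced list, which is the difference from all_reduce.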

oneflow.comm.scatter(tensor, scatter_list=None, src=0)

Scatters a list of tensors to all processes in a group.

Each process will receive exactly one tensor and store its data in the tensor argument.

Parameters
  • tensor (Tensor) – Output tensor.

  • scatter_list (list[Tensor], optional) – List of tensors to scatter (default is None, must be specified on the source rank)

  • src (int, optional) – Source rank (default is 0)
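A plain-Python model of the distribution step is shown below. `simulate_scatter` is a hypothetical helper with lists standing in for tensors; it assumes that entry `r` of scatter_list is delivered to rank `r`.

```python
def simulate_scatter(scatter_list, world_size, src=0):
    """Model scatter with plain lists (illustration only).

    scatter_list is the list held by the source rank.
    Returns, per rank, the contents of its output tensor after the call.
    """
    if not 0 <= src < world_size:
        raise ValueError("src must be a valid rank")
    if len(scatter_list) != world_size:
        raise ValueError("scatter_list needs exactly one tensor per rank")
    # Each rank receives a copy of its own entry, including the source rank.
    return [list(scatter_list[rank]) for rank in range(world_size)]


received = simulate_scatter([[1, 2], [3, 4]], world_size=2, src=0)
```

The length check reflects the statement that each process receives exactly one tensor.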

oneflow.comm.send()

Sends a tensor synchronously.

Parameters
  • tensor (Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • send_meta (bool, optional) – Whether to send meta information (default is True)
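To show why the receiver needs the shape when meta information is not sent, here is a toy model of a send/recv pair over an in-process `queue.Queue`. Everything in it is an assumption for illustration: `sim_send` and `sim_recv` are hypothetical helpers, the payload is a flat list, only 2-D shapes are handled, and the meta information is reduced to the shape alone.

```python
import queue


def sim_send(channel, flat_data, shape, send_meta=True):
    """Put a flat payload on the channel, optionally preceded by its shape."""
    if send_meta:
        channel.put(("meta", tuple(shape)))
    channel.put(("data", list(flat_data)))


def sim_recv(channel, shape=None):
    """Take a payload off the channel and rebuild a 2-D nested list.

    If shape is None, the shape is read from the channel first.
    """
    if shape is None:
        kind, shape = channel.get()
        if kind != "meta":
            raise RuntimeError("expected meta information before the data")
    kind, flat = channel.get()
    if kind != "data":
        raise RuntimeError("expected a data payload")
    rows, cols = shape
    return [flat[r * cols:(r + 1) * cols] for r in range(rows)]


channel = queue.Queue()

# Sender includes meta, so the receiver passes nothing.
sim_send(channel, [1, 2, 3, 4], shape=(2, 2), send_meta=True)
with_meta = sim_recv(channel)

# Sender omits meta, so the receiver must supply the shape itself.
sim_send(channel, [5, 6, 7, 8], shape=(2, 2), send_meta=False)
without_meta = sim_recv(channel, shape=(2, 2))
```

Both transfers reconstruct the same layout; the difference is only in which side provides the description of the data.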