oneflow.distributed¶
Note
Please refer to OneFlow Distributed Overview for a brief introduction to all features related to distributed training.
OneFlow provides two ways to accomplish Distributed Training:
The first way is that users are recommended to use OneFlow’s global Tensor for distributed training. Global Tensor regards the computing cluster as a supercomputing device, allowing users to write distributed training code just like in a single-machine environment.
OneFlow also provides a DDP(DistributedDataParallel) module aligned with PyTorch. DDP has been well-known and widely used in data parallelism by the majority of PyTorch users. Also see PyTorch DDP introduction.
Basic¶
When you start distributed training in OneFlow, the following functions can be used.
Returns the number of processes in the current process group. |
|
Returns the rank of current process group. |
|
Returns the local rank of current machine. |
|
Returns the number of machines in the current process group. |
|
Init RDMA in the current envirment. |
|
Returns whether RDMA is initialized in the current envirment or not. |
Global Tensor¶
Construct Global Tensor¶
A Global Tensor can be created with a placement
and a sbp
. The placement
describes the physical devices of the global tensor will be allocated, and the sbp
describes its distribution among these devices.
>>>import oneflow as flow
>>> # Place a global tensor on cuda device of rank(process) 0 and 1
>>> placement = flow.placement(type="cuda", ranks=[0, 1])
>>> # Each rank's local data is a part data as a result of spliting global data on dim 0
>>> sbp = flow.sbp.split(dim=0)
>>> # Create a global tensor by randn
>>> x = flow.randn(4, 5, placement=placement, sbp=sbp)
>>> x.shape
oneflow.Size([4, 5])
Convert Local Tensor to Global Tensor¶
With Tensor.to_global
interface, Local Tensor can create a Global Tensor and use that Local Tensor as its local component at the current node.
Two local tensors with the shape of (2,5)
are created separately on two devices. While after the to_global
method, the global tensor with a shape of (4,5)
is obtained.
Code running on Node 0
import oneflow as flow
x = flow.randn(2,5)
placement = flow.placement("cuda", [0,1])
sbp = flow.sbp.split(0)
x_global = x.to_global(placement=placement, sbp=sbp)
x_global.shape
Code running on Node 1
import oneflow as flow
x = flow.randn(2,5)
placement = flow.placement("cuda", [0,1])
sbp = flow.sbp.split(0)
x_global = x.to_global(placement=placement, sbp=sbp)
x_global.shape
Redistribute Global Tensor¶
Redistributing a Global Tensor means moving its data to another device group (or placement), or changing its data distribution (or SBP) across the group, or both at the same time. The redistributed tensor is still a Global Tensor.
>>> import oneflow as flow
>>> x = flow.tensor([1.0, 2.0], placement=flow.placement("cuda", ranks=[0, 1]), sbp=flow.sbp.split(0))
>>> y = x.to_global(placement=flow.placement("cuda", ranks=[2, 3]), sbp=flow.sbp.broadcast)
According to the operator’s semantics, OneFlow defines a sequence of valid input and output SBP combinations for each built-in operator. So OneFlow could automatically redistribute the Global Tensor to satisfy the operator’s SBP requirements for its input Tensor. For example, the following code:
>>> import oneflow as flow
>>> x = flow.randn(4, 4,
placement=flow.placement("cuda", ranks=[0, 1]),
sbp=flow.sbp.split(0))
>>> y = flow.randn(4, 4,
placement=flow.placement("cuda", ranks=[0, 1]),
sbp=flow.sbp.split(1))
>>> z = x + y
When x + y
is executed, since x is split along dimension 0
and y is split along dimension 1
, their local components at each node can not be added directly, then OneFlow will automatically redistribute one of x and y to make them have the same SBP, and complete the add operation successfully.
Note
Global Tensor can not be used in combination with DDP currently.
Global Tensor requires all devices to execute at the same pace, otherwise, it may cause multi-process deadlock.
Get Local Tensor from Global Tensor¶
With Tensor.to_local
interface, the Global Tensor can return its local component at the current node.
y = x.to_local()
y.is_local
True
y
tensor([[ 2.9186e-01, -3.9442e-01, 4.7072e-04, -3.2216e-01, 1.7788e-01],
[-4.5284e-01, 1.2361e-01, -3.5962e-01, 2.6651e-01, 1.2951e+00]],
device='cuda:0', dtype=oneflow.float32)
DistributedDataParallel¶
For more information about DistributedDataParallel, see nn.parallel.DistributedDataParallel
The following script shows the process of using oneflow.nn.parallel.DistributedDataParallel
for training data parallel:
import oneflow as flow
from oneflow.nn.parallel import DistributedDataParallel as ddp
train_x = [
flow.tensor([[1, 2], [2, 3]], dtype=flow.float32),
flow.tensor([[4, 6], [3, 1]], dtype=flow.float32),
]
train_y = [
flow.tensor([[8], [13]], dtype=flow.float32),
flow.tensor([[26], [9]], dtype=flow.float32),
]
class Model(flow.nn.Module):
def __init__(self):
super().__init__()
self.lr = 0.01
self.iter_count = 500
self.w = flow.nn.Parameter(flow.tensor([[0], [0]], dtype=flow.float32))
def forward(self, x):
x = flow.matmul(x, self.w)
return x
m = Model().to("cuda")
m = ddp(m)
loss = flow.nn.MSELoss(reduction="sum")
optimizer = flow.optim.SGD(m.parameters(), m.lr)
for i in range(0, m.iter_count):
rank = flow.env.get_rank()
x = train_x[rank].to("cuda")
y = train_y[rank].to("cuda")
y_pred = m(x)
l = loss(y_pred, y)
if (i + 1) % 50 == 0:
print(f"{i+1}/{m.iter_count} loss:{l}")
optimizer.zero_grad()
l.backward()
optimizer.step()
print(f"\nw:{m.w}")
There are only two differences between the data parallelism training code and the stand-alone single-card script:
Use DistributedDataParallel to wrap the module object (m = ddp(m))
Use get_rank to get the current device number and distribute the data to the device.
Then use launcher to run the script, leave everything else to OneFlow, which makes distributed training as simple as stand-alone single-card training:
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py
Communication collectives¶
Reduces the tensor data across all machines in such a way that all get the final result. |
|
Gathers tensors from the whole group in a list. |
|
Gather tensors from all ranks and put them in a single output tensor. |
|
Each process scatters list of input tensors to all processes in a group and return gathered list of tensors in output list. |
|
Broadcasts the tensor to the whole group. |
|
Synchronizes all processes. |
|
Gathers a list of tensors in a single process. |
|
Reduces the tensor data across all machines. |
|
Reduces, then scatters a list of tensors to all processes in a group. |
|
Reduces, then scatters a tensor to all ranks. |
|
Receives a tensor synchronously. |
|
Scatters a list of tensors to all processes in a group. |
|
Sends a tensor synchronously. |
We also provide PyTorch-compatible APIs for communication collectives, for example, oneflow.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=None, async_op=False). For more information, see PyTorch Distributed Communication. Note that we currently only support op=ReduceOp.SUM, group=None and async_op=False in these operations.
Launching distributed training¶
run commands below to see more about usage.
python3 -m oneflow.distributed.launch -h
usage: launch.py [-h] [--nnodes NNODES] [--node_rank NODE_RANK]
[--nproc_per_node NPROC_PER_NODE] [--master_addr MASTER_ADDR]
[--master_port MASTER_PORT] [-m] [--no_python]
[--redirect_stdout_and_stderr] [--logdir LOGDIR]
training_script ...
OneFlow distributed training launch helper utility that will spawn up multiple
distributed processes
positional arguments:
training_script The full path to the single GPU training program/script to be
launched in parallel, followed by all the arguments for the
training script
training_script_args
optional arguments:
-h, --help show this help message and exit
--nnodes NNODES The number of nodes to use for distributed training
--node_rank NODE_RANK
The rank of the node for multi-node distributed training
--nproc_per_node NPROC_PER_NODE
The number of processes to launch on each node, for GPU
training, this is recommended to be set to the number of GPUs in
your system so that each process can be bound to a single GPU.
--master_addr MASTER_ADDR
Master node (rank 0)'s address, should be either the IP address
or the hostname of node 0, for single node multi-proc training,
the --master_addr can simply be 127.0.0.1
--master_port MASTER_PORT
Master node (rank 0)'s free port that needs to be used for
communication during distributed training
-m, --module Changes each process to interpret the launch script as a python
module, executing with the same behavior as'python -m'.
--no_python Do not prepend the training script with "python" - just exec it
directly. Useful when the script is not a Python script.
--redirect_stdout_and_stderr
write the stdout and stderr to files 'stdout' and 'stderr'. Only
available when logdir is set
--logdir LOGDIR Relative path to write subprocess logs to. Passing in a relative
path will create a directory if needed. Note that successive
runs with the same path to write logs to will overwrite existing
logs, so be sure to save logs as needed.