
In the previous posts we studied the basic modules of PyTorch distributed. In the next few posts we will see how to apply these modules in practice, and along the way tidy up the overall picture of PyTorch's distributed logic. This post shows how to combine DDP with the RPC framework.
This post is based on a translation of COMBINING DISTRIBUTED DATAPARALLEL WITH DISTRIBUTED RPC FRAMEWORK, with my own understanding added.
This tutorial uses a simple example to demonstrate how to combine DistributedDataParallel (DDP) with the Distributed RPC framework, i.e. how to combine distributed data parallelism with distributed model parallelism, to train a simple model. The source code for the example can be found here.
The earlier tutorials Getting Started with Distributed Data Parallel and Getting Started with Distributed RPC Framework describe how to perform distributed data parallel and distributed model parallel training, respectively. That said, there are training paradigms in which you may want to combine the two techniques. For example, if a model has a sparse part (a large embedding table) and a dense part (FC layers), you may want to put the embedding table on a parameter server and replicate the FC layer across trainers with DDP.
In this tutorial we cover exactly that case. There are 4 workers in total in our setup.
At a high level, the training process works as follows: the trainers build a HybridModel, which first performs an embedding lookup using the remote module provided by the master and then runs the FC layer wrapped in DDP, and they then drive the forward pass, the distributed backward pass, and the optimizer step. Note: if you combine DDP with RPC, you should always use Distributed Autograd for back propagation; a minimal sketch of that pattern follows.
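To make that note concrete, here is a minimal sketch of the Distributed Autograd backward pattern. It assumes a HybridModel instance model, a loss criterion, a DistributedOptimizer opt, and one batch of indices, offsets, target, all of which are set up later in this post; the full version appears in the training loop below.

import torch.distributed.autograd as dist_autograd

# Backward pattern when DDP and RPC are combined: instead of loss.backward(),
# run the iteration inside a distributed autograd context so that gradients can
# flow both through the local DDP-wrapped layers and back to the remote module.
with dist_autograd.context() as context_id:
    output = model(indices, offsets)
    loss = criterion(output, target)
    dist_autograd.backward(context_id, [loss])  # distributed backward pass
    opt.step(context_id)                        # updates local and remote parameters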
Let's see how the system starts up. First, all workers need to be set up before training. We create 4 processes, where ranks 0 and 1 are our trainers, rank 2 is the master, and rank 3 is the parameter server.
The initialization logic is as follows:
The master creates a remote module holding an EmbeddingBag on the parameter server, then uses rpc_async to call _run_trainer on each trainer, which starts the training loop there. Each trainer first initializes a ProcessGroup for DDP (world_size = 2), then initializes RPC and simply waits for the _run_trainer RPC from the master. The parameter server only initializes RPC and waits for RPCs. The code is as follows:
def run_worker(rank, world_size):
    r"""
    A wrapper function that initializes RPC, calls the function, and shuts down
    RPC.
    """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://localhost:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:  # Master
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule(  # Points to an EmbeddingBag layer held on the parameter server
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(  # Kick off the training loop on this trainer
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for the _run_trainer RPC from the master.
    else:
        rpc.init_rpc(  # Parameter server
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # The parameter server does nothing; it just waits for RPCs from trainers and master.
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

The current logic is as follows; we will extend it step by step later:
torch.multiprocessing.spawn
+
|
|
+----------------------------------------------------------------+----------------------------------+
| | | |
| | | |
v v v v
+-------------+-------------+ +----------+---------------+ +------------------+------------------+ +-------------+--------+
|trainer 0 rank = 0 | |trainer 1 rank = 1 | | master rank = 2 | |ps rank = 3 |
| | | | | | | |
| | | | | rpc.init_rpc | | rpc.init_rpc |
| | | | | | | |
| dist.init_process_group | | dist.init_process_group | | remote_emb_module = RemoteModule | | |
| | | | | | | |
| | | | | | | |
| rpc.init_rpc | | rpc.init_rpc | | fut = rpc.rpc_async(_run_trainer) | | |
| | | | | | | |
| | | | | | | |
+---------------------------+ +--------------------------+ +-------------------------------------+ +----------------------+

The supporting infrastructure here is mainly _RemoteModule, whose job is to build a model on a remote worker. Its code lives in torch/distributed/nn/api/remote_module.py.
A RemoteModule instance can only be created after RPC initialization. It creates a user-specified module on a specified remote node and behaves like a regular nn.Module, except that its forward method runs on the remote node. RemoteModule takes care of autograd recording to ensure that the backward pass can propagate gradients back to the corresponding remote module.
A RemoteModule can be shared between processes using the RPC framework <https://pytorch.org/docs/stable/rpc.html> without any overhead of copying the actual module, which is equivalent to passing around a ~torch.distributed.rpc.RRef that points to the remote module.
To create a hybrid model, local modules should usually be created outside of the remote module rather than as submodules of any remote module. If the remote module is placed on a cuda device, any input CPU tensors will automatically be moved to that same cuda device. A hybrid model looks like this:
>>> class HybridModel(nn.Module):
>>>     def __init__(self):
>>>         nn.Module.__init__(self)
>>>         self.remote_embedding = RemoteModule(...)  # create the embedding layer on the remote side
>>>         self.local_linear = nn.Linear(...)

A usage example follows; the code below needs to be run in two different processes. In the example, the RemoteModule is created with a "worker1/cpu" argument, which means the RemoteModule runs on the cpu device of worker1. The format is <workername>/<device>, where <device> can be parsed as a torch.device type.
Example::
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>> "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

_RemoteModule is defined as follows. Its initialization logic is, roughly: (1) prepare the default arguments, (2) parse the remote worker and remote device, (3) if _module_interface_cls is provided, use it to instantiate the module template on the remote side and generate the local method proxies, otherwise (4) create the module on the remote side with rpc.remote, and (5) finally install the generated method proxies locally.

class _RemoteModule(nn.Module):
    def __init__(
        self,
        remote_device: str,
        module_cls: nn.Module,
        args: Tuple = None,
        kwargs: Dict[str, Any] = None,
        _module_interface_cls: Any = None,
    ):
        """
        Args:
            remote_device (str): Device on the destination worker where we'd like to place this module.
                The format should be "<workername>/<device>", where the device field can be parsed as torch.device type.
                E.g., "trainer0/cpu", "trainer0", "ps0/cuda:0".
                In addition, the device field can be optional and the default value is "cpu".
        Returns:
            A remote module instance which wraps the :class:`~nn.Module` created by the
            user-provided ``module_cls``, it has a blocking ``forward`` method and an
            asynchronous ``forward_async`` method that returns a future of the ``forward`` call
            on the user-provided module on the remote side.
        """
        super().__init__()

        # NOTE: if a new attribute is added to this class, also need to add it
        # to ``_REMOTE_MODULE_PICKLED_ATTRIBUTES`` for pickling/unpickling.

        # Default arguments preparation.
        # 1. Prepare the arguments.
        args = args if args is not None else ()
        kwargs = kwargs if kwargs is not None else {}

        # 2. Set the remote worker and the remote device to run on.
        self.on, self.device = _parse_remote_device(remote_device)
        agent = rpc._get_current_rpc_agent()
        # If the device map of the remote worker is set,
        # then enable moving any input CPU tensors to the same cuda device.
        self.is_device_map_set = bool(
            agent._get_device_map(agent.get_worker_info(self.on))
        )
        # ``enable_moving_cpu_tensors_to_cuda`` is less strict than ``is_device_map_set``:
        # If ``enable_moving_cpu_tensors_to_cuda`` is true, but the device map is not set,
        # then any CPU tensors can still be moved to a cuda device to run forward,
        # but the output must be moved back to CPU before being sent over the wire.
        enable_moving_cpu_tensors_to_cuda = torch.device(self.device).type == "cuda"

        # 3. If _module_interface_cls is provided.
        if _module_interface_cls is not None:
            # Users rely on this field to know if this generated RemoteModule is TorchScript-able.
            self.is_scriptable = True

            # 3.1 Use _module_interface_cls to build the module on the remote side.
            # Instantiate template on remote side.
            fut = rpc.rpc_async(
                self.on,
                _instantiate_template,
                (_module_interface_cls, enable_moving_cpu_tensors_to_cuda),
            )

            # 3.2 Build the method proxy generator on the local side.
            # Instantiate template on local side.
            generated_module = (
                instantiator.instantiate_scriptable_remote_module_template(
                    _module_interface_cls, enable_moving_cpu_tensors_to_cuda
                )
            )
            self.generated_methods = generated_module._generated_methods

            # 3.3 Wait until the remote creation has finished.
            # Create the module on the remote side.
            fut.wait()  # Ensure remote_module_cls is available on remote side.

            # 3.4 Build the handle (RRef) on the local side.
            self.module_rref = rpc.rpc_sync(
                self.on,
                _create_module_with_interface,
                (module_cls, args, kwargs, self.device, _module_interface_cls),
            )
        else:  # 4. _module_interface_cls is not provided.
            self.is_scriptable = False
            # 4.1 Use the non-scriptable method proxy generator on the local side.
            self.generated_methods = (
                _NON_SCRIPTABLE_REMOTE_MODULE_MODULE._generated_methods
            )
            # 4.2 Create the module on the remote side.
            self.module_rref = rpc.remote(
                self.on,
                _create_module,
                (module_cls, args, kwargs, self.device),
            )

        # Install generated methods.
        # 5. Install the remote-method proxies locally.
        for method in self.generated_methods:
            method_name = method.__name__
            method = torch.jit.export(method)
            setattr(self, method_name, types.MethodType(method, self))

Its main methods are as follows:
remote_parameters returns a list of ~torch.distributed.rpc.RRef pointing to the remote module's parameters, and is typically used together with ~torch.distributed.optim.DistributedOptimizer.
get_module_rref returns a ~torch.distributed.rpc.RRef (RRef[nn.Module]) pointing to the remote module itself.
    def remote_parameters(self, recurse: bool = True) -> List[rpc.RRef[Parameter]]:
        """
        Returns a list of :class:`~torch.distributed.rpc.RRef` pointing to the
        remote module's parameters. This can typically be used in conjunction
        with :class:`~torch.distributed.optim.DistributedOptimizer`.
        Args:
            recurse (bool): if True, then returns parameters of the remote
                module and all submodules of the remote module. Otherwise,
                returns only parameters that are direct members of the
                remote module.
        Returns:
            A list of :class:`~torch.distributed.rpc.RRef` (``List[RRef[nn.Parameter]]``)
            to remote module's parameters.
        """
        return rpc.rpc_sync(self.on, _param_rrefs, args=(self.module_rref, recurse))

    def get_module_rref(self) -> rpc.RRef[nn.Module]:
        """
        Returns an :class:`~torch.distributed.rpc.RRef` (``RRef[nn.Module]``)
        pointing to the remote module.
        """
        return self.module_rref
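As a side note, the RRef returned by get_module_rref() can itself be passed over RPC, which is what makes sharing a RemoteModule between processes cheap. The snippet below is only an illustrative sketch, not code from the tutorial: the worker name "ps", the nn.Linear choice, and the helper _param_shapes are assumptions.

import torch
import torch.distributed.rpc as rpc
from torch.distributed.nn.api.remote_module import RemoteModule

def _param_shapes(module_rref):
    # Runs on the owner of the RRef; local_value() returns the actual nn.Module.
    return [p.shape for p in module_rref.local_value().parameters()]

# Assumes rpc.init_rpc(...) has been called and a worker named "ps" exists.
remote_linear = RemoteModule("ps", torch.nn.Linear, args=(20, 30))
module_rref = remote_linear.get_module_rref()  # RRef[nn.Module] living on "ps"

# Any worker that holds this RRef can operate on the remote module without
# ever copying the module itself.
shapes = rpc.rpc_sync(module_rref.owner(), _param_shapes, args=(module_rref,))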
The logic diagram now becomes the following. Compared with the previous figure, the master has an extra remote_emb_module, which creates a RemoteModule on the ps.

torch.multiprocessing.spawn
+
|
|
+----------------------------------------------------------------+----------------------------------+
| | | |
| | | |
v v v v
+--------------+-------------+ +-----------+--------------+ +-------------------+-----------------+ +-------------+--------+
|trainer 0 rank = 0 | |trainer 1 rank = 1 | | master rank = 2 | |ps rank = 3 |
| | | | | | | |
| | | | | rpc.init_rpc | | rpc.init_rpc |
| | | | | | | |
| dist.init_process_group | | dist.init_process_group | | remote_emb_module +----------------------> RemoteModule |
| | | | | | | |
| | | | | | | |
| rpc.init_rpc | | rpc.init_rpc | | fut = rpc.rpc_async(_run_trainer) | | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
+----------------------------+ +--------------------------+ +-------------------------------------+ +----------------------+

Before going into the details of the trainers, let's first introduce the HybridModel that the trainers use. The model has a sparse part and a dense part.
The dense part is an nn.Linear module replicated across all trainers via DistributedDataParallel; the sparse part is a remote module (remote_emb_module) that holds an nn.EmbeddingBag on the parameter server. In other words, this remote module gives us a remote reference to the embedding table on the parameter server.
The forward method of the model is straightforward. It performs an embedding lookup on the parameter server via the RemoteModule's forward and feeds the output into the FC layer, which here is wrapped with DDP.
class HybridModel(torch.nn.Module):
    r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

The logic now extends as follows: the two trainers also hold a remote_emb_module, which points to the RemoteModule on the ps.
torch.multiprocessing.spawn
+
|
|
+-----------------------------------------------------------------------------------+----------------------------------+
| | | |
| | | |
v v v v
+-----------+-------------+ +-----------------------+-------------------+ +---------------------+---------------+ +-------------+--------+
|trainer 0 rank = 0 | | trainer 1 rank = 1 | | master rank = 2 | |ps rank = 3 |
| | | | | | | |
| | | | | rpc.init_rpc | | rpc.init_rpc |
| dist.init_process_group | | dist.init_process_group | | | | |
| | | | | remote_emb_module +------------------------> RemoteModule |
| rpc.init_rpc | | rpc.init_rpc | | | | ^ ^ |
| | | | | | | | | |
| | | | | fut = rpc.rpc_async(_run_trainer) | | | | |
| | | | | | | | | |
| +---------------------+ | | +---------------------------+ | | | | | | |
| | HybridModel | | | |HybridModel | | | | | | | |
| | | | | | | | +-------------------------------------+ +----------------------+
| | | | | | | | | |
| | fc = DDP(Linear) | | | | fc = DDP(Linear()) | | | |
| | | | | | | | | |
| | remote_emb_module | | | | remote_emb_module+-------------------------------------------------------------+ |
| | + | | | | | | |
| +---------------------+ | | +---------------------------+ | |
| | | | | |
+-------------------------+ +-------------------------------------------+ |
| |
+--------------------------------------------------------------------------------------------------------------------+

When we covered initialization earlier, we skipped the trainer-side initialization; let's analyze it now.
First, let's look at the setup on the trainers. Each trainer creates its own HybridModel; the remote module inside it holds the embedding table on the parameter server. Next, we need a list of RRefs to all parameters that the DistributedOptimizer should optimize. The RRefs for the embedding table are obtained via remote_parameters() of the RemoteModule; the local FC parameters are gathered by walking model.fc.parameters(), creating an RRef for each parameter and appending it to the list returned by remote_parameters(). Note that we cannot simply call model.parameters(), because it would recursively call model.remote_emb_module.parameters(), which RemoteModule does not support (it supports remote_parameters() instead).

def _run_trainer(remote_emb_module, rank):
r"""
Each trainer runs a forward pass which involves an embedding lookup on the
parameter server and running nn.Linear locally. During the backward pass,
DDP is responsible for aggregating the gradients for the dense part
(nn.Linear) and distributed autograd ensures gradients updates are
propagated to the parameter server.
"""
# Setup the model.
model = HybridModel(remote_emb_module, rank)
# Retrieve all model parameters as rrefs for DistributedOptimizer.
# Retrieve parameters for embedding table.
model_parameter_rrefs = model.remote_emb_module.remote_parameters()
# model.fc.parameters() only includes local parameters.
# NOTE: Cannot call model.parameters() here,
# because this will call remote_emb_module.parameters(),
# which supports remote_parameters() but not parameters().
for param in model.fc.parameters():
model_parameter_rrefs.append(RRef(param)) # 这里添加了需要分布式优化的 DDP 的参数
# Setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model_parameter_rrefs, # dense参数和sparse参数一起分布式优化
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()我们逻辑拓展如下,这里省略了 trainer 0 指向 参数服务器的箭头,与上图相比,增加了 DistributedOptimizer。
torch.multiprocessing.spawn
+
|
|
+-----------------------------------------------------------------------------------+----------------------------------+
| | | |
| | | |
v v v v
+--------------+-------------+ +-----------------------+-------------------+ +---------------------+---------------+ +---------------+-------------+
|trainer 0 rank = 0 | | trainer 1 rank = 1 | | master rank = 2 | | ps rank = 3 |
| | | | | | | |
| | | | | | | rpc.init_rpc |
| dist.init_process_group | | dist.init_process_group | | rpc.init_rpc | | |
| | | | | | | +----------------------+ |
| rpc.init_rpc | | rpc.init_rpc | | 1 | | | RemoteModule | |
| | | | | remote_emb_module +---------------------> | | |
| +------------------------+ | | +---------------------------------------+ | | | | | | |
| | _run_trainer | | | | _run_trainer | | | | | | remote_parameters() | |
| | | | | | | | | fut = rpc.rpc_async(_run_trainer) | | | | |
| | | | | | output = model(indices, offsets) | | | | | | | |
| | | | | | dist_autograd.backward | | | | | +------+--------+------+ |
| | | | | | opt.step | | | | | ^ ^ |
| | | | | | | | | | | | | |
| | +-------------------+ | | | | | | +-------------------------------------+ +-----------------------------+
| | | HybridModel | | | | | +-----------------------------+ | | | |
| | | | | | | | | HybridModel | | | | |
| | | fc = DDP(Linear) | | | | | | | | | | |
| | | remote_emb_module | | | | | | fc = DDP(Linear().cuda() | | | | |
| | | | | | | | | remote_emb_module+-------------------------------------------------------------------------> |
| | +-------------------+ | | | | | | | | 2 |
| | | | | | +-----------------------------+ | | |
| | +--------------------+ | | | | +-----------------------------+ | | |
| | |DistributedOptimizer| | | | | |DistributedOptimizer | | | |
| | +--------------------+ | | | | | +------------------------------------------------------------------------>
| | | | | | +-----------------------------+ | | 3
| +------------------------+ | | +---------------------------------------+ |
+----------------------------+ +-------------------------------------------+

Now let's look at the main training loop that runs on each trainer. Here get_next_batch is just a helper function that generates random inputs and training targets. The training loop runs for multiple epochs and over each batch:
def get_next_batch(rank):
    for _ in range(10):
        num_indices = random.randint(20, 50)
        indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

        # Generate offsets.
        offsets = []
        start = 0
        batch_size = 0
        while start < num_indices:
            offsets.append(start)
            start += random.randint(1, 10)
            batch_size += 1

        offsets_tensor = torch.LongTensor(offsets)
        target = torch.LongTensor(batch_size).random_(8).cuda(rank)
        yield indices, offsets_tensor, target

# Train for 100 epochs
for epoch in range(100):
    # create distributed autograd context
    for indices, offsets, target in get_next_batch(rank):
        with dist_autograd.context() as context_id:
            output = model(indices, offsets)
            loss = criterion(output, target)

            # Run distributed backward pass
            dist_autograd.backward(context_id, [loss])

            # Run distributed optimizer
            opt.step(context_id)

            # Not necessary to zero grads as each iteration creates a different
            # distributed autograd context which hosts different grads
    print("Training done for epoch {}".format(epoch))

Due to space constraints, we only zoom in on the trainer part, refining it into the figure below.
The numbers 1-5 in the figure mark the steps in order: (1) process group and RPC initialization, (2) construction of the HybridModel, (3) construction of the DistributedOptimizer from the remote embedding parameters plus the local FC parameters, (4) the forward pass going through remote_emb_module, and (5) the embedding lookup result being fed into the DDP-wrapped FC layer.
+---------------------------------------------------------------------+
| trainer 1 rank = 1 |
| +-----------------------------------+ |
| | dist.init_process_group 1 | |
| | | |
| | rpc.init_rpc | |
| | | |
| +-----------------------------------+ |
| +-----------------------------------------------------------------+ |
| | _run_trainer | |
| | | |
| | output = model(indices, offsets) | |
| | dist_autograd.backward + | |
| | opt.step | | |
| | +-----------------------------------------------------------+ | |
| | | HybridModel | 2 | | |
| | | | | | |
| | | fc = DDP(Linear().cuda() | | | |
| | | |4 | | |
| | | remote_emb_module | | | |
| | | | | | |
| | | v | | |
| | | +--------------------------+--------------------------+ | | |
| | | |forward | | | |
| | | | emb_lookup = remote_emb_module.forward() | | | |
| | | | + | | | |
| | | | | 5 | | | |
| | | | | | | | |
| | | | v | | | |
| | | | fc(emb_lookup.cuda(device) | | | |
| | | | | | | |
| | | +-----------------------------------------------------+ | | |
| | +-----------------------------------------------------------+ | |
| | +-----------------------------------------------------------+ | |
| | | DistributedOptimizer 3 | | |
| | | | | |
| | | HybridModel.remote_emb_module.remote_parameters() | | |
| | | | | |
| | | HybridModel.fc.parameters() | | |
| | | | | |
| | +-----------------------------------------------------------+ | |
| +-----------------------------------------------------------------+ |
+---------------------------------------------------------------------+

Note: the source code for the whole example can be found here.
We have now looked at three official PyTorch examples, each implementing the parameter server differently. In this one, a master is additionally introduced as a coordinator that orchestrates the workers.
Overall, thanks to the RPC mechanism, PyTorch's parameter-server implementations are more flexible and versatile than those of ps-lite or paracel.
COMBINING DISTRIBUTED DATAPARALLEL WITH DISTRIBUTED RPC FRAMEWORK