This series on distributed optimizers consists of three articles, progressing in depth: the foundations, the data-parallel optimizers in DP/DDP/Horovod, and the PyTorch distributed optimizer. This article covers the PyTorch distributed optimizer and the optimizer in PipeDream, focusing on model parallelism (pipeline parallelism).
DP, DDP, and Horovod all essentially implement data parallelism. DDP, for example, replicates the same model onto every GPU, where each GPU consumes a different partition of the input data. While this can significantly speed up training, it does not work when the model is too large to fit on a single GPU. This is why model parallelism was introduced.
Accordingly, the optimizer also needs different modifications to fit the needs of model parallelism. For a clearer analysis, this article first introduces single-machine model parallelism, then the PyTorch distributed optimizer.
The following text is translated from https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html , with some of my own thoughts and interpretations added.
Model parallelism is widely used in distributed training. In contrast to DataParallel, model parallelism splits a single model across different GPUs instead of replicating the whole model on every GPU (concretely, if a model m has 10 layers, DataParallel gives each GPU a full copy of all 10 layers, whereas model parallelism on two GPUs can host 5 layers on each).
The high-level idea of model parallelism is to place different sub-networks of a model on different devices, and to implement the forward method accordingly so that intermediate outputs are moved across devices. Since only part of the model runs on any single device, a group of devices can collectively serve a much larger model.
In this post we will not try to build a huge model and squeeze it into a limited number of GPUs. Instead, the post focuses on illustrating the idea of model parallelism; readers can apply the idea to real applications.
Let us start with a toy model containing two linear layers. To run this model on two GPUs, simply put each linear layer on a different GPU, and move inputs and intermediate outputs to match the layer devices accordingly.
import torch
import torch.nn as nn
import torch.optim as optim
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10, 10).to('cuda:0')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')

    def forward(self, x):
        x = self.relu(self.net1(x.to('cuda:0')))
        return self.net2(x.to('cuda:1'))
The ToyModel code looks very similar to a single-GPU implementation. Only two parts change: the network construction and the forward method. The __init__ method uses two to(device) calls to place the linear layers on the appropriate devices, splitting the network into two parts that can then run on different GPUs. In forward, to(device) places a tensor on the appropriate device, so one layer's output is copied via the tensor.to semantics to the GPU where the next layer lives. These are the only places in the model that need to change. backward() and torch.optim handle this situation automatically, taking over the gradients as if the model lived on a single GPU. When calling the loss function, you only need to make sure that the labels are on the same device as the network's output.
model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1')
loss_fn(outputs, labels).backward()
optimizer.step()
The most important line here is labels = torch.randn(20, 5).to('cuda:1'), which guarantees that the labels live on cuda:1.
Recall the earlier forward code: self.net2(x.to('cuda:1')). Together, these two lines ensure that the labels and the output are on the same device, cuda:1.
After initialization, the layout is:
+--------------------+ +------------------------+
| cuda:0 | | cuda:1 |
| | | |
| | | |
| | | |
| net1(x) | | net2(x) |
| | | |
| | | |
| | | |
+--------------------+ +------------------------+
After the forward pass and label placement, the output and the labels both reside on GPU 1:
+--------------------+ +------------------------+
| cuda:0 | | cuda:1 |
| | | |
| | | |
| | | |
x.to('cuda:0')-------> net1(x) +-------> x.to('cuda:1') +--------> net2(x) |
| | | |
| | | labels.to('cuda:1') |
| | | |
+--------------------+ +------------------------+
You can also convert an existing single-GPU module to run on multiple GPUs by changing only a few lines. The code below shows how to decompose torchvision.models.resnet50() onto two GPUs. The basic idea is to inherit from the existing ResNet module and split its layers across two GPUs during construction. Then, override the forward method to stitch the two sub-networks together, which it does by moving the intermediate outputs accordingly.
from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000

class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(
            Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,
            self.layer1,
            self.layer2
        ).to('cuda:0')

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))
The implementation above solves the problem for models too large to fit on a single GPU. However, you may have noticed that even if your model fits, it may run slower than on a single GPU. This is because, at any point in time, only one of the two GPUs is working while the other sits idle doing nothing. Performance degrades further because the intermediate output must be copied from cuda:0 to cuda:1 between layer2 and layer3.
Let us run an experiment to get a more quantitative view of the execution time. In this experiment, we train ModelParallelResNet50 and the stock torchvision.models.resnet50() on random inputs and labels. After training, the models will not produce any useful predictions, but we get a reasonable sense of the execution times.
import torchvision.models as models

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128

def train(model):
    model.train(True)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for _ in range(num_batches):
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # run forward pass
        optimizer.zero_grad()
        outputs = model(inputs.to('cuda:0'))

        # run backward pass
        labels = labels.to(outputs.device)
        loss_fn(outputs, labels).backward()
        optimizer.step()
The train(model) method above uses nn.MSELoss as the loss function and optim.SGD as the optimizer. It mimics training on 128 x 128 images, organized into 3 batches of 120 images each. Then, we use timeit to run train(model) 10 times and plot the execution times with standard deviations.
import matplotlib.pyplot as plt
plt.switch_backend('Agg')
import numpy as np
import timeit

num_repeat = 10

stmt = "train(model)"

setup = "model = ModelParallelResNet50()"
mp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
mp_mean, mp_std = np.mean(mp_run_times), np.std(mp_run_times)

setup = "import torchvision.models as models;" + \
        "model = models.resnet50(num_classes=num_classes).to('cuda:0')"
rn_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
rn_mean, rn_std = np.mean(rn_run_times), np.std(rn_run_times)

def plot(means, stds, labels, fig_name):
    fig, ax = plt.subplots()
    ax.bar(np.arange(len(means)), means, yerr=stds,
           align='center', alpha=0.5, ecolor='red', capsize=10, width=0.6)
    ax.set_ylabel('ResNet50 Execution Time (Second)')
    ax.set_xticks(np.arange(len(means)))
    ax.set_xticklabels(labels)
    ax.yaxis.grid(True)
    plt.tight_layout()
    plt.savefig(fig_name)
    plt.close(fig)

plot([mp_mean, rn_mean],
     [mp_std, rn_std],
     ['Model Parallel', 'Single GPU'],
     'mp_vs_rn.png')
The result shows that model parallelism takes 4.02/3.75-1=7% longer to execute than the single-GPU implementation. So we can conclude that copying tensors back and forth between the GPUs adds roughly 7% overhead.
To summarize the situation so far: the model now fits across GPUs, but at any moment only one GPU is computing while the other is idle, and the cross-GPU copies add overhead. We therefore need to address these two problems specifically.
One option to keep all GPUs busy is to add pipelining: divide each batch further into a pipeline of splits, so that when one split reaches the second sub-network, the next split can already be fed into the first sub-network. This way, two consecutive splits run concurrently on two GPUs.
Why does this work? Because of CUDA's asynchronous execution semantics: kernel launches return immediately, so work on the two devices can overlap.
How do we reduce the copy/transfer time? A combination of hardware and software can be used to increase bandwidth and reduce latency, for example faster GPU interconnects such as NVLink together with communication libraries; for collective GPU communication, PyTorch uses the NCCL library (built on CUDA).
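The pipelining idea above can be sketched with a plain-Python timing model (no GPUs involved; `num_splits` and `num_stages` are illustrative parameters, not part of the tutorial code): with S splits flowing through K stages, a pipeline finishes in S + K - 1 "ticks" instead of the S * K ticks of fully serial execution.

```python
def serial_ticks(num_splits, num_stages):
    # Without pipelining only one stage works at a time:
    # every split must pass through every stage sequentially.
    return num_splits * num_stages

def pipeline_ticks(num_splits, num_stages):
    # With pipelining, stage k processes split i at tick i + k, so the
    # last split leaves the last stage after S + K - 1 ticks in total.
    return num_splits + num_stages - 1

# 6 splits over 2 GPUs: 7 ticks instead of 12, close to a 2x speedup.
print(serial_ticks(6, 2), pipeline_ticks(6, 2))
```

As S grows relative to K, the ratio approaches the number of stages, which is the ideal speedup the tutorial refers to.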
In the following experiment, we further divide each batch of 120 images into splits of 20 images each. Since PyTorch launches CUDA operations asynchronously, the implementation does not need to spawn multiple threads to achieve concurrency.
class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size

    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to('cuda:1')
        ret = []

        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to('cuda:1')

        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)
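A key correctness property of this forward is that splitting the batch, running each split through both stages, and concatenating the results gives the same answer as a full-batch pass, because the stages act on each sample independently. A plain-Python stand-in (list-based, no tensors) makes this concrete:

```python
def stage(batch):
    # Stand-in for seq1/seq2: transforms each sample independently.
    return [2 * x + 1 for x in batch]

def split(batch, split_size):
    # Mirrors x.split(split_size, dim=0): chunks along the batch dimension.
    return [batch[i:i + split_size] for i in range(0, len(batch), split_size)]

batch = list(range(10))

# Full-batch result.
full = stage(stage(batch))

# Pipelined result: run each split through both stages, then concatenate.
piped = []
for s in split(batch, 3):
    piped.extend(stage(stage(s)))

assert piped == full  # splitting the batch does not change the output
```

This is why pipelining changes only the schedule, not the numerics (up to floating-point reduction order in batch-dependent layers such as BatchNorm, which is a known caveat of micro-batching).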
setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)
plot([mp_mean, rn_mean, pp_mean],
[mp_std, rn_std, pp_std],
['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
'mp_vs_rn_vs_pp.png')
Note that device-to-device tensor copy operations synchronize on the current streams of the source and destination devices. If you create multiple streams, you must make sure the copy operations are properly synchronized. Writing to the source tensor, or reading from or writing to the destination tensor, before the copy finishes can lead to undefined behavior. The implementation above uses only the default streams on both the source and destination devices, so there is no need to enforce additional synchronization.
The experiment shows that pipelining the inputs to model-parallel ResNet50 speeds up training by roughly 3.75/2.51-1=49%, still quite far from the ideal 100% speedup. Since our pipeline-parallel implementation introduces a new parameter split_size, it is unclear how this parameter affects the overall training time. Intuitively, a small split_size leads to many tiny CUDA kernel launches, while a large split_size produces relatively long idle times during the first and last splits. Neither is optimal. There may be an optimal split_size configuration for this particular experiment. Let us try to find it by running experiments with several different split_size values.
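The tradeoff can be illustrated with a hypothetical cost model (the constants `per_item` and `launch_overhead` below are invented for illustration, so the optimum it produces says nothing about the real hardware; only measurement, as in the experiment below, can find the true best value): each split pays a fixed launch overhead plus time proportional to its size, and the pipeline runs for (num_splits + num_stages - 1) split-steps.

```python
def pipeline_time(split_size, batch_size=120, num_stages=2,
                  per_item=1.0, launch_overhead=5.0):
    # Hypothetical cost model: a split of size s costs
    # s * per_item + launch_overhead, and with pipelining the schedule
    # spans (num_splits + num_stages - 1) such split-steps.
    num_splits = -(-batch_size // split_size)  # ceil division
    per_split = split_size * per_item + launch_overhead
    return (num_splits + num_stages - 1) * per_split

# Tiny splits pay many launch overheads; huge splits waste pipeline
# fill/drain time. Some intermediate value minimizes the total.
times = {s: pipeline_time(s) for s in [1, 3, 5, 8, 10, 12, 20, 40, 60]}
best = min(times, key=times.get)
```

Under these made-up constants the curve is U-shaped with a minimum in the middle of the range, which matches the qualitative argument in the text.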
means = []
stds = []
split_sizes = [1, 3, 5, 8, 10, 12, 20, 40, 60]

for split_size in split_sizes:
    setup = "model = PipelineParallelResNet50(split_size=%d)" % split_size
    pp_run_times = timeit.repeat(
        stmt, setup, number=1, repeat=num_repeat, globals=globals())
    means.append(np.mean(pp_run_times))
    stds.append(np.std(pp_run_times))

fig, ax = plt.subplots()
ax.plot(split_sizes, means)
ax.errorbar(split_sizes, means, yerr=stds, ecolor='red', fmt='ro')
ax.set_ylabel('ResNet50 Execution Time (Second)')
ax.set_xlabel('Pipeline Split Size')
ax.set_xticks(split_sizes)
ax.yaxis.grid(True)
plt.tight_layout()
plt.savefig("split_size_tradeoff.png")
plt.close(fig)
The result shows that setting split_size to 12 achieves the fastest training speed, a 3.75/2.43-1=54% speedup. There are still opportunities to accelerate training further. For example, all operations on cuda:0 are currently placed on its default stream, which means the computation of the next split cannot overlap with the copy operation of the previous split. However, since the prev and next splits are different tensors, there is no problem overlapping one's computation with the other's copy. Such an implementation requires multiple streams on both GPUs, and different sub-network structures require different stream-management strategies. Since no general multi-stream solution works for all model-parallel use cases, we will not discuss it in this tutorial.
This post shows several performance measurements. You may see different results when running the same code on your own machine, because they depend on the underlying hardware and software. To get the best performance for your environment, the right approach is to first generate the curve, use it to determine the optimal split size, and then apply that split size to the pipeline inputs.
We have now covered single-machine model parallelism; next we look at distributed model-parallel training, where the model spans multiple servers.
Let us first imagine how we would handle things if we implemented a distributed optimizer ourselves.
Suppose the model is divided into three parts, and three hosts are available for training.
+----------------------------------------------------------------+
| Model |
| |
| +-----------------+ +------------------+ +-----------------+ |
| | Sub model 1     | | Sub model 2      | | Sub model 3     | |
| | | | | | | |
| | | | | | | |
| +-----------------+ +------------------+ +-----------------+ |
| |
+----------------------------------------------------------------+
+-------------------+ +------------------+ +-----------------+
| Host 1 | | Host 2 | | Host 3 |
| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | | |
+-------------------+ +------------------+ +-----------------+
We would explicitly deploy the three parts onto the three hosts. Each host runs its own copy of the training code, and each copy contains its own local optimizer responsible for optimizing the parameters of its local sub-model.
+---------------------+ +---------------------+ +---------------------+
| Host 1 | | Host 2 | | Host 3 |
| | | | | |
| +-----------------+ | | +-----------------+ | | +-----------------+ |
| | Sub model 1 | |forward | | Sub model 2 | |forward | | Sub model 3 | |
| | | +-------> | | | +-------> | | | |
| |_parameters <--+ | | | |_parameters <--+ | | | |_parameters <--+ | |
| | | | | <-------+ | | | | <-------+ | | | |
| | | | | backward| | | | | backward| | | | |
| +-----------------+ | | +-----------------+ | | +-----------------+ |
| | | | | | | | |
| | | | | | | | |
| ------------------+ | | +-----------------+ | | +-----------------+ |
| |Optimizer 1 | | | | | Optimizer 2 | | | | | Optimizer 3 | | |
| | | | | | | | | | | | | | |
| | step() +---+ | | | | step() +---+ | | | | step()+---+ | |
| | | | | | | | | | | |
| +-----------------+ | | +-----------------+ | | +-----------------+ |
+---------------------+ +---------------------+ +---------------------+
But this leaves several problems to solve: how to run forward and backward propagation across hosts, how to deliver each gradient to the host that owns the corresponding parameters, and how to make the local optimizers step in a coordinated fashion. A little thought reveals how intricate this gets. If we built it ourselves on top of PyTorch, we would likely end up with something resembling PipeDream. So let us see how PyTorch handles it.
PyTorch uses RPC to solve these problems.
As mentioned in earlier articles, PyTorch's distributed framework rests on four pillars: RPC, RRef, distributed autograd, and the distributed optimizer. The RPC package offers rpc_sync() (synchronous), rpc_async() (asynchronous), and remote() (asynchronous, returning a reference to the remote return value). The remote() API fits the situation where something must be created remotely but never needs to be fetched back to the caller.
The DistributedOptimizer constructor takes an Optimizer() class (for example SGD(), Adagrad(), etc.) and a list of parameter RRefs. It creates an Optimizer() instance on each distinct RRef owner and then runs step() there to update the corresponding parameters. When a user performs distributed forward and backward propagation, the parameters and gradients are scattered across multiple workers, so each involved worker needs its own optimization. The Distributed Optimizer unifies all of these local optimizers into one, providing a concise constructor and step() API. Looking at the official diagram of the PyTorch distributed package's internal architecture and logical relationships, the distributed optimizer is built on top of the other three pillars.
We will explain how to use it with code in the following sections.
First, a note: for clarity of analysis, we will ignore all TorchScript-related parts in what follows.
DistributedOptimizer is used as follows:
- Obtain a list of remote parameters to optimize (RRef). These can also be local parameters wrapped inside local RRefs.
- Pass an Optimizer class to run as the local optimizer on all of the RRef owners.
- The distributed optimizer creates an instance of that local Optimizer on each worker node and holds RRefs to these local optimizers.
- When torch.distributed.optim.DistributedOptimizer.step() is invoked, the distributed optimizer uses RPC to remotely execute all the local optimizers on the appropriate remote workers.
- torch.distributed.optim.DistributedOptimizer.step must receive a distributed autograd context_id as input; the local optimizers look up the gradients saved in the corresponding context.
This may look abstract, so we will analyze it step by step.
Putting it all together, here is a simple end-to-end example using distributed autograd and the distributed optimizer. If you put the code into a file named "dist_autograd_simple.py", you can run it with the command MASTER_ADDR="localhost" MASTER_PORT=29500 python dist_autograd_simple.py:
import multiprocessing as mp
import torch
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer

def random_tensor():
    return torch.rand((3, 3), requires_grad=True)

def _run_process(rank, dst_rank, world_size):
    name = "worker{}".format(rank)
    dst_name = "worker{}".format(dst_rank)

    # Initialize RPC.
    rpc.init_rpc(
        name=name,
        rank=rank,
        world_size=world_size
    )

    # Use a distributed autograd context.
    with dist_autograd.context() as context_id:  # local optimizers will find the gradients in this context
        # Forward pass (create references on remote nodes).
        rref1 = rpc.remote(dst_name, random_tensor)  # create a random_tensor on the remote node
        rref2 = rpc.remote(dst_name, random_tensor)  # create a random_tensor on the remote node
        loss = rref1.to_here() + rref2.to_here()  # the remote parameters (`RRef`) to optimize

        # Backward pass (run distributed autograd).
        dist_autograd.backward(context_id, [loss.sum()])

        # Build DistributedOptimizer.
        dist_optim = DistributedOptimizer(  # creates a local Optimizer instance on each worker and holds RRefs to them
            optim.SGD,
            [rref1, rref2],
            lr=0.05,
        )

        # Run the distributed optimizer step.
        dist_optim.step(context_id)

def run_process(rank, dst_rank, world_size):
    _run_process(rank, dst_rank, world_size)
    rpc.shutdown()

processes = []

# Run world_size workers.
world_size = 2
for i in range(world_size):
    p = mp.Process(target=run_process, args=(i, (i + 1) % 2, world_size))
    p.start()
    processes.append(p)

for p in processes:
    p.join()
DistributedOptimizer obtains remote references to the parameters scattered across the workers, then runs the optimizer locally for each of those parameters.
For a single worker, if it receives concurrent calls to ~torch.distributed.optim.DistributedOptimizer.step from the same or different clients, those calls are serialized on that worker, because each worker's optimizer can only work on one set of gradients at a time.
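The per-worker serialization comes down to a lock around the parameter update (as the _LocalOptimizer source shown later does with its global_lock). A toy stand-in (class and field names here are illustrative, not the real PyTorch classes) shows the pattern:

```python
import threading

class LocalOptimizerSketch:
    """Toy stand-in for a per-worker optimizer guarded by a shared lock."""
    global_lock = threading.Lock()

    def __init__(self):
        self.params = [0.0]
        self.steps = 0

    def step(self, grad):
        # Concurrent callers serialize here: only one set of gradients is
        # applied at a time, matching DistributedOptimizer's guarantee.
        with LocalOptimizerSketch.global_lock:
            self.params[0] -= 0.1 * grad
            self.steps += 1

opt = LocalOptimizerSketch()
threads = [threading.Thread(target=opt.step, args=(1.0,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert opt.steps == 8  # every concurrent step() was applied, one at a time
```

Note that, as the docstring quoted below also warns, this serializes individual steps but does not order full forward-backward-step sequences across clients.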
The definition of DistributedOptimizer by itself does not reveal much. Because of Python's language characteristics, the class's member variables are not all visible in one place, but one attribute, functional_optim_map, deserves attention. It maps each built-in optimizer to a corresponding functional optimizer; for example, optim.Adagrad maps to _FunctionalAdagrad. Let us pick one of these functional optimizers and take a look.
class DistributedOptimizer:
    """
    DistributedOptimizer takes remote references to parameters scattered
    across workers and applies the given optimizer locally for each parameter.

    This class uses :meth:`~torch.distributed.autograd.get_gradients` in order
    to retrieve the gradients for specific parameters.

    Concurrent calls to
    :meth:`~torch.distributed.optim.DistributedOptimizer.step`,
    either from the same or different clients, will
    be serialized on each worker -- as each worker's optimizer can only work
    on one set of gradients at a time. However, there is no guarantee that
    the full forward-backward-optimizer sequence will execute for one client
    at a time. This means that the gradients being applied may not correspond
    to the latest forward pass executed on a given worker. Also, there is no
    guaranteed ordering across workers.

    `DistributedOptimizer` creates the local optimizer with TorchScript enabled
    by default, so that optimizer updates are not blocked by the Python Global
    Interpreter Lock (GIL) in the case of multithreaded training (e.g. Distributed
    Model Parallel). This feature is currently enabled for most optimizers. You
    can also follow `the recipe`__ in PyTorch tutorials to enable TorchScript support
    for your own custom optimizers.

    Args:
        optimizer_class (optim.Optimizer): the class of optimizer to
            instantiate on each worker.
        params_rref (list[RRef]): list of RRefs to local or remote parameters
            to optimize.
        args: arguments to pass to the optimizer constructor on each worker.
        kwargs: arguments to pass to the optimizer constructor on each worker.
    """

    # dict to map a user passed in optimizer_class to a functional
    # optimizer class if we have already defined inside the
    # distributed.optim package, this is so that we hide the
    # functional optimizer to user and still provide the same API.
    functional_optim_map = {
        optim.Adagrad: _FunctionalAdagrad,
        optim.Adam: _FunctionalAdam,
        optim.AdamW: _FunctionalAdamW,
        optim.SGD: _FunctionalSGD,
        optim.Adadelta: _FunctionalAdadelta,
        optim.RMSprop: _FunctionalRMSprop,
        optim.Rprop: _FunctionalRprop,
        optim.Adamax: _FunctionalAdamax,
    }
optim.SGD maps to _FunctionalSGD, whose code lives in torch/distributed/optim/functional_sgd.py. It defines a TorchScript-compatible functional SGD optimizer that PyTorch uses in a functional way: instead of reading param.grad when updating parameters, the distributed optimizer explicitly passes the gradients to the step function. Note: this optimizer is meant only for internal use by the distributed optimizer and is not exposed to users.
from typing import Dict, List, Optional

import torch
from torch import Tensor
import torch.optim._functional as F

# Define a TorchScript compatible Functional SGD Optimizer
# where we use these optimizer in a functional way.
# Instead of using the `param.grad` when updating parameters,
# we explicitly allow the distributed optimizer pass gradients to
# the `step` function. In this way, we could separate the gradients
# and parameters and allow multithreaded trainer to update the
# parameters without data traces on accumulating to the same .grad.
# NOTE: This should be only used by distributed optimizer internals
# and not meant to expose to the user.
@torch.jit.script
class _FunctionalSGD(object):
    def __init__(
        self,
        params: List[Tensor],
        lr: float = 1e-2,
        momentum: float = 0.0,
        dampening: float = 0.0,
        weight_decay: float = 0.0,
        nesterov: bool = False
    ):
        self.defaults = {
            "lr": lr,
            "momentum": momentum,
            "dampening": dampening,
            "weight_decay": weight_decay,
        }
        self.nesterov = nesterov
        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})
        # NOTE: we only have one param_group and don't allow user to add additional
        # param group as it's not a common use case.
        self.param_group = {"params": params}

    def step(self, gradients: List[Optional[Tensor]]):
        params = self.param_group['params']
        grads = []
        momentum_buffer_list: List[Optional[Tensor]] = []
        lr = self.defaults['lr']
        weight_decay = self.defaults['weight_decay']
        momentum = self.defaults['momentum']
        dampening = self.defaults['dampening']

        for param, gradient in zip(params, gradients):
            if gradient is not None:
                grads.append(gradient)
                if param not in self.state:
                    self.state[param] = {}
                state = self.state[param]
                if 'momentum_buffer' not in state:
                    momentum_buffer_list.append(None)
                else:
                    momentum_buffer_list.append(state['momentum_buffer'])

        with torch.no_grad():
            F.sgd(params,
                  grads,
                  momentum_buffer_list,
                  weight_decay=weight_decay,
                  momentum=momentum,
                  lr=lr,
                  dampening=dampening,
                  nesterov=self.nesterov)

        # update momentum_buffers in state
        for i, p in enumerate(params):
            state = self.state[p]
            momentum_buffer = momentum_buffer_list[i]
            if momentum_buffer is not None:
                state['momentum_buffer'] = momentum_buffer
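Stripped of momentum and TorchScript details, the essential point of the functional style is that step receives the gradients as an argument instead of reading param.grad. A minimal plain-Python sketch (scalar "parameters", no tensors; this is not the real class):

```python
def functional_sgd_step(params, gradients, lr=0.01):
    """Apply one SGD step with explicitly passed gradients.

    Unlike a vanilla optimizer, nothing is read from a `.grad` attribute:
    the caller (here, the distributed optimizer) owns the gradients, e.g.
    those retrieved from a distributed autograd context. This is what lets
    multiple trainer threads update parameters without racing on `.grad`.
    """
    return [p if g is None else p - lr * g
            for p, g in zip(params, gradients)]

params = [1.0, 2.0, 3.0]
grads = [10.0, None, 30.0]   # None: no gradient for this parameter
updated = functional_sgd_step(params, grads)
```

Here the second parameter is left untouched because its gradient is None, mirroring how the real step skips parameters without gradients.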
This code corresponds to the step where the distributed optimizer creates an instance of the local Optimizer on each worker node and holds RRefs to those local optimizers. Concretely, looking back at our earlier example code, params_rref is the list of parameters to optimize, and each owner worker gets one optimizer: DistributedOptimizer creates the optimizers on all nodes and stores them, in the form rpc.RRef(_LocalOptimizer), in self.remote_optimizers.
def __init__(self, optimizer_class, params_rref, *args, **kwargs):
    per_worker_params_rref = defaultdict(list)
    for param in params_rref:
        per_worker_params_rref[param.owner()].append(param)  # group params by owner worker

    # Pick the corresponding local optimizer class.
    if optimizer_class in DistributedOptimizer.functional_optim_map and jit._state._enabled:
        optim_ctor = DistributedOptimizer.functional_optim_map.get(optimizer_class)
    else:
        optim_ctor = optimizer_class
    self.is_functional_optim = (optim_ctor != optimizer_class)

    if self.is_functional_optim:
        optimizer_new_func = _new_script_local_optimizer
    else:
        optimizer_new_func = _new_local_optimizer  # introduced below

    remote_optim_futs = []
    for worker, param_rrefs in per_worker_params_rref.items():
        remote_optim_rref_fut = rpc.rpc_async(
            worker,  # create the local optimizer on this worker
            optimizer_new_func,  # invoked on the remote side via rpc_async
            args=(optim_ctor, param_rrefs) + args,
            kwargs=kwargs,
        )
        remote_optim_futs.append(remote_optim_rref_fut)

    self.remote_optimizers = _wait_for_all(remote_optim_futs)  # local handles to the optimizers on the remote nodes
_new_local_optimizer creates a _LocalOptimizer:
def _new_local_optimizer(optim_cls, local_params_rref, *args, **kwargs):
    return rpc.RRef(
        _LocalOptimizer(optim_cls, local_params_rref, *args, **kwargs))
_LocalOptimizer is the local optimizer. It runs on the remote worker node, and the master holds proxies to these optimizers.
class _LocalOptimizer(object):
    # Ideally we would only need to share a lock for instances of
    # _LocalOptimizer that deal with the same parameters. We are
    # making a simplifying assumption here that if there is more
    # than one instance of _LocalOptimizer per worker, they will
    # be optimizing the same parameters (e.g. each data parallel
    # trainer will create its own instance of _LocalOptimizer but
    # they will all optimize the same parameters on each worker)
    global_lock = Lock()

    def __init__(self, optim_cls, local_params_rref, *args, **kwargs):
        self._local_params = [rref.local_value() for rref in local_params_rref]
        self.optim = optim_cls(  # still an ordinary optimizer; only the parameters it optimizes now live on a remote node
            self._local_params,  # initialized with the local values of the parameter RRefs
            *args,
            **kwargs)

    def step(self, autograd_ctx_id):
        # fetch the gradients accumulated in the distributed autograd context
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step()  # optimize the parameters
_wait_for_all is used to wait for the asynchronous calls to complete:
def _wait_for_all(rpc_futs):
    # TODO: improve error propagation
    exception = None
    results = []
    for fut in rpc_futs:
        try:
            results.append(fut.wait())
        except Exception as e:
            results.append(e)
            exception = e
    if exception is not None:
        raise exception
    return results
The corresponding logic looks like this:
+----------------------------------+
+--------------------------------------------+ | Node 2 worker 1|
| Node 1 master | | |
| | | +--------------------------+ |
| | | | _LocalOptimizer | |
| +---------------------------------+ | | | | |
| | DistributedOptimizer | | | | | |
| | | | | | optim = _FunctionalSGD | |
| | | | | | | |
| | remote_optimizers = [ | | | | _local_params = rref1 | |
| | optim_rref1 +------------------------> | + | |
| | , | | | | | | |
| | optim_rref2 +-------+ | | +--------------------------+ |
| | ] | | | | | |
| | | | | | v |
| | | | | +--------------> torch.rand((3, 3)) |
| | | | | | | |
| +---------------------------------+ | | | +----------------------------------+
| | | |
| | | | +-----------------------------------+
| | | | | Node 3 worker 2 |
| | | | | |
| | | | | +--------------------------+ |
| | | | | | _LocalOptimizer | |
| | | | | | | |
| +-----------------> | | |
| | | | | optim = _FunctionalSGD | |
| | | | | | |
| rref1 +------------+ | | _local_params = rref2 | |
| | | | + | |
| | | | | | |
| rref2 +------------+ | +--------------------------+ |
| | | | | |
| | | | | |
| | | | v |
| | +---------------> torch.rand((3, 3)) |
| | | |
+--------------------------------------------+ +-----------------------------------+
When optimizing, DistributedOptimizer iterates over its saved optimizers and calls _local_optimizer_step on each one.
Why can Node 1 invoke these remote optimizers from one central place? Because the next forward pass can only start after all parameter updates have finished, so it is safe to issue the calls together and wait for them all to complete.
def step(self, context_id):
    """
    Performs a single optimization step.

    This will call :meth:`torch.optim.Optimizer.step` on each worker
    containing parameters to be optimized, and will block until all workers
    return. The provided ``context_id`` will be used to retrieve the
    corresponding :class:`~torch.distributed.autograd.context` that
    contains the gradients that should be applied to the parameters.

    Args:
        context_id: the autograd context id for which we should run the
            optimizer step.
    """
    dist_autograd._is_valid_context(context_id)

    if self.is_functional_optim:
        optimizer_step_func = _script_local_optimizer_step
    else:
        optimizer_step_func = _local_optimizer_step

    rpc_futs = []
    for optimizer in self.remote_optimizers:  # iterate over the _LocalOptimizer handles
        rpc_futs.append(rpc.rpc_async(  # asynchronous remote invocation
            optimizer.owner(),
            optimizer_step_func,  # call each one in turn
            args=(optimizer, context_id),
        ))
    _wait_for_all(rpc_futs)
_local_optimizer_step simply obtains the _LocalOptimizer and calls its step:
def _local_optimizer_step(local_optim_rref, autograd_ctx_id):
    local_optim = local_optim_rref.local_value()
    local_optim.step(autograd_ctx_id)
_LocalOptimizer's step first fetches the distributed gradients, then uses them to optimize the parameters:
class _LocalOptimizer(object):
    def step(self, autograd_ctx_id):
        # fetch the gradients accumulated in the distributed autograd context
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step()  # optimize the parameters
The Python code of get_gradients is only a stub with no real logic:
def get_gradients(context_id):  # real signature unknown; restored from __doc__
    """
    get_gradients(context_id: int) -> Dict[Tensor, Tensor]

    Retrieves a map from Tensor to the appropriate gradient for that Tensor
    accumulated in the provided context corresponding to the given ``context_id``
    as part of the distributed autograd backward pass.

    Arguments:
        context_id(int): The autograd context id for which we should retrieve the
                         gradients.

    Returns:
        A map where the key is the Tensor and the value is the associated gradient
        for that Tensor.

    Example::
        >>> import torch.distributed.autograd as dist_autograd
        >>> with dist_autograd.context() as context_id:
        >>>     t1 = torch.rand((3, 3), requires_grad=True)
        >>>     t2 = torch.rand((3, 3), requires_grad=True)
        >>>     loss = t1 + t2
        >>>     dist_autograd.backward(context_id, [loss.sum()])
        >>>     grads = dist_autograd.get_gradients(context_id)
        >>>     print(grads[t1])
        >>>     print(grads[t2])
    """
    return {}
The corresponding C++ implementation lives in torch/csrc/jit/runtime/register_distributed_ops.cpp; it delegates to the context:
// Implementations located in
// torch/csrc/jit/runtime/register_distributed_ops.cpp
TORCH_LIBRARY_IMPL(aten, CatchAll, m) {
  m.impl("get_gradients", [](int64_t context_id) {
    const auto& autogradContext =
        dist_autograd::DistAutogradContainer::getInstance().retrieveContext(
            context_id);
    return autogradContext->getGradients(); // delegate to the context
  });
}
The C++ getGradients code is as follows:
const c10::Dict<torch::Tensor, torch::Tensor> DistAutogradContext::
    getGradients() const {
  std::lock_guard<std::mutex> guard(lock_);
  // block current streams before accessing gradients to make sure that
  // gradient computations are finished before use.
  for (auto& entry : gradReadyEvents_) {
    auto& event = entry.second;
    event.block(impl_.getStream(event.device()));
  }
  return accumulatedGrads_; // the distributed gradients are accumulated here
}
And in torch/csrc/distributed/autograd/context/context.h we find:
// DistAutogradContext which stores information for a single distributed
// autograd pass on a worker.
class TORCH_API DistAutogradContext {
  // Gradients accumulated in this context so far. The key is the variable on
  // which the gradient needs to be accumulated and the value is the gradient
  // that needs to be accumulated on that variable..
  c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;
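The role of accumulatedGrads_ can be sketched in plain Python (a toy stand-in, not the real C++ class): each autograd context keeps its own param-to-gradient map, so gradients accumulate within one distributed backward pass while concurrent passes stay isolated instead of clobbering a shared .grad.

```python
class DistAutogradContextSketch:
    """Toy per-pass gradient store, mirroring accumulatedGrads_."""
    def __init__(self):
        self.accumulated_grads = {}   # param name -> gradient value

    def accumulate(self, param, grad):
        # Gradients for the same parameter sum up within one context.
        self.accumulated_grads[param] = self.accumulated_grads.get(param, 0.0) + grad

    def get_gradients(self):
        # Mirrors dist_autograd.get_gradients(context_id).
        return dict(self.accumulated_grads)

# Two concurrent backward passes keep their gradients isolated.
ctx_a, ctx_b = DistAutogradContextSketch(), DistAutogradContextSketch()
ctx_a.accumulate("w", 1.0)
ctx_a.accumulate("w", 2.0)   # accumulates within the same context
ctx_b.accumulate("w", 5.0)

assert ctx_a.get_gradients() == {"w": 3.0}
assert ctx_b.get_gradients() == {"w": 5.0}
```

This per-context isolation is exactly why _LocalOptimizer.step must be handed the autograd context_id: the gradients live in the context, not on the parameters.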
Extending our earlier picture: each _LocalOptimizer optimizes its own local _local_params, and the gradients it applies are those accumulated in accumulatedGrads_ of its node's DistAutogradContext. In this way, the sub-models of the whole model are trained/optimized in lockstep on their respective nodes.
+--------------------------------------+
| Node 2 worker 1 |
| |
| +--------------------------+ |
| | DistAutogradContext | |
| | | 3 |
| | accumulatedGrads_ <------+ |
+-----------------------------------------+ | | | | |
| Node 1 master | | +--------------------------+ | |
| | | +--------------------------+ | |
| +-------------------------------+ | +--------> | _LocalOptimizer | | |
| | DistributedOptimizer | | | | | | | |
| | | | | | | optim = _FunctionalSGD | | |
| | | | | | | | | |
| | remote_optimizers = [ | | | | | _local_params = rref1 | | |
| | optim_rref1 +---------------+ | | + | | |
| | , | | +---------> step() +-------------------+ |
| | optim_rref2 +-------+ | | | | | | |
| | | | | | | +--------------------------+ |
| | ] +----------------->+ | 2 | |
| | | | | | | v |
| | | | | | +----------------> torch.rand((3, 3)) |
| | 1 | | | | | | |
| | step() { | | | | | +--------------------------------------+
| | | | | | |
| | optim_rref1.step()+--+ | | | | +--------------------------------------+
| | | | | | | Node 3 worker 2 |
| | optim_rref2.step()+--+ | | | | | |
| | | | | | | | +--------------------------+ |
| | } | | | | | | | _LocalOptimizer | |
| | | | | | | | | | |
| +-------------------------------+ +-----------------> | | |
| | | | | | optim = _FunctionalSGD | |
| | | | | | | |
| 1 | | | | | _local_params = rref2 | |
| | | | | | + | 3 |
| +-----------------------------> step() +------------------v |
| | | | | | | | |
| rref1 +-------------+ | +--------------------------+ | |
| | | 2 | | |
| | | v | |
| rref2 +-------------------------------> torch.rand((3, 3)) | |
| | | | |
+-----------------------------------------+ | +--------------------------+ | |
| | DistAutogradContext | | |
| | | | |
| | accumulatedGrads_ <-----+ |
| | | |
| +--------------------------+ |
+--------------------------------------+
Finally, let us look at PipeDream and see how it implements its distributed optimizer. Our line of exploration: start from how the runtime is built, then see how each stage's parameters are extracted and handed to an optimizer. One thing to say up front: PipeDream does not use PyTorch's DistributedOptimizer; instead, each node builds its own local optimizer (with weight stashing) over the parameters of its own stage.
We need to go through this from the beginning.
We arrive at runtime/translation/main_with_runtime.py. Here, a StageRuntime is built first, and then the optimizer is constructed from the StageRuntime's parameters.
def main():
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)

    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9, 0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)
StageRuntime's initialize function builds the modules; it uses this node's stage to pick out its own modules.
To quote from the earlier article: stage_to_module_map establishes the stage-to-modules relation, so that each stage can find the modules it owns. Our stage (stage 3) corresponds to the two modules at indices 3 and 4, i.e. the two trailing 3s in module_to_stage_map = [0, 1, 2, 3, 3] (a list of 5 elements).
The concrete code is:
def initialize(self, model, inputs_module_destinations,
               configuration_maps, master_addr, rank,
               local_rank, num_ranks_in_server):

    if module_to_stage_map is None:
        self.modules_with_dependencies = ModulesWithDependencies(model)
    else:
        # Find this stage's own modules.
        modules = stage_to_module_map[self.stage]
        self.modules_with_dependencies = ModulesWithDependencies(
            [model[module] for module in modules])

    # Determine which model layers belong here.
    modules = self.modules_with_dependencies.modules()

    # Obtain master_parameters and model_parameters.
    if self.fp16:
        self.master_parameters = []
        self.model_parameters = []
        for i in range(len(modules)):
            import apex.fp16_utils as fp16_utils
            module_parameters, module_master_parameters = \
                fp16_utils.prep_param_lists(modules[i])
            self.master_parameters.extend(module_master_parameters)
            self.model_parameters.extend(module_parameters)
    else:
        self.master_parameters = list(self.parameters())
        self.model_parameters = None
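The stage-to-modules lookup is just an inversion of module_to_stage_map. A small sketch (using the 5-element map quoted above; the helper name `stage_to_modules` is ours, not PipeDream's):

```python
# module_to_stage_map from the text: 5 modules mapped onto 4 stages,
# with modules 3 and 4 both assigned to stage 3.
module_to_stage_map = [0, 1, 2, 3, 3]

def stage_to_modules(module_to_stage_map):
    """Invert the map: stage id -> list of module indices that stage owns."""
    stage_map = {}
    for module_idx, stage in enumerate(module_to_stage_map):
        stage_map.setdefault(stage, []).append(module_idx)
    return stage_map

stage_map = stage_to_modules(module_to_stage_map)
assert stage_map[3] == [3, 4]   # stage 3 owns modules 3 and 4
assert stage_map[0] == [0]
```

Each runtime then collects only the parameters of the modules its stage owns, which is exactly what master_parameters ends up holding.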
For example, suppose the model is distributed across two nodes with two layers each, and Node 2 additionally runs DDP data parallelism.
Each node then has different model parameters: Node 1's parameters to optimize are those of Layer 1 and Layer 2, while Node 2's are those of Layer 3 and Layer 4.
Node 2
+----------------------------------------+
| Stage 2 StageRuntime |
| |
Node 1 | CommunicationHandler |
+---------------------------------------+ | |
| Stage 1 StageRuntime | | +----------------------------+ |
| | | | +------------------------+ | |
| | | | |Rank 2 | | |
| CommunicationHandler | | | | | | |
| | | | | | | |
| +-----------------------+ | | | | Layer 3 +---> Layer 4 | | |
| |Rank 1 | | | | | | | |
| | | | | DDP | | | | |
| | Layer 1 +---> Layer 2 | +----------->+ +------------------------+ | |
| | | | | | +------------------------+ | |
| | | | | | |Rank 3 | | |
| +-----------------------+ | | | | | | |
| | | | | | | |
| master_parameters = Parameters( | | | | Layer 3 +---> Layer 4 | | |
| Layer 1, Layer 2) | | | | | | |
| | | | | | | |
| model_parameters | | | +------------------------+ | |
| | | +----------------------------+ |
+---------------------------------------+ | |
| |
| master_parameters = Parameters( |
| Layer 3, Layer 4) |
| |
| |
| model_parameters |
| |
+----------------------------------------+
The runtime's master_parameters and model_parameters are then used to build the local optimizer SGDWithWeightStashing.
OptimizerWithWeightStashing is the base class of SGDWithWeightStashing.
class SGDWithWeightStashing(OptimizerWithWeightStashing):  # derived from the base class
    """
    SGD optimizer with weight stashing.
    """
    def __init__(self, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, lr=required, momentum=0,
                 dampening=0, weight_decay=0, nesterov=False, verbose_freq=0,
                 macrobatch=False):
        super(SGDWithWeightStashing, self).__init__(
            optim_name='SGD',
            modules=modules, master_parameters=master_parameters,
            model_parameters=model_parameters, loss_scale=loss_scale,
            num_versions=num_versions, lr=lr, momentum=momentum,
            dampening=dampening, weight_decay=weight_decay,
            nesterov=nesterov, verbose_freq=verbose_freq,
            macrobatch=macrobatch,
        )
The base class OptimizerWithWeightStashing creates a vanilla optimizer and assigns it to base_optimizer.
class OptimizerWithWeightStashing(torch.optim.Optimizer):
    """Wrapper class that adds weight stashing to a vanilla torch.optim.Optimizer.

    Arguments:
        - optim_name: the name of optimizer, required to create the corresponding
          base_optimizer (torch.optim.{optim_name}).
        - optimizer_args: the keyword arguments passed to base_optimizer.
    """
    def __init__(self, optim_name, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, verbose_freq=0, macrobatch=False,
                 **optimizer_args):
        self.modules = modules
        self.master_parameters = master_parameters
        self.model_parameters = model_parameters  # model_parameters is None if not fp16.
        self.loss_scale = loss_scale

        # Only need at most 2 versions if using macrobatching.
        if macrobatch:
            num_versions = min(2, num_versions)
        self.num_versions = num_versions

        # Create the vanilla base optimizer, looked up by name.
        self.base_optimizer = getattr(torch.optim, optim_name)(
            master_parameters, **optimizer_args)
        self.latest_version = Version()
        self.current_version = Version()
        self.initialize_queue()
        self.verbose_freq = verbose_freq
        self.batch_counter = 0

        # If macrobatching, push and pop versions at the right rate.
        if macrobatch:
            self.update_interval = self.num_versions
        else:
            self.update_interval = 1
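The num_versions / initialize_queue machinery exists because, in an asynchronous pipeline, a minibatch's backward pass may run several updates after its forward pass, so the optimizer stashes old weight versions. The following is a minimal pure-Python sketch of that idea; the class WeightStashSketch and its plain-dict "parameters" are hypothetical illustrations, not PipeDream's actual implementation:

```python
from collections import deque


class WeightStashSketch:
    """Hypothetical, simplified illustration of weight stashing.
    `params` is a plain name -> float dict standing in for real tensors."""

    def __init__(self, params, num_versions):
        self.params = params
        # Queue of stashed snapshots, oldest first; bounded by num_versions.
        self.queue = deque([dict(params) for _ in range(num_versions)],
                           maxlen=num_versions)

    def load_old_params(self):
        # Restore the oldest stashed version (the one the corresponding
        # forward pass used) before running backward.
        self.params.update(self.queue[0])

    def load_new_params(self):
        # Switch back to the latest version before applying the update.
        self.params.update(self.queue[-1])

    def step(self, grads, lr=0.1):
        # Vanilla SGD update on the latest weights, then stash the result.
        for name, g in grads.items():
            self.params[name] -= lr * g
        self.queue.append(dict(self.params))
```

With num_versions=2, a step on w=1.0 with gradient 1.0 yields w=0.9, while load_old_params can still restore the pre-update value 1.0 for a backward pass that belongs to an earlier forward.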
The logic now extends as follows; each optimizer updates using the parameters of its own node.
+----------------------------------------+
| Stage 2 StageRuntime |
| |
| CommunicationHandler |
+---------------------------------------+ | |
| Stage 1 StageRuntime | | +----------------------------+ |
| | | | +------------------------+ | |
| | | | |Rank 2 | | |
| CommunicationHandler | | | | | | |
| | | | | | | |
| +-----------------------+ | | | | Layer 3 +---> Layer 4 | | |
| |Rank 1 | | | | | | | |
| | | | | DDP | | | | |
| | Layer 1 +---> Layer 2 | +----------->+ +------------------------+ | |
| | | | | | +------------------------+ | |
| | | | | | |Rank 3 | | |
| +-----------------------+ | | | | | | |
| | | | | | | |
| master_parameters = Parameters( | | | | Layer 3 +---> Layer 4 | | |
| Layer 1, Layer 2) | | | | | | |
| + | | | | | | |
| model_parameters | | | | +------------------------+ | |
| | | | +----------------------------+ |
| +---------------------------------+ | | |
| |SGDWithWeightStashing | | | | |
| | | | | | master_parameters = Parameters( |
| | base_optimizer = SGD( v | | | Layer 3, Layer 4) |
| | master_parameters) | | | + |
| | | | | model_parameters | |
| +---------------------------------+ | | | |
| | | +----------------------------------+ |
+---------------------------------------+ | |SGDWithWeightStashing | | |
| | | | |
| | base_optimizer = SGD( v | |
| | master_parameters) | |
| +----------------------------------+ |
| |
+----------------------------------------+
The whole pipeline runs asynchronously, i.e., the optimization itself is asynchronous.
def train(train_loader, r, optimizer, epoch):
    # ... other logic omitted ...

    # start num_warmup_minibatches forward passes
    for i in range(num_warmup_minibatches):
        r.run_forward()

    for i in range(n - num_warmup_minibatches):
        # perform forward pass
        r.run_forward()

        # perform backward pass
        if args.fp16:
            r.zero_grad()
        else:
            optimizer.zero_grad()
        optimizer.load_old_params()
        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # finish remaining backward passes
    for i in range(num_warmup_minibatches):
        optimizer.zero_grad()
        optimizer.load_old_params()
        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # wait for all helper threads to complete
    r.wait()
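The loop above is PipeDream's 1F1B-style schedule: num_warmup_minibatches forward passes fill the pipeline, the steady-state loop alternates one forward with one backward, and the tail drains the remaining backward passes. A small dry-run sketch (the function name is made up for illustration) records the call order the loop produces:

```python
def schedule_calls(n, num_warmup_minibatches):
    """Record the forward/backward order the training loop above produces.
    'F' stands for r.run_forward(); 'B' for the backward-plus-step block."""
    calls = []
    # Warm-up: fill the pipeline with forward passes only.
    calls += ['F'] * num_warmup_minibatches
    # Steady state: one forward and one backward per iteration (1F1B).
    for _ in range(n - num_warmup_minibatches):
        calls += ['F', 'B']
    # Drain: finish the remaining backward passes.
    calls += ['B'] * num_warmup_minibatches
    return calls
```

For n=5 minibatches and 2 warm-up steps this yields F F F B F B F B B B: every forward is eventually matched by a backward, offset by the pipeline depth.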
The optimization step goes through SGDWithWeightStashing's step method, which is in fact the step method inherited from OptimizerWithWeightStashing(torch.optim.Optimizer).
def step(self, closure=None):
    """Performs a single optimization step.

    Arguments:
        closure (callable, optional): A closure that reevaluates the model
            and returns the loss.
    """
    # Update the gradient every `update_interval` steps.
    if self.batch_counter % self.update_interval != self.update_interval - 1:
        self.batch_counter += 1
        return None

    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.model_grads_to_master_grads(self.model_parameters,
                                               self.master_parameters)
        if self.loss_scale != 1.0:
            # Undo the loss scaling applied during the backward pass.
            for parameter in self.master_parameters:
                parameter.grad.data = parameter.grad.data / self.loss_scale

    for p in self.param_groups[0]['params']:
        if p.grad is not None:  # average the gradients accumulated over the interval
            p.grad.div_(self.update_interval)

    loss = self.base_optimizer.step()  # perform the actual update

    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.master_params_to_model_params(self.model_parameters,
                                                 self.master_parameters)

    self.latest_version = self.latest_version.incr()
    if self.num_versions > 1:
        self.buffered_state_dicts = self.queue[0][0]
        self.queue.append(self.get_params(clone=False))
    self.batch_counter += 1
    return loss
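Two details of step() are easy to miss: it only performs a real update on every update_interval-th call, and before updating it divides the gradients first by loss_scale and then by update_interval. A minimal numeric sketch on plain floats (the class name StepGate is hypothetical, not from PipeDream):

```python
class StepGate:
    """Hypothetical sketch of step()'s gating and gradient preprocessing:
    skip update_interval - 1 calls, then unscale and average in one update."""

    def __init__(self, loss_scale=2.0, update_interval=4):
        self.loss_scale = loss_scale
        self.update_interval = update_interval
        self.batch_counter = 0

    def step(self, accumulated_grad):
        # Same guard as OptimizerWithWeightStashing.step(): only every
        # update_interval-th call performs a real update.
        if self.batch_counter % self.update_interval != self.update_interval - 1:
            self.batch_counter += 1
            return None
        # Undo the loss scaling, then average over the accumulation interval.
        effective_grad = accumulated_grad / self.loss_scale / self.update_interval
        self.batch_counter += 1
        return effective_grad
```

With loss_scale=2 and update_interval=4, three calls return None (no update), and the fourth turns an accumulated gradient of 8.0 into an effective gradient of 1.0.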
In detail:
Node 2
+-----------------------------------------+
| Stage 2 StageRuntime |
| |
Node 1 | CommunicationHandler |
+-----------------------------------------+ | |
| Stage 1 StageRuntime | | +----------------------------+ |
| | | | +------------------------+ | |
| | | | |Rank 2 | | |
| CommunicationHandler | | | | | | |
| | | | | | | |
| +-----------------------+ | | | | Layer 3 +---> Layer 4 | | |
| |Rank 1 | | | | | | | |
| | | | | DDP | | | | |
| | Layer 1 +---> Layer 2 | +---------->+ +------------------------+ | |
| | | | | | +------------------------+ | |
| | | | | | |Rank 3 | | |
| +-----------------------+ | | | | | | |
| | | | | | | |
| master_parameters = Parameters( | | | | Layer 3 +---> Layer 4 | | |
| Layer 1, Layer 2) | | | | | | |
| + | | | | | | |
| model_parameters | | | | +------------------------+ | |
| | | | +----------------------------+ |
| step() | | | |
| + | | | |
| | | | | master_parameters = Parameters( |
| | +-------------------------------+ | | Layer 3, Layer 4) |
| | |SGDWithWeightStashing | | | | + |
| | | | | | | model_parameters | |
| | | base_optimizer = SGD( v | | | | |
| | | master_parameters) | | | step() | |
| | | | | | + | |
| +----> base_optimizer.step() | | | | | |
| | | | | | +-------------------------------+ |
| +-------------------------------+ | | | |SGDWithWeightStashing | | |
| | | | | | | |
+-----------------------------------------+ | | | base_optimizer = SGD( v | |
| | | master_parameters) | |
| | | | |
| +------> base_optimizer.step() | |
| | | |
| +-------------------------------+ |
| |
+-----------------------------------------+
This concludes the distributed optimizer series. When we later analyze ZeRO, we will also look at PyTorch's ZeroRedundancyOptimizer, but that is probably still a few weeks away. Starting with the next post, we will walk through several official PyTorch distributed example applications, to tie the whole PyTorch distributed picture together and see how it is applied in practice. Stay tuned.