前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Pytorch分布式训练

Pytorch分布式训练

作者头像
iResearch666
发布2023-09-13 14:13:32
1.1K0
发布2023-09-13 14:13:32
举报
文章被收录于专栏:AI算法能力提高班

Pytorch分布式训练

Dataset

  • next(iter(dataloader))返回一个batch的数据 , 等价于IterableDataset
  • 可以用 pytorch IterableDataset + python generator function(yield) 来解决,这样按需读取数据,常用的 dataset 是一次全部加载。IterableDataset 因为按需读取,就没法用 len, index 等功能
  • Pytorch提供了两种思路去构建Dataset:
    • Map式数据集: 将整个数据集读取到内存中,通过index映射的方式读取对应的数据,优点速度快,缺点占用内存,大的数据集是无法使用。
    • Iterable式数据集:无需将整个数据集读取到内存中,通过覆盖写iter迭代器的方式实现的形式输入数据。无需满足内存大于整个数据集,也无需知道全部数据的大小。

简单来说,如果数据集较小时推荐尽量使用Map式Dataset,数据量过大、数据量未知、训练内存无法满足时只能使用Iterable式构建Dataset。

  • iterable Dataset 在分布式训练

在分布式时训练中数据并行的时,每块GPU都有一个独立的model和独立的进程(DDP模式)去训练完整数据的子集,在Pytorch中的DDP模式是通过DistributedSampler()去实现在分布式并行训练时每个模型读取是整个数据集上不同部分,从而避免训练时取数据发生重复。比较坑的是:DistributedSampler()主要通过切片的方式将整个数据集分成独立的子集。Map 式 Dataset无需再多写代码直接可以适用。而iterable式 Dataset是无法切片。

注意,如果它是一个带有一些随机性的 torch.utils.data.IterableDataset ,并且你是以分布式方式进行训练的,你的 iterable dataset 要么使用一个内部的 attribute generator ,该generator 是一个 torch.Generator 用于随机化,且在所有进程上必须是相同的(并且 Trainer 将在每个 epoch 手动设置该 generator 的种子);要么有一个 set_epoch() 方法,在该方法内部设置所用随机数生成器的种子。

  • pytorch DistributedSampler的实现
代码语言:javascript
复制
def __iter__(self) -> Iterator[T_co]:
    if self.shuffle:
        # deterministically shuffle based on epoch and seed
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore
    else:
        indices = list(range(len(self.dataset)))  # type: ignore
代码语言:javascript
复制
from torch.utils.data import IterableDataset

class IterableDataset(IterableDataset):
    def __init__(self, file_path):
        self.file_path = file_path

    def __iter__(self):
        with open(self.file_path, "r") as f:
            for line in f.readlines():
                items = line.split("\t")
                # process data 
                .......
                yield {"input1": input1, "input2": input2, "input3": input3}

数据并行:因为求导以及加和都是线性的,数据并行在数学上也有效

  • 一个dataset划分为若干个batch
  • 每个GPU复制一份模型
  • 将每个batch平均分配到所有GPU上并行运算

image-20230817103103621

DP

  • 使用
代码语言:javascript
复制
model = nn.DataParallel(model)
  • DataParallel(DP)
  • 适用单机,不适用多机
  • 优点:一行代码即可
  • 缺点

image-20230817104337458

  • 单进程多线程
  • Global Interpreter Lock (GIL)全局解释器锁:一个 Python 进程只能利用一个 CPU kernel,即单核多线程并发时,只能执行一个线程。考虑多核,多核多线程可能出现线程颠簸 (thrashing) 造成资源浪费,所以 Python 想要利用多核最好是多进程
  • 负载不均衡,即存在主次模型(主模型需要整合其它次模型的梯度进行参数更新),主模型负载更大;
  • 通信开销大

  • 过程( 比如device[0]为主模型,其它为次模型)
    • 过程一(图中红色部分):各卡分别计算损失和梯度
    • 过程二(图中蓝色部分):所有梯度整合到 device[0]
    • 过程三(图中绿色部分):device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新

image-20230817103746077

  • 采用PS架构(Parameter Server)
  • DP源码
代码语言:javascript
复制
class DataParallel(Module):

    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()

        # 检查是否有可用的 GPU
        device_type = _get_available_device_type()
        if device_type is None:
            self.module = module
            self.device_ids = []
            return
                # 默认使用所有可见的 GPU
        if device_ids is None:
            device_ids = _get_all_device_indices()

                # 默认 server 是 device_ids 列表上第一个
        if output_device is None:
            output_device = device_ids[0]

        self.dim = dim
        self.module = module
        self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])

        # 检查负载是否平衡, 不平衡(指内存或者处理器 max/min > 0.75 会有警告)
        _check_balance(self.device_ids)

        # 单卡
        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)

    def forward(self, *inputs, **kwargs):

        # 没 GPU 可用
        if not self.device_ids:
            return self.module(*inputs, **kwargs)

                # 运行前 GPU device_ids[0] (即我们的 server )上必须有 parallelized module 的parameters 和 buffers
        # 因为 DP 保证 GPU device_ids[0] 和 base parallelized module 共享存储
        # 所以在device[0] 上的 in-place 更新也会被保留下来,其他的则不会

        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   "on device {} (device_ids[0]) but found one of "
                                   "them on device: {}".format(self.src_device_obj, t.device))

                # nice 现在 device[0] 上已经有了 module 和 input, 接下来我们就要开始 PS 算法了
        # 可以开始看正文了

        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)

        # 如果仅有单卡可用,直接单卡计算,不用并行
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])

        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = self.parallel_apply(replicas, inputs, kwargs)
        return self.gather(outputs, self.output_device)

    def replicate(self, module, device_ids):
        return replicate(module, device_ids, not torch.is_grad_enabled())

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)

DDP

基本概念
  • group: 进程组,一般就需要一个默认的
  • world size: 所有的进程数量
  • rank: 全局的进程id
  • local rank:某个节点上的进程id
  • local_word_size: 某个节点上的进程数 (相对比较少见)

假设所有进程数即 world_size为W,每个节点上的进程数即local_world_size为L,则每个进程上的两个ID:

  • rank的取值范围:[0, W-1],rank=0的进程为主进程,会负责一些同步分发的工作
  • local_rank的取值:[0, L-1]
  • 假定有2个机器或者节点,每个机器上有4块GPU。图中一共有4个进程,即world_size=4,那这样每个进程占用两块GPU,其中rank就是[0,1,2,3],每个节点的local_rank就是[0,1]了(4个进程,2个节点,平均每个节点2个进程),其中local_world_size 也就是2。这里需要注意的是,local_rank是隐式参数,即torch自动分配的。比如local_rank可以通过自动注入命令行参数或者环境变量来获得)** 。
  • 通常--nproc_per_node等于显卡数,也就是一个显卡分配一个进程
代码语言:javascript
复制
import torch.distributed as dist
import argparse, os

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=ine, default=0)
args = parser.parse_args()

dist.init_process_group("nccl")
rank = dist.get_rank()
local_rank_arg = args.local_rank               # 命令行形式ARGS形式
local_rank_env = int(os.environ['LOCAL_RANK']) # 在利用env初始ENV环境变量形式
local_world_size = int(os.environ['LOCAL_WORLD_SIZE'])

print(f"{rank=}; {local_rank_arg=}; {local_rank_env=}; {local_world_size=}")
代码语言:javascript
复制
$ python3 -m torch.distributed.launch --nproc_per_node=4 test.py
rank=2; local_rank_arg=2; local_rank_env=2, local_world_size=4
rank=0; local_rank_arg=0; local_rank_env=0, local_world_size=4
rank=3; local_rank_arg=3; local_rank_env=3, local_world_size=4
rank=1; local_rank_arg=1; local_rank_env=1, local_world_size=4
初始化
代码语言:javascript
复制
torch.distributed.init_process_group(backend, init_method=None, world_size=-1, rank=-1, store=None,...)

backend

  • torch提供了NCCL, GLOO,MPI三种可用的后端
  • CPU的分布式训练选择GLOO, GPU的分布式训练就用NCCL即可

init_method

  • 显式指定init_method,可以是TCP连接、File共享文件系统、ENV环境变量三种方式
  • 显式指定store,同时指定world_size 和 rank参数。这里的store是一种分布式中核心的key-value存储,用于不同的进程间共享信息。

这两种方法是互斥的,其实本质上第一种方式是对第二种的一个更高的封装,最后都要落到store上进行实现。如果这两种方法都没有使用,默认使用init_method='env'的方式来初始化。

三种init_method

  • init_method='tcp://ip:port':通过指定rank 0(即:MASTER进程)的IP和端口,各个进程进行信息交换。需指定 rank 和 world_size 这两个参数。
  • init_method='file://path':通过所有进程都可以访问共享文件系统来进行信息共享。需要指定rank和world_size参数。
  • init_method=env://:从环境变量中读取分布式的信息(os.environ),主要包括 MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE。其中,rank和world_size可以选择手动指定,否则从环境变量读取。
运行方法
  1. torch.multiprocessing(python的multiprocessing的封装类)
代码语言:javascript
复制
mp.spawn(fn, args=(), nprocs=1, join=True, daemon=False)
代码语言:javascript
复制
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def fn(rank, ws, nums):
    dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765',
                            rank=rank, world_size=ws)
    rank = dist.get_rank()
    print(f"rank = {rank} is initialized")
    torch.cuda.set_device(rank)
    tensor = torch.tensor(nums).cuda()
    print(tensor)

if __name__ == "__main__":
    ws = 2
    mp.spawn(fn, nprocs=ws, args=(ws, [1, 2, 3, 4])) 
代码语言:javascript
复制
rank = 0 is initialized
rank = 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')

2. launch

代码语言:javascript
复制
$ python3 -m torch.distributed.launch --配置 train.py --args参数

# 通过外部命令运行 
# 通过CUDA_VISIBLE_DEVICES控制可见的卡数
# 通过--nproc_per_node确定使用多少卡
CUDA_VISIBLE_DEVICES="0,1,2,3" python -m torch.distributed.run --nproc_per_node 4 train.py 

常用配置有:
--nnodes: 使用的机器数量,单机的话,就默认是1了
--nproc_per_node: 单机的进程数,即单机的worldsize
--master_addr/port: 使用的主进程rank0的地址和端口
--node_rank: 当前的进程rank

在单机情况下, 只有--nproc_per_node 是必须指定的,--master_addr/port和node_rank都是可以由launch通过环境自动配置
  1. run
代码语言:javascript
复制
$ torchrun --nproc_per_node=4 train.py

命令torchrun来代替torch.distributed.launch

  • 完全使用环境变量配置各类参数,如RANK,LOCAL_RANK, WORLD_SIZE等,尤其是local_rank不再支持用命令行隐式传递的方式
  • 能够更加优雅的处理某个worker失败的情况,重启worker。需要代码中有load_checkpoint(path)save_checkpoint(path) 这样有worker失败的话,可以通过load最新的模型,重启所有的worker接着训练
  • 训练的节点数目可以弹性变化
模型训练

步骤

  • 初始化进程组 dist.init_process_group
  • 设置分布式采样器 DistributedSampler
  • 使用DistributedDataParallel封装模型
  • 使用torchrun 或者 mp.spawn 启动分布式训练

image-20230817150712701

1 分布式训练数据加载

  • Dataloader需要把所有数据分成N份(N为worldsize), 并能正确的分发到不同的进程中,每个进程可以拿到一个数据的子集,不重叠,不交叉。
代码语言:javascript
复制
torch.utils.data.distributed.DistributedSampler(dataset,
    num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)
  • dataset: 需要加载的完整数据集
  • num_replicas:把数据集分成多少份,默认是当前dist的world_size
  • rank: 当前进程的id,默认从dist的rank
  • shuffle:是否打乱
  • drop_last: 如果数据长度不能被world_size整除,可以考虑是否将剩下的扔掉
  • seed:随机数种子。这里需要注意,从源码中可以看出,真正的种子其实是 self.seed+self.epoch 这样的好处是,不同的epoch每个进程拿到的数据是不一样,因此需要在每个epoch开始前设置下:sampler.set_epoch(epoch)

其实Sampler的实现也很简单,核心代码就一句:

代码语言:javascript
复制
indices[self.rank: self.total_size: self.num_replicas]

假设4卡12条数据的话,rank=0,1,2,3, num_replicas=4, 那么每个卡取的数据索引就是:

代码语言:javascript
复制
rank0: [0 4 8]; rank1: [1 5 9]; rank2: [2 6 10]; rank3: [3 7 11]

保证不重复不交叉。这样在分布式训练的时候,只需要给Dataloader指定DistributedSampler即可,简单示例如下:

代码语言:javascript
复制
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, sampler=sampler)
for epoch in range(start_epoch, n_epochs):
  sampler.set_epoch(epoch) # 设置epoch 更新种子
  train(loader)
  • if args.local_rank == -1 表示关闭分布式
代码语言:javascript
复制
train_sampler = RandomSampler(train_dataset) if args.local_rank == -1 else DistributedSampler(train_dataset)

2 模型的分布式训练封装

代码语言:javascript
复制
torch.cuda.set_device(local_rank)
model = Model().cuda()
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
# 要调用model内的函数或者属性. model.module.xxxx

3 模型保存与加载

  • 训练时候保存模型,只保存rank=0主进程模型,不需要dist.barrior(), all_reduce 操作保证了同步性
代码语言:javascript
复制
# 后面正常训练代码
optimizer = xxx
for epoch:
  train_sampler.set_epoch(epoch)
  for data in Dataloader:
      model(data)
      xxx
    # 训练完成 只需要保存rank 0上的即可
    # 不需要dist.barrior(), all_reduce 操作保证了同步性
  if rank == 0:
     torch.save(model.module.state_dict(), CHECKPOINT_PATH)
代码语言:javascript
复制
# 保存的是参数,不需要DDP包裹
torch.save(model.module.state_dict())
  • 推理时候加载模型,需要barrier()其他保证rank 0保存完成。
代码语言:javascript
复制
model = DistributedDataParallel(model, device_ids=[local_rank])
CHECKPOINT_PATH ="./model.checkpoint"
if rank == 0: # 主进程
  torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# barrier()其他保证rank 0保存完成
dist.barrier()
map_location = {"cuda:0": f"cuda:{local_rank}"}
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=map_location))

这样在多卡训练时,每个进程有一个model副本和optimizer,使用自己的数据进行训练,之后反向传播计算完梯度的时候,所有进程的梯度会进行all-reduce操作进行同步,进而保证每个卡上的模型更新梯度是一样的,模型参数也是一致的。

这里有一个需要注意的地方,在save和load模型时候,为了减小所有进程同时读写磁盘,一般处理方法是以主进程为主,rank0先save模型,在map到其他进程。这样的另外一个好处,在最开始训练时,模型随机初始化之后,保证了所有进程的模型参数保持一致。

4 损失函数

  • loss.backward() 不变
  • 如果计算loss数值,用下面的all_reduce。或者也可以不用
代码语言:javascript
复制
# 仍然可以直接调用模型的train()方法
# 但是假如要调用其他你自己写的方法,就得model.module.func()
model.train()
for data in dataloader:
    loss = model(data)
    optimizer.zero_grad()
    loss.backward()  # 这个操作自动同步梯度
    optimizer.step()
    
    # 但是仍然需要累加得到所有进程loss的值的和
    dist.all_reduce(loss, op=dist.ReduceOp.SUM)
    # 然后除以并行数,就是这个batch的loss值了
    loss /= world_size
模型推理

一般需要先所有进程的输出结果进行gather,再进行指标的计算,两个常用的函数:

  • dist.all_gather(tensor_list, tensor) : 将所有进程的tensor进行收集并拼接成新的tensorlist返回,比如:dist.all_reduce(tensor, op) 这是对tensor的in-place的操作, 对所有进程的某个tensor进行合并操作,op可以是求和等:
  • 拿到所有进程中模型的输出,最后统一计算指标
代码语言:javascript
复制
pred_list = []
for data in Dataloader:
    pred = model(data)
    batch_pred = [torch.zeros_like(label) for _ in range(world_size)]
    dist.all_gather(batch_pred, pred)
    pred_list.extend(batch_pred)
pred_list = torch.cat(pred_list, 1)
# 所有进程pred_list是一致的,保存所有数据模型预测的值
代码语言:javascript
复制
def test(model, test_loader, device):
    model.eval()
    preds = []
    labels = []
    with torch.no_grad():
        for data in test_loader:
            inputs, truth = data
            inputs = inputs.to(device)
            truth = truth.to(device)
            output = model(**inputs)['logits']
            predict = torch.max(output.data, 1)[1]

            cur_preds = [torch.ones_like(predict) for _ in range(dist.get_world_size())]
            cur_truth = [torch.ones_like(truth) for _ in range(dist.get_world_size())]
            dist.all_gather(cur_preds, predict)
            dist.all_gather(cur_truth, truth)

            preds.extend(cur_preds)
            labels.extend(cur_truth)

    model.train()
    predict = torch.cat(preds, 0)
    labels = torch.cat(labels, 0)
    correct = (predict == labels).sum().item()
    return correct * 1.0 / len(predict)
注意事项
  1. 要把模型和数据放在进程对应的那张卡上
  2. 要使用Sampler来分发训练数据,并且shuffle不设置在Dataloder中而是Sampler中,每个epoch还需要调用Sampler的set_epoch()方法。(需要set_epoch来使用sampleer中shuffle,否则不是随机的)
  3. 训练和验证区分较大,验证一般在主进程中进行一次验证即可,不需要sampler,操作和单卡一样,之后将数据同步给其他进程。
  4. 在多卡时要调用模型的其他方法或者使用单卡的模式,需要用model.module来获得原始模型,同样保存参数时也保存的是model.module的参数而不是DDP包裹的。
使用总结
代码语言:javascript
复制
import argparse
import torch
from torch.nn.parallel import DistributedDataParallel as DDP

parser = argparse.ArgumentParser()
parser.add_argument("--save_dir", default='')
parser.add_argument("--local_rank", default=-1)
parser.add_argument("--world_size", default=1)
args = parser.parse_args()

# 初始化后端 建议NCCL

# world_size 指的是总的并行进程数目
# 比如16张卡单卡单进程 就是 16
# 但是如果是8卡单进程 就是 1
# 等到连接的进程数等于world_size,程序才会继续运行
torch.distributed.init_process_group(backend='nccl',
                                         world_size=ws,
                                         init_method='env://')

torch.cuda.set_device(args.local_rank)

device = torch.device(f'cuda:{args.local_rank}')

model = nn.Linear(2,3).to(device)

# train dataset
# train_sampler
# train_loader

my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
# 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
#       sampler的原理,后面也会介绍。
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)


# 初始化 DDP,这里我们通过规定 device_id 用了单卡单进程
# 实际上根据我们前面对 parallel_apply 的解读,DDP 也支持一个进程控制多个线程利用多卡
model = DDP(model,
            device_ids=[args.local_rank],
            output_device=args.local_rank).to(device)

for epoch in range(epochs):
    trainloader.sampler.set_epoch(epoch)
    # 后面这部分,则与原来完全一致了。
    for data, label in trainloader:
        prediction = model(data)
        loss = loss_fn(prediction, label)
        loss.backward()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
        optimizer.step()
        
# 保存模型 
if torch.distributed.get_rank() == 0:
  torch.save(model.module.state_dict(),
             'results/%s/model.pth' % args.save_dir)

区别

  • 多进程 和 DP 不同, DDP 采用多进程,最推荐的做法是每张卡一个进程从而避免上一节所说单进程带来的影响。前文也提到了 DP 和 DDP 共用一个 parallel_apply 函数,所以 DDP 同样支持单进程多线程多卡操作,自然也支持多进程多线程,不过需要注意一下 world_size。
  • 通信效率 DP 的通信成本随着 GPU 数量线性增长,而 DDP 支持 Ring AllReduce,其通信成本是恒定的,与 GPU 数量无关。
  • 同步参数 DP 通过收集梯度到 device[0],在device[0] 更新参数,然后其他设备复制 device[0] 的参数实现各个模型同步;DDP 通过保证初始状态相同并且改变量也相同(指同步梯度) ,保证模型同步。

源码

代码语言:javascript
复制
class DistributedDataParallel(Module):       
    def __init__(self, module, device_ids=None,
                 output_device=None, dim=0, broadcast_buffers=True,
                 process_group=None,  
                 bucket_cap_mb=25,       
                 find_unused_parameters=False,       
                 check_reduction=False,      
                 gradient_as_bucket_view=False):

        super(DistributedDataParallel, self).__init__()

        assert any((p.requires_grad for p in module.parameters())), (
            "DistributedDataParallel is not needed when a module "
            "doesn't have any parameter that requires a gradient."
        )

        self.is_multi_device_module = len({p.device for p in module.parameters()}) > 1
        distinct_device_types = {p.device.type for p in module.parameters()}
        assert len(distinct_device_types) == 1, (
            "DistributedDataParallel's input module must be on "
            "the same type of devices, but input module parameters locate in {}."
        ).format(distinct_device_types)
        self.device_type = list(distinct_device_types)[0]

        if self.device_type == "cpu" or self.is_multi_device_module:
            assert not device_ids and not output_device, (
                "DistributedDataParallel device_ids and output_device arguments "
                "only work with single-device GPU modules, but got "
                "device_ids {}, output_device {}, and module parameters {}."
            ).format(device_ids, output_device, {p.device for p in module.parameters()})

            self.device_ids = None
            self.output_device = None
        else:
            # Use all devices by default for single-device GPU modules
            if device_ids is None:
                device_ids = _get_all_device_indices()

            self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))

            if output_device is None:
                output_device = device_ids[0]

            self.output_device = _get_device_index(output_device, True)

        if process_group is None:
            self.process_group = _get_default_group()
        else:
            self.process_group = process_group

        self.dim = dim
        self.module = module
        self.device = list(self.module.parameters())[0].device
        self.broadcast_buffers = broadcast_buffers
        self.find_unused_parameters = find_unused_parameters
        self.require_backward_grad_sync = True
        self.require_forward_param_sync = True
        self.ddp_join_enabled = False
        self.gradient_as_bucket_view = gradient_as_bucket_view

        if check_reduction:
            # This argument is no longer used since the reducer
            # will ensure reduction completes even if some parameters
            # do not receive gradients.
            warnings.warn(
                "The `check_reduction` argument in `DistributedDataParallel` "
                "module is deprecated. Please avoid using it."
            )
            pass

        # used for intra-node param sync and inter-node sync as well
        self.broadcast_bucket_size = int(250 * 1024 * 1024)

        #
        # reduction bucket size
        self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)

        # 保证初始状态一样
        # Sync params and buffers
        self._sync_params_and_buffers(authoritative_rank=0)

        # 下拉看源码
        self._ddp_init_helper()

    def _sync_params_and_buffers(self, authoritative_rank=0):
        module_states = list(self.module.state_dict().values())
        if len(module_states) > 0:
            self._distributed_broadcast_coalesced(
                module_states,
                self.broadcast_bucket_size,
                authoritative_rank)

    def _ddp_init_helper(self):
        """
        Initialization helper function that does the following:

        (1) replicating the module from device[0] to the other devices (前文提到 DDP 也支持一个进程多线程利用多卡,类似 DP ,这时候就会用到第一步)
        (2) bucketing the parameters for reductions (把 parameter 分组,梯度通讯时,先得到梯度的会通讯)
        (3) resetting the bucketing states
        (4) registering the grad hooks (创建管理器)
        (5) passing a handle of DDP to SyncBatchNorm Layer (为 SyncBN 准备)
        """

        def parameters(m, recurse=True):
            def model_parameters(m):
                ps = m._former_parameters.values() \
                    if hasattr(m, "_former_parameters") \
                    else m.parameters(recurse=False)
                for p in ps:
                    yield p

            for m in m.modules() if recurse else [m]:
                for p in model_parameters(m):
                    yield p

        if self.device_ids and len(self.device_ids) > 1:

            warnings.warn(
                "Single-Process Multi-GPU is not the recommended mode for "
                "DDP. In this mode, each DDP instance operates on multiple "
                "devices and creates multiple module replicas within one "
                "process. The overhead of scatter/gather and GIL contention "
                "in every forward pass can slow down training. "
                "Please consider using one DDP instance per device or per "
                "module replica by explicitly setting device_ids or "
                "CUDA_VISIBLE_DEVICES. "
            )

            # only create replicas for single-device CUDA modules
            #
            # TODO: we don't need to replicate params in here. they're always going to
            # be broadcasted using larger blocks in broadcast_coalesced, so it might be
            # better to not pollute the caches with these small blocks
            self._module_copies = replicate(self.module, self.device_ids, detach=True)
            self._module_copies[0] = self.module

            for module_copy in self._module_copies[1:]:
                for param, copy_param in zip(self.module.parameters(), parameters(module_copy)):
                    # Reducer requires param copies have the same strides across replicas.
                    # Fixes up copy_param strides in case replicate didn't match param strides.
                    if param.layout is torch.strided and param.stride() != copy_param.stride():
                        with torch.no_grad():
                            copy_param.set_(copy_param.clone()
                                                      .as_strided(param.size(), param.stride())
                                                      .copy_(copy_param))
                    copy_param.requires_grad = param.requires_grad

        else:
            self._module_copies = [self.module]

        self.modules_params = [list(parameters(m)) for m in self._module_copies]
        self.modules_buffers = [list(m.buffers()) for m in self._module_copies]

        # Build tuple of (module, parameter) for all parameters that require grads.
        modules_and_parameters = [
            [
                (module, parameter)
                for module in replica.modules()
                for parameter in filter(
                    lambda parameter: parameter.requires_grad,
                    parameters(module, recurse=False))
            ] for replica in self._module_copies]

        # Build list of parameters.
        parameters = [
            list(parameter for _, parameter in replica)
            for replica in modules_and_parameters]

        # Checks if a module will produce a sparse gradient.
        def produces_sparse_gradient(module):
            if isinstance(module, torch.nn.Embedding):
                return module.sparse
            if isinstance(module, torch.nn.EmbeddingBag):
                return module.sparse
            return False

        # Build list of booleans indicating whether or not to expect sparse
        # gradients for the corresponding parameters.
        expect_sparse_gradient = [
            list(produces_sparse_gradient(module) for module, _ in replica)
            for replica in modules_and_parameters]

        # The bucket size limit is specified in the constructor.
        # Additionally, we allow for a single small bucket for parameters
        # that are defined first, such that their gradients don't spill into
        # a much larger bucket, adding unnecessary latency after gradient
        # computation finishes. Experiments showed 1MB is a reasonable value.
        bucket_indices = dist._compute_bucket_assignment_by_size(
            parameters[0],
            [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
            expect_sparse_gradient[0])

        # Note: reverse list of buckets because we want to approximate the
        # order in which their gradients are produced, and assume they
        # are used in the forward pass in the order they are defined.
        # 管理器
        self.reducer = dist.Reducer(
            parameters,
            list(reversed(bucket_indices)),
            self.process_group,
            expect_sparse_gradient,
            self.bucket_bytes_cap,
            self.find_unused_parameters,
            self.gradient_as_bucket_view)

        # passing a handle to torch.nn.SyncBatchNorm layer
        self._passing_sync_batchnorm_handle(self._module_copies)
完整样例
代码语言:javascript
复制
# -*- coding: utf-8 -*-

import os
import torch
import torch.optim as optim
import torch.distributed as dist
from transformers import BertForSequenceClassification
from torch.utils.data.distributed import DistributedSampler

from utils import bert_name, collate_fn, load_data_and_labels, Data
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import time

epochs = 15
lr = 2e-5
batch_size = 64


def train():

    # dist init
    dist.init_process_group(backend='nccl', init_method='env://')
    rank = dist.get_rank()
    size = dist.get_world_size()
    local_rank = int(os.environ['LOCAL_RANK'])
    device = torch.device(f"cuda:{local_rank}")
    torch.cuda.set_device(device)

    # dataset
    x_text, y = load_data_and_labels("./data/rt-polarity.pos", "./data/rt-polarity.neg")
    x_train, x_test, y_train, y_test = train_test_split(x_text, y, test_size=0.1)
    train_data = Data(x_train, y_train)
    test_data = Data(x_test, y_test)
    train_sampler = DistributedSampler(train_data)
    train_loader = DataLoader(train_data, batch_size=batch_size*size, collate_fn=collate_fn, sampler=train_sampler)
    test_loader = DataLoader(test_data, batch_size=batch_size*size, shuffle=False,
                             collate_fn=collate_fn, sampler=DistributedSampler(test_data))

    # model
    model = BertForSequenceClassification.from_pretrained(bert_name, num_labels=2)
    model.to(device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])

    optimizer = optim.Adam(model.parameters(), lr=lr*size)
    best_acc = -0.1
    print(f"rank {rank} start training...")
    for epoch in range(1, epochs):
        total_loss = 0.0
        train_sampler.set_epoch(epoch)
        model.train()
        start_time = time.time()
        for step, batch_data in enumerate(train_loader):
            inputs, labels = batch_data
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            output = model(**inputs, labels=labels)
            loss = output[0]
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        end_time = time.time()
        acc = test(model, test_loader, device)
        if acc > best_acc:
            best_acc = acc
            if rank == 0:
                print(f"\t Epoch{epoch}: loss: {total_loss:.4f}, acc: {acc:.4f}, time: {(end_time - start_time):.2f}s")

    if rank == 0:
        print("*"*20)
        print(f"finished; best acc: {best_acc:.4f}")


def test(model, test_loader, device):
    model.eval()
    preds = []
    labels = []
    with torch.no_grad():
        for data in test_loader:
            inputs, truth = data
            inputs = inputs.to(device)
            truth = truth.to(device)
            output = model(**inputs)['logits']
            predict = torch.max(output.data, 1)[1]

            cur_preds = [torch.ones_like(predict) for _ in range(dist.get_world_size())]
            cur_truth = [torch.ones_like(truth) for _ in range(dist.get_world_size())]
            dist.all_gather(cur_preds, predict)
            dist.all_gather(cur_truth, truth)

            preds.extend(cur_preds)
            labels.extend(cur_truth)

    model.train()
    predict = torch.cat(preds, 0)
    labels = torch.cat(labels, 0)
    correct = (predict == labels).sum().item()
    return correct * 1.0 / len(predict)


if __name__ == '__main__':
 train()

Accelerator

https://github.com/huggingface/accelerate

使用

代码语言:javascript
复制
import accelerate
accelerator = accelerate.Accelerator()
device = accelerator.device #获取当前进程的设备
...
# 进行封装
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

#训练时 loss.backward() 换为:
accelerator.backward(loss)

accelerator.print:仅仅在主进程输出

accelerator.process_index: 当前进程ID,没有使用rank命名,而是用的process_index来表示

accelerator.is_local_main_process/is_main_processs:: 是否local_rank 或则rank为0, 主进程

accelerator.wait_for_everyone() :类似 dist.barrier() , 等所有进程到达这一步。

accelerator.save:保存模型

image-20230817153232090

Horovod

  • https://github.com/horovod/horovod
  • 使用
代码语言:javascript
复制
import horovod.torch as hvd
# 初始化
hvd.init()
# Samapler
# *此处num_replicas=hvd.size(), rank=hvd.rank()必须*
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
# 优化器包装
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# 模型分发广播
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
# 模型训练不需要修改
代码语言:javascript
复制
$ horovodrun -np 4 -H localhost:4 python3 train.py

Reference

  • https://shomy.top/2022/01/05/torch-ddp-intro/#horovod
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-08-17 16:06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 iResearch666 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pytorch分布式训练
    • Dataset
      • DP
        • DDP
          • 基本概念
          • 初始化
          • 运行方法
          • 模型训练
          • 模型推理
          • 注意事项
          • 使用总结
          • 完整样例
      • Accelerator
      • Horovod
      • Reference
      相关产品与服务
      文件存储
      文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档