Dataset
Iterable-style Dataset: the data is fed as a stream through an iterator, so the whole dataset does not have to fit in memory and its total size does not need to be known in advance. In short, when the dataset is small, a map-style Dataset is recommended; when the data is too large, its size is unknown, or it cannot fit in training memory, an iterable-style Dataset is the only option.
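For contrast with the iterable-style example shown later, here is a minimal map-style Dataset sketch (the file path and tab-separated format are illustrative assumptions): it implements __len__ and __getitem__, so the full index space must be known up front.
from torch.utils.data import Dataset

class MapStyleDataset(Dataset):
    def __init__(self, file_path):
        # read the whole file into memory up front: the dataset size must be known
        with open(file_path, "r") as f:
            self.lines = f.readlines()

    def __len__(self):
        return len(self.lines)

    def __getitem__(self, idx):
        # random access by index is exactly what DistributedSampler relies on
        return self.lines[idx].split("\t")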
In distributed data-parallel training, each GPU has its own model replica and its own process (DDP mode) that trains on a subset of the full data. In PyTorch's DDP mode, DistributedSampler() makes each replica read a different part of the whole dataset during distributed training, so no sample is fetched twice in one epoch. The catch: DistributedSampler() splits the dataset into disjoint subsets mainly by index slicing. A map-style Dataset works with it out of the box with no extra code, but an iterable-style Dataset cannot be sliced this way.
Note: if your dataset is a torch.utils.data.IterableDataset with some randomness and you are training in a distributed fashion, your iterable dataset should either use an internal attribute generator that is a torch.Generator used for randomization and is identical on all processes (the Trainer will manually set the seed of this generator at each epoch), or have a set_epoch() method that internally sets the seed of the random number generators it uses. For reference, DistributedSampler implements its shuffling in the same seed-plus-epoch way:
def __iter__(self) -> Iterator[T_co]:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore
else:
indices = list(range(len(self.dataset))) # type: ignore
from torch.utils.data import IterableDataset

class MyIterableDataset(IterableDataset):  # renamed so it does not shadow the base class
    def __init__(self, file_path):
        self.file_path = file_path

    def __iter__(self):
        with open(self.file_path, "r") as f:
            for line in f:  # stream line by line instead of readlines(), to avoid loading the whole file
                items = line.split("\t")
                # process data
                # .......
                yield {"input1": input1, "input2": input2, "input3": input3}
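Since DistributedSampler cannot slice an IterableDataset, one common workaround (a sketch, not the only option, and single-worker only; sharding across DataLoader workers via get_worker_info is omitted) is to let each process keep only its own share of lines and to expose the set_epoch() method the note above asks for. The shard-by-rank scheme below is an assumption about how you want to split the file.
import torch
import torch.distributed as dist
from torch.utils.data import IterableDataset

class ShardedIterableDataset(IterableDataset):
    def __init__(self, file_path, shuffle_buffer=1024, seed=0):
        self.file_path = file_path
        self.shuffle_buffer = shuffle_buffer
        self.seed = seed
        self.epoch = 0

    def set_epoch(self, epoch):
        # called once per epoch by the training loop (or Trainer),
        # so every process reseeds its generator identically
        self.epoch = epoch

    def __iter__(self):
        rank = dist.get_rank() if dist.is_initialized() else 0
        world_size = dist.get_world_size() if dist.is_initialized() else 1
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)  # same seed on every process
        buffer = []
        with open(self.file_path, "r") as f:
            for i, line in enumerate(f):
                if i % world_size != rank:     # keep only this rank's share of the lines
                    continue
                buffer.append(line.rstrip("\n").split("\t"))
                if len(buffer) >= self.shuffle_buffer:
                    order = torch.randperm(len(buffer), generator=g).tolist()
                    yield from (buffer[j] for j in order)
                    buffer = []
        if buffer:
            order = torch.randperm(len(buffer), generator=g).tolist()
            yield from (buffer[j] for j in order)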
Data parallelism: because differentiation and summation are both linear operations, data parallelism is also valid mathematically.
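A quick sketch of that statement (notation is mine, not from the original): split a mini-batch B across K GPUs as B = B_1 ∪ … ∪ B_K. Since the loss is an average of per-sample terms and the gradient is linear, the full-batch gradient is a weighted average of the per-GPU gradients, which is exactly what the gather / all-reduce step computes.
\nabla_\theta \mathcal{L}_B(\theta)
  = \frac{1}{|B|}\sum_{i\in B}\nabla_\theta \ell_i(\theta)
  = \sum_{k=1}^{K}\frac{|B_k|}{|B|}\cdot\frac{1}{|B_k|}\sum_{i\in B_k}\nabla_\theta \ell_i(\theta)
  = \sum_{k=1}^{K}\frac{|B_k|}{|B|}\,\nabla_\theta \mathcal{L}_{B_k}(\theta)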
Using DataParallel is then a single line:
model = nn.DataParallel(model)
class DataParallel(Module):
    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()
        # check whether any GPU is available
        device_type = _get_available_device_type()
        if device_type is None:
            self.module = module
            self.device_ids = []
            return
        # by default, use all visible GPUs
        if device_ids is None:
            device_ids = _get_all_device_indices()
        # by default the "server" is the first device in the device_ids list
        if output_device is None:
            output_device = device_ids[0]
        self.dim = dim
        self.module = module
        self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])
        # check whether the load is balanced; a warning is issued if the devices are
        # imbalanced (memory or compute capability min/max ratio below 0.75)
        _check_balance(self.device_ids)
        # single card
        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)

    def forward(self, *inputs, **kwargs):
        # no GPU available
        if not self.device_ids:
            return self.module(*inputs, **kwargs)
        # before running, GPU device_ids[0] (our "server") must hold the parameters and
        # buffers of the parallelized module, because DP guarantees that device_ids[0]
        # shares storage with the base parallelized module; in-place updates on device[0]
        # are therefore kept, while updates on the other devices are discarded
        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   "on device {} (device_ids[0]) but found one of "
                                   "them on device: {}".format(self.src_device_obj, t.device))
        # device[0] now has the module and the input, so the parameter-server style algorithm can start
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        # if only one card is available, compute on it directly without any parallelism
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = self.parallel_apply(replicas, inputs, kwargs)
        return self.gather(outputs, self.output_device)

    def replicate(self, module, device_ids):
        return replicate(module, device_ids, not torch.is_grad_enabled())

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)
Suppose the total number of processes, i.e. world_size, is W, and the number of processes per node, i.e. local_world_size, is L. Each process then has two IDs: a global rank in [0, W-1] and a local_rank in [0, L-1]. A quick way to check them:
import torch.distributed as dist
import argparse, os

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=0)
args = parser.parse_args()

dist.init_process_group("nccl")
rank = dist.get_rank()
local_rank_arg = args.local_rank                       # local rank passed on the command line
local_rank_env = int(os.environ['LOCAL_RANK'])         # local rank read from the environment variable (env:// init)
local_world_size = int(os.environ['LOCAL_WORLD_SIZE'])
print(f"{rank=}; {local_rank_arg=}; {local_rank_env=}; {local_world_size=}")
$ python3 -m torch.distributed.launch --nproc_per_node=4 test.py
rank=2; local_rank_arg=2; local_rank_env=2, local_world_size=4
rank=0; local_rank_arg=0; local_rank_env=0, local_world_size=4
rank=3; local_rank_arg=3; local_rank_env=3, local_world_size=4
rank=1; local_rank_arg=1; local_rank_env=1, local_world_size=4
torch.distributed.init_process_group(backend, init_method=None, world_size=-1, rank=-1, store=None, ...)
backend: three backends are available, NCCL, GLOO and MPI; for GPU distributed training, NCCL is the one to use.
init_method vs. store: the process group can be initialized either through init_method (a TCP connection, a shared file system, or environment variables) or through a store together with explicit world_size and rank arguments. The store is the core key-value storage of the distributed setup, used to share information between processes. The two options are mutually exclusive; in fact init_method is just a higher-level wrapper that is ultimately implemented on top of a store. If neither is given, init_method='env://' is used by default.
The three init_method options:
init_method='tcp://ip:port': the processes exchange information through the IP and port of rank 0 (the MASTER process); the rank and world_size arguments must be specified.
init_method='file://path': information is shared through a file system that every process can access; rank and world_size must also be specified.
init_method='env://': the distributed settings are read from environment variables (os.environ), mainly MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE; rank and world_size can be passed explicitly, otherwise they are read from the environment.
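For completeness, a minimal sketch of the store-based initialization mentioned above (the address and port are placeholders, and world_size/rank are passed by hand):
import os
import torch.distributed as dist

rank = int(os.environ.get("RANK", "0"))            # this process's global rank
world_size = int(os.environ.get("WORLD_SIZE", "1"))

# rank 0 hosts the key-value store; the other ranks connect to it
store = dist.TCPStore("10.0.0.1", 29500, world_size, is_master=(rank == 0))
dist.init_process_group(backend="nccl", store=store, rank=rank, world_size=world_size)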
There are two common ways to launch the processes:
1. torch.multiprocessing (a wrapper around Python's multiprocessing), via mp.spawn(fn, args=(), nprocs=1, join=True, daemon=False)
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def fn(rank, ws, nums):
dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765',
rank=rank, world_size=ws)
rank = dist.get_rank()
print(f"rank = {rank} is initialized")
torch.cuda.set_device(rank)
tensor = torch.tensor(nums).cuda()
print(tensor)
if __name__ == "__main__":
ws = 2
mp.spawn(fn, nprocs=ws, args=(ws, [1, 2, 3, 4]))
rank = 0 is initialized
rank = 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')
2. launch
$ python3 -m torch.distributed.launch --<launch options> train.py --<script args>
# run via an external command
# CUDA_VISIBLE_DEVICES controls which cards are visible
# --nproc_per_node decides how many cards to use
CUDA_VISIBLE_DEVICES="0,1,2,3" python -m torch.distributed.run --nproc_per_node 4 train.py
Common options:
--nnodes: number of machines; for single-machine training this is simply 1
--nproc_per_node: number of processes on a single machine, i.e. the per-machine world size
--master_addr/port: address and port of the main process (rank 0)
--node_rank: rank of the current node
In the single-machine case only --nproc_per_node must be specified; --master_addr/port and --node_rank can be configured automatically by the launcher from the environment.
$ torchrun --nproc_per_node=4 train.py
The torchrun command replaces torch.distributed.launch. torchrun passes RANK, LOCAL_RANK, WORLD_SIZE and the other distributed settings through environment variables; in particular, local_rank is no longer passed implicitly on the command line. For elastic training the script should provide load_checkpoint(path) and save_checkpoint(path), so that if a worker fails, the latest model can be loaded and all workers restarted to continue training.
The overall steps of DDP training:
1. dist.init_process_group to initialize the process group
2. DistributedSampler to load the data
3. DistributedDataParallel to wrap the model
4. torchrun or mp.spawn to launch the distributed training
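Under torchrun the script therefore reads its ranks from the environment instead of from --local_rank; a minimal sketch of what that looks like:
import os
import torch
import torch.distributed as dist

dist.init_process_group(backend="nccl")        # init_method defaults to env://
rank = int(os.environ["RANK"])                 # global rank, set by torchrun
local_rank = int(os.environ["LOCAL_RANK"])     # rank within this node
world_size = int(os.environ["WORLD_SIZE"])     # total number of processes
torch.cuda.set_device(local_rank)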
1 Distributed data loading
torch.utils.data.distributed.DistributedSampler(dataset,
num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)
Shuffling is seeded with self.seed + self.epoch, so in different epochs each process receives different data; you therefore need to call sampler.set_epoch(epoch) at the start of every epoch.
The sampler implementation itself is simple; the core is a single line:
indices[self.rank: self.total_size: self.num_replicas]
With 4 cards and 12 samples (rank = 0, 1, 2, 3 and num_replicas = 4), the indices taken by each card are:
rank0: [0 4 8]; rank1: [1 5 9]; rank2: [2 6 10]; rank3: [3 7 11]
so the shards neither repeat nor overlap. During distributed training you only need to give the DataLoader a DistributedSampler; a minimal example:
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    sampler.set_epoch(epoch)  # set the epoch so the shuffling seed is updated
    train(loader)
In practice the sampler is often chosen based on whether training is distributed:
train_sampler = RandomSampler(train_dataset) if args.local_rank == -1 else DistributedSampler(train_dataset)
2 Wrapping the model for distributed training
torch.cuda.set_device(local_rank)
model = Model().cuda()
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
# to call functions or attributes defined on the original model, use model.module.xxxx
3 Saving and loading the model
# the normal training code follows
optimizer = xxx
for epoch in range(num_epochs):
    train_sampler.set_epoch(epoch)
    for data in Dataloader:
        model(data)
        xxx
# after training, only rank 0 needs to save the model
# no dist.barrier() is needed here: the all_reduce during backward already guarantees synchronization
if rank == 0:
    # what is saved are the plain parameters (model.module), not the DDP wrapper
    torch.save(model.module.state_dict(), CHECKPOINT_PATH)
model = DistributedDataParallel(model, device_ids=[local_rank])
CHECKPOINT_PATH = "./model.checkpoint"
if rank == 0:  # main process
    torch.save(model.state_dict(), CHECKPOINT_PATH)
# barrier() makes the other ranks wait until rank 0 has finished saving
dist.barrier()
map_location = {"cuda:0": f"cuda:{local_rank}"}
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=map_location))
In multi-card training each process therefore has its own model replica and optimizer and trains on its own data; after backward computes the gradients, all processes all-reduce their gradients, so every card applies the same gradient update and the model parameters stay identical.
One thing to note when saving and loading: to avoid all processes reading and writing the disk at the same time, the usual approach is to let the main process lead: rank 0 saves the model first, and the checkpoint is then mapped into the other processes. An extra benefit is that at the very start of training, right after random initialization, this keeps the model parameters identical across all processes.
4 Loss function
# you can still call the model's train() method directly,
# but to call other methods you wrote yourself you need model.module.func()
model.train()
for data in dataloader:
    loss = model(data)
    optimizer.zero_grad()
    loss.backward()   # this automatically synchronizes the gradients
    optimizer.step()
    # but you still need to sum the loss values from all processes
    dist.all_reduce(loss, op=dist.ReduceOp.SUM)
    # then divide by the number of parallel processes to get this batch's loss
    loss /= world_size
For metrics you generally first gather the outputs of all processes and then compute the metric. Two commonly used functions:
dist.all_gather(tensor_list, tensor): collects the given tensor from every process and fills tensor_list with the results.
dist.all_reduce(tensor, op): an in-place operation on tensor that combines that tensor across all processes; op can be a sum, etc.
For example, gathering all predictions with all_gather:
pred_list = []
for data in Dataloader:
    pred = model(data)
    batch_pred = [torch.zeros_like(pred) for _ in range(world_size)]
    dist.all_gather(batch_pred, pred)
    pred_list.extend(batch_pred)
pred_list = torch.cat(pred_list, 0)
# pred_list is now identical on every process and holds the predictions for all data
A fuller evaluation function along the same lines:
def test(model, test_loader, device):
model.eval()
preds = []
labels = []
with torch.no_grad():
for data in test_loader:
inputs, truth = data
inputs = inputs.to(device)
truth = truth.to(device)
output = model(**inputs)['logits']
predict = torch.max(output.data, 1)[1]
cur_preds = [torch.ones_like(predict) for _ in range(dist.get_world_size())]
cur_truth = [torch.ones_like(truth) for _ in range(dist.get_world_size())]
dist.all_gather(cur_preds, predict)
dist.all_gather(cur_truth, truth)
preds.extend(cur_preds)
labels.extend(cur_truth)
model.train()
predict = torch.cat(preds, 0)
labels = torch.cat(labels, 0)
correct = (predict == labels).sum().item()
return correct * 1.0 / len(predict)
Two points worth repeating: the sampler's set_epoch() method must be called every epoch (set_epoch is needed for the sampler's shuffle to actually be random), and model.module is used to access the original model; likewise, when saving, save the parameters of model.module rather than those of the DDP wrapper.
A DDP training skeleton:
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torch.nn.parallel import DistributedDataParallel as DDP

parser = argparse.ArgumentParser()
parser.add_argument("--save_dir", default='')
parser.add_argument("--local_rank", type=int, default=-1)
parser.add_argument("--world_size", type=int, default=1)
args = parser.parse_args()

# initialize the backend; NCCL is recommended
# world_size is the total number of parallel processes,
# e.g. 16 for 16 cards with one process per card,
# but 1 if a single process drives 8 cards
# the program only continues once the number of connected processes equals world_size
torch.distributed.init_process_group(backend='nccl',
                                     world_size=args.world_size,
                                     init_method='env://')
torch.cuda.set_device(args.local_rank)
device = torch.device(f'cuda:{args.local_rank}')

model = nn.Linear(2, 3).to(device)

# train dataset / train_sampler / train_loader
my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
# New step 1: use DistributedSampler; DDP hides the details for us.
# How the sampler works is explained above.
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# Note that batch_size here is the batch size *per process*,
# so the effective total batch size is batch_size * world_size.
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)

# Initialize DDP; by specifying device_ids we use one process per card.
# As the discussion of parallel_apply above showed, DDP can also let one process
# drive several cards with multiple threads.
model = DDP(model,
            device_ids=[args.local_rank],
            output_device=args.local_rank)
optimizer = optim.SGD(model.parameters(), lr=0.001)

for epoch in range(epochs):
    trainloader.sampler.set_epoch(epoch)
    # from here on, the loop is exactly the same as the non-distributed version
    for data, label in trainloader:
        prediction = model(data)
        loss = loss_fn(prediction, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

# save the model
if torch.distributed.get_rank() == 0:
    torch.save(model.module.state_dict(),
               'results/%s/model.pth' % args.save_dir)
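Since this skeleton still reads --local_rank from the command line, it is typically launched with the launcher that injects that flag (the script name below is hypothetical):
$ python -m torch.distributed.launch --nproc_per_node=4 ddp_example.py --save_dir results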
Differences: unlike DataParallel, which is single-process multi-threaded and single-machine only, DistributedDataParallel runs one process per GPU (possibly across machines) and synchronizes gradients with all-reduce instead of a parameter-server style scatter/gather, avoiding the GIL contention and replication overhead mentioned in the warning in the source below.
Source code:
class DistributedDataParallel(Module):
def __init__(self, module, device_ids=None,
output_device=None, dim=0, broadcast_buffers=True,
process_group=None,
bucket_cap_mb=25,
find_unused_parameters=False,
check_reduction=False,
gradient_as_bucket_view=False):
super(DistributedDataParallel, self).__init__()
assert any((p.requires_grad for p in module.parameters())), (
"DistributedDataParallel is not needed when a module "
"doesn't have any parameter that requires a gradient."
)
self.is_multi_device_module = len({p.device for p in module.parameters()}) > 1
distinct_device_types = {p.device.type for p in module.parameters()}
assert len(distinct_device_types) == 1, (
"DistributedDataParallel's input module must be on "
"the same type of devices, but input module parameters locate in {}."
).format(distinct_device_types)
self.device_type = list(distinct_device_types)[0]
if self.device_type == "cpu" or self.is_multi_device_module:
assert not device_ids and not output_device, (
"DistributedDataParallel device_ids and output_device arguments "
"only work with single-device GPU modules, but got "
"device_ids {}, output_device {}, and module parameters {}."
).format(device_ids, output_device, {p.device for p in module.parameters()})
self.device_ids = None
self.output_device = None
else:
# Use all devices by default for single-device GPU modules
if device_ids is None:
device_ids = _get_all_device_indices()
self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
if output_device is None:
output_device = device_ids[0]
self.output_device = _get_device_index(output_device, True)
if process_group is None:
self.process_group = _get_default_group()
else:
self.process_group = process_group
self.dim = dim
self.module = module
self.device = list(self.module.parameters())[0].device
self.broadcast_buffers = broadcast_buffers
self.find_unused_parameters = find_unused_parameters
self.require_backward_grad_sync = True
self.require_forward_param_sync = True
self.ddp_join_enabled = False
self.gradient_as_bucket_view = gradient_as_bucket_view
if check_reduction:
# This argument is no longer used since the reducer
# will ensure reduction completes even if some parameters
# do not receive gradients.
warnings.warn(
"The `check_reduction` argument in `DistributedDataParallel` "
"module is deprecated. Please avoid using it."
)
pass
# used for intra-node param sync and inter-node sync as well
self.broadcast_bucket_size = int(250 * 1024 * 1024)
#
# reduction bucket size
self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
        # make sure the initial state is identical on every process
        # Sync params and buffers
        self._sync_params_and_buffers(authoritative_rank=0)
        # the helper below is shown further down
self._ddp_init_helper()
def _sync_params_and_buffers(self, authoritative_rank=0):
module_states = list(self.module.state_dict().values())
if len(module_states) > 0:
self._distributed_broadcast_coalesced(
module_states,
self.broadcast_bucket_size,
authoritative_rank)
def _ddp_init_helper(self):
"""
Initialization helper function that does the following:
(1) replicating the module from device[0] to the other devices (前文提到 DDP 也支持一个进程多线程利用多卡,类似 DP ,这时候就会用到第一步)
(2) bucketing the parameters for reductions (把 parameter 分组,梯度通讯时,先得到梯度的会通讯)
(3) resetting the bucketing states
(4) registering the grad hooks (创建管理器)
(5) passing a handle of DDP to SyncBatchNorm Layer (为 SyncBN 准备)
"""
def parameters(m, recurse=True):
def model_parameters(m):
ps = m._former_parameters.values() \
if hasattr(m, "_former_parameters") \
else m.parameters(recurse=False)
for p in ps:
yield p
for m in m.modules() if recurse else [m]:
for p in model_parameters(m):
yield p
if self.device_ids and len(self.device_ids) > 1:
warnings.warn(
"Single-Process Multi-GPU is not the recommended mode for "
"DDP. In this mode, each DDP instance operates on multiple "
"devices and creates multiple module replicas within one "
"process. The overhead of scatter/gather and GIL contention "
"in every forward pass can slow down training. "
"Please consider using one DDP instance per device or per "
"module replica by explicitly setting device_ids or "
"CUDA_VISIBLE_DEVICES. "
)
# only create replicas for single-device CUDA modules
#
# TODO: we don't need to replicate params in here. they're always going to
# be broadcasted using larger blocks in broadcast_coalesced, so it might be
# better to not pollute the caches with these small blocks
self._module_copies = replicate(self.module, self.device_ids, detach=True)
self._module_copies[0] = self.module
for module_copy in self._module_copies[1:]:
for param, copy_param in zip(self.module.parameters(), parameters(module_copy)):
# Reducer requires param copies have the same strides across replicas.
# Fixes up copy_param strides in case replicate didn't match param strides.
if param.layout is torch.strided and param.stride() != copy_param.stride():
with torch.no_grad():
copy_param.set_(copy_param.clone()
.as_strided(param.size(), param.stride())
.copy_(copy_param))
copy_param.requires_grad = param.requires_grad
else:
self._module_copies = [self.module]
self.modules_params = [list(parameters(m)) for m in self._module_copies]
self.modules_buffers = [list(m.buffers()) for m in self._module_copies]
# Build tuple of (module, parameter) for all parameters that require grads.
modules_and_parameters = [
[
(module, parameter)
for module in replica.modules()
for parameter in filter(
lambda parameter: parameter.requires_grad,
parameters(module, recurse=False))
] for replica in self._module_copies]
# Build list of parameters.
parameters = [
list(parameter for _, parameter in replica)
for replica in modules_and_parameters]
# Checks if a module will produce a sparse gradient.
def produces_sparse_gradient(module):
if isinstance(module, torch.nn.Embedding):
return module.sparse
if isinstance(module, torch.nn.EmbeddingBag):
return module.sparse
return False
# Build list of booleans indicating whether or not to expect sparse
# gradients for the corresponding parameters.
expect_sparse_gradient = [
list(produces_sparse_gradient(module) for module, _ in replica)
for replica in modules_and_parameters]
# The bucket size limit is specified in the constructor.
# Additionally, we allow for a single small bucket for parameters
# that are defined first, such that their gradients don't spill into
# a much larger bucket, adding unnecessary latency after gradient
# computation finishes. Experiments showed 1MB is a reasonable value.
bucket_indices = dist._compute_bucket_assignment_by_size(
parameters[0],
[dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
expect_sparse_gradient[0])
# Note: reverse list of buckets because we want to approximate the
# order in which their gradients are produced, and assume they
# are used in the forward pass in the order they are defined.
        # the reducer, which manages the bucketed gradient all-reduce
self.reducer = dist.Reducer(
parameters,
list(reversed(bucket_indices)),
self.process_group,
expect_sparse_gradient,
self.bucket_bytes_cap,
self.find_unused_parameters,
self.gradient_as_bucket_view)
# passing a handle to torch.nn.SyncBatchNorm layer
self._passing_sync_batchnorm_handle(self._module_copies)
A complete example: fine-tuning BERT for binary sentence classification with DDP.
# -*- coding: utf-8 -*-
import os
import torch
import torch.optim as optim
import torch.distributed as dist
from transformers import BertForSequenceClassification
from torch.utils.data.distributed import DistributedSampler
from utils import bert_name, collate_fn, load_data_and_labels, Data
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import time
epochs = 15
lr = 2e-5
batch_size = 64
def train():
# dist init
dist.init_process_group(backend='nccl', init_method='env://')
rank = dist.get_rank()
size = dist.get_world_size()
local_rank = int(os.environ['LOCAL_RANK'])
device = torch.device(f"cuda:{local_rank}")
torch.cuda.set_device(device)
# dataset
x_text, y = load_data_and_labels("./data/rt-polarity.pos", "./data/rt-polarity.neg")
x_train, x_test, y_train, y_test = train_test_split(x_text, y, test_size=0.1)
train_data = Data(x_train, y_train)
test_data = Data(x_test, y_test)
train_sampler = DistributedSampler(train_data)
train_loader = DataLoader(train_data, batch_size=batch_size*size, collate_fn=collate_fn, sampler=train_sampler)
test_loader = DataLoader(test_data, batch_size=batch_size*size, shuffle=False,
collate_fn=collate_fn, sampler=DistributedSampler(test_data))
# model
model = BertForSequenceClassification.from_pretrained(bert_name, num_labels=2)
model.to(device)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
optimizer = optim.Adam(model.parameters(), lr=lr*size)
best_acc = -0.1
print(f"rank {rank} start training...")
for epoch in range(1, epochs):
total_loss = 0.0
train_sampler.set_epoch(epoch)
model.train()
start_time = time.time()
for step, batch_data in enumerate(train_loader):
inputs, labels = batch_data
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
output = model(**inputs, labels=labels)
loss = output[0]
loss.backward()
optimizer.step()
total_loss += loss.item()
end_time = time.time()
acc = test(model, test_loader, device)
if acc > best_acc:
best_acc = acc
if rank == 0:
print(f"\t Epoch{epoch}: loss: {total_loss:.4f}, acc: {acc:.4f}, time: {(end_time - start_time):.2f}s")
if rank == 0:
print("*"*20)
print(f"finished; best acc: {best_acc:.4f}")
def test(model, test_loader, device):
model.eval()
preds = []
labels = []
with torch.no_grad():
for data in test_loader:
inputs, truth = data
inputs = inputs.to(device)
truth = truth.to(device)
output = model(**inputs)['logits']
predict = torch.max(output.data, 1)[1]
cur_preds = [torch.ones_like(predict) for _ in range(dist.get_world_size())]
cur_truth = [torch.ones_like(truth) for _ in range(dist.get_world_size())]
dist.all_gather(cur_preds, predict)
dist.all_gather(cur_truth, truth)
preds.extend(cur_preds)
labels.extend(cur_truth)
model.train()
predict = torch.cat(preds, 0)
labels = torch.cat(labels, 0)
correct = (predict == labels).sum().item()
return correct * 1.0 / len(predict)
if __name__ == '__main__':
train()
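This script reads LOCAL_RANK from the environment, so it is meant to be launched with torchrun (or torch.distributed.run); the filename below is assumed:
$ torchrun --nproc_per_node=4 ddp_bert.py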
accelerate (https://github.com/huggingface/accelerate) is Hugging Face's wrapper that hides most of the DDP boilerplate.
Usage:
import accelerate

accelerator = accelerate.Accelerator()
device = accelerator.device  # the device of the current process
...
# wrap model, optimizer and dataloader
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)
# during training, replace loss.backward() with:
accelerator.backward(loss)
Useful attributes and helpers:
accelerator.print: prints only on the main process
accelerator.process_index: the ID of the current process (accelerate uses process_index rather than rank)
accelerator.is_local_main_process / accelerator.is_main_process: whether local_rank / rank is 0, i.e. whether this is the (local) main process
accelerator.wait_for_everyone(): similar to dist.barrier(); waits until all processes reach this point
accelerator.save: saves the model
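Putting these pieces together, a minimal self-contained training loop (the toy model/data and the script name are my assumptions); it is launched with `accelerate config` once and then `accelerate launch train_accelerate.py`:
import torch
import torch.nn as nn
import accelerate

accelerator = accelerate.Accelerator()
device = accelerator.device

# toy model and data, just to keep the sketch self-contained
model = nn.Linear(10, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
dataset = torch.utils.data.TensorDataset(torch.randn(256, 10), torch.randint(0, 2, (256,)))
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
loss_fn = nn.CrossEntropyLoss()

model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

for epoch in range(3):
    model.train()
    for x, y in dataloader:               # batches are already placed on the right device
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        accelerator.backward(loss)        # replaces loss.backward()
        optimizer.step()
    accelerator.print(f"epoch {epoch} finished")   # only the main process prints

accelerator.wait_for_everyone()                     # like dist.barrier()
if accelerator.is_main_process:
    accelerator.save(accelerator.unwrap_model(model).state_dict(), "model.pt")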
Horovod:
import horovod.torch as hvd

# initialization
hvd.init()
# Sampler
# here num_replicas=hvd.size() and rank=hvd.rank() are required
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
# wrap the optimizer
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# broadcast the model parameters to all workers
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
# the training loop itself needs no changes
$ horovodrun -np 4 -H localhost:4 python3 train.py
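For reference, a minimal end-to-end sketch under the same pattern (the toy model/data, the learning-rate scaling and the epoch count are my assumptions): each process pins itself to hvd.local_rank(), and the optimizer state is also broadcast from rank 0.
import torch
import torch.nn as nn
import horovod.torch as hvd

hvd.init()
torch.cuda.set_device(hvd.local_rank())          # one GPU per process

# toy model and data so the sketch is self-contained
model = nn.Linear(10, 2).cuda()
train_dataset = torch.utils.data.TensorDataset(torch.randn(256, 10), torch.randint(0, 2, (256,)))
loss_fn = nn.CrossEntropyLoss()

# num_replicas=hvd.size() and rank=hvd.rank() are required, as above
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, sampler=train_sampler)

# the learning rate is commonly scaled by the number of workers
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# start every worker from rank 0's weights and optimizer state
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

for epoch in range(3):
    train_sampler.set_epoch(epoch)
    for x, y in train_loader:
        x, y = x.cuda(), y.cuda()
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()       # gradients are averaged across workers by the optimizer's hooks
        optimizer.step()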