第二十四章 解读Pytorch中多GPU并行计算教程(工具)
利用GPU执行PyTorch程序(支持单卡或多卡配置)
PyTorch GPU操作指令
通过CUDA设置显卡配置
本教程涵盖的代码均可从GitHub获取:https://github.com/WZMIAOMIAO/deep-learning-for-image-processing位于pytorch_classification模块中的train_multi_GPU文件夹中。
常见多GPU使用方法
在模型训练过程中,默认情况下会利用多块GPU设备实现并行计算(即便采用分布式计算架构)。如图所示,在实际应用中,默认情况下会利用多块GPU设备实现并行计算(即便采用分布式计算架构)。常见的采用多GPU实现并行计算的方法主要有两种:一种是同步数据并行策略;另一种是异步参数更新策略。
- 当模型规模较大时,在单卡内存不足以容纳整个模型的情况下,默认会采用将模型分割为若干子模型(如图1左半部分所示),通过多块显卡协同运行的方式实现负载分配(...)。然而,在这种场景下,并行训练的效果并不显著提升。
- 对于较小规模的模型,在单卡内存足以支持的情况下,则可以通过数据并行的方式实现加速训练(如图1右半部分所示),这种方式更为常见也是本文讨论的重点。

此图表比较了采用多块GPU并行加速的训练时间对比情况。该测试环境包括Pytorch1.7、CUDA10.1以及ResNet34模型,并基于flower_photos数据集进行实验设置。批次大小设定为16,并采用Tesla V100 GPU进行运算。从左侧柱状图可以看出,在模型规模固定的情况下逐步增加计算资源能够显著提升训练效率。然而从右侧折线图可以看到,在模型规模固定的情况下逐步增加计算资源能够显著提升训练效率。然而由于在多GPU并行训练过程中会存在各 GPU 之间的通信需求,在实际应用中可能会影响整体性能表现

多GPU并行训练过程中需要注意的事项
以下阐述的注意事项中,在Pytorch框架已经实现了功能的情况下,请问我们需要了解这些内容吗?
- 数据如何分配至各个设备?在采用多GPU并行训练时,请注意以下要点:每个GPU仅负责整个数据集的一部分。
- 不同设备之间的误差梯度如何进行通信?每次执行一次前向传播过程后,在反向传播过程中每一个GPU都会计算得到对应输入数据与各个参数相关的误差梯度(
Gradient如图中右侧所示)。建议在此阶段暂不更新模型参数;而应首先将各个设备上的相同参数对应的误差梯度进行平均融合(即整合不同硬件上的学习信息),随后完成模型参数的同步更新。

- BatchNormalization如何在不同设备间同步。关于BN理论知识不在本文介绍范围内,如果不了解的可以查看我之前写的一篇文章,Batch Normalization理论详解。如果不考虑多设备之间的BN通信的话,每个设备只去计算每个BN层针对该设备输入数据的均值和方差。假如每个设备的
batch_size为2,则每个BN层计算的均值和方差只是针对2个样本的。之前在讲BN理论时有说过,一般batch_size设置越大效果越好,那么如果我们在计算BN层的均值和方差时能够同步多块GPU上的统计信息,那batch_size不就相当于倍增了?确实如此,在Pytorch中也有提供具有同步BN的方法SyncBatchNorm。当GPU显存有限,每个设备上的batch_size设置很小时,通过使用具有同步功能的BN层时是能够提升模型最终的mAP的,但如果每个设备上的batch_size设置的已经很大了,那么个人感觉同步的BN就没太大作用了。注意:如果使用具有同步功能的BN,会降低模型的训练速度,因为在每个BN层处都需要去同步参数,所以会更耗时。

图形呈现了单GPU与多GPU训练(采用同步批归一化处理或未采用)的学习曲线。观察到多GPU与单GPU的学习结果相当接近(但多GPU的速度更快)。达到了稍高于未采用情况的最佳mAP。

Pytorch中提供的两种多GPU训练方法
PyTorch框架提供了两种实现多GPU训练的方法:DataParallel与DistributedDataParallel。其中DataParallel是一种基于早期的方法,在同一台机器上部署多个GPU时能够实现并行计算;而当前推荐使用的DistributedDataParallel则是一种更为复杂的分布式设计,在支持单机多卡的同时也能兼容分布式计算环境,并且在某些情况下展现出更高的效率优势。需要注意的是,在这两种方法中,默认情况下无论是哪种配置模式下,在同一台机器上部署多个GPU时都会体现出较高的性能水平;然而如果仅限于在同一台机器上部署多个GPU时,则DistributedDataParallel依然能够展现出更好的性能优势。
本文将重点介绍单机多卡场景下的训练优化方案。

Pytorch中多GPU常用启动方式
在PyTorch中使用多GPU的常用启动方式有两种:一种是torch.distributed.launch方法, 另一种是torch.multiprocessing模块. 这两种方法各有优劣, 在实际应用中, 我个人认为torch.distributed.launch启动方式更为便捷, 同时官方提供的多GPU训练项目如FasterRCNN源码也是采用该方法实现的, 所以我个人也比较偏爱这种方法. 然而, 在官方提供的教程中则主要采用的是torch.multiprocessing方法. 官方指出这种方法具有更好的控制性和灵活性. 在实际使用体验中我也证实了这一点.

train_multi_gpu_using_launch.py脚本讲解
此代码基于此前讲解的知识进行了扩展。其中主要涉及ResNet模型的搭建过程以及自定义的数据集开发。此处不做详细解释。如需进一步了解,请参考我之前的相关视频内容。
部分内容可能需要特别关注一下。如果您对完整的代码细节感兴趣,请参考文章开头提供的视频链接。
- 首先说下
init_distributed_mode函数,该函数是用来初始化各进程的:
def init_distributed_mode(args):
if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
args.rank = int(os.environ["RANK"])
args.world_size = int(os.environ['WORLD_SIZE'])
args.gpu = int(os.environ['LOCAL_RANK'])
else:
print('Not using distributed mode')
args.distributed = False
return
args.distributed = True
torch.cuda.set_device(args.gpu)
args.dist_backend = 'nccl' # 通信后端,nvidia GPU推荐使用NCCL
print('| distributed init (rank {}): {}'.format(
args.rank, args.dist_url), flush=True)
dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
world_size=args.world_size, rank=args.rank)
dist.barrier()
当调用命令行指令torch.distributed.launch --use_env执行时,在Python程序的os.environ变量中会被自动地记录RANK、WORLD_SIZE和LOCAL_RANK等数据。
- 在一个单机多卡的环境中 ,WORLD\_SIZE代表使用多少个独立的计算单元 ,其中每个进程都分配了一块独立的GPU 。值得注意的是 ,RANK与LOCAL\_RANK的值相同 ,它们分别表示在全局环境中的位置序号以及该节点内使用的第几位计算单元(GPU)序号 。
- 在一个多节点环境下 ,WORLD\_SIZE代表整个系统的总计算资源数 ,其中每个节点上分配了一块独立的 GPU 。RANK表示当前节点在整个系统中的位置序号 ,而 LOCAL\_RANK则表示该节点内使用的第几位计算单元(GPU)序号 。
在init_distributed_mode函数中会从os.environ获取关键参数RANK、WORLD_SIZE和LOCAL_RANK。根据这些信息确定了自己在多线程系统中的位置,并应选择相应的GPU设备进行操作。为此可以调用torch.cuda.set_device()方法来指定当前使用的GPU设备。随后利用dist.init_process_group()初始化进程组,在此过程中需指定通信后端以及初始化方法:其中backend指定通信后端并建议采用NCCL作为后端;init_method则决定了如何进行资源初始化,默认情况下采用env://配置但也可选择TCP或指向特定共享文件路径;world_size表示该进程中参与通信的总进程数(每个进程对应一块显卡);而rank则指定了当前进程中所处的位置编号。
- 代码中有很多rank判断,例如:
if rank == 0: # 在第一个进程中打印信息,并实例化tensorboard
print(args)
print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
tb_writer = SummaryWriter()
if os.path.exists("./weights") is False:
os.makedirs("./weights")
这些判断的目的在于将一系列读写操作集中到第一个进程中(即rank=0)进行处理以避免不同进程中重复执行操作导致的问题
接着介绍torch.utils.data.distributed.DistributedSampler这个类的功能,在前面我们提到过当使用多块GPU进行训练时需要将数据分配到各个GPU上时用到了这个方法。实际上DistributedSampler正是用来完成这种任务的。如果感兴趣的话可以自行查阅源码阅读量不大理解起来也不困难。下面这幅图是我个人对于数据划分流程的理解:
首先会对整个数据集进行随机排序然后根据使用的GPU数量对数据进行补充假设当前的数据集中共有11个样本而使用的是2块GPU设备那么11除以2向上取整得到6再乘以 GPU的数量等于12这样还缺少一个样本于是就会将打乱顺序后的第一个样本复制一份添加到末尾使总数达到12个样本刚好满足需求之后再将这些排序好的样本按照间隔的方式依次分配给各个GPU设备这样就能保证每个GPU拿到的数据互不干扰且分布均匀。

下面我们来详细探讨torch.utils.data.BatchSampler这一方法的功能。它实际上整合了由torch.utils.data.distributed.DistributedSampler分配好的数据块,并按照指定的batch_size进行分组以形成完整的批次(如图所示)。假设我们设定的batch_size为2,则系统会根据预先分配的数据集合以及给定的batch_size将这些数据分成一组一组的小批量进行处理。在构建DataLoader时,我们可以配置一个batch_sampler参数来实现这一功能

接下来介绍如何将模型中的普通BN层转换成具备同步功能的BN层。其实很简单,在下面这行代码中使用PyTorch会自动将模型中的所有普通BN层转换为SyncBatchNorm。需要注意的是,在转换完成后会导致训练速度有所下降。
# 使用SyncBatchNorm后训练会更耗时
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
本文的主要对象是torch.nn.parallel.DistributedDataParallel这一类模块,在使用该方法后能够将普通模块转换成DDP模式,在反向传播过程中自动计算并平均各块GPU设备上的参数梯度,并且这种机制能够显著提升训练效率
# 转为DDP模型
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
具体来说,在使用这种方法时,默认输入必须是一个张量。需要注意的是,在使用这种方法时,默认输入必须是一个张量。需要注意的是,在使用这种方法时,默认输入必须是一个张量。
def reduce_value(value, average=True):
world_size = get_world_size()
if world_size < 2: # 单GPU的情况
return value
with torch.no_grad():
dist.all_reduce(value)
if average:
value /= world_size
return value
All-Reduce操作如下如所示:

最后呈现全部代码,并做了详细的注释。为了让大家更好地理解和使用它,请复制整个项目以便运行。此外,在该脚本中引入了其他函数如模型和自定义数据集部分。
import os
import math
import tempfile
import argparse
import torch
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from model import resnet34
from my_dataset import MyDataSet
from utils import read_split_data, plot_data_loader_image
from multi_train_utils.distributed_utils import init_distributed_mode, dist, cleanup
from multi_train_utils.train_eval_utils import train_one_epoch, evaluate
def main(args):
if torch.cuda.is_available() is False:
raise EnvironmentError("not find GPU device for training.")
# 初始化各进程环境
init_distributed_mode(args=args)
rank = args.rank
device = torch.device(args.device)
batch_size = args.batch_size
num_classes = args.num_classes
weights_path = args.weights
args.lr *= args.world_size # 学习率要根据并行GPU的数量进行倍增
if rank == 0: # 在第一个进程中打印信息,并实例化tensorboard
print(args)
print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
tb_writer = SummaryWriter()
if os.path.exists("./weights") is False:
os.makedirs("./weights")
train_images_path, train_images_label, val_images_path, val_images_label = read_split_data(args.data_path)
data_transform = {
"train": transforms.Compose([transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
"val": transforms.Compose([transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}
# 实例化训练数据集
train_data_set = MyDataSet(images_path=train_images_path,
images_class=train_images_label,
transform=data_transform["train"])
# 实例化验证数据集
val_data_set = MyDataSet(images_path=val_images_path,
images_class=val_images_label,
transform=data_transform["val"])
# 给每个rank对应的进程分配训练的样本索引
train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
# 将样本索引每batch_size个元素组成一个list
train_batch_sampler = torch.utils.data.BatchSampler(
train_sampler, batch_size, drop_last=True)
nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers
if rank == 0:
print('Using {} dataloader workers every process'.format(nw))
train_loader = torch.utils.data.DataLoader(train_data_set,
batch_sampler=train_batch_sampler,
pin_memory=True,
num_workers=nw,
collate_fn=train_data_set.collate_fn)
val_loader = torch.utils.data.DataLoader(val_data_set,
batch_size=batch_size,
sampler=val_sampler,
pin_memory=True,
num_workers=nw,
collate_fn=val_data_set.collate_fn)
# 实例化模型
model = resnet34(num_classes=num_classes).to(device)
# 如果存在预训练权重则载入
if os.path.exists(weights_path):
weights_dict = torch.load(weights_path, map_location=device)
load_weights_dict = {k: v for k, v in weights_dict.items()
if model.state_dict()[k].numel() == v.numel()}
model.load_state_dict(load_weights_dict, strict=False)
else:
checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
# 如果不存在预训练权重,需要将第一个进程中的权重保存,然后其他进程载入,保持初始化权重一致
if rank == 0:
torch.save(model.state_dict(), checkpoint_path)
dist.barrier()
# 这里注意,一定要指定map_location参数,否则会导致第一块GPU占用更多资源
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
# 是否冻结权重
if args.freeze_layers:
for name, para in model.named_parameters():
# 除最后的全连接层外,其他权重全部冻结
if "fc" not in name:
para.requires_grad_(False)
else:
# 只有训练带有BN结构的网络时使用SyncBatchNorm采用意义
if args.syncBN:
# 使用SyncBatchNorm后训练会更耗时
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
# 转为DDP模型
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
# optimizer
pg = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)
# Scheduler https://arxiv.org/pdf/1812.01187.pdf
lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf # cosine
scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
for epoch in range(args.epochs):
train_sampler.set_epoch(epoch)
mean_loss = train_one_epoch(model=model,
optimizer=optimizer,
data_loader=train_loader,
device=device,
epoch=epoch)
scheduler.step()
sum_num = evaluate(model=model,
data_loader=val_loader,
device=device)
acc = sum_num / val_sampler.total_size
if rank == 0:
print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
tags = ["loss", "accuracy", "learning_rate"]
tb_writer.add_scalar(tags[0], mean_loss, epoch)
tb_writer.add_scalar(tags[1], acc, epoch)
tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)
torch.save(model.state_dict(), "./weights/model-{}.pth".format(epoch))
# 删除临时缓存文件
if rank == 0:
if os.path.exists(checkpoint_path) is True:
os.remove(checkpoint_path)
cleanup()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--num_classes', type=int, default=5)
parser.add_argument('--epochs', type=int, default=30)
parser.add_argument('--batch-size', type=int, default=16)
parser.add_argument('--lr', type=float, default=0.001)
parser.add_argument('--lrf', type=float, default=0.1)
# 是否启用SyncBatchNorm
parser.add_argument('--syncBN', type=bool, default=True)
# 数据集所在根目录
# http://download.tensorflow.org/example_images/flower_photos.tgz
parser.add_argument('--data-path', type=str, default="/home/wz/data_set/flower_data/flower_photos")
# resnet34 官方权重下载地址
# https://download.pytorch.org/models/resnet34-333f7ec4.pth
parser.add_argument('--weights', type=str, default='resNet34.pth',
help='initial weights path')
parser.add_argument('--freeze-layers', type=bool, default=False)
# 不要改该参数,系统会自动分配
parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
# 开启的进程数(注意不是线程),不用设置该参数,会根据nproc_per_node自动设置
parser.add_argument('--world-size', default=4, type=int,
help='number of distributed processes')
parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
opt = parser.parse_args()
main(opt)
为了使用该脚本必须在终端执行特定命令。具体启动指令如下:
$ torch.distributed.launch -f distributed TorchScript model.pt
python -m torch.distributed.launch --nproc_per_node=8 --use_env train_multi_gpu_using_launch.py
在单机多卡情况下,在nproc_per_node参数下表示为若干数量的 GPU 设备,在每台 GPU 上运行相应数量的进程时会启动相应数量的进程。当希望指定具体使用的哪几台 GPU 时,请采用以下指令:例如,在命令行中输入:设置第1号和第4号 GPU用于训练。
CUDA_VISIBLE_DEVICES=0,3 python -m torch.distributed.launch --nproc_per_node=2 --use_env train_multi_gpu_using_launch.py
