Ascend AI in Practice: Multi-Node Multi-Card Training with torch_npu
### Build the model script
```plain
# Import dependencies and libraries
import torch
from torch import nn
import torch_npu  # import the torch_npu package
import torch.distributed as dist
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import time
import torch.multiprocessing as mp
import os

torch.manual_seed(0)

# Download the training set
training_data = datasets.FashionMNIST(
    root="./data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download the test set
test_data = datasets.FashionMNIST(
    root="./data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# Build the model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def test(dataloader, model, loss_fn, device):
    # `device` is passed in explicitly instead of relying on an undefined global
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct, size = 0, 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            # count only the samples this rank evaluated (DistributedSampler shards the data)
            size += y.size(0)
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
```
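### Get the distributed hyperparameters
In the model script, define `main` and collect the hyperparameters needed for distributed training there.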
```plain
def main(world_size: int, batch_size, args):  # uses the hyperparameters set in the python launch command
    ngpus_per_node = world_size
    args.gpu = args.local_rank  # after launch, local_rank holds the automatically assigned device ID
    main_worker(args.gpu, ngpus_per_node, args)
```
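### Set the address and port
Set the address and port in the model script so that distributed training can be launched. When the process group is initialized on Ascend AI processors, `init_method` only supports `env://` (initialization from environment variables), so MASTER_ADDR, MASTER_PORT, and related parameters must be configured for your environment before initialization.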
```plain
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    # no init_method is given, so it defaults to env:// and reads MASTER_ADDR / MASTER_PORT
    dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
```
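`ddp_setup` does not set MASTER_ADDR or MASTER_PORT itself, because `torch.distributed.launch` exports them for every process it starts. If the script is started some other way, a minimal sketch like the one below could fill them in before `dist.init_process_group` runs. The helper name `ensure_master_env` and the defaults `127.0.0.1` / `29500` are assumptions for illustration; across several machines, MASTER_ADDR must be an address of the rank-0 node that every node can reach.

```python
import os


def ensure_master_env(addr: str = "127.0.0.1", port: int = 29500) -> tuple[str, str]:
    """Hypothetical helper: fill in MASTER_ADDR/MASTER_PORT only if the launcher did not."""
    # setdefault leaves values already exported by torch.distributed.launch / torchrun untouched
    os.environ.setdefault("MASTER_ADDR", addr)
    os.environ.setdefault("MASTER_PORT", str(port))
    return os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"]
```

Calling it at the top of `ddp_setup`, before `dist.init_process_group(backend="hccl", ...)`, would let the default `env://` initialization find both variables.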
### Add the distributed logic
Python launch method: once the job is launched, `local_rank` automatically holds the device ID.
```plain
def main_worker(gpu, ngpus_per_node, args):
    start_epoch = 0
    end_epoch = 5
    args.gpu = args.local_rank  # after launch, local_rank holds the automatically assigned device ID
    # bind this process to its NPU first, then create the HCCL process group
    torch_npu.npu.set_device(args.gpu)
    ddp_setup(args.gpu, args.world_size)

    # split the total batch size and the data-loading workers evenly across devices
    total_batch_size = args.batch_size
    total_workers = ngpus_per_node
    batch_size = int(total_batch_size / ngpus_per_node)
    workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node)

    model = NeuralNetwork()
    # DistributedSampler gives each rank its own shard of the dataset
    train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
    test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)
    train_loader = torch.utils.data.DataLoader(
        training_data, batch_size=batch_size, shuffle=(train_sampler is None),
        num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    val_loader = torch.utils.data.DataLoader(
        test_data, batch_size=batch_size, shuffle=(test_sampler is None),
        num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)

    loc = 'npu:{}'.format(args.gpu)
    model = model.to(loc)
    criterion = nn.CrossEntropyLoss().to(loc)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    # wrap the model so gradients are synchronized across ranks during backward
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

    for epoch in range(start_epoch, end_epoch):
        print("curr epoch: ", epoch)
        train_sampler.set_epoch(epoch)  # reshuffle the shards differently each epoch
        train(train_loader, model, criterion, optimizer, epoch, args.gpu)


def train(train_loader, model, criterion, optimizer, epoch, gpu):
    size = len(train_loader.dataset)
    model.train()
    loc = 'npu:{}'.format(gpu)
    for i, (images, target) in enumerate(train_loader):
        target = target.to(torch.int32)
        images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
        # compute output
        output = model(images)
        loss = criterion(output, target)
        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if i % 100 == 0:
            loss, current = loss.item(), i * len(target)
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
```
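To make the sampler and batch-size arithmetic above concrete, here is a pure-Python sketch of how `DistributedSampler` assigns indices when `shuffle=False` and `drop_last=False`. The script itself uses the default `shuffle=True`, which additionally permutes the indices with a seed that `set_epoch` changes every epoch. The function names `shard_indices` and `per_device_batches` are hypothetical; they mirror the sampler's pad-then-stride scheme rather than reproducing its source.

```python
import math


def shard_indices(num_samples: int, num_replicas: int, rank: int) -> list[int]:
    """Hypothetical helper: indices one rank receives from an unshuffled DistributedSampler."""
    # every rank gets the same number of samples, rounded up
    num_per_replica = math.ceil(num_samples / num_replicas)
    total_size = num_per_replica * num_replicas
    indices = list(range(num_samples))
    # pad by wrapping around to the start so the list divides evenly
    padding = total_size - num_samples
    indices += (indices * math.ceil(padding / num_samples))[:padding]
    # rank r takes positions r, r + num_replicas, r + 2 * num_replicas, ...
    return indices[rank:total_size:num_replicas]


def per_device_batches(num_samples: int, num_replicas: int, total_batch_size: int) -> int:
    """Full batches per rank per epoch, splitting the batch size as main_worker does (drop_last=True)."""
    batch_size = total_batch_size // num_replicas
    return len(shard_indices(num_samples, num_replicas, 0)) // batch_size
```

With the 60,000-sample FashionMNIST training set, two ranks, and the default `--batch_size 512`, each rank sees 30,000 samples in batches of 256. That is why the progress counter in the training log reads 25600 (= 100 × 256) at step 100 while `size` still reports the full 60000.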
### Configure argument passing
```plain
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('--batch_size', default=512, type=int,
                        help='Total batch size, split evenly across the devices (default: 512)')
    parser.add_argument('--gpu', default=None, type=int,
                        help='GPU id to use.')
    parser.add_argument("--local_rank", default=-1, type=int)
    args = parser.parse_args()
    args.local_rank = int(os.environ['LOCAL_RANK'])  # use the LOCAL_RANK environment variable as the device ID
    world_size = torch.npu.device_count()  # NPUs visible on this machine; assumes --nproc_per_node equals this count
    args.world_size = world_size
    main(args.world_size, args.batch_size, args)  # pass the arguments from the launch command into main
```
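`torch.npu.device_count()` only counts the NPUs on the current machine, and `ddp_setup(args.gpu, ...)` passes the local rank as the process rank. That works for the single-machine launch below, but in a multi-machine job every machine would claim rank 0. A minimal sketch, assuming the job is started by `torch.distributed.launch` (with `--use-env`) or `torchrun`, which export `RANK`, `LOCAL_RANK`, and `WORLD_SIZE` to each process: the global rank and world size would go to `init_process_group`, and only the local rank would select the NPU. The helper name `read_dist_env` and its single-process fallback are assumptions.

```python
import os


def read_dist_env() -> tuple[int, int, int]:
    """Hypothetical helper: return (global_rank, local_rank, world_size) from the launcher's env."""
    # missing variables fall back to a single-process run
    rank = int(os.environ.get("RANK", 0))
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    if not 0 <= rank < world_size:
        raise ValueError(f"RANK={rank} is outside [0, WORLD_SIZE={world_size})")
    return rank, local_rank, world_size
```

In the script, `torch_npu.npu.set_device(local_rank)` would then be followed by `dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)`.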
### Launch training on two cards of a single machine
```plain
export HCCL_WHITELIST_DISABLE=1  # disable the HCCL communication allowlist check
python3 -m torch.distributed.launch --nproc_per_node 2 --use-env ddp_test.py
```
### Key points of manual migration
1. Import the NPU libraries
```plain
import torch
import torch_npu
```
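2. Specify the NPU as the training device. This requires modifying the training script, and there are two ways to do it:
`to(device)` approach: once `device` is defined, load the model, data, and so on onto the GPU or NPU with `xx.to(device)`, for example `model.to(device)`. This approach lets you specify exactly which training resource to use and is more flexible.
Before migration: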
```plain
device = torch.device("cuda:{}".format(args.gpu))
```
After migration:
```plain
device = torch.device("npu:{}".format(args.gpu))
```
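`set_device` approach: call the `set_device` interface to select the training device. Note that this approach does not move anything to the NPU automatically; wherever you want to use the NPU, you must add `xx.npu()` yourself to load the model, data, and so on onto the NPU, for example `model.npu()`.
Before migration: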
```plain
torch.cuda.set_device(args.gpu)
```
After migration:
```plain
torch_npu.npu.set_device(args.gpu)
```
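3. Replace CUDA interfaces: replace the CUDA interfaces in the training script with NPU interfaces, for example when moving the model, loss function, and dataset to the NPU. For the replacement interfaces, see [Common PyTorch migration replacement interfaces](https://www.hiascend.com/document/detail/zh/Pytorch/60RC2/ptmoddevg/trainingmigrguide/PT_LMTMOG_0076.html).
Replacing a CUDA interface with its NPU counterpart.
Before migration: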
```plain
torch.cuda.is_available()
```
After migration:
```plain
torch_npu.npu.is_available()
```
Migrating the model.
Before migration:
```plain
model.cuda(args.gpu)
```
After migration:
```plain
model.npu(args.gpu)
```
Migrating the data.
Before migration:
```plain
images = images.cuda(args.gpu, non_blocking=True)
target = target.cuda(args.gpu, non_blocking=True)
```
After migration:
```plain
images = images.npu(args.gpu, non_blocking=True)
target = target.npu(args.gpu, non_blocking=True)
```
### Multi-card migration
Before: the GPU version uses the nccl backend
```plain
dist.init_process_group(backend='nccl', init_method="tcp://127.0.0.1:**", ......, rank=args.rank)  # ** is the port number; pick a free port for your environment
```
After: the NPU version uses the hccl backend
```plain
dist.init_process_group(backend='hccl', init_method="tcp://127.0.0.1:**", ......, rank=args.rank)  # ** is the port number; pick a free port for your environment
```
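The replacements in the migration sections above are mechanical, so for a small script they can be applied with a few regular expressions. The sketch below is a hypothetical rewriter (`CUDA_TO_NPU` and `migrate_source` are illustrative names) that covers only the patterns shown in this article: `torch.cuda.set_device`, `torch.cuda.is_available`, `.cuda(...)`, `"cuda..."` device strings, and the `nccl` backend. It is not Ascend's migration tool and will miss many other CUDA interfaces, so its output should still be checked against the official replacement-interface list.

```python
import re

# Hypothetical mapping covering only the interfaces shown in this article; order matters.
CUDA_TO_NPU = [
    (r"\btorch\.cuda\.set_device\(", "torch_npu.npu.set_device("),
    (r"\btorch\.cuda\.is_available\(", "torch_npu.npu.is_available("),
    (r"\.cuda\(", ".npu("),                                    # model.cuda(...), tensor.cuda(...)
    (r"""(['"])cuda(?=[:'"])""", r"\1npu"),                     # "cuda", "cuda:0", "cuda:{}"
    (r"""backend\s*=\s*(['"])nccl\1""", r"backend=\1hccl\1"),   # NCCL backend -> HCCL
]


def migrate_source(src: str) -> str:
    """Apply each CUDA -> NPU rewrite in order and return the new source text."""
    for pattern, replacement in CUDA_TO_NPU:
        src = re.sub(pattern, replacement, src)
    return src
```

For example, `migrate_source("model.cuda(args.gpu)")` returns `"model.npu(args.gpu)"`.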
### Multi-card training results

```plain
epoch: 0 loss: 2.309577 [ 0/60000]
epoch: 0 loss: 2.303763 [ 0/60000]
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0270,
-0.0349, 0.0104], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0270,
-0.0349, 0.0104], device='npu:1', grad_fn=<SliceBackward0>)
epoch: 0 loss: 1.928245 [25600/60000]
epoch: 0 loss: 1.948958 [25600/60000]
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:1', grad_fn=<SliceBackward0>)
epoch: 1 loss: 2.338537 [ 0/60000]
epoch: 1 loss: 2.308641 [ 0/60000]
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:1', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:0', grad_fn=<SliceBackward0>)
epoch: 1 loss: 1.964545 [25600/60000]
epoch: 1 loss: 1.969860 [25600/60000]
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:1', grad_fn=<SliceBackward0>)
epoch: 2 loss: 1.717261 [ 0/60000]
epoch: 2 loss: 1.670934 [ 0/60000]
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0182, -0.0217, -0.0118, 0.0049, -0.0105, -0.0190, 0.0271,
-0.0348, 0.0103], device='npu:1', grad_fn=<SliceBackward0>)
epoch: 2 loss: 1.789608 [25600/60000]
epoch: 2 loss: 1.736871 [25600/60000]
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)
epoch: 3 loss: 1.579726 [ 0/60000]
epoch: 3 loss: 1.639683 [ 0/60000]
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)
epoch: 3 loss: 1.569699 [25600/60000]
epoch: 3 loss: 1.583106 [25600/60000]
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)
epoch: 4 loss: 1.692256 [ 0/60000]
epoch: 4 loss: 1.599752 [ 0/60000]
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)
tensor([-0.0354, 0.0181, -0.0219, -0.0122, 0.0043, -0.0119, -0.0206, 0.0244,
-0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)
```
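In DDP, every device (here, every NPU) holds a complete replica of the model. The data is split across the devices, and each replica independently runs the forward pass and computes the loss on its own portion, then independently runs the backward pass to compute gradients. These gradients are then aggregated across devices (usually averaged with an all-reduce) and used to update the parameters, so every replica applies the same update. This is why the tensors printed for npu:0 and npu:1 in the log above are identical, while the loss values differ: each rank computes its loss on a different shard of the data.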
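The averaging step can be checked with a toy simulation. The sketch below is plain Python with a hypothetical one-parameter model `y_hat = w * x` trained with mean squared error; it is not how DDP is implemented internally (real DDP all-reduces gradient buckets while backward is still running). Each of two simulated replicas computes a gradient on its own shard, the local gradients are averaged as an all-reduce would, and every replica applies the same update.

```python
def grad(w: float, shard: list[tuple[float, float]]) -> float:
    """d/dw of mean((w * x - y) ** 2) over one replica's shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def ddp_step(weights: list[float], shards: list[list[tuple[float, float]]], lr: float) -> list[float]:
    """One simulated data-parallel SGD step across len(weights) replicas."""
    # forward + backward: each replica sees only its own shard
    local_grads = [grad(w, shard) for w, shard in zip(weights, shards)]
    # all-reduce with averaging: every replica ends up with the same gradient
    avg_grad = sum(local_grads) / len(local_grads)
    # optimizer step: identical update on every replica
    return [w - lr * avg_grad for w in weights]


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # y = 2x
shards = [data[0::2], data[1::2]]                          # two equal-sized shards
weights = ddp_step([0.0, 0.0], shards, lr=0.01)
```

With equal-sized shards, the averaged gradient equals the full-batch gradient over `data`, so the replicas stay identical and behave like a single-device run on the whole batch.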
### Common errors
```plain
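1. error: unrecognized arguments: --local-rank=0
Cause: torch.distributed.launch passes --local-rank to the script, which only declares --local_rank. Add --use-env so the launcher sets the LOCAL_RANK environment variable instead of passing the argument.
Fix: python3 -m torch.distributed.launch --nproc_per_node 2 --use-env ddp_test.py
2. RuntimeError: Invalid device string: 'npu:-1'
Cause: with --use-env, --local_rank is no longer passed, so args.local_rank keeps its default of -1. The launcher's warning says: If your script expects `--local-rank` argument to be set, please change it to read from `os.environ['LOCAL_RANK']` instead.
Fix: in the argument-passing logic, add args.local_rank = int(os.environ['LOCAL_RANK'])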
```
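Both errors come from how the launcher hands the local rank to the script. Instead of relying only on `--use-env`, the parser could accept either spelling of the flag (similar to what recent PyTorch launcher documentation recommends) and prefer `LOCAL_RANK` from the environment when it is set. The sketch below uses a hypothetical helper name, `parse_local_rank`, and fails early with a clear message instead of letting -1 turn into the device string `'npu:-1'`.

```python
import argparse
import os
from typing import Optional, Sequence


def parse_local_rank(argv: Optional[Sequence[str]] = None) -> int:
    """Hypothetical helper: resolve this process's local rank from the environment or the CLI."""
    parser = argparse.ArgumentParser(description="simple distributed training job")
    # accept both spellings: newer launchers pass --local-rank, older ones --local_rank
    parser.add_argument("--local-rank", "--local_rank", dest="local_rank", type=int, default=-1)
    args = parser.parse_args(argv)
    # LOCAL_RANK exported by the launcher takes precedence over the command-line value
    if "LOCAL_RANK" in os.environ:
        args.local_rank = int(os.environ["LOCAL_RANK"])
    if args.local_rank < 0:
        # fail here rather than later with "Invalid device string: 'npu:-1'"
        raise RuntimeError("local rank not set; start the script with a distributed launcher")
    return args.local_rank
```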