### 构建模型脚本

```plain

# 导入依赖和库

import torch

from torch import nn

import torch_npu # 导入torch_npu包

import torch.distributed as dist

from torch.utils.data import DataLoader

from torchvision import datasets

from torchvision.transforms import ToTensor

import time

import torch.multiprocessing as mp

import os

# Fix the PyTorch RNG seed before anything else is created.
torch.manual_seed(0)

# Build the FashionMNIST training split first, then the test split.
# Both use the same root directory, download flag and ToTensor transform;
# only the `train` flag differs.
training_data, test_data = (
    datasets.FashionMNIST(
        root="./data",
        train=is_train,
        download=True,
        transform=ToTensor(),
    )
    for is_train in (True, False)
)

# 构建模型

class NeuralNetwork(nn.Module):
    """Fully connected classifier: 28*28 inputs -> 512 -> 512 -> 10 logits."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        # Layers are listed in the order they are applied.
        stack_layers = [
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        ]
        self.linear_relu_stack = nn.Sequential(*stack_layers)

    def forward(self, x):
        """Flatten `x` and return the raw class logits."""
        return self.linear_relu_stack(self.flatten(x))

def test(dataloader, model, loss_fn):

    size = len(dataloader.dataset)

    num_batches = len(dataloader)

    model.eval()

    test_loss, correct = 0, 0

    with torch.no_grad():

        for X, y in dataloader:

            X, y = X.to(device), y.to(device)

            pred = model(X)

            test_loss += loss_fn(pred, y).item()

            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches

    correct /= size

    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

```

### 获取分布式超参数

在模型脚本中,构建main,在其中获取分布式训练所需的超参数

```plain

def main(world_size: int,  batch_size, args):
    """Entry point that forwards the launch-time settings to `main_worker`.

    Args:
        world_size: Number of devices; passed on as `ngpus_per_node`.
        batch_size: Accepted for the caller's convenience; not read here
            (the worker reads `args.batch_size`).
        args: Parsed arguments; `args.gpu` is set from `args.local_rank`.
    """
    # After launch, local_rank holds the device index for this process.
    args.gpu = args.local_rank
    main_worker(args.gpu, world_size, args)

```

### 设置地址和端口号

在模型脚本中设置地址与端口号,用于拉起分布式训练。由于昇腾AI处理器初始化进程组时 init_method 只支持 env://(即环境变量初始化方式),所以在初始化前需要根据实际情况配置 MASTER_ADDR、MASTER_PORT 等参数。

```plain

def ddp_setup(rank, world_size):
    """Initialise the default process group on the HCCL backend.

    No ``init_method`` is passed, so the rendezvous uses environment
    variables (env://). MASTER_ADDR and MASTER_PORT are given single-node
    defaults when the launcher has not already exported them.

    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    # setdefault never overrides values provided by the launcher or the user.
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)

```

### 添加分布式逻辑

Python方式:任务拉起后,local_rank自动获得device号。

```plain

def main_worker(gpu, ngpus_per_node, args):
    """Run the distributed training loop for one process (one NPU).

    Args:
        gpu: Device index supplied by the caller. Not read directly;
            `args.local_rank` is used as the device index instead.
        ngpus_per_node: Number of devices the total batch is split across.
        args: Namespace providing `local_rank`, `world_size` and
            `batch_size`; `args.gpu` is overwritten with `args.local_rank`.
    """
    start_epoch = 0
    end_epoch = 5

    # After launch, local_rank holds the device index for this process.
    args.gpu = args.local_rank
    ddp_setup(args.gpu, args.world_size)
    torch_npu.npu.set_device(args.gpu)

    # args.batch_size is the total batch; each process takes an equal share.
    # Floor at 1 so a small total batch never yields an invalid size of 0.
    batch_size = max(1, args.batch_size // ngpus_per_node)
    # The original formula (total_workers + n - 1) / n with total_workers == n
    # always evaluated to 1, so one loader worker per process is used.
    workers = 1

    model = NeuralNetwork()

    train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
    test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)

    # A sampler is always supplied, so DataLoader-level shuffling stays off.
    train_loader = torch.utils.data.DataLoader(
        training_data, batch_size=batch_size, shuffle=False,
        num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    # NOTE(review): val_loader is built but not consumed in this function.
    val_loader = torch.utils.data.DataLoader(
        test_data, batch_size=batch_size, shuffle=False,
        num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)

    loc = 'npu:{}'.format(args.gpu)
    model = model.to(loc)
    criterion = nn.CrossEntropyLoss().to(loc)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

    for epoch in range(start_epoch, end_epoch):
        print("curr epoch: ", epoch)
        # Pass the epoch to the sampler before each pass over the data.
        train_sampler.set_epoch(epoch)
        train(train_loader, model, criterion, optimizer, epoch, args.gpu)

    # Release the process group once training has finished.
    dist.destroy_process_group()


 

def train(train_loader, model, criterion, optimizer, epoch, gpu):
    """Train `model` for one pass over `train_loader` on NPU `gpu`.

    Prints the loss every 100 batches.

    Args:
        train_loader: Loader yielding ``(images, target)`` batches.
        model: Module to train; it is switched to train mode.
        criterion: Loss callable ``criterion(output, target)``.
        optimizer: Optimizer stepped once per batch.
        epoch: Current epoch index; only used in the progress line.
        gpu: NPU index; batches are moved to ``npu:<gpu>``.
    """
    size = len(train_loader.dataset)
    # The target device does not change between batches, so build it once.
    loc = 'npu:{}'.format(gpu)
    model.train()

    for i, (images, target) in enumerate(train_loader):
        # Labels are cast to int32 before the transfer.
        # NOTE(review): presumably an NPU requirement; confirm before
        # changing the dtype.
        target = target.to(torch.int32)
        images = images.to(loc, non_blocking=False)
        target = target.to(loc, non_blocking=False)

        # compute output
        output = model(images)
        loss = criterion(output, target)

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if i % 100 == 0:
            loss_value, current = loss.item(), i * len(target)
            # `size` is the full dataset length, not this process's share.
            print(f"epoch: {epoch} loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")

```

### 配置传参逻辑

```plain

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='simple distributed training job')
    # The worker divides this value by the number of devices, so it is the
    # total batch size rather than the per-device one.
    parser.add_argument('--batch_size', default=512, type=int,
                        help='Total input batch size, split evenly across devices (default: 512)')
    parser.add_argument('--gpu', default=None, type=int,
                        help='GPU id to use.')
    # torch.distributed.launch passes --local_rank or --local-rank depending
    # on the PyTorch version; accept both spellings.
    parser.add_argument("--local_rank", "--local-rank", dest="local_rank", default=-1, type=int)
    args = parser.parse_args()

    # Prefer LOCAL_RANK exported by the launcher; otherwise keep the value
    # given on the command line.
    env_local_rank = os.environ.get('LOCAL_RANK')
    if env_local_rank is not None:
        args.local_rank = int(env_local_rank)
    if args.local_rank < 0:
        parser.error('local rank is not set: start the script with '
                     'torch.distributed.launch/torchrun or pass --local_rank')

    # Use the number of launched processes when the launcher exports it;
    # counting visible devices would make init_process_group wait for ranks
    # that were never started when nproc_per_node is smaller than that count.
    args.world_size = int(os.environ.get('WORLD_SIZE', torch.npu.device_count()))

    # Pass the settings from the launch command into main.
    main(args.world_size, args.batch_size, args)

```

### 拉起单机两块卡训练

```plain

export HCCL_WHITELIST_DISABLE=1

# --use-env makes the launcher expose the local rank through the LOCAL_RANK
# environment variable (which ddp_test.py reads) instead of passing a
# --local-rank command-line argument to the script.
python3 -m torch.distributed.launch --nproc_per_node 2 --use-env ddp_test.py

```

### 手工迁移的核心要点

1、导入NPU相关库

```plain

import torch

import torch_npu

```

2、指定NPU作为训练设备。指定训练设备需修改模型训练脚本,有两种指定方式:

to(device)方式:定义好device后可通过_xx_.to(device)的方式将模型或数据集等加载到GPU或NPU上,如model.to(device)。该方式可以指定需要的训练资源,使用比较灵活。

迁移前:

```plain

    device = torch.device("cuda:{}")

```

迁移后:

```plain

    device = torch.device("npu:{}")

```

set_device方式:调用set_device接口,指定训练设备。需注意该方式不会自动使用NPU,用户需要手动在想使用NPU的地方添加 `xx.npu()` 代码,将模型、数据集等加载到NPU上,如 `model.npu()`。

迁移前:

```plain

torch.cuda.set_device(args.gpu)

```

迁移后:

```plain

torch_npu.npu.set_device(args.gpu)

```

3、替换CUDA接口:将训练脚本中的CUDA接口替换为NPU接口,例如模型、损失函数、数据集等迁移到NPU上。替换接口请参见[常见PyTorch迁移替换接口](https://www.hiascend.com/document/detail/zh/Pytorch/60RC2/ptmoddevg/trainingmigrguide/PT_LMTMOG_0076.html)。

CUDA接口替换为NPU接口。

迁移前:

```plain

torch.cuda.is_available()

```

迁移后:

```plain

torch_npu.npu.is_available()

```

模型迁移。

迁移前:

```plain

model.cuda(args.gpu)

```

迁移后:

```plain

model.npu(args.gpu)

```

数据集迁移。

迁移前:

```plain

images = images.cuda(args.gpu, non_blocking=True)

target = target.cuda(args.gpu, non_blocking=True)

```

迁移后:

```plain

images = images.npu(args.gpu, non_blocking=True)

target = target.npu(args.gpu, non_blocking=True)

```

### 多卡迁移

修改前,gpu使用nccl方式

```plain

dist.init_process_group(backend='nccl',init_method = "tcp://127.0.0.1:**", ...... ,rank = args.rank)    # **为端口号,根据实际选择一个闲置端口填写

```

修改后,npu使用hccl方式

```plain

dist.init_process_group(backend='hccl',init_method = "tcp://127.0.0.1:**", ...... ,rank = args.rank)    # **为端口号,根据实际选择一个闲置端口填写

```

### 多卡训练结果

![](https://cdn.nlark.com/yuque/0/2024/png/47561744/1724225804222-8d3ec9a4-0145-4307-8e2d-74752bf78352.png)

```plain

epoch: 0 loss: 2.309577  [    0/60000]

epoch: 0 loss: 2.303763  [    0/60000]

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0270,

        -0.0349,  0.0104], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0270,

        -0.0349,  0.0104], device='npu:1', grad_fn=<SliceBackward0>)

epoch: 0 loss: 1.928245  [25600/60000]

epoch: 0 loss: 1.948958  [25600/60000]

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:1', grad_fn=<SliceBackward0>)

epoch: 1 loss: 2.338537  [    0/60000]

epoch: 1 loss: 2.308641  [    0/60000]

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:1', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:0', grad_fn=<SliceBackward0>)

epoch: 1 loss: 1.964545  [25600/60000]

epoch: 1 loss: 1.969860  [25600/60000]

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:1', grad_fn=<SliceBackward0>)

epoch: 2 loss: 1.717261  [    0/60000]

epoch: 2 loss: 1.670934  [    0/60000]

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0182, -0.0217, -0.0118,  0.0049, -0.0105, -0.0190,  0.0271,

        -0.0348,  0.0103], device='npu:1', grad_fn=<SliceBackward0>)

epoch: 2 loss: 1.789608  [25600/60000]

epoch: 2 loss: 1.736871  [25600/60000]

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)

epoch: 3 loss: 1.579726  [    0/60000]

epoch: 3 loss: 1.639683  [    0/60000]

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)

epoch: 3 loss: 1.569699  [25600/60000]

epoch: 3 loss: 1.583106  [25600/60000]

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)

epoch: 4 loss: 1.692256  [    0/60000]

epoch: 4 loss: 1.599752  [    0/60000]

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:0', grad_fn=<SliceBackward0>)

tensor([-0.0354,  0.0181, -0.0219, -0.0122,  0.0043, -0.0119, -0.0206,  0.0244,

        -0.0407, -0.0009], device='npu:1', grad_fn=<SliceBackward0>)

```

在DDP中,每个GPU上都复制了完整的模型。当数据被分配到各个GPU上时,每个模型实例都会独立地对数据进行前向传播,并计算损失。然后,每个GPU上的模型实例都会独立地进行反向传播,计算梯度。这些梯度随后会被聚合(通常使用平均),并用于更新模型的参数。

### 错误集

```plain

1、error: unrecognized arguments: --local-rank=0

原因是使用了 torch.distributed.launch,需要加入 --use-env

解决方法:python3 -m torch.distributed.launch --nproc_per_node 2 --use-env ddp_test.py

2、RuntimeError: Invalid device string: 'npu:-1'

原因:If your script expects `--local-rank` argument to be set, please change it to read from `os.environ['LOCAL_RANK']` instead.

解决方法:在传参逻辑中添加args.local_rank = int(os.environ['LOCAL_RANK'])

```


 

Logo

鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。

更多推荐