之前有兄弟想学昇腾开发,问我:“哥,有没有现成的示例代码可以参考?不想从零开始写。”

好问题。今天一次说清楚。

samples 是啥?

samples 是昇腾的官方示例代码库。涵盖算子开发、模型推理、应用开发等各个方面。

一句话说清楚:samples 是昇腾的官方示例代码库,从 Hello World 到完整应用都有,学昇腾开发必备。

说来也气人:之前光看文档看得云里雾里,结果跑通一个 sample 之后一下子就看懂了。

为什么需要 samples?

三种情况:

1. 学习昇腾开发
从示例代码入手,比看文档快得多。

2. 验证环境
跑通 sample 说明环境没问题。

3. 代码模板
复制 sample 代码改改就能用。

samples 目录结构

samples/
├── operator/           # 算子开发示例
│   ├── AscendC/       # Ascend C 算子示例
│   ├── KernelLaunch/  # 核函数启动示例
│   └── OPPython/      # Python 算子示例
├── inference/          # 推理示例
│   ├── tensorflow/    # TensorFlow 推理
│   └── pytorch/       # PyTorch 推理
├── model/              # 模型示例
│   ├── ResNet/        # ResNet 示例
│   ├── BERT/          # BERT 示例
│   └── YOLO/          # YOLO 示例
├── application/        # 应用示例
│   ├── cv/            # 计算机视觉应用
│   ├── nlp/           # 自然语言处理应用
│   └── audio/         # 音频处理应用
├── framework/          # 框架集成示例
│   ├── tensorflow/    # TensorFlow 集成
│   └── pytorch/       # PyTorch 集成
└── contrib/            # 社区贡献
    ├── ascend910/     # 910 专用示例
    └── ascend310/     # 310 专用示例

算子开发示例

Hello World 示例

最简单的 Ascend C 算子。

// samples/operator/AscendC/HelloWorld/helloworld.cpp
#include "kernel_operator.h"

// Minimal operator object: its only job is to emit a greeting from the kernel.
struct HelloWorld {
    // Nothing to set up; the operator carries no state.
    __aicore__ inline HelloWorld() {}

    // Core logic: print the Hello World message.
    // NOTE(review): whether plain printf is usable in device code depends on
    // the toolchain; confirm against the CANN version in use.
    __aicore__ inline void process()
    {
        printf("Hello World from Ascend C!\n");
    }
};

// Kernel entry point: builds a HelloWorld operator and runs it once.
extern "C" __global__ __aicore__ void helloworld_kernel()
{
    HelloWorld{}.process();
}

运行:

cd samples/operator/AscendC/HelloWorld
# -p keeps this step re-runnable when build/ already exists.
mkdir -p build && cd build
cmake ..
make
./helloworld
# Expected output: Hello World from Ascend C!

Add 算子示例

两个张量相加。

// samples/operator/AscendC/Add/add_kernel.cpp
#include "kernel_operator.h"

// Element-wise add operator: z = x + y over totalLength elements of type T.
// The work is split into the three stages CopyIn -> Compute -> CopyOut, with
// queues handing tensors from one stage to the next.
template <typename T>
class AddKernel {
public:
    // Binds the three global-memory buffers and reserves one local buffer per
    // queue, each sized for the whole tensor (totalLength * sizeof(T) bytes).
    //   x, y        : global-memory addresses of the two inputs
    //   z           : global-memory address of the output
    //   totalLength : number of elements of T in each buffer
    // NOTE(review): the whole tensor is staged in a single buffer, so
    // totalLength presumably has to fit in local memory — confirm for the
    // target chip.
    __aicore__ inline AddKernel(GM_ADDR x, GM_ADDR y, GM_ADDR z, uint32_t totalLength) {
        // The parameter shadows the member of the same name; without this
        // assignment every stage below would read an uninitialized length.
        this->totalLength = totalLength;

        xGm.SetGlobalBuffer((__gm__ T*)x, totalLength);
        yGm.SetGlobalBuffer((__gm__ T*)y, totalLength);
        zGm.SetGlobalBuffer((__gm__ T*)z, totalLength);
        pipe.InitBuffer(inQueueX, 1, totalLength * sizeof(T));
        pipe.InitBuffer(inQueueY, 1, totalLength * sizeof(T));
        pipe.InitBuffer(outQueueZ, 1, totalLength * sizeof(T));
    }

    // Runs the three stages once, in order.
    __aicore__ inline void Process() {
        CopyIn();
        Compute();
        CopyOut();
    }

private:
    // Stage 1: copy both inputs from global memory into local tensors and
    // enqueue them for Compute.
    __aicore__ inline void CopyIn() {
        LocalTensor<T> xLocal = inQueueX.AllocTensor<T>();
        LocalTensor<T> yLocal = inQueueY.AllocTensor<T>();
        DataCopy(xLocal, xGm, totalLength);
        DataCopy(yLocal, yGm, totalLength);
        inQueueX.EnQue(xLocal);
        inQueueY.EnQue(yLocal);
    }

    // Stage 2: dequeue the inputs, add them into a freshly allocated output
    // tensor, enqueue the result and release the input tensors.
    __aicore__ inline void Compute() {
        LocalTensor<T> xLocal = inQueueX.DeQue<T>();
        LocalTensor<T> yLocal = inQueueY.DeQue<T>();
        LocalTensor<T> zLocal = outQueueZ.AllocTensor<T>();
        Add(zLocal, xLocal, yLocal, totalLength);
        outQueueZ.EnQue<T>(zLocal);
        inQueueX.FreeTensor(xLocal);
        inQueueY.FreeTensor(yLocal);
    }

    // Stage 3: dequeue the result, copy it back to global memory and release it.
    __aicore__ inline void CopyOut() {
        LocalTensor<T> zLocal = outQueueZ.DeQue<T>();
        DataCopy(zGm, zLocal, totalLength);
        outQueueZ.FreeTensor(zLocal);
    }

    TPipe pipe;
    TQue<QuePosition::VECIN, 1> inQueueX, inQueueY;
    TQue<QuePosition::VECOUT, 1> outQueueZ;
    GlobalTensor<T> xGm, yGm, zGm;
    uint32_t totalLength;  // element count, set in the constructor
};

// Kernel entry point: instantiates the add operator for half precision and
// runs it over totalLength elements.
extern "C" __global__ __aicore__ void add_kernel(GM_ADDR x, GM_ADDR y, GM_ADDR z, uint32_t totalLength)
{
    AddKernel<half> kernel{x, y, z, totalLength};
    kernel.Process();
}

这个示例展示了 Ascend C 算子的标准流程:CopyIn → Compute → CopyOut。

MatMul 算子示例

矩阵乘法。

// samples/operator/AscendC/MatMul/matmul_kernel.cpp
#include "kernel_operator.h"
#include "lib/matmul.h"

// Half-precision matrix multiply wrapper that delegates to the matmul library.
// A is bound with M*K elements, B with K*N and C with M*N.
class MatMulKernel {
public:
    // Records the problem dimensions and binds the three global buffers.
    __aicore__ inline MatMulKernel(GM_ADDR a, GM_ADDR b, GM_ADDR c,
                                    uint32_t M, uint32_t N, uint32_t K)
        : M(M), N(N), K(K)
    {
        aGm.SetGlobalBuffer((__gm__ half*)a, M * K);
        bGm.SetGlobalBuffer((__gm__ half*)b, K * N);
        cGm.SetGlobalBuffer((__gm__ half*)c, M * N);
    }

    // Feeds A and B to a library MatMul object and has it write the full result to C.
    // NOTE(review): no tiling or initialization call is visible here — confirm
    // against the matmul library's required call sequence.
    __aicore__ inline void Process()
    {
        matmul::MatMul<half, half, half> matmulOp;
        matmulOp.SetTensorA(aGm);
        matmulOp.SetTensorB(bGm);
        matmulOp.template IterateAll<true>(cGm);
    }

private:
    GlobalTensor<half> aGm, bGm, cGm;
    uint32_t M, N, K;
};

MatMul 示例展示了如何调用算子库 API。

推理示例

ResNet 推理

# samples/inference/pytorch/ResNet/resnet_inference.py
import torch
import torch_npu
from torchvision.models import resnet50

# Build a pretrained ResNet-50, move it to the NPU and switch to eval mode.
model = resnet50(pretrained=True).npu()
model.eval()

# One random 3x224x224 image stands in for real input.
input_tensor = torch.randn(1, 3, 224, 224).npu()

# Forward pass without gradient tracking.
with torch.no_grad():
    output = model(input_tensor)

# Turn the logits of the single sample into probabilities and keep the best five.
probabilities = output[0].softmax(dim=0)
top5 = probabilities.topk(5)

print("Top 5 predictions:")
for position, (score, class_id) in enumerate(zip(top5.values, top5.indices), start=1):
    print(f"{position}: class {class_id.item()}, prob {score.item():.4f}")

运行:

# Run the ResNet inference script from its own directory.
cd samples/inference/pytorch/ResNet
python resnet_inference.py

BERT 推理

# samples/inference/pytorch/BERT/bert_inference.py
import torch
import torch_npu
from transformers import BertModel, BertTokenizer

# Load the tokenizer and the encoder, then move the encoder to the NPU in eval mode.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased').npu()
model.eval()

# Tokenize one sentence and move every returned tensor to the NPU.
text = "Hello, this is a sample text for BERT inference."
encoded = tokenizer(text, return_tensors="pt")
inputs = {name: tensor.npu() for name, tensor in encoded.items()}

# Forward pass without gradient tracking.
with torch.no_grad():
    outputs = model(**inputs)

# Report the shapes of the two outputs.
report = (
    ("Last hidden state", outputs.last_hidden_state),
    ("Pooler output", outputs.pooler_output),
)
for title, tensor in report:
    print(f"{title} shape: {tensor.shape}")

YOLO 推理

# samples/inference/pytorch/YOLO/yolo_inference.py
import torch
import torch_npu
import cv2
import numpy as np

# Load YOLOv5s from PyTorch Hub, move it to the NPU and switch to eval mode.
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')
model = model.npu()
model.eval()

# Load the image; cv2.imread returns None instead of raising when it cannot read the file.
image = cv2.imread("test.jpg")
if image is None:
    raise FileNotFoundError("Could not read test.jpg")
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Inference. The RGB array is passed directly so that the hub model does its own
# resizing/normalization and returns a detections object; the code below relies
# on that object (render(), ims, xyxy).
# NOTE(review): per the YOLOv5 hub documentation a pre-built tensor input yields
# raw predictions instead — confirm for the pinned yolov5 version.
with torch.no_grad():
    results = model(image_rgb)

# Post-processing: draw the boxes, then convert back to BGR because
# cv2.imwrite expects BGR channel order.
results.render()
output_image = cv2.cvtColor(results.ims[0], cv2.COLOR_RGB2BGR)
cv2.imwrite("output.jpg", output_image)
print(f"Detected {len(results.xyxy[0])} objects")

模型训练示例

MNIST 训练

# samples/model/MNIST/train.py
import torch
import torch.nn as nn
import torch.optim as optim
import torch_npu
from torchvision import datasets, transforms

# 定义模型
class Net(nn.Module):
    """Small CNN classifier: two 3x3 convolutions, one 2x2 max-pool, two linear layers.

    For 1x28x28 inputs the pooled feature map is 64x12x12 = 9216 values, which
    is the input width of ``fc1``. ``forward`` returns log-probabilities over
    10 classes.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(in_features=9216, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """Return per-class log-probabilities for a batch ``x``."""
        F = nn.functional
        features = F.relu(self.conv2(F.relu(self.conv1(x))))
        pooled = F.max_pool2d(features, 2)
        hidden = F.relu(self.fc1(torch.flatten(pooled, 1)))
        return F.log_softmax(self.fc2(hidden), dim=1)

# 训练
def train(model, device, train_loader, optimizer, epoch):
    """Run one pass over ``train_loader``, updating ``model`` with NLL loss.

    Each batch is moved to ``device`` before the forward pass. The loss is
    printed every 100 batches, starting with batch 0. ``epoch`` is used only
    in that log line.
    """
    log_interval = 100
    model.train()
    for step, batch in enumerate(train_loader):
        inputs, labels = (tensor.to(device) for tensor in batch)
        optimizer.zero_grad()
        loss = nn.functional.nll_loss(model(inputs), labels)
        loss.backward()
        optimizer.step()
        if step % log_interval:
            continue
        print(f'Epoch {epoch}, Batch {step}, Loss: {loss.item():.4f}')

def main():
    """Train ``Net`` on MNIST on the NPU for 10 epochs and save its weights.

    The dataset is downloaded to ``./data`` if missing; the state dict is
    written to ``mnist_model.pth``.
    """
    device = torch.device("npu")
    model = Net().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
    dataset = datasets.MNIST('./data', train=True, download=True, transform=preprocessing)
    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)

    first_epoch, last_epoch = 1, 10
    for epoch in range(first_epoch, last_epoch + 1):
        train(model, device, train_loader, optimizer, epoch)

    torch.save(model.state_dict(), "mnist_model.pth")

if __name__ == "__main__":
    main()

分布式训练

# samples/model/ResNet/distributed_train.py
import torch
import torch.distributed as dist
import torch_npu
from torchvision.models import resnet50

def main():
    """Train ResNet-50 with DistributedDataParallel over the HCCL backend.

    Every process joins the process group, trains for 100 epochs with SGD and
    cross-entropy loss, and rank 0 saves the wrapped model's state dict to
    ``resnet50_distributed.pth``.

    ``get_dataloader(rank, world_size)`` is not defined in this snippet and
    must be provided by the surrounding module.
    """
    # Initialize the distributed process group.
    dist.init_process_group(backend='hccl')
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Select this process's device before any .npu() call so that the model
    # lands on the same device that is passed to DDP below.
    # NOTE(review): using the global rank as device index assumes a single
    # node; use the local rank for multi-node jobs.
    torch.npu.set_device(rank)

    # Build the model and wrap it for gradient synchronization.
    model = resnet50().npu()
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    # Data loading.
    train_loader = get_dataloader(rank, world_size)

    # Training loop.
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(100):
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.npu(), target.npu()
            optimizer.zero_grad()
            output = model(data)
            # `nn` is not imported in this script; go through `torch.nn`.
            loss = torch.nn.functional.cross_entropy(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % 100 == 0 and rank == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')

    # Only rank 0 writes the checkpoint.
    if rank == 0:
        torch.save(model.state_dict(), "resnet50_distributed.pth")

if __name__ == "__main__":
    main()

应用示例

目标检测应用

# samples/application/cv/object_detection/app.py
import torch
import torch_npu
import cv2
import time

class ObjectDetector:
    """Wraps a TorchScript detection model running on the NPU.

    The model is called with a 1x3x640x640 float tensor and its result is
    unpacked as ``(boxes, scores, labels)``.
    """

    def __init__(self, model_path):
        """Load the TorchScript model at ``model_path`` onto the NPU in eval mode."""
        self.model = torch.jit.load(model_path).npu()
        self.model.eval()
        self.classes = ["person", "car", "dog", "cat"]

    def detect(self, image):
        """Run preprocess -> model -> postprocess on one BGR image."""
        batch = self.preprocess(image)

        with torch.no_grad():
            boxes, scores, labels = self.model(batch)

        return self.postprocess(boxes, scores, labels)

    def preprocess(self, image):
        """Convert a BGR image to a normalized 1x3x640x640 float tensor on the NPU."""
        rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, (640, 640))
        chw = torch.from_numpy(resized).permute(2, 0, 1).float() / 255.0
        return chw.unsqueeze(0).npu()

    def postprocess(self, boxes, scores, labels):
        """Keep detections of the first batch item whose score exceeds 0.5.

        Returns a list of dicts with keys ``'box'`` (numpy array), ``'score'``
        (float) and ``'class'`` (name looked up in ``self.classes``).
        """
        return [
            {
                'box': box.cpu().numpy(),
                'score': score.item(),
                'class': self.classes[label.item()],
            }
            for box, score, label in zip(boxes[0], scores[0], labels[0])
            if score > 0.5
        ]

    def visualize(self, image, results):
        """Draw each detection's box and ``class: score`` caption onto ``image``."""
        green = (0, 255, 0)
        for detection in results:
            left, top, right, bottom = detection['box'].astype(int)
            cv2.rectangle(image, (left, top), (right, bottom), green, 2)
            caption = f"{detection['class']}: {detection['score']:.2f}"
            cv2.putText(image, caption, (left, top - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 2)
        return image

def main():
    """Run live object detection on camera 0 until 'q' is pressed or a read fails.

    Each frame is annotated with the detections and the per-frame FPS, then
    shown in a window titled 'Detection'. The camera and the windows are
    released even if the loop raises.
    """
    detector = ObjectDetector("yolov5.torchscript")
    cap = cv2.VideoCapture(0)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # perf_counter is monotonic and high-resolution, unlike time.time().
            start = time.perf_counter()
            results = detector.detect(frame)
            elapsed = time.perf_counter() - start
            # Guard against a zero interval instead of raising ZeroDivisionError.
            fps = 1 / elapsed if elapsed > 0 else float('inf')

            frame = detector.visualize(frame, results)
            cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            cv2.imshow('Detection', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

文本分类应用

# samples/application/nlp/text_classification/app.py
import torch
import torch_npu
from transformers import BertTokenizer, BertForSequenceClassification

class TextClassifier:
    """Binary sentiment classifier built on a BERT sequence-classification model."""

    def __init__(self, model_path):
        """Load tokenizer and model from ``model_path``; the model goes to the NPU in eval mode."""
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        classifier = BertForSequenceClassification.from_pretrained(model_path)
        self.model = classifier.npu().eval()
        self.labels = ["negative", "positive"]

    def predict(self, text):
        """Classify ``text``.

        Returns a dict with the input ``'text'``, the predicted ``'label'``
        and its softmax probability as ``'confidence'``.
        """
        # Encode the text and move every tensor to the NPU.
        encoded = self.tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        encoded = {name: tensor.npu() for name, tensor in encoded.items()}

        # Forward pass without gradient tracking.
        with torch.no_grad():
            outputs = self.model(**encoded)

        # Pick the most probable class.
        probs = torch.nn.functional.softmax(outputs.logits, dim=1)
        best = torch.argmax(probs, dim=1).item()

        return dict(
            text=text,
            label=self.labels[best],
            confidence=probs[0, best].item(),
        )

def main():
    """Classify three sample sentences and print label and confidence for each."""
    classifier = TextClassifier("bert-text-classifier")

    samples = (
        "This movie is amazing! I loved every minute of it.",
        "Terrible product, waste of money. Don't buy it.",
        "The service was okay, nothing special.",
    )

    # map() is lazy, so each sentence is classified right before it is printed.
    for outcome in map(classifier.predict, samples):
        print(f"Text: {outcome['text']}")
        print(f"Label: {outcome['label']} (confidence: {outcome['confidence']:.4f})")
        print()

if __name__ == "__main__":
    main()

框架集成示例

TensorFlow 集成

# samples/framework/tensorflow/mnist_tf.py
import tensorflow as tf
import npu_device

# Open the NPU device and make it the default for what follows.
npu_device.open().as_default()

# Load MNIST and scale pixel values to [0, 1].
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train / 255.0
x_test = x_test / 255.0

# Define the model: flatten -> dense(128, relu) -> dropout(0.2) -> dense(10, softmax).
layers = tf.keras.layers
model = tf.keras.models.Sequential()
model.add(layers.Flatten(input_shape=(28, 28)))
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(10, activation='softmax'))

# Compile with Adam and sparse categorical cross-entropy, tracking accuracy.
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train for 5 epochs, holding out 10% of the training data for validation.
model.fit(x_train, y_train, epochs=5, validation_split=0.1)

# Evaluate on the test split.
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f'Test accuracy: {test_acc:.4f}')

PyTorch 集成

# samples/framework/pytorch/custom_op.py
import torch
import torch_npu

# Register a custom operator named "custom_add" with two inputs and one output,
# backed by the kernel library custom_add_kernel.so.
# NOTE(review): `torch_npu.npu.register_op` and its keyword arguments are taken
# on trust from this snippet — confirm the API exists in the installed torch_npu.
torch_npu.npu.register_op(
    "custom_add",
    inputs=["x", "y"],
    outputs=["z"],
    attrs=[],
    kernel="custom_add_kernel.so"
)

# Call the custom operator on two random 1024x1024 tensors placed on the NPU.
x = torch.randn(1024, 1024).npu()
y = torch.randn(1024, 1024).npu()

# NOTE(review): presumably the registration above exposes the operator under
# this name — verify.
z = torch_npu.npu.custom_add(x, y)

print(f"Result shape: {z.shape}")

性能优化示例

算子优化

// samples/operator/AscendC/OptimizedAdd/optimized_add.cpp
#include "kernel_operator.h"

// Pipelined variant of the add kernel (illustrative fragment).
// NOTE(review): this snippet does not declare `pipe`, `inQueueX`, `inQueueY`,
// `outQueueZ`, `BLOCK_SIZE`, a `totalLength` member, or the
// CopyIn/Compute/CopyOut methods it calls; they must be supplied before it
// can compile.
template <typename T>
class OptimizedAddKernel {
public:
    // Reserves two buffers of BLOCK_SIZE elements of T for each of the three queues.
    // NOTE(review): x, y, z and totalLength are not used in the visible body —
    // confirm where the global buffers and the length are meant to be stored.
    __aicore__ inline OptimizedAddKernel(GM_ADDR x, GM_ADDR y, GM_ADDR z, uint32_t totalLength) {
        // Optimization 1: double buffering
        pipe.InitBuffer(inQueueX, 2, BLOCK_SIZE * sizeof(T));  // double buffering
        pipe.InitBuffer(inQueueY, 2, BLOCK_SIZE * sizeof(T));
        pipe.InitBuffer(outQueueZ, 2, BLOCK_SIZE * sizeof(T));
        
        // Optimization 2: vectorized computation
        // Optimization 3: pipeline parallelism
    }

    // Processes totalLength / BLOCK_SIZE whole blocks; the integer division
    // means any remainder shorter than BLOCK_SIZE is not visited by this loop.
    __aicore__ inline void Process() {
        int32_t loopCount = totalLength / BLOCK_SIZE;
        
        // Pipeline: iteration i issues CopyIn for block i, Compute for block
        // i - 1 and CopyOut for block i - 2. The two extra iterations
        // (loopCount + 2) drain the last blocks through Compute and CopyOut.
        for (int32_t i = 0; i < loopCount + 2; i++) {
            if (i < loopCount) {
                CopyIn(i);
            }
            if (i > 0 && i < loopCount + 1) {
                Compute(i - 1);
            }
            if (i > 1) {
                CopyOut(i - 2);
            }
        }
    }
};

推理优化

# samples/inference/pytorch/OptimizedInference/optimized_resnet.py
import torch
import torch_npu
from torchvision.models import resnet50
import time

class OptimizedResNet:
    """FP16 TorchScript ResNet-50 on the NPU, warmed up at construction time."""

    def __init__(self):
        """Load ``resnet50.torchscript``, convert it to half precision and warm it up."""
        # Optimization 1: load a TorchScript model.
        scripted = torch.jit.load("resnet50.torchscript").npu()
        scripted.eval()

        # Optimization 2: FP16 inference.
        self.model = scripted.half()

        # Optimization 3: warm-up.
        self._warmup()

    def _warmup(self):
        """Run 10 forward passes on a random FP16 1x3x224x224 input, then synchronize."""
        warmup_rounds = 10
        sample = torch.randn(1, 3, 224, 224, dtype=torch.float16).npu()
        with torch.no_grad():
            for _ in range(warmup_rounds):
                self.model(sample)
        torch.npu.synchronize()

    def inference(self, images):
        """Cast ``images`` to half precision and return the model output for the batch."""
        # Optimization 4: batched inference.
        half_images = images.half()

        with torch.no_grad():
            return self.model(half_images)

def benchmark():
    """Print throughput (images per second) of OptimizedResNet for several batch sizes.

    For each batch size: 10 untimed warm-up calls, then 100 timed calls, with
    an NPU synchronize before the clock starts and before it stops.
    """
    model = OptimizedResNet()
    warmup_rounds = 10
    iterations = 100

    for batch_size in (1, 8, 16, 32, 64):
        images = torch.randn(batch_size, 3, 224, 224, dtype=torch.float16).npu()

        # Warm-up.
        for _ in range(warmup_rounds):
            model.inference(images)
        torch.npu.synchronize()

        # Timed section.
        start = time.time()
        for _ in range(iterations):
            model.inference(images)
        torch.npu.synchronize()
        elapsed = time.time() - start

        fps = iterations * batch_size / elapsed
        print(f"Batch {batch_size}: {fps:.1f} FPS")

if __name__ == "__main__":
    benchmark()

踩坑指南

常见问题

  1. 编译失败

    • 检查 CANN 版本
    • 检查环境变量
    • 检查依赖库
  2. 运行失败

    • 检查 NPU 设备状态
    • 检查内存是否足够
    • 检查权限
  3. 性能不佳

    • 检查是否使用了优化示例
    • 检查是否用了 FP16
    • 检查是否用了批量推理

调试技巧

import torch_npu

# Inspect NPU state: current device index, device count, allocated and reserved memory.
npu = torch_npu.npu
for query in (npu.current_device, npu.device_count, npu.memory_allocated, npu.memory_reserved):
    print(query())

# Turn off JIT compilation to make debugging easier.
npu.set_compile_mode(jit_compile=False)

总结

samples 是昇腾开发的宝库:

  • 算子示例:Ascend C、Python 算子
  • 推理示例:ResNet、BERT、YOLO
  • 训练示例:MNIST、分布式训练
  • 应用示例:目标检测、文本分类
  • 优化示例:性能优化技巧
Logo

鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。

更多推荐