写给前端的 CANN-samples:昇腾示例代码库到底是啥?
写给前端的 CANN-samples:昇腾示例代码库到底是啥?
·
之前有兄弟想学昇腾开发,问我:“哥,有没有现成的示例代码可以参考?不想从零开始写。”
好问题。今天一次说清楚。
samples 是啥?
samples 是昇腾的官方示例代码库。涵盖算子开发、模型推理、应用开发等各个方面。
一句话说清楚:samples 是昇腾的官方示例代码库,从 Hello World 到完整应用都有,学昇腾开发必备。
你说气人不气人,之前看文档看得云里雾里,跑个 sample 一看就懂了。
为什么需要 samples?
三种情况:
1. 学习昇腾开发
从示例代码入手,比看文档快得多。
2. 验证环境
跑通 sample 说明环境没问题。
3. 代码模板
复制 sample 代码改改就能用。
samples 目录结构
samples/
├── operator/ # 算子开发示例
│ ├── AscendC/ # Ascend C 算子示例
│ ├── KernelLaunch/ # 核函数启动示例
│ └── OPPython/ # Python 算子示例
├── inference/ # 推理示例
│ ├── nptensorflow/ # TensorFlow 推理
│ └── pytorch/ # PyTorch 推理
├── model/ # 模型示例
│ ├── ResNet/ # ResNet 示例
│ ├── BERT/ # BERT 示例
│ └── YOLO/ # YOLO 示例
├── application/ # 应用示例
│ ├── cv/ # 计算机视觉应用
│ ├── nlp/ # 自然语言处理应用
│ └── audio/ # 音频处理应用
├── framework/ # 框架集成示例
│ ├── tensorflow/ # TensorFlow 集成
│ └── pytorch/ # PyTorch 集成
└── contrib/ # 社区贡献
    ├── ascend910/ # 910 专用示例
    └── ascend310/ # 310 专用示例
算子开发示例
Hello World 示例
最简单的 Ascend C 算子。
// samples/operator/AscendC/HelloWorld/helloworld.cpp
#include "kernel_operator.h"
/**
 * Smallest possible sample operator: it owns no buffers and takes no
 * arguments; its only action is to print a greeting.
 */
class HelloWorld {
public:
    __aicore__ inline HelloWorld()
    {
    }

    /** Prints the greeting line. */
    __aicore__ inline void process()
    {
        printf("Hello World from Ascend C!\n");
    }
};

/** Kernel entry point: builds the operator object and runs it once. */
extern "C" __global__ __aicore__ void helloworld_kernel()
{
    HelloWorld greeter;
    greeter.process();
}
运行:
# Enter the HelloWorld sample directory
cd samples/operator/AscendC/HelloWorld
# Configure and build in a separate build directory
mkdir build && cd build
cmake ..
make
# Run the resulting binary
./helloworld
# Output: Hello World from Ascend C!
Add 算子示例
两个张量相加。
// samples/operator/AscendC/Add/add_kernel.cpp
#include "kernel_operator.h"
/**
 * Element-wise addition operator: z = x + y over `totalLength` elements of
 * type T.
 *
 * Process() runs each stage exactly once, so the whole input is handled as a
 * single tile and every queue buffer is sized for `totalLength` elements.
 */
template <typename T>
class AddKernel {
public:
    /**
     * Binds the global-memory tensors and reserves the queue buffers.
     *
     * @param x           global address of the first input
     * @param y           global address of the second input
     * @param z           global address of the output
     * @param totalLength number of elements in each of x, y and z
     */
    __aicore__ inline AddKernel(GM_ADDR x, GM_ADDR y, GM_ADDR z, uint32_t totalLength) {
        // The parameter shadows the member of the same name. Without this
        // assignment the member is never initialized and CopyIn/Compute/CopyOut
        // would operate on an indeterminate element count.
        this->totalLength = totalLength;

        xGm.SetGlobalBuffer((__gm__ T*)x, totalLength);
        yGm.SetGlobalBuffer((__gm__ T*)y, totalLength);
        zGm.SetGlobalBuffer((__gm__ T*)z, totalLength);

        pipe.InitBuffer(inQueueX, 1, totalLength * sizeof(T));
        pipe.InitBuffer(inQueueY, 1, totalLength * sizeof(T));
        pipe.InitBuffer(outQueueZ, 1, totalLength * sizeof(T));
    }

    /** Runs the three stages in order: CopyIn, Compute, CopyOut. */
    __aicore__ inline void Process() {
        CopyIn();
        Compute();
        CopyOut();
    }

private:
    /** Copies both inputs from global memory into local tensors and enqueues them. */
    __aicore__ inline void CopyIn() {
        LocalTensor<T> xLocal = inQueueX.AllocTensor<T>();
        LocalTensor<T> yLocal = inQueueY.AllocTensor<T>();
        DataCopy(xLocal, xGm, totalLength);
        DataCopy(yLocal, yGm, totalLength);
        inQueueX.EnQue(xLocal);
        inQueueY.EnQue(yLocal);
    }

    /** Dequeues the inputs, adds them into a fresh output tensor and enqueues it. */
    __aicore__ inline void Compute() {
        LocalTensor<T> xLocal = inQueueX.DeQue<T>();
        LocalTensor<T> yLocal = inQueueY.DeQue<T>();
        LocalTensor<T> zLocal = outQueueZ.AllocTensor<T>();
        Add(zLocal, xLocal, yLocal, totalLength);
        outQueueZ.EnQue<T>(zLocal);
        // The input tensors are no longer needed once the sum has been queued.
        inQueueX.FreeTensor(xLocal);
        inQueueY.FreeTensor(yLocal);
    }

    /** Dequeues the result, copies it to global memory and frees the local tensor. */
    __aicore__ inline void CopyOut() {
        LocalTensor<T> zLocal = outQueueZ.DeQue<T>();
        DataCopy(zGm, zLocal, totalLength);
        outQueueZ.FreeTensor(zLocal);
    }

    TPipe pipe;
    TQue<QuePosition::VECIN, 1> inQueueX, inQueueY;
    TQue<QuePosition::VECOUT, 1> outQueueZ;
    GlobalTensor<T> xGm, yGm, zGm;
    uint32_t totalLength;  // element count shared by all three stages
};
/**
 * Kernel entry point for the Add sample.
 *
 * Instantiates AddKernel for `half` elements over the given global buffers and
 * runs it once.
 */
extern "C" __global__ __aicore__ void add_kernel(GM_ADDR x, GM_ADDR y, GM_ADDR z, uint32_t totalLength)
{
    AddKernel<half> kernel(x, y, z, totalLength);
    kernel.Process();
}
这个示例展示了 Ascend C 算子的标准流程:CopyIn → Compute → CopyOut。
MatMul 算子示例
矩阵乘法。
// samples/operator/AscendC/MatMul/matmul_kernel.cpp
#include "kernel_operator.h"
#include "lib/matmul.h"
/**
 * Sample kernel that hands two half-precision global tensors to the matmul
 * library object and writes the product into a third.
 *
 * The buffers are registered with M*K, K*N and M*N elements respectively.
 */
class MatMulKernel {
public:
    __aicore__ inline MatMulKernel(GM_ADDR a, GM_ADDR b, GM_ADDR c,
                                   uint32_t M, uint32_t N, uint32_t K)
    {
        // Parameters shadow the members, hence the explicit this->.
        this->M = M;
        this->N = N;
        this->K = K;

        aGm.SetGlobalBuffer((__gm__ half*)a, M * K);
        bGm.SetGlobalBuffer((__gm__ half*)b, K * N);
        cGm.SetGlobalBuffer((__gm__ half*)c, M * N);
    }

    /**
     * Runs the multiplication through the MatMul API.
     *
     * NOTE(review): M, N and K are stored but never passed to the matmul
     * object here - confirm how the shape/tiling reaches it.
     */
    __aicore__ inline void Process()
    {
        matmul::MatMul<half, half, half> matmulObj;
        matmulObj.SetTensorA(aGm);
        matmulObj.SetTensorB(bGm);
        matmulObj.template IterateAll<true>(cGm);
    }

private:
    GlobalTensor<half> aGm;
    GlobalTensor<half> bGm;
    GlobalTensor<half> cGm;
    uint32_t M;
    uint32_t N;
    uint32_t K;
};
MatMul 示例展示了如何调用算子库 API。
推理示例
ResNet 推理
# samples/inference/pytorch/ResNet/resnet_inference.py
import torch
import torch_npu
from torchvision.models import resnet50
# Load a pretrained ResNet-50, move it to the NPU and switch to eval mode.
model = resnet50(pretrained=True)
model = model.npu()
model.eval()

# Random tensor of shape (1, 3, 224, 224) used as the input.
input_tensor = torch.randn(1, 3, 224, 224).npu()

# Forward pass with gradient tracking disabled.
with torch.no_grad():
    output = model(input_tensor)

# Softmax over the first sample's outputs, then keep the five largest values.
probabilities = torch.nn.functional.softmax(output[0], dim=0)
top5 = torch.topk(probabilities, 5)

print("Top 5 predictions:")
for idx, (prob, cls) in enumerate(zip(top5.values, top5.indices)):
    print(f"{idx+1}: class {cls.item()}, prob {prob.item():.4f}")
运行:
# Enter the ResNet inference sample directory and run the script
cd samples/inference/pytorch/ResNet
python resnet_inference.py
BERT 推理
# samples/inference/pytorch/BERT/bert_inference.py
import torch
import torch_npu
from transformers import BertModel, BertTokenizer
# Load the tokenizer and model, move the model to the NPU, switch to eval mode.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')
model = model.npu()
model.eval()

# Tokenize one sentence into PyTorch tensors and move each tensor to the NPU.
text = "Hello, this is a sample text for BERT inference."
inputs = tokenizer(text, return_tensors="pt")
inputs = {k: v.npu() for k, v in inputs.items()}

# Forward pass with gradient tracking disabled.
with torch.no_grad():
    outputs = model(**inputs)

# Report the shapes of the two outputs.
print(f"Last hidden state shape: {outputs.last_hidden_state.shape}")
print(f"Pooler output shape: {outputs.pooler_output.shape}")
YOLO 推理
# samples/inference/pytorch/YOLO/yolo_inference.py
import torch
import torch_npu
import cv2
import numpy as np
# Load YOLOv5s from PyTorch Hub, move it to the NPU and switch to eval mode.
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')
model = model.npu()
model.eval()

# Load the image. cv2.imread returns None (it does not raise) when the file
# cannot be read, so fail early with a clear error.
image = cv2.imread("test.jpg")
if image is None:
    raise FileNotFoundError("test.jpg")
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Inference. The RGB array is passed to the hub model directly: when given a
# raw torch.Tensor the hub wrapper returns raw predictions, which have no
# render()/ims/xyxy, whereas an image input yields a detections object and the
# wrapper does the resizing/normalisation itself.
with torch.no_grad():
    results = model(image_rgb)

# Post-processing: draw the boxes, then convert the rendered RGB image back to
# BGR because cv2.imwrite expects BGR channel order.
results.render()
output_image = cv2.cvtColor(results.ims[0], cv2.COLOR_RGB2BGR)
cv2.imwrite("output.jpg", output_image)
print(f"Detected {len(results.xyxy[0])} objects")
模型训练示例
MNIST 训练
# samples/model/MNIST/train.py
import torch
import torch.nn as nn
import torch.optim as optim
import torch_npu
from torchvision import datasets, transforms
# 定义模型
class Net(nn.Module):
    """Small CNN: two 3x3 convolutions, one 2x2 max-pool, two linear layers.

    forward() returns log-probabilities over 10 classes.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        # 9216 = 64 channels * 12 * 12, the flattened size for a 1x28x28 input
        # (28 -> 26 -> 24 after the two unpadded convs, 12 after pooling).
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Maps a batch of shape (N, 1, 28, 28) to log-probabilities (N, 10)."""
        x = self.conv1(x)
        x = nn.functional.relu(x)
        x = self.conv2(x)
        x = nn.functional.relu(x)
        x = nn.functional.max_pool2d(x, 2)
        # Keep the batch dimension, flatten everything else.
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.fc2(x)
        return nn.functional.log_softmax(x, dim=1)
# 训练
def train(model, device, train_loader, optimizer, epoch):
    """Run one training epoch.

    Args:
        model: module whose output is fed to nll_loss (i.e. log-probabilities).
        device: device each batch is moved to before the forward pass.
        train_loader: iterable yielding (data, target) pairs.
        optimizer: optimizer over the model's parameters.
        epoch: epoch number, used only in the progress message.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = nn.functional.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        # Log every 100th batch.
        if batch_idx % 100 == 0:
            print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')
def main():
    """Train Net on MNIST on the NPU for 10 epochs and save the weights."""
    device = torch.device("npu")
    model = Net().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training split, downloaded to ./data if missing, converted to tensors and
    # normalised with the constants given below.
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('./data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=64, shuffle=True)

    for epoch in range(1, 11):
        train(model, device, train_loader, optimizer, epoch)

    torch.save(model.state_dict(), "mnist_model.pth")


if __name__ == "__main__":
    main()
分布式训练
# samples/model/ResNet/distributed_train.py
import torch
import torch.distributed as dist
import torch_npu
from torchvision.models import resnet50
def main():
    """Train ResNet-50 with DistributedDataParallel over the HCCL backend.

    Rank 0 prints progress every 100 batches and saves the state dict once
    training finishes.

    NOTE(review): get_dataloader is not defined in this snippet; it must be
    provided elsewhere and return an iterable of (data, target) batches.
    """
    # Initialise the process group.
    dist.init_process_group(backend='hccl')
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Bind this process to its own NPU before any tensor is created. Without
    # this, .npu() places every rank's model on the default device, which does
    # not match the device passed to DistributedDataParallel below.
    device_id = rank % torch.npu.device_count()
    torch.npu.set_device(device_id)

    # Build the model and wrap it for distributed training.
    model = resnet50().npu()
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device_id])

    # Data loading.
    train_loader = get_dataloader(rank, world_size)

    # Training loop.
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    for epoch in range(100):
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.npu(), target.npu()
            optimizer.zero_grad()
            output = model(data)
            # `nn` is not imported by this script, so go through torch.nn.
            loss = torch.nn.functional.cross_entropy(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0 and rank == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')

    # Only one process writes the checkpoint.
    if rank == 0:
        torch.save(model.state_dict(), "resnet50_distributed.pth")


if __name__ == "__main__":
    main()
应用示例
目标检测应用
# samples/application/cv/object_detection/app.py
import torch
import torch_npu
import cv2
import time
class ObjectDetector:
    """Wraps a TorchScript detection model running on the NPU.

    The loaded model is expected to return (boxes, scores, labels) for a
    batched input; only the first batch entry is post-processed.
    """

    def __init__(self, model_path):
        """Load the TorchScript model from model_path and move it to the NPU."""
        self.model = torch.jit.load(model_path).npu()
        self.model.eval()
        self.classes = ["person", "car", "dog", "cat"]

    def detect(self, image):
        """Run preprocess -> model -> postprocess on one BGR image."""
        # Pre-processing
        input_tensor = self.preprocess(image)
        # Inference
        with torch.no_grad():
            boxes, scores, labels = self.model(input_tensor)
        # Post-processing
        results = self.postprocess(boxes, scores, labels)
        return results

    def preprocess(self, image):
        """Convert a BGR image into a (1, 3, 640, 640) float tensor on the NPU."""
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_resized = cv2.resize(image_rgb, (640, 640))
        # HWC -> CHW, then scale pixel values by 1/255.
        input_tensor = torch.from_numpy(image_resized).permute(2, 0, 1).float() / 255.0
        return input_tensor.unsqueeze(0).npu()

    def postprocess(self, boxes, scores, labels):
        """Keep detections of the first batch entry whose score exceeds 0.5.

        Returns a list of dicts with keys 'box' (numpy array), 'score' (float)
        and 'class' (name looked up in self.classes).
        """
        results = []
        for box, score, label in zip(boxes[0], scores[0], labels[0]):
            if score > 0.5:
                results.append({
                    'box': box.cpu().numpy(),
                    'score': score.item(),
                    # int() so that a float label tensor can still index the list.
                    'class': self.classes[int(label.item())]
                })
        return results

    def visualize(self, image, results):
        """Draw each result's box and 'class: score' caption onto image and return it."""
        for result in results:
            x1, y1, x2, y2 = result['box'].astype(int)
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(image, f"{result['class']}: {result['score']:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        return image
def main():
    """Run the detector on frames from camera 0 until 'q' is pressed or capture fails."""
    detector = ObjectDetector("yolov5.torchscript")
    cap = cv2.VideoCapture(0)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            start = time.time()
            results = detector.detect(frame)
            elapsed = time.time() - start
            # Guard against a zero interval on coarse clocks.
            fps = 1 / elapsed if elapsed > 0 else float("inf")

            frame = detector.visualize(frame, results)
            cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow('Detection', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera and close the window even if detection raises.
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
文本分类应用
# samples/application/nlp/text_classification/app.py
import torch
import torch_npu
from transformers import BertTokenizer, BertForSequenceClassification
class TextClassifier:
    """Binary text classifier built on BertForSequenceClassification on the NPU."""

    def __init__(self, model_path):
        """Load tokenizer and model from model_path; move the model to the NPU in eval mode."""
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        self.model = BertForSequenceClassification.from_pretrained(model_path)
        self.model = self.model.npu().eval()
        self.labels = ["negative", "positive"]

    def predict(self, text):
        """Classify one text.

        Returns a dict with 'text' (the input), 'label' (entry of self.labels
        with the highest probability) and 'confidence' (that probability).
        """
        # Encode
        inputs = self.tokenizer(text, return_tensors="pt", padding=True, truncation=True)
        inputs = {k: v.npu() for k, v in inputs.items()}
        # Inference
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Post-processing: softmax over the class dimension, then arg-max.
        probs = torch.nn.functional.softmax(outputs.logits, dim=1)
        label_idx = torch.argmax(probs, dim=1).item()
        return {
            'text': text,
            'label': self.labels[label_idx],
            'confidence': probs[0, label_idx].item()
        }
def main():
    """Classify three sample sentences and print label and confidence for each."""
    classifier = TextClassifier("bert-text-classifier")
    texts = [
        "This movie is amazing! I loved every minute of it.",
        "Terrible product, waste of money. Don't buy it.",
        "The service was okay, nothing special."
    ]
    for text in texts:
        result = classifier.predict(text)
        print(f"Text: {result['text']}")
        print(f"Label: {result['label']} (confidence: {result['confidence']:.4f})")
        print()


if __name__ == "__main__":
    main()
框架集成示例
TensorFlow 集成
# samples/framework/tensorflow/mnist_tf.py
import tensorflow as tf
import npu_device
# Open the NPU device and make it the default.
npu_device.open().as_default()

# Load MNIST and scale the pixel values by 1/255.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train / 255.0
x_test = x_test / 255.0

# Build the model layer by layer: flatten -> dense(128, relu) -> dropout -> dense(10, softmax).
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(10, activation='softmax'))

# Compile with Adam, sparse categorical cross-entropy and accuracy as metric.
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train for 5 epochs, holding out 10% of the training data for validation.
model.fit(x_train, y_train, epochs=5, validation_split=0.1)

# Evaluate on the test split.
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f'Test accuracy: {test_acc:.4f}')
PyTorch 集成
# samples/framework/pytorch/custom_op.py
import torch
import torch_npu
# Register the custom operator: two inputs, one output, no attributes, backed
# by the given shared library.
# NOTE(review): confirm that torch_npu.npu.register_op exists with this
# signature in the installed torch_npu version.
torch_npu.npu.register_op("custom_add", inputs=["x", "y"], outputs=["z"], attrs=[],
                          kernel="custom_add_kernel.so")

# Call the custom operator on two random 1024x1024 tensors on the NPU.
x = torch.randn(1024, 1024).npu()
y = torch.randn(1024, 1024).npu()
z = torch_npu.npu.custom_add(x, y)
print(f"Result shape: {z.shape}")
性能优化示例
算子优化
// samples/operator/AscendC/OptimizedAdd/optimized_add.cpp
#include "kernel_operator.h"
/**
 * Tiled variant of the Add sample that requests two buffers per queue and
 * staggers the three stages across loop iterations.
 *
 * NOTE(review): this excerpt does not declare pipe, the three queues,
 * totalLength, BLOCK_SIZE or the CopyIn/Compute/CopyOut(index) helpers, and
 * the constructor does not use x, y, z or totalLength - confirm they are
 * provided by the full sample.
 */
template <typename T>
class OptimizedAddKernel {
public:
    __aicore__ inline OptimizedAddKernel(GM_ADDR x, GM_ADDR y, GM_ADDR z, uint32_t totalLength)
    {
        // Optimization 1: double buffering - each queue gets 2 buffers of
        // BLOCK_SIZE elements.
        pipe.InitBuffer(inQueueX, 2, BLOCK_SIZE * sizeof(T));
        pipe.InitBuffer(inQueueY, 2, BLOCK_SIZE * sizeof(T));
        pipe.InitBuffer(outQueueZ, 2, BLOCK_SIZE * sizeof(T));
        // Optimization 2: vectorized compute
        // Optimization 3: pipeline parallelism
    }

    /**
     * Staggered loop over totalLength / BLOCK_SIZE tiles: step s issues CopyIn
     * for tile s, Compute for tile s-1 and CopyOut for tile s-2, so two extra
     * steps drain the last tiles. Any remainder of the integer division is not
     * processed here.
     */
    __aicore__ inline void Process()
    {
        int32_t tileCount = totalLength / BLOCK_SIZE;
        for (int32_t step = 0; step < tileCount + 2; step++) {
            if (step < tileCount) {
                CopyIn(step);
            }
            if (step >= 1 && step <= tileCount) {
                Compute(step - 1);
            }
            if (step >= 2) {
                CopyOut(step - 2);
            }
        }
    }
};
推理优化
# samples/inference/pytorch/OptimizedInference/optimized_resnet.py
import torch
import torch_npu
from torchvision.models import resnet50
import time
class OptimizedResNet:
    """ResNet-50 TorchScript model prepared for FP16 inference on the NPU."""

    def __init__(self):
        # Optimization 1: load the TorchScript model.
        self.model = torch.jit.load("resnet50.torchscript").npu()
        self.model.eval()
        # Optimization 2: FP16 inference.
        self.model = self.model.half()
        # Optimization 3: warm-up.
        self._warmup()

    def _warmup(self):
        """Run 10 forward passes on a dummy input, then wait for the NPU to finish."""
        # Generate in float32 on the host and cast on the device; float16
        # random generation on the CPU is not available in every PyTorch build.
        dummy = torch.randn(1, 3, 224, 224).npu().half()
        with torch.no_grad():
            for _ in range(10):
                _ = self.model(dummy)
        torch.npu.synchronize()

    def inference(self, images):
        """Cast the batch to FP16 and return the model output for it."""
        # Optimization 4: batched inference.
        images = images.half()
        with torch.no_grad():
            outputs = self.model(images)
        return outputs
def benchmark():
    """Measure throughput (images per second) for several batch sizes.

    For each batch size: 10 warm-up runs, then 100 timed runs bracketed by
    device synchronisation.
    """
    model = OptimizedResNet()
    batch_sizes = [1, 8, 16, 32, 64]
    for batch_size in batch_sizes:
        # Generate in float32 on the host and cast on the device; float16
        # random generation on the CPU is not available in every PyTorch build.
        images = torch.randn(batch_size, 3, 224, 224).npu().half()

        # Warm-up
        for _ in range(10):
            _ = model.inference(images)
        torch.npu.synchronize()

        # Timing: perf_counter is monotonic and higher resolution than time.time.
        start = time.perf_counter()
        iterations = 100
        for _ in range(iterations):
            _ = model.inference(images)
        # Wait for queued device work so the interval covers the real execution.
        torch.npu.synchronize()
        elapsed = time.perf_counter() - start

        fps = iterations * batch_size / elapsed
        print(f"Batch {batch_size}: {fps:.1f} FPS")


if __name__ == "__main__":
    benchmark()
踩坑指南
常见问题
- 编译失败
  - 检查 CANN 版本
  - 检查环境变量
  - 检查依赖库
- 运行失败
  - 检查 NPU 设备状态
  - 检查内存是否足够
  - 检查权限
- 性能不佳
  - 检查是否使用了优化示例
  - 检查是否用了 FP16
  - 检查是否用了批量推理
调试技巧
import torch_npu
# Print, in order: current device index, device count, allocated memory and
# reserved memory as reported by torch_npu.
for query in (torch_npu.npu.current_device, torch_npu.npu.device_count,
              torch_npu.npu.memory_allocated, torch_npu.npu.memory_reserved):
    print(query())

# Enable debugging: switch off JIT compilation.
torch_npu.npu.set_compile_mode(jit_compile=False)
总结
samples 是昇腾开发的宝库:
- 算子示例:Ascend C、Python 算子
- 推理示例:ResNet、BERT、YOLO
- 训练示例:MNIST、分布式训练
- 应用示例:目标检测、文本分类
- 优化示例:性能优化技巧
鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。
更多推荐



所有评论(0)