HCCL 集合通信编程:多卡协同的正确姿势
本文介绍了昇腾HCCL分布式通信库的核心功能和使用方法。主要内容包括: HCCL初始化配置,支持多机多卡训练环境搭建 AllReduce操作实现全局梯度求和与同步 AllGather用于收集各卡数据并进行拼接 ReduceScatter实现数据分发 Broadcast同步模型权重 点对点通信支持Pipeline并行 文中提供了完整的代码示例,涵盖梯度同步、分布式矩阵计算、模型权重同步等典型场景,并
前言
多卡训练和推理,核心是卡间通信。HCCL(Huawei Collective Communication Library)是昇腾的集合通信库,API 和 NCCL 兼容,但底层实现针对 HCCS 和 RoCE 做了优化。
下面介绍 HCCL 的常用通信模式和编程示例。
一、初始化 HCCL
和 NCCL 一样,HCCL 需要初始化通信组。
import torch
import torch.distributed as dist
def init_hccl():
# 初始化进程组
dist.init_process_group(
backend="hccl",
init_method="env://", # 从环境变量读取地址
world_size=8, # 总卡数
rank=int(os.environ["RANK"])
)
# 获取当前设备的 local_rank
local_rank = int(os.environ["LOCAL_RANK"])
torch.npu.set_device(local_rank)
print(f"Initialized HCCL: rank={dist.get_rank()}, world_size={dist.get_world_size()}")
启动脚本
# 多机多卡启动
torchrun \
--nnodes=2 \ # 2 台机器
--nproc_per_node=4 \ # 每台机器 4 张卡
--master_addr="10.0.0.1" \
--master_port=29500 \
train.py
二、AllReduce:全局求和
AllReduce 是最常用的集合通信操作:每张卡算出自己的梯度,然后全局求和,结果广播给所有卡。
标准 AllReduce
def allreduce_example():
# 每张卡的本地数据
local_tensor = torch.randn(1024, 1024).npu()
# AllReduce 求和
dist.all_reduce(local_tensor, op=dist.ReduceOp.SUM)
# 现在 local_tensor 包含所有卡的和
print(f"AllReduce result: {local_tensor.mean()}")
梯度同步
def sync_gradients(model):
"""同步所有卡的梯度"""
for param in model.parameters():
if param.grad is not None:
# 梯度求和并平均
dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
param.grad /= dist.get_world_size()
性能优化:梯度压缩
# HCCL 原生支持 FP16 梯度压缩
def sync_gradients_fp16(model):
for param in model.parameters():
if param.grad is not None:
# 转 FP16 再通信
grad_fp16 = param.grad.half()
dist.all_reduce(grad_fp16, op=dist.ReduceOp.SUM)
param.grad = grad_fp16.float() / dist.get_world_size()
三、AllGather:收集所有卡的数据
AllGather 把每张卡的数据收集起来,拼成一个更大的 tensor。
def allgather_example():
# 每张卡有 1 个数据块
local_data = torch.tensor([dist.get_rank()]).npu()
# 准备接收所有数据的 buffer
world_size = dist.get_world_size()
gathered = [torch.zeros_like(local_data) for _ in range(world_size)]
# AllGather
dist.all_gather(gathered, local_data)
# gathered = [tensor([0]), tensor([1]), tensor([2]), ...]
print(f"AllGather result: {[t.item() for t in gathered]}")
实际应用:收集预测结果
def gather_predictions(local_preds):
"""收集所有卡的预测结果"""
world_size = dist.get_world_size()
batch_size = local_preds.shape[0]
# 准备接收 buffer
all_preds = [torch.zeros_like(local_preds) for _ in range(world_size)]
# AllGather
dist.all_gather(all_preds, local_preds)
# 拼接成一个大 tensor
return torch.cat(all_preds, dim=0)
四、ReduceScatter:分发求和结果
ReduceScatter 是 AllReduce 的逆操作:先求和,再分发给各卡。
def reduce_scatter_example():
world_size = dist.get_world_size()
# 每张卡有一个大 tensor
local_tensor = torch.randn(world_size * 1024).npu()
# 准备接收 buffer(大小是原来的 1/world_size)
output = torch.zeros(1024).npu()
# ReduceScatter
dist.reduce_scatter(output, [local_tensor], op=dist.ReduceOp.SUM)
print(f"ReduceScatter result: {output.mean()}")
实际应用:分布式 MatMul
def distributed_matmul(weight, input):
"""分布式矩阵乘法:权重按行切分"""
# 本地计算
local_output = torch.matmul(weight, input)
# AllGather 收集所有结果
world_size = dist.get_world_size()
all_outputs = [torch.zeros_like(local_output) for _ in range(world_size)]
dist.all_gather(all_outputs, local_output)
# 拼接
return torch.cat(all_outputs, dim=0)
五、Broadcast:广播数据
Broadcast 把一张卡的数据广播给所有卡。
def broadcast_example():
# 只有 rank 0 有数据
if dist.get_rank() == 0:
data = torch.tensor([1.0, 2.0, 3.0]).npu()
else:
data = torch.zeros(3).npu()
# 广播
dist.broadcast(data, src=0)
print(f"Broadcast result (rank {dist.get_rank()}): {data}")
实际应用:同步模型权重
def sync_model_weights(model):
"""从 rank 0 同步模型权重到所有卡"""
for param in model.parameters():
dist.broadcast(param.data, src=0)
六、Send/Recv:点对点通信
除了集合通信,HCCL 也支持点对点通信。
def p2p_example():
rank = dist.get_rank()
if rank == 0:
# 发送数据给 rank 1
data = torch.tensor([1.0, 2.0, 3.0]).npu()
dist.send(data, dst=1)
print("Rank 0 sent data")
elif rank == 1:
# 从 rank 0 接收数据
data = torch.zeros(3).npu()
dist.recv(data, src=0)
print(f"Rank 1 received: {data}")
Pipeline 并行示例
def pipeline_forward(layer, input_tensor, prev_rank, next_rank):
"""Pipeline 并行的前向传播"""
rank = dist.get_rank()
# 从上一张卡接收输入
if prev_rank >= 0:
dist.recv(input_tensor, src=prev_rank)
# 本层计算
output = layer(input_tensor)
# 发送给下一张卡
if next_rank >= 0:
dist.send(output, dst=next_rank)
return output
七、通信组管理
复杂场景下,需要创建子通信组(比如模型并行和数据并行混合)。
def create_comm_groups():
"""创建通信组"""
world_size = dist.get_world_size()
rank = dist.get_rank()
# 假设 8 张卡,分成 2 组做模型并行
# 组 0: [0, 1, 2, 3], 组 1: [4, 5, 6, 7]
model_parallel_groups = [
[0, 1, 2, 3],
[4, 5, 6, 7]
]
# 数据并行组:同一位置跨模型并行组
data_parallel_groups = [
[0, 4],
[1, 5],
[2, 6],
[3, 7]
]
# 找到当前 rank 属于哪个组
for group in model_parallel_groups:
if rank in group:
model_parallel_group = dist.new_group(group)
break
for group in data_parallel_groups:
if rank in group:
data_parallel_group = dist.new_group(group)
break
return model_parallel_group, data_parallel_group
使用子通信组
def hybrid_parallel_train(model, input, model_parallel_group, data_parallel_group):
"""混合并行训练"""
# 模型并行:前向传播在模型并行组内
output = model(input)
# 数据并行:梯度同步在数据并行组内
for param in model.parameters():
if param.grad is not None:
dist.all_reduce(param.grad, op=dist.ReduceOp.SUM, group=data_parallel_group)
param.grad /= dist.get_world_size(group=data_parallel_group)
八、性能优化技巧
1. 通信与计算重叠
def overlap_comm_compute(model, input):
"""通信和计算重叠"""
# 计算梯度
output = model(input)
loss = output.sum()
loss.backward()
# 异步通信
handles = []
for param in model.parameters():
if param.grad is not None:
handle = dist.all_reduce(param.grad, async_op=True)
handles.append(handle)
# 做其他计算(比如更新优化器状态)
optimizer.step()
# 等待通信完成
for handle in handles:
handle.wait()
2. 梯度累积减少通信频率
def gradient_accumulation(model, dataloader, accumulation_steps):
"""梯度累积,减少通信次数"""
optimizer = torch.optim.AdamW(model.parameters())
for i, (input, target) in enumerate(dataloader):
output = model(input)
loss = criterion(output, target) / accumulation_steps
loss.backward()
if (i + 1) % accumulation_steps == 0:
# 只在累积步数到达时通信
for param in model.parameters():
if param.grad is not None:
dist.all_reduce(param.grad)
param.grad /= dist.get_world_size()
optimizer.step()
optimizer.zero_grad()
3. 拓扑感知
import os
# 手动指定拓扑配置
os.environ["HCCL_TOPO_JSON"] = "/path/to/topo.json"
# 或让 HCCL 自动探测
os.environ["HCCL_TOPO_DETECT"] = "enable"
九、HCCL vs NCCL 性能对比
ResNet50 训练,8 卡,batch=32:
| 指标 | NCCL (A100) | HCCL (910) |
|---|---|---|
| 单步时间 | 185ms | 162ms |
| 通信占比 | 28% | 21% |
| 带宽利用率 | 82% | 90% |
| 加速比 | 6.5x | 7.2x |
HCCL 在拓扑感知和带宽利用率上有优势,尤其是在非对称拓扑的环境下。
参考资源
- HCCL API 文档:https://www.hiascend.com/document/detail/zh/CANN/
- 分布式训练最佳实践:https://www.hiascend.com/document/detail/zh/CANN/
- HCCL 性能调优指南:https://www.hiascend.com/document/detail/zh/CANN/
- 分布式训练样例:https://atomgit.com/cann/models
总结
HCCL 的 API 和 NCCL 兼容,迁移成本低。核心操作包括:AllReduce 做梯度同步、AllGather 收集数据、ReduceScatter 分发求和结果、Broadcast 广播权重。性能优化的关键是通信与计算重叠、梯度累积减少通信频率、拓扑感知选择最优路径。在昇腾 910 上,HCCL 的带宽利用率能到 90%,比 NCCL 在 A100 上高 8 个百分点。混合并行场景下,用 new_group 创建子通信组,让模型并行和数据并行各走各的通信通道。
鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。
更多推荐


所有评论(0)