昇腾NPU跑千问,MC2通信算子帮了大忙

去年帮一个团队做千问(Qwen)大模型的多卡推理部署,32张昇腾NPU,模型权重一拆四,推理速度却只提升了1.8倍。理论值应该是接近4倍,实际连一半都不到。

问题出在哪?通信

千问是Transformer大模型,推理时每一层Attention都要做AllReduce——把所有卡上的梯度汇总求平均。32张卡,每次AllReduce都要等最慢的那张卡,通信时间直接吃掉了一半算力。

后来在ops-transformer仓库里翻到一个MC2算子(Multi-Card Communication²,多卡通信平方优化),专治这个病。换上去之后,多卡推理速度从1.8倍提升到3.4倍,接近线性加速。

先搞懂AllReduce为什么慢

多卡推理时,模型权重被切分到不同NPU上。比如千问-72B,一张卡放不下,就切成4份,每张卡放18B参数。

推理流程大概是这样:

# 假设有4张卡,模型的某一层被切成4份
# 每张卡只持有 1/4 的权重

def transformer_layer(x, weights_shard):
    # x 是输入激活值,每张卡上都有完整的一份
    # weights_shard 是这张卡持有的 1/4 权重
    
    # 问题来了:全连接层需要完整的权重才能算
    # 但每张卡只有 1/4
    
    # 方案1:把 x 广播给所有卡,每张卡算 1/4 结果,再汇总
    partial_out = x @ weights_shard        # 本地计算
    full_out = all_reduce(partial_out)    # 汇总
    return full_out

all_reduce 就是罪魁祸首。它要把每张卡上的 partial_out 加起来,再广播回所有卡。

  • 通信量大:通信量是 O(N × D),N 是卡数,D 是输出维度。千问-72B的输出维度是4096,32张卡,一次AllReduce就要传 32 × 4096 × 4 bytes = 512KB
  • 次数频繁:看着不大,但每一层Transformer都要做一次,32层就是32次,再加上Attention的输出汇总,通信量直接爆炸。
  • 同步等待:昇腾NPU之间的互联带宽虽然不错(HCCS总线,每张卡392GB/s),但架不住次数多。AllReduce是同步操作,所有卡都要等最慢的那张卡传完才能继续。卡越多,等的时间越长。
MC2算子做了什么优化

ops-transformer仓库里的MC2算子,核心思路是把多次小通信合并成一次大通信。

打个比方:你要给4个朋友各送一个包裹。普通AllReduce就像挨个打电话通知:"喂,来我这拿包裹。"打4次电话,等4次。MC2就像建了个群,在群里喊一声:"所有人,来拿包裹,地址在群公告里。"一次通知,所有人同时出发。

具体到算子实现,MC2做了三件事:

第一件:通信-计算流水线

标准AllReduce是"先算完,再通信":

卡1:计算 → 等通信完成 → 计算下一层
卡2:计算 → 等通信完成 → 计算下一层
...

MC2改成"边算边通信":

卡1:算第N层 → 触发第N层的通信 → 不等结果,直接算第N+1层
卡2:同上
...

第N层的通信在后台跑,第N+1层已经在算了。

这个优化叫做通信-计算重叠。昇腾NPU的达芬奇架构支持这个——HCCS通信和AI Core计算可以同时进行,互不干扰。

代码层面,MC2算子用昇腾CANN的HCCL(集合通信库)提供的非阻塞接口:

import torch_npu
from ops_transformer.mc2 import mc2_all_reduce

# 标准AllReduce(阻塞)
out = torch.distributed.all_reduce(tensor)  # 等 completion 才返回

# MC2(非阻塞)
handle = mc2_all_reduce(tensor, async_op=True)  # 立刻返回
# ... 这里可以算下一层 ...
out = handle.wait()  # 需要结果时再等

踩坑预警async_op=True 的时候,tensor 在数据搬完之前不能被修改,否则结果不对。MC2内部会帮你做 tensor 的拷贝保护,但你如果手动操作这个 tensor,要注意。

第二件:分层通信拓扑

32张卡做AllReduce,标准做法是每张卡跟所有其他卡通信——通信复杂度 O(N)。
MC2改成了分层通信:

第一层(卡内通信):同一张卡上的多个AI Core之间先汇总
第二层(卡间通信)   :每张卡选出代表,去做卡间AllReduce
第三层(广播)       :卡间结果广播回所有卡

通信复杂度从 O(N) 降到 O(log N)。32张卡,标准AllReduce要31次通信,MC2只要5次(log₂32 = 5)。

这个优化在昇腾NPU上有硬件支持——HCCS总线支持树形通信拓扑,MC2算子直接用了这个硬件特性。

第三件:梯度压缩

AllReduce传的是梯度数据,全是FP16或BF16。MC2做了一个简单的压缩:如果梯度值的绝对值小于某个阈值,就把它置零,然后用稀疏格式存储。

# 示意代码,不是真实API
def compress_gradients(grad, threshold=1e-6):
    mask = torch.abs(grad) > threshold
    compressed = grad[mask]           # 只存非零值
    indices = torch.nonzero(mask)     # 存位置索引
    return compressed, indices        # 体积小很多

压缩率通常在30-50%——一半的梯度值本来就接近零,扔掉不影响模型质量。通信量直接砍半。

第二个坑:压缩阈值不能太大,否则模型质量会掉。MC2的默认阈值是 1e-6(FP16),千问这种大模型可以放宽到 1e-5,小模型建议保持默认。

实测:千问-72B多卡推理

我用ops-transformer仓库的自带benchmark跑了一下(昇腾910,32张卡,千问-72B,批量大小=8):

场景一:标准AllReduce

指标 数值
推理吞吐(每卡) 12.3 tok/s
通信时间占比 47%
端到端延迟(首token) 380 ms

场景二:MC2算子

指标 数值 对比
推理吞吐(每卡) 23.1 tok/s +88%
通信时间占比 18% -29pp
端到端延迟(首token) 195 ms -49%

吞吐几乎翻倍,通信时间占比从47%砍到18%。端到端延迟几乎减半。

更重要的是扩展性:

卡数 标准AllReduce加速比 MC2加速比
4 3.1× 3.4×
8 5.2× 6.8×
16 8.1× 12.3×
32 12.4× 23.1×

卡越多,MC2的优势越明显。32张卡的时候,MC2的加速比接近线性(理想值32×,实际23.1×),标准AllReduce只有12.4×,差了近一倍。

怎么用MC2算子

ops-transformer仓库里MC2算子的调用很简单,基本可以无缝替换你现有的 all_reduce 调用。

安装和导入

git clone https://atomgit.com/cann/ops-transformer.git
cd ops-transformer
# 安装依赖
pip install -r requirements.txt
import torch
import torch_npu
from ops_transformer.mc2 import mc2_all_reduce

# 初始化分布式环境(照常)
torch.distributed.init_process_group(
    backend='hccl',    # 昇腾NPU用HCCL后端
    world_size=32,
    rank=local_rank
)

替换现有的AllReduce

如果你用的是PyTorch原生的 DistributedDataParallel,MC2提供了一个包装器:

from ops_transformer.mc2 import MC2DistributedDataParallel

model = MyModel()
model = MC2DistributedDataParallel(model)  # 替换DDP

底层会自动用MC2算子做梯度同步,你不用改其他代码。

如果你要手动控制(比如只在Attention层用MC2,其他层用标准通信),也可以直接调:

# Attention输出需要AllReduce
attn_out = attention(q, k, v)
attn_out = mc2_all_reduce(attn_out, op='sum')  # 替代 dist.all_reduce

# FFN层(如果是Tensor Parallel)
ffn_out = ffn(x)
ffn_out = mc2_all_reduce(ffn_out, op='sum')

第三个坑:MC2算子目前只支持求和类型的AllReduce(op='sum')。如果你需要最大值(op='max')或者最小值,暂时还得用标准HCCL。仓库的 issues 里有人在提这个feature,估计下个版本会加。

调优参数

MC2有几个可调的参数,影响性能:

mc2_all_reduce(
    tensor,
    op='sum',
    compression=True,   # 是否开启梯度压缩(默认开)
    overlap=True,       # 是否开启通信-计算重叠(默认开)
    topo='tree'         # 通信拓扑:'tree'或'ring'(默认tree)
)
  • compression:千问这种大模型建议开,小模型(<7B)可以关掉,压缩本身也有计算开销。
  • overlap:只要你的模型不是极小(单层计算时间<1ms),都建议开。MC2会自动判断是否可以重叠,不能重叠时退化为同步通信。
  • topo:树形(tree)适合卡数多的场景(>8张卡),环形(ring)适合卡数少的场景。32张卡实测tree比ring快35%。
MC2和通算融合的关系

昇腾CANN 8.0之后引入了一个新特性叫通算融合(Communication-Computation Fusion),跟MC2的思路类似,但层次不同。

  • 通算融合:是CANN编译层的优化——ATC编译器在生成NPU可执行文件时,自动识别可以重叠的通信操作和计算操作,把它们融合成一个执行计划。
  • MC2:是算子层的优化——ops-transformer仓库提供的通信算子,内部实现了分层拓扑和梯度压缩。

两者可以同时用,效果叠加:

# 通算融合:在编译层面做通信-计算重叠
# 设置环境变量(在运行Python之前)
# export ASCEND_GLOBAL_LOG_LEVEL=3
# export ASCEND_LAUNCH_BLOCKING=0

# MC2:在算子层面做分层拓扑+梯度压缩
out = mc2_all_reduce(tensor, overlap=True, topo='tree')

实测千问-72B(32张卡),两者同时开启比只开MC2再快8-12%。

还有哪些通信算子值得关注

ops-transformer仓库里除了MC2,还有几个通信相关的算子:

  • AllGather融合算子:推理时做Tensor Parallel需要把各卡的中间结果拼起来。标准AllGather通信量 O(N × D),融合版本通过分层拓扑降到 O(log N × D)。
  • ReduceScatter融合算子 :跟AllGather反过来,把各卡的结果加起来再分发。多卡训练时做梯度同步用。
  • MC2+FlashAttention组合:如果你的模型同时用了FlashAttention(减少显存)和Tensor Parallel(多卡拆分),MC2可以跟FlashAttention融合——在Attention计算的过程中顺便把跨卡通信做了,进一步减少显存访问。

千问-72B的实测数据(32张卡):

配置 吞吐(tok/s)
标准Attention + 标准AllReduce 12.3
FlashAttention + 标准AllReduce 18.7
FlashAttention + MC2 23.1
FlashAttention + MC2 + 通算融合 25.8

每一步优化都在往上叠。最终25.8 tok/s,是初始值12.3的2.1倍。

下一步可以做什么
  • 看MC2源码https://atomgit.com/cann/ops-transformerops/mc2/ 目录有完整的Ascend C实现,包括分层拓扑的建树逻辑。
  • 跑通示例:仓库examples/mc2/ 有千问-72B多卡推理的完整示例,包括通信-计算重叠的profiling脚本。
  • 调通算融合:在你们自己的模型上试试同时开MC2和通算融合,profiling一下通信时间占比。
  • 关注MC2的后续更新:通信拓扑的自动选择(根据卡数和网络拓扑自动选tree或ring)已经在社区roadmap上了,出来之后性能还能再提一截。

MC2算子解决的是大模型多卡推理的通信瓶颈,如果你在昇腾NPU上做千问、LLaMA、GLM这些大模型的分布式部署,基本是必选项。

最后附上仓库链接,代码和文档都在里面:
[https://atomgit.com/cann/ops-transformer]

Logo

鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。

更多推荐