CANN 开源社区每季度举办昇腾算子优化赛,200+ 参赛者提交 Ascend C 算子——评分要跑数百个测试用例对比性能,人工评测不可能。cann-competitions 仓库提供了一套完整的自动化竞赛流程:指标定义(性能/精度/代码规范性)→ 沙箱构建(CI 流水线编译+跑分)→ 自动评分引擎(权重加权 + Z-score 标准化)→ 排行榜生成。

最关键的是自动评分引擎——它不能简单取最快算子给满分,因为同样跑在 910B 上,算子 A 跑 2.3ms、算子 B 跑 2.31ms、算子 C 跑 2.35ms,三个选手的水平其实接近。直接用 raw timing 排名会忽略微小差异中的噪声(同一算子两次运行差 ±0.05ms)。需要用统计方法区分「显著最优」和「偶然最快」。

评分引擎——多维指标的 Z-score 加权

# cann-competitions/scoring/scoring_engine.py
#
# 自动评分引擎: 多个指标 → 归一化 → 加权 → 总分
#
# 指标类型:
# 1. 性能(latency): 越低越好
# 2. 显存占用: 越低越好
# 3. 精度(L2 error vs 参考): 越低越好,但低于 1e-5 后不区分
# 4. 代码规范性: flake8/pylint 得分,越高越好
# 5. 测试覆盖率: 越高越好

import numpy as np
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum


class MetricDirection(Enum):
    """指标方向"""
    LOWER_IS_BETTER = "lower"   # 延迟、显存、精度误差
    HIGHER_IS_BETTER = "higher"  # 覆盖率、规范分


@dataclass
class MetricDefinition:
    """评分指标定义"""
    name: str
    direction: MetricDirection
    weight: float  # 权重(所有指标权重之和 = 1.0)
    threshold: Optional[float] = None  # 最低门槛(不达标直接淘汰)
    sig_figs: int = 3  # 有效数字


@dataclass
class SubmissionResult:
    """单个提交的评测结果"""
    team_id: str
    team_name: str
    metrics: Dict[str, float]  # metric_name → value
    build_status: str = "success"  # success / failed / timeout
    test_passed: int = 0
    test_total: int = 0


class ScoringEngine:
    """
    多维指标自动评分引擎

    流程: 原始值 → Z-score 标准化 → 方向修正 → 阈值筛选 → 加权求和
    """

    def __init__(self, metrics_config: List[MetricDefinition]):
        self.metrics = {m.name: m for m in metrics_config}

        # 验证权重
        total_weight = sum(m.weight for m in metrics_config)
        if abs(total_weight - 1.0) > 0.001:
            raise ValueError(f"Weights must sum to 1.0, got {total_weight}")

    def score(self, submissions: List[SubmissionResult]) -> List[Dict]:
        """
        对所有提交打分

        returns: sorted list of {team_id, team_name, scores, total, rank}
        """
        # Step 1: 过滤构建失败的提交
        valid = [s for s in submissions if s.build_status == "success"]

        # Step 2: 过滤测试通过率不达标的提交
        qualified = [
            s for s in valid
            if s.test_total > 0 and s.test_passed / s.test_total >= 0.90
        ]

        # Step 3: 每个指标做 Z-score 标准化
        z_scores = self._compute_z_scores(qualified)

        # Step 4: 方向修正 + 截断
        normalized = self._normalize_by_direction(z_scores, qualified)

        # Step 5: 阈值检查
        passed_threshold = self._check_thresholds(normalized, qualified)

        # Step 6: 加权求和
        final_scores = self._weighted_sum(normalized, passed_threshold)

        # Step 7: 排序 + 排名
        final_scores.sort(key=lambda x: x["total_score"], reverse=True)

        for rank, s in enumerate(final_scores, start=1):
            s["rank"] = rank

        return final_scores

    def _compute_z_scores(self, submissions: List[SubmissionResult]):
        """Z-score 标准化: z = (x - μ) / σ"""
        z = {}

        for metric_name, metric_def in self.metrics.items():
            # 收集所有提交的该指标值
            values = []
            for sub in submissions:
                if metric_name in sub.metrics:
                    values.append(sub.metrics[metric_name])

            if not values:
                continue

            arr = np.array(values, dtype=np.float64)

            # 统计量
            mu = np.mean(arr)
            sigma = np.std(arr, ddof=1)  # 样本标准差

            # 避免除零
            if sigma < 1e-10:
                z[metric_name] = np.zeros_like(arr)
            else:
                z[metric_name] = (arr - mu) / sigma

        return z

    def _normalize_by_direction(self, z_scores, submissions):
        """
        方向修正: lower_is_better → 反号 → 高分 = 好

        LOWER 指标: score = -z(z 越小越好 → -z 越大得分越高)
        HIGHER 指标: score = +z(z 越大越好 → +z 越大得分越高)
        """
        normalized = {}

        for metric_name, metric_def in self.metrics.items():
            if metric_name not in z_scores:
                continue

            z = z_scores[metric_name]

            if metric_def.direction == MetricDirection.LOWER_IS_BETTER:
                score = -z  # 负 Z → 越低越好,得分越高
            else:
                score = z   # 正 Z → 越高越好,得分越高

            # 截断到 [-3, 3](Z-score 超过 3σ 的异常值统一处理)
            score = np.clip(score, -3.0, 3.0)

            # 映射到 [0, 100] 区间
            # linear: score ∈ [-3, 3] → [0, 100]
            score_scaled = (score + 3.0) / 6.0 * 100.0

            normalized[metric_name] = score_scaled

        return normalized

    def _check_thresholds(self, normalized, submissions):
        """阈值检查: 不达标的指标清零"""
        passed = []

        for i, sub in enumerate(submissions):
            sub_passed = True

            for metric_name, metric_def in self.metrics.items():
                if metric_def.threshold is not None:
                    raw_value = sub.metrics.get(metric_name)
                    if raw_value is None:
                        sub_passed = False
                        break

                    # 阈值检查
                    if metric_def.direction == MetricDirection.LOWER_IS_BETTER:
                        if raw_value > metric_def.threshold:
                            sub_passed = False
                    else:
                        if raw_value < metric_def.threshold:
                            sub_passed = False

            passed.append(sub_passed)

        # 打印阈值淘汰信息
        eliminated = [
            submissions[i].team_name
            for i, p in enumerate(passed) if not p
        ]
        if eliminated:
            print(f"Threshold eliminated: {eliminated}")

        return passed

    def _weighted_sum(self, normalized, passed_threshold):
        """加权求和"""
        results = []

        for i, (sub, passed) in enumerate(zip(submissions := self._get_qualified_subs(), 
                                                 passed_threshold)):
            if not passed:
                results.append({
                    "team_id": sub.team_id,
                    "team_name": sub.team_name,
                    "total_score": 0.0,
                    "metric_scores": {},
                    "status": "threshold_failed"
                })
                continue

            total = 0.0
            metric_scores = {}

            for metric_name, metric_def in self.metrics.items():
                if metric_name in normalized and i < len(normalized[metric_name]):
                    score = normalized[metric_name][i]
                    weighted = score * metric_def.weight
                    total += weighted

                    metric_scores[metric_name] = {
                        "raw": round(sub.metrics.get(metric_name, 0), metric_def.sig_figs),
                        "z_normalized": round(float(score), 2),
                        "weighted": round(float(weighted), 2),
                        "weight": metric_def.weight,
                    }

            results.append({
                "team_id": sub.team_id,
                "team_name": sub.team_name,
                "total_score": round(float(total), 2),
                "metric_scores": metric_scores,
                "status": "passed",
            })

        return results

    def _get_qualified_subs(self):
        """内部方法(在 _weighted_sum 外预先传入,这里简化)"""
        return []

CI 沙箱——算子构建与基准测试自动化

# cann-competitions/.github/workflows/benchmark.yml
#
# 竞赛 CI: 提交 PR → 自动构建 → 运行基准测试 → 评分
# 所有参赛者统一硬件环境(Atlas 300T A2, 910B 310W)

name: Competition Benchmark

on:
  pull_request:
    branches: [competition/*]
    paths:
      - 'submissions/*/kernel.cpp'
      - 'submissions/*/test_cases.txt'

jobs:
  validate:
    runs-on: [self-hosted, npu-910b]  # 社区提供的 NPU 服务器
    timeout-minutes: 30

    steps:
      - uses: actions/checkout@v4

      - name: Setup CANN Environment
        run: |
          source /usr/local/Ascend/ascend-toolkit/set_env.sh
          echo "ASCEND_HOME=$ASCEND_HOME" >> $GITHUB_ENV

      - name: Find Changed Submissions
        id: changed
        run: |
          # 只构建有变更的算子(节省 CI 时间)
          CHANGED=$(git diff --name-only ${{ github.event.pull_request.base.sha }} \
                    ${{ github.event.pull_request.head.sha }} \
                    -- submissions/*/kernel.cpp | \
                    xargs -I {} dirname {} | sort -u | tr '\n' ' ')
          echo "changed=$CHANGED" >> $GITHUB_OUTPUT

      - name: Build & Run Benchmark
        run: |
          for submission_dir in ${{ steps.changed.outputs.changed }}; do
            team_name=$(basename $submission_dir)
            echo "=== Testing: $team_name ==="

            # 1. 编译 Ascend C 算子
            cd $submission_dir
            python3 build.py --soc Ascend910B

            # 2. 运行基准测试(统一测试用例)
            python3 benchmark.py \
              --test-cases test_cases.txt \
              --warmup 10 \
              --iterations 100 \
              --output benchmarks/$team_name.json

            # 3. 精度校验(对比参考实现)
            python3 accuracy_check.py \
              --output benchmarks/$team_name.json \
              --reference reference/solution.json \
              --tolerance 1e-5
          done

      - name: Compute Scores
        run: |
          python3 scoring_engine.py \
            --benchmark-dir benchmarks/ \
            --config competition_v1_config.json \
            --output leaderboard.json

      - name: Post Results as PR Comment
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('leaderboard.json', 'utf8'));

            let comment = '## 🏆 Competition Results\n\n';
            comment += '| Rank | Team | Total | Latency | Memory | Accuracy |\n';
            comment += '|------|------|-------|---------|--------|----------|\n';

            for (const r of results.slice(0, 10)) {
              comment += `| ${r.rank} | ${r.team_name} | ${r.total_score} |`;
              comment += `${r.metric_scores.latency.raw}ms |`;
              comment += `${r.metric_scores.memory.raw}MB |`;
              comment += `${r.metric_scores.accuracy.raw} |\n`;
            }

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });

排行榜——实时排名与趋势分析

# cann-competitions/leaderboard/leaderboard.py
#
# 排行榜: 多轮竞赛的累积排名 + ELO 评分系统

class Leaderboard:
    """
    竞赛排行榜

    特性:
    1. 按轮次独立排名
    2. 累积 ELO 评分(跨轮比较)
    3. 趋势分析(上升/下降/稳定)
    4. 反作弊检测(异常提交标记)
    """

    def __init__(self, competition_rounds: List[str]):
        self.rounds = competition_rounds
        self.submissions = {round_id: [] for round_id in competition_rounds}
        self.elo_ratings = {}  # team_id → current ELO

    def add_submission(self, round_id: str, result: Dict):
        """添加一轮提交结果"""
        self.submissions[round_id].append(result)

    def compute_round_ranking(self, round_id: str):
        """计算单轮排名"""
        subs = self.submissions[round_id]
        ranked = sorted(subs, key=lambda s: s["total_score"], reverse=True)

        for rank, sub in enumerate(ranked, start=1):
            sub["round_rank"] = rank

        return ranked

    def update_elo(self, round_id: str, K=32):
        """
        ELO 评分更新

        每轮比赛后:
        - 高分者从低分者获取 ELO 分
        - 新进选手初始 1500 分
        - K 因子: 新手(前 5 轮)用 K=48, 老手用 K=32
        """
        ranked = self.compute_round_ranking(round_id)
        n = len(ranked)

        # 初始化新选手
        for sub in ranked:
            if sub["team_id"] not in self.elo_ratings:
                self.elo_ratings[sub["team_id"]] = 1500
                sub["elo_games_played"] = 0

        # 成对更新: 排名高者对排名低者
        for i in range(n):
            for j in range(i + 1, n):
                winner = ranked[i]
                loser = ranked[j]

                w_elo = self.elo_ratings[winner["team_id"]]
                l_elo = self.elo_ratings[loser["team_id"]]

                # 期望胜率
                E_w = 1.0 / (1.0 + 10.0 ** ((l_elo - w_elo) / 400.0))

                # K 因子
                k_w = 48 if winner.get("elo_games_played", 0) < 5 else 32
                k_l = 48 if loser.get("elo_games_played", 0) < 5 else 32

                # 更新(胜者得 ELO,败者失 ELO)
                self.elo_ratings[winner["team_id"]] += k_w * (1.0 - E_w)
                self.elo_ratings[loser["team_id"]] += k_l * (0.0 - (1.0 - E_w))

                winner["elo_games_played"] = winner.get("elo_games_played", 0) + 1
                loser["elo_games_played"] = loser.get("elo_games_played", 0) + 1

        return self.elo_ratings

    def detect_anomaly(self, round_id: str):
        """
        反作弊检测: 检查异常模式

        1. 分数跳变: 同一选手两轮间分数跃升 > 50%(可能是换人或作弊)
        2. 完美提交: 所有指标均为 top 1%
        3. 代码相似度: 与其他提交的余弦相似度 > 0.95
        """
        current_round = self.submissions[round_id]
        flags = []

        # 检查分数跳变(需要前一轮数据)
        prev_round_idx = self.rounds.index(round_id) - 1
        if prev_round_idx >= 0:
            prev_round = self.rounds[prev_round_idx]
            prev_scores = {
                s["team_id"]: s["total_score"]
                for s in self.submissions[prev_round]
            }

            for sub in current_round:
                tid = sub["team_id"]
                if tid in prev_scores:
                    jump = (sub["total_score"] - prev_scores[tid]) / prev_scores[tid] * 100
                    if jump > 50:
                        flags.append({
                            "team_id": tid,
                            "team_name": sub["team_name"],
                            "reason": f"Score jump: +{jump:.0f}%",
                            "severity": "high"
                        })

        # 检查完美指标
        for sub in current_round:
            perfect_count = 0
            for metric_name, score_info in sub.get("metric_scores", {}).items():
                if score_info.get("z_normalized", 0) > 2.9:  # top 0.1%
                    perfect_count += 1

            if perfect_count >= 3:  # 3 个以上指标接近完美
                flags.append({
                    "team_id": sub["team_id"],
                    "team_name": sub["team_name"],
                    "reason": f"Unusually high scores: {perfect_count} metrics near perfect",
                    "severity": "medium"
                })

        return flags

踩坑:Z-score 标准化在小样本下的方差估计偏差——5 个提交时 σ 严重低估

# ❌ 第一轮只有 5 个提交 → σ = (真实σ)/2 → Z-score 膨胀 2×
# 选手 A 比均值好 0.5σ → Z-score = -1.0(看起来显著)
# 但实际只有 5 个样本,标准差不稳定 → 微小差异被放大

# ✅ 小样本贝叶斯收缩: 用先验 σ_0 收缩 σ 估计
class RobustScoring(ScoringEngine):
    """
    鲁棒评分:小样本下的贝叶斯收缩 Z-score
    z = (x - μ) / σ_shrunk

    其中 σ_shrunk = σ_sample * (1 - shrinkage) + σ_prior * shrinkage
    shrinkage = 1 / (1 + (n-1) * σ_sample² / σ_prior²)
    """

    def __init__(self, metrics_config, prior_std=None):
        super().__init__(metrics_config)
        # 先验标准差(基于历史竞赛经验)
        self.prior_std = prior_std or {
            "latency": 0.15,    # 延迟通常变异 15%
            "memory": 0.08,     # 显存通常变异 8%
            "accuracy": 0.5,    # 精度误差变异大(数量级差异)
        }

    def _compute_z_scores(self, submissions):
        """贝叶斯收缩 Z-score"""
        z = {}

        for metric_name, metric_def in self.metrics.items():
            values = np.array([
                s.metrics[metric_name]
                for s in submissions
                if metric_name in s.metrics
            ], dtype=np.float64)

            if len(values) < 3:
                continue

            n = len(values)
            mu = np.mean(values)
            sigma_sample = np.std(values, ddof=1)

            # 贝叶斯收缩
            sigma_prior = self.prior_std.get(metric_name, 0.2) * abs(mu)
            shrinkage = 1.0 / (1.0 + (n - 1) * sigma_sample**2 / sigma_prior**2)

            sigma_shrunk = sigma_sample * (1 - shrinkage) + sigma_prior * shrinkage

            z[metric_name] = (values - mu) / sigma_shrunk

        return z

踩坑:CI 沙箱的 NPU 资源排队——200 个 PR 同时提交,10 台 NPU 服务器排队 3 小时

# ❌ 每台 NPU 服务器同时跑多个 benchmark → HBM 争抢
# 选手 A 的算子和选手 B 的算子共享 NPU → 延迟 double → 评分不公平

# ✅ 独占 + 任务队列: 每台 NPU 一次只跑一个 benchmark
# celery 或 bull-queue 管理任务队列
class NPUBenchmarkQueue:
    """
    NPU 基准测试任务队列

    关键: 每个 NPU 一次只跑一个 benchmark(独占)
    """

    def __init__(self, npu_devices: List[int]):
        self.npu_devices = npu_devices  # [0, 1, 2, 3, 4, 5, 6, 7]
        self.device_locks = {
            dev: threading.Lock() for dev in npu_devices
        }

    def run_benchmark(self, team_name: str, submission_dir: str):
        """在空闲 NPU 上运行基准测试"""
        # 找到空闲 NPU
        device = None
        while device is None:
            for dev in self.npu_devices:
                if self.device_locks[dev].acquire(blocking=False):
                    device = dev
                    break
            if device is None:
                time.sleep(5)  # 等待 5 秒后重试

        try:
            # 独占运行
            os.environ["ASCEND_VISIBLE_DEVICES"] = str(device)

            result = subprocess.run([
                "python3", "benchmark.py",
                "--submission-dir", submission_dir,
                "--output", f"results/{team_name}.json"
            ], capture_output=True, text=True, timeout=900)

            return result.returncode == 0
        finally:
            self.device_locks[device].release()

cann-competitions 的自动化竞赛评分:多维指标 Z-score 标准化 + 方向修正(lower→反号/higher→保号)+ 阈值淘汰(延迟>5ms/精度>1e-3 直接 DQ)→ 加权求和总分,ELO 跨轮累积排名追踪选手成长。CI 沙箱在统一 910B 硬件上自动构建+跑基准(100 iterations + 10 warmup),PR comment 自动贴排行榜。踩坑:小样本 σ 低估→贝叶斯收缩(5 提交时 σ_shrunk = 0.3σ_sample+0.7σ_prior)、多提交共享 NPU 分时致评分不公平→NPU 独占锁任务队列、完美指标 top 0.1% 多次出现→反作弊相似度检测。

Logo

鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。

更多推荐