slime 训练框架解析
昇腾实战派。
作者:昇腾实战派
背景
在深度学习和强化学习领域,高效的大规模训练框架对于提升模型性能至关重要。GLM-5 在推理、编码和智能体任务上取得了全球所有开源模型中的最佳性能,显著缩小了与前沿模型的差距。这背后,Slime 作为其强化学习基础设施,通过优化训练吞吐量和效率,受到了广泛关注。
昇腾平台已对slime 训练框架进行了兼容性适配

整体设计
Slime 采用 Ray 作为单控制器(Single Controller)进行调度,基于 **SGLang **和 Megatron LM 作为唯一后端,构建了一个简洁且支持大规模 RL 的框架。其设计目标是减少训练与推理之间数据传递的开销,并使所有环节尽可能对齐真实生产环境中的组件,从而实现灵活的大规模 RL 训练。
slime 采用分离式架构,将 RLHF 训练流程分解为三个独立协作的模块:
- Training (Megatron): 负责主训练流程,支持完整的 5D 并行(TP/PP/DP/CP/EP)
- Rollout (SGLang): 使用 SGLang router 进行请求分发,生成新数据,基于 SGLang 的采样逻辑;
- Data Buffer: 管理数据流和自定义生成逻辑

基于前卫的设计,slime 的自由度很高:
- 资源调度自由:支持 co-locate 与 dis-aggregate 两种部署策略;在 rollout 和 training 上各自支持 DP/TP/PP/EP;具体实现见 slime/ray/placement_group.py
- 训练方式自由:支持同步训练和异步训练两种模式;具体实现见 slime/train.py 和 slime/train_async.py;注意,后者需要在 dis-aggregate 架构下进行训推分离的异步训练,rollout 始终领先 train 一个 step,也即 one-step off-policy;
- 采样方式自由:支持用户自定义复杂的采样流程,包括多轮工具调用、奖励模型集成、自定义验证器等;具体实现见 slime_plugins/rollout_buffer/
- 模型支持自由:支持 Dense 和 MoE 模型;具体脚本可参考 slime/scripts/run-qwen3-4B.sh 和 slime/scripts/run-deepseek-r1.sh

整体控制流程

训练主流程如下:
# train.py
def train(args):
# 1. 创建 Placement Groups
pgs = create_placement_groups(args)
# 2. 创建 RolloutManager(包含 SGLang 引擎)
rollout_manager, num_rollout_per_epoch = create_rollout_manager(args, pgs["rollout"])
# 3. 创建 Actor 和 Critic 模型
actor_model, critic_model = create_training_models(args, pgs, rollout_manager)
# 4. 训练循环
for rollout_id in range(args.start_rollout_id, args.num_rollout):
# 生成样本
rollout_data_ref = rollout_manager.generate.remote(rollout_id)
# 模型训练
actor_model.async_train(rollout_id, rollout_data_ref)
# 权重同步
actor_model.update_weights()
同步训练 train.py
for rollout_id in range(args.start_rollout_id, args.num_rollout):
# 阻塞等待当前轮 rollout 数据生成完毕
rollout_data_ref = ray.get(rollout_manager.generate.remote(rollout_id))
# 将 rollout 模型从 GPU 卸载以释放显存
if args.offload_rollout:
ray.get(rollout_manager.offload.remote())
# 使用当前 rollout 数据执行一步训练
ray.get(actor_model.async_train(rollout_id, rollout_data_ref))
# 将 rollout 模型权重重新加载回 GPU
if args.offload_rollout:
ray.get(rollout_manager.onload_weights.remote())
# 将最新训练权重同步到 rollout 模型
actor_model.update_weights()
# 权重更新完成后重新加载 KV cache
if args.offload_rollout:
ray.get(rollout_manager.onload_kv.remote())
异步训练 train_async.py
# 预先启动第一轮 rollout,使其在训练循环开始前就在后台运行
rollout_data_next_future = rollout_manager.generate.remote(args.start_rollout_id)
for rollout_id in range(args.start_rollout_id, args.num_rollout):
# 等待上一轮预生成的 rollout 数据就绪
if rollout_data_next_future is not None:
rollout_data_curr_ref = ray.get(rollout_data_next_future)
# 提前异步启动下一轮 rollout,与当前训练并行执行
if rollout_id + 1 < args.num_rollout:
rollout_data_next_future = rollout_manager.generate.remote(rollout_id + 1)
# 执行当前轮训练,此时下一轮 rollout 正在后台并行生成
ray.get(actor_model.async_train(rollout_id, rollout_data_curr_ref))
# 权重更新前等待并缓存正在进行的 rollout,同步屏障保证策略一致性
if (rollout_id + 1) % args.update_weights_interval == 0:
rollout_data_curr_ref = ray.get(x) if (x := rollout_data_next_future) is not None else None
rollout_data_next_future = None
训练初始化
Placement Group 组创建
Slime 使用 Ray 作为其资源细粒度调度的框架,以支持分离(dis-aggregate)或统一(co-located)的资源放置策略。
首先,系统根据当前模式(train、debug_train_only 或 debug_rollout_only)创建 Placement Group (PG):
- Co-located 模式:
rollout和actor(训练) 指向同一个 PG,实现资源共享。 - Debug 模式:
actor或rollout可以独占资源,允许隔离调试 RL 系统的某个组件。
Ray 的 Placement Group Scheduling 机制是实现这一点的关键。它通过分配逻辑上的虚拟资源来管理 GPU。例如,可以分配给 train group 中的每个 actor 0.4 个 GPU,然后 rollout manager 中的每个 engine 再要求 0.2 个 GPU。Ray 本身不限制 GPU 的实际使用,而是按需进行逻辑分配。
# slime/ray/placement_group.py
def allocate_train_group(args, num_nodes, num_gpus_per_node, pg):
return RayTrainGroup(
args=args,
num_nodes=num_nodes,
num_gpus_per_node=num_gpus_per_node,
pg=pg,
num_gpus_per_actor=0.4, # 训练 Actor 请求 0.4 GPU
)
# slime/ray/rollout.py
def init_rollout_engines(args, pg, all_rollout_engines):
# ...
for i in range(num_engines):
num_gpus = 0.2 # 推理引擎请求 0.2 GPU
num_cpus = num_gpus
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_capture_child_tasks=True,
placement_group_bundle_index=reordered_bundle_indices[i * num_gpu_per_engine],
)
rollout_engine = RolloutRayActor.options(
num_cpus=num_cpus,
num_gpus=num_gpus,
scheduling_strategy=scheduling_strategy,
).remote(args, rank=i, worker_type=worker_type, base_gpu_id=base_gpu_id)
对于非 co-located 的情况,由于初始创建的 PG 就是分离的,actor 和 rollout 会自然地被调度到各自指定的资源上。
# slime/ray/placement_group.py
def create_placement_groups(args):
num_gpus = 0
if args.debug_train_only:
num_gpus = args.actor_num_nodes * args.actor_num_gpus_per_node
rollout_offset = 0
# ...
elif args.debug_rollout_only:
num_gpus = args.rollout_num_gpus
rollout_offset = 0
elif args.colocate:
num_gpus = args.actor_num_nodes * args.actor_num_gpus_per_node
rollout_offset = 0 # actor 和 rollout 共享同一组 GPU
# ...
else:
# 分离模式:rollout 从 actor 之后开始
num_gpus = args.actor_num_nodes * args.actor_num_gpus_per_node + args.rollout_num_gpus
rollout_offset = args.actor_num_nodes * args.actor_num_gpus_per_node
# ...
# 返回包含 actor、critic、rollout 的 PG 字典
return {
"actor": (pg, actor_pg_reordered_bundle_indices, actor_pg_reordered_gpu_ids),
"critic": (pg, critic_pg_reordered_bundle_indices, critic_pg_reordered_gpu_ids) if args.use_critic else None,
"rollout": (pg, rollout_pg_reordered_bundle_indices, rollout_pg_reordered_gpu_ids),
}
RayTrainGroup / Actor 初始化
理解这个逻辑可以分为三层:
- 顶层 (
RayTrainGroup): 负责TrainRayActor实例的创建、初始化和资源分配。它接收顶层指令(如train.py),并将其广播到其管理的所有TrainRayActor实例。 - 中层 (
TrainRayActor): 作为分布式训练的基本执行单元,每个TrainRayActor负责一部分计算任务,并与 SGLang 引擎建立连接以进行权重同步。 - 底层 (
Megatron-LM): 通过后端megatron_utils封装的核心训练逻辑。
RayTrainGroup 通过 self._actor_handlers 列表持有多个 TrainRayActor 实例
RayTrainGroup 中的异步方法都使用 def 定义。这是因为在 Ray 中,调用 .remote() 方法会立即同步返回一个句柄对象(类似 Future),而真正的异步任务执行发生在后台。只有当显式调用 ray.get() 时,才会阻塞并获取结果。
# slime/ray/actor_group.py
class RayTrainGroup:
def async_init(self, args, role, with_ref=False):
return [actor.init.remote(args, role, with_ref=with_ref) for actor in self._actor_handlers]
def async_train(self, rollout_id, rollout_data_ref):
return [actor.train.remote(rollout_id, rollout_data_ref) for actor in self._actor_handlers]
def update_weights(self):
return ray.get([actor.update_weights.remote() for actor in self._actor_handlers])
MegatronTrainRayActor 中有几个重要的设计点:
- 权重管理: 使用
torch_memory_saver进行显存管理,支持通过标签(tag)定向地卸载(offload)或加载(onload)参数。
# slime/backends/megatron_utils/actor.py
class MegatronTrainRayActor(TrainRayActor):
def init(self, args, role, with_ref=False):
# ...
if args.offload_train:
if (x := args.train_memory_margin_bytes) > 0:
torch_memory_saver.memory_margin_bytes = x
(self.model, self.optimizer, self.opt_param_scheduler, loaded_rollout_id) = initialize_model_and_optimizer(
args, role
)
- 模型 On/Offload: 通过
torch_memory_saver实现模型的休眠和唤醒:
# slime/backends/megatron_utils/actor.py
def sleep(self) -> None:
assert self.args.offload_train
clear_memory(clear_host_memory=True)
print_memory("before offload model")
destroy_process_groups()
torch_memory_saver.pause()
print_memory("after offload model")
def wake_up(self) -> None:
assert self.args.offload_train
print_memory("before wake_up model")
torch_memory_saver.resume()
clear_memory()
reload_process_groups()
print_memory("after wake_up model")
RolloutManager / SGLang Engine 初始化
同样可以分为三层来理解:
- 顶层 (
RolloutManager): 负责宏观的生命周期管理,包括启动 SGLang 负载均衡器、管理数据源以及实例化一组 SGLang 引擎。 - 中层 (
SGLangEngine): 作为面向 Ray 分布式环境的接口,封装了核心的 SGLang 推理引擎。 - 底层 (
SGLang HTTP Server): 包含了 SGLang 的具体实现,负责实际的文本生成任务。
RolloutManager 是一个 @ray.remote 装饰的 Ray Actor,通过 self.all_rollout_engines 列表持有多个 SGLangEngine 实例
RolloutManager 的初始化核心职责包括:

- 启动 Router (
_start_router): 通过multiprocessing.Process启动一个独立的sglang_router进程或自定义的slime_router进程。这个路由器作为负载均衡器,接收推理请求并分发给后端的 SGLang 服务。
# slime/ray/rollout.py
def _start_router(args):
if args.sglang_router_ip is not None:
return
args.sglang_router_ip = _wrap_ipv6(get_host_info()[1])
if args.sglang_router_port is None:
args.sglang_router_port = find_available_port(random.randint(3000, 4000))
if args.use_slime_router:
from slime.router.router import run_router
router_args = args
else:
from sglang_router.launch_router import RouterArgs
from slime.utils.http_utils import run_router
router_args = RouterArgs.from_cli_args(args, use_router_prefix=True)
router_args.host = args.sglang_router_ip
router_args.port = args.sglang_router_port
# ...
# 启动路由器进程
process = multiprocessing.Process(target=run_router, args=(router_args,))
process.daemon = True
process.start()
- 加载数据源和 Rollout 函数: 动态加载用户指定的数据源和 rollout 函数,实现高度可插拔的设计:
# slime/ray/rollout.py
class RolloutManager:
def __init__(self, args, pg):
# ...
data_source_cls = load_function(self.args.data_source_path)
self.data_source = data_source_cls(args)
self.generate_rollout = load_function(self.args.rollout_function_path)
self.eval_generate_rollout = load_function(self.args.eval_function_path)
- 创建 Rollout Engines (
init_rollout_engines): 创建 SGLang 引擎集群,并为每个引擎分配网络端口(HTTP、NCCL 等):
# slime/ray/rollout.py
def init_rollout_engines(args, pg, all_rollout_engines):
RolloutRayActor = ray.remote(SGLangEngine)
for i in range(num_engines):
rollout_engine = RolloutRayActor.options(
num_cpus=num_cpus,
num_gpus=num_gpus,
scheduling_strategy=scheduling_strategy,
).remote(args, rank=i, worker_type=worker_type, base_gpu_id=base_gpu_id)
all_rollout_engines[i] = rollout_engine
- 创建 Lock Actor 和 Health Monitor: 创建一个分布式锁 Actor,用于在权重更新时防止数据竞争。同时,如果启用了容错机制,会创建
RolloutHealthMonitor来监控引擎健康状态。
# slime/ray/rollout.py
class RolloutManager:
def __init__(self, args, pg):
# ...
self.rollout_engine_lock = Lock.options(num_cpus=1, num_gpus=0).remote()
self._health_monitor = None
if self.args.use_fault_tolerance:
self._health_monitor = RolloutHealthMonitor(self, args)
self._health_monitor.start()
模型与参数同步
模型同步初始化
MegatronTrainRayActor 的 init 方法完成分布式环境和模型的初始化。
- 分布式环境初始化:
init(args)会初始化torch.distributed,并设置 Megatron-LM 的模型并行/流水线并行组(MPU)。
# slime/backends/megatron_utils/initialize.py
def init(args):
set_args(args)
_initialize_distributed(args)
_set_random_seed(args.seed, args.data_parallel_random_init, ...)
_build_tokenizer(args)
- 分词器串行加载: 为了避免多个 rank 同时读写 Hugging Face 缓存目录导致文件损坏,配置和分词器采用串行加载(rank by rank)的方式。
# slime/backends/megatron_utils/actor.py
for i in range(dist.get_world_size()):
if i == dist.get_rank():
self.hf_config = AutoConfig.from_pretrained(args.hf_checkpoint, trust_remote_code=True)
self.tokenizer = AutoTokenizer.from_pretrained(self.args.hf_checkpoint, trust_remote_code=True)
dist.barrier(group=get_gloo_group())
- 模型和优化器初始化: 模型初始化遵循 Megatron 的实践,直接提供
GPTModel的model_provider:
# slime/backends/megatron_utils/model.py
def initialize_model_and_optimizer(args, role="actor"):
model, optimizer, opt_param_scheduler = setup_model_and_optimizer(args, role)
model[0].role = role
iteration, _ = load_checkpoint(model, optimizer, opt_param_scheduler, ...)
return model, optimizer, opt_param_scheduler, iteration
- Reference Model 初始化: 加载与主模型相同的初始权重,但在训练中保持不变,作为计算 KL 散度的基准。
参数同步连接
权重同步相关的代码(转换、通信协议、连接建立)在大规模框架中通常占很大比重。
通过 set_rollout_manager 方法建立训练 Actor 与 Rollout Manager 的连接:
# slime/ray/actor_group.py
class RayTrainGroup:
def set_rollout_manager(self, rollout_manager):
return ray.get([actor.set_rollout_manager.remote(rollout_manager) for actor in self._actor_handlers])
在执行层,根据 colocate 配置,采用两种不同策略建立通信组:
- 策略1: Co-located (Gloo IPC): 当训练和推理 Actor 在同一批 GPU 上时,使用
UpdateWeightFromTensor类。它创建 Gloo 通信组,通过序列化和 Ray IPC 传输权重数据。
# slime/backends/megatron_utils/update_weight/update_weight_from_tensor.py
class UpdateWeightFromTensor:
def __init__(self, args, model, weights_getter, ...):
# 创建 Gloo 通信组
for start_rank in range(0, dist.get_world_size(), self.args.rollout_num_gpus_per_engine):
group_ranks = list(range(start_rank, end_rank))
new_group = dist.new_group(ranks=group_ranks, backend="gloo")
- 策略2: Distributed (NCCL): 当 Actor 分布在不同机器时,使用
UpdateWeightFromDistributed类。它创建 NCCL 通信组,通过dist.broadcast广播权重。
# slime/backends/megatron_utils/update_weight/update_weight_from_distributed.py
class UpdateWeightFromDistributed:
def connect_rollout_engines(self, rollout_engines, rollout_engine_lock):
# 创建 NCCL 通信组
model_update_groups = init_process_group(
backend="nccl",
init_method=f"tcp://{master_address}:{master_port}",
world_size=world_size,
rank=0,
group_name=group_name,
)
由于所有 Megatron 权重最终都转换为 HF 格式供 rollout 侧消费,框架通过 megatron_to_hf 模块支持多种模型的权重转换(LLaMA、Qwen、GLM、DeepSeek 等)。
训练主流程
所有初始化和配置工作都是为了训练能够高效稳定地运行。主流程核心包括:样本生成 -> 模型训练 -> 权重更新。
训练流程 — 生成样本
RolloutManager 直接管理生成任务:
# slime/ray/rollout.py
class RolloutManager:
def generate(self, rollout_id):
start_time = time.time()
self.rollout_id = rollout_id
data, metrics = self._get_rollout_data(rollout_id=rollout_id)
data = self._convert_samples_to_train_data(data)
return self._split_train_data_by_dp(data, self.train_parallel_config["dp_size"])
数据生成逻辑通过 call_rollout_fn 调用用户自定义的 rollout 函数,这使得数据生成逻辑完全可插拔:
def _get_rollout_data(self, rollout_id):
data = call_rollout_fn(self.generate_rollout, self.args, rollout_id, self.data_source, evaluation=False)
metrics = data.metrics
data = data.samples
# flatten the data if it is a list of lists
while isinstance(data[0], list):
data = list(itertools.chain.from_iterable(data))
if not self.args.disable_rollout_trim_samples:
global_batch_size = self.args.global_batch_size
if self.args.use_dynamic_global_batch_size:
logger.info(f"Collected {len(data)} samples from rollout to train with dynamic global batch size")
# TODO: this is a temporary solution, we should directly save dynamic_global_batch_size to rollout data
self._dynamic_global_batch_size = self._compute_dynamic_global_batch_size(len(data))
global_batch_size = self._dynamic_global_batch_size
if len(data) % global_batch_size != 0:
trim_len = (len(data) // global_batch_size) * global_batch_size
if trim_len == 0:
raise ValueError(f"Not enough samples {len(data)} for global_batch_size {global_batch_size}")
origin_data_length = len(data)
data = data[:trim_len]
logger.info(f"trim number of samples from {origin_data_length} to {trim_len}")
logger.info(f"Final collected {len(data)} samples from rollout to train")
return data, metrics
# slime/rollout/base_types.py
def call_rollout_fn(fn, *args, evaluation: bool, **kwargs):
output = fn(*args, **kwargs, evaluation=evaluation)
if not isinstance(output, (RolloutFnTrainOutput, RolloutFnEvalOutput)):
output = RolloutFnEvalOutput(data=output) if evaluation else RolloutFnTrainOutput(samples=output)
return output
数据流: RolloutManager -> Ray Object Store -> Train Actor
Rollout 过程产生的经验(Sample dataclass)被转换为标准的 Python 字典格式(train_data),通过 Ray 对象存储高效地传输给训练 Actor。
训练流程 — 模型训练
# slime/backends/megatron_utils/actor.py
class MegatronTrainRayActor:
def train_actor(self, rollout_id, rollout_data):
# 1. 创建数据迭代器
data_iterator, num_microbatches = get_data_iterator(self.args, self.model, rollout_data)
# 2. Routing Replay 填充(如果启用)
if self.args.use_rollout_routing_replay:
self.fill_routing_replay(data_iterator, num_microbatches, rollout_data)
# 3. 计算 Reference Log Probs(如果需要)
if "ref" in self.weights_backuper.backup_tags:
self._switch_model("ref")
rollout_data.update(self.compute_log_prob(data_iterator, num_microbatches, store_prefix="ref_"))
# 4. 计算 Actor Log Probs
self._switch_model("old_actor" if self.args.keep_old_actor else "actor")
if not self.args.use_rollout_logprobs or self.args.get_mismatch_metrics:
rollout_data.update(self.compute_log_prob(data_iterator, num_microbatches, store_prefix=""))
# 5. 计算 Advantages 和 Returns
compute_advantages_and_returns(self.args, rollout_data)
# 6. 执行训练步骤
train(rollout_id, self.model, self.optimizer, self.opt_param_scheduler, data_iterator, num_microbatches)
训练流程的关键步骤:
- 获取数据:
TrainRayActor根据 DP rank 从 Ray Object Store 获取对应的数据分片(分片已在 RolloutManager 侧预完成)。 - 计算 Log Probs: 分别使用参考模型和策略模型进行前向传播,计算 log probabilities。
- 计算优势函数: 从 rollout 数据中取出 log probs 和 rewards,计算 KL 散度,并根据 GRPO/GSPO/PPO/REINFORCE++ 等算法公式计算
advantages和returns。 - 执行训练步骤: 调用 Megatron 的
forward_backward_func执行前向传播计算 loss -> 反向传播 -> 梯度裁剪 -> 参数更新。

训练流程 — 权重同步
训练完成后,需要将更新后的权重同步到 SGLang 推理引擎。
路径一:Co-located (Gloo IPC)
# slime/backends/megatron_utils/update_weight/update_weight_from_tensor.py
class UpdateWeightFromTensor:
def update_weights(self):
self.weight_version += 1
# 1. 暂停推理
ray.get([engine.pause_generation.remote() for engine in self.rollout_engines])
ray.get([engine.flush_cache.remote() for engine in self.rollout_engines])
# 2. 获取权重并转换为 HF 格式
megatron_local_weights = self.weights_getter()
# 3. 通过 Gloo gather + Ray IPC 发送
for hf_named_tensors in self._hf_weight_iterator.get_hf_weight_chunks(megatron_local_weights):
refs, long_lived_tensors = self._send_hf_params(hf_named_tensors)
ray.get(refs)
# 4. 恢复推理
ray.get([engine.continue_generation.remote() for engine in self.rollout_engines])
路径二:Distributed (NCCL Broadcast)
# slime/backends/megatron_utils/update_weight/update_weight_from_distributed.py
class UpdateWeightFromDistributed:
def update_weights(self):
self.weight_version += 1
# 1. 暂停推理
ray.get([engine.pause_generation.remote() for engine in self.rollout_engines])
ray.get([engine.flush_cache.remote() for engine in self.rollout_engines])
# 2. 非专家参数:gather TP -> 转换 HF -> broadcast
for name, param in named_params_and_buffers(self.args, self.model):
if ".experts." not in name:
param = all_gather_param(name, param)
# 转换并广播
# 3. 专家参数:gather EP -> 转换 HF -> broadcast
for name, param in named_params_and_buffers(self.args, self.model):
if ".experts." in name:
# EP gather + broadcast
# 4. 恢复推理
ray.get([engine.continue_generation.remote() for engine in self.rollout_engines])
数据源管理机制
Slime 的数据管理采用分层设计,从数据源获取、推理生成到训练消费,形成完整的数据流闭环。
数据源架构
数据源通过 DataSource 抽象类定义,主要有两种实现:

# slime/rollout/data_source.py
class DataSource(abc.ABC):
@abc.abstractmethod
def get_samples(self, num_samples: int) -> list[list[Sample]]:
"""获取指定数量的样本"""
@abc.abstractmethod
def add_samples(self, samples: list[list[Sample]]):
"""添加样本到数据源"""
@abc.abstractmethod
def save(self, rollout_id):
"""保存数据源状态"""
@abc.abstractmethod
def load(self, rollout_id=None):
"""加载数据源状态"""
1. RolloutDataSource(只读数据源)
从全局数据集读取 prompt,支持 shuffle 和断点续训:
# slime/rollout/data_source.py
class RolloutDataSource(DataSource):
def __init__(self, args):
self.epoch_id = 0
self.sample_offset = 0
if args.rollout_global_dataset:
self.dataset = Dataset(
args.prompt_data,
tokenizer=tokenizer,
processor=processor,
max_length=args.rollout_max_prompt_len,
)
def get_samples(self, num_samples):
# 从数据集获取 prompt 样本
prompt_samples = self.dataset.samples[self.sample_offset : self.sample_offset + num_samples]
self.sample_offset += num_samples
# 为每个 prompt 创建 n_samples_per_prompt 个副本
samples = []
for prompt_sample in prompt_samples:
group = []
for _ in range(self.args.n_samples_per_prompt):
sample = copy.deepcopy(prompt_sample)
sample.group_index = self.sample_group_index
sample.index = self.sample_index
self.sample_index += 1
group.append(sample)
self.sample_group_index += 1
samples.append(group)
return samples
2. RolloutDataSourceWithBuffer(带缓冲区数据源)
在只读数据源基础上增加缓冲区,支持将中断的样本重新加入队列:
# slime/rollout/data_source.py
class RolloutDataSourceWithBuffer(RolloutDataSource):
def __init__(self, args):
super().__init__(args)
self.buffer = [] # 缓冲区存储未完成的样本组
def get_samples(self, num_samples):
# 优先从缓冲区获取
samples = self._get_samples_from_buffer(num_samples)
num_samples -= len(samples)
# 不足部分从数据集获取
if num_samples > 0:
samples += super().get_samples(num_samples=num_samples)
return samples
def add_samples(self, samples):
# 将样本组添加到缓冲区
for group in samples:
self.buffer.append(group)
Sample 数据结构
Sample 是贯穿整个数据流的核心数据结构:
# slime/utils/types.py
@dataclass
class Sample:
# 标识
group_index: int | None = None # 同一 prompt 的样本组索引
index: int | None = None # 全局唯一索引
# Prompt 相关
prompt: str | list[dict] = "" # 原始 prompt
tokens: list[int] = field(default_factory=list) # 完整的 token 序列
multimodal_inputs: dict = None # 多模态输入
# Response 相关
response: str = "" # 生成的响应
response_length: int = 0 # 响应长度
reward: float | dict = None # 奖励值
loss_mask: list[int] = None # loss 掩码
# 状态
status: Status = Status.PENDING # PENDING/COMPLETED/TRUNCATED/ABORTED
# 元数据
metadata: dict = field(default_factory=dict)
rollout_log_probs: list[float] = None # 推理时的 log probs
推理侧:数据生成与存储
推理侧的数据流分为三个阶段:
- Prompt 获取
#train.py
# train loop.
# note that for async training, one can change the position of the sync operation(ray.get).
for rollout_id in range(args.start_rollout_id, args.num_rollout):
if args.eval_interval is not None and rollout_id == 0 and not args.skip_eval_before_train:
ray.get(rollout_manager.eval.remote(rollout_id))
# 远程调用generate
rollout_data_ref = ray.get(rollout_manager.generate.remote(rollout_id))
# slime/ray/rollout.py
class RolloutManager:
def generate(self, rollout_id):
# 调用用户自定义的 rollout 函数
data = call_rollout_fn(
self.generate_rollout,
self.args,
rollout_id,
self.data_source, # 传入数据源
evaluation=False
)
# slime/rollout/sglang_rollout.py
def generate_rollout(
args: Namespace, rollout_id: int, data_source: Any, evaluation: bool = False
) -> RolloutFnTrainOutput | RolloutFnEvalOutput:
assert args.rollout_global_dataset
if evaluation:
output, _ = run(eval_rollout(args, rollout_id))
return output
output, aborted_samples = run(generate_rollout_async(args, rollout_id, data_source.get_samples))
data_source.add_samples(aborted_samples)
return output
用户自定义的 rollout 函数从数据源获取 prompt:
# slime/rollout/sglang_rollout.py
async def generate_rollout_async(args, rollout_id, data_source):
target_data_size = args.rollout_batch_size
data = []
while len(data) < target_data_size:
# 从数据源获取样本
samples = data_source(args.over_sampling_batch_size)
# 提交生成任务
state.submit_generate_tasks(samples)
# 等待生成完成
done, state.pendings = await asyncio.wait(state.pendings, ...)
for task in done:
group = task.result()
data.append(group)
return RolloutFnTrainOutput(samples=data)
- 推理生成
# slime/rollout/sglang_rollout.py
def submit_generate_tasks(self, samples: list[list[Sample]]) -> None:
for group in samples:
self.pendings.add(
asyncio.create_task(
# submit a group of samples as a single task.
generate_and_rm_group(
self.args,
group,
sampling_params=self.sampling_params.copy(),
evaluation=False,
)
)
)
self.remaining_batch_size += len(samples)
async def generate_and_rm(args, sample, sampling_params):
# 1. 调用 SGLang 进行生成
sample = await generate(args, sample, sampling_params)
# 2. 计算 reward
if sample.reward is None:
sample.reward = await async_rm(args, sample)
return sample
async def generate(args, sample, sampling_params):
url = f"http://{args.sglang_router_ip}:{args.sglang_router_port}/generate"
# 准备请求
payload = {
"input_ids": prompt_ids,
"sampling_params": sampling_params,
"return_logprob": True, # 返回 log probs
}
# 发送请求
output = await post(url, payload)
# 更新 sample
sample.tokens = sample.tokens + new_response_tokens
sample.response_length += len(new_response_tokens)
sample.response += output["text"]
sample.rollout_log_probs += new_response_log_probs
sample.update_from_meta_info(args, output["meta_info"])
return sample
generate_and_rm_group :对样本组进行生成和奖励模型评估,并发生成所有样本generate_and_rm:单个样本的生成和奖励模型评估
- 数据转换与分片
推理完成后,RolloutManager 将 Sample 列表转换为训练数据格式:
# slime/ray/rollout.py
class RolloutManager:
def _convert_samples_to_train_data(self, samples):
# 后处理 reward(如 GRPO 的 group normalization)
raw_rewards, rewards = self._post_process_rewards(samples)
# 构建训练数据字典
train_data = {
"tokens": [sample.tokens for sample in samples],
"response_lengths": [sample.response_length for sample in samples],
"rewards": rewards,
"raw_reward": raw_rewards,
"loss_masks": [sample.loss_mask for sample in samples],
"truncated": [1 if sample.status == Sample.Status.TRUNCATED else 0 for sample in samples],
"sample_indices": [sample.index for sample in samples],
}
# 可选字段
if samples[0].rollout_log_probs is not None:
train_data["rollout_log_probs"] = [sample.rollout_log_probs for sample in samples]
return train_data
def _split_train_data_by_dp(self, data, dp_size):
"""按 DP size 分片数据"""
total_lengths = [len(t) for t in data["tokens"]]
data["total_lengths"] = total_lengths
# 计算分片策略(支持序列长度均衡)
if self.args.balance_data:
partitions = get_seqlen_balanced_partitions(total_lengths, dp_size, equal_size=True)
else:
partitions = [range(i, len(total_lengths), dp_size) for i in range(dp_size)]
# 为每个 DP rank 创建数据分片
rollout_data_refs = []
for i in range(dp_size):
rollout_data = {}
partition = partitions[i]
for key in ["tokens", "rewards", "loss_masks", ...]:
rollout_data[key] = [data[key][j] for j in partition]
# 放入 Ray Object Store
rollout_data_refs.append(Box(ray.put(rollout_data)))
return rollout_data_refs
训练侧:数据获取与消费
训练侧从 Ray Object Store 获取数据:
# slime/utils/data.py
def process_rollout_data(args, rollout_data_ref, dp_rank, dp_size):
# 从 Ray Object Store 获取数据
rollout_data = ray.get(rollout_data_ref[dp_rank].inner)
# 获取分片信息
partition = rollout_data.pop("partition")
total_lengths = rollout_data["total_lengths"]
# 保存序列长度(用于计时)
Timer().seq_lens = total_lengths
rollout_data["total_lengths"] = [total_lengths[i] for i in partition]
return rollout_data
# slime/backends/megatron_utils/actor.py
class MegatronTrainRayActor:
def _get_rollout_data(self, rollout_data_ref):
rollout_data = process_rollout_data(
self.args,
rollout_data_ref,
mpu.get_data_parallel_rank(with_context_parallel=False),
mpu.get_data_parallel_world_size(with_context_parallel=False),
)
# 将 tokens 移动到 GPU
rollout_data["tokens"] = [
torch.tensor(t, dtype=torch.long, device=torch.cuda.current_device())
for t in rollout_data["tokens"]
]
return rollout_data
数据流完整路径
┌─────────────────────────────────────────────────────────────────────────────┐
│ 数据流完整路径 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ RolloutManager.generate() │ │
│ │ │ │ │
│ │ │ call_rollout_fn() │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────────────────────────┐ │ │
│ │ │ generate_rollout (用户自定义函数) │ │ │
│ │ │ │ │ │
│ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │
│ │ │ │ DataSource │ get_samples() │ Sample │ │ │ │
│ │ │ │ (prompt数据) │ ─────────────────> │ (prompt) │ │ │ │
│ │ │ └──────────────┘ └──────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ │ submit_generate_tasks()
│ │ │ ▼ │ │ │
│ │ │ ┌─────────────────────┐ ┌─────────────────────┐
│ │ │ │ SGLang Engine │ 生成完成 │ Sample (completed) │
│ │ │ │ (推理生成) │ ──────────> │ (tokens, response, │
│ │ │ └─────────────────────┘ │ reward, log_probs) │
│ │ │ └─────────────────────┘
│ │ └──────────────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ │ │ 返回 RolloutFnTrainOutput │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ _convert_samples_to_train_data() │ │ │
│ │ │ - reward 后处理(group normalization) │ │ │
│ │ │ - 构建 train_data 字典 │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ _split_train_data_by_dp() │ │ │
│ │ │ - 按序列长度均衡分片 │ │ │
│ │ │ - ray.put() 放入 Object Store │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ ray.put() -> List[ObjectRef] │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Ray Object Store │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ DP rank 0 │ │ DP rank 1 │ │ DP rank 2 │ │ DP rank N │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ ray.get() per DP rank │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ TrainRayActor (per DP rank) │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ _get_rollout_data() │ │ │
│ │ │ - 从 Object Store 获取数据 │ │ │
│ │ │ - tokens 移动到 GPU │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ train_actor() │ │ │
│ │ │ - 创建 DataIterator │ │ │
│ │ │ - 计算 log_probs (ref + actor) │ │ │
│ │ │ - 计算 advantages/returns │ │ │
│ │ │ - 执行训练步骤 │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
高级特性
slime支持以下高级特性(仅列举一部分):
- 在策略蒸馏 (On-Policy Distillation):在策略蒸馏 (OPD) 让学生模型在自己的 rollout 数据上训练,同时匹配教师模型的 token 级 log-probability,从而实现从大模型到小模型的知识传递。
- 投机采样:加速 rollout 的重要优化手段。推理过程中不再让昂贵的 Target Model 逐个 token 进行 decode,而是先由一个轻量级的 draft model 先进行 decode,生成多个 token 后,再由大模型进行批量验证。
- 低精度训练:FP8 推理与 BF16 训练、FP8 推理与 FP8 训练、INT4 QAT 训练
- 可重现性 (Reproducibility):通过结合 SGLang 的确定性推理和 Megatron-LM 的确定性模式,slime 支持按位实验重现。
- rollout 容灾:slime 会在 rollout 过程中,定期向所有 SGLang server 发送心跳请求(/health_generate),如果心跳超时,则会停止这个 SGLang server。并在这轮 rollout 完成之后进行重启和正确的参数更新。
- PD 分离:可以通过设置 --prefill-num-servers 参数来指定用于 Prefill 的服务器数量。
- Full Async:创建一个全局工作线程,在后台保持运行,不断拉取 Prompt 并启动生成任务。
- Retool: from SFT to RL,使用 retool 功能进行支持工具的语言模型生成。
- Multi-Agent RL
核心优势
-
工业级的"训推一体"架构: 通过 Ray Actor 模型,将专为大规模设计的离线训练引擎(Megatron-LM)和专为低延迟、高吞吐量设计的在线推理引擎(SGLang)无缝地"粘合"在一起。
-
高效的"知识"流动闭环: 框架实现了 RL 的核心飞轮:生成 -> 训练 -> 同步。
- 数据流: 通过
RolloutManager和 Ray Object Store,实现了从"探索"到"学习"的高效数据回传。 - 权重流: 通过自适应的权重同步机制(Co-located Gloo IPC 和 Distributed NCCL Broadcast),实现了从"学习"到"行动"的低延迟知识更新。
- 数据流: 通过
-
高度的灵活性与可扩展性:
- 可插拔的探索与奖励: 通过配置文件指定
rollout_function_path和custom_rm_path,用户可以轻松自定义 Agent 的探索行为和价值判断标准。 - 模型无关性: 通过
slime_plugins和megatron_to_hf模块,支持 LLaMA、Qwen、GLM、DeepSeek 等主流模型。
- 可插拔的探索与奖励: 通过配置文件指定
-
完善的容错机制: 通过
RolloutHealthMonitor实现推理引擎的健康监控和自动恢复,保证大规模训练的稳定性。 -
丰富的优化技术:
- Routing Replay: 复用推理阶段的 MoE 专家路由决策
- True On-Policy Mode: 消除训练和推理之间的数值差异
- Off-Policy 修正: 支持 TIS、OPSM、ICEPOP 等技术
- Speculative Decoding: 推测解码支持和统计
NPU 适配
Slime
- common.py 新增
is_npu函数,用于判断是否是 NPU,走入特定 NPU 处理。 - Ray 的资源分配:由于不支持
ray.remote(num_gpus=1 ...)的形式,需要改造资源分配的方式,通过入参resources来分配资源。 - Ray 不支持
get_gpu_ids()函数:修改为等价实现,由于修改后返回值是 str 类型,因此还要进行 int 类型转换。 - 手动转换部分:
- hccl 手动修改 nccl
- cuda 手动修改 npu
- CUDA_VISIBLE_DEVICES 修改为 ASCEND_RT_VISIBLE_DEVICES
- Megatron 要用上 mindspeed 的插件仓,添加相关代码。
- megatron_bridge 的 model_provider 需要注入 mindspeed 的额外参数:使用 megatron_bridge 加载权重的方式,model_provider 如果没有额外注入,无法识别到 mindspeed 的参数,因此需要额外逻辑注入参数。
Megatron
- @jit_fuser 修改:将@jit_fuser 修饰符全部去除。
- 手动转换,修改 cuda 为 npu。
Megatron-Bridge
- Mindspeed 对 te 模块打 patch 有额外前缀,导致分支未走入,后续会去掉前缀,目前 PR 已合入。
- 修 megatron-bridge 的重计算 bug。
MindSpeed
- mindspeed 落后 megatron 版本,入参有对不齐的问题。
- slime 传入 args 格式与 mindspeed 的格式不符,需要额外转换一遍。
Sglang 精度修复
- 修改了 qwen3_vl.py 中 vision model 中 fast_pos_embed_interpolate。
- 回退了 input_deepstack_embeds 的修改。
- attention_backend 里面走 fia 算子。
参考
鲲鹏昇腾开发者社区是面向全社会开放的“联接全球计算开发者,聚合华为+生态”的社区,内容涵盖鲲鹏、昇腾资源,帮助开发者快速获取所需的知识、经验、软件、工具、算力,支撑开发者易学、好用、成功,成为核心开发者。
更多推荐



所有评论(0)