Compare commits

..

2 Commits

Author SHA1 Message Date
Wing Lian
cec99c4133 fix test dims 2026-04-20 20:45:19 -04:00
Wing Lian
d248242490 support for vllm 0.19.1 2026-04-19 18:09:46 -04:00
10 changed files with 66 additions and 1190 deletions

View File

@@ -89,7 +89,7 @@ def parse_requirements(extras_require_map):
]
if not install_xformers:
_install_requires.pop(_install_requires.index(xformers_version))
extras_require_map["vllm"] = ["vllm>=0.19.0"]
extras_require_map["vllm"] = ["vllm>=0.19.1"]
elif (major, minor) >= (2, 9):
extras_require_map.pop("fbgemm-gpu")
extras_require_map["fbgemm-gpu"] = [

View File

@@ -1103,22 +1103,11 @@ class AsyncGRPOTrainer(GRPOTrainer):
- vllm_lora_sync: saves adapter to filesystem, vLLM loads natively
- PEFT no-merge: computes merged weights as new tensors, NCCL broadcast
- Non-PEFT: stock sync_weights via merge_adapter + NCCL
This is the canonical sync trigger and runs in BOTH async and
synchronous modes from ``_prepare_inputs_with_data_producer`` /
``_prepare_inputs_legacy_async``. The ``_generate_single_turn``
patch is a parallel backup for non-data-producer paths (vanilla
GRPO without NeMo Gym), where the data producer is bypassed
entirely and TRL's stock generate-then-sync flow is used instead.
"""
if not self.use_vllm:
if not (self.use_vllm and self.args.async_prefetch):
return
step = self.state.global_step
# Default to syncing every step when no interval is configured —
# otherwise ``step % None`` would TypeError, and the previous
# behavior of crashing on the first sync was strictly worse than
# the standard "sync every optimizer step".
interval = self.args.vllm_sync_interval or 1
interval = self.args.vllm_sync_interval
if step != self._last_synced_step and step % interval == 0:
if step == 0:
logger.info("Skipping vLLM weight sync at step 0 (no training yet)")
@@ -1213,42 +1202,13 @@ class AsyncGRPOTrainer(GRPOTrainer):
# Permanently replace vllm_generation.sync_weights with our custom
# sync to avoid merge_adapter (fails on FP8 / races with training).
#
# The design has two modes that have to be threaded carefully:
#
# - Async prefetch ON: BG generation thread can't safely call
# sync_weights mid-rollout (it races with the trainer's optimizer
# step and can corrupt weights). We no-op the stock sync hook and
# drive sync ourselves from ``_maybe_sync_vllm_weights`` after the
# optimizer step on the main thread.
#
# - Async prefetch OFF (synchronous mode): TRL's stock
# ``_generate_single_turn`` calls ``sync_weights`` once per step
# boundary. There's no BG thread to race with, and
# ``_maybe_sync_vllm_weights`` short-circuits with
# ``if not async_prefetch: return``, so we MUST wire the stock
# hook directly to our LoRA sync helper — otherwise nothing ever
# pushes weights to vLLM and the trainer becomes a no-op (vLLM
# keeps serving the base model, every rollout in every group
# produces identical outputs, advantages are zero, optimizer
# step gets skipped, repeat).
# For LoRA sync mode, make it a no-op here since _maybe_sync_vllm_weights
# handles the sync with proper interval tracking.
if not getattr(self, "_patched_sync_weights", False):
if self.use_vllm and hasattr(self, "vllm_generation"):
if getattr(self.args, "vllm_lora_sync", False):
if getattr(self.args, "async_prefetch", False):
# Async: drive sync from main thread via
# _maybe_sync_vllm_weights instead.
self.vllm_generation.sync_weights = lambda: None
else:
# Sync mode: TRL's _generate_single_turn already
# calls sync_weights once per step boundary. Wire
# it directly to our LoRA filesystem sync helper.
sync_helper = self._sync_lora_adapter
def _lora_filesystem_sync():
sync_helper()
self.vllm_generation.sync_weights = _lora_filesystem_sync
# No-op: LoRA sync is driven by _maybe_sync_vllm_weights
self.vllm_generation.sync_weights = lambda: None
self._patched_sync_weights = True
else:
from accelerate.utils import is_peft_model

View File

@@ -19,7 +19,6 @@ Supports two modes:
from __future__ import annotations
import os
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Union
from axolotl.integrations.base import BasePlugin
@@ -31,107 +30,6 @@ if TYPE_CHECKING:
LOG = get_logger(__name__)
# ---- vLLM weight-sync transport probe ------------------------------------
@dataclass
class VLLMWeightSyncCapabilities:
"""What weight-sync routes a vLLM server actually exposes.
Discovered once at ``pre_model_load`` time by fetching the server's
``/openapi.json``. Drives the transport-selection table below.
"""
nccl: bool = False # /init_communicator/ + /update_named_param/
lora_filesystem: bool = False # /v1/load_lora_adapter (vLLM native)
lora_axolotl: bool = False # /set_lora_adapter/ (axolotl serve_lora extension)
http_full: bool = False # /http_update_weights/ (axolotl serve_lora extension)
probed: bool = False
probe_error: str | None = None
routes: list[str] = field(default_factory=list)
@property
def any_full_param_sync(self) -> bool:
"""True if at least one transport can push full-model weights."""
return self.nccl or self.http_full
@property
def any_lora_sync(self) -> bool:
"""True if at least one transport can push LoRA adapters."""
return self.lora_filesystem or self.lora_axolotl or self.nccl
def probe_vllm_weight_sync(
base_url: str, timeout: float = 5.0
) -> VLLMWeightSyncCapabilities:
"""Detect which weight-sync routes the configured vLLM server exposes.
Uses the server's FastAPI ``/openapi.json`` — every weight-sync transport
we care about is mounted as a POST route there. Falls back to all-False
on any error so the caller can still decide what to do (typically: raise
a clear error rather than silently no-op).
"""
import requests
caps = VLLMWeightSyncCapabilities()
try:
r = requests.get(f"{base_url.rstrip('/')}/openapi.json", timeout=timeout)
r.raise_for_status()
spec = r.json()
routes = sorted((spec.get("paths") or {}).keys())
caps.routes = routes
caps.nccl = "/init_communicator/" in routes and "/update_named_param/" in routes
caps.lora_filesystem = "/v1/load_lora_adapter" in routes
caps.lora_axolotl = "/set_lora_adapter/" in routes
caps.http_full = "/http_update_weights/" in routes
caps.probed = True
except Exception as exc:
caps.probe_error = f"{type(exc).__name__}: {exc}"
LOG.warning(
"NeMo Gym: failed to probe vLLM /openapi.json at %s%s. "
"Will fall back to LoRA-only behavior.",
base_url,
caps.probe_error,
)
return caps
def select_weight_sync_transport(
caps: VLLMWeightSyncCapabilities,
*,
has_lora: bool,
vllm_lora_sync_pref: bool,
) -> str:
"""Pick the right transport for a (server caps, model type) combo.
Returns one of: ``"lora_filesystem"``, ``"nccl"``, ``"http_full"``, or
``"none"``. The caller decides what to do with ``"none"`` (typically:
raise an error explaining the misconfiguration).
Selection table:
LoRA model + lora endpoint + lora-sync pref → lora_filesystem
LoRA model + lora endpoint → lora_filesystem
LoRA model + nccl endpoint → nccl (broadcast merged adapter)
Full model + nccl endpoint → nccl
Full model + http endpoint → http_full
anything else → none
"""
if has_lora:
if (caps.lora_filesystem or caps.lora_axolotl) and vllm_lora_sync_pref:
return "lora_filesystem"
if caps.lora_filesystem or caps.lora_axolotl:
return "lora_filesystem"
if caps.nccl:
return "nccl"
return "none"
# Full-parameter model
if caps.nccl:
return "nccl"
if caps.http_full:
return "http_full"
return "none"
class NemoGymPlugin(BasePlugin):
"""Plugin for NVIDIA NeMo Gym integration with Axolotl.
@@ -152,69 +50,37 @@ class NemoGymPlugin(BasePlugin):
self._reward_fn = None
self._dataset_lookup = None
self._agent_servers = {}
self._vllm_caps: VLLMWeightSyncCapabilities | None = None
def get_input_args(self):
return "axolotl.integrations.nemo_gym.NemoGymArgs"
def pre_model_load(self, cfg):
"""Probe vLLM weight-sync routes and conditionally bypass NCCL init.
Replaces the previous unconditional ``init_communicator`` monkey-patch
with a probe of the configured vLLM server's ``/openapi.json``. We only
bypass NCCL init when the server we're talking to actually lacks the
``/init_communicator/`` route (i.e. stock ``vllm serve``); against
TRL/axolotl serve modules that DO expose NCCL routes, we leave the
standard TRL flow alone so full-finetune training can sync weights.
"""
"""Apply monkeypatches before trainer creation."""
if not cfg.nemo_gym_enabled:
return
# Always skip NCCL communicator init in NeMo Gym mode.
# NeMo Gym uses its own vLLM server (standard OpenAI API), not the TRL
# colocate/NCCL path. The NCCL init fails with vLLM V1 and standard servers.
trl_cfg = getattr(cfg, "trl", None)
if not (trl_cfg and getattr(trl_cfg, "vllm_mode", "server") == "server"):
return
host = getattr(trl_cfg, "vllm_server_host", None) or "127.0.0.1"
port = getattr(trl_cfg, "vllm_server_port", None) or 8000
base_url = f"http://{host}:{port}"
self._vllm_caps = probe_vllm_weight_sync(base_url)
if self._vllm_caps.probed:
LOG.info(
"NeMo Gym: vLLM weight-sync probe @ %s — nccl=%s lora_native=%s "
"lora_axolotl=%s http_full=%s",
base_url,
self._vllm_caps.nccl,
self._vllm_caps.lora_filesystem,
self._vllm_caps.lora_axolotl,
self._vllm_caps.http_full,
)
# Only bypass NCCL init when the server doesn't speak it. If NCCL is
# available we leave VLLMClient.init_communicator alone so the
# standard TRL sync flow can run for full-parameter training.
if not self._vllm_caps.nccl:
if trl_cfg and getattr(trl_cfg, "vllm_mode", "server") == "server":
self._patch_skip_nccl_init()
def _patch_skip_nccl_init(self):
"""Monkeypatch VLLMClient.init_communicator to no-op.
Only called when the configured vLLM server doesn't expose
``/init_communicator/`` (e.g. stock ``vllm serve``). In that case
TRL's standard ``init_communicator`` would 404 inside trainer
construction; we no-op it so the LoRA filesystem path can install
its own sync in ``post_trainer_create``.
NeMo Gym uses its own vLLM server (standard OpenAI API or custom LoRA
serve script). The NCCL communicator is not needed and fails with both
vLLM V1 engine and standard OpenAI server mode.
"""
try:
from trl.generation.vllm_client import VLLMClient
VLLMClient._original_init_communicator = VLLMClient.init_communicator
VLLMClient.init_communicator = lambda self, **kwargs: LOG.info(
"Skipping NCCL init_communicator (server has no /init_communicator/)"
)
LOG.info(
"Patched VLLMClient.init_communicator to no-op (server has no NCCL routes)"
"Skipping NCCL init_communicator (LoRA sync mode)"
)
LOG.info("Patched VLLMClient.init_communicator to no-op for LoRA sync")
except Exception as exc:
LOG.warning(f"Failed to patch VLLMClient: {exc}")
@@ -368,80 +234,30 @@ class NemoGymPlugin(BasePlugin):
verify_timeout = cfg.nemo_gym_verify_timeout or 30
multi_turn = cfg.nemo_gym_multi_turn or False
# Pick a weight-sync transport based on what the configured vLLM
# server actually exposes (see ``pre_model_load`` probe) and what
# kind of model we're training. The selection table is documented
# in ``select_weight_sync_transport``.
# Handle weight sync. NeMo Gym skips NCCL init, so we need to either:
# - Install LoRA sync (when vllm_lora_sync=True)
# - Or no-op sync_weights (when using standard vLLM server)
trl_cfg = getattr(cfg, "trl", None)
if hasattr(trainer, "vllm_generation") and trainer.vllm_generation:
vllm_gen = trainer.vllm_generation
adapter = getattr(cfg, "adapter", None)
has_lora = adapter in ("lora", "qlora")
vllm_lora_sync_pref = bool(
trl_cfg and getattr(trl_cfg, "vllm_lora_sync", False)
)
caps = self._vllm_caps or VLLMWeightSyncCapabilities()
transport = select_weight_sync_transport(
caps,
has_lora=has_lora,
vllm_lora_sync_pref=vllm_lora_sync_pref,
)
if transport == "lora_filesystem":
if trl_cfg and getattr(trl_cfg, "vllm_lora_sync", False):
self._setup_lora_sync(trainer)
# Verify the vLLM server supports runtime LoRA loading
self._check_lora_endpoint(vllm_gen)
LOG.info("NeMo Gym weight sync: LoRA filesystem")
elif transport == "nccl":
# Standard TRL NCCL path. We leave ``VLLMClient.init_communicator``
# alone (pre_model_load only patched it when the probe found no
# NCCL route) so the trainer's normal weight-sync flow runs.
LOG.info(
"NeMo Gym weight sync: NCCL (server exposes /init_communicator/)"
else:
# No NCCL, no LoRA sync — skip all weight sync paths
vllm_gen.sync_weights = lambda: LOG.debug(
"Weight sync skipped (NeMo Gym mode)"
)
elif transport == "http_full":
# Full-parameter HTTP sync — implementation lands in step 3.
# For now, fail loudly so users know the path is detected but
# not yet wired up, instead of silently no-oping like before.
raise NotImplementedError(
"NeMo Gym + full fine-tune + HTTP weight sync is detected "
"but the client-side sync helper is not yet implemented "
"(planned). Use `adapter: lora|qlora` for now, or use a "
"vLLM serve module that exposes /init_communicator/ for "
"NCCL sync."
type(vllm_gen).sync_weights = lambda self: LOG.debug(
"Weight sync skipped (NeMo Gym mode)"
)
else: # transport == "none"
# No viable sync path. Build a precise error so the user knows
# exactly what's missing and how to fix it.
if not caps.probed:
msg = (
"could not probe the vLLM server's "
f"/openapi.json: {caps.probe_error}. "
"Verify that vLLM is reachable at "
f"{getattr(trl_cfg, 'vllm_server_host', '?')}:"
f"{getattr(trl_cfg, 'vllm_server_port', '?')}."
# Also patch the async trainer's internal sync method
if hasattr(trainer, "_maybe_sync_vllm_weights"):
trainer._maybe_sync_vllm_weights = lambda: LOG.debug(
"Async weight sync skipped (NeMo Gym mode)"
)
elif has_lora:
msg = (
"the vLLM server has neither NCCL routes "
"(/init_communicator/) nor a LoRA-loading route "
"(/v1/load_lora_adapter or /set_lora_adapter/). "
"Restart vLLM with `--enable-lora --max-lora-rank N "
"VLLM_ALLOW_RUNTIME_LORA_UPDATING=1` for the stock "
"server, or use `axolotl vllm-serve` for the "
"NCCL-capable serve module."
)
else:
msg = (
"the vLLM server exposes no full-parameter sync route "
"(/init_communicator/ for NCCL or /http_update_weights/ "
"for HTTP). Use `axolotl vllm-serve` (which has both) "
"or set `adapter: lora|qlora`."
)
raise ValueError(
f"NeMo Gym: no usable weight-sync transport — {msg} Without "
"weight sync the trainer's gradient updates never reach the "
"rollout policy (functionally a no-op trainer)."
)
LOG.info("Disabled weight sync (NeMo Gym mode, no LoRA sync)")
if multi_turn:
self._wire_multi_turn(cfg, trainer, model_name, verify_timeout)

View File

@@ -130,41 +130,21 @@ def start_servers(
)
def get_server_configs(head_port: int = 11000, timeout: float = 30.0) -> dict:
def get_server_configs(head_port: int = 11000) -> dict:
"""Fetch the global config from the NeMo Gym head server.
Retries up to 3 times with exponential backoff. The default per-attempt
timeout is 30s (raised from the original 5s) because head servers can
be slow to respond when they're concurrently serving rollouts from a
prior training run. A 5s timeout was empirically too tight to survive
a kill-and-relaunch cycle.
Returns:
Dict mapping server_name -> server config.
"""
url = f"http://127.0.0.1:{head_port}/global_config_dict_yaml"
last_exc: Exception | None = None
for attempt in (1, 2, 3):
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
result = yaml.safe_load(response.text)
# NeMo Gym head server double-encodes: YAML string inside a YAML string
if isinstance(result, str):
result = yaml.safe_load(result)
return result
except (requests.exceptions.RequestException, OSError) as exc:
last_exc = exc
LOG.warning(
"NeMo Gym head probe attempt %d/3 failed: %s. Retrying...",
attempt,
type(exc).__name__,
)
if attempt < 3:
time.sleep(2.0 * attempt)
raise RuntimeError(
f"NeMo Gym head server at {url} did not respond after 3 attempts: {last_exc}"
response = requests.get(
f"http://127.0.0.1:{head_port}/global_config_dict_yaml", timeout=5
)
response.raise_for_status()
result = yaml.safe_load(response.text)
# NeMo Gym head server double-encodes: YAML string inside a YAML string
if isinstance(result, str):
result = yaml.safe_load(result)
return result
def get_agent_servers(

View File

@@ -320,15 +320,6 @@ def main(script_args: ScriptArguments):
# --- Active LoRA state (shared across endpoints via closure) ---
active_lora: dict = {"request": None}
# Serializes access to the worker pipe. The underlying
# multiprocessing.Connection is a single full-duplex stream shared
# across all HTTP handlers; concurrent requests interleave bytes on
# the wire and corrupt the pickle framing (seen as
# ``UnpicklingError: pickle data was truncated``). Any endpoint that
# does ``conn.send(...); conn.recv()`` MUST hold this lock across
# the round-trip so only one inflight call at a time per pipe.
worker_pipe_lock = asyncio.Lock()
# ------------------------------------------------------------------
# LoRA-specific endpoints
# ------------------------------------------------------------------
@@ -640,147 +631,6 @@ def main(script_args: ScriptArguments):
},
}
@app.post("/v1/completions")
async def openai_completions(request_body: dict):
"""OpenAI-compatible text-completions endpoint.
Accepts either a string ``prompt`` or a list-of-int
``prompt_token_ids`` (as the text-completions spec allows). Routes
to the internal vLLM generate method with the active LoRA adapter
and returns an OpenAI /v1/completions-shaped response including
per-choice ``prompt_token_ids``, ``generation_token_ids``, and
``generation_log_probs`` for NeMo Gym agents that need raw
tokens + logprobs.
"""
import uuid
prompt_raw = request_body.get("prompt")
temperature = request_body.get("temperature", 1.0)
max_tokens = request_body.get("max_tokens", 512)
top_p = request_body.get("top_p", 1.0)
n = request_body.get("n", 1)
logprobs = request_body.get("logprobs") or 0
stop_token_ids = request_body.get("stop_token_ids") or None
# Accept either a string or a list[int] token id prompt. Lists
# must contain ints only (raise on lists of strings so callers get
# a clear error). Also accept [[int, int, ...]] nesting for the
# rare case callers pass a single-prompt batch.
if isinstance(prompt_raw, list) and prompt_raw and isinstance(prompt_raw[0], list):
prompt_raw = prompt_raw[0]
if isinstance(prompt_raw, list):
prompt_dict = {"prompt_token_ids": prompt_raw}
elif isinstance(prompt_raw, str):
prompt_dict = {"prompt": prompt_raw}
else:
return {
"error": {
"message": (
"prompt must be a string or a list of token ids"
),
"type": "invalid_request",
}
}
generation_kwargs: dict[str, Any] = {
"n": n,
"temperature": temperature,
"top_p": top_p,
"max_tokens": max_tokens,
"logprobs": logprobs,
}
if stop_token_ids:
generation_kwargs["stop_token_ids"] = stop_token_ids
sampling_params = SamplingParams(
**{k: v for k, v in generation_kwargs.items() if v is not None}
)
chunked = chunk_list([prompt_dict], script_args.data_parallel_size)
# Hold the pipe lock across send+recv — concurrent requests would
# otherwise interleave pickle frames on the worker connection.
async with worker_pipe_lock:
for conn, chunk in zip(connections, chunked, strict=True):
if not chunk:
chunk = [{"prompt": "<placeholder>"}]
kwargs = {
"prompts": chunk,
"sampling_params": sampling_params,
"lora_request": active_lora["request"],
}
conn.send({"type": "call", "method": "generate", "kwargs": kwargs})
loop = asyncio.get_running_loop()
all_outputs = await asyncio.gather(
*(loop.run_in_executor(None, safe_recv, conn) for conn in connections)
)
all_outputs = [
o for o, c in zip(all_outputs, chunked, strict=True) if c
]
for o in all_outputs:
if isinstance(o, dict) and "error" in o:
raise RuntimeError(f"vLLM worker error: {o['error']}")
all_outputs = list(chain.from_iterable(all_outputs))
if not all_outputs:
return {"choices": [], "model": script_args.model}
choices = []
for i, output in enumerate(all_outputs):
for j, out in enumerate(output.outputs):
text = out.text
# OpenAI-style `logprobs` block for text-completions:
# { "tokens": [...], "token_logprobs": [...] }
lp_block = None
if out.logprobs:
tokens_str: list[str] = []
token_lps: list[float] = []
for step in out.logprobs:
chosen = next(iter(step.values()))
tokens_str.append(getattr(chosen, "decoded_token", "") or "")
token_lps.append(float(chosen.logprob))
lp_block = {
"tokens": tokens_str,
"token_logprobs": token_lps,
}
choice = {
"index": i * n + j,
"text": text,
"finish_reason": "stop" if out.finish_reason == "stop" else "length",
"logprobs": lp_block,
# NeMo-Gym / retrace agent extras — preserved on the
# choice so callers with raw-token pipelines don't
# have to re-tokenize.
"prompt_token_ids": output.prompt_token_ids,
"generation_token_ids": list(out.token_ids),
"generation_log_probs": (
[float(next(iter(lp.values())).logprob) for lp in out.logprobs]
if out.logprobs
else []
),
}
choices.append(choice)
prompt_tokens = len(all_outputs[0].prompt_token_ids) if all_outputs else 0
completion_tokens = sum(
len(out.token_ids) for o in all_outputs for out in o.outputs
)
return {
"id": f"cmpl-{uuid.uuid4().hex[:8]}",
"object": "text_completion",
"model": script_args.model,
"choices": choices,
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
},
}
# --- Weight sync endpoints (legacy fallback, same as TRL) ---
@app.post("/init_communicator/")

View File

@@ -770,88 +770,6 @@ class RLValidationMixin:
)
return data
@model_validator(mode="before")
@classmethod
def check_grpo_batch_size_divisibility(cls, data):
"""Surface GRPO batch-shape mismatches at config-parse time.
TRL's GRPOTrainer requires that the per-step generation batch size be
evenly divisible by ``num_generations`` so that every prompt can be
replicated exactly ``num_generations`` times. The runtime check inside
``GRPOTrainer.__init__`` only fires after the model has been loaded —
too late and too cryptic for the user. We replicate the check here so
the failure is immediate and actionable.
Also enforces:
- ``num_generations >= 2`` (group-relative advantage needs variance)
- ``effective_gbs >= num_generations * world_size`` when capabilities
indicate multiple ranks (each rank needs at least one full group)
"""
if data.get("rl") != "grpo":
return data
trl_cfg = data.get("trl") or {}
num_gen = trl_cfg.get("num_generations")
if num_gen is None:
# TRL's own default is 8 — but if the user didn't set it, we
# don't have enough info to validate anything. Let TRL's own
# init handle the default-vs-batch interaction.
return data
if num_gen < 2:
raise ValueError(
f"GRPO requires `trl.num_generations >= 2` (got {num_gen}). "
"With num_generations=1, every group has zero advantage and "
"the policy never updates."
)
explicit_gbs = trl_cfg.get("generation_batch_size")
if explicit_gbs is not None:
effective_gbs = int(explicit_gbs)
gbs_source = "trl.generation_batch_size"
else:
mb = data.get("micro_batch_size") or 1
ga = data.get("gradient_accumulation_steps") or 1
effective_gbs = int(mb) * int(ga)
gbs_source = f"micro_batch_size ({mb}) * gradient_accumulation_steps ({ga})"
if effective_gbs % num_gen != 0:
# Suggest the smallest GA bump that fixes it for the common case
# where the user hasn't set generation_batch_size explicitly.
hint = ""
if explicit_gbs is None:
from math import gcd
mb_val = int(data.get("micro_batch_size") or 1)
# smallest GA such that mb*GA is a multiple of num_gen
lcm = num_gen * mb_val // gcd(num_gen, mb_val)
suggested_ga = lcm // mb_val
hint = (
f" Smallest fix: set `gradient_accumulation_steps: "
f"{suggested_ga}` (so micro_batch_size * GA = "
f"{mb_val * suggested_ga} is a multiple of {num_gen})."
)
raise ValueError(
f"GRPO: generation batch size must be divisible by "
f"`trl.num_generations`. Got effective_gbs={effective_gbs} "
f"(from {gbs_source}) and num_generations={num_gen}.{hint}"
)
# Multi-rank check: each rank must receive at least one full group
# per step. Without `capabilities` populated yet (mode='before'), we
# fall back to user-set distributed fields.
world_size = (
(data.get("capabilities") or {}).get("n_gpu") or data.get("world_size") or 1
)
if world_size and world_size > 1 and effective_gbs < num_gen * world_size:
raise ValueError(
f"GRPO with world_size={world_size} requires effective_gbs "
f">= num_generations * world_size = {num_gen * world_size}, "
f"got {effective_gbs}. Increase gradient_accumulation_steps "
f"or micro_batch_size."
)
return data
class OptimizationValidationMixin:
"""Validation methods related to optimization and performance."""

View File

@@ -216,197 +216,5 @@ class TestValidateQuantPatchRestore(unittest.TestCase):
self.assertIs(_trainer_module.validate_quantization_for_training, original)
class TestVllmLoraSyncPatch(unittest.TestCase):
"""The ``_generate_single_turn`` patch wires sync_weights to the right place.
These tests exercise the patch-installation branch in isolation. They build
a stub trainer with just enough attributes to look like
``AsyncGRPOTrainer`` for the duration of the relevant code path.
Background — there are two correct behaviors and we historically had a bug
where both modes used the same one:
- Async prefetch ON: the BG generation thread can't safely call
sync_weights mid-rollout. We no-op the stock hook and drive sync from
the main thread via ``_maybe_sync_vllm_weights``.
- Async prefetch OFF: TRL's stock ``_generate_single_turn`` already
calls ``sync_weights`` once per step boundary on the main thread. We
wire that hook directly to ``_sync_lora_adapter`` because
``_maybe_sync_vllm_weights`` short-circuits when async is off.
Before the fix, both modes installed ``lambda: None``, so sync mode never
pushed any LoRA adapter to vLLM and the trainer was a no-op.
"""
@staticmethod
def _make_stub_trainer(*, vllm_lora_sync, async_prefetch):
from axolotl.core.trainers.grpo.async_trainer import (
AsyncGRPOTrainer,
)
class FakeArgs:
pass
args = FakeArgs()
args.vllm_lora_sync = vllm_lora_sync
args.async_prefetch = async_prefetch
class FakeVllmGen:
sync_weights = staticmethod(lambda: None)
model = MagicMock()
# Use object.__new__ so we don't run __init__ (which needs a real
# model, dataset, etc.). We only need the `_generate_single_turn`
# method's patch branch to run, so we set up the minimum state.
trainer = object.__new__(AsyncGRPOTrainer)
trainer.args = args
trainer.use_vllm = True
trainer.vllm_generation = FakeVllmGen()
trainer._patched_sync_weights = False
# Spy on _sync_lora_adapter so we can assert it's the function the
# hook delegates to in sync mode.
trainer._sync_lora_adapter = MagicMock(name="_sync_lora_adapter_spy")
trainer._sync_peft_weights_no_merge = MagicMock(
name="_sync_peft_weights_no_merge_spy"
)
return trainer
@staticmethod
def _run_patch_branch(trainer):
"""Execute just the sync_weights-patching branch in isolation.
We can't easily call the real ``_generate_single_turn`` because it
does a full vLLM generate. Instead we copy the exact branch out of
the source so the test verifies the same logic the trainer runs.
"""
if not getattr(trainer, "_patched_sync_weights", False):
if trainer.use_vllm and hasattr(trainer, "vllm_generation"):
if getattr(trainer.args, "vllm_lora_sync", False):
if getattr(trainer.args, "async_prefetch", False):
trainer.vllm_generation.sync_weights = lambda: None
else:
sync_helper = trainer._sync_lora_adapter
def _lora_filesystem_sync():
sync_helper()
trainer.vllm_generation.sync_weights = _lora_filesystem_sync
trainer._patched_sync_weights = True
def test_sync_mode_with_lora_sync_wires_to_sync_lora_adapter(self):
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=False)
self._run_patch_branch(trainer)
assert trainer._patched_sync_weights is True
# Trigger the patched hook — it must call _sync_lora_adapter.
trainer.vllm_generation.sync_weights()
trainer._sync_lora_adapter.assert_called_once()
def test_async_mode_with_lora_sync_installs_noop_hook(self):
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=True)
self._run_patch_branch(trainer)
assert trainer._patched_sync_weights is True
# Hook must be a no-op so BG-thread generation doesn't fight the
# main-thread optimizer step over the model weights.
trainer.vllm_generation.sync_weights()
trainer._sync_lora_adapter.assert_not_called()
def test_sync_mode_with_lora_sync_does_not_call_during_install(self):
"""Installing the patch should not pre-emptively sync."""
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=False)
self._run_patch_branch(trainer)
# _sync_lora_adapter should only be called when the patched hook
# itself is invoked (e.g., from TRL's _generate_single_turn).
trainer._sync_lora_adapter.assert_not_called()
def test_patch_is_idempotent(self):
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=False)
self._run_patch_branch(trainer)
first_hook = trainer.vllm_generation.sync_weights
# Second call must not re-patch (otherwise we'd lose the original).
self._run_patch_branch(trainer)
assert trainer.vllm_generation.sync_weights is first_hook
class TestMaybeSyncVllmWeightsIntervalDefault(unittest.TestCase):
"""``_maybe_sync_vllm_weights`` must not crash when interval is unset.
Before the fix, ``step % self.args.vllm_sync_interval`` would TypeError
on the very first call when ``vllm_sync_interval`` was ``None`` (which
is the default for any config that doesn't explicitly set it). We now
fall back to interval=1 so unset means "sync every step", matching the
behavior of TRL's own ``_generate_single_turn``.
"""
@staticmethod
def _make_stub_trainer(interval, async_prefetch):
from axolotl.core.trainers.grpo.async_trainer import (
AsyncGRPOTrainer,
)
class FakeArgs:
pass
args = FakeArgs()
args.async_prefetch = async_prefetch
args.vllm_sync_interval = interval
args.vllm_lora_sync = True
class FakeState:
global_step = 1
trainer = object.__new__(AsyncGRPOTrainer)
trainer.args = args
trainer.use_vllm = True
trainer.state = FakeState()
trainer._last_synced_step = 0
trainer._sync_lora_adapter = MagicMock(name="sync_spy")
return trainer
def test_interval_none_in_async_mode_does_not_crash(self):
trainer = self._make_stub_trainer(interval=None, async_prefetch=True)
from axolotl.core.trainers.grpo.async_trainer import (
AsyncGRPOTrainer,
)
# Should not raise TypeError — defaults to every-step sync
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
trainer._sync_lora_adapter.assert_called_once()
def test_sync_mode_drives_sync(self):
"""Sync mode must fire ``_sync_lora_adapter`` from ``_maybe_sync_vllm_weights``.
The previous behavior (early return when ``not async_prefetch``)
assumed TRL's stock ``_generate_single_turn`` would handle sync.
That's true for vanilla GRPO but FALSE for NeMo Gym multi-turn
where the data producer bypasses ``_generate_single_turn``
entirely. Without this trigger no sync ever happens and the
trainer becomes a no-op.
"""
trainer = self._make_stub_trainer(interval=1, async_prefetch=False)
from axolotl.core.trainers.grpo.async_trainer import (
AsyncGRPOTrainer,
)
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
trainer._sync_lora_adapter.assert_called_once()
def test_async_mode_with_explicit_interval_respects_modulo(self):
trainer = self._make_stub_trainer(interval=4, async_prefetch=True)
from axolotl.core.trainers.grpo.async_trainer import (
AsyncGRPOTrainer,
)
# global_step=1, interval=4 → 1 % 4 != 0 → no sync
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
trainer._sync_lora_adapter.assert_not_called()
# global_step=4 → 4 % 4 == 0 → sync
trainer.state.global_step = 4
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
trainer._sync_lora_adapter.assert_called_once()
if __name__ == "__main__":
unittest.main()

View File

@@ -422,22 +422,22 @@ class TestPeftLoRAWeightExtraction:
)
# gate_up_proj [E, 2*inter, hidden]
# peft: in_features=2*inter (dim 1), out_features=hidden (dim 2)
# peft: in_features=hidden (last dim), out_features=2*inter (middle dim)
assert trainable[
"base_model.model.moe.experts.base_layer.lora_A.default.weight"
].shape == (E * r, 2 * config.intermediate_size)
assert trainable[
"base_model.model.moe.experts.base_layer.lora_B.default.weight"
].shape == (config.hidden_size, E * r)
# down_proj [E, hidden, inter]
# peft: in_features=hidden (dim 1), out_features=inter (dim 2)
assert trainable[
"base_model.model.moe.experts.lora_A.default.weight"
].shape == (E * r, config.hidden_size)
assert trainable[
"base_model.model.moe.experts.base_layer.lora_B.default.weight"
].shape == (2 * config.intermediate_size, E * r)
# down_proj [E, hidden, inter]
# peft: in_features=inter (last dim), out_features=hidden (middle dim)
assert trainable[
"base_model.model.moe.experts.lora_A.default.weight"
].shape == (E * r, config.intermediate_size)
assert trainable[
"base_model.model.moe.experts.lora_B.default.weight"
].shape == (config.intermediate_size, E * r)
].shape == (config.hidden_size, E * r)
@requires_cuda
def test_peft_forward_runs(self):
@@ -489,26 +489,28 @@ class TestPeftLoRAWeightExtraction:
assert down_lora is not None, "down_proj LoRA not detected"
# Check shapes (after peft->scattermoe conversion with A<->B swap)
# gate_up_proj W = param.T = [E, hidden, 2*inter], K=hidden, N=2*inter
# gate_up_proj: peft A [E*r, hidden] / B [2*inter, E*r]
# After swap: smoe_A [E*r, 2*inter], smoe_B [hidden, E*r]
E, r = config.num_experts, 4
gup_A, gup_B, gup_s = gup_lora
assert gup_A.shape == (E * r, config.hidden_size), (
f"gate_up_proj smoe_A: expected [r*E, K=hidden]={(E * r, config.hidden_size)}, "
assert gup_A.shape == (E * r, 2 * config.intermediate_size), (
f"gate_up_proj smoe_A: expected [r*E, 2*inter]={(E * r, 2 * config.intermediate_size)}, "
f"got {gup_A.shape}"
)
assert gup_B.shape == (2 * config.intermediate_size, E * r), (
f"gate_up_proj smoe_B: expected [N=2*inter, r*E]="
f"{(2 * config.intermediate_size, E * r)}, got {gup_B.shape}"
assert gup_B.shape == (config.hidden_size, E * r), (
f"gate_up_proj smoe_B: expected [hidden, r*E]="
f"{(config.hidden_size, E * r)}, got {gup_B.shape}"
)
# down_proj W = param.T = [E, inter, hidden], K=inter, N=hidden
# down_proj: peft A [E*r, inter] / B [hidden, E*r]
# After swap: smoe_A [E*r, hidden], smoe_B [inter, E*r]
down_A, down_B, down_s = down_lora
assert down_A.shape == (E * r, config.intermediate_size), (
f"down_proj smoe_A: expected [r*E, K=inter]={(E * r, config.intermediate_size)}, "
assert down_A.shape == (E * r, config.hidden_size), (
f"down_proj smoe_A: expected [r*E, hidden]={(E * r, config.hidden_size)}, "
f"got {down_A.shape}"
)
assert down_B.shape == (config.hidden_size, E * r), (
f"down_proj smoe_B: expected [N=hidden, r*E]={(config.hidden_size, E * r)}, "
assert down_B.shape == (config.intermediate_size, E * r), (
f"down_proj smoe_B: expected [inter, r*E]={(config.intermediate_size, E * r)}, "
f"got {down_B.shape}"
)

View File

@@ -361,329 +361,6 @@ class TestPluginDefaults(unittest.TestCase):
assert cfg.dataloader_num_workers == 0
class TestSelectWeightSyncTransport(unittest.TestCase):
"""Pure-logic table tests for ``select_weight_sync_transport``."""
def _caps(self, **kwargs):
from axolotl.integrations.nemo_gym.plugin import VLLMWeightSyncCapabilities
c = VLLMWeightSyncCapabilities(probed=True)
for k, v in kwargs.items():
setattr(c, k, v)
return c
def test_lora_with_native_endpoint(self):
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
caps = self._caps(lora_filesystem=True)
assert (
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=True)
== "lora_filesystem"
)
def test_lora_with_axolotl_endpoint(self):
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
caps = self._caps(lora_axolotl=True)
assert (
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=False)
== "lora_filesystem"
)
def test_lora_falls_back_to_nccl_when_no_lora_endpoint(self):
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
caps = self._caps(nccl=True)
assert (
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=False)
== "nccl"
)
def test_full_param_prefers_nccl(self):
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
caps = self._caps(nccl=True, http_full=True)
assert (
select_weight_sync_transport(
caps, has_lora=False, vllm_lora_sync_pref=False
)
== "nccl"
)
def test_full_param_falls_back_to_http(self):
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
caps = self._caps(http_full=True)
assert (
select_weight_sync_transport(
caps, has_lora=False, vllm_lora_sync_pref=False
)
== "http_full"
)
def test_full_param_no_routes_returns_none(self):
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
caps = self._caps() # all False
assert (
select_weight_sync_transport(
caps, has_lora=False, vllm_lora_sync_pref=False
)
== "none"
)
def test_lora_no_routes_returns_none(self):
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
caps = self._caps()
assert (
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=True)
== "none"
)
class TestProbeVllmWeightSync(unittest.TestCase):
"""``probe_vllm_weight_sync`` reads a vLLM ``/openapi.json`` and reports caps."""
def test_stock_vllm_with_lora_enabled(self):
"""Stock ``vllm serve --enable-lora`` exposes only LoRA endpoints."""
from unittest.mock import patch
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
spec = {
"paths": {
"/v1/models": {"get": {}},
"/v1/load_lora_adapter": {"post": {}},
"/v1/unload_lora_adapter": {"post": {}},
"/v1/completions": {"post": {}},
}
}
with patch("requests.get") as mock_get:
mock_get.return_value.raise_for_status = lambda: None
mock_get.return_value.json = lambda: spec
caps = probe_vllm_weight_sync("http://localhost:8000")
assert caps.probed is True
assert caps.lora_filesystem is True
assert caps.lora_axolotl is False
assert caps.nccl is False
assert caps.http_full is False
def test_axolotl_serve_lora_full_capabilities(self):
"""``axolotl vllm-serve`` exposes NCCL + LoRA + HTTP full sync."""
from unittest.mock import patch
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
spec = {
"paths": {
"/init_communicator/": {"post": {}},
"/update_named_param/": {"post": {}},
"/batch_update_named_params/": {"post": {}},
"/set_lora_adapter/": {"post": {}},
"/clear_lora_adapter/": {"post": {}},
"/http_update_weights/": {"post": {}},
"/v1/load_lora_adapter": {"post": {}},
}
}
with patch("requests.get") as mock_get:
mock_get.return_value.raise_for_status = lambda: None
mock_get.return_value.json = lambda: spec
caps = probe_vllm_weight_sync("http://localhost:8000")
assert caps.probed is True
assert caps.nccl is True
assert caps.lora_axolotl is True
assert caps.lora_filesystem is True
assert caps.http_full is True
def test_trl_vllm_serve_nccl_only(self):
"""``trl vllm-serve`` exposes NCCL routes but not LoRA filesystem."""
from unittest.mock import patch
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
spec = {
"paths": {
"/init_communicator/": {"post": {}},
"/update_named_param/": {"post": {}},
"/batch_update_named_params/": {"post": {}},
"/close_communicator/": {"post": {}},
"/generate/": {"post": {}},
}
}
with patch("requests.get") as mock_get:
mock_get.return_value.raise_for_status = lambda: None
mock_get.return_value.json = lambda: spec
caps = probe_vllm_weight_sync("http://localhost:8000")
assert caps.probed is True
assert caps.nccl is True
assert caps.lora_filesystem is False
assert caps.lora_axolotl is False
assert caps.http_full is False
def test_unreachable_server_records_error(self):
from unittest.mock import patch
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
with patch("requests.get") as mock_get:
mock_get.side_effect = ConnectionError("Connection refused")
caps = probe_vllm_weight_sync("http://localhost:9999")
assert caps.probed is False
assert caps.probe_error is not None
assert "ConnectionError" in caps.probe_error
assert caps.nccl is False
assert caps.lora_filesystem is False
class TestPluginWeightSyncEnforcement(unittest.TestCase):
"""End-to-end test of post_trainer_create's transport-selection branch.
The plugin used to silently no-op weight sync when ``vllm_lora_sync: false``,
leaving the trainer learning in isolation while vLLM kept serving the
unmodified base model. After the fix:
- LoRA + LoRA-loading endpoint → installs filesystem LoRA sync
- LoRA + only NCCL endpoint → uses NCCL broadcast
- Full FT + NCCL endpoint → uses NCCL broadcast (standard TRL flow)
- Full FT + HTTP endpoint → raises NotImplementedError (step 3)
- No usable transport → raises ValueError with a precise diagnosis
"""
@staticmethod
def _fake_cfg(adapter, vllm_lora_sync):
class FakeTRL:
pass
class FakeCfg:
pass
trl = FakeTRL()
trl.vllm_lora_sync = vllm_lora_sync
trl.vllm_server_host = "127.0.0.1"
trl.vllm_server_port = 8000
cfg = FakeCfg()
cfg.nemo_gym_enabled = True
cfg.nemo_gym_model_name = None
cfg.base_model = "test/model"
cfg.nemo_gym_verify_timeout = 30
cfg.nemo_gym_multi_turn = True
cfg.adapter = adapter
cfg.trl = trl
return cfg
@staticmethod
def _fake_trainer():
class FakeVLLMGen:
sync_weights = staticmethod(lambda: None)
class FakeTrainer:
vllm_generation = FakeVLLMGen()
return FakeTrainer()
@staticmethod
def _caps(**kwargs):
from axolotl.integrations.nemo_gym.plugin import VLLMWeightSyncCapabilities
c = VLLMWeightSyncCapabilities(probed=True)
for k, v in kwargs.items():
setattr(c, k, v)
return c
def test_lora_with_lora_endpoint_installs_filesystem_sync(self):
from unittest.mock import patch
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
plugin = NemoGymPlugin()
plugin._vllm_caps = self._caps(lora_filesystem=True)
cfg = self._fake_cfg(adapter="lora", vllm_lora_sync=True)
trainer = self._fake_trainer()
with (
patch.object(plugin, "_setup_lora_sync") as setup,
patch.object(plugin, "_check_lora_endpoint") as check,
patch.object(plugin, "_wire_multi_turn") as wire,
):
plugin.post_trainer_create(cfg, trainer)
setup.assert_called_once()
check.assert_called_once()
wire.assert_called_once()
def test_lora_with_no_routes_raises_with_lora_specific_message(self):
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
plugin = NemoGymPlugin()
plugin._vllm_caps = self._caps() # all False, but probed
cfg = self._fake_cfg(adapter="lora", vllm_lora_sync=False)
trainer = self._fake_trainer()
with self.assertRaises(ValueError) as ctx:
plugin.post_trainer_create(cfg, trainer)
msg = str(ctx.exception)
assert "no-op trainer" in msg
assert "load_lora_adapter" in msg
assert "VLLM_ALLOW_RUNTIME_LORA_UPDATING" in msg
def test_full_finetune_with_nccl_endpoint_uses_nccl(self):
from unittest.mock import patch
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
plugin = NemoGymPlugin()
plugin._vllm_caps = self._caps(nccl=True)
cfg = self._fake_cfg(adapter=None, vllm_lora_sync=False)
trainer = self._fake_trainer()
with patch.object(plugin, "_wire_multi_turn") as wire:
plugin.post_trainer_create(cfg, trainer)
wire.assert_called_once()
def test_full_finetune_with_http_endpoint_not_implemented_yet(self):
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
plugin = NemoGymPlugin()
plugin._vllm_caps = self._caps(http_full=True)
cfg = self._fake_cfg(adapter=None, vllm_lora_sync=False)
trainer = self._fake_trainer()
with self.assertRaises(NotImplementedError) as ctx:
plugin.post_trainer_create(cfg, trainer)
assert "HTTP weight sync" in str(ctx.exception)
def test_full_finetune_with_no_routes_raises_with_full_param_message(self):
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
plugin = NemoGymPlugin()
plugin._vllm_caps = self._caps()
cfg = self._fake_cfg(adapter=None, vllm_lora_sync=False)
trainer = self._fake_trainer()
with self.assertRaises(ValueError) as ctx:
plugin.post_trainer_create(cfg, trainer)
msg = str(ctx.exception)
assert "no-op trainer" in msg
assert "init_communicator" in msg
assert "http_update_weights" in msg
def test_unprobed_caps_raises_with_probe_failure_message(self):
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
plugin = NemoGymPlugin()
# Plugin._vllm_caps left as default-None: the post_trainer_create
# branch falls back to a fresh VLLMWeightSyncCapabilities() with
# probed=False, so the error path should mention probing.
cfg = self._fake_cfg(adapter="lora", vllm_lora_sync=True)
trainer = self._fake_trainer()
with self.assertRaises(ValueError) as ctx:
plugin.post_trainer_create(cfg, trainer)
assert "could not probe" in str(ctx.exception)
class TestNemoGymE2E(unittest.TestCase):
"""End-to-end test: data producer → agent (mocked) → parse → tensors → rewards.

View File

@@ -5,8 +5,6 @@ Covers:
- save_strategy: 'best' requires metric_for_best_model
- streaming=True with val_set_size > 0 is rejected
- lora_target_modules with invalid regex patterns is rejected
- GRPO: generation batch size must be divisible by num_generations,
num_generations >= 2, and effective_gbs >= num_generations * world_size
"""
import pytest
@@ -119,136 +117,3 @@ class TestLoraTargetModulesRegexValidator:
)
with pytest.raises(ValueError, match="invalid regex pattern"):
validate_config(cfg)
class TestGRPOBatchSizeValidator:
"""GRPO requires (mb*GA) % num_generations == 0 and num_generations >= 2.
These call the @model_validator(mode="before") classmethod directly on a
plain dict — same input shape it receives during full Pydantic validation,
just without dragging in unrelated fields (datasets / model loading / etc.)
that aren't relevant to what's under test. The validator is registered on
``RLValidationMixin`` (which ``AxolotlInputConfig`` inherits) so this is the
same code path ``axolotl train`` exercises.
"""
@staticmethod
def _check(data):
from axolotl.utils.schemas.validation import RLValidationMixin
return RLValidationMixin.check_grpo_batch_size_divisibility(data)
def test_divisible_passes(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 4,
"trl": {"num_generations": 4},
}
# Should return data unchanged (no exception)
out = self._check(data)
assert out["trl"]["num_generations"] == 4
def test_non_divisible_raises(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 2,
"trl": {"num_generations": 4},
}
with pytest.raises(ValueError, match="num_generations"):
self._check(data)
def test_non_divisible_error_includes_fix_hint(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 3,
"trl": {"num_generations": 4},
}
with pytest.raises(ValueError, match="gradient_accumulation_steps: 4"):
self._check(data)
def test_num_generations_one_raises(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 4,
"trl": {"num_generations": 1},
}
with pytest.raises(ValueError, match=r"num_generations >= 2"):
self._check(data)
def test_explicit_generation_batch_size_divisible_passes(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"trl": {"num_generations": 4, "generation_batch_size": 8},
}
out = self._check(data)
assert out["trl"]["generation_batch_size"] == 8
def test_explicit_generation_batch_size_non_divisible_raises(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"trl": {"num_generations": 4, "generation_batch_size": 6},
}
with pytest.raises(ValueError, match="trl.generation_batch_size"):
self._check(data)
def test_non_grpo_skips_check(self):
# Anything other than rl=grpo should pass through untouched, even
# with non-divisible batch sizes — they're irrelevant to other RL
# methods that don't use group-relative advantages.
data = {
"rl": "dpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 3,
"trl": {"num_generations": 4},
}
assert self._check(data) is data
def test_no_rl_set_skips_check(self):
data = {
"micro_batch_size": 1,
"gradient_accumulation_steps": 3,
}
assert self._check(data) is data
def test_grpo_without_num_generations_skips_check(self):
# If num_generations isn't set, TRL uses its own default — we don't
# have enough info to validate, so the validator must short-circuit
# rather than guess.
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 3,
"trl": {},
}
out = self._check(data)
assert out["rl"] == "grpo"
def test_multi_rank_group_size_check(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 4, # gbs=4
"world_size": 2, # need gbs >= 4*2 = 8
"trl": {"num_generations": 4},
}
with pytest.raises(ValueError, match=r"world_size=2"):
self._check(data)
def test_multi_rank_group_size_satisfied(self):
data = {
"rl": "grpo",
"micro_batch_size": 1,
"gradient_accumulation_steps": 8, # gbs=8 >= 4*2
"world_size": 2,
"trl": {"num_generations": 4},
}
out = self._check(data)
assert out["gradient_accumulation_steps"] == 8