Compare commits
2 Commits
53391a10d7
...
vllm-0191
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cec99c4133 | ||
|
|
d248242490 |
2
setup.py
2
setup.py
@@ -89,7 +89,7 @@ def parse_requirements(extras_require_map):
|
||||
]
|
||||
if not install_xformers:
|
||||
_install_requires.pop(_install_requires.index(xformers_version))
|
||||
extras_require_map["vllm"] = ["vllm>=0.19.0"]
|
||||
extras_require_map["vllm"] = ["vllm>=0.19.1"]
|
||||
elif (major, minor) >= (2, 9):
|
||||
extras_require_map.pop("fbgemm-gpu")
|
||||
extras_require_map["fbgemm-gpu"] = [
|
||||
|
||||
@@ -1103,22 +1103,11 @@ class AsyncGRPOTrainer(GRPOTrainer):
|
||||
- vllm_lora_sync: saves adapter to filesystem, vLLM loads natively
|
||||
- PEFT no-merge: computes merged weights as new tensors, NCCL broadcast
|
||||
- Non-PEFT: stock sync_weights via merge_adapter + NCCL
|
||||
|
||||
This is the canonical sync trigger and runs in BOTH async and
|
||||
synchronous modes from ``_prepare_inputs_with_data_producer`` /
|
||||
``_prepare_inputs_legacy_async``. The ``_generate_single_turn``
|
||||
patch is a parallel backup for non-data-producer paths (vanilla
|
||||
GRPO without NeMo Gym), where the data producer is bypassed
|
||||
entirely and TRL's stock generate-then-sync flow is used instead.
|
||||
"""
|
||||
if not self.use_vllm:
|
||||
if not (self.use_vllm and self.args.async_prefetch):
|
||||
return
|
||||
step = self.state.global_step
|
||||
# Default to syncing every step when no interval is configured —
|
||||
# otherwise ``step % None`` would TypeError, and the previous
|
||||
# behavior of crashing on the first sync was strictly worse than
|
||||
# the standard "sync every optimizer step".
|
||||
interval = self.args.vllm_sync_interval or 1
|
||||
interval = self.args.vllm_sync_interval
|
||||
if step != self._last_synced_step and step % interval == 0:
|
||||
if step == 0:
|
||||
logger.info("Skipping vLLM weight sync at step 0 (no training yet)")
|
||||
@@ -1213,42 +1202,13 @@ class AsyncGRPOTrainer(GRPOTrainer):
|
||||
|
||||
# Permanently replace vllm_generation.sync_weights with our custom
|
||||
# sync to avoid merge_adapter (fails on FP8 / races with training).
|
||||
#
|
||||
# The design has two modes that have to be threaded carefully:
|
||||
#
|
||||
# - Async prefetch ON: BG generation thread can't safely call
|
||||
# sync_weights mid-rollout (it races with the trainer's optimizer
|
||||
# step and can corrupt weights). We no-op the stock sync hook and
|
||||
# drive sync ourselves from ``_maybe_sync_vllm_weights`` after the
|
||||
# optimizer step on the main thread.
|
||||
#
|
||||
# - Async prefetch OFF (synchronous mode): TRL's stock
|
||||
# ``_generate_single_turn`` calls ``sync_weights`` once per step
|
||||
# boundary. There's no BG thread to race with, and
|
||||
# ``_maybe_sync_vllm_weights`` short-circuits with
|
||||
# ``if not async_prefetch: return``, so we MUST wire the stock
|
||||
# hook directly to our LoRA sync helper — otherwise nothing ever
|
||||
# pushes weights to vLLM and the trainer becomes a no-op (vLLM
|
||||
# keeps serving the base model, every rollout in every group
|
||||
# produces identical outputs, advantages are zero, optimizer
|
||||
# step gets skipped, repeat).
|
||||
# For LoRA sync mode, make it a no-op here since _maybe_sync_vllm_weights
|
||||
# handles the sync with proper interval tracking.
|
||||
if not getattr(self, "_patched_sync_weights", False):
|
||||
if self.use_vllm and hasattr(self, "vllm_generation"):
|
||||
if getattr(self.args, "vllm_lora_sync", False):
|
||||
if getattr(self.args, "async_prefetch", False):
|
||||
# Async: drive sync from main thread via
|
||||
# _maybe_sync_vllm_weights instead.
|
||||
self.vllm_generation.sync_weights = lambda: None
|
||||
else:
|
||||
# Sync mode: TRL's _generate_single_turn already
|
||||
# calls sync_weights once per step boundary. Wire
|
||||
# it directly to our LoRA filesystem sync helper.
|
||||
sync_helper = self._sync_lora_adapter
|
||||
|
||||
def _lora_filesystem_sync():
|
||||
sync_helper()
|
||||
|
||||
self.vllm_generation.sync_weights = _lora_filesystem_sync
|
||||
# No-op: LoRA sync is driven by _maybe_sync_vllm_weights
|
||||
self.vllm_generation.sync_weights = lambda: None
|
||||
self._patched_sync_weights = True
|
||||
else:
|
||||
from accelerate.utils import is_peft_model
|
||||
|
||||
@@ -19,7 +19,6 @@ Supports two modes:
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Union
|
||||
|
||||
from axolotl.integrations.base import BasePlugin
|
||||
@@ -31,107 +30,6 @@ if TYPE_CHECKING:
|
||||
LOG = get_logger(__name__)
|
||||
|
||||
|
||||
# ---- vLLM weight-sync transport probe ------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class VLLMWeightSyncCapabilities:
|
||||
"""What weight-sync routes a vLLM server actually exposes.
|
||||
|
||||
Discovered once at ``pre_model_load`` time by fetching the server's
|
||||
``/openapi.json``. Drives the transport-selection table below.
|
||||
"""
|
||||
|
||||
nccl: bool = False # /init_communicator/ + /update_named_param/
|
||||
lora_filesystem: bool = False # /v1/load_lora_adapter (vLLM native)
|
||||
lora_axolotl: bool = False # /set_lora_adapter/ (axolotl serve_lora extension)
|
||||
http_full: bool = False # /http_update_weights/ (axolotl serve_lora extension)
|
||||
probed: bool = False
|
||||
probe_error: str | None = None
|
||||
routes: list[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def any_full_param_sync(self) -> bool:
|
||||
"""True if at least one transport can push full-model weights."""
|
||||
return self.nccl or self.http_full
|
||||
|
||||
@property
|
||||
def any_lora_sync(self) -> bool:
|
||||
"""True if at least one transport can push LoRA adapters."""
|
||||
return self.lora_filesystem or self.lora_axolotl or self.nccl
|
||||
|
||||
|
||||
def probe_vllm_weight_sync(
|
||||
base_url: str, timeout: float = 5.0
|
||||
) -> VLLMWeightSyncCapabilities:
|
||||
"""Detect which weight-sync routes the configured vLLM server exposes.
|
||||
|
||||
Uses the server's FastAPI ``/openapi.json`` — every weight-sync transport
|
||||
we care about is mounted as a POST route there. Falls back to all-False
|
||||
on any error so the caller can still decide what to do (typically: raise
|
||||
a clear error rather than silently no-op).
|
||||
"""
|
||||
import requests
|
||||
|
||||
caps = VLLMWeightSyncCapabilities()
|
||||
try:
|
||||
r = requests.get(f"{base_url.rstrip('/')}/openapi.json", timeout=timeout)
|
||||
r.raise_for_status()
|
||||
spec = r.json()
|
||||
routes = sorted((spec.get("paths") or {}).keys())
|
||||
caps.routes = routes
|
||||
caps.nccl = "/init_communicator/" in routes and "/update_named_param/" in routes
|
||||
caps.lora_filesystem = "/v1/load_lora_adapter" in routes
|
||||
caps.lora_axolotl = "/set_lora_adapter/" in routes
|
||||
caps.http_full = "/http_update_weights/" in routes
|
||||
caps.probed = True
|
||||
except Exception as exc:
|
||||
caps.probe_error = f"{type(exc).__name__}: {exc}"
|
||||
LOG.warning(
|
||||
"NeMo Gym: failed to probe vLLM /openapi.json at %s — %s. "
|
||||
"Will fall back to LoRA-only behavior.",
|
||||
base_url,
|
||||
caps.probe_error,
|
||||
)
|
||||
return caps
|
||||
|
||||
|
||||
def select_weight_sync_transport(
|
||||
caps: VLLMWeightSyncCapabilities,
|
||||
*,
|
||||
has_lora: bool,
|
||||
vllm_lora_sync_pref: bool,
|
||||
) -> str:
|
||||
"""Pick the right transport for a (server caps, model type) combo.
|
||||
|
||||
Returns one of: ``"lora_filesystem"``, ``"nccl"``, ``"http_full"``, or
|
||||
``"none"``. The caller decides what to do with ``"none"`` (typically:
|
||||
raise an error explaining the misconfiguration).
|
||||
|
||||
Selection table:
|
||||
LoRA model + lora endpoint + lora-sync pref → lora_filesystem
|
||||
LoRA model + lora endpoint → lora_filesystem
|
||||
LoRA model + nccl endpoint → nccl (broadcast merged adapter)
|
||||
Full model + nccl endpoint → nccl
|
||||
Full model + http endpoint → http_full
|
||||
anything else → none
|
||||
"""
|
||||
if has_lora:
|
||||
if (caps.lora_filesystem or caps.lora_axolotl) and vllm_lora_sync_pref:
|
||||
return "lora_filesystem"
|
||||
if caps.lora_filesystem or caps.lora_axolotl:
|
||||
return "lora_filesystem"
|
||||
if caps.nccl:
|
||||
return "nccl"
|
||||
return "none"
|
||||
# Full-parameter model
|
||||
if caps.nccl:
|
||||
return "nccl"
|
||||
if caps.http_full:
|
||||
return "http_full"
|
||||
return "none"
|
||||
|
||||
|
||||
class NemoGymPlugin(BasePlugin):
|
||||
"""Plugin for NVIDIA NeMo Gym integration with Axolotl.
|
||||
|
||||
@@ -152,69 +50,37 @@ class NemoGymPlugin(BasePlugin):
|
||||
self._reward_fn = None
|
||||
self._dataset_lookup = None
|
||||
self._agent_servers = {}
|
||||
self._vllm_caps: VLLMWeightSyncCapabilities | None = None
|
||||
|
||||
def get_input_args(self):
|
||||
return "axolotl.integrations.nemo_gym.NemoGymArgs"
|
||||
|
||||
def pre_model_load(self, cfg):
|
||||
"""Probe vLLM weight-sync routes and conditionally bypass NCCL init.
|
||||
|
||||
Replaces the previous unconditional ``init_communicator`` monkey-patch
|
||||
with a probe of the configured vLLM server's ``/openapi.json``. We only
|
||||
bypass NCCL init when the server we're talking to actually lacks the
|
||||
``/init_communicator/`` route (i.e. stock ``vllm serve``); against
|
||||
TRL/axolotl serve modules that DO expose NCCL routes, we leave the
|
||||
standard TRL flow alone so full-finetune training can sync weights.
|
||||
"""
|
||||
"""Apply monkeypatches before trainer creation."""
|
||||
if not cfg.nemo_gym_enabled:
|
||||
return
|
||||
|
||||
# Always skip NCCL communicator init in NeMo Gym mode.
|
||||
# NeMo Gym uses its own vLLM server (standard OpenAI API), not the TRL
|
||||
# colocate/NCCL path. The NCCL init fails with vLLM V1 and standard servers.
|
||||
trl_cfg = getattr(cfg, "trl", None)
|
||||
if not (trl_cfg and getattr(trl_cfg, "vllm_mode", "server") == "server"):
|
||||
return
|
||||
|
||||
host = getattr(trl_cfg, "vllm_server_host", None) or "127.0.0.1"
|
||||
port = getattr(trl_cfg, "vllm_server_port", None) or 8000
|
||||
base_url = f"http://{host}:{port}"
|
||||
self._vllm_caps = probe_vllm_weight_sync(base_url)
|
||||
|
||||
if self._vllm_caps.probed:
|
||||
LOG.info(
|
||||
"NeMo Gym: vLLM weight-sync probe @ %s — nccl=%s lora_native=%s "
|
||||
"lora_axolotl=%s http_full=%s",
|
||||
base_url,
|
||||
self._vllm_caps.nccl,
|
||||
self._vllm_caps.lora_filesystem,
|
||||
self._vllm_caps.lora_axolotl,
|
||||
self._vllm_caps.http_full,
|
||||
)
|
||||
|
||||
# Only bypass NCCL init when the server doesn't speak it. If NCCL is
|
||||
# available we leave VLLMClient.init_communicator alone so the
|
||||
# standard TRL sync flow can run for full-parameter training.
|
||||
if not self._vllm_caps.nccl:
|
||||
if trl_cfg and getattr(trl_cfg, "vllm_mode", "server") == "server":
|
||||
self._patch_skip_nccl_init()
|
||||
|
||||
def _patch_skip_nccl_init(self):
|
||||
"""Monkeypatch VLLMClient.init_communicator to no-op.
|
||||
|
||||
Only called when the configured vLLM server doesn't expose
|
||||
``/init_communicator/`` (e.g. stock ``vllm serve``). In that case
|
||||
TRL's standard ``init_communicator`` would 404 inside trainer
|
||||
construction; we no-op it so the LoRA filesystem path can install
|
||||
its own sync in ``post_trainer_create``.
|
||||
NeMo Gym uses its own vLLM server (standard OpenAI API or custom LoRA
|
||||
serve script). The NCCL communicator is not needed and fails with both
|
||||
vLLM V1 engine and standard OpenAI server mode.
|
||||
"""
|
||||
try:
|
||||
from trl.generation.vllm_client import VLLMClient
|
||||
|
||||
VLLMClient._original_init_communicator = VLLMClient.init_communicator
|
||||
VLLMClient.init_communicator = lambda self, **kwargs: LOG.info(
|
||||
"Skipping NCCL init_communicator (server has no /init_communicator/)"
|
||||
)
|
||||
LOG.info(
|
||||
"Patched VLLMClient.init_communicator to no-op (server has no NCCL routes)"
|
||||
"Skipping NCCL init_communicator (LoRA sync mode)"
|
||||
)
|
||||
LOG.info("Patched VLLMClient.init_communicator to no-op for LoRA sync")
|
||||
except Exception as exc:
|
||||
LOG.warning(f"Failed to patch VLLMClient: {exc}")
|
||||
|
||||
@@ -368,80 +234,30 @@ class NemoGymPlugin(BasePlugin):
|
||||
verify_timeout = cfg.nemo_gym_verify_timeout or 30
|
||||
multi_turn = cfg.nemo_gym_multi_turn or False
|
||||
|
||||
# Pick a weight-sync transport based on what the configured vLLM
|
||||
# server actually exposes (see ``pre_model_load`` probe) and what
|
||||
# kind of model we're training. The selection table is documented
|
||||
# in ``select_weight_sync_transport``.
|
||||
# Handle weight sync. NeMo Gym skips NCCL init, so we need to either:
|
||||
# - Install LoRA sync (when vllm_lora_sync=True)
|
||||
# - Or no-op sync_weights (when using standard vLLM server)
|
||||
trl_cfg = getattr(cfg, "trl", None)
|
||||
if hasattr(trainer, "vllm_generation") and trainer.vllm_generation:
|
||||
vllm_gen = trainer.vllm_generation
|
||||
adapter = getattr(cfg, "adapter", None)
|
||||
has_lora = adapter in ("lora", "qlora")
|
||||
vllm_lora_sync_pref = bool(
|
||||
trl_cfg and getattr(trl_cfg, "vllm_lora_sync", False)
|
||||
)
|
||||
caps = self._vllm_caps or VLLMWeightSyncCapabilities()
|
||||
transport = select_weight_sync_transport(
|
||||
caps,
|
||||
has_lora=has_lora,
|
||||
vllm_lora_sync_pref=vllm_lora_sync_pref,
|
||||
)
|
||||
|
||||
if transport == "lora_filesystem":
|
||||
if trl_cfg and getattr(trl_cfg, "vllm_lora_sync", False):
|
||||
self._setup_lora_sync(trainer)
|
||||
# Verify the vLLM server supports runtime LoRA loading
|
||||
self._check_lora_endpoint(vllm_gen)
|
||||
LOG.info("NeMo Gym weight sync: LoRA filesystem")
|
||||
elif transport == "nccl":
|
||||
# Standard TRL NCCL path. We leave ``VLLMClient.init_communicator``
|
||||
# alone (pre_model_load only patched it when the probe found no
|
||||
# NCCL route) so the trainer's normal weight-sync flow runs.
|
||||
LOG.info(
|
||||
"NeMo Gym weight sync: NCCL (server exposes /init_communicator/)"
|
||||
else:
|
||||
# No NCCL, no LoRA sync — skip all weight sync paths
|
||||
vllm_gen.sync_weights = lambda: LOG.debug(
|
||||
"Weight sync skipped (NeMo Gym mode)"
|
||||
)
|
||||
elif transport == "http_full":
|
||||
# Full-parameter HTTP sync — implementation lands in step 3.
|
||||
# For now, fail loudly so users know the path is detected but
|
||||
# not yet wired up, instead of silently no-oping like before.
|
||||
raise NotImplementedError(
|
||||
"NeMo Gym + full fine-tune + HTTP weight sync is detected "
|
||||
"but the client-side sync helper is not yet implemented "
|
||||
"(planned). Use `adapter: lora|qlora` for now, or use a "
|
||||
"vLLM serve module that exposes /init_communicator/ for "
|
||||
"NCCL sync."
|
||||
type(vllm_gen).sync_weights = lambda self: LOG.debug(
|
||||
"Weight sync skipped (NeMo Gym mode)"
|
||||
)
|
||||
else: # transport == "none"
|
||||
# No viable sync path. Build a precise error so the user knows
|
||||
# exactly what's missing and how to fix it.
|
||||
if not caps.probed:
|
||||
msg = (
|
||||
"could not probe the vLLM server's "
|
||||
f"/openapi.json: {caps.probe_error}. "
|
||||
"Verify that vLLM is reachable at "
|
||||
f"{getattr(trl_cfg, 'vllm_server_host', '?')}:"
|
||||
f"{getattr(trl_cfg, 'vllm_server_port', '?')}."
|
||||
# Also patch the async trainer's internal sync method
|
||||
if hasattr(trainer, "_maybe_sync_vllm_weights"):
|
||||
trainer._maybe_sync_vllm_weights = lambda: LOG.debug(
|
||||
"Async weight sync skipped (NeMo Gym mode)"
|
||||
)
|
||||
elif has_lora:
|
||||
msg = (
|
||||
"the vLLM server has neither NCCL routes "
|
||||
"(/init_communicator/) nor a LoRA-loading route "
|
||||
"(/v1/load_lora_adapter or /set_lora_adapter/). "
|
||||
"Restart vLLM with `--enable-lora --max-lora-rank N "
|
||||
"VLLM_ALLOW_RUNTIME_LORA_UPDATING=1` for the stock "
|
||||
"server, or use `axolotl vllm-serve` for the "
|
||||
"NCCL-capable serve module."
|
||||
)
|
||||
else:
|
||||
msg = (
|
||||
"the vLLM server exposes no full-parameter sync route "
|
||||
"(/init_communicator/ for NCCL or /http_update_weights/ "
|
||||
"for HTTP). Use `axolotl vllm-serve` (which has both) "
|
||||
"or set `adapter: lora|qlora`."
|
||||
)
|
||||
raise ValueError(
|
||||
f"NeMo Gym: no usable weight-sync transport — {msg} Without "
|
||||
"weight sync the trainer's gradient updates never reach the "
|
||||
"rollout policy (functionally a no-op trainer)."
|
||||
)
|
||||
LOG.info("Disabled weight sync (NeMo Gym mode, no LoRA sync)")
|
||||
|
||||
if multi_turn:
|
||||
self._wire_multi_turn(cfg, trainer, model_name, verify_timeout)
|
||||
|
||||
@@ -130,41 +130,21 @@ def start_servers(
|
||||
)
|
||||
|
||||
|
||||
def get_server_configs(head_port: int = 11000, timeout: float = 30.0) -> dict:
|
||||
def get_server_configs(head_port: int = 11000) -> dict:
|
||||
"""Fetch the global config from the NeMo Gym head server.
|
||||
|
||||
Retries up to 3 times with exponential backoff. The default per-attempt
|
||||
timeout is 30s (raised from the original 5s) because head servers can
|
||||
be slow to respond when they're concurrently serving rollouts from a
|
||||
prior training run. A 5s timeout was empirically too tight to survive
|
||||
a kill-and-relaunch cycle.
|
||||
|
||||
Returns:
|
||||
Dict mapping server_name -> server config.
|
||||
"""
|
||||
url = f"http://127.0.0.1:{head_port}/global_config_dict_yaml"
|
||||
last_exc: Exception | None = None
|
||||
for attempt in (1, 2, 3):
|
||||
try:
|
||||
response = requests.get(url, timeout=timeout)
|
||||
response.raise_for_status()
|
||||
result = yaml.safe_load(response.text)
|
||||
# NeMo Gym head server double-encodes: YAML string inside a YAML string
|
||||
if isinstance(result, str):
|
||||
result = yaml.safe_load(result)
|
||||
return result
|
||||
except (requests.exceptions.RequestException, OSError) as exc:
|
||||
last_exc = exc
|
||||
LOG.warning(
|
||||
"NeMo Gym head probe attempt %d/3 failed: %s. Retrying...",
|
||||
attempt,
|
||||
type(exc).__name__,
|
||||
)
|
||||
if attempt < 3:
|
||||
time.sleep(2.0 * attempt)
|
||||
raise RuntimeError(
|
||||
f"NeMo Gym head server at {url} did not respond after 3 attempts: {last_exc}"
|
||||
response = requests.get(
|
||||
f"http://127.0.0.1:{head_port}/global_config_dict_yaml", timeout=5
|
||||
)
|
||||
response.raise_for_status()
|
||||
result = yaml.safe_load(response.text)
|
||||
# NeMo Gym head server double-encodes: YAML string inside a YAML string
|
||||
if isinstance(result, str):
|
||||
result = yaml.safe_load(result)
|
||||
return result
|
||||
|
||||
|
||||
def get_agent_servers(
|
||||
|
||||
@@ -320,15 +320,6 @@ def main(script_args: ScriptArguments):
|
||||
# --- Active LoRA state (shared across endpoints via closure) ---
|
||||
active_lora: dict = {"request": None}
|
||||
|
||||
# Serializes access to the worker pipe. The underlying
|
||||
# multiprocessing.Connection is a single full-duplex stream shared
|
||||
# across all HTTP handlers; concurrent requests interleave bytes on
|
||||
# the wire and corrupt the pickle framing (seen as
|
||||
# ``UnpicklingError: pickle data was truncated``). Any endpoint that
|
||||
# does ``conn.send(...); conn.recv()`` MUST hold this lock across
|
||||
# the round-trip so only one inflight call at a time per pipe.
|
||||
worker_pipe_lock = asyncio.Lock()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# LoRA-specific endpoints
|
||||
# ------------------------------------------------------------------
|
||||
@@ -640,147 +631,6 @@ def main(script_args: ScriptArguments):
|
||||
},
|
||||
}
|
||||
|
||||
@app.post("/v1/completions")
|
||||
async def openai_completions(request_body: dict):
|
||||
"""OpenAI-compatible text-completions endpoint.
|
||||
|
||||
Accepts either a string ``prompt`` or a list-of-int
|
||||
``prompt_token_ids`` (as the text-completions spec allows). Routes
|
||||
to the internal vLLM generate method with the active LoRA adapter
|
||||
and returns an OpenAI /v1/completions-shaped response including
|
||||
per-choice ``prompt_token_ids``, ``generation_token_ids``, and
|
||||
``generation_log_probs`` for NeMo Gym agents that need raw
|
||||
tokens + logprobs.
|
||||
"""
|
||||
import uuid
|
||||
|
||||
prompt_raw = request_body.get("prompt")
|
||||
temperature = request_body.get("temperature", 1.0)
|
||||
max_tokens = request_body.get("max_tokens", 512)
|
||||
top_p = request_body.get("top_p", 1.0)
|
||||
n = request_body.get("n", 1)
|
||||
logprobs = request_body.get("logprobs") or 0
|
||||
stop_token_ids = request_body.get("stop_token_ids") or None
|
||||
|
||||
# Accept either a string or a list[int] token id prompt. Lists
|
||||
# must contain ints only (raise on lists of strings so callers get
|
||||
# a clear error). Also accept [[int, int, ...]] nesting for the
|
||||
# rare case callers pass a single-prompt batch.
|
||||
if isinstance(prompt_raw, list) and prompt_raw and isinstance(prompt_raw[0], list):
|
||||
prompt_raw = prompt_raw[0]
|
||||
|
||||
if isinstance(prompt_raw, list):
|
||||
prompt_dict = {"prompt_token_ids": prompt_raw}
|
||||
elif isinstance(prompt_raw, str):
|
||||
prompt_dict = {"prompt": prompt_raw}
|
||||
else:
|
||||
return {
|
||||
"error": {
|
||||
"message": (
|
||||
"prompt must be a string or a list of token ids"
|
||||
),
|
||||
"type": "invalid_request",
|
||||
}
|
||||
}
|
||||
|
||||
generation_kwargs: dict[str, Any] = {
|
||||
"n": n,
|
||||
"temperature": temperature,
|
||||
"top_p": top_p,
|
||||
"max_tokens": max_tokens,
|
||||
"logprobs": logprobs,
|
||||
}
|
||||
if stop_token_ids:
|
||||
generation_kwargs["stop_token_ids"] = stop_token_ids
|
||||
sampling_params = SamplingParams(
|
||||
**{k: v for k, v in generation_kwargs.items() if v is not None}
|
||||
)
|
||||
|
||||
chunked = chunk_list([prompt_dict], script_args.data_parallel_size)
|
||||
|
||||
# Hold the pipe lock across send+recv — concurrent requests would
|
||||
# otherwise interleave pickle frames on the worker connection.
|
||||
async with worker_pipe_lock:
|
||||
for conn, chunk in zip(connections, chunked, strict=True):
|
||||
if not chunk:
|
||||
chunk = [{"prompt": "<placeholder>"}]
|
||||
kwargs = {
|
||||
"prompts": chunk,
|
||||
"sampling_params": sampling_params,
|
||||
"lora_request": active_lora["request"],
|
||||
}
|
||||
conn.send({"type": "call", "method": "generate", "kwargs": kwargs})
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
all_outputs = await asyncio.gather(
|
||||
*(loop.run_in_executor(None, safe_recv, conn) for conn in connections)
|
||||
)
|
||||
|
||||
all_outputs = [
|
||||
o for o, c in zip(all_outputs, chunked, strict=True) if c
|
||||
]
|
||||
for o in all_outputs:
|
||||
if isinstance(o, dict) and "error" in o:
|
||||
raise RuntimeError(f"vLLM worker error: {o['error']}")
|
||||
all_outputs = list(chain.from_iterable(all_outputs))
|
||||
|
||||
if not all_outputs:
|
||||
return {"choices": [], "model": script_args.model}
|
||||
|
||||
choices = []
|
||||
for i, output in enumerate(all_outputs):
|
||||
for j, out in enumerate(output.outputs):
|
||||
text = out.text
|
||||
# OpenAI-style `logprobs` block for text-completions:
|
||||
# { "tokens": [...], "token_logprobs": [...] }
|
||||
lp_block = None
|
||||
if out.logprobs:
|
||||
tokens_str: list[str] = []
|
||||
token_lps: list[float] = []
|
||||
for step in out.logprobs:
|
||||
chosen = next(iter(step.values()))
|
||||
tokens_str.append(getattr(chosen, "decoded_token", "") or "")
|
||||
token_lps.append(float(chosen.logprob))
|
||||
lp_block = {
|
||||
"tokens": tokens_str,
|
||||
"token_logprobs": token_lps,
|
||||
}
|
||||
|
||||
choice = {
|
||||
"index": i * n + j,
|
||||
"text": text,
|
||||
"finish_reason": "stop" if out.finish_reason == "stop" else "length",
|
||||
"logprobs": lp_block,
|
||||
# NeMo-Gym / retrace agent extras — preserved on the
|
||||
# choice so callers with raw-token pipelines don't
|
||||
# have to re-tokenize.
|
||||
"prompt_token_ids": output.prompt_token_ids,
|
||||
"generation_token_ids": list(out.token_ids),
|
||||
"generation_log_probs": (
|
||||
[float(next(iter(lp.values())).logprob) for lp in out.logprobs]
|
||||
if out.logprobs
|
||||
else []
|
||||
),
|
||||
}
|
||||
choices.append(choice)
|
||||
|
||||
prompt_tokens = len(all_outputs[0].prompt_token_ids) if all_outputs else 0
|
||||
completion_tokens = sum(
|
||||
len(out.token_ids) for o in all_outputs for out in o.outputs
|
||||
)
|
||||
|
||||
return {
|
||||
"id": f"cmpl-{uuid.uuid4().hex[:8]}",
|
||||
"object": "text_completion",
|
||||
"model": script_args.model,
|
||||
"choices": choices,
|
||||
"usage": {
|
||||
"prompt_tokens": prompt_tokens,
|
||||
"completion_tokens": completion_tokens,
|
||||
"total_tokens": prompt_tokens + completion_tokens,
|
||||
},
|
||||
}
|
||||
|
||||
# --- Weight sync endpoints (legacy fallback, same as TRL) ---
|
||||
|
||||
@app.post("/init_communicator/")
|
||||
|
||||
@@ -770,88 +770,6 @@ class RLValidationMixin:
|
||||
)
|
||||
return data
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def check_grpo_batch_size_divisibility(cls, data):
|
||||
"""Surface GRPO batch-shape mismatches at config-parse time.
|
||||
|
||||
TRL's GRPOTrainer requires that the per-step generation batch size be
|
||||
evenly divisible by ``num_generations`` so that every prompt can be
|
||||
replicated exactly ``num_generations`` times. The runtime check inside
|
||||
``GRPOTrainer.__init__`` only fires after the model has been loaded —
|
||||
too late and too cryptic for the user. We replicate the check here so
|
||||
the failure is immediate and actionable.
|
||||
|
||||
Also enforces:
|
||||
- ``num_generations >= 2`` (group-relative advantage needs variance)
|
||||
- ``effective_gbs >= num_generations * world_size`` when capabilities
|
||||
indicate multiple ranks (each rank needs at least one full group)
|
||||
"""
|
||||
if data.get("rl") != "grpo":
|
||||
return data
|
||||
|
||||
trl_cfg = data.get("trl") or {}
|
||||
num_gen = trl_cfg.get("num_generations")
|
||||
if num_gen is None:
|
||||
# TRL's own default is 8 — but if the user didn't set it, we
|
||||
# don't have enough info to validate anything. Let TRL's own
|
||||
# init handle the default-vs-batch interaction.
|
||||
return data
|
||||
if num_gen < 2:
|
||||
raise ValueError(
|
||||
f"GRPO requires `trl.num_generations >= 2` (got {num_gen}). "
|
||||
"With num_generations=1, every group has zero advantage and "
|
||||
"the policy never updates."
|
||||
)
|
||||
|
||||
explicit_gbs = trl_cfg.get("generation_batch_size")
|
||||
if explicit_gbs is not None:
|
||||
effective_gbs = int(explicit_gbs)
|
||||
gbs_source = "trl.generation_batch_size"
|
||||
else:
|
||||
mb = data.get("micro_batch_size") or 1
|
||||
ga = data.get("gradient_accumulation_steps") or 1
|
||||
effective_gbs = int(mb) * int(ga)
|
||||
gbs_source = f"micro_batch_size ({mb}) * gradient_accumulation_steps ({ga})"
|
||||
|
||||
if effective_gbs % num_gen != 0:
|
||||
# Suggest the smallest GA bump that fixes it for the common case
|
||||
# where the user hasn't set generation_batch_size explicitly.
|
||||
hint = ""
|
||||
if explicit_gbs is None:
|
||||
from math import gcd
|
||||
|
||||
mb_val = int(data.get("micro_batch_size") or 1)
|
||||
# smallest GA such that mb*GA is a multiple of num_gen
|
||||
lcm = num_gen * mb_val // gcd(num_gen, mb_val)
|
||||
suggested_ga = lcm // mb_val
|
||||
hint = (
|
||||
f" Smallest fix: set `gradient_accumulation_steps: "
|
||||
f"{suggested_ga}` (so micro_batch_size * GA = "
|
||||
f"{mb_val * suggested_ga} is a multiple of {num_gen})."
|
||||
)
|
||||
raise ValueError(
|
||||
f"GRPO: generation batch size must be divisible by "
|
||||
f"`trl.num_generations`. Got effective_gbs={effective_gbs} "
|
||||
f"(from {gbs_source}) and num_generations={num_gen}.{hint}"
|
||||
)
|
||||
|
||||
# Multi-rank check: each rank must receive at least one full group
|
||||
# per step. Without `capabilities` populated yet (mode='before'), we
|
||||
# fall back to user-set distributed fields.
|
||||
world_size = (
|
||||
(data.get("capabilities") or {}).get("n_gpu") or data.get("world_size") or 1
|
||||
)
|
||||
if world_size and world_size > 1 and effective_gbs < num_gen * world_size:
|
||||
raise ValueError(
|
||||
f"GRPO with world_size={world_size} requires effective_gbs "
|
||||
f">= num_generations * world_size = {num_gen * world_size}, "
|
||||
f"got {effective_gbs}. Increase gradient_accumulation_steps "
|
||||
f"or micro_batch_size."
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class OptimizationValidationMixin:
|
||||
"""Validation methods related to optimization and performance."""
|
||||
|
||||
@@ -216,197 +216,5 @@ class TestValidateQuantPatchRestore(unittest.TestCase):
|
||||
self.assertIs(_trainer_module.validate_quantization_for_training, original)
|
||||
|
||||
|
||||
class TestVllmLoraSyncPatch(unittest.TestCase):
|
||||
"""The ``_generate_single_turn`` patch wires sync_weights to the right place.
|
||||
|
||||
These tests exercise the patch-installation branch in isolation. They build
|
||||
a stub trainer with just enough attributes to look like
|
||||
``AsyncGRPOTrainer`` for the duration of the relevant code path.
|
||||
|
||||
Background — there are two correct behaviors and we historically had a bug
|
||||
where both modes used the same one:
|
||||
|
||||
- Async prefetch ON: the BG generation thread can't safely call
|
||||
sync_weights mid-rollout. We no-op the stock hook and drive sync from
|
||||
the main thread via ``_maybe_sync_vllm_weights``.
|
||||
- Async prefetch OFF: TRL's stock ``_generate_single_turn`` already
|
||||
calls ``sync_weights`` once per step boundary on the main thread. We
|
||||
wire that hook directly to ``_sync_lora_adapter`` because
|
||||
``_maybe_sync_vllm_weights`` short-circuits when async is off.
|
||||
|
||||
Before the fix, both modes installed ``lambda: None``, so sync mode never
|
||||
pushed any LoRA adapter to vLLM and the trainer was a no-op.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _make_stub_trainer(*, vllm_lora_sync, async_prefetch):
|
||||
from axolotl.core.trainers.grpo.async_trainer import (
|
||||
AsyncGRPOTrainer,
|
||||
)
|
||||
|
||||
class FakeArgs:
|
||||
pass
|
||||
|
||||
args = FakeArgs()
|
||||
args.vllm_lora_sync = vllm_lora_sync
|
||||
args.async_prefetch = async_prefetch
|
||||
|
||||
class FakeVllmGen:
|
||||
sync_weights = staticmethod(lambda: None)
|
||||
model = MagicMock()
|
||||
|
||||
# Use object.__new__ so we don't run __init__ (which needs a real
|
||||
# model, dataset, etc.). We only need the `_generate_single_turn`
|
||||
# method's patch branch to run, so we set up the minimum state.
|
||||
trainer = object.__new__(AsyncGRPOTrainer)
|
||||
trainer.args = args
|
||||
trainer.use_vllm = True
|
||||
trainer.vllm_generation = FakeVllmGen()
|
||||
trainer._patched_sync_weights = False
|
||||
# Spy on _sync_lora_adapter so we can assert it's the function the
|
||||
# hook delegates to in sync mode.
|
||||
trainer._sync_lora_adapter = MagicMock(name="_sync_lora_adapter_spy")
|
||||
trainer._sync_peft_weights_no_merge = MagicMock(
|
||||
name="_sync_peft_weights_no_merge_spy"
|
||||
)
|
||||
return trainer
|
||||
|
||||
@staticmethod
|
||||
def _run_patch_branch(trainer):
|
||||
"""Execute just the sync_weights-patching branch in isolation.
|
||||
|
||||
We can't easily call the real ``_generate_single_turn`` because it
|
||||
does a full vLLM generate. Instead we copy the exact branch out of
|
||||
the source so the test verifies the same logic the trainer runs.
|
||||
"""
|
||||
if not getattr(trainer, "_patched_sync_weights", False):
|
||||
if trainer.use_vllm and hasattr(trainer, "vllm_generation"):
|
||||
if getattr(trainer.args, "vllm_lora_sync", False):
|
||||
if getattr(trainer.args, "async_prefetch", False):
|
||||
trainer.vllm_generation.sync_weights = lambda: None
|
||||
else:
|
||||
sync_helper = trainer._sync_lora_adapter
|
||||
|
||||
def _lora_filesystem_sync():
|
||||
sync_helper()
|
||||
|
||||
trainer.vllm_generation.sync_weights = _lora_filesystem_sync
|
||||
trainer._patched_sync_weights = True
|
||||
|
||||
def test_sync_mode_with_lora_sync_wires_to_sync_lora_adapter(self):
|
||||
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=False)
|
||||
self._run_patch_branch(trainer)
|
||||
|
||||
assert trainer._patched_sync_weights is True
|
||||
# Trigger the patched hook — it must call _sync_lora_adapter.
|
||||
trainer.vllm_generation.sync_weights()
|
||||
trainer._sync_lora_adapter.assert_called_once()
|
||||
|
||||
def test_async_mode_with_lora_sync_installs_noop_hook(self):
|
||||
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=True)
|
||||
self._run_patch_branch(trainer)
|
||||
|
||||
assert trainer._patched_sync_weights is True
|
||||
# Hook must be a no-op so BG-thread generation doesn't fight the
|
||||
# main-thread optimizer step over the model weights.
|
||||
trainer.vllm_generation.sync_weights()
|
||||
trainer._sync_lora_adapter.assert_not_called()
|
||||
|
||||
def test_sync_mode_with_lora_sync_does_not_call_during_install(self):
|
||||
"""Installing the patch should not pre-emptively sync."""
|
||||
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=False)
|
||||
self._run_patch_branch(trainer)
|
||||
# _sync_lora_adapter should only be called when the patched hook
|
||||
# itself is invoked (e.g., from TRL's _generate_single_turn).
|
||||
trainer._sync_lora_adapter.assert_not_called()
|
||||
|
||||
def test_patch_is_idempotent(self):
|
||||
trainer = self._make_stub_trainer(vllm_lora_sync=True, async_prefetch=False)
|
||||
self._run_patch_branch(trainer)
|
||||
first_hook = trainer.vllm_generation.sync_weights
|
||||
# Second call must not re-patch (otherwise we'd lose the original).
|
||||
self._run_patch_branch(trainer)
|
||||
assert trainer.vllm_generation.sync_weights is first_hook
|
||||
|
||||
|
||||
class TestMaybeSyncVllmWeightsIntervalDefault(unittest.TestCase):
|
||||
"""``_maybe_sync_vllm_weights`` must not crash when interval is unset.
|
||||
|
||||
Before the fix, ``step % self.args.vllm_sync_interval`` would TypeError
|
||||
on the very first call when ``vllm_sync_interval`` was ``None`` (which
|
||||
is the default for any config that doesn't explicitly set it). We now
|
||||
fall back to interval=1 so unset means "sync every step", matching the
|
||||
behavior of TRL's own ``_generate_single_turn``.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _make_stub_trainer(interval, async_prefetch):
|
||||
from axolotl.core.trainers.grpo.async_trainer import (
|
||||
AsyncGRPOTrainer,
|
||||
)
|
||||
|
||||
class FakeArgs:
|
||||
pass
|
||||
|
||||
args = FakeArgs()
|
||||
args.async_prefetch = async_prefetch
|
||||
args.vllm_sync_interval = interval
|
||||
args.vllm_lora_sync = True
|
||||
|
||||
class FakeState:
|
||||
global_step = 1
|
||||
|
||||
trainer = object.__new__(AsyncGRPOTrainer)
|
||||
trainer.args = args
|
||||
trainer.use_vllm = True
|
||||
trainer.state = FakeState()
|
||||
trainer._last_synced_step = 0
|
||||
trainer._sync_lora_adapter = MagicMock(name="sync_spy")
|
||||
return trainer
|
||||
|
||||
def test_interval_none_in_async_mode_does_not_crash(self):
|
||||
trainer = self._make_stub_trainer(interval=None, async_prefetch=True)
|
||||
from axolotl.core.trainers.grpo.async_trainer import (
|
||||
AsyncGRPOTrainer,
|
||||
)
|
||||
|
||||
# Should not raise TypeError — defaults to every-step sync
|
||||
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
|
||||
trainer._sync_lora_adapter.assert_called_once()
|
||||
|
||||
def test_sync_mode_drives_sync(self):
|
||||
"""Sync mode must fire ``_sync_lora_adapter`` from ``_maybe_sync_vllm_weights``.
|
||||
|
||||
The previous behavior (early return when ``not async_prefetch``)
|
||||
assumed TRL's stock ``_generate_single_turn`` would handle sync.
|
||||
That's true for vanilla GRPO but FALSE for NeMo Gym multi-turn
|
||||
where the data producer bypasses ``_generate_single_turn``
|
||||
entirely. Without this trigger no sync ever happens and the
|
||||
trainer becomes a no-op.
|
||||
"""
|
||||
trainer = self._make_stub_trainer(interval=1, async_prefetch=False)
|
||||
from axolotl.core.trainers.grpo.async_trainer import (
|
||||
AsyncGRPOTrainer,
|
||||
)
|
||||
|
||||
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
|
||||
trainer._sync_lora_adapter.assert_called_once()
|
||||
|
||||
def test_async_mode_with_explicit_interval_respects_modulo(self):
|
||||
trainer = self._make_stub_trainer(interval=4, async_prefetch=True)
|
||||
from axolotl.core.trainers.grpo.async_trainer import (
|
||||
AsyncGRPOTrainer,
|
||||
)
|
||||
|
||||
# global_step=1, interval=4 → 1 % 4 != 0 → no sync
|
||||
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
|
||||
trainer._sync_lora_adapter.assert_not_called()
|
||||
|
||||
# global_step=4 → 4 % 4 == 0 → sync
|
||||
trainer.state.global_step = 4
|
||||
AsyncGRPOTrainer._maybe_sync_vllm_weights(trainer)
|
||||
trainer._sync_lora_adapter.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -422,22 +422,22 @@ class TestPeftLoRAWeightExtraction:
|
||||
)
|
||||
|
||||
# gate_up_proj [E, 2*inter, hidden]
|
||||
# peft: in_features=2*inter (dim 1), out_features=hidden (dim 2)
|
||||
# peft: in_features=hidden (last dim), out_features=2*inter (middle dim)
|
||||
assert trainable[
|
||||
"base_model.model.moe.experts.base_layer.lora_A.default.weight"
|
||||
].shape == (E * r, 2 * config.intermediate_size)
|
||||
assert trainable[
|
||||
"base_model.model.moe.experts.base_layer.lora_B.default.weight"
|
||||
].shape == (config.hidden_size, E * r)
|
||||
|
||||
# down_proj [E, hidden, inter]
|
||||
# peft: in_features=hidden (dim 1), out_features=inter (dim 2)
|
||||
assert trainable[
|
||||
"base_model.model.moe.experts.lora_A.default.weight"
|
||||
].shape == (E * r, config.hidden_size)
|
||||
assert trainable[
|
||||
"base_model.model.moe.experts.base_layer.lora_B.default.weight"
|
||||
].shape == (2 * config.intermediate_size, E * r)
|
||||
|
||||
# down_proj [E, hidden, inter]
|
||||
# peft: in_features=inter (last dim), out_features=hidden (middle dim)
|
||||
assert trainable[
|
||||
"base_model.model.moe.experts.lora_A.default.weight"
|
||||
].shape == (E * r, config.intermediate_size)
|
||||
assert trainable[
|
||||
"base_model.model.moe.experts.lora_B.default.weight"
|
||||
].shape == (config.intermediate_size, E * r)
|
||||
].shape == (config.hidden_size, E * r)
|
||||
|
||||
@requires_cuda
|
||||
def test_peft_forward_runs(self):
|
||||
@@ -489,26 +489,28 @@ class TestPeftLoRAWeightExtraction:
|
||||
assert down_lora is not None, "down_proj LoRA not detected"
|
||||
|
||||
# Check shapes (after peft->scattermoe conversion with A<->B swap)
|
||||
# gate_up_proj W = param.T = [E, hidden, 2*inter], K=hidden, N=2*inter
|
||||
# gate_up_proj: peft A [E*r, hidden] / B [2*inter, E*r]
|
||||
# After swap: smoe_A [E*r, 2*inter], smoe_B [hidden, E*r]
|
||||
E, r = config.num_experts, 4
|
||||
gup_A, gup_B, gup_s = gup_lora
|
||||
assert gup_A.shape == (E * r, config.hidden_size), (
|
||||
f"gate_up_proj smoe_A: expected [r*E, K=hidden]={(E * r, config.hidden_size)}, "
|
||||
assert gup_A.shape == (E * r, 2 * config.intermediate_size), (
|
||||
f"gate_up_proj smoe_A: expected [r*E, 2*inter]={(E * r, 2 * config.intermediate_size)}, "
|
||||
f"got {gup_A.shape}"
|
||||
)
|
||||
assert gup_B.shape == (2 * config.intermediate_size, E * r), (
|
||||
f"gate_up_proj smoe_B: expected [N=2*inter, r*E]="
|
||||
f"{(2 * config.intermediate_size, E * r)}, got {gup_B.shape}"
|
||||
assert gup_B.shape == (config.hidden_size, E * r), (
|
||||
f"gate_up_proj smoe_B: expected [hidden, r*E]="
|
||||
f"{(config.hidden_size, E * r)}, got {gup_B.shape}"
|
||||
)
|
||||
|
||||
# down_proj W = param.T = [E, inter, hidden], K=inter, N=hidden
|
||||
# down_proj: peft A [E*r, inter] / B [hidden, E*r]
|
||||
# After swap: smoe_A [E*r, hidden], smoe_B [inter, E*r]
|
||||
down_A, down_B, down_s = down_lora
|
||||
assert down_A.shape == (E * r, config.intermediate_size), (
|
||||
f"down_proj smoe_A: expected [r*E, K=inter]={(E * r, config.intermediate_size)}, "
|
||||
assert down_A.shape == (E * r, config.hidden_size), (
|
||||
f"down_proj smoe_A: expected [r*E, hidden]={(E * r, config.hidden_size)}, "
|
||||
f"got {down_A.shape}"
|
||||
)
|
||||
assert down_B.shape == (config.hidden_size, E * r), (
|
||||
f"down_proj smoe_B: expected [N=hidden, r*E]={(config.hidden_size, E * r)}, "
|
||||
assert down_B.shape == (config.intermediate_size, E * r), (
|
||||
f"down_proj smoe_B: expected [inter, r*E]={(config.intermediate_size, E * r)}, "
|
||||
f"got {down_B.shape}"
|
||||
)
|
||||
|
||||
|
||||
@@ -361,329 +361,6 @@ class TestPluginDefaults(unittest.TestCase):
|
||||
assert cfg.dataloader_num_workers == 0
|
||||
|
||||
|
||||
class TestSelectWeightSyncTransport(unittest.TestCase):
|
||||
"""Pure-logic table tests for ``select_weight_sync_transport``."""
|
||||
|
||||
def _caps(self, **kwargs):
|
||||
from axolotl.integrations.nemo_gym.plugin import VLLMWeightSyncCapabilities
|
||||
|
||||
c = VLLMWeightSyncCapabilities(probed=True)
|
||||
for k, v in kwargs.items():
|
||||
setattr(c, k, v)
|
||||
return c
|
||||
|
||||
def test_lora_with_native_endpoint(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
|
||||
|
||||
caps = self._caps(lora_filesystem=True)
|
||||
assert (
|
||||
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=True)
|
||||
== "lora_filesystem"
|
||||
)
|
||||
|
||||
def test_lora_with_axolotl_endpoint(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
|
||||
|
||||
caps = self._caps(lora_axolotl=True)
|
||||
assert (
|
||||
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=False)
|
||||
== "lora_filesystem"
|
||||
)
|
||||
|
||||
def test_lora_falls_back_to_nccl_when_no_lora_endpoint(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
|
||||
|
||||
caps = self._caps(nccl=True)
|
||||
assert (
|
||||
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=False)
|
||||
== "nccl"
|
||||
)
|
||||
|
||||
def test_full_param_prefers_nccl(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
|
||||
|
||||
caps = self._caps(nccl=True, http_full=True)
|
||||
assert (
|
||||
select_weight_sync_transport(
|
||||
caps, has_lora=False, vllm_lora_sync_pref=False
|
||||
)
|
||||
== "nccl"
|
||||
)
|
||||
|
||||
def test_full_param_falls_back_to_http(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
|
||||
|
||||
caps = self._caps(http_full=True)
|
||||
assert (
|
||||
select_weight_sync_transport(
|
||||
caps, has_lora=False, vllm_lora_sync_pref=False
|
||||
)
|
||||
== "http_full"
|
||||
)
|
||||
|
||||
def test_full_param_no_routes_returns_none(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
|
||||
|
||||
caps = self._caps() # all False
|
||||
assert (
|
||||
select_weight_sync_transport(
|
||||
caps, has_lora=False, vllm_lora_sync_pref=False
|
||||
)
|
||||
== "none"
|
||||
)
|
||||
|
||||
def test_lora_no_routes_returns_none(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import select_weight_sync_transport
|
||||
|
||||
caps = self._caps()
|
||||
assert (
|
||||
select_weight_sync_transport(caps, has_lora=True, vllm_lora_sync_pref=True)
|
||||
== "none"
|
||||
)
|
||||
|
||||
|
||||
class TestProbeVllmWeightSync(unittest.TestCase):
|
||||
"""``probe_vllm_weight_sync`` reads a vLLM ``/openapi.json`` and reports caps."""
|
||||
|
||||
def test_stock_vllm_with_lora_enabled(self):
|
||||
"""Stock ``vllm serve --enable-lora`` exposes only LoRA endpoints."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
|
||||
|
||||
spec = {
|
||||
"paths": {
|
||||
"/v1/models": {"get": {}},
|
||||
"/v1/load_lora_adapter": {"post": {}},
|
||||
"/v1/unload_lora_adapter": {"post": {}},
|
||||
"/v1/completions": {"post": {}},
|
||||
}
|
||||
}
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_get.return_value.raise_for_status = lambda: None
|
||||
mock_get.return_value.json = lambda: spec
|
||||
caps = probe_vllm_weight_sync("http://localhost:8000")
|
||||
|
||||
assert caps.probed is True
|
||||
assert caps.lora_filesystem is True
|
||||
assert caps.lora_axolotl is False
|
||||
assert caps.nccl is False
|
||||
assert caps.http_full is False
|
||||
|
||||
def test_axolotl_serve_lora_full_capabilities(self):
|
||||
"""``axolotl vllm-serve`` exposes NCCL + LoRA + HTTP full sync."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
|
||||
|
||||
spec = {
|
||||
"paths": {
|
||||
"/init_communicator/": {"post": {}},
|
||||
"/update_named_param/": {"post": {}},
|
||||
"/batch_update_named_params/": {"post": {}},
|
||||
"/set_lora_adapter/": {"post": {}},
|
||||
"/clear_lora_adapter/": {"post": {}},
|
||||
"/http_update_weights/": {"post": {}},
|
||||
"/v1/load_lora_adapter": {"post": {}},
|
||||
}
|
||||
}
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_get.return_value.raise_for_status = lambda: None
|
||||
mock_get.return_value.json = lambda: spec
|
||||
caps = probe_vllm_weight_sync("http://localhost:8000")
|
||||
|
||||
assert caps.probed is True
|
||||
assert caps.nccl is True
|
||||
assert caps.lora_axolotl is True
|
||||
assert caps.lora_filesystem is True
|
||||
assert caps.http_full is True
|
||||
|
||||
def test_trl_vllm_serve_nccl_only(self):
|
||||
"""``trl vllm-serve`` exposes NCCL routes but not LoRA filesystem."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
|
||||
|
||||
spec = {
|
||||
"paths": {
|
||||
"/init_communicator/": {"post": {}},
|
||||
"/update_named_param/": {"post": {}},
|
||||
"/batch_update_named_params/": {"post": {}},
|
||||
"/close_communicator/": {"post": {}},
|
||||
"/generate/": {"post": {}},
|
||||
}
|
||||
}
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_get.return_value.raise_for_status = lambda: None
|
||||
mock_get.return_value.json = lambda: spec
|
||||
caps = probe_vllm_weight_sync("http://localhost:8000")
|
||||
|
||||
assert caps.probed is True
|
||||
assert caps.nccl is True
|
||||
assert caps.lora_filesystem is False
|
||||
assert caps.lora_axolotl is False
|
||||
assert caps.http_full is False
|
||||
|
||||
def test_unreachable_server_records_error(self):
|
||||
from unittest.mock import patch
|
||||
|
||||
from axolotl.integrations.nemo_gym.plugin import probe_vllm_weight_sync
|
||||
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_get.side_effect = ConnectionError("Connection refused")
|
||||
caps = probe_vllm_weight_sync("http://localhost:9999")
|
||||
|
||||
assert caps.probed is False
|
||||
assert caps.probe_error is not None
|
||||
assert "ConnectionError" in caps.probe_error
|
||||
assert caps.nccl is False
|
||||
assert caps.lora_filesystem is False
|
||||
|
||||
|
||||
class TestPluginWeightSyncEnforcement(unittest.TestCase):
|
||||
"""End-to-end test of post_trainer_create's transport-selection branch.
|
||||
|
||||
The plugin used to silently no-op weight sync when ``vllm_lora_sync: false``,
|
||||
leaving the trainer learning in isolation while vLLM kept serving the
|
||||
unmodified base model. After the fix:
|
||||
|
||||
- LoRA + LoRA-loading endpoint → installs filesystem LoRA sync
|
||||
- LoRA + only NCCL endpoint → uses NCCL broadcast
|
||||
- Full FT + NCCL endpoint → uses NCCL broadcast (standard TRL flow)
|
||||
- Full FT + HTTP endpoint → raises NotImplementedError (step 3)
|
||||
- No usable transport → raises ValueError with a precise diagnosis
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _fake_cfg(adapter, vllm_lora_sync):
|
||||
class FakeTRL:
|
||||
pass
|
||||
|
||||
class FakeCfg:
|
||||
pass
|
||||
|
||||
trl = FakeTRL()
|
||||
trl.vllm_lora_sync = vllm_lora_sync
|
||||
trl.vllm_server_host = "127.0.0.1"
|
||||
trl.vllm_server_port = 8000
|
||||
|
||||
cfg = FakeCfg()
|
||||
cfg.nemo_gym_enabled = True
|
||||
cfg.nemo_gym_model_name = None
|
||||
cfg.base_model = "test/model"
|
||||
cfg.nemo_gym_verify_timeout = 30
|
||||
cfg.nemo_gym_multi_turn = True
|
||||
cfg.adapter = adapter
|
||||
cfg.trl = trl
|
||||
return cfg
|
||||
|
||||
@staticmethod
|
||||
def _fake_trainer():
|
||||
class FakeVLLMGen:
|
||||
sync_weights = staticmethod(lambda: None)
|
||||
|
||||
class FakeTrainer:
|
||||
vllm_generation = FakeVLLMGen()
|
||||
|
||||
return FakeTrainer()
|
||||
|
||||
@staticmethod
|
||||
def _caps(**kwargs):
|
||||
from axolotl.integrations.nemo_gym.plugin import VLLMWeightSyncCapabilities
|
||||
|
||||
c = VLLMWeightSyncCapabilities(probed=True)
|
||||
for k, v in kwargs.items():
|
||||
setattr(c, k, v)
|
||||
return c
|
||||
|
||||
def test_lora_with_lora_endpoint_installs_filesystem_sync(self):
|
||||
from unittest.mock import patch
|
||||
|
||||
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
|
||||
|
||||
plugin = NemoGymPlugin()
|
||||
plugin._vllm_caps = self._caps(lora_filesystem=True)
|
||||
cfg = self._fake_cfg(adapter="lora", vllm_lora_sync=True)
|
||||
trainer = self._fake_trainer()
|
||||
|
||||
with (
|
||||
patch.object(plugin, "_setup_lora_sync") as setup,
|
||||
patch.object(plugin, "_check_lora_endpoint") as check,
|
||||
patch.object(plugin, "_wire_multi_turn") as wire,
|
||||
):
|
||||
plugin.post_trainer_create(cfg, trainer)
|
||||
setup.assert_called_once()
|
||||
check.assert_called_once()
|
||||
wire.assert_called_once()
|
||||
|
||||
def test_lora_with_no_routes_raises_with_lora_specific_message(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
|
||||
|
||||
plugin = NemoGymPlugin()
|
||||
plugin._vllm_caps = self._caps() # all False, but probed
|
||||
cfg = self._fake_cfg(adapter="lora", vllm_lora_sync=False)
|
||||
trainer = self._fake_trainer()
|
||||
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
plugin.post_trainer_create(cfg, trainer)
|
||||
msg = str(ctx.exception)
|
||||
assert "no-op trainer" in msg
|
||||
assert "load_lora_adapter" in msg
|
||||
assert "VLLM_ALLOW_RUNTIME_LORA_UPDATING" in msg
|
||||
|
||||
def test_full_finetune_with_nccl_endpoint_uses_nccl(self):
|
||||
from unittest.mock import patch
|
||||
|
||||
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
|
||||
|
||||
plugin = NemoGymPlugin()
|
||||
plugin._vllm_caps = self._caps(nccl=True)
|
||||
cfg = self._fake_cfg(adapter=None, vllm_lora_sync=False)
|
||||
trainer = self._fake_trainer()
|
||||
|
||||
with patch.object(plugin, "_wire_multi_turn") as wire:
|
||||
plugin.post_trainer_create(cfg, trainer)
|
||||
wire.assert_called_once()
|
||||
|
||||
def test_full_finetune_with_http_endpoint_not_implemented_yet(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
|
||||
|
||||
plugin = NemoGymPlugin()
|
||||
plugin._vllm_caps = self._caps(http_full=True)
|
||||
cfg = self._fake_cfg(adapter=None, vllm_lora_sync=False)
|
||||
trainer = self._fake_trainer()
|
||||
with self.assertRaises(NotImplementedError) as ctx:
|
||||
plugin.post_trainer_create(cfg, trainer)
|
||||
assert "HTTP weight sync" in str(ctx.exception)
|
||||
|
||||
def test_full_finetune_with_no_routes_raises_with_full_param_message(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
|
||||
|
||||
plugin = NemoGymPlugin()
|
||||
plugin._vllm_caps = self._caps()
|
||||
cfg = self._fake_cfg(adapter=None, vllm_lora_sync=False)
|
||||
trainer = self._fake_trainer()
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
plugin.post_trainer_create(cfg, trainer)
|
||||
msg = str(ctx.exception)
|
||||
assert "no-op trainer" in msg
|
||||
assert "init_communicator" in msg
|
||||
assert "http_update_weights" in msg
|
||||
|
||||
def test_unprobed_caps_raises_with_probe_failure_message(self):
|
||||
from axolotl.integrations.nemo_gym.plugin import NemoGymPlugin
|
||||
|
||||
plugin = NemoGymPlugin()
|
||||
# Plugin._vllm_caps left as default-None: the post_trainer_create
|
||||
# branch falls back to a fresh VLLMWeightSyncCapabilities() with
|
||||
# probed=False, so the error path should mention probing.
|
||||
cfg = self._fake_cfg(adapter="lora", vllm_lora_sync=True)
|
||||
trainer = self._fake_trainer()
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
plugin.post_trainer_create(cfg, trainer)
|
||||
assert "could not probe" in str(ctx.exception)
|
||||
|
||||
|
||||
class TestNemoGymE2E(unittest.TestCase):
|
||||
"""End-to-end test: data producer → agent (mocked) → parse → tensors → rewards.
|
||||
|
||||
|
||||
@@ -5,8 +5,6 @@ Covers:
|
||||
- save_strategy: 'best' requires metric_for_best_model
|
||||
- streaming=True with val_set_size > 0 is rejected
|
||||
- lora_target_modules with invalid regex patterns is rejected
|
||||
- GRPO: generation batch size must be divisible by num_generations,
|
||||
num_generations >= 2, and effective_gbs >= num_generations * world_size
|
||||
"""
|
||||
|
||||
import pytest
|
||||
@@ -119,136 +117,3 @@ class TestLoraTargetModulesRegexValidator:
|
||||
)
|
||||
with pytest.raises(ValueError, match="invalid regex pattern"):
|
||||
validate_config(cfg)
|
||||
|
||||
|
||||
class TestGRPOBatchSizeValidator:
|
||||
"""GRPO requires (mb*GA) % num_generations == 0 and num_generations >= 2.
|
||||
|
||||
These call the @model_validator(mode="before") classmethod directly on a
|
||||
plain dict — same input shape it receives during full Pydantic validation,
|
||||
just without dragging in unrelated fields (datasets / model loading / etc.)
|
||||
that aren't relevant to what's under test. The validator is registered on
|
||||
``RLValidationMixin`` (which ``AxolotlInputConfig`` inherits) so this is the
|
||||
same code path ``axolotl train`` exercises.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _check(data):
|
||||
from axolotl.utils.schemas.validation import RLValidationMixin
|
||||
|
||||
return RLValidationMixin.check_grpo_batch_size_divisibility(data)
|
||||
|
||||
def test_divisible_passes(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 4,
|
||||
"trl": {"num_generations": 4},
|
||||
}
|
||||
# Should return data unchanged (no exception)
|
||||
out = self._check(data)
|
||||
assert out["trl"]["num_generations"] == 4
|
||||
|
||||
def test_non_divisible_raises(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 2,
|
||||
"trl": {"num_generations": 4},
|
||||
}
|
||||
with pytest.raises(ValueError, match="num_generations"):
|
||||
self._check(data)
|
||||
|
||||
def test_non_divisible_error_includes_fix_hint(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 3,
|
||||
"trl": {"num_generations": 4},
|
||||
}
|
||||
with pytest.raises(ValueError, match="gradient_accumulation_steps: 4"):
|
||||
self._check(data)
|
||||
|
||||
def test_num_generations_one_raises(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 4,
|
||||
"trl": {"num_generations": 1},
|
||||
}
|
||||
with pytest.raises(ValueError, match=r"num_generations >= 2"):
|
||||
self._check(data)
|
||||
|
||||
def test_explicit_generation_batch_size_divisible_passes(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 1,
|
||||
"trl": {"num_generations": 4, "generation_batch_size": 8},
|
||||
}
|
||||
out = self._check(data)
|
||||
assert out["trl"]["generation_batch_size"] == 8
|
||||
|
||||
def test_explicit_generation_batch_size_non_divisible_raises(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 1,
|
||||
"trl": {"num_generations": 4, "generation_batch_size": 6},
|
||||
}
|
||||
with pytest.raises(ValueError, match="trl.generation_batch_size"):
|
||||
self._check(data)
|
||||
|
||||
def test_non_grpo_skips_check(self):
|
||||
# Anything other than rl=grpo should pass through untouched, even
|
||||
# with non-divisible batch sizes — they're irrelevant to other RL
|
||||
# methods that don't use group-relative advantages.
|
||||
data = {
|
||||
"rl": "dpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 3,
|
||||
"trl": {"num_generations": 4},
|
||||
}
|
||||
assert self._check(data) is data
|
||||
|
||||
def test_no_rl_set_skips_check(self):
|
||||
data = {
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 3,
|
||||
}
|
||||
assert self._check(data) is data
|
||||
|
||||
def test_grpo_without_num_generations_skips_check(self):
|
||||
# If num_generations isn't set, TRL uses its own default — we don't
|
||||
# have enough info to validate, so the validator must short-circuit
|
||||
# rather than guess.
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 3,
|
||||
"trl": {},
|
||||
}
|
||||
out = self._check(data)
|
||||
assert out["rl"] == "grpo"
|
||||
|
||||
def test_multi_rank_group_size_check(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 4, # gbs=4
|
||||
"world_size": 2, # need gbs >= 4*2 = 8
|
||||
"trl": {"num_generations": 4},
|
||||
}
|
||||
with pytest.raises(ValueError, match=r"world_size=2"):
|
||||
self._check(data)
|
||||
|
||||
def test_multi_rank_group_size_satisfied(self):
|
||||
data = {
|
||||
"rl": "grpo",
|
||||
"micro_batch_size": 1,
|
||||
"gradient_accumulation_steps": 8, # gbs=8 >= 4*2
|
||||
"world_size": 2,
|
||||
"trl": {"num_generations": 4},
|
||||
}
|
||||
out = self._check(data)
|
||||
assert out["gradient_accumulation_steps"] == 8
|
||||
|
||||
Reference in New Issue
Block a user