From 6778856804e2c6d5f9903006916f9cd4bfb27756 Mon Sep 17 00:00:00 2001 From: NanoCode012 Date: Fri, 30 May 2025 11:21:47 +0700 Subject: [PATCH] Fix: RL base feature parity (#2133) * feat: add num_proc and load from cache for rl mapping * fix: refactor sft and rl trainer to set same base args * feat: add report_to to set run name * fix: consolidate handling of fp16, bf16, tf32 kwarg * chore: consolidate eval_strat, loraplus, lr sched, max_length * fix: deprecate old types * fix: adding missing Any * fix: max_steps incorrectly set * fix: remove unnecessary datacollator kwarg insert and pop * fix: update default max_steps * fix: add missing weight_decay handling * fix: ignore max_length for grpo * feat: update CI on trainer_builder * fix: comments * improve handling of warmup/logging steps * use transformers default for logging steps, not None * fix: remove redundant override * fix: lint * feat: allow custom optim for rl methods * fix: duplicate optim setting * fix(test): set sequence_parallel_degree default in base cfg * feat: add handling for seed and SP/ring-attn config * chore: add back return typing from rebase * fix(test): use RLType directly to skip needing to validate * feat: split training builder into sub modules * fix: remove deprecated clause * chore: add missing config to doc * fix: update quarto autodoc * fix: import path for trainer builder and submodules * fix: remove redundant configs from rebase mistake * chore: simplify dynamo check * fix: optimizer_cls_and_kwargs to be passed into trainer_kwargs * fix: add missing rex from rebase * fix: move pop optimizer_cls_and_kwargs * fix: pop optimizer cls in rl too * fix: leftover bug from rebase * fix: update handling of trainer_cls in RL * fix: address pr feedback * feat: call hook_pre_create_trainer for rl * chore: lint * fix: return notimplemented for ppo * feat: moved torch compile to base and refactor collator setting * chore: remove unused importlib.util import * fix: optimizer cls not being popped * feat: move epoch setting to base * fix: catch unhandled custom optimizer * fix: remove duplicate lora plus setting * chore: refactor if condition * chore: refactor set_base_training_args into smaller modules * fix: address TrainerBuilderBase class variables to instance var * fix: add handling for beta3 and episilon2 * fix: change to pass dict via arg instead of updating dict * chore: simplify if condition * fix: force access to lr & weight decay in case not provided to early error * fix: remove log sweep * chore: refactor if condition * fix: address renamed cfg * fix: improve handling of cosine hyp * fix: remove unused params * chore: refactor * chore: clarify doc safetensors * fix: update import path to be unified following comments * fix: duplicate kwargs passed * feat: return separate trainer_kwargs * chore: refactor * chore: refactor based on comments * chore: refactor based on comments * fix: move gpustats callback to base * chore: create trainer_cls_args first based on comments * fix: ipo label smoothing passed incorrectly * feat: add optimizer parity for RL methods with test * feat: add parity for optimizer in RM/PRM and add test * fix: remove redundant function override for orpo/cpo batch metrics * fix: improve handling of dpo_label_smoothing and merge issue * fix: test fixture returning wrong field * fix: address avoid direct modify fixture * chore: minor refactor * Revert "chore: refactor" This reverts commit 99c8859eb02d1c4bde465d1fb0290a8024d2fd87. * feat: rename trainer_builder to builders --------- Co-authored-by: Wing Lian --- _quarto.yml | 4 +- docs/config.qmd | 3 +- src/axolotl/core/builders/__init__.py | 6 + src/axolotl/core/builders/base.py | 503 +++++++ src/axolotl/core/builders/causal.py | 487 +++++++ src/axolotl/core/builders/rl.py | 246 ++++ src/axolotl/core/trainer_builder.py | 1252 ----------------- src/axolotl/core/trainers/dpo/trainer.py | 44 +- src/axolotl/core/trainers/grpo/__init__.py | 4 +- src/axolotl/core/trainers/grpo/trainer.py | 7 +- src/axolotl/core/trainers/mixins/optimizer.py | 17 + src/axolotl/core/trainers/trl.py | 169 +-- src/axolotl/core/training_args.py | 6 - src/axolotl/train.py | 2 +- src/axolotl/utils/callbacks/__init__.py | 2 +- src/axolotl/utils/callbacks/comet_.py | 2 +- src/axolotl/utils/callbacks/lisa.py | 2 +- src/axolotl/utils/callbacks/mlflow_.py | 2 +- src/axolotl/utils/data/rl.py | 3 +- src/axolotl/utils/schemas/config.py | 1 + src/axolotl/utils/trainer.py | 2 +- tests/core/test_builders.py | 595 ++++++++ tests/core/test_trainer_builder.py | 90 -- tests/e2e/test_imports.py | 4 +- 24 files changed, 1899 insertions(+), 1554 deletions(-) create mode 100644 src/axolotl/core/builders/__init__.py create mode 100644 src/axolotl/core/builders/base.py create mode 100644 src/axolotl/core/builders/causal.py create mode 100644 src/axolotl/core/builders/rl.py delete mode 100755 src/axolotl/core/trainer_builder.py create mode 100644 tests/core/test_builders.py delete mode 100644 tests/core/test_trainer_builder.py diff --git a/_quarto.yml b/_quarto.yml index e05a1c35c..a970cd08b 100644 --- a/_quarto.yml +++ b/_quarto.yml @@ -17,7 +17,9 @@ quartodoc: - convert - prompt_tokenizers - logging_config - - core.trainer_builder + - core.builders.base + - core.builders.causal + - core.builders.rl - core.training_args - core.chat.messages - core.chat.format.chatml diff --git a/docs/config.qmd b/docs/config.qmd index 5a36ca8aa..519065554 100644 --- a/docs/config.qmd +++ b/docs/config.qmd @@ -514,6 +514,7 @@ output_dir: ./completed-model # setting to `auto` will enable torch compile when torch>=2.5.1 torch_compile: # Optional[Union[Literal["auto"], bool]] torch_compile_backend: # Optional[str] +torch_compile_mode: # 'default' | 'reduce-overhead' | 'max-autotune' # Training hyperparameters @@ -560,7 +561,7 @@ profiler_steps: # enable the pytorch profiler to capture the first N steps of tr loss_watchdog_threshold: # High loss value, indicating the learning has broken down (a good estimate is ~2 times the loss at the start of training) loss_watchdog_patience: # Number of high-loss steps in a row before the trainer aborts (default: 3) -# Save model as safetensors (require safetensors package) +# Save model as safetensors (require safetensors package). Default True save_safetensors: # Whether to mask out or include the human's prompt from the training labels diff --git a/src/axolotl/core/builders/__init__.py b/src/axolotl/core/builders/__init__.py new file mode 100644 index 000000000..5bd244434 --- /dev/null +++ b/src/axolotl/core/builders/__init__.py @@ -0,0 +1,6 @@ +"""Trainer builder classes""" + +from .causal import HFCausalTrainerBuilder +from .rl import HFRLTrainerBuilder + +__all__ = ["HFCausalTrainerBuilder", "HFRLTrainerBuilder"] diff --git a/src/axolotl/core/builders/base.py b/src/axolotl/core/builders/base.py new file mode 100644 index 000000000..907de056b --- /dev/null +++ b/src/axolotl/core/builders/base.py @@ -0,0 +1,503 @@ +# Copyright 2024 Axolotl AI. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for trainer builder""" + +import abc +import importlib +import logging +import sys +from abc import abstractmethod +from contextlib import suppress +from pathlib import Path +from typing import Any + +import torch +from transformers import ( + TrainerCallback, +) +from transformers.training_args import OptimizerNames + +from axolotl.integrations.base import PluginManager +from axolotl.monkeypatch.trainer.lr import patch_trainer_get_lr +from axolotl.utils import is_comet_available, is_mlflow_available +from axolotl.utils.callbacks import ( + GCCallback, + GPUStatsCallback, + SaveAxolotlConfigtoWandBCallback, +) +from axolotl.utils.callbacks.profiler import PytorchProfilerCallback +from axolotl.utils.schemas.enums import CustomSupportedOptimizers + +LOG = logging.getLogger(__name__) + +with suppress(ImportError): + import torch._dynamo # pylint: disable=ungrouped-imports + + +class TrainerBuilderBase(abc.ABC): + """Base class for trainer builder.""" + + def __init__(self, cfg, model, tokenizer, processor=None): + self.cfg = cfg + self.model = model + self.tokenizer = tokenizer + self.processor = processor + + self._train_dataset = None + self._eval_dataset = None + self._model_ref = None + self._peft_config = None + + # If the model supports tagging, add the axolotl tag. + # This makes sure the tag is correctly pushed even if a user calls + # model.push_to_hub instead of trainer.push_to_hub. + if hasattr(model, "add_model_tags"): + model.add_model_tags(["axolotl"]) + + patch_trainer_get_lr() + + @property + def model_ref(self): + return self._model_ref + + @model_ref.setter + def model_ref(self, model): + self._model_ref = model + + @property + def train_dataset(self): + return self._train_dataset + + @train_dataset.setter + def train_dataset(self, dataset): + self._train_dataset = dataset + + @property + def eval_dataset(self): + return self._eval_dataset + + @eval_dataset.setter + def eval_dataset(self, dataset): + self._eval_dataset = dataset + + @property + def peft_config(self): + return self._peft_config + + @peft_config.setter + def peft_config(self, peft_config): + self._peft_config = peft_config + + @abstractmethod + def build(self, total_num_steps): + pass + + def get_callbacks(self) -> list[TrainerCallback]: + callbacks = [] + + plugin_manager = PluginManager.get_instance() + callbacks.extend( + plugin_manager.add_callbacks_pre_trainer(cfg=self.cfg, model=self.model) + ) + + if self.cfg.profiler_steps: + callbacks.append( + PytorchProfilerCallback( + steps_to_profile=self.cfg.profiler_steps, + ) + ) + + if self.cfg.gc_steps: + callbacks.append(GCCallback(gc_steps=self.cfg.gc_steps)) + + if self.cfg.use_wandb: + callbacks.append( + SaveAxolotlConfigtoWandBCallback(self.cfg.axolotl_config_path) + ) + if self.cfg.use_mlflow and is_mlflow_available(): + from axolotl.utils.callbacks.mlflow_ import ( + SaveAxolotlConfigtoMlflowCallback, + ) + + callbacks.extend( + [ + SaveAxolotlConfigtoMlflowCallback(self.cfg.axolotl_config_path), + ] + ) + if self.cfg.use_comet and is_comet_available(): + from axolotl.utils.callbacks.comet_ import SaveAxolotlConfigtoCometCallback + + callbacks.append( + SaveAxolotlConfigtoCometCallback(self.cfg.axolotl_config_path) + ) + + callbacks.append(GPUStatsCallback(cfg=self.cfg)) + + return callbacks + + def get_post_trainer_create_callbacks(self, trainer): + """ + Callbacks added after the trainer is created, usually b/c these need access to the trainer + """ + callbacks = [] + if self.cfg.plugins: + plugin_manager = PluginManager.get_instance() + callbacks.extend( + [ + cb + for cb in plugin_manager.add_callbacks_post_trainer( + self.cfg, trainer + ) + if cb + ] + ) + return callbacks + + def hook_pre_create_training_args(self, training_arguments_kwargs): + # TODO + return training_arguments_kwargs + + def hook_post_create_training_args(self, training_arguments): + # TODO + return training_arguments + + def hook_pre_create_trainer(self, trainer_kwargs, trainer_cls): + # TODO + return trainer_kwargs, trainer_cls + + def hook_post_create_trainer(self, trainer): + # TODO + return trainer + + def _configure_warmup_and_logging( + self, total_num_steps: int, training_args_kwargs: dict + ): + warmup_steps = 0 + warmup_ratio = 0.0 + if self.cfg.warmup_steps: + warmup_steps = self.cfg.warmup_steps + elif self.cfg.warmup_ratio: + if total_num_steps: + warmup_steps = max(int(self.cfg.warmup_ratio * total_num_steps), 0) + else: + warmup_ratio = self.cfg.warmup_ratio + elif total_num_steps: + warmup_steps = min(int(0.03 * total_num_steps), 100) + else: + warmup_ratio = 0.03 + + if warmup_steps == 1: + warmup_steps = 2 + + if self.cfg.logging_steps is not None: + training_args_kwargs["logging_steps"] = self.cfg.logging_steps + else: + training_args_kwargs["logging_steps"] = ( + 500 # transformers defaults to 500 + if not total_num_steps + else max(min(int(0.005 * total_num_steps), 10), 1) + ) + + training_args_kwargs["warmup_ratio"] = warmup_ratio + training_args_kwargs["warmup_steps"] = warmup_steps + + def _configure_precision_settings(self, training_args_kwargs: dict): + training_args_kwargs["fp16"] = (self.cfg.fp16 and not self.cfg.bf16) or False + training_args_kwargs["tf32"] = self.cfg.tf32 + if self.cfg.bf16 == "full": + training_args_kwargs["bf16_full_eval"] = True + else: + training_args_kwargs["bf16"] = self.cfg.bf16 or self.cfg.bfloat16 + + def _configure_scheduler(self, training_args_kwargs: dict): + if self.cfg.lr_scheduler in ["one_cycle", "rex"]: + training_args_kwargs["lr_scheduler_type"] = "cosine" + training_args_kwargs["alternate_lr_scheduler_type"] = self.cfg.lr_scheduler + else: + training_args_kwargs["lr_scheduler_type"] = ( + self.cfg.lr_scheduler if self.cfg.lr_scheduler else "cosine" + ) + training_args_kwargs["lr_scheduler_kwargs"] = ( + self.cfg.lr_scheduler_kwargs if self.cfg.lr_scheduler_kwargs else {} + ) + + def _configure_optimizer(self, training_args_kwargs: dict, trainer_kwargs: dict): + def _configure_custom_optimizer( + training_args_kwargs: dict, trainer_kwargs: dict + ): + # Common optimizer kwargs + optimizer_kwargs = { + "lr": training_args_kwargs["learning_rate"], + "weight_decay": training_args_kwargs["weight_decay"], + } + + # Adam-specific kwargs + adam_kwargs: dict = {} + if training_args_kwargs.get("adam_beta1") and training_args_kwargs.get( + "adam_beta2" + ): + adam_kwargs["betas"] = ( + training_args_kwargs.get("adam_beta1"), + training_args_kwargs.get("adam_beta2"), + ) + if training_args_kwargs.get("adam_epsilon"): + adam_kwargs["eps"] = training_args_kwargs.get("adam_epsilon") + + if self.cfg.optimizer == "muon": + from axolotl.contribs.mit.muon import ( # pylint: disable=no-name-in-module + MuonOptimizerFactory, + ) + + optimizer_cls = MuonOptimizerFactory + optimizer_kwargs.update(adam_kwargs) + elif self.cfg.optimizer == "optimi_adamw": + from optimi import AdamW + + optimizer_kwargs["foreach"] = False + optimizer_cls = AdamW + optimizer_kwargs.update(adam_kwargs) + elif self.cfg.optimizer == "ao_adamw_4bit": + # TODO remove 20250401 + from torchao.prototype.low_bit_optim import AdamW4bit + + optimizer_cls = AdamW4bit + optimizer_kwargs.update(adam_kwargs) + + LOG.warning( + f"`ao_adamw_4bit` will be deprecated soon. Please use `{OptimizerNames.ADAMW_TORCH_4BIT}` instead." + ) + elif self.cfg.optimizer == "ao_adamw_8bit": + from torchao.prototype.low_bit_optim import AdamW8bit + + optimizer_cls = AdamW8bit + optimizer_kwargs.update(adam_kwargs) + elif self.cfg.optimizer == "ao_adamw_fp8": + from torchao.prototype.low_bit_optim import AdamWFp8 + + optimizer_cls = AdamWFp8 + optimizer_kwargs.update(adam_kwargs) + elif self.cfg.optimizer == "adopt_adamw": + from axolotl.utils.optimizers.adopt import ADOPT + + optimizer_cls = ADOPT + adam_kwargs["decouple"] = True + optimizer_kwargs.update(adam_kwargs) + elif self.cfg.optimizer == "came_pytorch": + from came_pytorch import CAME + + optimizer_cls = CAME + + beta1 = training_args_kwargs.get("adam_beta1", 0.9) + beta2 = training_args_kwargs.get("adam_beta2", 0.999) + beta3 = training_args_kwargs.get("adam_beta3", 0.9999) + eps1 = training_args_kwargs.get("adam_epsilon", 1e-30) + eps2 = training_args_kwargs.get("adam_epsilon2", 1e-16) + adam_kwargs["betas"] = (beta1, beta2, beta3) + adam_kwargs["eps"] = (eps1, eps2) + + optimizer_kwargs.update(adam_kwargs) + else: + raise ValueError( + f"Unhandled optimizer: {self.cfg.optimizer}. Please raise an Issue." + ) + + # Parse any additional optimizer args from config + if self.cfg.optim_args: + if isinstance(self.cfg.optim_args, dict): + optimizer_kwargs.update(self.cfg.optim_args) + else: + # Parse string format "key1=value1,key2=value2" + for mapping in self.cfg.optim_args.replace(" ", "").split(","): + key, value = mapping.split("=") + optimizer_kwargs[key] = value + + # Note: This is not used in training_args_kwargs, but in trainer_kwargs + trainer_kwargs["optimizer_cls_and_kwargs"] = ( + optimizer_cls, + optimizer_kwargs, + ) + + # Handle custom optimizer + custom_supported_optimizers = [opt.value for opt in CustomSupportedOptimizers] + if self.cfg.optimizer in custom_supported_optimizers: + _configure_custom_optimizer(training_args_kwargs, trainer_kwargs) + else: + # Use transformers' optimizer + training_args_kwargs["optim"] = self.cfg.optimizer + + # Parse any additional optimizer args from config + if self.cfg.optim_args: + if isinstance(self.cfg.optim_args, dict): + optim_args = ",".join( + [f"{key}={value}" for key, value in self.cfg.optim_args.items()] + ) + else: + optim_args = self.cfg.optim_args + training_args_kwargs["optim_args"] = optim_args + + if ( + self.cfg.optimizer == "adamw_anyprecision" + and Path(self.cfg.torchdistx_path).exists() + ): + sys.path.append(self.cfg.torchdistx_path) + importlib.import_module("torchdistx") + + def _configure_hub_parameters(self, training_args_kwargs: dict): + if self.cfg.hub_model_id: + training_args_kwargs["hub_model_id"] = self.cfg.hub_model_id + training_args_kwargs["push_to_hub"] = True + training_args_kwargs["hub_private_repo"] = True + training_args_kwargs["hub_always_push"] = True + + if self.cfg.hub_strategy: + training_args_kwargs["hub_strategy"] = self.cfg.hub_strategy + + def _configure_save_and_eval_strategy(self, training_args_kwargs: dict): + # save_strategy and save_steps + if self.cfg.save_steps: + training_args_kwargs["save_strategy"] = "steps" + training_args_kwargs["save_steps"] = self.cfg.save_steps + elif self.cfg.save_strategy: + training_args_kwargs["save_strategy"] = self.cfg.save_strategy + else: + # default to saving each epoch if not defined + training_args_kwargs["save_strategy"] = "epoch" + + training_args_kwargs["save_total_limit"] = ( + self.cfg.save_total_limit if self.cfg.save_total_limit else 4 + ) + + # eval_strategy and eval_steps + if not self.eval_dataset or self.cfg.val_set_size == 0: + # do not eval if no eval_dataset or val_set_size=0 + training_args_kwargs["eval_strategy"] = "no" + elif self.cfg.eval_steps: + training_args_kwargs["eval_strategy"] = "steps" + training_args_kwargs["eval_steps"] = self.cfg.eval_steps + elif self.cfg.eval_strategy: + training_args_kwargs["eval_strategy"] = self.cfg.eval_strategy + + def _configure_reporting(self, training_args_kwargs: dict): + report_to = [] + if self.cfg.use_wandb: + report_to.append("wandb") + if self.cfg.use_mlflow: + report_to.append("mlflow") + if self.cfg.use_tensorboard: + report_to.append("tensorboard") + if self.cfg.use_comet: + report_to.append("comet_ml") + + training_args_kwargs["report_to"] = report_to + + if self.cfg.use_wandb: + training_args_kwargs["run_name"] = self.cfg.wandb_name + elif self.cfg.use_mlflow: + training_args_kwargs["run_name"] = self.cfg.mlflow_run_name + else: + training_args_kwargs["run_name"] = None + + def _configure_torch_compile(self, training_args_kwargs: dict): + if self.cfg.torch_compile and getattr(torch, "_dynamo", None): + torch._dynamo.config.suppress_errors = ( # pylint: disable=protected-access + True + ) + training_args_kwargs["torch_compile"] = self.cfg.torch_compile + if self.cfg.torch_compile_backend: + training_args_kwargs["torch_compile_backend"] = ( + self.cfg.torch_compile_backend + ) + if self.cfg.torch_compile_mode: + training_args_kwargs["torch_compile_mode"] = self.cfg.torch_compile_mode + + def _configure_gradient_checkpointing(self, training_args_kwargs: dict): + if self.cfg.gradient_checkpointing: + training_args_kwargs["gradient_checkpointing"] = ( + self.cfg.gradient_checkpointing + ) + if self.cfg.gradient_checkpointing_kwargs is not None: + training_args_kwargs["gradient_checkpointing_kwargs"] = ( + self.cfg.gradient_checkpointing_kwargs + ) + else: + training_args_kwargs["gradient_checkpointing_kwargs"] = { + "use_reentrant": False + } + + def _set_base_training_args( + self, total_num_steps + ) -> tuple[dict[str, Any], dict[str, Any]]: + training_args_kwargs: dict[str, Any] = {} + trainer_kwargs: dict[str, Any] = {} + + self._configure_warmup_and_logging(total_num_steps, training_args_kwargs) + self._configure_precision_settings(training_args_kwargs) + self._configure_save_and_eval_strategy(training_args_kwargs) + self._configure_gradient_checkpointing(training_args_kwargs) + + # set arg into trainer_args_kwargs with same name if value not None + for arg in [ + # optim/scheduler + "adam_beta1", + "adam_beta2", + "adam_beta3", + "adam_epsilon", + "adam_epsilon2", + "cosine_min_lr_ratio", + "cosine_constant_lr_ratio", + "optim_target_modules", + # trainer + "max_grad_norm", + "dataloader_num_workers", + "dataloader_pin_memory", + "dataloader_prefetch_factor", + "gradient_accumulation_steps", + "learning_rate", + "embedding_lr", + "embedding_lr_scale", + "lr_groups", + "loraplus_lr_ratio", + "loraplus_lr_embedding", + "output_dir", + "save_safetensors", + "save_only_model", + "include_tokens_per_second", + "weight_decay", + "seed", + ]: + if hasattr(self.cfg, arg) and getattr(self.cfg, arg) is not None: + training_args_kwargs[arg] = getattr(self.cfg, arg) + + training_args_kwargs["per_device_train_batch_size"] = self.cfg.micro_batch_size + + if self.cfg.eval_batch_size: + training_args_kwargs["per_device_eval_batch_size"] = ( + self.cfg.eval_batch_size + ) + + training_args_kwargs["max_steps"] = self.cfg.max_steps or total_num_steps or -1 + training_args_kwargs["num_train_epochs"] = self.cfg.num_epochs + + # max_length is not used in CausalTrainer + if self.cfg.reward_model or self.cfg.rl: + training_args_kwargs["max_length"] = self.cfg.sequence_len + + self._configure_reporting(training_args_kwargs) + self._configure_hub_parameters(training_args_kwargs) + self._configure_scheduler(training_args_kwargs) + self._configure_optimizer(training_args_kwargs, trainer_kwargs) + self._configure_torch_compile(training_args_kwargs) + + return training_args_kwargs, trainer_kwargs diff --git a/src/axolotl/core/builders/causal.py b/src/axolotl/core/builders/causal.py new file mode 100644 index 000000000..3b1d0b3c2 --- /dev/null +++ b/src/axolotl/core/builders/causal.py @@ -0,0 +1,487 @@ +"""Builder for causal trainers""" + +import inspect +import math +import os +from pathlib import Path +from typing import Type, Union + +import transformers +from transformers import ( + DataCollatorWithFlattening, + EarlyStoppingCallback, +) +from trl.trainer.utils import RewardDataCollatorWithPadding + +from axolotl.core.builders.base import TrainerBuilderBase +from axolotl.core.trainers import ( + AxolotlMambaTrainer, + AxolotlPRMTrainer, + AxolotlRewardTrainer, + AxolotlTrainer, + ReLoRATrainer, +) +from axolotl.core.training_args import ( + AxolotlPRMConfig, + AxolotlRewardConfig, + AxolotlTrainingArguments, +) +from axolotl.integrations.base import PluginManager +from axolotl.monkeypatch.multipack import SUPPORTED_MULTIPACK_MODEL_TYPES +from axolotl.monkeypatch.relora import ReLoRACallback +from axolotl.processing_strategies import get_processing_strategy +from axolotl.utils import is_comet_available, is_mlflow_available +from axolotl.utils.callbacks import ( + EvalFirstStepCallback, + LossWatchDogCallback, + SaveBetterTransformerModelCallback, + bench_eval_callback_factory, + causal_lm_bench_eval_callback_factory, + colab_inference_post_train_callback, + log_prediction_callback_factory, +) +from axolotl.utils.callbacks.lisa import lisa_callback_factory +from axolotl.utils.callbacks.qat import QATCallback +from axolotl.utils.chat_templates import get_chat_template_from_config +from axolotl.utils.collators import ( + BatchSamplerDataCollatorForSeq2Seq, + DataCollatorForSeq2Seq, + MambaDataCollator, + V2BatchSamplerDataCollatorForSeq2Seq, +) +from axolotl.utils.collators.mm_chat import MultiModalChatDataCollator +from axolotl.utils.logging import get_logger + +LOG = get_logger(__name__) + + +class HFCausalTrainerBuilder(TrainerBuilderBase): + """ + Build the HuggingFace training args/trainer for causal models and reward modeling + using TRL. + """ + + def get_callbacks(self): + callbacks = super().get_callbacks() + callbacks.append(EvalFirstStepCallback()) + + if self.cfg.relora_steps: + callbacks.append(ReLoRACallback(self.cfg)) + + if ( + hasattr(self.model, "use_bettertransformer") + and self.model.use_bettertransformer is True + ): + callbacks.append(SaveBetterTransformerModelCallback()) + + # TODO: check if can move to base class + if self.cfg.loss_watchdog_threshold is not None: + callbacks.append(LossWatchDogCallback(self.cfg)) + + if self.cfg.qat: + callbacks.append(QATCallback(self.cfg.qat)) + + return callbacks + + def get_post_trainer_create_callbacks(self, trainer): + callbacks = [] + if self.cfg.use_wandb and self.cfg.eval_table_size > 0: + LogPredictionCallback = log_prediction_callback_factory( + trainer, self.tokenizer, "wandb" + ) + callbacks.append(LogPredictionCallback(self.cfg)) + if ( + self.cfg.use_mlflow + and is_mlflow_available() + and self.cfg.eval_table_size > 0 + ): + LogPredictionCallback = log_prediction_callback_factory( + trainer, self.tokenizer, "mlflow" + ) + callbacks.append(LogPredictionCallback(self.cfg)) + if self.cfg.use_comet and is_comet_available() and self.cfg.eval_table_size > 0: + LogPredictionCallback = log_prediction_callback_factory( + trainer, self.tokenizer, "comet_ml" + ) + callbacks.append(LogPredictionCallback(self.cfg)) + + if self.cfg.do_bench_eval: + callbacks.append(bench_eval_callback_factory(trainer, self.tokenizer)) + if self.cfg.do_causal_lm_eval: + CausalLMBenchEvalCallback = causal_lm_bench_eval_callback_factory( + trainer, self.tokenizer + ) + callbacks.append(CausalLMBenchEvalCallback(self.cfg)) + + if self.cfg.early_stopping_patience: + early_stop_cb = EarlyStoppingCallback( + self.cfg.early_stopping_patience, + ) + callbacks.append(early_stop_cb) + + if self.cfg.lisa_step_interval and self.cfg.lisa_n_layers: + callbacks.append(lisa_callback_factory(trainer)) + + if any("COLAB_" in key for key in os.environ): + ColabCallback = colab_inference_post_train_callback(trainer) + callbacks.append(ColabCallback(self.cfg)) + + callbacks.extend(super().get_post_trainer_create_callbacks(trainer=trainer)) + return callbacks + + def _get_trainer_cls(self): + if self.cfg.plugins: + plugin_manager = PluginManager.get_instance() + trainer_cls = plugin_manager.get_trainer_cls(self.cfg) + if trainer_cls: + return trainer_cls + if self.cfg.relora_steps: + return ReLoRATrainer + if self.cfg.model_config_type == "mamba": + return AxolotlMambaTrainer + if self.cfg.reward_model: + return AxolotlRewardTrainer + if self.cfg.process_reward_model: + return AxolotlPRMTrainer + return AxolotlTrainer + + def build(self, total_num_steps): + training_arguments_kwargs, trainer_kwargs = self._set_base_training_args( + total_num_steps + ) + + if self.cfg.fsdp: + training_arguments_kwargs["fsdp"] = self.cfg.fsdp + if self.cfg.fsdp_config: + training_arguments_kwargs["fsdp_config"] = { + k.lstrip("fsdp_"): v for k, v in dict(self.cfg.fsdp_config).items() + } + + if self.cfg.adapter == "qlora": + training_arguments_kwargs["qlora"] = True + + # deepspeed + if self.cfg.deepspeed: + training_arguments_kwargs["deepspeed"] = self.cfg.deepspeed + + if self.cfg.lr_quadratic_warmup is not None: + training_arguments_kwargs["lr_quadratic_warmup"] = ( + self.cfg.lr_quadratic_warmup + ) + + if self.cfg.dataloader_drop_last is not None: + training_arguments_kwargs["dataloader_drop_last"] = ( + self.cfg.dataloader_drop_last + ) + elif self.cfg.sample_packing and self.cfg.eval_sample_packing is False: + training_arguments_kwargs["dataloader_drop_last"] = True + + if self.cfg.remove_unused_columns is not None: + training_arguments_kwargs["remove_unused_columns"] = ( + self.cfg.remove_unused_columns + ) + + if self.cfg.do_bench_eval: + training_arguments_kwargs["do_bench_eval"] = self.cfg.do_bench_eval + if self.cfg.bench_dataset: + training_arguments_kwargs["bench_dataset"] = self.cfg.bench_dataset + if self.cfg.do_causal_lm_eval: + training_arguments_kwargs["do_causal_lm_eval"] = self.cfg.do_causal_lm_eval + if self.cfg.metric_for_best_model: + training_arguments_kwargs["metric_for_best_model"] = ( + self.cfg.metric_for_best_model + ) + if self.cfg.greater_is_better: + training_arguments_kwargs["greater_is_better"] = self.cfg.greater_is_better + + # DDP Config + if self.cfg.ddp_timeout: + training_arguments_kwargs["ddp_timeout"] = self.cfg.ddp_timeout + # see https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html + if self.cfg.ddp_bucket_cap_mb: + training_arguments_kwargs["ddp_bucket_cap_mb"] = self.cfg.ddp_bucket_cap_mb + if self.cfg.ddp_broadcast_buffers is not None: + training_arguments_kwargs["ddp_broadcast_buffers"] = ( + self.cfg.ddp_broadcast_buffers + ) + + # these are all the "standard" kwargs that are def used + training_arguments_kwargs["max_seq_length"] = self.cfg.sequence_len + + if self.cfg.auto_find_batch_size is not None: + training_arguments_kwargs["auto_find_batch_size"] = ( + self.cfg.auto_find_batch_size + ) + + training_arguments_kwargs["eval_accumulation_steps"] = ( + self.cfg.gradient_accumulation_steps + ) + + training_arguments_kwargs["load_best_model_at_end"] = ( + ( + self.cfg.load_best_model_at_end is not False + or self.cfg.early_stopping_patience + ) + and ( + (not self.cfg.test_datasets and self.cfg.val_set_size > 0) + or (self.cfg.test_datasets and self.cfg.val_set_size == 0) + ) + and self.cfg.save_steps + and self.cfg.eval_steps + and self.cfg.save_steps % self.cfg.eval_steps == 0 + ) or False + + # handle ddp + ddp_find_unused_parameters = None + if self.cfg.ddp: + ddp_find_unused_parameters = bool(self.cfg.ddp_find_unused_parameters) + training_arguments_kwargs["ddp_find_unused_parameters"] = ( + ddp_find_unused_parameters + ) + + training_arguments_kwargs["group_by_length"] = self.cfg.group_by_length + training_arguments_kwargs["curriculum_sampling"] = self.cfg.curriculum_sampling + + training_arguments_kwargs["sample_packing"] = bool(self.cfg.sample_packing) + training_arguments_kwargs["multipack_real_batches"] = ( + not self.cfg.flash_attention or self.cfg.multipack_real_batches + ) + training_arguments_kwargs["eval_sample_packing"] = bool( + self.cfg.eval_sample_packing + ) + if self.cfg.sample_packing_bin_size is not None: + training_arguments_kwargs["sample_packing_bin_size"] = ( + self.cfg.sample_packing_bin_size + ) + if self.cfg.sample_packing_group_size is not None: + training_arguments_kwargs["sample_packing_group_size"] = ( + self.cfg.sample_packing_group_size + ) + if self.cfg.sample_packing_eff_est: + training_arguments_kwargs["sample_packing_efficiency"] = ( + self.cfg.sample_packing_eff_est + ) + + if self.cfg.relora_steps: + training_arguments_kwargs["relora_steps"] = self.cfg.relora_steps + training_arguments_kwargs["relora_warmup_steps"] = ( + self.cfg.relora_warmup_steps + ) + if self.cfg.relora_anneal_steps: + training_arguments_kwargs["relora_anneal_steps"] = ( + self.cfg.relora_anneal_steps + ) + if self.cfg.relora_prune_ratio: + training_arguments_kwargs["relora_prune_ratio"] = ( + self.cfg.relora_prune_ratio + ) + + if self.cfg.lisa_step_interval and self.cfg.lisa_n_layers: + training_arguments_kwargs["lisa_n_layers"] = self.cfg.lisa_n_layers + training_arguments_kwargs["lisa_step_interval"] = ( + self.cfg.lisa_step_interval + ) + training_arguments_kwargs["lisa_layers_attribute"] = ( + self.cfg.lisa_layers_attribute + ) + + training_arguments_kwargs = self.hook_pre_create_training_args( + training_arguments_kwargs + ) + training_arguments_kwargs["model_type"] = self.cfg.model_config_type + training_arguments_kwargs["pretraining"] = bool(self.cfg.pretraining_dataset) + if self.cfg.chat_template: + training_arguments_kwargs["chat_template"] = get_chat_template_from_config( + cfg=self.cfg, + tokenizer=self.tokenizer, + ) + + if self.cfg.neftune_noise_alpha is not None: + training_arguments_kwargs["neftune_noise_alpha"] = ( + self.cfg.neftune_noise_alpha + ) + + if self.cfg.accelerator_config: + training_arguments_kwargs["accelerator_config"] = ( + self.cfg.accelerator_config + ) + + if self.cfg.image_size: + training_arguments_kwargs["image_size"] = self.cfg.image_size + if self.cfg.image_resize_algorithm: + training_arguments_kwargs["image_resize_algorithm"] = ( + self.cfg.image_resize_algorithm + ) + if self.cfg.kd_ce_alpha is not None: + training_arguments_kwargs["kd_ce_alpha"] = self.cfg.kd_ce_alpha + if self.cfg.kd_alpha is not None: + training_arguments_kwargs["kd_alpha"] = self.cfg.kd_alpha + if self.cfg.kd_temperature is not None: + training_arguments_kwargs["kd_temperature"] = self.cfg.kd_temperature + if self.cfg.kd_zscore_base_temp is not None: + training_arguments_kwargs["kd_zscore_base_temp"] = ( + self.cfg.kd_zscore_base_temp + ) + if self.cfg.kd_top_k_before_softmax is not None: + training_arguments_kwargs["kd_top_k_before_softmax"] = ( + self.cfg.kd_top_k_before_softmax + ) + + if self.cfg.reward_model: + training_args_cls = AxolotlRewardConfig + elif self.cfg.process_reward_model: + training_args_cls = AxolotlPRMConfig + else: + training_args_cls = AxolotlTrainingArguments + training_args = training_args_cls( # pylint: disable=unexpected-keyword-arg + **training_arguments_kwargs, + ) + training_args = self.hook_post_create_training_args(training_args) + + # unset run_name so wandb sets up experiment names + if self.cfg.use_wandb and training_args.run_name == training_args.output_dir: + training_args.run_name = ( # pylint: disable=attribute-defined-outside-init + None + ) + + data_collator_kwargs = { + "padding": True, # True/"longest" is the default + } + multiple = 64 + if self.cfg.pad_to_sequence_len: + data_collator_kwargs["pad_to_multiple_of"] = multiple * math.ceil( + self.cfg.sequence_len / multiple + ) + else: + # A100 is best at 64, while others at 8. Let's use the larger so we don't have to check + # https://docs.nvidia.com/deeplearning/performance/dl-performance-matrix-multiplication/index.html + data_collator_kwargs["pad_to_multiple_of"] = multiple + + trainer_cls = self._get_trainer_cls() + + trainer_kwargs, trainer_cls = self.hook_pre_create_trainer( + trainer_kwargs, trainer_cls + ) + if eval_data_collator := self.build_collator( + training_args, is_eval=True, **data_collator_kwargs + ): + if not (self.cfg.reward_model or self.cfg.process_reward_model): + trainer_kwargs["eval_data_collator"] = eval_data_collator + if not (self.cfg.reward_model or self.cfg.process_reward_model): + trainer_kwargs["bench_data_collator"] = transformers.DataCollatorForSeq2Seq( + self.tokenizer, + return_tensors="pt", + **data_collator_kwargs, + ) + sig = inspect.signature(trainer_cls) + if "processing_class" in sig.parameters: + trainer_kwargs["processing_class"] = self.tokenizer + elif "tokenizer" in sig.parameters: + trainer_kwargs["tokenizer"] = self.tokenizer + if ( + not (trainer_cls in [AxolotlRewardTrainer, AxolotlPRMTrainer]) + and self.cfg.datasets is not None + ): + trainer_kwargs["dataset_tags"] = [ + d["path"] for d in self.cfg.datasets if not Path(d["path"]).is_dir() + ] + trainer = trainer_cls( + model=self.model, + train_dataset=self.train_dataset, + eval_dataset=self.eval_dataset, + args=training_args, + data_collator=self.build_collator(training_args, **data_collator_kwargs), + callbacks=self.get_callbacks(), + **trainer_kwargs, + ) + trainer = self.hook_post_create_trainer(trainer) + for callback in self.get_post_trainer_create_callbacks(trainer): + trainer.add_callback(callback) + + if self.cfg.deepspeed and self.cfg.sample_packing: + trainer.accelerator.state.deepspeed_plugin.deepspeed_config[ + "train_micro_batch_size_per_gpu" + ] = self.cfg.micro_batch_size + + return trainer + + def build_collator( + self, training_args: AxolotlTrainingArguments, is_eval=False, **kwargs + ): + if training_args.pretraining: + if ( + self.cfg.pretraining_sample_concatenation is False + or self.cfg.micro_batch_size > 1 + ): + return DataCollatorForSeq2Seq(self.tokenizer, **kwargs) + return None + + if self.cfg.model_config_type == "mamba": + return MambaDataCollator(tokenizer=self.tokenizer) + + use_batch_sampler_collator = False + if is_eval is False and training_args.sample_packing: + use_batch_sampler_collator = True + if is_eval and training_args.eval_sample_packing: + use_batch_sampler_collator = True + + collator: Type[ + Union[ + V2BatchSamplerDataCollatorForSeq2Seq, + BatchSamplerDataCollatorForSeq2Seq, + DataCollatorForSeq2Seq, + DataCollatorWithFlattening, + RewardDataCollatorWithPadding, + ] + ] + collator_args = [self.tokenizer] + if self.cfg.reward_model: + collator = RewardDataCollatorWithPadding + elif use_batch_sampler_collator: + # Use V2BatchSamplerDataCollatorForSeq2Seq for flex attention, + # supported multipack models, or non-flash-attention llama + if ( + self.cfg.flex_attention + or self.cfg.model_config_type in SUPPORTED_MULTIPACK_MODEL_TYPES + or ( + self.cfg.model_config_type in ["llama"] + and self.cfg.flash_attention is not True + ) + ): + collator = V2BatchSamplerDataCollatorForSeq2Seq + else: + collator = BatchSamplerDataCollatorForSeq2Seq + else: + if self.cfg.processor_type and self.processor: + collator = MultiModalChatDataCollator + kwargs["processing_strategy"] = get_processing_strategy( + self.processor, + training_args.chat_template, + self.cfg.chat_template, + image_size=training_args.image_size, + image_resize_algorithm=training_args.image_resize_algorithm, + ) + elif self.cfg.batch_flattening: + collator = DataCollatorWithFlattening + collator_args.pop(0) + kwargs.pop("pad_to_multiple_of", None) + kwargs.pop("padding", None) + elif self.cfg.kd_trainer: + from axolotl.integrations.kd.collator import ( + DataCollatorForKD, + KDBatchSamplerDataCollatorForSeq2Seq, + ) + + if self.cfg.sample_packing: + collator = KDBatchSamplerDataCollatorForSeq2Seq + else: + collator = DataCollatorForKD + else: + collator = DataCollatorForSeq2Seq + + kwargs["return_tensors"] = "pt" + + return collator( + *collator_args, + **kwargs, + ) diff --git a/src/axolotl/core/builders/rl.py b/src/axolotl/core/builders/rl.py new file mode 100644 index 000000000..14dbfa715 --- /dev/null +++ b/src/axolotl/core/builders/rl.py @@ -0,0 +1,246 @@ +"""Builder for RLHF trainers""" + +import inspect +from pathlib import Path + +from axolotl.core.builders.base import TrainerBuilderBase +from axolotl.core.trainers import ( + AxolotlCPOTrainer, + AxolotlKTOTrainer, + AxolotlORPOTrainer, +) +from axolotl.core.trainers.dpo import DPOStrategy +from axolotl.core.trainers.dpo.args import AxolotlDPOConfig +from axolotl.core.trainers.grpo import GRPOStrategy +from axolotl.core.training_args import ( + AxolotlCPOConfig, + AxolotlKTOConfig, + AxolotlORPOConfig, +) +from axolotl.integrations.base import PluginManager +from axolotl.loaders.utils import ensure_dtype +from axolotl.utils.logging import get_logger +from axolotl.utils.schemas.enums import RLType + +LOG = get_logger(__name__) + + +class HFRLTrainerBuilder(TrainerBuilderBase): + """Trainer factory class for TRL-based RLHF trainers (e.g. DPO)""" + + def get_callbacks(self): + callbacks = super().get_callbacks() + + return callbacks + + def get_post_trainer_create_callbacks(self, trainer): + callbacks = super().get_post_trainer_create_callbacks(trainer=trainer) + return callbacks + + def _get_trainer_cls(self, trainer_kwargs: dict): + """ + Returns trainer_cls and trainer_cls_args + """ + if self.cfg.plugins: + plugin_manager = PluginManager.get_instance() + trainer_cls = plugin_manager.get_trainer_cls(self.cfg) + trainer_cls_args = [] # type: ignore + + if trainer_cls is not None: + return trainer_cls, trainer_cls_args + + trainer_cls = None + trainer_cls_args = [self.model] + + if self.cfg.rl is RLType.GRPO: + trainer_cls = GRPOStrategy.get_trainer_class( + sequence_parallel=self.cfg.sequence_parallel_degree > 1 + ) + trainer_cls_args.extend(GRPOStrategy.set_trainer_args(self.cfg)) + + trainer_kwargs.update(GRPOStrategy.set_trainer_kwargs(self.cfg)) + + elif self.cfg.rl in [RLType.DPO, RLType.IPO]: + trainer_cls = DPOStrategy.get_trainer_class() + trainer_cls_args.append(self.model_ref) + + elif self.cfg.rl is RLType.ORPO: + trainer_cls = AxolotlORPOTrainer + elif self.cfg.rl is RLType.KTO: + trainer_cls = AxolotlKTOTrainer + elif self.cfg.rl is RLType.SIMPO: + trainer_cls = AxolotlCPOTrainer + else: + raise ValueError(f"Unsupported RL: {self.cfg.rl}") + + return trainer_cls, trainer_cls_args + + def _build_training_arguments(self, total_num_steps): + """ + Returns training_args and trainer_kwargs + """ + training_args_kwargs, trainer_kwargs = self._set_base_training_args( + total_num_steps=total_num_steps + ) + + if self.cfg.remove_unused_columns is not None: + training_args_kwargs["remove_unused_columns"] = ( + self.cfg.remove_unused_columns + ) + else: + training_args_kwargs["remove_unused_columns"] = False + + # only rlhf + if self.cfg.dataset_processes: + training_args_kwargs["dataset_num_proc"] = self.cfg.dataset_processes + + if self.cfg.trl and self.cfg.trl.beta is not None: + training_args_kwargs["beta"] = self.cfg.trl.beta + elif self.cfg.rl_beta is not None: + training_args_kwargs["beta"] = self.cfg.rl_beta + elif self.cfg.orpo_alpha is not None: + # trl does some odd mapping of alpha to beta to reuse the beta parameter ??? + training_args_kwargs["beta"] = self.cfg.orpo_alpha + + if self.cfg.rpo_alpha is not None: + training_args_kwargs["rpo_alpha"] = self.cfg.rpo_alpha + + if self.cfg.use_wandb: + training_args_kwargs["run_name"] = self.cfg.wandb_name + + training_args_cls = None + blocklist_args_kwargs = [] + if self.cfg.rl is RLType.SIMPO: + training_args_cls = AxolotlCPOConfig + training_args_kwargs["loss_type"] = "simpo" + training_args_kwargs["simpo_gamma"] = self.cfg.simpo_gamma + if self.cfg.cpo_alpha is not None: + training_args_kwargs["cpo_alpha"] = self.cfg.cpo_alpha + + elif self.cfg.rl is RLType.ORPO: + training_args_cls = AxolotlORPOConfig + if self.cfg.max_prompt_len: + training_args_kwargs["max_prompt_length"] = self.cfg.max_prompt_len + + elif self.cfg.rl is RLType.KTO: + training_args_cls = AxolotlKTOConfig + + training_args_kwargs["desirable_weight"] = ( + self.cfg.kto_desirable_weight or 1.0 + ) + training_args_kwargs["undesirable_weight"] = ( + self.cfg.kto_undesirable_weight or 1.0 + ) + + if self.cfg.max_prompt_len: + training_args_kwargs["max_prompt_length"] = self.cfg.max_prompt_len + + elif self.cfg.rl is RLType.GRPO: + training_args_cls = GRPOStrategy.get_training_args_class() + training_args_kwargs.update(GRPOStrategy.set_training_args_kwargs(self.cfg)) + blocklist_args_kwargs = GRPOStrategy.get_blocklist_args_kwargs() + + elif self.cfg.rl in [RLType.DPO, RLType.IPO]: + training_args_cls = AxolotlDPOConfig + if self.cfg.rl is RLType.IPO: + training_args_kwargs["loss_type"] = "ipo" + + # Not compatible with IPO + if self.cfg.rl is RLType.DPO and self.cfg.dpo_label_smoothing: + training_args_kwargs["label_smoothing"] = self.cfg.dpo_label_smoothing + + training_args_kwargs["max_completion_length"] = None + training_args_kwargs["max_prompt_length"] = self.cfg.sequence_len + training_args_kwargs["generate_during_eval"] = self.cfg.use_wandb + if self.cfg.dpo_use_weighting is not None: + training_args_kwargs["use_weighting"] = self.cfg.dpo_use_weighting + if self.cfg.dpo_use_logits_to_keep is not None: + training_args_kwargs["use_logits_to_keep"] = ( + self.cfg.dpo_use_logits_to_keep + ) + else: + raise ValueError(f"Unsupported RL: {self.cfg.rl}") + + for blocklist_key in blocklist_args_kwargs: + if blocklist_key in training_args_kwargs: + del training_args_kwargs[blocklist_key] + + training_args = training_args_cls( # pylint: disable=unexpected-keyword-arg + logging_first_step=True, + **training_args_kwargs, + ) + + # unset run_name so wandb sets up experiment names + if self.cfg.use_wandb and training_args.run_name == training_args.output_dir: + training_args.run_name = ( # pylint: disable=attribute-defined-outside-init + None + ) + + return training_args, trainer_kwargs + + def build(self, total_num_steps): + training_args, trainer_kwargs = self._build_training_arguments(total_num_steps) + + if self.eval_dataset: + trainer_kwargs["eval_dataset"] = self.eval_dataset + if self.cfg.adapter and self.peft_config and self.cfg.rl is not RLType.GRPO: + trainer_kwargs["peft_config"] = self.peft_config + if self.cfg.precompute_ref_log_probs is not None: + trainer_kwargs["precompute_ref_log_probs"] = ( + self.cfg.precompute_ref_log_probs + ) + + trainer_cls, trainer_cls_args = self._get_trainer_cls(trainer_kwargs) + + sig = inspect.signature(trainer_cls) + if "tokenizer" in sig.parameters: + trainer_kwargs["tokenizer"] = self.tokenizer + else: + trainer_kwargs["processing_class"] = self.tokenizer + + if self.cfg.datasets is not None and ( + trainer_cls is DPOStrategy.get_trainer_class() + ): + trainer_kwargs["dataset_tags"] = [ + d["path"] for d in self.cfg.datasets if not Path(d["path"]).is_dir() + ] + + trainer_kwargs, trainer_cls = self.hook_pre_create_trainer( + trainer_kwargs, trainer_cls + ) + + trainer = trainer_cls( + *trainer_cls_args, + args=training_args, + train_dataset=self.train_dataset, + callbacks=self.get_callbacks(), + **trainer_kwargs, + ) + if self.cfg.fsdp: + ensure_dtype(trainer.model, dtype=self.cfg.torch_dtype) + if self.cfg.rl in [RLType.DPO, RLType.IPO] and trainer.ref_model: + ensure_dtype(trainer.ref_model, dtype=self.cfg.torch_dtype) + + trainer = self.hook_post_create_trainer(trainer) + for callback in self.get_post_trainer_create_callbacks(trainer): + trainer.add_callback(callback) + + return trainer + + +class HFPPOTrainerBuilder(TrainerBuilderBase): + """ + HF Factory class for PPO Trainer + """ + + def get_callbacks(self): + callbacks = super().get_callbacks() + return callbacks + + def get_post_trainer_create_callbacks(self, trainer): + callbacks = super().get_post_trainer_create_callbacks(trainer=trainer) + return callbacks + + def build(self, total_num_steps): + # TODO: build PPOConfig + raise NotImplementedError("PPO trainer builder is not implemented yet.") diff --git a/src/axolotl/core/trainer_builder.py b/src/axolotl/core/trainer_builder.py deleted file mode 100755 index 46ec12ccb..000000000 --- a/src/axolotl/core/trainer_builder.py +++ /dev/null @@ -1,1252 +0,0 @@ -# Copyright 2024 Axolotl AI. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint: disable=too-many-lines -"""Builder for the training args and trainer""" - -import abc -import importlib -import importlib.util -import inspect -import math -import os -import sys -from abc import abstractmethod -from pathlib import Path -from typing import List, Type, Union - -import torch -import transformers -from transformers import ( - DataCollatorWithFlattening, - EarlyStoppingCallback, - TrainerCallback, -) -from transformers.training_args import OptimizerNames -from trl.trainer.utils import RewardDataCollatorWithPadding - -from axolotl.core.trainers import ( - AxolotlCPOTrainer, - AxolotlKTOTrainer, - AxolotlMambaTrainer, - AxolotlORPOTrainer, - AxolotlPRMTrainer, - AxolotlRewardTrainer, - AxolotlTrainer, - ReLoRATrainer, -) -from axolotl.core.trainers.dpo import DPOStrategy -from axolotl.core.trainers.dpo.args import AxolotlDPOConfig -from axolotl.core.trainers.grpo import GRPOStrategy -from axolotl.core.training_args import ( - AxolotlCPOConfig, - AxolotlKTOConfig, - AxolotlORPOConfig, - AxolotlPRMConfig, - AxolotlRewardConfig, - AxolotlTrainingArguments, -) -from axolotl.integrations.base import PluginManager -from axolotl.loaders.utils import ensure_dtype -from axolotl.monkeypatch.multipack import SUPPORTED_MULTIPACK_MODEL_TYPES -from axolotl.monkeypatch.relora import ReLoRACallback -from axolotl.monkeypatch.trainer.lr import patch_trainer_get_lr -from axolotl.processing_strategies import get_processing_strategy -from axolotl.utils import is_comet_available, is_mlflow_available -from axolotl.utils.callbacks import ( - EvalFirstStepCallback, - GCCallback, - GPUStatsCallback, - LossWatchDogCallback, - SaveAxolotlConfigtoWandBCallback, - SaveBetterTransformerModelCallback, - bench_eval_callback_factory, - causal_lm_bench_eval_callback_factory, - colab_inference_post_train_callback, - log_prediction_callback_factory, -) -from axolotl.utils.callbacks.lisa import lisa_callback_factory -from axolotl.utils.callbacks.profiler import PytorchProfilerCallback -from axolotl.utils.callbacks.qat import QATCallback -from axolotl.utils.chat_templates import get_chat_template_from_config -from axolotl.utils.collators import ( - BatchSamplerDataCollatorForSeq2Seq, - DataCollatorForSeq2Seq, - MambaDataCollator, - V2BatchSamplerDataCollatorForSeq2Seq, -) -from axolotl.utils.collators.mm_chat import MultiModalChatDataCollator -from axolotl.utils.logging import get_logger -from axolotl.utils.schemas.enums import CustomSupportedOptimizers, RLType - -try: - import torch._dynamo # pylint: disable=ungrouped-imports -except ImportError: - pass - -LOG = get_logger(__name__) - - -class TrainerBuilderBase(abc.ABC): - """Base class for trainer builder.""" - - _train_dataset = None - _eval_dataset = None - _model_ref = None - _peft_config = None - - def __init__(self, cfg, model, tokenizer, processor=None): - self.cfg = cfg - self.model = model - self.tokenizer = tokenizer - self.processor = processor - - # If the model supports tagging, add the axolotl tag. - # This makes sure the tag is correctly pushed even if a user calls - # model.push_to_hub instead of trainer.push_to_hub. - if hasattr(model, "add_model_tags"): - model.add_model_tags(["axolotl"]) - - patch_trainer_get_lr() - - @property - def model_ref(self): - return self._model_ref - - @model_ref.setter - def model_ref(self, model): - self._model_ref = model - - @property - def train_dataset(self): - return self._train_dataset - - @train_dataset.setter - def train_dataset(self, dataset): - self._train_dataset = dataset - - @property - def eval_dataset(self): - return self._eval_dataset - - @eval_dataset.setter - def eval_dataset(self, dataset): - self._eval_dataset = dataset - - @property - def peft_config(self): - return self._peft_config - - @peft_config.setter - def peft_config(self, peft_config): - self._peft_config = peft_config - - @abstractmethod - def build(self, total_num_steps): - pass - - def get_callbacks(self) -> List[TrainerCallback]: - callbacks = [] - - plugin_manager = PluginManager.get_instance() - callbacks.extend( - plugin_manager.add_callbacks_pre_trainer(cfg=self.cfg, model=self.model) - ) - - if self.cfg.profiler_steps: - callbacks.append( - PytorchProfilerCallback( - steps_to_profile=self.cfg.profiler_steps, - ) - ) - - if self.cfg.gc_steps: - callbacks.append(GCCallback(gc_steps=self.cfg.gc_steps)) - - if self.cfg.use_wandb: - callbacks.append( - SaveAxolotlConfigtoWandBCallback(self.cfg.axolotl_config_path) - ) - if self.cfg.use_mlflow and is_mlflow_available(): - from axolotl.utils.callbacks.mlflow_ import ( - SaveAxolotlConfigtoMlflowCallback, - ) - - callbacks.extend( - [ - SaveAxolotlConfigtoMlflowCallback(self.cfg.axolotl_config_path), - ] - ) - if self.cfg.use_comet and is_comet_available(): - from axolotl.utils.callbacks.comet_ import SaveAxolotlConfigtoCometCallback - - callbacks.append( - SaveAxolotlConfigtoCometCallback(self.cfg.axolotl_config_path) - ) - - return callbacks - - def get_post_trainer_create_callbacks(self, trainer): - """ - Callbacks added after the trainer is created, usually b/c these need access to the trainer - """ - callbacks = [] - if self.cfg.plugins: - plugin_manager = PluginManager.get_instance() - callbacks.extend( - [ - cb - for cb in plugin_manager.add_callbacks_post_trainer( - self.cfg, trainer - ) - if cb - ] - ) - return callbacks - - def hook_pre_create_training_args(self, training_arguments_kwargs): - # TODO - return training_arguments_kwargs - - def hook_post_create_training_args(self, training_arguments): - # TODO - return training_arguments - - def hook_pre_create_trainer(self, trainer_kwargs, trainer_cls): - # TODO - return trainer_kwargs, trainer_cls - - def hook_post_create_trainer(self, trainer): - # TODO - return trainer - - -class HFCausalTrainerBuilder(TrainerBuilderBase): - """ - Build the HuggingFace training args/trainer for causal models and reward modeling - using TRL. - """ - - def get_callbacks(self): - callbacks = super().get_callbacks() - callbacks.append(GPUStatsCallback(self.cfg)) - callbacks.append(EvalFirstStepCallback()) - - if self.cfg.relora_steps: - callbacks.append(ReLoRACallback(self.cfg)) - - if ( - hasattr(self.model, "use_bettertransformer") - and self.model.use_bettertransformer is True - ): - callbacks.append(SaveBetterTransformerModelCallback()) - - if self.cfg.loss_watchdog_threshold is not None: - callbacks.append(LossWatchDogCallback(self.cfg)) - - if self.cfg.qat: - callbacks.append(QATCallback(self.cfg.qat)) - - return callbacks - - def get_post_trainer_create_callbacks(self, trainer): - callbacks = [] - if self.cfg.use_wandb and self.cfg.eval_table_size > 0: - LogPredictionCallback = log_prediction_callback_factory( - trainer, self.tokenizer, "wandb" - ) - callbacks.append(LogPredictionCallback(self.cfg)) - if ( - self.cfg.use_mlflow - and is_mlflow_available() - and self.cfg.eval_table_size > 0 - ): - LogPredictionCallback = log_prediction_callback_factory( - trainer, self.tokenizer, "mlflow" - ) - callbacks.append(LogPredictionCallback(self.cfg)) - if self.cfg.use_comet and is_comet_available() and self.cfg.eval_table_size > 0: - LogPredictionCallback = log_prediction_callback_factory( - trainer, self.tokenizer, "comet_ml" - ) - callbacks.append(LogPredictionCallback(self.cfg)) - - if self.cfg.do_bench_eval: - callbacks.append(bench_eval_callback_factory(trainer, self.tokenizer)) - if self.cfg.do_causal_lm_eval: - CausalLMBenchEvalCallback = causal_lm_bench_eval_callback_factory( - trainer, self.tokenizer - ) - callbacks.append(CausalLMBenchEvalCallback(self.cfg)) - - if self.cfg.early_stopping_patience: - early_stop_cb = EarlyStoppingCallback( - self.cfg.early_stopping_patience, - ) - callbacks.append(early_stop_cb) - - if self.cfg.lisa_step_interval and self.cfg.lisa_n_layers: - callbacks.append(lisa_callback_factory(trainer)) - - if any("COLAB_" in key for key in os.environ): - ColabCallback = colab_inference_post_train_callback(trainer) - callbacks.append(ColabCallback(self.cfg)) - - callbacks.extend(super().get_post_trainer_create_callbacks(trainer=trainer)) - return callbacks - - def _get_trainer_cls(self): - if self.cfg.plugins: - plugin_manager = PluginManager.get_instance() - trainer_cls = plugin_manager.get_trainer_cls(self.cfg) - if trainer_cls: - return trainer_cls - if self.cfg.relora_steps: - return ReLoRATrainer - if self.cfg.model_config_type == "mamba": - return AxolotlMambaTrainer - if self.cfg.reward_model: - return AxolotlRewardTrainer - if self.cfg.process_reward_model: - return AxolotlPRMTrainer - return AxolotlTrainer - - def build(self, total_num_steps): - warmup_steps = None - if self.cfg.warmup_steps is not None: - warmup_steps = self.cfg.warmup_steps - elif self.cfg.warmup_ratio is not None: - warmup_steps = max(int(self.cfg.warmup_ratio * total_num_steps), 0) - else: - warmup_steps = min(int(0.03 * total_num_steps), 100) - if warmup_steps == 1: - warmup_steps = 2 - - logging_steps = ( - self.cfg.logging_steps - if self.cfg.logging_steps is not None - else max(min(int(0.005 * total_num_steps), 10), 1) - ) - - training_arguments_kwargs = {} - - if self.cfg.include_tokens_per_second is not None: - training_arguments_kwargs["include_tokens_per_second"] = ( - self.cfg.include_tokens_per_second - ) - - if self.cfg.bf16 == "full": - training_arguments_kwargs["bf16_full_eval"] = True - else: - training_arguments_kwargs["bf16"] = self.cfg.bf16 - training_arguments_kwargs["fp16"] = ( - self.cfg.fp16 and not self.cfg.bf16 - ) or False - training_arguments_kwargs["tf32"] = self.cfg.tf32 - training_arguments_kwargs["warmup_steps"] = warmup_steps - training_arguments_kwargs["logging_steps"] = logging_steps - - if self.cfg.seed is not None: - training_arguments_kwargs["seed"] = self.cfg.seed - - if self.cfg.gradient_checkpointing: - training_arguments_kwargs["gradient_checkpointing"] = ( - self.cfg.gradient_checkpointing - ) - if self.cfg.gradient_checkpointing_kwargs is not None: - training_arguments_kwargs["gradient_checkpointing_kwargs"] = ( - self.cfg.gradient_checkpointing_kwargs - ) - if self.cfg.fsdp: - training_arguments_kwargs["fsdp"] = self.cfg.fsdp - if self.cfg.fsdp_config: - training_arguments_kwargs["fsdp_config"] = { - k.lstrip("fsdp_"): v for k, v in dict(self.cfg.fsdp_config).items() - } - - if self.cfg.adapter == "qlora": - training_arguments_kwargs["qlora"] = True - - # deepspeed - if self.cfg.deepspeed: - training_arguments_kwargs["deepspeed"] = self.cfg.deepspeed - - if self.cfg.lr_quadratic_warmup is not None: - training_arguments_kwargs["lr_quadratic_warmup"] = ( - self.cfg.lr_quadratic_warmup - ) - - if self.cfg.adam_beta1: - training_arguments_kwargs["adam_beta1"] = self.cfg.adam_beta1 - if self.cfg.adam_beta2: - training_arguments_kwargs["adam_beta2"] = self.cfg.adam_beta2 - if self.cfg.adam_beta3: - training_arguments_kwargs["adam_beta3"] = self.cfg.adam_beta3 - if self.cfg.adam_epsilon: - training_arguments_kwargs["adam_epsilon"] = self.cfg.adam_epsilon - if self.cfg.adam_epsilon2: - training_arguments_kwargs["adam_epsilon2"] = self.cfg.adam_epsilon2 - if self.cfg.max_grad_norm: - training_arguments_kwargs["max_grad_norm"] = self.cfg.max_grad_norm - - if self.cfg.hub_model_id: - training_arguments_kwargs["hub_model_id"] = self.cfg.hub_model_id - training_arguments_kwargs["push_to_hub"] = True - training_arguments_kwargs["hub_private_repo"] = True - training_arguments_kwargs["hub_always_push"] = True - - if self.cfg.hub_strategy: - training_arguments_kwargs["hub_strategy"] = self.cfg.hub_strategy - - if self.cfg.save_safetensors is not None: - training_arguments_kwargs["save_safetensors"] = self.cfg.save_safetensors - - if self.cfg.dataloader_pin_memory is not None: - training_arguments_kwargs["dataloader_pin_memory"] = ( - self.cfg.dataloader_pin_memory - ) - if self.cfg.dataloader_num_workers is not None: - training_arguments_kwargs["dataloader_num_workers"] = ( - self.cfg.dataloader_num_workers - ) - if self.cfg.dataloader_prefetch_factor is not None: - training_arguments_kwargs["dataloader_prefetch_factor"] = ( - self.cfg.dataloader_prefetch_factor - ) - if self.cfg.dataloader_drop_last is not None: - training_arguments_kwargs["dataloader_drop_last"] = ( - self.cfg.dataloader_drop_last - ) - elif self.cfg.sample_packing and self.cfg.eval_sample_packing is False: - training_arguments_kwargs["dataloader_drop_last"] = True - - if self.cfg.remove_unused_columns is not None: - training_arguments_kwargs["remove_unused_columns"] = ( - self.cfg.remove_unused_columns - ) - - if not self.cfg.test_datasets and self.cfg.val_set_size == 0: - # no eval set, so don't eval - training_arguments_kwargs["eval_strategy"] = "no" - elif self.cfg.eval_steps: - training_arguments_kwargs["eval_strategy"] = "steps" - training_arguments_kwargs["eval_steps"] = self.cfg.eval_steps - elif self.cfg.eval_strategy: - training_arguments_kwargs["eval_strategy"] = self.cfg.eval_strategy - else: - # we have an eval set, but no steps defined, default to use epoch - training_arguments_kwargs["eval_strategy"] = "epoch" - - if self.cfg.save_steps: - training_arguments_kwargs["save_strategy"] = "steps" - training_arguments_kwargs["save_steps"] = self.cfg.save_steps - elif self.cfg.save_strategy: - training_arguments_kwargs["save_strategy"] = self.cfg.save_strategy - else: - # default to saving each epoch if not defined - training_arguments_kwargs["save_strategy"] = "epoch" - - training_arguments_kwargs["save_only_model"] = self.cfg.save_only_model - - if self.cfg.do_bench_eval: - training_arguments_kwargs["do_bench_eval"] = self.cfg.do_bench_eval - if self.cfg.bench_dataset: - training_arguments_kwargs["bench_dataset"] = self.cfg.bench_dataset - if self.cfg.do_causal_lm_eval: - training_arguments_kwargs["do_causal_lm_eval"] = self.cfg.do_causal_lm_eval - if self.cfg.metric_for_best_model: - training_arguments_kwargs["metric_for_best_model"] = ( - self.cfg.metric_for_best_model - ) - if self.cfg.greater_is_better: - training_arguments_kwargs["greater_is_better"] = self.cfg.greater_is_better - - if self.cfg.torch_compile: - if torch.__version__ < "2.1.0": # pylint: disable=protected-access - LOG.warning("torch>=2.1.0 required for torch_compile to work properly") - elif torch._dynamo: # pylint: disable=protected-access - torch._dynamo.config.suppress_errors = ( # pylint: disable=protected-access - True - ) - training_arguments_kwargs["torch_compile"] = self.cfg.torch_compile - if self.cfg.torch_compile_backend: - training_arguments_kwargs["torch_compile_backend"] = ( - self.cfg.torch_compile_backend - ) - if self.cfg.torch_compile_mode: - training_arguments_kwargs["torch_compile_mode"] = ( - self.cfg.torch_compile_mode - ) - - # DDP Config - if self.cfg.ddp_timeout: - training_arguments_kwargs["ddp_timeout"] = self.cfg.ddp_timeout - # see https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html - if self.cfg.ddp_bucket_cap_mb: - training_arguments_kwargs["ddp_bucket_cap_mb"] = self.cfg.ddp_bucket_cap_mb - if self.cfg.ddp_broadcast_buffers is not None: - training_arguments_kwargs["ddp_broadcast_buffers"] = ( - self.cfg.ddp_broadcast_buffers - ) - - # these are all the "standard" kwargs that are def used - training_arguments_kwargs["max_steps"] = ( - self.cfg.max_steps if self.cfg.max_steps else -1 - ) - training_arguments_kwargs["max_seq_length"] = self.cfg.sequence_len - training_arguments_kwargs["per_device_train_batch_size"] = ( - self.cfg.micro_batch_size - ) - if self.cfg.eval_batch_size: - training_arguments_kwargs["per_device_eval_batch_size"] = ( - self.cfg.eval_batch_size - ) - if self.cfg.auto_find_batch_size is not None: - training_arguments_kwargs["auto_find_batch_size"] = ( - self.cfg.auto_find_batch_size - ) - training_arguments_kwargs["gradient_accumulation_steps"] = ( - self.cfg.gradient_accumulation_steps - ) - training_arguments_kwargs["eval_accumulation_steps"] = ( - self.cfg.gradient_accumulation_steps - ) - training_arguments_kwargs["num_train_epochs"] = self.cfg.num_epochs - training_arguments_kwargs["learning_rate"] = self.cfg.learning_rate - training_arguments_kwargs["output_dir"] = self.cfg.output_dir - training_arguments_kwargs["save_total_limit"] = ( - self.cfg.save_total_limit if self.cfg.save_total_limit else 4 - ) - training_arguments_kwargs["load_best_model_at_end"] = ( - ( - self.cfg.load_best_model_at_end is not False - or self.cfg.early_stopping_patience - ) - and ( - (not self.cfg.test_datasets and self.cfg.val_set_size > 0) - or (self.cfg.test_datasets and self.cfg.val_set_size == 0) - ) - and self.cfg.save_steps - and self.cfg.eval_steps - and self.cfg.save_steps % self.cfg.eval_steps == 0 - ) or False - - # handle ddp - ddp_find_unused_parameters = None - if self.cfg.ddp: - ddp_find_unused_parameters = bool(self.cfg.ddp_find_unused_parameters) - training_arguments_kwargs["ddp_find_unused_parameters"] = ( - ddp_find_unused_parameters - ) - - training_arguments_kwargs["group_by_length"] = self.cfg.group_by_length - training_arguments_kwargs["curriculum_sampling"] = self.cfg.curriculum_sampling - report_to = [] - if self.cfg.use_wandb: - report_to.append("wandb") - if self.cfg.use_mlflow: - report_to.append("mlflow") - if self.cfg.use_tensorboard: - report_to.append("tensorboard") - if self.cfg.use_comet: - report_to.append("comet_ml") - - training_arguments_kwargs["report_to"] = report_to - if self.cfg.use_wandb: - training_arguments_kwargs["run_name"] = self.cfg.wandb_name - elif self.cfg.use_mlflow: - training_arguments_kwargs["run_name"] = self.cfg.mlflow_run_name - else: - training_arguments_kwargs["run_name"] = None - - if self.cfg.lr_scheduler in ["one_cycle", "rex", "log_sweep"]: - training_arguments_kwargs["lr_scheduler_type"] = "cosine" - training_arguments_kwargs["alternate_lr_scheduler_type"] = ( - self.cfg.lr_scheduler - ) - else: - training_arguments_kwargs["lr_scheduler_type"] = ( - self.cfg.lr_scheduler if self.cfg.lr_scheduler else "cosine" - ) - training_arguments_kwargs["lr_scheduler_kwargs"] = ( - self.cfg.lr_scheduler_kwargs if self.cfg.lr_scheduler_kwargs else {} - ) - training_arguments_kwargs["cosine_min_lr_ratio"] = self.cfg.cosine_min_lr_ratio - training_arguments_kwargs["cosine_constant_lr_ratio"] = ( - self.cfg.cosine_constant_lr_ratio - ) - training_arguments_kwargs["weight_decay"] = ( - self.cfg.weight_decay if self.cfg.weight_decay is not None else 0.0 - ) - - training_arguments_kwargs["sample_packing"] = bool(self.cfg.sample_packing) - training_arguments_kwargs["multipack_real_batches"] = ( - not self.cfg.flash_attention or self.cfg.multipack_real_batches - ) - training_arguments_kwargs["eval_sample_packing"] = bool( - self.cfg.eval_sample_packing - ) - if self.cfg.sample_packing_bin_size is not None: - training_arguments_kwargs["sample_packing_bin_size"] = ( - self.cfg.sample_packing_bin_size - ) - if self.cfg.sample_packing_group_size is not None: - training_arguments_kwargs["sample_packing_group_size"] = ( - self.cfg.sample_packing_group_size - ) - if self.cfg.sample_packing_eff_est: - training_arguments_kwargs["sample_packing_efficiency"] = ( - self.cfg.sample_packing_eff_est - ) - - if self.cfg.relora_steps: - training_arguments_kwargs["relora_steps"] = self.cfg.relora_steps - training_arguments_kwargs["relora_warmup_steps"] = ( - self.cfg.relora_warmup_steps - ) - if self.cfg.relora_anneal_steps: - training_arguments_kwargs["relora_anneal_steps"] = ( - self.cfg.relora_anneal_steps - ) - if self.cfg.relora_prune_ratio: - training_arguments_kwargs["relora_prune_ratio"] = ( - self.cfg.relora_prune_ratio - ) - - if self.cfg.lisa_step_interval and self.cfg.lisa_n_layers: - training_arguments_kwargs["lisa_n_layers"] = self.cfg.lisa_n_layers - training_arguments_kwargs["lisa_step_interval"] = ( - self.cfg.lisa_step_interval - ) - training_arguments_kwargs["lisa_layers_attribute"] = ( - self.cfg.lisa_layers_attribute - ) - - training_arguments_kwargs = self.hook_pre_create_training_args( - training_arguments_kwargs - ) - training_arguments_kwargs["model_type"] = self.cfg.model_config_type - training_arguments_kwargs["pretraining"] = bool(self.cfg.pretraining_dataset) - if self.cfg.chat_template: - training_arguments_kwargs["chat_template"] = get_chat_template_from_config( - cfg=self.cfg, - tokenizer=self.tokenizer, - ) - - if self.cfg.neftune_noise_alpha is not None: - training_arguments_kwargs["neftune_noise_alpha"] = ( - self.cfg.neftune_noise_alpha - ) - - trainer_kwargs = {} - - if self.cfg.reward_model: - training_arguments_kwargs["max_length"] = self.cfg.sequence_len - - # Handle custom optimizer - custom_supported_optimizers = [opt.value for opt in CustomSupportedOptimizers] - if self.cfg.optimizer in custom_supported_optimizers: - # Common optimizer kwargs - optimizer_kwargs = { - "lr": training_arguments_kwargs.get("learning_rate"), - "weight_decay": training_arguments_kwargs.get("weight_decay"), - } - - # Adam-specific kwargs - adam_kwargs = {} - if training_arguments_kwargs.get( - "adam_beta1" - ) and training_arguments_kwargs.get("adam_beta2"): - adam_kwargs["betas"] = ( - training_arguments_kwargs.get("adam_beta1"), - training_arguments_kwargs.get("adam_beta2"), - ) - if training_arguments_kwargs.get("adam_epsilon"): - adam_kwargs["eps"] = training_arguments_kwargs.get("adam_epsilon") - - if self.cfg.optimizer == "muon": - from axolotl.contribs.mit.muon import ( # pylint: disable=no-name-in-module - MuonOptimizerFactory, - ) - - optimizer_cls = MuonOptimizerFactory - optimizer_kwargs.update(adam_kwargs) - elif self.cfg.optimizer == "optimi_adamw": - from optimi import AdamW - - optimizer_kwargs["foreach"] = False - optimizer_cls = AdamW - optimizer_kwargs.update(adam_kwargs) - elif self.cfg.optimizer == "ao_adamw_4bit": - # TODO remove 20250401 - from torchao.prototype.low_bit_optim import AdamW4bit - - optimizer_cls = AdamW4bit - optimizer_kwargs.update(adam_kwargs) - - LOG.warning( - f"`ao_adamw_4bit` will be deprecated soon. Please use `{OptimizerNames.ADAMW_TORCH_4BIT}` instead." - ) - elif self.cfg.optimizer == "ao_adamw_8bit": - from torchao.prototype.low_bit_optim import AdamW8bit - - optimizer_cls = AdamW8bit - optimizer_kwargs.update(adam_kwargs) - elif self.cfg.optimizer == "ao_adamw_fp8": - from torchao.prototype.low_bit_optim import AdamWFp8 - - optimizer_cls = AdamWFp8 - optimizer_kwargs.update(adam_kwargs) - elif self.cfg.optimizer == "adopt_adamw": - from axolotl.utils.optimizers.adopt import ADOPT - - optimizer_cls = ADOPT - adam_kwargs["decouple"] = True - optimizer_kwargs.update(adam_kwargs) - elif self.cfg.optimizer == "came_pytorch": - from came_pytorch import CAME - - optimizer_cls = CAME - - beta1 = training_arguments_kwargs.get("adam_beta1", 0.9) - beta2 = training_arguments_kwargs.get("adam_beta2", 0.999) - beta3 = training_arguments_kwargs.get("adam_beta3", 0.9999) - eps1 = training_arguments_kwargs.get("adam_epsilon", 1e-30) - eps2 = training_arguments_kwargs.get("adam_epsilon2", 1e-16) - adam_kwargs["betas"] = (beta1, beta2, beta3) - adam_kwargs["eps"] = (eps1, eps2) - - optimizer_kwargs.update(adam_kwargs) - - # Parse any additional optimizer args from config - if self.cfg.optim_args: - if isinstance(self.cfg.optim_args, dict): - optimizer_kwargs.update(self.cfg.optim_args) - else: - # Parse string format "key1=value1,key2=value2" - for mapping in self.cfg.optim_args.replace(" ", "").split(","): - key, value = mapping.split("=") - optimizer_kwargs[key] = value - - trainer_kwargs["optimizer_cls_and_kwargs"] = ( - optimizer_cls, - optimizer_kwargs, - ) - else: - # Use transformers' optimizer - training_arguments_kwargs["optim"] = self.cfg.optimizer - - # Parse any additional optimizer args from config - if self.cfg.optim_args: - if isinstance(self.cfg.optim_args, dict): - optim_args = ",".join( - [f"{key}={value}" for key, value in self.cfg.optim_args.items()] - ) - else: - optim_args = self.cfg.optim_args - training_arguments_kwargs["optim_args"] = optim_args - - if self.cfg.optimizer == "adamw_anyprecision": - if Path(self.cfg.torchdistx_path).exists(): - sys.path.append(self.cfg.torchdistx_path) - importlib.import_module("torchdistx") - - if self.cfg.optim_target_modules: - training_arguments_kwargs["optim_target_modules"] = ( - self.cfg.optim_target_modules - ) - - training_arguments_kwargs["embedding_lr"] = self.cfg.embedding_lr - training_arguments_kwargs["embedding_lr_scale"] = self.cfg.embedding_lr_scale - - training_arguments_kwargs["loraplus_lr_ratio"] = self.cfg.loraplus_lr_ratio - training_arguments_kwargs["loraplus_lr_embedding"] = ( - self.cfg.loraplus_lr_embedding - ) - training_arguments_kwargs["lr_groups"] = self.cfg.lr_groups - - if self.cfg.accelerator_config: - training_arguments_kwargs["accelerator_config"] = ( - self.cfg.accelerator_config - ) - - if self.cfg.image_size: - training_arguments_kwargs["image_size"] = self.cfg.image_size - if self.cfg.image_resize_algorithm: - training_arguments_kwargs["image_resize_algorithm"] = ( - self.cfg.image_resize_algorithm - ) - if self.cfg.kd_ce_alpha is not None: - training_arguments_kwargs["kd_ce_alpha"] = self.cfg.kd_ce_alpha - if self.cfg.kd_alpha is not None: - training_arguments_kwargs["kd_alpha"] = self.cfg.kd_alpha - if self.cfg.kd_temperature is not None: - training_arguments_kwargs["kd_temperature"] = self.cfg.kd_temperature - if self.cfg.kd_zscore_base_temp is not None: - training_arguments_kwargs["kd_zscore_base_temp"] = ( - self.cfg.kd_zscore_base_temp - ) - if self.cfg.kd_top_k_before_softmax is not None: - training_arguments_kwargs["kd_top_k_before_softmax"] = ( - self.cfg.kd_top_k_before_softmax - ) - - if self.cfg.reward_model: - training_args_cls = AxolotlRewardConfig - elif self.cfg.process_reward_model: - training_args_cls = AxolotlPRMConfig - else: - training_args_cls = AxolotlTrainingArguments - training_args = training_args_cls( # pylint: disable=unexpected-keyword-arg - **training_arguments_kwargs, - ) - training_args = self.hook_post_create_training_args(training_args) - - # unset run_name so wandb sets up experiment names - if self.cfg.use_wandb and training_args.run_name == training_args.output_dir: - training_args.run_name = ( # pylint: disable=attribute-defined-outside-init - None - ) - - data_collator_kwargs = { - "padding": True, # True/"longest" is the default - } - multiple = 64 - if self.cfg.pad_to_sequence_len: - data_collator_kwargs["pad_to_multiple_of"] = multiple * math.ceil( - self.cfg.sequence_len / multiple - ) - else: - # A100 is best at 64, while others at 8. Let's use the larger so we don't have to check - # https://docs.nvidia.com/deeplearning/performance/dl-performance-matrix-multiplication/index.html - data_collator_kwargs["pad_to_multiple_of"] = multiple - - if self.cfg.reward_model: - data_collator_kwargs["max_length"] = self.cfg.sequence_len - - trainer_cls = self._get_trainer_cls() - trainer_kwargs, trainer_cls = self.hook_pre_create_trainer( - trainer_kwargs, trainer_cls - ) - if eval_data_collator := self.build_collator( - training_args, is_eval=True, **data_collator_kwargs - ): - if not (self.cfg.reward_model or self.cfg.process_reward_model): - trainer_kwargs["eval_data_collator"] = eval_data_collator - if not (self.cfg.reward_model or self.cfg.process_reward_model): - trainer_kwargs["bench_data_collator"] = transformers.DataCollatorForSeq2Seq( - self.tokenizer, - return_tensors="pt", - **data_collator_kwargs, - ) - sig = inspect.signature(trainer_cls) - if "processing_class" in sig.parameters.keys(): - trainer_kwargs["processing_class"] = self.tokenizer - else: - trainer_kwargs["tokenizer"] = self.tokenizer - if ( - not (trainer_cls in [AxolotlRewardTrainer, AxolotlPRMTrainer]) - and self.cfg.datasets is not None - ): - trainer_kwargs["dataset_tags"] = [ - d["path"] for d in self.cfg.datasets if not Path(d["path"]).is_dir() - ] - trainer = trainer_cls( - model=self.model, - train_dataset=self.train_dataset, - eval_dataset=self.eval_dataset, - args=training_args, - data_collator=self.build_collator(training_args, **data_collator_kwargs), - callbacks=self.get_callbacks(), - **trainer_kwargs, - ) - trainer = self.hook_post_create_trainer(trainer) - for callback in self.get_post_trainer_create_callbacks(trainer): - trainer.add_callback(callback) - - if self.cfg.deepspeed and self.cfg.sample_packing: - trainer.accelerator.state.deepspeed_plugin.deepspeed_config[ - "train_micro_batch_size_per_gpu" - ] = self.cfg.micro_batch_size - - return trainer - - def build_collator( - self, training_args: AxolotlTrainingArguments, is_eval=False, **kwargs - ): - if training_args.pretraining: - if ( - self.cfg.pretraining_sample_concatenation is False - or self.cfg.micro_batch_size > 1 - ): - return DataCollatorForSeq2Seq(self.tokenizer, **kwargs) - return None - - if self.cfg.model_config_type == "mamba": - return MambaDataCollator(tokenizer=self.tokenizer) - - use_batch_sampler_collator = False - if is_eval is False and training_args.sample_packing: - use_batch_sampler_collator = True - if is_eval and training_args.eval_sample_packing: - use_batch_sampler_collator = True - - collator: Type[ - Union[ - V2BatchSamplerDataCollatorForSeq2Seq, - BatchSamplerDataCollatorForSeq2Seq, - DataCollatorForSeq2Seq, - DataCollatorWithFlattening, - RewardDataCollatorWithPadding, - ] - ] - collator_args = [self.tokenizer] - if self.cfg.reward_model: - collator = RewardDataCollatorWithPadding - if "max_length" in kwargs: - kwargs.pop("max_length") - elif use_batch_sampler_collator: - if self.cfg.flex_attention: - collator = V2BatchSamplerDataCollatorForSeq2Seq - elif self.cfg.model_config_type in SUPPORTED_MULTIPACK_MODEL_TYPES: - collator = V2BatchSamplerDataCollatorForSeq2Seq - elif ( - self.cfg.model_config_type in ["llama"] - and self.cfg.flash_attention is not True - ): - collator = V2BatchSamplerDataCollatorForSeq2Seq - else: - collator = BatchSamplerDataCollatorForSeq2Seq - else: - if self.cfg.processor_type and self.processor: - collator = MultiModalChatDataCollator - kwargs["processing_strategy"] = get_processing_strategy( - self.processor, - training_args.chat_template, - self.cfg.chat_template, - image_size=training_args.image_size, - image_resize_algorithm=training_args.image_resize_algorithm, - ) - elif self.cfg.batch_flattening: - collator = DataCollatorWithFlattening - collator_args.pop(0) - kwargs.pop("pad_to_multiple_of", None) - kwargs.pop("padding", None) - elif self.cfg.kd_trainer: - from axolotl.integrations.kd.collator import ( - DataCollatorForKD, - KDBatchSamplerDataCollatorForSeq2Seq, - ) - - if self.cfg.sample_packing: - collator = KDBatchSamplerDataCollatorForSeq2Seq - else: - collator = DataCollatorForKD - else: - collator = DataCollatorForSeq2Seq - - kwargs["return_tensors"] = "pt" - - return collator( - *collator_args, - **kwargs, - ) - - -class HFRLTrainerBuilder(TrainerBuilderBase): - """Trainer factory class for TRL-based RLHF trainers (e.g. DPO)""" - - def get_callbacks(self): - callbacks = super().get_callbacks() - - return callbacks - - def get_post_trainer_create_callbacks(self, trainer): - callbacks = super().get_post_trainer_create_callbacks(trainer=trainer) - return callbacks - - def build_training_arguments(self, total_num_steps): - training_args_kwargs = {} - for arg in [ - "adam_beta1", - "adam_beta2", - "adam_epsilon", - "dataloader_num_workers", - "dataloader_pin_memory", - ]: - if hasattr(self.cfg, arg) and getattr(self.cfg, arg) is not None: - training_args_kwargs[arg] = getattr(self.cfg, arg) - - if self.cfg.hub_model_id: - training_args_kwargs["hub_model_id"] = self.cfg.hub_model_id - training_args_kwargs["push_to_hub"] = True - training_args_kwargs["hub_private_repo"] = True - training_args_kwargs["hub_always_push"] = True - - if self.cfg.hub_strategy: - training_args_kwargs["hub_strategy"] = self.cfg.hub_strategy - - if self.cfg.save_safetensors is not None: - training_args_kwargs["save_safetensors"] = self.cfg.save_safetensors - - if self.eval_dataset: - training_args_kwargs["eval_strategy"] = "steps" - training_args_kwargs["eval_steps"] = self.cfg.eval_steps - else: - training_args_kwargs["eval_strategy"] = "no" - - if self.cfg.bf16 or self.cfg.bfloat16: - training_args_kwargs["bf16"] = True - - training_args_kwargs["loraplus_lr_ratio"] = self.cfg.loraplus_lr_ratio - training_args_kwargs["loraplus_lr_embedding"] = self.cfg.loraplus_lr_embedding - training_args_kwargs["lr_scheduler_type"] = ( - self.cfg.lr_scheduler if self.cfg.lr_scheduler else "cosine" - ) - training_args_kwargs["lr_scheduler_kwargs"] = ( - self.cfg.lr_scheduler_kwargs if self.cfg.lr_scheduler_kwargs else {} - ) - if self.cfg.remove_unused_columns is not None: - training_args_kwargs["remove_unused_columns"] = ( - self.cfg.remove_unused_columns - ) - else: - training_args_kwargs["remove_unused_columns"] = False - - if self.cfg.dataloader_pin_memory is not None: - training_args_kwargs["dataloader_pin_memory"] = ( - self.cfg.dataloader_pin_memory - ) - if self.cfg.dataloader_num_workers is not None: - training_args_kwargs["dataloader_num_workers"] = ( - self.cfg.dataloader_num_workers - ) - if self.cfg.dataloader_prefetch_factor is not None: - training_args_kwargs["dataloader_prefetch_factor"] = ( - self.cfg.dataloader_prefetch_factor - ) - - if self.cfg.seed is not None: - training_args_kwargs["seed"] = self.cfg.seed - - if self.cfg.gradient_checkpointing: - training_args_kwargs["gradient_checkpointing"] = ( - self.cfg.gradient_checkpointing - ) - if self.cfg.gradient_checkpointing_kwargs is not None: - training_args_kwargs["gradient_checkpointing_kwargs"] = ( - self.cfg.gradient_checkpointing_kwargs - ) - else: - training_args_kwargs["gradient_checkpointing_kwargs"] = { - "use_reentrant": False - } - - # set save_strategy and save_steps - if self.cfg.save_steps: - training_args_kwargs["save_strategy"] = "steps" - training_args_kwargs["save_steps"] = self.cfg.save_steps - elif self.cfg.save_strategy: - training_args_kwargs["save_strategy"] = self.cfg.save_strategy - else: - # default to saving each epoch if not defined - training_args_kwargs["save_strategy"] = "epoch" - - training_args_kwargs["save_only_model"] = self.cfg.save_only_model - - if self.cfg.dataset_processes: - training_args_kwargs["dataset_num_proc"] = self.cfg.dataset_processes - - if self.cfg.trl and self.cfg.trl.beta is not None: - training_args_kwargs["beta"] = self.cfg.trl.beta - elif self.cfg.rl_beta is not None: - training_args_kwargs["beta"] = self.cfg.rl_beta - elif self.cfg.orpo_alpha is not None: - # trl does some odd mapping of alpha to beta to reuse the beta parameter ??? - training_args_kwargs["beta"] = self.cfg.orpo_alpha - - if self.cfg.rpo_alpha is not None: - training_args_kwargs["rpo_alpha"] = self.cfg.rpo_alpha - - if self.cfg.use_wandb: - training_args_kwargs["run_name"] = self.cfg.wandb_name - - training_args_cls = None - blocklist_args_kwargs = [] - if self.cfg.rl is RLType.SIMPO: - training_args_cls = AxolotlCPOConfig - training_args_kwargs["loss_type"] = "simpo" - training_args_kwargs["max_length"] = self.cfg.sequence_len - training_args_kwargs["simpo_gamma"] = self.cfg.simpo_gamma - if self.cfg.cpo_alpha is not None: - training_args_kwargs["cpo_alpha"] = self.cfg.cpo_alpha - - elif self.cfg.rl is RLType.ORPO: - training_args_cls = AxolotlORPOConfig - training_args_kwargs["max_length"] = self.cfg.sequence_len - if self.cfg.max_prompt_len: - training_args_kwargs["max_prompt_length"] = self.cfg.max_prompt_len - - elif self.cfg.rl is RLType.KTO: - training_args_cls = AxolotlKTOConfig - - training_args_kwargs["desirable_weight"] = ( - self.cfg.kto_desirable_weight or 1.0 - ) - training_args_kwargs["undesirable_weight"] = ( - self.cfg.kto_undesirable_weight or 1.0 - ) - - training_args_kwargs["max_length"] = self.cfg.sequence_len - if self.cfg.max_prompt_len: - training_args_kwargs["max_prompt_length"] = self.cfg.max_prompt_len - - elif self.cfg.rl is RLType.GRPO: - training_args_cls = GRPOStrategy.get_training_args_class() - training_args_kwargs.update(GRPOStrategy.set_training_args_kwargs(self.cfg)) - blocklist_args_kwargs = GRPOStrategy.get_blocklist_args_kwargs() - - else: - training_args_cls = AxolotlDPOConfig - if self.cfg.rl is RLType.IPO: - training_args_kwargs["loss_type"] = "ipo" - training_args_kwargs["max_length"] = self.cfg.sequence_len - training_args_kwargs["max_completion_length"] = None - training_args_kwargs["max_prompt_length"] = self.cfg.sequence_len - training_args_kwargs["generate_during_eval"] = self.cfg.use_wandb - if self.cfg.dpo_use_weighting is not None: - training_args_kwargs["use_weighting"] = self.cfg.dpo_use_weighting - if self.cfg.dpo_use_logits_to_keep is not None: - training_args_kwargs["use_logits_to_keep"] = ( - self.cfg.dpo_use_logits_to_keep - ) - - for blocklist_key in blocklist_args_kwargs: - if blocklist_key in training_args_kwargs: - del training_args_kwargs[blocklist_key] - - max_steps = self.cfg.max_steps or total_num_steps or -1 - training_args_kwargs["num_train_epochs"] = self.cfg.num_epochs - training_args = training_args_cls( # pylint: disable=unexpected-keyword-arg - self.cfg.output_dir, - per_device_train_batch_size=self.cfg.micro_batch_size, - max_steps=max_steps, - gradient_accumulation_steps=self.cfg.gradient_accumulation_steps, - learning_rate=self.cfg.learning_rate, - warmup_steps=self.cfg.warmup_steps, - logging_first_step=True, - logging_steps=1, - optim=self.cfg.optimizer, - save_total_limit=self.cfg.save_total_limit or 5, - **training_args_kwargs, - ) - - # unset run_name so wandb sets up experiment names - if self.cfg.use_wandb and training_args.run_name == training_args.output_dir: - training_args.run_name = ( # pylint: disable=attribute-defined-outside-init - None - ) - - return training_args - - def build(self, total_num_steps): - training_args = self.build_training_arguments(total_num_steps) - trainer_kwargs = {} - if self.cfg.rl is RLType.IPO: - if self.cfg.dpo_label_smoothing: - trainer_kwargs["label_smoothing"] = self.cfg.dpo_label_smoothing - if self.eval_dataset: - trainer_kwargs["eval_dataset"] = self.eval_dataset - if self.cfg.adapter and self.peft_config: - if self.cfg.rl is not RLType.GRPO: - trainer_kwargs["peft_config"] = self.peft_config - if self.cfg.precompute_ref_log_probs is not None: - trainer_kwargs["precompute_ref_log_probs"] = ( - self.cfg.precompute_ref_log_probs - ) - if self.cfg.rl is RLType.GRPO: - trainer_cls = GRPOStrategy.get_trainer_class( - sequence_parallel=self.cfg.sequence_parallel_degree > 1 - ) - trainer_cls_args = [self.model] - trainer_cls_args.extend(GRPOStrategy.set_trainer_args(self.cfg)) - trainer_kwargs.update(GRPOStrategy.set_trainer_kwargs(self.cfg)) - elif self.cfg.rl in [RLType.DPO, RLType.IPO]: - trainer_cls = DPOStrategy.get_trainer_class() - trainer_cls_args = [self.model, self.model_ref] - elif self.cfg.rl is RLType.ORPO: - trainer_cls = AxolotlORPOTrainer - trainer_cls_args = [self.model] - elif self.cfg.rl is RLType.KTO: - trainer_cls = AxolotlKTOTrainer - trainer_cls_args = [self.model] - elif self.cfg.rl is RLType.SIMPO: - trainer_cls = AxolotlCPOTrainer - trainer_cls_args = [self.model] - else: - raise ValueError(f"Unsupported RL: {self.cfg.rl}") - - if self.cfg.plugins: - plugin_manager = PluginManager.get_instance() - temp_trainer_cls = plugin_manager.get_trainer_cls(self.cfg) - if temp_trainer_cls is not None: - trainer_cls = temp_trainer_cls - - sig = inspect.signature(trainer_cls) - if "tokenizer" in sig.parameters.keys(): - trainer_kwargs["tokenizer"] = self.tokenizer - else: - trainer_kwargs["processing_class"] = self.tokenizer - - if self.cfg.datasets is not None and ( - trainer_cls is DPOStrategy.get_trainer_class() - ): - trainer_kwargs["dataset_tags"] = [ - d["path"] for d in self.cfg.datasets if not Path(d["path"]).is_dir() - ] - trainer = trainer_cls( - *trainer_cls_args, - args=training_args, - train_dataset=self.train_dataset, - callbacks=self.get_callbacks(), - **trainer_kwargs, - ) - if self.cfg.fsdp: - ensure_dtype(trainer.model, dtype=self.cfg.torch_dtype) - if self.cfg.rl in [RLType.DPO, RLType.IPO] and trainer.ref_model: - ensure_dtype(trainer.ref_model, dtype=self.cfg.torch_dtype) - - trainer = self.hook_post_create_trainer(trainer) - for callback in self.get_post_trainer_create_callbacks(trainer): - trainer.add_callback(callback) - - return trainer - - -class HFPPOTrainerBuilder(TrainerBuilderBase): - """ - HF Factory class for PPO Trainer - """ - - def get_callbacks(self): - callbacks = super().get_callbacks() - return callbacks - - def get_post_trainer_create_callbacks(self, trainer): - callbacks = super().get_post_trainer_create_callbacks(trainer=trainer) - return callbacks - - def build(self, total_num_steps): - # build PPOConfig - pass diff --git a/src/axolotl/core/trainers/dpo/trainer.py b/src/axolotl/core/trainers/dpo/trainer.py index c2c80c0bc..15af80c02 100644 --- a/src/axolotl/core/trainers/dpo/trainer.py +++ b/src/axolotl/core/trainers/dpo/trainer.py @@ -5,65 +5,31 @@ from functools import wraps from typing import Any, Dict, Union import torch -from peft.optimizers import create_loraplus_optimizer from torch import nn -from transformers import Trainer -from transformers.utils import is_sagemaker_mp_enabled from trl import DPOTrainer from axolotl.core.trainers.mixins import RngLoaderMixin, SchedulerMixin +from axolotl.core.trainers.mixins.optimizer import OptimizerInitMixin, OptimizerMixin from axolotl.core.trainers.utils import ( sanitize_kwargs_for_ds_tagging, sanitize_kwargs_for_tagging, ) -if is_sagemaker_mp_enabled(): - import smdistributed.modelparallel.torch as smp - -class AxolotlDPOTrainer(RngLoaderMixin, SchedulerMixin, DPOTrainer): +class AxolotlDPOTrainer( + RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, DPOTrainer +): """Extend the base DPOTrainer for axolotl helpers.""" tag_names = ["axolotl", "dpo"] def __init__(self, *args, dataset_tags=None, **kwargs): super().__init__(*args, **kwargs) + self.dataset_tags = dataset_tags self.optimizer = None self.model_accepts_loss_kwargs = False - def create_optimizer(self): - # pylint: disable=duplicate-code - if self.args.loraplus_lr_ratio is None: - return super().create_optimizer() - - opt_model = self.model_wrapped if is_sagemaker_mp_enabled() else self.model - if self.optimizer is None: # pylint: disable=access-member-before-definition - optimizer_cls, optimizer_kwargs = Trainer.get_optimizer_cls_and_kwargs( - self.args, - opt_model, - ) - - loraplus_lr_ratio = getattr(self.args, "loraplus_lr_ratio", None) - if loraplus_lr_ratio: - print("Using lora+") - loraplus_lr_embedding = getattr(self.args, "loraplus_lr_embedding", None) - # pylint: disable=duplicate-code - self.optimizer = create_loraplus_optimizer( # pylint: disable=attribute-defined-outside-init - opt_model, - optimizer_cls, - loraplus_lr_ratio=loraplus_lr_ratio, - loraplus_lr_embedding=loraplus_lr_embedding, - **optimizer_kwargs, - ) - - if is_sagemaker_mp_enabled(): - self.optimizer = smp.DistributedOptimizer( # pylint: disable=attribute-defined-outside-init - self.optimizer - ) - - return self.optimizer - @wraps(DPOTrainer.push_to_hub) def push_to_hub(self, *args, **kwargs) -> str: """ diff --git a/src/axolotl/core/trainers/grpo/__init__.py b/src/axolotl/core/trainers/grpo/__init__.py index 196cdb56a..a37c8baca 100644 --- a/src/axolotl/core/trainers/grpo/__init__.py +++ b/src/axolotl/core/trainers/grpo/__init__.py @@ -132,7 +132,7 @@ class GRPOStrategy: @classmethod def get_blocklist_args_kwargs(cls) -> list[str]: - return ["dataset_num_proc"] + return ["dataset_num_proc", "max_length"] @classmethod def get_reward_func(cls, reward_func_fqn: str) -> RewardFunc: @@ -167,4 +167,4 @@ class GRPOStrategy: LOG.info( f"Reward function {reward_func_fqn} is a pre-trained model path - if this is unexpected, please check the reward function path." ) - return reward_func + return reward_func_fqn diff --git a/src/axolotl/core/trainers/grpo/trainer.py b/src/axolotl/core/trainers/grpo/trainer.py index b5b3912cf..5c93c69df 100644 --- a/src/axolotl/core/trainers/grpo/trainer.py +++ b/src/axolotl/core/trainers/grpo/trainer.py @@ -43,6 +43,7 @@ from trl.trainer.utils import pad from axolotl.core.trainers.grpo.sampler import SequenceParallelRepeatRandomSampler from axolotl.core.trainers.mixins import RngLoaderMixin, SchedulerMixin +from axolotl.core.trainers.mixins.optimizer import OptimizerInitMixin, OptimizerMixin from axolotl.monkeypatch.ring_attn import get_ring_attn_group if is_peft_available(): @@ -50,7 +51,9 @@ if is_peft_available(): from peft import PeftConfig -class AxolotlGRPOTrainer(RngLoaderMixin, SchedulerMixin, GRPOTrainer): +class AxolotlGRPOTrainer( + RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, GRPOTrainer +): """Extend the base GRPOTrainer for axolotl helpers""" _tag_names = ["trl", "grpo", "axolotl"] @@ -77,6 +80,7 @@ class AxolotlGRPOSequenceParallelTrainer(AxolotlGRPOTrainer): torch.optim.Optimizer | None, torch.optim.lr_scheduler.LambdaLR | None ] = (None, None), peft_config: "PeftConfig | None" = None, + optimizer_cls_and_kwargs: tuple[type, dict] | None = None, ): # First call the superclass constructor with all arguments super().__init__( @@ -90,6 +94,7 @@ class AxolotlGRPOSequenceParallelTrainer(AxolotlGRPOTrainer): callbacks=callbacks, optimizers=optimizers, peft_config=peft_config, + optimizer_cls_and_kwargs=optimizer_cls_and_kwargs, ) # Get number of SP groups (number of processes divided by SP degree) diff --git a/src/axolotl/core/trainers/mixins/optimizer.py b/src/axolotl/core/trainers/mixins/optimizer.py index abb662706..a9a9a3992 100644 --- a/src/axolotl/core/trainers/mixins/optimizer.py +++ b/src/axolotl/core/trainers/mixins/optimizer.py @@ -198,3 +198,20 @@ class OptimizerMixin(Trainer): ) return self.optimizer + + +class OptimizerInitMixin: + """ + Mixin to handle common optimizer initialization logic for Trainers (mostly TRL) that do not + accept optimizer_cls_and_kwargs as kwarg in constructor. + """ + + def __init__(self, *args, **kwargs): + optimizer_cls_and_kwargs = kwargs.pop("optimizer_cls_and_kwargs", None) + super().__init__(*args, **kwargs) + if ( + optimizer_cls_and_kwargs + and self.optimizer_cls_and_kwargs is None + and self.optimizer is None + ): + self.optimizer_cls_and_kwargs = optimizer_cls_and_kwargs diff --git a/src/axolotl/core/trainers/trl.py b/src/axolotl/core/trainers/trl.py index b2c5c54ca..bf0d50dee 100644 --- a/src/axolotl/core/trainers/trl.py +++ b/src/axolotl/core/trainers/trl.py @@ -1,7 +1,5 @@ """Module for TRL PPO trainer""" -from typing import Literal, Union - import torch from tqdm import tqdm from trl import ( @@ -14,6 +12,7 @@ from trl import ( ) from axolotl.core.trainers.mixins import RngLoaderMixin +from axolotl.core.trainers.mixins.optimizer import OptimizerInitMixin, OptimizerMixin from axolotl.core.trainers.mixins.scheduler import SchedulerMixin @@ -75,87 +74,19 @@ class TRLPPOTrainer(PPOTrainer): ) -class AxolotlORPOTrainer(RngLoaderMixin, SchedulerMixin, ORPOTrainer): +class AxolotlORPOTrainer( + RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, ORPOTrainer +): """ Extend the base ORPOTrainer for axolotl helpers """ tag_names = ["axolotl", "orpo"] - def get_batch_loss_metrics( - self, - model, - batch: dict[str, Union[list, torch.LongTensor]], - train_eval: Literal["train", "eval"] = "train", - ): - """Compute the ORPO loss and other metrics for the given batch of inputs for train or test.""" - # TODO remove once https://github.com/huggingface/trl/pull/3069 is included in a trl release - - metrics = {} - - forward_output = self.concatenated_forward(model, batch) - ( - policy_chosen_logps, - policy_rejected_logps, - policy_chosen_logits, - policy_rejected_logits, - policy_nll_loss, - ) = forward_output[:5] - if self.aux_loss_enabled: - aux_loss = forward_output[5] - - losses, chosen_rewards, rejected_rewards, log_odds_ratio, log_odds_chosen = ( - self.odds_ratio_loss(policy_chosen_logps, policy_rejected_logps) - ) - # full ORPO loss - loss = policy_nll_loss - losses.mean() - - reward_accuracies = (chosen_rewards > rejected_rewards).float() - - prefix = "eval_" if train_eval == "eval" else "" - metrics[f"{prefix}rewards/chosen"] = self.accelerator.gather_for_metrics( - chosen_rewards - ).mean() - metrics[f"{prefix}rewards/rejected"] = self.accelerator.gather_for_metrics( - rejected_rewards - ).mean() - metrics[f"{prefix}rewards/accuracies"] = self.accelerator.gather_for_metrics( - reward_accuracies - ).mean() - metrics[f"{prefix}rewards/margins"] = self.accelerator.gather_for_metrics( - chosen_rewards - rejected_rewards - ).mean() - metrics[f"{prefix}logps/rejected"] = ( - self.accelerator.gather_for_metrics(policy_rejected_logps).detach().mean() - ) - metrics[f"{prefix}logps/chosen"] = ( - self.accelerator.gather_for_metrics(policy_chosen_logps).detach().mean() - ) - metrics[f"{prefix}logits/rejected"] = self.accelerator.gather_for_metrics( - policy_rejected_logits.detach().mean() - ).mean() - metrics[f"{prefix}logits/chosen"] = self.accelerator.gather_for_metrics( - policy_chosen_logits.detach().mean() - ).mean() - metrics[f"{prefix}nll_loss"] = ( - self.accelerator.gather_for_metrics(policy_nll_loss).detach().mean() - ) - metrics[f"{prefix}log_odds_ratio"] = ( - self.accelerator.gather_for_metrics(log_odds_ratio).detach().mean() - ) - metrics[f"{prefix}log_odds_chosen"] = ( - self.accelerator.gather_for_metrics(log_odds_chosen).detach().mean() - ) - for k, v in metrics.items(): - metrics[k] = v.item() - if self.aux_loss_enabled: - loss += self.aux_loss_coef * aux_loss - - return loss, metrics - - -class AxolotlKTOTrainer(RngLoaderMixin, SchedulerMixin, KTOTrainer): +class AxolotlKTOTrainer( + RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, KTOTrainer +): """ Extend the base KTOTrainer for axolotl helpers """ @@ -163,89 +94,19 @@ class AxolotlKTOTrainer(RngLoaderMixin, SchedulerMixin, KTOTrainer): tag_names = ["axolotl", "kto"] -class AxolotlCPOTrainer(RngLoaderMixin, SchedulerMixin, CPOTrainer): +class AxolotlCPOTrainer( + RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, CPOTrainer +): """ Extend the base CPOTrainer for axolotl helpers """ tag_names = ["axolotl", "cpo"] - def get_batch_loss_metrics( - self, - model, - batch: dict[str, Union[list, torch.LongTensor]], - train_eval: Literal["train", "eval"] = "train", - ): - """Compute the CPO loss and other metrics for the given batch of inputs for train or test.""" - metrics = {} - forward_output = self.concatenated_forward(model, batch) - ( - policy_chosen_logps, - policy_rejected_logps, - policy_chosen_logits, - policy_rejected_logits, - policy_nll_loss, - ) = forward_output[:5] - if self.aux_loss_enabled: - aux_loss = forward_output[5] - - losses, chosen_rewards, rejected_rewards = self.cpo_loss( - policy_chosen_logps, - policy_rejected_logps, - ) - - loss = losses.mean() + self.cpo_alpha * policy_nll_loss - reward_accuracies = (chosen_rewards > rejected_rewards).float() - - prefix = "eval_" if train_eval == "eval" else "" - metrics[f"{prefix}rewards/chosen"] = ( - self.accelerator.gather_for_metrics(chosen_rewards).mean().item() - ) - metrics[f"{prefix}rewards/rejected"] = ( - self.accelerator.gather_for_metrics(rejected_rewards).mean().item() - ) - metrics[f"{prefix}rewards/accuracies"] = ( - self.accelerator.gather_for_metrics(reward_accuracies).mean().item() - ) - metrics[f"{prefix}rewards/margins"] = ( - self.accelerator.gather_for_metrics(chosen_rewards - rejected_rewards) - .mean() - .item() - ) - metrics[f"{prefix}logps/rejected"] = ( - self.accelerator.gather_for_metrics(policy_rejected_logps) - .detach() - .mean() - .item() - ) - metrics[f"{prefix}logps/chosen"] = ( - self.accelerator.gather_for_metrics(policy_chosen_logps) - .detach() - .mean() - .item() - ) - metrics[f"{prefix}logits/rejected"] = ( - self.accelerator.gather_for_metrics(policy_rejected_logits.detach().mean()) - .mean() - .item() - ) - metrics[f"{prefix}logits/chosen"] = ( - self.accelerator.gather_for_metrics(policy_chosen_logits.detach().mean()) - .mean() - .item() - ) - metrics[f"{prefix}nll_loss"] = ( - self.accelerator.gather_for_metrics(policy_nll_loss).detach().mean().item() - ) - - if self.aux_loss_enabled: - loss += self.aux_loss_coef * aux_loss - - return loss, metrics - - -class AxolotlRewardTrainer(RngLoaderMixin, SchedulerMixin, RewardTrainer): +class AxolotlRewardTrainer( + RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, RewardTrainer +): """ Extend the base RewardTrainer for axolotl helpers """ @@ -253,7 +114,9 @@ class AxolotlRewardTrainer(RngLoaderMixin, SchedulerMixin, RewardTrainer): tag_names = ["axolotl", "reward"] -class AxolotlPRMTrainer(RngLoaderMixin, SchedulerMixin, PRMTrainer): +class AxolotlPRMTrainer( + RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, PRMTrainer +): """ Extend the base trl.PRMTrainer for axolotl helpers """ diff --git a/src/axolotl/core/training_args.py b/src/axolotl/core/training_args.py index 9c93f77c7..42488e643 100644 --- a/src/axolotl/core/training_args.py +++ b/src/axolotl/core/training_args.py @@ -164,12 +164,6 @@ class AxolotlTrainingMixins: default=None, metadata={"help": "whether to use sequential sampling for curriculum learning"}, ) - alternate_optimizer: Optional[str] = field( - default=None, - metadata={ - "help": "workaround to pass an alternate optimizer to the HF trainer" - }, - ) alternate_lr_scheduler_type: Optional[str] = field( default=None, metadata={ diff --git a/src/axolotl/train.py b/src/axolotl/train.py index 68ba3a124..b59bd8a75 100644 --- a/src/axolotl/train.py +++ b/src/axolotl/train.py @@ -25,7 +25,7 @@ from axolotl.common.datasets import TrainDatasetMeta from axolotl.contribs.lgpl import ( # pylint: disable = no-name-in-module fix_untrained_tokens, ) -from axolotl.core.trainer_builder import HFCausalTrainerBuilder, HFRLTrainerBuilder +from axolotl.core.builders import HFCausalTrainerBuilder, HFRLTrainerBuilder from axolotl.integrations.base import PluginManager from axolotl.loaders import ( ModelLoader, diff --git a/src/axolotl/utils/callbacks/__init__.py b/src/axolotl/utils/callbacks/__init__.py index d94f4be74..8b8a77611 100644 --- a/src/axolotl/utils/callbacks/__init__.py +++ b/src/axolotl/utils/callbacks/__init__.py @@ -46,7 +46,7 @@ from axolotl.utils.logging import get_logger from axolotl.utils.schemas.config import AxolotlInputConfig if TYPE_CHECKING: - from axolotl.core.trainer_builder import AxolotlTrainingArguments + from axolotl.core.training_args import AxolotlTrainingArguments IGNORE_INDEX = -100 diff --git a/src/axolotl/utils/callbacks/comet_.py b/src/axolotl/utils/callbacks/comet_.py index b7e9034b0..7dce95145 100644 --- a/src/axolotl/utils/callbacks/comet_.py +++ b/src/axolotl/utils/callbacks/comet_.py @@ -9,7 +9,7 @@ from axolotl.utils.distributed import is_main_process from axolotl.utils.logging import get_logger if TYPE_CHECKING: - from axolotl.core.trainer_builder import AxolotlTrainingArguments + from axolotl.core.training_args import AxolotlTrainingArguments LOG = get_logger(__name__) diff --git a/src/axolotl/utils/callbacks/lisa.py b/src/axolotl/utils/callbacks/lisa.py index ad7e23144..348cdf2da 100644 --- a/src/axolotl/utils/callbacks/lisa.py +++ b/src/axolotl/utils/callbacks/lisa.py @@ -15,7 +15,7 @@ from transformers import TrainerCallback from axolotl.utils.logging import get_logger if TYPE_CHECKING: - from axolotl.core.trainer_builder import AxolotlTrainer + from axolotl.core.trainers import AxolotlTrainer LOG = get_logger(__name__) diff --git a/src/axolotl/utils/callbacks/mlflow_.py b/src/axolotl/utils/callbacks/mlflow_.py index 15f8ef069..ac72f5e6d 100644 --- a/src/axolotl/utils/callbacks/mlflow_.py +++ b/src/axolotl/utils/callbacks/mlflow_.py @@ -12,7 +12,7 @@ from axolotl.utils.distributed import is_main_process from axolotl.utils.logging import get_logger if TYPE_CHECKING: - from axolotl.core.trainer_builder import AxolotlTrainingArguments + from axolotl.core.training_args import AxolotlTrainingArguments LOG = get_logger(__name__) diff --git a/src/axolotl/utils/data/rl.py b/src/axolotl/utils/data/rl.py index eeea6f207..9264c86ab 100644 --- a/src/axolotl/utils/data/rl.py +++ b/src/axolotl/utils/data/rl.py @@ -71,8 +71,9 @@ def map_dataset(cfg, data_set, ds_transform_fn, tokenizer, **map_kwargs): data_set = data_set.map( ds_transform_fn, - desc="Mapping RL Dataset", num_proc=cfg.dataset_processes, + load_from_cache_file=not cfg.is_preprocess, + desc="Mapping RL Dataset", **map_kwargs, ) diff --git a/src/axolotl/utils/schemas/config.py b/src/axolotl/utils/schemas/config.py index 698befa19..e7bd16892 100644 --- a/src/axolotl/utils/schemas/config.py +++ b/src/axolotl/utils/schemas/config.py @@ -101,6 +101,7 @@ class AxolotlInputConfig( # If `None`, default is `False` in the trainer. dpo_use_weighting: bool | None = None dpo_use_logits_to_keep: bool | None = None + dpo_label_smoothing: float | None = None datasets: ( Annotated[ diff --git a/src/axolotl/utils/trainer.py b/src/axolotl/utils/trainer.py index c08504d73..67f590a37 100644 --- a/src/axolotl/utils/trainer.py +++ b/src/axolotl/utils/trainer.py @@ -16,7 +16,7 @@ from datasets import IterableDataset, disable_caching, enable_caching from torch.utils.data import DataLoader, RandomSampler, SequentialSampler from transformers.utils import is_torch_bf16_gpu_available -from axolotl.core.trainer_builder import HFCausalTrainerBuilder, HFRLTrainerBuilder +from axolotl.core.builders import HFCausalTrainerBuilder, HFRLTrainerBuilder from axolotl.monkeypatch.trainer_eval_guard import patch_evaluation_loop_for_fsdp2 from axolotl.utils.distributed import reduce_and_broadcast from axolotl.utils.environment import check_cuda_p2p_ib_support diff --git a/tests/core/test_builders.py b/tests/core/test_builders.py new file mode 100644 index 000000000..cde7b74ce --- /dev/null +++ b/tests/core/test_builders.py @@ -0,0 +1,595 @@ +"""Unit tests for axolotl.core.builders""" + +# pylint: disable=protected-access + +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +from axolotl.common.datasets import load_datasets +from axolotl.core.builders import HFCausalTrainerBuilder, HFRLTrainerBuilder +from axolotl.loaders import ModelLoader, load_tokenizer +from axolotl.utils.config import normalize_config +from axolotl.utils.data.rl import load_prepare_preference_datasets +from axolotl.utils.dict import DictDefault +from axolotl.utils.schemas.enums import RLType + +from tests.constants import ALPACA_MESSAGES_CONFIG_REVISION + + +@pytest.fixture(name="base_cfg") +def fixture_base_cfg(): + """ + Base config with all common arguments between SFT and RLHF + """ + cfg = DictDefault( + { + # Model and tokenizer settings + "base_model": "HuggingFaceTB/SmolLM2-135M-Instruct", + "sequence_len": 2048, + "model_config_type": "llama", # example type + # Basic training settings + "micro_batch_size": 2, + "eval_batch_size": 2, + "num_epochs": 1, + "gradient_accumulation_steps": 1, + "max_steps": 100, + "val_set_size": 0, + # Optimizer settings + "optimizer": "adamw_torch_fused", + "learning_rate": 0.00005, + "weight_decay": 0.01, + "adam_beta1": 0.998, + "adam_beta2": 0.9, + "adam_epsilon": 0.00001, + "max_grad_norm": 1.0, + # LR scheduler settings + "lr_scheduler": "cosine", + "lr_scheduler_kwargs": {"foo": "bar"}, + "warmup_steps": 10, + "warmup_ratio": None, + "cosine_min_lr_ratio": 0.1, + "cosine_constant_lr_ratio": 0.2, + # Checkpointing and saving + "save_steps": 100, + "output_dir": "./model-out", + "save_safetensors": True, + "save_total_limit": 4, + "save_only_model": False, + # Hardware/performance settings + "gradient_checkpointing": False, + "gradient_checkpointing_kwargs": {"use_reentrant": False}, + "dataloader_num_workers": 1, + "dataloader_pin_memory": True, + "dataloader_prefetch_factor": 2, + "sequence_parallel_degree": 1, + # Dtype + "fp16": False, + "bf16": False, + "tf32": False, + # Logging and evaluation + "logging_steps": 10, + "eval_steps": 50, + "eval_strategy": "steps", + "save_strategy": "steps", + "include_tokens_per_second": True, + # Other common settings + "seed": 42, + "remove_unused_columns": True, + "ddp_timeout": 1800, + "ddp_bucket_cap_mb": 25, + "ddp_broadcast_buffers": False, + } + ) + + normalize_config(cfg) + return cfg + + +@pytest.fixture(name="dpo_cfg") +def fixture_dpo_cfg(base_cfg): + cfg = base_cfg.copy() + cfg.update( + { + "rl": RLType.DPO, + "dpo_use_weighting": True, + "dpo_use_logits_to_keep": True, + "dpo_label_smoothing": 0.1, + "beta": 0.1, # DPO beta + } + ) + return cfg + + +@pytest.fixture(name="orpo_cfg") +def fixture_orpo_cfg(base_cfg): + cfg = base_cfg.copy() + cfg.update( + { + "rl": RLType.ORPO, + "orpo_alpha": 0.1, + "max_prompt_len": 512, + } + ) + return cfg + + +@pytest.fixture(name="kto_cfg") +def fixture_kto_cfg(base_cfg): + cfg = base_cfg.copy() + cfg.update( + { + "rl": RLType.KTO, + "kto_desirable_weight": 1.0, + "kto_undesirable_weight": 1.0, + "max_prompt_len": 512, + } + ) + return cfg + + +@pytest.fixture(name="grpo_cfg") +def fixture_grpo_cfg(base_cfg): + cfg = base_cfg.copy() + cfg.update( + { + "rl": RLType.GRPO, + "trl": DictDefault( + { + "beta": 0.001, + "max_completion_length": 256, + "use_vllm": False, # run on CPU + # "vllm_device": "auto", + # "vllm_gpu_memory_utilization": 0.15, + "num_generations": 4, + "reward_funcs": ["rewards.rand_reward_func"], + } + ), + # Must be evenly divisible by num_generations + "micro_batch_size": 4, + } + ) + return cfg + + +@pytest.fixture(name="ipo_cfg") +def fixture_ipo_cfg(base_cfg): + cfg = base_cfg.copy() + cfg.update( + { + "rl": RLType.IPO, + "dpo_label_smoothing": 0, + "beta": 0.1, + } + ) + return cfg + + +@pytest.fixture(name="simpo_cfg") +def fixture_simpo_cfg(base_cfg): + cfg = base_cfg.copy() + cfg.update( + { + "rl": RLType.SIMPO, + "rl_beta": 0.2, + "cpo_alpha": 0.9, + "simpo_gamma": 0.4, + } + ) + return cfg + + +@pytest.fixture(name="sft_cfg") +def fixture_sft_cfg(base_cfg): + cfg = base_cfg.copy() + cfg.update( + { + "rl": None, + "sample_packing": False, + "eval_sample_packing": False, + "flash_attention": False, + } + ) + return cfg + + +@pytest.fixture(name="rm_cfg") +def fixture_rm_cfg(sft_cfg): + cfg = sft_cfg.copy() + cfg.update( + DictDefault( + { + "reward_model": True, + "datasets": [ + { + "path": "argilla/distilabel-intel-orca-dpo-pairs", + "type": "bradley_terry.chat_template", + "split": "train[:1%]", + } + ], + } + ) + ) + return cfg + + +@pytest.fixture(name="prm_cfg") +def fixture_prm_cfg(sft_cfg): + cfg = sft_cfg.copy() + cfg.update( + DictDefault( + { + "process_reward_model": True, + "datasets": [ + { + "path": "trl-lib/math_shepherd", + "type": "stepwise_supervised", + "split": "train[:1%]", + } + ], + } + ) + ) + return cfg + + +@pytest.fixture(name="tokenizer") +def fixture_tokenizer(base_cfg): + return load_tokenizer(base_cfg) + + +@pytest.fixture(name="model") +def fixture_model(base_cfg, tokenizer): + model, _ = ModelLoader(base_cfg, tokenizer).load() + return model + + +class TestHFRLTrainerBuilder: + """ + TestCase class for RLHF trainer builders + """ + + def _test_common_training_arguments(self, training_arguments, rl: str): + """Helper to test common arguments across all variants""" + # Basic training settings + if rl == "grpo": + # grpo_cfg's micro_batch_size is diff from others + assert training_arguments.per_device_train_batch_size == 4 + else: + assert training_arguments.per_device_train_batch_size == 2 + assert training_arguments.gradient_accumulation_steps == 1 + assert training_arguments.max_steps == 100 + + # Optimizer settings + assert training_arguments.learning_rate == 0.00005 + assert training_arguments.weight_decay == 0.01 + assert training_arguments.adam_beta1 == 0.998 + assert training_arguments.adam_beta2 == 0.9 + assert training_arguments.adam_epsilon == 0.00001 + assert training_arguments.max_grad_norm == 1.0 + + # LR scheduler settings + assert training_arguments.lr_scheduler_type == "cosine" + assert training_arguments.warmup_steps == 10 + assert training_arguments.cosine_min_lr_ratio == 0.1 + assert training_arguments.cosine_constant_lr_ratio == 0.2 + + # Other settings + assert training_arguments.dataloader_num_workers == 1 + assert training_arguments.dataloader_pin_memory is True + assert training_arguments.gradient_checkpointing is False + + def test_dpo_training_arguments(self, dpo_cfg, model, tokenizer): + builder = HFRLTrainerBuilder(dpo_cfg, model, tokenizer) + training_arguments, _ = builder._build_training_arguments(100) + + self._test_common_training_arguments(training_arguments, rl=dpo_cfg.rl) + # DPO specific + assert training_arguments.beta == 0.1 + assert hasattr(training_arguments, "use_weighting") + assert training_arguments.use_weighting is True + assert training_arguments.label_smoothing == 0.1 + + def test_orpo_training_arguments(self, orpo_cfg, model, tokenizer): + builder = HFRLTrainerBuilder(orpo_cfg, model, tokenizer) + training_arguments, _ = builder._build_training_arguments(100) + + self._test_common_training_arguments(training_arguments, rl=orpo_cfg.rl) + # ORPO specific + assert training_arguments.beta == 0.1 # maps from orpo_alpha + assert training_arguments.max_prompt_length == 512 + + def test_kto_training_arguments(self, kto_cfg, model, tokenizer): + builder = HFRLTrainerBuilder(kto_cfg, model, tokenizer) + training_arguments, _ = builder._build_training_arguments(100) + + self._test_common_training_arguments(training_arguments, rl=kto_cfg.rl) + # KTO specific + assert training_arguments.desirable_weight == 1.0 + assert training_arguments.undesirable_weight == 1.0 + assert training_arguments.max_prompt_length == 512 + + def _write_rewards_file(self, rewards_dir: Path): + """ + Writes reward function to local tmp path to be loaded on trainer building + """ + # Create rewards.py in a directory we can import from + rewards_dir.mkdir() + rewards_file = rewards_dir / "rewards.py" + rewards_file.write_text( + """import random +def rand_reward_func(prompts, completions) -> list[float]: + return [random.uniform(0, 1) for _ in completions] +""" + ) + + def test_grpo_training_arguments(self, grpo_cfg, model, tokenizer, tmp_path): + + rewards_dir = tmp_path / "rewards_test" + self._write_rewards_file(rewards_dir) + + # Add the directory to Python path so we can import the module + sys.path.insert(0, str(rewards_dir)) + + try: + builder = HFRLTrainerBuilder(grpo_cfg, model, tokenizer) + training_arguments, _ = builder._build_training_arguments(100) + + self._test_common_training_arguments(training_arguments, rl=grpo_cfg.rl) + # GRPO specific + assert training_arguments.beta == 0.001 + assert training_arguments.max_completion_length == 256 + assert training_arguments.use_vllm is False + # assert training_arguments.vllm_device == "auto" + # assert training_arguments.vllm_gpu_memory_utilization == 0.15 + assert training_arguments.num_generations == 4 + + # Test trainer creation to verify reward_funcs + trainer = builder.build(100) + + # Verify reward functions are properly loaded + assert len(trainer.reward_funcs) == 1 + assert trainer.reward_funcs[0].__module__ == "rewards" + assert trainer.reward_funcs[0].__name__ == "rand_reward_func" + finally: + # remove imported module from path + if str(rewards_dir) in sys.path: + sys.path.remove(str(rewards_dir)) + + def test_ipo_training_arguments(self, ipo_cfg, model, tokenizer): + builder = HFRLTrainerBuilder(ipo_cfg, model, tokenizer) + training_arguments, _ = builder._build_training_arguments(100) + + self._test_common_training_arguments(training_arguments, rl=ipo_cfg.rl) + # IPO specific + assert training_arguments.beta == 0.1 + assert training_arguments.loss_type == "ipo" + assert training_arguments.label_smoothing == 0 + + def test_simpo_training_arguments(self, simpo_cfg, model, tokenizer): + builder = HFRLTrainerBuilder(simpo_cfg, model, tokenizer) + training_arguments, _ = builder._build_training_arguments(100) + + self._test_common_training_arguments(training_arguments, rl=simpo_cfg.rl) + # SIMPO specific + assert training_arguments.beta == 0.2 + assert training_arguments.cpo_alpha == 0.9 + assert training_arguments.simpo_gamma == 0.4 + + @pytest.mark.parametrize( + ("cfg_string", "dataset_name"), + [ + ( + "dpo_cfg", + "dataset_fozziethebeat_alpaca_messages_2k_dpo_test_rev_ea82cff", + ), + ( + "ipo_cfg", + "dataset_fozziethebeat_alpaca_messages_2k_dpo_test_rev_ea82cff", + ), + ( + "grpo_cfg", + "dataset_fozziethebeat_alpaca_messages_2k_dpo_test_rev_ea82cff", + ), + ("orpo_cfg", None), # don't use fixture for orpo to use smaller split + ("kto_cfg", None), # no fixture for kto + ( + "simpo_cfg", + "dataset_fozziethebeat_alpaca_messages_2k_dpo_test_rev_ea82cff", + ), + ], + ) + def test_custom_optimizer_cls_and_kwargs( + self, + request, + cfg_string, + dataset_name, + tmp_path, + model, + tokenizer, + ): + cfg = request.getfixturevalue(cfg_string) + + builder = HFRLTrainerBuilder(cfg, model, tokenizer) + cfg["optimizer"] = "muon" + + if cfg_string in ["dpo_cfg", "ipo_cfg", "grpo_cfg", "simpo_cfg"]: + cfg["datasets"] = [DictDefault(ALPACA_MESSAGES_CONFIG_REVISION)] + elif cfg_string == "kto_cfg": + cfg["datasets"] = [ + DictDefault( + { + "path": "argilla/ultrafeedback-binarized-preferences-cleaned-kto", + "type": "llama3.ultra", + "split": "train[:1%]", + } + ) + ] + elif cfg_string == "orpo_cfg": + cfg["datasets"] = [ + DictDefault( + { + "path": "argilla/ultrafeedback-binarized-preferences-cleaned", + "type": "chat_template.argilla", + "split": "train[:1%]", + } + ) + ] + else: + raise ValueError(f"Unhandled cfg_string: {cfg_string}") + + if cfg_string == "grpo_cfg": + rewards_dir = tmp_path / "rewards_test" + self._write_rewards_file(rewards_dir) + + # Add the directory to Python path so we can import the module + sys.path.insert(0, str(rewards_dir)) + + try: + # Only use mock for the commented out configs + if dataset_name is not None: + with patch( + "axolotl.utils.data.rl.load_dataset_w_config" + ) as mock_load_dataset: + mock_load_dataset.return_value = request.getfixturevalue( + dataset_name + ) + train_dataset, eval_dataset = load_prepare_preference_datasets(cfg) + else: + # Load actual datasets for orpo_cfg and kto_cfg + train_dataset, eval_dataset = load_prepare_preference_datasets(cfg) + + builder.train_dataset = train_dataset + builder.eval_dataset = eval_dataset + + trainer = builder.build(100) + + assert trainer.optimizer_cls_and_kwargs is not None + + from axolotl.contribs.mit.muon import ( # pylint: disable=no-name-in-module + Muon, + MuonOptimizerFactory, + ) + + optimizer_cls, optimizer_kwargs = trainer.optimizer_cls_and_kwargs + assert optimizer_cls is MuonOptimizerFactory + assert optimizer_kwargs["lr"] == 0.00005 + assert optimizer_kwargs["weight_decay"] == 0.01 + assert optimizer_kwargs["betas"] == (0.998, 0.9) + assert optimizer_kwargs["eps"] == 0.00001 + + # Ensure optimizer is created with correct class + optim = trainer.create_optimizer() + assert isinstance(optim, Muon) + + finally: + # remove imported module from path + if cfg_string == "grpo_cfg" and str(rewards_dir) in sys.path: + sys.path.remove(str(rewards_dir)) + + +class TestHFCausalTrainerBuilder: + """ + TestCase class for SFT trainer builder + """ + + def test_training_arguments(self, sft_cfg, model, tokenizer): + builder = HFCausalTrainerBuilder(sft_cfg, model, tokenizer) + trainer = builder.build(100) + training_arguments = trainer.args + + # Test common arguments + assert training_arguments.per_device_train_batch_size == 2 + assert training_arguments.gradient_accumulation_steps == 1 + assert training_arguments.max_steps == 100 + + assert training_arguments.learning_rate == 0.00005 + assert training_arguments.weight_decay == 0.01 + assert training_arguments.adam_beta1 == 0.998 + assert training_arguments.adam_beta2 == 0.9 + assert training_arguments.adam_epsilon == 0.00001 + assert training_arguments.max_grad_norm == 1.0 + + assert training_arguments.lr_scheduler_type == "cosine" + assert training_arguments.warmup_steps == 10 + assert training_arguments.cosine_min_lr_ratio == 0.1 + + assert training_arguments.dataloader_num_workers == 1 + assert training_arguments.dataloader_pin_memory is True + assert training_arguments.gradient_checkpointing is False + + # SFT specific + assert training_arguments.sample_packing is False + assert training_arguments.eval_sample_packing is False + + @pytest.mark.parametrize( + "cfg_string", + [ + "sft_cfg", + "rm_cfg", + "prm_cfg", + ], + ) + def test_custom_optimizer_cls_and_kwargs( + self, request, cfg_string, model, tokenizer + ): + cfg = request.getfixturevalue(cfg_string) + builder = HFCausalTrainerBuilder(cfg, model, tokenizer) + cfg["optimizer"] = "muon" + + # need to load datasets for reward model and process reward model trainer + if cfg_string in ["rm_cfg", "prm_cfg"]: + dataset_meta = load_datasets(cfg=cfg) + + builder.train_dataset = dataset_meta.train_dataset + builder.eval_dataset = dataset_meta.eval_dataset + + trainer = builder.build(100) + + assert trainer.optimizer_cls_and_kwargs is not None + + from axolotl.contribs.mit.muon import ( # pylint: disable=no-name-in-module + Muon, + MuonOptimizerFactory, + ) + + optimizer_cls, optimizer_kwargs = trainer.optimizer_cls_and_kwargs + assert optimizer_cls is MuonOptimizerFactory + assert optimizer_kwargs["lr"] == 0.00005 + assert optimizer_kwargs["weight_decay"] == 0.01 + assert optimizer_kwargs["betas"] == (0.998, 0.9) + assert optimizer_kwargs["eps"] == 0.00001 + + # Ensure optimizer is created with correct class + optim = trainer.create_optimizer() + assert isinstance(optim, Muon) + + +class TestTrainerClsPlugin: + """ + TestCase class for trainer builder with plugin + """ + + def test_trainer_cls_is_not_none_with_plugin(self, kto_cfg, model, tokenizer): + """ + Test that the trainer cls is not none with plugin + + Fixes #2693 + """ + cfg = kto_cfg.copy() + cfg.plugins = ["axolotl.integrations.liger.LigerPlugin"] + + # Expected AttributeError as we don't pass regular model configs to RL trainer builder + # If it throws `TypeError: None is not a callable object`, trainer_cls could be None + try: + builder = HFRLTrainerBuilder(cfg, model, tokenizer) + + builder.build(100) + except TypeError as e: + # Error raised if trainer_cls is None + assert "'tuple' object has no attribute 'config'" not in str(e) + except Exception: # pylint: disable=broad-exception-caught + # Another error happens, so we passed trainer_cls to builder + pass diff --git a/tests/core/test_trainer_builder.py b/tests/core/test_trainer_builder.py deleted file mode 100644 index 492578c40..000000000 --- a/tests/core/test_trainer_builder.py +++ /dev/null @@ -1,90 +0,0 @@ -"""Unit tests for axolotl.core.trainer_builder""" - -import pytest - -from axolotl.core.trainer_builder import HFRLTrainerBuilder -from axolotl.loaders import ModelLoader, load_tokenizer -from axolotl.utils.config import normalize_config -from axolotl.utils.dict import DictDefault -from axolotl.utils.schemas.enums import RLType - - -@pytest.fixture(name="cfg") -def fixture_cfg(): - cfg = DictDefault( - { - "base_model": "HuggingFaceTB/SmolLM2-135M", - "micro_batch_size": 1, - "gradient_accumulation_steps": 1, - "learning_rate": 0.00005, - "save_steps": 100, - "output_dir": "./model-out", - "warmup_steps": 10, - "gradient_checkpointing": False, - "optimizer": "adamw_torch_fused", - "sequence_len": 2048, - "rl": True, - "adam_beta1": 0.998, - "adam_beta2": 0.9, - "adam_epsilon": 0.00001, - "dataloader_num_workers": 1, - "dataloader_pin_memory": True, - "model_config_type": "llama", - "special_tokens": { - "pad_token": "<|endoftext|>", - }, - } - ) - - normalize_config(cfg) - - return cfg - - -@pytest.fixture(name="tokenizer") -def fixture_tokenizer(cfg): - return load_tokenizer(cfg) - - -@pytest.fixture(name="model") -def fixture_model(cfg, tokenizer): - return ModelLoader(cfg, tokenizer).load() - - -class TestHFRLTrainerBuilder: - """ - TestCase class for DPO trainer builder - """ - - def test_build_training_arguments(self, cfg, model, tokenizer): - builder = HFRLTrainerBuilder(cfg, model, tokenizer) - training_arguments = builder.build_training_arguments(100) - assert training_arguments.adam_beta1 == 0.998 - assert training_arguments.adam_beta2 == 0.9 - assert training_arguments.adam_epsilon == 0.00001 - assert training_arguments.dataloader_num_workers == 1 - assert training_arguments.dataloader_pin_memory is True - - -class TestTrainerClsPlugin: - """ - TestCase class for trainer builder with plugin - """ - - def test_trainer_cls_is_not_none_with_plugin(self, cfg, model, tokenizer): - """ - Test that the trainer cls is not none with plugin - - Fixes #2693 - """ - cfg.plugins = ["axolotl.integrations.liger.LigerPlugin"] - cfg.rl = RLType.KTO - - # Expected AttributeError as we don't pass regular model configs to RL trainer builder - # If it throws `TypeError: None is not a callable object`, trainer_cls could be None - with pytest.raises( - AttributeError, match=r".*'tuple' object has no attribute 'config'.*" - ): - builder = HFRLTrainerBuilder(cfg, model, tokenizer) - - builder.build(100) diff --git a/tests/e2e/test_imports.py b/tests/e2e/test_imports.py index fc0843479..050e4dfb3 100644 --- a/tests/e2e/test_imports.py +++ b/tests/e2e/test_imports.py @@ -11,11 +11,11 @@ class TestImports(unittest.TestCase): """ def test_import_causal_trainer(self): - from axolotl.core.trainer_builder import ( # pylint: disable=unused-import # noqa: F401 + from axolotl.core.builders import ( # pylint: disable=unused-import # noqa: F401 HFCausalTrainerBuilder, ) def test_import_rl_trainer(self): - from axolotl.core.trainer_builder import ( # pylint: disable=unused-import # noqa: F401 + from axolotl.core.builders import ( # pylint: disable=unused-import # noqa: F401 HFRLTrainerBuilder, )