Compare commits


1 commit

Wing Lian · c620a218b8 · tiled_mlp supports single gpu (#2891) · 2025-07-09 12:48:51 -04:00
* tiled_mlp supports single gpu

* use checkpoint offloading for arctic training

* patch torch checkpoint too

* support for single gpu zero3

* add linkback to where it was copied from
122 changed files with 786 additions and 2601 deletions

View File

@@ -33,13 +33,6 @@ jobs:
axolotl_extras:
num_gpus: 2
nightly_build: "true"
- cuda: 126
cuda_version: 12.6.3
python_version: "3.11"
pytorch: 2.7.0
axolotl_extras: vllm
num_gpus: 2
nightly_build: "true"
- cuda: 126
cuda_version: 12.6.3
python_version: "3.11"

View File

@@ -12,16 +12,11 @@ jobs:
fail-fast: false
matrix:
include:
- cuda: 126
cuda_version: 12.6.3
- cuda: 124
cuda_version: 12.4.1
python_version: "3.11"
pytorch: 2.6.0
axolotl_extras:
- cuda: 126
cuda_version: 12.6.3
python_version: "3.11"
pytorch: 2.7.1
axolotl_extras:
runs-on: axolotl-gpu-runner
steps:
- name: Checkout
@@ -65,15 +60,15 @@ jobs:
strategy:
matrix:
include:
- cuda: 126
cuda_version: 12.6.3
- cuda: 124
cuda_version: 12.4.1
python_version: "3.11"
pytorch: 2.6.0
axolotl_extras:
- cuda: 126
cuda_version: 12.6.3
python_version: "3.11"
pytorch: 2.7.1
pytorch: 2.6.0
axolotl_extras:
runs-on: axolotl-gpu-runner
steps:

View File

@@ -28,8 +28,6 @@ jobs:
steps:
- name: Check out repository
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Set up Quarto
uses: quarto-dev/quarto-actions/setup@v2
@@ -52,11 +50,10 @@ jobs:
- name: Netlify Publish
uses: nwtgck/actions-netlify@v3.0
id: netlify
with:
publish-dir: './_site'
enable-pull-request-comment: false
enable-github-deployment: false
enable-pull-request-comment: true
enable-github-deployment: true
github-token: ${{ secrets.GITHUB_TOKEN }}
deploy-message: "Deployed On Netlify"
github-deployment-environment: 'preview'
@@ -64,13 +61,3 @@ jobs:
env:
NETLIFY_AUTH_TOKEN: ${{ secrets.NETLIFY_AUTH_TOKEN }}
NETLIFY_SITE_ID: ${{ secrets.NETLIFY_SITE_ID }}
- name: Update PR with preview link
if: ${{ steps.netlify.outcome == 'success' }}
uses: marocchino/sticky-pull-request-comment@v2
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
message: |
📖 **Documentation Preview**: ${{ steps.netlify.outputs.deploy-url }}
Deployed on Netlify from commit ${{ github.event.pull_request.head.sha }}

View File

@@ -97,7 +97,7 @@
# # 'no_input_format' cannot include {input}
# no_input_format: "{instruction} "
# # For `completion` datasets only, uses the provided field instead of `text` column
# # For `completion` datsets only, uses the provided field instead of `text` column
# field:
# # Axolotl attempts to save the dataset as an arrow after packing the data together so

View File

@@ -276,7 +276,6 @@ website:
- docs/torchao.qmd
- docs/custom_integrations.qmd
- docs/sequence_parallelism.qmd
- docs/gradient_checkpointing.qmd
- section: "Troubleshooting"
contents:

View File

@@ -187,7 +187,6 @@ Instead of passing `tools` via the system prompt, an alternative method would be
"role": "assistant", // call the function via assistant
"tool_calls": [
{
"id": "...", // required only for mistral
"type": "function",
"function": {
"name": "...",
@@ -200,7 +199,6 @@ Instead of passing `tools` via the system prompt, an alternative method would be
},
{
"role": "tool",
"tool_call_id": "...", // required only for mistral
"name": "...",
"content": "..."
},

View File

@@ -34,7 +34,6 @@ Tags examples:
- `main-base-py3.11-cu128-2.7.1`
- `main-base-py3.11-cu126-2.7.1`
- `main-base-py3.11-cu126-2.7.0`
- `main-base-py3.11-cu126-2.6.0`
- `main-base-py3.11-cu124-2.6.0`
@@ -76,7 +75,6 @@ Tags examples:
- `main-py3.11-cu128-2.7.1`
- `main-py3.11-cu126-2.7.1`
- `main-py3.11-cu126-2.7.0`
- `main-py3.11-cu126-2.6.0`
- `main-py3.11-cu124-2.6.0`
- `main-latest`
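
For instance, pulling the newest image would look like this (a sketch; `axolotlai/axolotl` is assumed here to be the Docker Hub repository these tags belong to):

```bash
docker pull axolotlai/axolotl:main-latest
```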

View File

@@ -1,29 +0,0 @@
---
title: Gradient Checkpointing and Activation Offloading
---
Gradient checkpointing and activation offloading are techniques used to optimize the performance of deep learning
models by reducing the memory footprint and improving computational efficiency.
### Enabling Gradient Checkpointing
```yaml
gradient_checkpointing: true
```
### Enabling Activation Offloading
```yaml
gradient_checkpointing: true # required for activation offloading
activation_offloading: true
```
Activation offloading variants:
The default `activation_offloading: true` offloads activations to CPU and uses CUDA streams
to overlap the communications and computations when offloading.
The `activation_offloading: legacy` variant naively offloads activations to CPU without additional optimizations.
For resource-constrained environments with limited CPU memory, `activation_offloading: disk` offloads
activations to disk instead of CPU RAM so that much larger context lengths can be trained with minimal memory.
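For example, a minimal config for a resource-constrained setup combining the options above (a sketch; pick the variant that matches your CPU RAM and disk budget):

```yaml
gradient_checkpointing: true   # required for activation offloading
activation_offloading: disk    # or: true (CPU + CUDA streams) / legacy (naive CPU offload)
```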

View File

@@ -23,6 +23,8 @@ Axolotl supports several methods for multi-GPU training:
## DeepSpeed {#sec-deepspeed}
DeepSpeed is the recommended approach for multi-GPU training due to its stability and performance. It provides various optimization levels through ZeRO stages.
### Configuration {#sec-deepspeed-config}
Add to your YAML config:
@@ -30,6 +32,7 @@ Add to your YAML config:
```{.yaml}
deepspeed: deepspeed_configs/zero1.json
```
### Usage {#sec-deepspeed-usage}
```{.bash}
@@ -72,66 +75,9 @@ ZeRO Stage 3 can be used for training on a single GPU by manually setting the en
:::
## Fully Sharded Data Parallel (FSDP) {#sec-fsdp}
## FSDP {#sec-fsdp}
::: {.callout-note}
FSDP2 is recommended for new users. FSDP1 is deprecated and will be removed in an upcoming release of Axolotl.
:::
### Migrating from FSDP1 to FSDP2 {#sec-migrate-fsdp1-fsdp2}
To migrate your config from FSDP1 to FSDP2, you must use the `fsdp_version` top-level config field to specify the FSDP version, and
also follow the config field mapping below to update field names.
#### Config mapping
FSDP1 | FSDP2
-------- | --------
fsdp_sharding_strategy | reshard_after_forward
fsdp_backward_prefetch_policy | **REMOVED**
fsdp_backward_prefetch | **REMOVED**
fsdp_forward_prefetch | **REMOVED**
fsdp_sync_module_states | **REMOVED**
fsdp_cpu_ram_efficient_loading | cpu_ram_efficient_loading
fsdp_state_dict_type | state_dict_type
fsdp_use_orig_params | **REMOVED**
For example, if you were using the following FSDP1 config:
```{.yaml}
fsdp_version: 1
fsdp_config:
fsdp_offload_params: false
fsdp_cpu_ram_efficient_loading: true
fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
fsdp_transformer_layer_cls_to_wrap: Qwen3DecoderLayer
fsdp_state_dict_type: FULL_STATE_DICT
fsdp_sharding_strategy: FULL_SHARD
```
You can migrate to the following FSDP2 config:
```{.yaml}
fsdp_version: 2
fsdp_config:
offload_params: false
cpu_ram_efficient_loading: true
auto_wrap_policy: TRANSFORMER_BASED_WRAP
transformer_layer_cls_to_wrap: Qwen3DecoderLayer
state_dict_type: FULL_STATE_DICT
reshard_after_forward: true
```
### FSDP1 (deprecated) {#sec-fsdp-config}
::: {.callout-note}
Using `fsdp` to configure FSDP is deprecated and will be removed in an upcoming release of Axolotl. Please use `fsdp_config` as above instead.
:::
### Basic FSDP Configuration {#sec-fsdp-config}
```{.yaml}
fsdp:
@@ -143,7 +89,6 @@ fsdp_config:
fsdp_transformer_layer_cls_to_wrap: LlamaDecoderLayer
```
## Sequence parallelism {#sec-sequence-parallelism}
We support sequence parallelism (SP) via the

View File

@@ -40,13 +40,13 @@ use_cpu: false
Configure your model to use FSDP in the Axolotl yaml. For example:
```yaml
fsdp_version: 2
fsdp:
- full_shard
- auto_wrap
fsdp_config:
offload_params: true
state_dict_type: FULL_STATE_DICT
auto_wrap_policy: TRANSFORMER_BASED_WRAP
transformer_layer_cls_to_wrap: LlamaDecoderLayer
reshard_after_forward: true
fsdp_offload_params: true
fsdp_state_dict_type: FULL_STATE_DICT
fsdp_transformer_layer_cls_to_wrap: LlamaDecoderLayer
```
All you have to do now is launch using accelerate on each machine as you usually would, and voilà, the processes will start once accelerate has been launched on every machine.
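
For reference, a per-node launch might look like the following sketch (assuming your Axolotl config is saved as `config.yml`):

```bash
# run this on every machine; training begins once all nodes have launched
accelerate launch -m axolotl.cli.train config.yml
```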

View File

@@ -17,6 +17,7 @@ feedback. Various methods include, but not limited to:
- [Kahneman-Tversky Optimization (KTO)](#kto)
- [Odds Ratio Preference Optimization (ORPO)](#orpo)
- [Group Relative Policy Optimization (GRPO)](#grpo)
- Proximal Policy Optimization (PPO) (not yet supported in axolotl, if you're interested in contributing, please reach out!)
## RLHF using Axolotl
@@ -274,14 +275,15 @@ rl: dpo
datasets:
- path: ...
split: train
type:
field_prompt: "prompt"
field_system: "system"
field_chosen: "chosen"
field_rejected: "rejected"
prompt_format: "{prompt}"
chosen_format: "{chosen}"
rejected_format: "{rejected}"
type: user_defined.default
field_prompt: "prompt"
field_system: "system"
field_chosen: "chosen"
field_rejected: "rejected"
prompt_format: "{prompt}"
chosen_format: "{chosen}"
rejected_format: "{rejected}"
```
The input format is a simple JSON input with customizable fields based on the above config.
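For instance, a single record matching the field names configured above might look like this (values are purely illustrative):

```json
{
  "system": "You are a helpful assistant.",
  "prompt": "What is the capital of France?",
  "chosen": "The capital of France is Paris.",
  "rejected": "France does not have a capital."
}
```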
@@ -474,13 +476,14 @@ rl: kto
datasets:
- path: ...
split: train
type:
field_prompt: "prompt"
field_system: "system"
field_completion: "completion"
field_label: "label"
prompt_format: "{prompt}"
completion_format: "{completion}"
type: user_defined.default
field_prompt: "prompt"
field_system: "system"
field_completion: "completion"
field_label: "label"
prompt_format: "{prompt}"
completion_format: "{completion}"
```
The input format is a simple JSON input with customizable fields based on the above config.
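For instance, a single KTO record matching the fields configured above might look like this (illustrative values; `label` marks whether the completion is desirable):

```json
{
  "prompt": "What is the capital of France?",
  "completion": "The capital of France is Paris.",
  "label": true
}
```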

View File

@@ -1,5 +0,0 @@
# Archived Examples
This directory contains examples that are no longer maintained and may no longer be functional.
We keep them around for archival purposes in case they are useful to others.

View File

@@ -1,12 +1,8 @@
# Finetune Devstral with Axolotl
Devstral Small is a 24B parameter opensource model from MistralAI found on HuggingFace [Devstral-Small-2505](https://huggingface.co/mistralai/Devstral-Small-2505) and [Devstral-Small-2507](https://huggingface.co/mistralai/Devstral-Small-2507). `Devstral-Small-2507` is the latest version of the model and has [function calling](https://mistralai.github.io/mistral-common/usage/tools/) support.
Devstral Small is a 24B parameter opensource model from MistralAI found on HuggingFace [Devstral-Small-2505](https://huggingface.co/mistralai/Devstral-Small-2505). This guide shows how to fine-tune it with Axolotl with multi-turn conversations with proper masking.
This guide shows how to fine-tune it with Axolotl with multi-turn conversations with proper masking.
The model was fine-tuned on top of [Mistral-Small-3.1](https://huggingface.co/mistralai/Mistral-Small-3.1-24B-Base-2503) without the vision layer and has a context of up to 128k tokens.
Thanks to the team at MistralAI for giving us early access to prepare for this release.
The model was fine-tuned on top of [Mistral-Small-3.1](https://huggingface.co/mistralai/Mistral-Small-3.1-24B-Base-2503) without the vision layer and has a context of upto 128k tokens.
## Getting started
@@ -21,6 +17,11 @@ cd axolotl
pip3 install packaging==23.2 setuptools==75.8.0 wheel ninja
pip3 install --no-build-isolation -e '.[flash-attn]'
# Install the latest mistral-common from source
pip3 uninstall mistral-common
pip3 install git+https://github.com/mistralai/mistral-common.git@039465d
```
2. Run the finetuning example:
@@ -38,7 +39,6 @@ Let us know how it goes. Happy finetuning! 🚀
- You can run a full finetuning by removing the `adapter: qlora` and `load_in_4bit: true` from the config (see the sketch after this list).
- Read more on how to load your own dataset at [docs](https://docs.axolotl.ai/docs/dataset_loading.html).
- The dataset format follows the OpenAI Messages format as seen [here](https://docs.axolotl.ai/docs/dataset-formats/conversation.html#chat_template).
- Learn how to use function calling with Axolotl at [docs](https://docs.axolotl.ai/docs/dataset-formats/conversation.html#using-tool-use).
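
As sketched here, a full finetune only requires deleting the two QLoRA-specific lines from the example config:

```yaml
# remove both of these lines for a full finetune
adapter: qlora
load_in_4bit: true
```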
## Optimization Guides
@@ -57,7 +57,6 @@ In addition, we do not support overriding tokens yet.
## Related Resources
- [MistralAI Devstral Blog](https://mistral.ai/news/devstral)
- [MistralAI Devstral 1.1 Blog](https://mistral.ai/news/devstral-2507)
- [Axolotl Docs](https://docs.axolotl.ai)
- [Axolotl GitHub](https://github.com/axolotl-ai-cloud/axolotl)
- [Axolotl Website](https://axolotl.ai)

View File

@@ -1,4 +1,4 @@
base_model: mistralai/Devstral-Small-2507
base_model: mistralai/Devstral-Small-2505
# Automatically upload checkpoint and final model to HF
# hub_model_id: username/custom_model_name

View File

@@ -1,7 +0,0 @@
# Liquid Foundation Models 2
LFM2 support in transformers exists in the main branch, but is not yet included in the transformers release.
```bash
pip install --upgrade --no-deps --force-reinstall git+https://github.com/huggingface/transformers.git
```

View File

@@ -1,48 +0,0 @@
base_model: LiquidAI/LFM2-350M
chunked_cross_entropy: true
chat_template: tokenizer_default
eot_tokens:
- "<|im_end|>"
datasets:
- path: mlabonne/FineTome-100k
type: chat_template
split: train[:20%]
field_messages: conversations
message_field_role: from
message_field_content: value
dataset_prepared_path: last_run_prepared
val_set_size: 0.05
output_dir: ./outputs/out
sequence_len: 4096
sample_packing: true
pad_to_sequence_len: true
wandb_project:
wandb_entity:
wandb_watch:
wandb_name:
wandb_log_model:
gradient_accumulation_steps: 2
micro_batch_size: 4
num_epochs: 1
optimizer: adamw_torch_fused
lr_scheduler: cosine
learning_rate: 5e-5
bf16: true
tf32: true
gradient_checkpointing: false
resume_from_checkpoint:
logging_steps: 1
flash_attention: true
warmup_ratio: 0.1
evals_per_epoch: 2
saves_per_epoch: 1
weight_decay: 0.0

View File

@@ -6,19 +6,19 @@ triton>=3.0.0
mamba-ssm==1.2.0.post1
xformers>=0.0.23.post1
autoawq==0.2.7.post3
liger-kernel==0.6.0
liger-kernel==0.5.10
# END section
packaging==23.2
huggingface_hub>=0.33.0
peft==0.16.0
transformers==4.53.2
huggingface_hub==0.32.2
peft==0.15.2
transformers==4.53.1
tokenizers>=0.21.1
accelerate==1.8.1
datasets==4.0.0
datasets==3.6.0
deepspeed>=0.17.0
trl==0.19.1
trl==0.18.2
hf_xet==1.1.2
optimum==1.16.2
@@ -68,4 +68,4 @@ schedulefree==1.4.1
axolotl-contribs-lgpl==0.0.6
axolotl-contribs-mit==0.0.3
mistral-common==1.7.0
mistral-common==1.6.3

View File

@@ -73,9 +73,9 @@ def parse_requirements(extras_require_map):
extras_require_map["vllm"] = ["vllm>=0.9.0"]
elif (major, minor) >= (2, 6):
_install_requires.pop(_install_requires.index(xformers_version))
_install_requires.append("xformers==0.0.29.post3")
# since we only support 2.6.0+cu126
_dependency_links.append("https://download.pytorch.org/whl/cu126")
_install_requires.append(
"xformers==0.0.29.post2"
) # vllm needs post2 w torch 2.6
extras_require_map["vllm"] = ["vllm==0.8.5.post1"]
elif (major, minor) >= (2, 5):
_install_requires.pop(_install_requires.index(xformers_version))
@@ -121,7 +121,7 @@ extras_require = {
"yunchang==0.6.0",
],
"deepspeed": [
"deepspeed==0.17.2",
"deepspeed==0.17.1",
"deepspeed-kernels",
],
"mamba-ssm": [

View File

@@ -4,4 +4,4 @@ import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__) # Make this a namespace package
__version__ = "0.12.0.dev"
__version__ = "0.11.0"

View File

@@ -1,6 +1,5 @@
"""CLI to run preprocessing of a dataset."""
import os
import warnings
from pathlib import Path
from typing import Union
@@ -96,7 +95,6 @@ def do_cli(
kwargs: Additional keyword arguments to override config file values.
"""
# pylint: disable=duplicate-code
os.environ["AXOLOTL_IS_PREPROCESS"] = "1"
parsed_cfg = load_cfg(config, **kwargs)
parsed_cfg.is_preprocess = True
parser = transformers.HfArgumentParser(PreprocessCliArgs)

View File

@@ -109,13 +109,6 @@ def ray_train_func(kwargs: dict):
# initialize accelerator before model instantiation
Accelerator(gradient_accumulation_steps=cfg.gradient_accumulation_steps)
# Register plugins in Ray workers
if cfg.get("plugins"):
from axolotl.cli.config import plugin_set_cfg, prepare_plugins
prepare_plugins(cfg)
plugin_set_cfg(cfg)
kwargs["cfg"] = cfg
do_train(**kwargs)

View File

@@ -37,6 +37,7 @@ def do_vllm_serve(
Returns:
process_id: the process id of the started VLLM server
"""
patch_vllm_worker()
cfg = load_cfg(config)
model = cfg.base_model
@@ -46,9 +47,6 @@ def do_vllm_serve(
tensor_parallel_size = (
cli_args.get("tensor_parallel_size") or cfg.vllm.tensor_parallel_size
)
data_parallel_size = (
cli_args.get("data_parallel_size") or cfg.vllm.data_parallel_size
)
host = cli_args.get("host") or cfg.vllm.host
port = cli_args.get("port") or cfg.vllm.port
gpu_memory_utilization = (
@@ -70,7 +68,6 @@ def do_vllm_serve(
vllm_script_args = AxolotlScriptArguments(
model=model,
tensor_parallel_size=tensor_parallel_size,
data_parallel_size=data_parallel_size,
host=host,
port=port,
gpu_memory_utilization=gpu_memory_utilization,

View File

@@ -112,6 +112,13 @@ class TrainerBuilderBase(abc.ABC):
plugin_manager.add_callbacks_pre_trainer(cfg=self.cfg, model=self.model)
)
if self.cfg.profiler_steps:
callbacks.append(
PytorchProfilerCallback(
steps_to_profile=self.cfg.profiler_steps,
)
)
if self.cfg.gc_steps:
callbacks.append(GCCallback(gc_steps=self.cfg.gc_steps))
@@ -138,14 +145,6 @@ class TrainerBuilderBase(abc.ABC):
callbacks.append(GPUStatsCallback(cfg=self.cfg))
if self.cfg.profiler_steps:
callbacks.append(
PytorchProfilerCallback(
steps_to_profile=self.cfg.profiler_steps,
profiler_steps_start=self.cfg.profiler_steps_start,
)
)
return callbacks
def get_post_trainer_create_callbacks(self, trainer):
@@ -419,9 +418,6 @@ class TrainerBuilderBase(abc.ABC):
torch._dynamo.config.suppress_errors = ( # pylint: disable=protected-access
True
)
torch._dynamo.config.accumulated_cache_size_limit = ( # pylint: disable=protected-access
256
)
training_args_kwargs["torch_compile"] = self.cfg.torch_compile
if self.cfg.torch_compile_backend:
training_args_kwargs["torch_compile_backend"] = (
@@ -430,16 +426,8 @@ class TrainerBuilderBase(abc.ABC):
if self.cfg.torch_compile_mode:
training_args_kwargs["torch_compile_mode"] = self.cfg.torch_compile_mode
def _configure_accelerator_config(self, training_args_kwargs: dict):
if self.cfg.accelerator_config:
training_args_kwargs["accelerator_config"] = self.cfg.accelerator_config
def _configure_gradient_checkpointing(self, training_args_kwargs: dict):
if self.cfg.activation_offloading is True:
# don't use the HF gradient checkpointing, manually wrap
training_args_kwargs["gradient_checkpointing"] = False
training_args_kwargs["activation_offloading"] = True
elif self.cfg.gradient_checkpointing:
if self.cfg.gradient_checkpointing:
training_args_kwargs["gradient_checkpointing"] = (
self.cfg.gradient_checkpointing
)
@@ -513,15 +501,10 @@ class TrainerBuilderBase(abc.ABC):
if self.cfg.reward_model or self.cfg.rl:
training_args_kwargs["max_length"] = self.cfg.sequence_len
if self.cfg.fsdp_config or self.cfg.fsdp:
training_args_kwargs["fsdp_config"] = self.cfg.fsdp_config
training_args_kwargs["fsdp"] = self.cfg.fsdp if self.cfg.fsdp else True
self._configure_reporting(training_args_kwargs)
self._configure_hub_parameters(training_args_kwargs)
self._configure_scheduler(training_args_kwargs)
self._configure_optimizer(training_args_kwargs, trainer_kwargs)
self._configure_torch_compile(training_args_kwargs)
self._configure_accelerator_config(training_args_kwargs)
return training_args_kwargs, trainer_kwargs

View File

@@ -151,6 +151,14 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
training_arguments_kwargs, trainer_kwargs = self._set_base_training_args(
total_num_steps
)
if self.cfg.fsdp:
training_arguments_kwargs["fsdp"] = self.cfg.fsdp
if self.cfg.fsdp_config:
training_arguments_kwargs["fsdp_config"] = {
k.lstrip("fsdp_"): v for k, v in dict(self.cfg.fsdp_config).items()
}
if self.cfg.adapter == "qlora":
training_arguments_kwargs["qlora"] = True
@@ -310,6 +318,11 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
self.cfg.neftune_noise_alpha
)
if self.cfg.accelerator_config:
training_arguments_kwargs["accelerator_config"] = (
self.cfg.accelerator_config
)
if self.cfg.image_size:
training_arguments_kwargs["image_size"] = self.cfg.image_size
if self.cfg.image_resize_algorithm:

View File

@@ -208,7 +208,7 @@ class HFRLTrainerBuilder(TrainerBuilderBase):
callbacks=self.get_callbacks(),
**trainer_kwargs,
)
if self.cfg.fsdp_config or self.cfg.fsdp:
if self.cfg.fsdp:
ensure_dtype(trainer.model, dtype=self.cfg.torch_dtype)
if self.cfg.rl in [RLType.DPO, RLType.IPO] and trainer.ref_model:
ensure_dtype(trainer.ref_model, dtype=self.cfg.torch_dtype)
@@ -218,3 +218,21 @@ class HFRLTrainerBuilder(TrainerBuilderBase):
trainer.add_callback(callback)
return trainer
class HFPPOTrainerBuilder(TrainerBuilderBase):
"""
HF Factory class for PPO Trainer
"""
def get_callbacks(self):
callbacks = super().get_callbacks()
return callbacks
def get_post_trainer_create_callbacks(self, trainer):
callbacks = super().get_post_trainer_create_callbacks(trainer=trainer)
return callbacks
def build(self, total_num_steps):
# TODO: build PPOConfig
raise NotImplementedError("PPO trainer builder is not implemented yet.")

View File

@@ -14,4 +14,5 @@ from .trl import (
AxolotlORPOTrainer,
AxolotlPRMTrainer,
AxolotlRewardTrainer,
TRLPPOTrainer,
)

View File

@@ -25,7 +25,6 @@ from trl.trainer.utils import pad_to_length
from typing_extensions import override
from axolotl.core.trainers.mixins import (
ActivationOffloadingMixin,
CheckpointSaveMixin,
OptimizerMixin,
PackingMixin,
@@ -49,7 +48,6 @@ class AxolotlTrainer(
OptimizerMixin,
RngLoaderMixin,
CheckpointSaveMixin,
ActivationOffloadingMixin,
Trainer,
):
"""Extend the base Trainer for axolotl helpers"""
@@ -77,6 +75,18 @@ class AxolotlTrainer(
if self.args.orpo_alpha:
self.loss_fct = torch.nn.CrossEntropyLoss(reduction="none")
def _wrap_model(self, model, training=True, dataloader=None):
if self.args.torch_compile:
torch._dynamo.config.accumulated_cache_size_limit = ( # pylint: disable=protected-access
256
)
model = torch.compile(
model,
backend=self.args.torch_compile_backend,
mode=self.args.torch_compile_mode,
)
return super()._wrap_model(model, training=training, dataloader=dataloader)
def _create_multipack_sampler(
self, base_sampler: Sampler, dataset: Dataset
) -> MultipackBatchSampler:

View File

@@ -14,7 +14,6 @@ from axolotl.core.trainers.grpo.trainer import (
from axolotl.utils.dict import DictDefault
from axolotl.utils.logging import get_logger
from axolotl.utils.schemas.trl import TRLConfig
from axolotl.utils.schemas.vllm import VllmConfig
LOG = get_logger(__name__)
@@ -42,18 +41,9 @@ class GRPOStrategy:
return grpo_args_kwargs
trl: TRLConfig = cfg.trl # type: ignore
vllm_cfg: VllmConfig = cfg.vllm # type: ignore
if trl.use_vllm:
grpo_args_kwargs["use_vllm"] = trl.use_vllm
grpo_args_kwargs["vllm_mode"] = trl.vllm_mode
if trl.vllm_mode == "colocate":
grpo_args_kwargs["vllm_gpu_memory_utilization"] = (
vllm_cfg.gpu_memory_utilization
)
grpo_args_kwargs["vllm_tensor_parallel_size"] = (
vllm_cfg.tensor_parallel_size
)
grpo_args_kwargs["vllm_server_host"] = trl.vllm_server_host or trl.vllm.host # type: ignore[attr-defined]
grpo_args_kwargs["vllm_server_port"] = trl.vllm_server_port or trl.vllm.port # type: ignore[attr-defined]
if trl.vllm_server_timeout:

View File

@@ -59,6 +59,42 @@ class AxolotlGRPOTrainer(
_tag_names = ["trl", "grpo", "axolotl"]
def get_train_dataloader(self):
if self.train_dataset is None:
raise ValueError("Trainer: training requires a train_dataset.")
train_dataset = self.train_dataset
data_collator = self.data_collator
if isinstance(train_dataset, datasets.Dataset):
train_dataset = self._remove_unused_columns(
train_dataset, description="training"
)
else:
data_collator = self._get_collator_with_removed_columns(
data_collator, description="training"
)
dataloader_params = {
"batch_size": self._train_batch_size
* self.args.steps_per_generation, # < this is the change
"collate_fn": data_collator,
"num_workers": self.args.dataloader_num_workers,
"pin_memory": self.args.dataloader_pin_memory,
"persistent_workers": self.args.dataloader_persistent_workers,
}
if not isinstance(train_dataset, torch.utils.data.IterableDataset):
dataloader_params["sampler"] = self._get_train_sampler()
dataloader_params["drop_last"] = self.args.dataloader_drop_last
dataloader_params["worker_init_fn"] = partial(
seed_worker,
num_workers=self.args.dataloader_num_workers,
rank=self.args.process_index,
)
dataloader_params["prefetch_factor"] = self.args.dataloader_prefetch_factor
return self.accelerator.prepare(DataLoader(train_dataset, **dataloader_params))
class AxolotlGRPOSequenceParallelTrainer(AxolotlGRPOTrainer):
"""Extend the base GRPOTrainer for sequence parallelism handling"""
@@ -216,11 +252,7 @@ class AxolotlGRPOSequenceParallelTrainer(AxolotlGRPOTrainer):
dataloader_params["drop_last"] = self.args.dataloader_drop_last
if not is_eval:
dataloader_params["worker_init_fn"] = partial(
seed_worker,
num_workers=self.args.dataloader_num_workers,
rank=self.args.process_index,
)
dataloader_params["worker_init_fn"] = seed_worker
# Create the dataloader
dataloader = DataLoader(dataset, **dataloader_params)

View File

@@ -3,7 +3,6 @@
# pylint: disable=unused-import
# flake8: noqa
from .activation_checkpointing import ActivationOffloadingMixin
from .checkpoints import CheckpointSaveMixin
from .optimizer import OptimizerMixin
from .packing import PackingMixin

View File

@@ -1,37 +0,0 @@
"""
Trainer mixin for activation checkpointing w offloading
"""
import contextlib
from torch import nn
from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (
apply_activation_checkpointing,
)
from torch.distributed.fsdp.wrap import ModuleWrapPolicy
from transformers import GradientCheckpointingLayer, Trainer
from trl.models.activation_offloading import get_act_offloading_ctx_manager
class ActivationOffloadingMixin(Trainer):
"""
Trainer mixin class for activation checkpointing w offloading
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.args.activation_offloading:
self.activation_offload_context = get_act_offloading_ctx_manager(
self.model, use_streams=True
)
else:
self.activation_offload_context = contextlib.nullcontext()
def training_step(self, *args, **kwargs):
with self.activation_offload_context:
return super().training_step(*args, **kwargs)
def ac_wrap_hf_model(model: nn.Module, **kwargs):
auto_wrap_policy = ModuleWrapPolicy(set((GradientCheckpointingLayer,)))
apply_activation_checkpointing(model, auto_wrap_policy=auto_wrap_policy, **kwargs)

View File

@@ -1,9 +1,12 @@
"""Module for TRL RL trainers"""
"""Module for TRL PPO trainer"""
import torch
from tqdm import tqdm
from trl import (
CPOTrainer,
KTOTrainer,
ORPOTrainer,
PPOTrainer,
PRMTrainer,
RewardTrainer,
)
@@ -13,6 +16,64 @@ from axolotl.core.trainers.mixins.optimizer import OptimizerInitMixin, Optimizer
from axolotl.core.trainers.mixins.scheduler import SchedulerMixin
class TRLPPOTrainer(PPOTrainer):
"""Wrapper for TRL PPO trainer to handle customizations"""
tag_names = ["axolotl", "ppo"]
def train(
self,
reward_pipe,
resume_from_checkpoint=None, # pylint: disable=unused-argument
):
generation_kwargs = {
"min_length": -1,
"top_k": 0.0,
"top_p": 1.0,
"do_sample": True,
"pad_token_id": self.tokenizer.eos_token_id,
"max_new_tokens": 32,
}
sent_kwargs = {
"return_all_scores": True,
"function_to_apply": "none",
"batch_size": 16,
}
for _, batch in tqdm(enumerate(self.dataloader)):
query_tensors = batch["input_ids"]
# generate model response
response_tensors, ref_response_tensors = self.generate(
query_tensors,
return_prompt=False,
generate_ref_response=True,
**generation_kwargs,
)
batch["response"] = self.tokenizer.batch_decode(response_tensors)
batch["ref_response"] = self.tokenizer.batch_decode(ref_response_tensors)
# Compute sentiment score
texts = [q + r for q, r in zip(batch["query"], batch["response"])]
pipe_outputs = reward_pipe(texts, **sent_kwargs)
rewards = [torch.tensor(output[1]["score"]) for output in pipe_outputs]
ref_texts = [q + r for q, r in zip(batch["query"], batch["ref_response"])]
ref_pipe_outputs = reward_pipe(ref_texts, **sent_kwargs)
ref_rewards = [
torch.tensor(output[1]["score"]) for output in ref_pipe_outputs
]
batch["ref_rewards"] = ref_rewards
# Run PPO step
stats = self.step(query_tensors, response_tensors, rewards)
self.log_stats(
stats,
batch,
rewards,
columns_to_log=["query", "response", "ref_response", "ref_rewards"],
)
class AxolotlORPOTrainer(
RngLoaderMixin, SchedulerMixin, OptimizerMixin, OptimizerInitMixin, ORPOTrainer
):

View File

@@ -217,11 +217,6 @@ class AxolotlTrainingMixins:
},
)
activation_offloading: bool | None = field(
default=None,
metadata={"help": "Use activation offloading with CUDA streams for training."},
)
# multi-modal section
image_size: int | tuple[int, int] | None = field(

View File

@@ -6,21 +6,15 @@ from typing import Optional, Union, Unpack
import torch
from transformers import Cache
from transformers.modeling_flash_attention_utils import FlashAttentionKwargs
from transformers.modeling_outputs import CausalLMOutputWithPast
from transformers.utils import LossKwargs
try:
from transformers.modeling_flash_attention_utils import FlashAttentionKwargs
from transformers.utils import LossKwargs
class TransformersKwargs(FlashAttentionKwargs, LossKwargs):
"""
placeholder kwargs for hf model classes
"""
except ImportError:
from transformers.utils.generic import ( # type: ignore[no-redef]
TransformersKwargs,
)
class KwargsForCausalLM(FlashAttentionKwargs, LossKwargs):
"""
placeholder kwargs for hf model classes
"""
def kldiv_forward_llama_like(
@@ -39,7 +33,7 @@ def kldiv_forward_llama_like(
output_hidden_states: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: Union[int, torch.Tensor] = 0, # pylint: disable=unused-argument
**kwargs: Unpack[TransformersKwargs], # type: ignore[misc]
**kwargs: Unpack[KwargsForCausalLM], # type: ignore[misc]
) -> CausalLMOutputWithPast:
# pylint: disable=duplicate-code
output_attentions = (

View File

@@ -122,9 +122,9 @@ def load_lora(
rank = int(os.environ.get("LOCAL_RANK", 0))
if (
cfg.fsdp_config
cfg.fsdp
and cfg.adapter
and cfg.fsdp_config.cpu_ram_efficient_loading
and cfg.fsdp_config.fsdp_cpu_ram_efficient_loading
and rank != 0
):
setup_quantized_meta_for_peft(model)
@@ -152,9 +152,9 @@ def load_lora(
"Exception caught during model.print_trainable_parameters(): %s", exc
)
elif (
cfg.fsdp_config
cfg.fsdp
and cfg.adapter
and cfg.fsdp_config.cpu_ram_efficient_loading
and cfg.fsdp_config.fsdp_cpu_ram_efficient_loading
and rank != 0
):
setup_quantized_peft_meta_for_training(model)

View File

@@ -140,15 +140,10 @@ class ModelLoader:
"""Check if flash attention is installed."""
return find_spec("flash_attn") is not None
@property
def is_fsdp_enabled(self):
"""Property that determines if FSDP is enabled."""
return self.cfg.fsdp_config is not None or self.cfg.fsdp is not None
@property
def is_qlora_and_fsdp_enabled(self):
@cached_property
def qlora_fsdp(self):
"""Property that determines if FSDP with QLoRA is enabled."""
return self.is_fsdp_enabled and self.cfg.adapter == "qlora"
return self.cfg.fsdp and self.cfg.adapter == "qlora"
def load(self) -> tuple[PreTrainedModel | PeftModelForCausalLM, PeftConfig | None]:
"""Load and prepare the model with all configurations and patches.
@@ -194,25 +189,15 @@ class ModelLoader:
# Handle PeftModel if needed
if (
isinstance(self.model, (peft.PeftModel, peft.PeftModelForCausalLM))
and not self.is_qlora_and_fsdp_enabled
and not self.qlora_fsdp
):
self.model = self.model.merge_and_unload()
self._apply_activation_checkpointing()
self._resize_token_embeddings()
self._adjust_model_config()
self._log_memory_usage()
self._configure_embedding_dtypes()
self._configure_qat()
log_gpu_memory_usage(LOG, "Memory usage after model load", 0)
def _apply_activation_checkpointing(self):
if self.cfg.activation_offloading is True:
from axolotl.core.trainers.mixins.activation_checkpointing import (
ac_wrap_hf_model,
)
# ^^ importing this at the module level breaks plugins
ac_wrap_hf_model(self.model)
def _resize_token_embeddings(self):
"""Resize token embeddings if needed."""
@@ -266,13 +251,22 @@ class ModelLoader:
):
self.model.config.eos_token_id = self.tokenizer.eos_token_id
def _log_memory_usage(self):
"""Log device memory usage after model load."""
if hasattr(self.model, "device") and self.model.device.type in (
"cuda",
"mps",
"npu",
):
log_gpu_memory_usage(LOG, "after model load", self.model.device)
def _configure_embedding_dtypes(self):
"""Configure embedding module dtypes."""
# Get embedding modules
embedding_modules = get_linear_embedding_layers(self.cfg.model_config_type)
# Initial dtype conversion
if not self.is_fsdp_enabled:
if not self.cfg.fsdp:
# We don't run this during FSDP because this will leave mixed and bfloat16
# dtypes in the model which FSDP doesn't like
if self.cfg.load_in_4bit and self.cfg.embeddings_skip_upcast:
@@ -288,7 +282,7 @@ class ModelLoader:
self._set_z3_leaf_modules()
# Apply gradient checkpointing if needed
needs_fa2_dtype = self.cfg.adapter or self.is_fsdp_enabled
needs_fa2_dtype = self.cfg.adapter or self.cfg.fsdp
if self.cfg.adapter in ["lora", "qlora"]:
needs_fa2_dtype = True
if self.cfg.gradient_checkpointing:
@@ -304,12 +298,10 @@ class ModelLoader:
# we need to convert them back to fp16/bf16 for flash-attn compatibility.
(
(needs_fa2_dtype or self.cfg.flash_attention or self.cfg.flex_attention)
and not self.is_qlora_and_fsdp_enabled
)
or (
# CCE requires embedding layers to be in fp16/bf16 for backward pass
self.cfg.cut_cross_entropy
and not self.qlora_fsdp
)
# CCE requires embedding layers to be in fp16/bf16 for backward pass
or self.cfg.cut_cross_entropy
)
if should_convert:
@@ -365,6 +357,7 @@ class ModelLoader:
and not (self.cfg.rl and self.cfg.load_in_4bit)
and not skip_move_to_device
):
# TODO: validate this conditional
self.model.to(f"{str(get_device_type())}:{self.cfg.local_rank}")
if get_device_count() > 1 and int(os.getenv("WORLD_SIZE", "1")) == 1:
@@ -437,17 +430,7 @@ class ModelLoader:
self.model_kwargs["torch_dtype"] = self.cfg.torch_dtype
is_ds_zero3 = is_deepspeed_zero3_enabled()
# FSDP requires control over device placement, so don't set device_map when FSDP is enabled
if self.is_fsdp_enabled:
# For QLoRA + FSDP, we still need to set device_map to "auto" for proper initialization
if self.is_qlora_and_fsdp_enabled:
self.model_kwargs["device_map"] = {
"": int(os.environ.get("LOCAL_RANK", 0))
}
# For other FSDP cases, don't set device_map at all
elif not is_ds_zero3:
if not is_deepspeed_zero3_enabled():
self.model_kwargs["device_map"] = device_map
cur_device = get_device_type()
@@ -516,7 +499,7 @@ class ModelLoader:
"bnb_4bit_quant_storage": torch.bfloat16,
}
if self.cfg.model_config_type in ["jamba", "qwen2_moe"] and not (
self.cfg.deepspeed or self.is_fsdp_enabled
self.cfg.deepspeed or self.cfg.fsdp
):
# for some reason, this causes the loss to be off by an order of magnitude
# but deepspeed needs this still in bfloat16
@@ -621,21 +604,9 @@ class ModelLoader:
def _build_model(self) -> bool:
"""Load model, with load strategy depending on config."""
skip_move_to_device = False
if self.is_fsdp_enabled:
if self.cfg.fsdp_config.cpu_ram_efficient_loading:
skip_move_to_device = True
# Don't delete device_map for QLoRA + FSDP - it was set correctly in _set_device_map
if (
"device_map" in self.model_kwargs
and not self.is_qlora_and_fsdp_enabled
):
del self.model_kwargs["device_map"]
elif self.is_qlora_and_fsdp_enabled:
skip_move_to_device = True
if (
self.is_qlora_and_fsdp_enabled
and self.cfg.fsdp_config.cpu_ram_efficient_loading
self.qlora_fsdp
and self.cfg.fsdp_config.fsdp_cpu_ram_efficient_loading
and (
self.cfg.model_config_type == "dbrx"
or self.cfg.qlora_sharded_model_loading
@@ -661,6 +632,12 @@ class ModelLoader:
and not self.cfg.trust_remote_code
and not self.cfg.gptq
):
# TODO: Do we need to open this up for all models?
if self.cfg.fsdp and self.cfg.fsdp_config.fsdp_cpu_ram_efficient_loading:
skip_move_to_device = True
if "device_map" in self.model_kwargs:
del self.model_kwargs["device_map"]
# Please don't remove underscore binding without reading the fn docstring.
_ = self._configure_zero3_memory_efficient_loading()
@@ -714,22 +691,33 @@ class ModelLoader:
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
elif self.cfg.gptq:
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
else:
# Please don't remove underscore binding without reading the fn docstring.
_ = self._configure_zero3_memory_efficient_loading()
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
if self.cfg.gptq:
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
else:
if (
self.cfg.fsdp
and self.cfg.fsdp_config.fsdp_cpu_ram_efficient_loading
):
# disabling either of these two still leads to VRAM spike before setting back down
skip_move_to_device = True
if "device_map" in self.model_kwargs:
del self.model_kwargs["device_map"]
# Please don't remove underscore binding without reading the fn docstring.
_ = self._configure_zero3_memory_efficient_loading()
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
if is_deepspeed_zero3_enabled():
skip_move_to_device = True
@@ -765,8 +753,8 @@ class ModelLoader:
skip_prepare_model_for_kbit_training = True
if (
self.is_qlora_and_fsdp_enabled
or (self.is_fsdp_enabled and self.cfg.fsdp_config.cpu_ram_efficient_loading)
self.qlora_fsdp
or (self.cfg.fsdp and self.cfg.fsdp_config.fsdp_cpu_ram_efficient_loading)
or is_deepspeed_zero3_enabled()
):
# Make sure everything is in the same dtype

View File

@@ -7,6 +7,7 @@ import importlib.util
from functools import cached_property
import addict
import torch
import transformers
from transformers import PretrainedConfig, PreTrainedModel
@@ -93,14 +94,10 @@ class PatchManager:
def _apply_fsdp_patches(self):
"""Apply patches for FSDP configurations."""
if self.cfg.fsdp_config and str(self.cfg.fsdp_version) == "2":
if self.cfg.fsdp_config and str(self.cfg.fsdp_config.fsdp_version) == "2":
from axolotl.monkeypatch.accelerate.fsdp2 import patch_accelerate_fsdp2
patch_accelerate_fsdp2()
if self.cfg.rl:
from axolotl.monkeypatch.trainer.trl import patch_trl_prepare_fsdp2
patch_trl_prepare_fsdp2()
# if self.cfg.fsdp_config:
# # see transformers#39152
@@ -167,19 +164,28 @@ class PatchManager:
def _apply_gradient_checkpointing_patches(self):
"""Apply patches for gradient checkpointing."""
if (
self.cfg.gradient_checkpointing
and self.cfg.activation_offloading == "legacy"
):
if self.cfg.gradient_checkpointing in ["unsloth", "offload"]:
from axolotl.monkeypatch.gradient_checkpointing import (
CheckpointFunctionWithCPUOffload,
hf_grad_checkpoint_offload_wrapper,
)
transformers.modeling_utils.checkpoint = hf_grad_checkpoint_offload_wrapper
elif (
self.cfg.gradient_checkpointing
and self.cfg.activation_offloading == "offload_disk"
):
if (
self.cfg.gradient_checkpointing_kwargs
and "use_reentrant" in self.cfg.gradient_checkpointing_kwargs
and self.cfg.gradient_checkpointing_kwargs["use_reentrant"] is False
):
transformers.modeling_utils.checkpoint = (
hf_grad_checkpoint_offload_wrapper
)
else:
transformers.modeling_utils.checkpoint.CheckpointFunction = (
CheckpointFunctionWithCPUOffload
)
torch.utils.checkpoint.CheckpointFunction = (
CheckpointFunctionWithCPUOffload
)
if self.cfg.gradient_checkpointing == "offload_disk":
from axolotl.monkeypatch.gradient_checkpointing import (
hf_grad_checkpoint_disk_offload_wrapper,
)

View File

@@ -195,11 +195,9 @@ def ensure_dtype(model: PreTrainedModel, dtype: torch.dtype = torch.bfloat16):
bias_mismatch = module.bias.dtype != dtype
if weight_mismatch:
LOG.debug(
f"Converting module {name}.weight: {module.weight.dtype} -> {dtype}"
)
print(f"Converting module {name}.weight: {module.weight.dtype} -> {dtype}")
if bias_mismatch:
LOG.debug(f"Converting module {name}.bias: {module.bias.dtype} -> {dtype}")
print(f"Converting module {name}.bias: {module.bias.dtype} -> {dtype}")
if weight_mismatch or bias_mismatch:
module.to(dtype)

View File

@@ -2,65 +2,102 @@
monkeypatch for accelerate fsdp2 fix when modifying ordereddict during iteration, and saving full state dicts
"""
import copy
import functools
import sys
import torch
from torch import nn
from axolotl.utils.bench import log_gpu_memory_usage
from axolotl.utils.logging import get_logger
LOG = get_logger(__name__)
def fsdp2_load_full_state_dict(
_accelerator, model: torch.nn.Module, full_sd: dict, offload_to_cpu: bool = False
):
def fsdp2_load_full_state_dict(accelerator, model: torch.nn.Module, full_sd: dict):
"""
Loads the full state dict (could be only on rank 0) into the sharded model. This is done by broadcasting the
parameters from rank 0 to all other ranks. This function modifies the model in-place.
Args:
accelerator (`Accelerator`): The accelerator instance
model (`torch.nn.Module`):
The model to load the state dict into, expected to be on meta device or a VRAM spike can occur
full_sd (`dict`): The full state dict to load, can only be on rank 0
"""
import torch.distributed as dist
from torch.distributed.tensor import distribute_tensor
LOG.info("Broadcasting full state dict to all ranks...")
import time
start_time = time.time()
# Model was previously copied to meta device
meta_sharded_sd = model.state_dict()
sharded_sd = {}
for param_name, full_tensor in full_sd.items():
sharded_meta_param = meta_sharded_sd.get(param_name)
full_tensor = full_tensor.to(sharded_meta_param.dtype).to(torch.device("cuda"))
if hasattr(sharded_meta_param, "device_mesh"):
sharded_param = distribute_tensor(
full_tensor,
sharded_meta_param.device_mesh,
sharded_meta_param.placements,
src_data_rank=0,
# Rank 0 distributes the full state dict to other ranks
def _infer_parameter_dtype(model, param_name, empty_param):
try:
old_param = model.get_parameter_or_buffer(param_name)
except AttributeError:
# Need this for LoRA, as some params there are not *parameters* of sorts
base_param_name, local_param_name = param_name.rsplit(".", 1)
submodule = model.get_submodule(base_param_name)
old_param = getattr(submodule, local_param_name)
is_torch_e4m3fn_available = hasattr(torch, "float8_e4m3fn")
casting_dtype = None
is_param_float8_e4m3fn = (
is_torch_e4m3fn_available and empty_param.dtype == torch.float8_e4m3fn
)
if empty_param.dtype.is_floating_point and not is_param_float8_e4m3fn:
casting_dtype = old_param.dtype
return old_param is not None and old_param.is_contiguous(), casting_dtype
def _cast_and_contiguous(tensor, to_contiguous, dtype):
if dtype is not None:
tensor = tensor.to(dtype=dtype)
if to_contiguous:
tensor = tensor.contiguous()
return tensor
param_names = sorted(meta_sharded_sd.keys())
for param_name in param_names:
mesh = meta_sharded_sd[param_name].device_mesh
if accelerator.is_main_process:
full_param = full_sd[param_name].detach().cuda()
dist.broadcast(full_param, src=0, group=mesh.get_group())
sharded_tensor = distribute_tensor(
full_param, mesh, sharded_sd[param_name].placements
)
to_contiguous, casting_dtype = _infer_parameter_dtype(
model,
param_name,
full_param,
)
sharded_tensor = _cast_and_contiguous(
sharded_tensor, to_contiguous, casting_dtype
)
sharded_sd[param_name] = sharded_tensor
else:
sharded_param = full_tensor
full_tensor = torch.empty(
sharded_sd[param_name].size(),
device="cuda",
dtype=sharded_sd[param_name].dtype,
)
dist.broadcast(full_tensor, src=0, group=mesh.get_group())
sharded_tensor = distribute_tensor(
full_tensor, mesh, sharded_sd[param_name].placements
)
to_contiguous, casting_dtype = _infer_parameter_dtype(
model,
param_name,
full_tensor,
)
sharded_tensor = _cast_and_contiguous(
sharded_tensor, to_contiguous, casting_dtype
)
sharded_sd[param_name] = sharded_tensor
if offload_to_cpu:
sharded_param = sharded_param.cpu()
sharded_sd[param_name] = nn.Parameter(sharded_param)
del full_tensor
full_sd[param_name] = None
model.load_state_dict(sharded_sd, assign=True, strict=True)
end_time = time.time()
LOG.debug(
f"Time taken to load full state dict: {(end_time - start_time):.2f} seconds"
)
log_gpu_memory_usage(LOG, "Memory usage after broadcasting full state dict", 0)
# we set `assign=True` because our params are on meta device
model.load_state_dict(sharded_sd, assign=True)
return model
@@ -154,195 +191,17 @@ def get_state_dict(self, model, unwrap=True):
return state_dict
def _process_lora_module_for_fsdp(module, fsdp2_kwargs):
"""Helper function to process LoRA modules for FSDP2."""
from torch.distributed.fsdp import fully_shard
log_bias_dtype_mismatch = False
# Linear4Bit will keep it's bias term in fp32. If the weight dtype is in bf16 we are not able to
# wrap this. Therefore we must ensure the bias has the same dtype as the weight
if module.base_layer.bias is not None:
if module.base_layer.weight.dtype != module.base_layer.bias.dtype:
log_bias_dtype_mismatch = True
module.base_layer.bias.data = module.base_layer.bias.data.to(
module.base_layer.weight.dtype
)
for active_adapter in module.active_adapters:
if module.lora_A:
fully_shard(module.lora_A[active_adapter], **fsdp2_kwargs)
if module.lora_B:
fully_shard(module.lora_B[active_adapter], **fsdp2_kwargs)
if module.lora_embedding_A:
fully_shard(module.lora_embedding_A[active_adapter], **fsdp2_kwargs)
if module.lora_embedding_B:
fully_shard(module.lora_embedding_B[active_adapter], **fsdp2_kwargs)
if module.lora_magnitude_vector:
fully_shard(module.lora_magnitude_vector[active_adapter], **fsdp2_kwargs)
return log_bias_dtype_mismatch
def fsdp2_prepare_model(accelerator, model: torch.nn.Module) -> torch.nn.Module:
"""Prepares the model for FSDP2 in-place. Also returns the model to avoid misuse of the original model.
Args:
accelerator (`Accelerator`): The accelerator instance
model (`torch.nn.Module`): The model to prepare
Returns:
`torch.nn.Module`: Prepared model
"""
from accelerate.utils import get_module_children_bottom_up, is_compiled_module
from accelerate.utils.fsdp_utils import fsdp2_prepare_auto_wrap_policy
from accelerate.utils.modeling import get_non_persistent_buffers
from peft import PeftModel
from peft.tuners.lora import LoraLayer
from torch.distributed.fsdp import (
CPUOffloadPolicy,
FSDPModule,
MixedPrecisionPolicy,
fully_shard,
)
is_type_fsdp = isinstance(model, FSDPModule) or (
is_compiled_module(model)
and isinstance(model._orig_mod, FSDPModule) # pylint: disable=protected-access
)
if is_type_fsdp:
return model
fsdp2_plugin = accelerator.state.fsdp_plugin
original_sd = model.state_dict()
from torch.distributed.fsdp.wrap import (
size_based_auto_wrap_policy,
transformer_auto_wrap_policy,
)
# We need the `auto_wrap_policy` original type to create a custom policy function for sharding
# This is because `fully_shard` doesn't support old auto wrap policies, rather we have to imitate the behaviour
if fsdp2_plugin.auto_wrap_policy is transformer_auto_wrap_policy:
pass # auto_wrap_policy_type = "transformer"
elif fsdp2_plugin.auto_wrap_policy is size_based_auto_wrap_policy:
pass # auto_wrap_policy_type = "size"
# We set `auto_wrap_policy` to `functools.partial` to avoid creating it again
# This is because of `apply_activation_checkpointing` which can reuse this function
fsdp2_plugin.set_auto_wrap_policy(model)
if fsdp2_plugin.activation_checkpointing:
from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (
CheckpointImpl,
apply_activation_checkpointing,
checkpoint_wrapper,
)
# Apply activation checkpointing before applying `fully_shard`
apply_activation_checkpointing(
model,
checkpoint_wrapper_fn=functools.partial(
checkpoint_wrapper,
checkpoint_impl=CheckpointImpl.NO_REENTRANT,
),
auto_wrap_policy=fsdp2_plugin.auto_wrap_policy,
)
fsdp2_kwargs = {
"reshard_after_forward": fsdp2_plugin.reshard_after_forward,
"offload_policy": fsdp2_plugin.cpu_offload,
# `fully_shard` doesn't accept `None` in case of `MixedPrecisionPolicy`
"mp_policy": fsdp2_plugin.mixed_precision_policy or MixedPrecisionPolicy(),
}
model_has_params4bit = False
for _, param in model.named_parameters():
# this is a temporary fix whereby loading models with bnb params cannot be moved from
# GPU to a meta device with FSDP2 because torch operations don't return the original class type
# bypassing the move to meta will still cause the VRAM spike, but at least it still will load
if param.__class__.__name__ == "Params4bit":
model_has_params4bit = True
break
if fsdp2_plugin.cpu_ram_efficient_loading and not model_has_params4bit:
# Context: `fully_shard` moves the model to GPU if it was on CPU, however it can also be on `meta` and then it stays there even after `fully_shard`
# For this reason, we need to move the model to `meta` device, as then sharding happens on `meta` device
# If we kept the model on CPU (`cpu_ram_efficient_loading` has the model on CPU on all ranks, though non-main ranks only have `torch.empty`), `fully_shard` would move it to GPU
# Afterwards, when we call `fsdp2_load_full_state_dict`, us creating the state_dict would result into briefly having two copies of model state_dict on the GPU -> VRAM spike
# We need to keep the original non-persistent buffers, as those MAY not be in the state_dict, resulting in them staying on meta device
# Also, these buffers aren't getting sharded by default
# We get the FQNs of all non-persistent buffers, to re-register them after
non_persistent_buffer_fqns = get_non_persistent_buffers(
model, recurse=True, fqns=True
)
original_non_persistent_buffers = copy.deepcopy(
{k: v for k, v in model.named_buffers() if k in non_persistent_buffer_fqns}
)
# We move the model to meta device, as then sharding happens on meta device
model = model.to(torch.device("meta"))
# We need to re-tie the weights, not exactly sure why, but if we don't do this, reference to `lm_head/embed_tokens` stay hanging -> more VRAM usage
# We assume `transformers` models have a `tie_weights` method if they support it
if hasattr(model, "tie_weights"):
model.tie_weights()
is_peft_model = isinstance(model, PeftModel)
auto_wrap_policy = fsdp2_prepare_auto_wrap_policy(fsdp2_plugin, model)
log_bias_dtype_mismatch = False
if auto_wrap_policy is not None:
for module in get_module_children_bottom_up(model)[:-1]:
if is_peft_model and isinstance(module, LoraLayer):
module_log_bias_mismatch = _process_lora_module_for_fsdp(
module, fsdp2_kwargs
)
log_bias_dtype_mismatch |= module_log_bias_mismatch
if auto_wrap_policy(module) and not isinstance(module, FSDPModule):
fully_shard(module, **fsdp2_kwargs)
fully_shard(model, **fsdp2_kwargs)
if log_bias_dtype_mismatch:
LOG.warning(
"Bias dtype mismatch detected in LoRA base linear layer. Bias parameters have been cast to weight dtype."
)
if fsdp2_plugin.cpu_ram_efficient_loading:
offload_to_cpu = isinstance(fsdp2_plugin.cpu_offload, CPUOffloadPolicy)
fsdp2_load_full_state_dict(
accelerator, model, original_sd, offload_to_cpu=offload_to_cpu
)
if fsdp2_plugin.cpu_ram_efficient_loading and not model_has_params4bit:
# We re-register the buffers, as they may not be in the state_dict
for fqn, buffer_tensor in original_non_persistent_buffers.items():
buffer_tensor = buffer_tensor.to(accelerator.device)
if "." in fqn:
parent_fqn, local_buffer_name = fqn.rsplit(".", 1)
parent_module = model.get_submodule(parent_fqn)
else:
local_buffer_name = fqn
parent_module = model
parent_module.register_buffer(
local_buffer_name, buffer_tensor, persistent=False
)
# We need to tie the weights again, as call to `load_full_state_dict` breaks the tie
# Needs to be called both here and above
# removing this call makes the model have slightly different loss
# removing the call above leads to extra memory usage as explained in the comment above
if hasattr(model, "tie_weights"):
model.tie_weights()
return model
def patch_accelerate_fsdp2():
import accelerate
from accelerate.utils import fsdp_utils
fsdp_utils.fsdp2_load_full_state_dict = fsdp2_load_full_state_dict
setattr(
sys.modules["accelerate.utils.fsdp_utils"],
"fsdp2_load_full_state_dict",
fsdp2_load_full_state_dict,
)
accelerate.accelerator.fsdp2_prepare_model = fsdp2_prepare_model
accelerate.Accelerator.get_state_dict = get_state_dict
setattr(
sys.modules["accelerate"],

View File

@@ -6,10 +6,6 @@ from typing import Optional, Tuple, Union
import torch
import transformers
from axolotl.utils.logging import get_logger
LOG = get_logger(__name__)
def patch_flex_wrapper(**flex_attn_compile_kwargs):
# TODO remove this patch when transformers#37285 is merged and in a release
@@ -50,15 +46,10 @@ def patch_flex_wrapper(**flex_attn_compile_kwargs):
# cause errors. The suggested fix is to compile with "max-autotune-no-cudagraphs"
# see https://github.com/pytorch/pytorch/issues/146260 for training
self.training = training
LOG.info(
"Compiling flex attention with kwargs: %s. This may take a while...",
flex_attn_compile_kwargs,
)
self._compiled_flex_attention = torch.compile(
flex_attention,
**flex_attn_compile_kwargs,
)
LOG.info("Flex attention compiled successfully.")
self._is_flex_compiled = True
def __call__(self):

View File

@@ -6,6 +6,7 @@ from functools import partial
from packaging import version
from axolotl.monkeypatch.gradient_checkpointing.offload_cpu import ( # noqa: F401
CheckpointFunctionWithCPUOffload,
CPU_Offloaded_Gradient_Checkpointer,
)
from axolotl.monkeypatch.gradient_checkpointing.offload_disk import (

View File

@@ -14,11 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import inspect
import torch
from packaging import version
from torch.utils.checkpoint import (
_get_autocast_kwargs,
_get_device_module,
_infer_device_type,
check_backward_validity,
detach_variable,
get_device_states,
set_device_states,
)
@@ -69,3 +76,153 @@ class CPU_Offloaded_Gradient_Checkpointer( # pylint: disable=invalid-name
) + (
None,
) * len(ctx.args)
# Copyright 2025 Snowflake Inc.
# SPDX-License-Identifier: Apache-2.0
# https://github.com/snowflakedb/ArcticTraining/blob/main/arctic_training/monkey_patches.py
class CheckpointFunctionWithCPUOffload(torch.autograd.Function):
"""
This is a torch/utils/checkpoint.py CheckpointFunction monkey patch that offloads the first tensor to cpu during forward and back to cuda during backward. This allows significant memory savings when using a very long seqlen. e.g. for llama 8b at 100k it's 24GB saved per gpu: `((100_000*4096)*2*32/2**30)`
In the case of a very long seqlen 100k+ the copying to/from cpu overhead is not big, because dense quadratic attention compute will dominate.
"""
@staticmethod
def forward(ctx, run_function, preserve_rng_state, *args):
check_backward_validity(args)
ctx.run_function = run_function
ctx.preserve_rng_state = preserve_rng_state
# Accommodates the (remote) possibility that autocast is enabled for cpu AND gpu.
ctx.device_type = _infer_device_type(*args)
ctx.device_autocast_kwargs, ctx.cpu_autocast_kwargs = _get_autocast_kwargs(
ctx.device_type
)
if preserve_rng_state:
ctx.fwd_cpu_state = torch.get_rng_state()
# Don't eagerly initialize the cuda context by accident.
# (If the user intends that the context is initialized later, within their
# run_function, we SHOULD actually stash the cuda state here. Unfortunately,
# we have no way to anticipate this will happen before we run the function.)
ctx.had_device_in_fwd = False
device_module = _get_device_module(ctx.device_type)
if getattr(device_module, "_initialized", False):
ctx.had_device_in_fwd = True
ctx.fwd_devices, ctx.fwd_device_states = get_device_states(*args)
# Save non-tensor inputs in ctx, keep a placeholder None for tensors
# to be filled out during the backward.
ctx.inputs = []
ctx.tensor_indices = []
tensor_inputs = []
# x = None
for i, arg in enumerate(args):
if torch.is_tensor(arg):
# cpu-offload
# we don't offload the 2nd tensor - usually it's a shared 4D attn mask which is huge ([seq, seq])
# upstream could accept a list of arg indices to offload
if i == 0:
# print(f"{arg.shape=}")
ctx.x_device = arg.device
ctx.x_requires_grad = arg.requires_grad
t = arg.detach().cpu()
else:
t = arg
tensor_inputs.append(t)
ctx.tensor_indices.append(i)
ctx.inputs.append(None)
else:
ctx.inputs.append(arg)
ctx.save_for_backward(*tensor_inputs)
with torch.no_grad():
outputs = run_function(*args)
return outputs
@staticmethod
def backward(ctx, *args):
if (
not torch.autograd._is_checkpoint_valid() # pylint: disable=protected-access
):
raise RuntimeError(
"When use_reentrant=True, torch.utils.checkpoint is incompatible"
" with .grad() or passing an `inputs` parameter to .backward()."
" To resolve this error, you can either set use_reentrant=False,"
" or call .backward() without passing the `inputs` argument."
)
# Copy the list to avoid modifying original list.
inputs = list(ctx.inputs)
tensor_indices = ctx.tensor_indices
tensors = ctx.saved_tensors
# Fill in inputs with appropriate saved tensors.
for i, idx in enumerate(tensor_indices):
if i == 0:
t = (
tensors[i]
.to(ctx.x_device)
.detach()
.requires_grad_(ctx.x_requires_grad)
)
else:
t = tensors[i]
inputs[idx] = t
# Stash the surrounding rng state, and mimic the state that was
# present at this time during forward. Restore the surrounding state
# when we're done.
rng_devices = []
if ctx.preserve_rng_state and ctx.had_device_in_fwd:
rng_devices = ctx.fwd_devices
with torch.random.fork_rng(
devices=rng_devices,
enabled=ctx.preserve_rng_state,
device_type=ctx.device_type,
):
if ctx.preserve_rng_state:
torch.set_rng_state(ctx.fwd_cpu_state)
if ctx.had_device_in_fwd:
if has_device_type:
# newer pytorch (as early as 2.7)
set_device_states(
ctx.fwd_devices,
ctx.fwd_device_states,
device_type=ctx.device_type,
)
else:
# older pytorch (at least 2.4)
set_device_states(ctx.fwd_devices, ctx.fwd_device_states)
detached_inputs = detach_variable(tuple(inputs))
device_autocast_ctx = (
torch.amp.autocast(
device_type=ctx.device_type, **ctx.device_autocast_kwargs
)
if torch.amp.is_autocast_available(ctx.device_type)
else contextlib.nullcontext()
)
with torch.enable_grad(), device_autocast_ctx, torch.amp.autocast("cpu", **ctx.cpu_autocast_kwargs): # type: ignore[attr-defined]
outputs = ctx.run_function(*detached_inputs)
if isinstance(outputs, torch.Tensor):
outputs = (outputs,)
# run backward() with only tensor that requires grad
outputs_with_grad = []
args_with_grad = []
for i in range(len(outputs)): # pylint: disable=consider-using-enumerate
if torch.is_tensor(outputs[i]) and outputs[i].requires_grad:
outputs_with_grad.append(outputs[i])
args_with_grad.append(args[i])
if len(outputs_with_grad) == 0:
raise RuntimeError(
"none of output has requires_grad=True, this checkpoint() is not necessary"
)
torch.autograd.backward(outputs_with_grad, args_with_grad)
grads = tuple(
inp.grad if isinstance(inp, torch.Tensor) else None
for inp in detached_inputs
)
return (None, None) + grads
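How the patch gets applied is not shown in this hunk; a minimal sketch, assuming the reentrant CheckpointFunction is the upstream class being swapped out:

import torch.utils.checkpoint as torch_checkpoint

from axolotl.monkeypatch.gradient_checkpointing.offload_cpu import (
    CheckpointFunctionWithCPUOffload,
)

# illustrative only: after this, torch.utils.checkpoint.checkpoint(fn, *args,
# use_reentrant=True) round-trips the first tensor arg through cpu between
# forward and backward
torch_checkpoint.CheckpointFunction = CheckpointFunctionWithCPUOffload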

View File

@@ -1,13 +0,0 @@
"""Monkeypatch for TRL trainer FSDP preparation."""
def prepare_fsdp(model, accelerator):
from axolotl.monkeypatch.accelerate.fsdp2 import fsdp2_prepare_model
return fsdp2_prepare_model(accelerator, model)
def patch_trl_prepare_fsdp2():
import trl.models.utils
trl.models.utils.prepare_fsdp = prepare_fsdp

View File

@@ -379,22 +379,6 @@ class ChatTemplateStrategy(PromptTokenizingStrategy):
Public method that can handle either a single prompt or a batch of prompts.
"""
def _remove_none_values(obj):
"""
Remove None values from a dictionary-like obj or list.
These can appear when Dataset loading causes a schema merge.
See https://github.com/axolotl-ai-cloud/axolotl/pull/2909
"""
if hasattr(obj, "items"):
return {
k: _remove_none_values(v) for k, v in obj.items() if v is not None
}
if isinstance(obj, list):
return [_remove_none_values(elem) for elem in obj]
return obj
prompt = _remove_none_values(prompt)
if not self.is_prompt_batched(prompt) or not self.supports_batched:
return self._tokenize_single_prompt(prompt)
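A hypothetical sample showing the helper's effect (shown as if _remove_none_values were standalone; in the diff it is a local helper):

prompt = {
    "messages": [
        {"role": "user", "content": "hi", "tool_calls": None},
        {"role": "assistant", "content": "hello", "tool_calls": None},
    ],
    "tools": None,
}
cleaned = _remove_none_values(prompt)
# {'messages': [{'role': 'user', 'content': 'hi'},
#               {'role': 'assistant', 'content': 'hello'}]}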

View File

@@ -33,7 +33,7 @@ def default(cfg, dataset_idx=0, **kwargs): # pylint: disable=unused-argument
system=sample[field_system], prompt=sample[field_prompt]
)
else:
sample["prompt"] = prompt_format.format(prompt=sample[field_prompt])
sample["prompt"] = prompt_format.format(prompt=sample["prompt"])
sample["chosen"] = chosen_format.format(chosen=sample[field_chosen])
sample["rejected"] = rejected_format.format(rejected=sample[field_rejected])
return sample
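To see the difference between the two lines, a sketch with hypothetical values; the hard-coded sample["prompt"] lookup would KeyError whenever the prompt column has any other name:

field_prompt = "question"  # hypothetical dataset column
prompt_format = "[INST] {prompt} [/INST]"
sample = {"question": "What is 2+2?", "chosen": "4", "rejected": "5"}
sample["prompt"] = prompt_format.format(prompt=sample[field_prompt])
# -> "[INST] What is 2+2? [/INST]"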

View File

@@ -15,6 +15,7 @@ from typing import Any, Dict
import torch
import transformers.modelcard
from accelerate.utils import save_fsdp_model
from datasets import Dataset
from huggingface_hub.errors import OfflineModeIsEnabled
from peft import PeftConfig, PeftModel
@@ -67,7 +68,7 @@ def setup_model_and_tokenizer(
`None`), and processor (if multimodal, else `None`).
"""
# Load tokenizer
LOG.debug(f"Loading tokenizer... {cfg.tokenizer_config or cfg.base_model_config}")
LOG.debug(f"loading tokenizer... {cfg.tokenizer_config or cfg.base_model_config}")
tokenizer = load_tokenizer(cfg)
# Load processor for multimodal models if needed
@@ -75,8 +76,11 @@ def setup_model_and_tokenizer(
if cfg.is_multimodal:
processor = load_processor(cfg, tokenizer)
# Load the model
LOG.debug("Loading model")
# Load the model and peft_config
msg = "loading model"
if cfg.adapter:
msg += " and peft_config..."
LOG.debug(msg)
model_loader = ModelLoader(cfg, tokenizer, processor=processor)
model, peft_config = model_loader.load()
@@ -224,9 +228,6 @@ def execute_training(
# torch.set_default_dtype(torch.bfloat16)
trainer.train(resume_from_checkpoint=resume_from_checkpoint)
plugin_manager = PluginManager.get_instance()
plugin_manager.post_train(cfg, trainer.model)
def save_trained_model(
cfg: DictDefault,
@@ -263,6 +264,15 @@ def save_trained_model(
"QAT modules have been converted for PTQ. Please ensure you quantize "
"your model weights with `axolotl quantize`."
)
# Handle FSDP state dict type
state_dict_type = "FULL_STATE_DICT"
if trainer.is_fsdp_enabled and str(cfg.fsdp_config.fsdp_version) != "2":
if cfg.fsdp_final_state_dict_type:
state_dict_type = cfg.fsdp_final_state_dict_type
trainer.accelerator.state.fsdp_plugin.set_state_dict_type(state_dict_type)
LOG.info(f"Set FSDP state dict type to {state_dict_type} for saving.")
# Handle ReLoRA early return case
if cfg.relora_steps:
if cfg.adapter == "lora" and not (cfg.load_in_4bit or cfg.load_in_8bit):
@@ -271,19 +281,22 @@ def save_trained_model(
# final model weights have already been saved by `ReLoRACallback.on_train_end`
return
if trainer.is_fsdp_enabled:
if cfg.fsdp_config or cfg.fsdp:
if cfg.fsdp_config.final_state_dict_type:
state_dict_type = cfg.fsdp_config.final_state_dict_type
else:
state_dict_type = cfg.fsdp_config.state_dict_type
trainer.accelerator.state.fsdp_plugin.set_state_dict_type(state_dict_type)
trainer.save_model(cfg.output_dir)
if state_dict_type == "SHARDED_STATE_DICT":
LOG.info(
"The final model was saved with a sharded state dict. Please ensure you merge "
"the sharded weights with `merge-sharded-fsdp-weights`."
if cfg.fsdp:
# TODO: do we need this fix? https://huggingface.co/docs/accelerate/usage_guides/fsdp#saving-and-loading
# only save on rank 0, otherwise it corrupts output on multi-GPU when multiple
# processes attempt to write the same file
if (
state_dict_type == "SHARDED_STATE_DICT"
and cfg.fsdp_config.fsdp_state_dict_type == "SHARDED_STATE_DICT"
):
save_fsdp_model(
trainer.accelerator.state.fsdp_plugin,
trainer.accelerator,
trainer.model,
cfg.output_dir,
)
elif state_dict_type == "FULL_STATE_DICT":
trainer.save_model(cfg.output_dir)
elif cfg.deepspeed and is_deepspeed_zero3_enabled():
# Copied over from: https://github.com/huggingface/accelerate/blob/5ae611118057232f441055f7ef9ba0b0f2b8d533/docs/source/usage_guides/deepspeed.md#saving-and-loading
trainer.accelerator.wait_for_everyone()
@@ -513,9 +526,6 @@ def setup_model_and_trainer(cfg: DictDefault, dataset_meta: TrainDatasetMeta) ->
peft_config=peft_config,
)
plugin_manager = PluginManager.get_instance()
plugin_manager.post_trainer_create(cfg, trainer)
return (
trainer,
model,
@@ -547,6 +557,9 @@ def train(
processor,
) = setup_model_and_trainer(cfg, dataset_meta)
plugin_manager = PluginManager.get_instance()
plugin_manager.post_trainer_create(cfg, trainer)
# Handle untrained tokens if configured
safe_serialization = cfg.save_safetensors is True
train_dataset = dataset_meta.train_dataset
@@ -569,4 +582,6 @@ def train(
if not cfg.use_ray:
cleanup_distributed()
plugin_manager.post_train(cfg, model)
return model, tokenizer, trainer

View File

@@ -1,7 +1,6 @@
"""Benchmarking and measurement utilities"""
import functools
import logging
import torch
from transformers.utils.import_utils import is_torch_npu_available
@@ -92,27 +91,21 @@ def gpu_memory_usage_smi(device=0):
return 0.0
def log_gpu_memory_usage(
log: logging.Logger | logging.LoggerAdapter,
msg: str = "",
device: int | torch.device = 0,
):
cur_device_type = str(get_device_type())
def log_gpu_memory_usage(log, msg, device):
cur_device = get_device_type()
if torch.backends.mps.is_available():
usage, cache, misc = mps_memory_usage_all()
elif "npu" in cur_device_type and is_torch_npu_available():
elif "npu" in str(cur_device) and is_torch_npu_available():
usage, cache, misc = npu_memory_usage_all(device)
elif "gpu" in cur_device_type and torch.cuda.is_available():
usage, cache, misc = gpu_memory_usage_all(device)
else:
return
usage, cache, misc = gpu_memory_usage_all(device)
extras = []
if cache > 0:
extras.append(f"+{cache:.03f}GB cache")
if misc > 0:
extras.append(f"+{misc:.03f}GB misc")
msg = f"{cur_device_type} memory usage:" if not msg else msg
log.info(
f"{msg} {usage:.03f}GB ({', '.join(extras)})",
f"{str(cur_device)} memory usage {msg}: {usage:.03f}GB ({', '.join(extras)})",
stacklevel=2,
)
return usage, cache, misc
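A usage sketch of the typed signature shown above, where msg and device now default; the bench module path is an assumption:

from axolotl.utils.bench import log_gpu_memory_usage  # assumed module path
from axolotl.utils.logging import get_logger

LOG = get_logger(__name__)

log_gpu_memory_usage(LOG)  # e.g. "gpu memory usage: 12.345GB (+1.000GB cache)"
log_gpu_memory_usage(LOG, msg="after model load", device=0)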

View File

@@ -841,35 +841,21 @@ class SaveAxolotlConfigtoWandBCallback(TrainerCallback):
class GCCallback(TrainerCallback):
"""Callback to garbage collect torch cache"""
def __init__(self, gc_steps: int | None = -1):
self.gc_steps: int = gc_steps or -1
self.next_gc_on_begin_step: int = -1
def _gc(self):
torch.cuda.empty_cache()
gc.collect()
def on_step_begin(
self, args, state, control, **kwargs # pylint: disable=unused-argument
):
if self.next_gc_on_begin_step == state.global_step:
self._gc()
def __init__(self, gc_steps=None):
self.gc_steps = gc_steps
def on_step_end(
self, args, state, control, **kwargs # pylint: disable=unused-argument
):
if control.should_evaluate:
# automatically GC before evals so the eval memory spike from the CEL doesn't OOM the trainer
self._gc()
# also GC on the start of the next step after the eval
self.next_gc_on_begin_step = state.global_step + 1
elif self.gc_steps > 0 and state.global_step % self.gc_steps == 0:
self._gc()
if self.gc_steps > 0 and state.global_step % self.gc_steps == 0:
torch.cuda.empty_cache()
gc.collect()
def on_epoch_end(
self, args, state, control, **kwargs # pylint: disable=unused-argument
):
self._gc()
torch.cuda.empty_cache()
gc.collect()
def colab_inference_post_train_callback(trainer: Trainer):
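A hedged usage sketch matching the variant with eval-triggered collection; trainer is assumed to be an already-constructed transformers Trainer, and the gc_steps value is illustrative:

# gc every 50 steps; with gc_steps <= 0 only the automatic collections
# around evals and at epoch end run
trainer.add_callback(GCCallback(gc_steps=50))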

View File

@@ -19,27 +19,9 @@ class PytorchProfilerCallback(TrainerCallback):
PyTorch Profiler callback to create snapshots of GPU memory usage at specified steps.
"""
def __init__(self, steps_to_profile: int = 5, profiler_steps_start: int = 0):
# steps are 0 indexed, so to start at 0-th step, we start at beginning of first step,
# and finish at end of last step, so 5 steps_to_profile is steps [0, 1, 2, 3, 4]
self.profiler_steps_end = profiler_steps_start + steps_to_profile - 1
if profiler_steps_start == 0:
# start recording memory allocations before everything is allocated, because if we start
# at the beginning of step 0, we won't have any memory allocations in the traces
torch.cuda.memory._record_memory_history( # pylint: disable=protected-access
enabled="all"
)
profiler_steps_start = -1
self.profiler_steps_start = profiler_steps_start
def on_step_begin( # pylint: disable=unused-argument
self,
args: TrainingArguments, # pylint: disable=unused-argument
state: TrainerState,
control: TrainerControl, # pylint: disable=unused-argument
**kwargs, # pylint: disable=unused-argument
):
if state.global_step == self.profiler_steps_start:
def __init__(self, steps_to_profile: int = 5):
self.steps_to_profile = steps_to_profile
if self.steps_to_profile:
torch.cuda.memory._record_memory_history( # pylint: disable=protected-access
enabled="all"
)
@@ -51,28 +33,7 @@ class PytorchProfilerCallback(TrainerCallback):
control: TrainerControl, # pylint: disable=unused-argument
**kwargs, # pylint: disable=unused-argument
):
if state.global_step == self.profiler_steps_end:
snapshot = torch.cuda.memory._snapshot() # pylint: disable=protected-access
with open(Path(args.output_dir) / "snapshot.pickle", "wb") as fout:
dump(snapshot, fout)
# tell CUDA to stop recording memory allocations now
torch.cuda.memory._record_memory_history( # pylint: disable=protected-access
enabled=None
)
def on_train_end( # pylint: disable=unused-argument
self,
args: TrainingArguments, # pylint: disable=unused-argument
state: TrainerState,
control: TrainerControl, # pylint: disable=unused-argument
**kwargs, # pylint: disable=unused-argument
):
# make sure to dump a snapshot if training ends before we reach the end of the profiling window
if (
state.global_step >= self.profiler_steps_start
and state.global_step < self.profiler_steps_end
):
if state.global_step == self.steps_to_profile:
snapshot = torch.cuda.memory._snapshot() # pylint: disable=protected-access
with open(Path(args.output_dir) / "snapshot.pickle", "wb") as fout:
dump(snapshot, fout)
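The private torch APIs used here can be exercised standalone; a minimal sketch, assuming a CUDA device is present (snapshots are viewable at https://pytorch.org/memory_viz):

from pathlib import Path
from pickle import dump

import torch

torch.cuda.memory._record_memory_history(enabled="all")  # start recording
x = torch.randn(1024, 1024, device="cuda", requires_grad=True)
(x @ x).sum().backward()  # some allocations to capture
snapshot = torch.cuda.memory._snapshot()
with open(Path(".") / "snapshot.pickle", "wb") as fout:
    dump(snapshot, fout)
torch.cuda.memory._record_memory_history(enabled=None)  # stop recording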

View File

@@ -116,10 +116,9 @@ def normalize_config(cfg):
]
choose_device(cfg)
cfg.ddp = cfg.ddp if cfg.ddp is not None else cfg.world_size != 1
if cfg.world_size != 1:
if cfg.ddp:
cfg.device_map = {"": int(os.environ.get("LOCAL_RANK", 0))}
if cfg.fsdp or cfg.fsdp_config or cfg.ddp:
cfg.batch_size = cfg.batch_size * cfg.world_size
cfg.batch_size = cfg.batch_size * cfg.world_size
if not cfg.use_ray:
# delay resolving dtype until on worker node when launching with ray
@@ -275,7 +274,7 @@ def validate_config(
# Convert datasets to proper format if needed
if cfg.get("datasets"):
for idx, ds_cfg in enumerate(cfg["datasets"]):
if cfg.get("rl") in ["dpo", "simpo"] and not isinstance(ds_cfg, DPODataset):
if cfg.get("rl") == "dpo" and not isinstance(ds_cfg, DPODataset):
cfg["datasets"][idx] = DPODataset(**ds_cfg)
elif cfg.get("rl") == "kto" and not isinstance(ds_cfg, KTODataset):
cfg["datasets"][idx] = KTODataset(**dict(ds_cfg))

View File

@@ -497,131 +497,3 @@ class HFMistralTokenizer:
return [
self._mistral.instruct_tokenizer.tokenizer.id_to_piece(id) for id in ids
]
def __call__(
self,
text: str | list[str],
add_special_tokens: bool = True,
padding: bool | str = False,
truncation: bool = False,
max_length: int | None = None,
return_tensors: str | None = None,
**kwargs,
) -> dict[str, list[int] | np.ndarray | Tensor]:
"""
Tokenize text and return a dictionary with input_ids and attention_mask.
Args:
text: Input text string or list of strings to tokenize.
add_special_tokens: Whether to add special tokens (BOS/EOS).
padding: Whether to pad sequences. Can be True, False, "longest", or "max_length".
truncation: Whether to truncate sequences to max_length.
max_length: Maximum sequence length for truncation/padding.
return_tensors: Return format ("pt" for PyTorch, "np" for NumPy, None for lists).
Returns:
Dictionary with "input_ids" and "attention_mask" keys.
"""
# if kwargs passed, raise error
if kwargs:
raise ValueError(
f"Unsupported kwargs: {kwargs}. Please create an issue on GitHub."
)
# `np` can work with inhomogeneous shapes but let's not support it until needed.
if (
isinstance(text, list)
and len(text) > 1
and return_tensors in ("pt", "np")
and padding is False
and truncation is False
):
raise ValueError(
"return_tensors='pt' or 'np' requires padding or truncation."
)
# Handle single string input
if isinstance(text, str):
text = [text]
# Encode all texts
# TODO: figure out how to parallelize this
batch_input_ids = []
for single_text in text:
input_ids = self.encode(single_text, add_special_tokens=add_special_tokens)
# Handle truncation
if truncation and max_length is not None and len(input_ids) > max_length:
input_ids = input_ids[:max_length]
batch_input_ids.append(input_ids)
# Create attention masks (1 for real tokens, 0 for padding)
attention_masks = [[1] * len(input_ids) for input_ids in batch_input_ids]
# Handle padding
if padding in (True, "longest"):
# Pad to longest sequence in batch
max_len = max(len(input_ids) for input_ids in batch_input_ids)
for i, input_ids in enumerate(batch_input_ids):
pad_length = max_len - len(input_ids)
if pad_length > 0:
if self.padding_side == "right":
batch_input_ids[i] = (
input_ids + [self.pad_token_id] * pad_length
)
attention_masks[i] = attention_masks[i] + [0] * pad_length
else: # left padding
batch_input_ids[i] = [
self.pad_token_id
] * pad_length + input_ids
attention_masks[i] = [0] * pad_length + attention_masks[i]
elif padding == "max_length":
if max_length is None:
raise ValueError(
"max_length must be specified when padding='max_length'"
)
for i, input_ids in enumerate(batch_input_ids):
pad_length = max_length - len(input_ids)
if pad_length > 0:
if self.padding_side == "right":
batch_input_ids[i] = (
input_ids + [self.pad_token_id] * pad_length
)
attention_masks[i] = attention_masks[i] + [0] * pad_length
else: # left padding
batch_input_ids[i] = [
self.pad_token_id
] * pad_length + input_ids
attention_masks[i] = [0] * pad_length + attention_masks[i]
# Prepare result
result = {}
# Handle return tensor format
if return_tensors == "pt":
import torch
result["input_ids"] = torch.tensor(batch_input_ids, dtype=torch.long)
result["attention_mask"] = torch.tensor(attention_masks, dtype=torch.long)
elif return_tensors == "np":
result["input_ids"] = np.array(batch_input_ids, dtype=np.int64)
result["attention_mask"] = np.array(attention_masks, dtype=np.int64)
elif return_tensors is None:
result["input_ids"] = batch_input_ids
result["attention_mask"] = attention_masks
else:
raise ValueError(
f"Unsupported return_tensors='{return_tensors}'. "
"Only 'pt' and 'np' are supported."
)
# If single input, return single sequences (not batched)
if len(text) == 1 and return_tensors is None:
result["input_ids"] = result["input_ids"][0]
result["attention_mask"] = result["attention_mask"][0]
return result
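A usage sketch of this __call__ method, assuming tokenizer is an already-constructed HFMistralTokenizer:

# single string in, plain python lists out
out = tokenizer("Hello world")
input_ids, attention_mask = out["input_ids"], out["attention_mask"]

# a batch must be padded (or truncated) before tensors can be returned
batch = tokenizer(
    ["Hello world", "Hi"],
    padding="longest",
    return_tensors="pt",
)
batch["input_ids"].shape  # (2, longest_seq_len), padded per padding_side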

Some files were not shown because too many files have changed in this diff.