Compare commits


13 Commits

Author SHA1 Message Date
Dan Saunders 64f349b7bb diffusion alt: custom loss impl 2025-08-18 20:50:34 +00:00
Dan Saunders 260ebe4c93 diffusion alt: custom loss impl 2025-08-18 20:50:20 +00:00
Dan Saunders 63d2280999 nits 2025-08-18 19:17:24 +00:00
Dan Saunders b210db2d15 fixes 2025-08-18 19:09:09 +00:00
Dan Saunders 556a69118f sample generation, tests fixes 2025-08-18 18:25:04 +00:00
Dan Saunders 8569675b26 Merge branch 'main' into diffusion 2025-08-18 10:07:55 -04:00
Dan Saunders 077b5a4358 cleanup; tests draft 2025-08-16 02:44:44 +00:00
Dan Saunders 234b7b3126 nits 2025-08-16 00:14:44 +00:00
Dan Saunders e19be0c2d9 add back in reinit_weights (clobbered?); masking / pretrain fixes 2025-08-15 02:21:25 +00:00
Dan Saunders 479a454ae3 fixes + improvements 2025-08-14 16:11:37 -04:00
Dan Saunders 0a9341acde nits 2025-08-14 01:53:24 -04:00
Dan Saunders d8b63804bc cleanup 2025-08-14 01:51:13 -04:00
Dan Saunders 3156c605d4 diffusion training plugin 2025-08-14 01:48:22 -04:00
47 changed files with 1925 additions and 995 deletions

View File

@@ -12,6 +12,5 @@ reviews:
auto_review:
enabled: true
drafts: false
auto_incremental_review: true
chat:
auto_reply: true

View File

@@ -41,12 +41,6 @@ model, and final model output, you may need at least 3TB of free disk space to k
axolotl train examples/gpt-oss/gpt-oss-120b-fft-fsdp2-offload.yaml
```
To simplify fine-tuning across 2 nodes × 8x H100 (80GB) GPUs, we've partnered with [Baseten](https://baseten.co) to showcase multi-node
training of the 120B model using Baseten Truss. You can read more about this recipe on
[Baseten's blog](https://www.baseten.co/blog/how-to-fine-tune-gpt-oss-120b-with-baseten-and-axolotl/). The recipe can
be found on their
[GitHub](https://github.com/basetenlabs/ml-cookbook/tree/main/examples/oss-gpt-120b-axolotl/training).
ERRATA: Transformers saves the model architecture prefixed with `FSDP`, which needs to be manually renamed in `config.json`.
See https://github.com/huggingface/transformers/pull/40207 for the status of this issue.
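Until that fix lands, the rename can be scripted; below is a minimal sketch (hypothetical helper, assuming the saved `config.json` lists the architecture as e.g. `FSDPGptOssForCausalLM`):

```python
import json
from pathlib import Path


def strip_fsdp_prefix(config_path):
    """Drop the erroneous "FSDP" prefix from each entry in the
    "architectures" list of a saved config.json."""
    path = Path(config_path)
    config = json.loads(path.read_text())
    config["architectures"] = [
        arch.removeprefix("FSDP") for arch in config.get("architectures", [])
    ]
    path.write_text(json.dumps(config, indent=2))
```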
@@ -67,23 +61,9 @@ mv ./outputs/gpt-oss-out/merged/* ./outputs/gpt-oss-out/
### Inferencing your fine-tuned model
#### vLLM
GPT-OSS support in vLLM does not exist in a stable release yet. See https://x.com/MaziyarPanahi/status/1955741905515323425
for more information about using a special vllm-openai docker image for inferencing with vLLM.
Optionally, vLLM can be installed from nightly:
```bash
pip install --no-build-isolation --pre -U vllm --extra-index-url https://wheels.vllm.ai/nightly
```
and the vLLM server can be started with the following command (modify `--tensor-parallel-size 8` to match your environment):
```bash
vllm serve ./outputs/gpt-oss-out/ --served-model-name axolotl/gpt-oss-20b --host 0.0.0.0 --port 8888 --tensor-parallel-size 8
```
#### SGLang
SGLang has 0-day support in main; see https://github.com/sgl-project/sglang/issues/8833 for information on installing
SGLang from source. Once you've installed SGLang, run the following command to launch an SGLang server:

View File

@@ -44,7 +44,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -40,7 +40,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -15,7 +15,7 @@ datasets:
field_thinking: thinking
template_thinking_key: thinking
dataset_prepared_path: ./outputs/last_run_prepared
dataset_prepared_path: last_run_prepared
val_set_size: 0
output_dir: ./outputs/gpt-oss-out/
@@ -41,7 +41,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -15,7 +15,7 @@ datasets:
field_thinking: thinking
template_thinking_key: thinking
dataset_prepared_path: ./outputs/last_run_prepared
dataset_prepared_path: last_run_prepared
val_set_size: 0
output_dir: ./outputs/gpt-oss-out/
@@ -40,7 +40,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -53,7 +53,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -0,0 +1,57 @@
base_model: meta-llama/Llama-3.2-1B
# Automatically upload checkpoint and final model to HF
# hub_model_id: username/custom_model_name
pretraining_dataset:
- path: wikitext
name: wikitext-103-raw-v1
type: completion
field: text
plugins:
- diffusion.DiffusionPlugin
noise_schedule: cosine
min_mask_ratio: 0.15
max_mask_ratio: 0.85
eps: 5e-4
importance_weighting: true
mask_token_id: 128002
generate_samples: true
generation_interval: 10
output_dir: ./outputs/model-out
sequence_len: 512
sample_packing: true
gradient_accumulation_steps: 8
micro_batch_size: 4
max_steps: 10000
optimizer: adamw_8bit
lr_scheduler: cosine
learning_rate: 3e-4
bf16: auto
tf32: true
gradient_checkpointing: true
resume_from_checkpoint:
logging_steps: 1
sdp_attention: true
warmup_steps: 1000
save_strategy: steps
save_steps: 1000
special_tokens:
pad_token: "<|end_of_text|>"
wandb_project:
wandb_entity:
wandb_watch:
wandb_name:
wandb_log_model:
# save_first_step: true # uncomment this to validate checkpoint saving works with your config

View File

@@ -0,0 +1,58 @@
base_model: meta-llama/Llama-3.2-1B
# Automatically upload checkpoint and final model to HF
# hub_model_id: username/custom_model_name
datasets:
- path: teknium/GPT4-LLM-Cleaned
type: alpaca
val_set_size: 0.05
plugins:
- diffusion.DiffusionPlugin
noise_schedule: cosine
min_mask_ratio: 0.1
max_mask_ratio: 0.9
num_diffusion_steps: 128
eps: 1e-3
importance_weighting: true
mask_token_id: 128002
output_dir: ./outputs/model-out
sequence_len: 512
sample_packing: true
eval_sample_packing: true
gradient_accumulation_steps: 4
micro_batch_size: 4
num_epochs: 1
optimizer: adamw_8bit
lr_scheduler: cosine
learning_rate: 1e-5
bf16: auto
tf32: true
gradient_checkpointing: true
resume_from_checkpoint:
logging_steps: 1
sdp_attention: true
warmup_steps: 1000
save_strategy: steps
eval_strategy: steps
save_steps: 500
eval_steps: 500
special_tokens:
pad_token: "<|end_of_text|>"
wandb_project:
wandb_entity:
wandb_watch:
wandb_name:
wandb_log_model:
# save_first_step: true # uncomment this to validate checkpoint saving works with your config

View File

@@ -13,8 +13,8 @@ liger-kernel==0.6.1
packaging==23.2
huggingface_hub>=0.33.0
peft>=0.17.0
transformers==4.55.3
peft==0.17.0
transformers==4.55.2
tokenizers>=0.21.1
accelerate==1.10.0
datasets==4.0.0

View File

@@ -118,9 +118,9 @@ def get_package_version():
extras_require = {
"flash-attn": ["flash-attn==2.8.3"],
"flash-attn": ["flash-attn==2.8.2"],
"ring-flash-attn": [
"flash-attn==2.8.3",
"flash-attn==2.8.2",
"ring-flash-attn>=0.1.7",
"yunchang==0.6.0",
],

View File

@@ -14,13 +14,9 @@ class PreprocessCliArgs:
prompter: Optional[str] = field(default=None)
download: Optional[bool] = field(default=True)
iterable: Optional[bool] = field(
default=False,
default=None,
metadata={
"help": (
"[DEPRECATED] No longer supported. For streaming datasets, use "
"'axolotl train' and set 'streaming: true' in your YAML config, or "
"pass --streaming instead in the CLI."
)
"help": "Use IterableDataset for streaming processing of large datasets"
},
)

View File

@@ -82,7 +82,7 @@ class ModalCloud(Cloud):
return res
def get_image(self):
docker_tag = "main-py3.11-cu126-2.7.1"
docker_tag = "main-py3.11-cu124-2.6.0"
if self.config.docker_tag:
docker_tag = self.config.docker_tag
docker_image = f"axolotlai/axolotl:{docker_tag}"
@@ -200,7 +200,7 @@ class ModalCloud(Cloud):
if family in ["a10", "a10g"]:
return modal.gpu.A10G(count=count)
if family == "h100":
return f"H100:{count}"
return modal.gpu.H100(count=count)
if family == "t4":
return modal.gpu.T4(count=count)
if family == "l4":

View File

@@ -64,7 +64,7 @@ def do_inference(
importlib.import_module("axolotl.prompters"), prompter
)
elif cfg.chat_template:
chat_template_str = get_chat_template(cfg.chat_template, tokenizer=tokenizer)
chat_template_str = get_chat_template(cfg.chat_template)
elif cfg.datasets[0].type == "chat_template":
chat_template_str = get_chat_template_from_config(
cfg=cfg, ds_cfg=cfg.datasets[0], tokenizer=tokenizer

View File

@@ -35,20 +35,10 @@ def do_preprocess(cfg: DictDefault, cli_args: PreprocessCliArgs) -> None:
check_accelerate_default_config()
check_user_token()
if cli_args.iterable:
LOG.error(
"The --iterable CLI argument for 'axolotl preprocess' is no longer "
"supported. For training, set 'streaming: true' in your YAML config or "
"pass '--streaming' in your 'axolotl train' command for on-the-fly "
"preprocessing."
)
return
for key in ["skip_prepare_dataset", "pretraining_dataset"]:
if cfg.get(key):
LOG.error(
f"You have set `{key}:`. `preprocess` is not needed. Run the 'axolotl "
"train' CLI directly instead."
f"You have set `{key}:`. `preprocess` is not needed. Run the `axolotl train` CLI directly instead."
)
return
@@ -107,8 +97,7 @@ def do_cli(
"""
# pylint: disable=duplicate-code
os.environ["AXOLOTL_IS_PREPROCESS"] = "1"
is_preprocess = kwargs.pop("is_preprocess", True)
parsed_cfg = load_cfg(config, is_preprocess=is_preprocess, **kwargs)
parsed_cfg = load_cfg(config, **kwargs)
parsed_cfg.is_preprocess = True
parser = transformers.HfArgumentParser(PreprocessCliArgs)
parsed_cli_args, _ = parser.parse_args_into_dataclasses(

View File

@@ -3,12 +3,11 @@
import random
from copy import deepcopy
from itertools import product
from typing import Any
def generate_sweep_configs(
base_config: dict[str, list], sweeps_config: dict[str, list]
) -> list[dict[str, Any]]:
) -> list[dict[str, list]]:
"""
Recursively generates all possible configurations by applying sweeps to the base config.

View File

@@ -4,7 +4,6 @@ import os
import subprocess # nosec
import sys
import tempfile
from pathlib import Path
from typing import Any, Iterator, Literal
import yaml
@@ -89,12 +88,7 @@ def generate_config_files(config: str, sweep: str | None) -> Iterator[tuple[str,
# Generate all possible configurations
permutations = generate_sweep_configs(base_config, sweep_config)
is_group = len(permutations) > 1
base_output_dir = base_config.get("output_dir", "./model-out")
for idx, permutation in enumerate(permutations, start=1):
permutation_dir = Path(permutation.get("output_dir", base_output_dir))
permutation_id = f"sweep{idx:04d}"
permutation["output_dir"] = str(permutation_dir / permutation_id)
for permutation in permutations:
# pylint: disable=consider-using-with
temp_file = tempfile.NamedTemporaryFile(
mode="w",

View File

@@ -55,11 +55,13 @@ def load_datasets(
"""
tokenizer = load_tokenizer(cfg)
processor = load_processor(cfg, tokenizer=tokenizer) if cfg.processor_type else None
preprocess_iterable = getattr(cli_args, "iterable", False)
train_dataset, eval_dataset, total_num_steps, prompters = prepare_datasets(
cfg,
tokenizer,
processor=processor,
preprocess_iterable=preprocess_iterable,
)
if (

View File

@@ -10,6 +10,7 @@ import transformers
from transformers import (
DataCollatorWithFlattening,
EarlyStoppingCallback,
Trainer,
)
from trl.trainer.utils import RewardDataCollatorWithPadding
@@ -385,10 +386,11 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
**data_collator_kwargs,
)
sig = inspect.signature(trainer_cls)
if "processing_class" in sig.parameters:
if "processing_class" in sig.parameters or issubclass(trainer_cls, Trainer):
trainer_kwargs["processing_class"] = self.tokenizer
elif "tokenizer" in sig.parameters:
trainer_kwargs["tokenizer"] = self.tokenizer
if (
trainer_cls not in [AxolotlRewardTrainer, AxolotlPRMTrainer]
and self.cfg.datasets is not None

View File

@@ -82,7 +82,9 @@ class AxolotlTrainer(
super().__init__(*_args, **kwargs)
self.train_data_collator = self.data_collator
self._stored_metrics = defaultdict(lambda: defaultdict(list))
self._stored_metrics = defaultdict(
lambda: defaultdict(lambda: {"values": [], "reduction": "mean"})
)
if self.args.orpo_alpha:
self.loss_fct = torch.nn.CrossEntropyLoss(reduction="none")
@@ -573,9 +575,26 @@ class AxolotlTrainer(
"""
# logs either has 'loss' or 'eval_loss'
train_eval = "train" if "loss" in logs else "eval"
# Add averaged stored metrics to logs
for key, metrics in self._stored_metrics[train_eval].items():
logs[key] = torch.tensor(metrics).mean().item()
# Add reduced stored metrics to logs
for key, metric_data in self._stored_metrics[train_eval].items():
values = torch.tensor(metric_data["values"])
reduction_type = metric_data["reduction"]
if reduction_type == "mean":
logs[key] = values.mean().item()
elif reduction_type == "min":
logs[key] = values.min().item()
elif reduction_type == "max":
logs[key] = values.max().item()
elif reduction_type == "sum":
logs[key] = values.sum().item()
else:
raise NotImplementedError(
"Metric reduction must be one of [mean, min, max, sum]"
)
logs[key] = round(logs[key], 4)
if is_main_process():
# Add memory usage
@@ -592,10 +611,27 @@ class AxolotlTrainer(
return super().log(logs, start_time)
def store_metrics(
self, metrics: dict[str, float], train_eval: Literal["train", "eval"] = "train"
self,
metrics: dict[str, float] | dict[str, tuple[int | float, str]],
train_eval: Literal["train", "eval"] = "train",
reduction: Literal["mean", "min", "max", "sum"] = "mean",
) -> None:
"""
Store metrics with specified reduction type.
Args:
metrics: Dictionary of metric names to values, or metric names to (value,
reduction_type) tuples.
train_eval: Whether this is for training or evaluation.
"""
for key, value in metrics.items():
self._stored_metrics[train_eval][key].append(value)
if isinstance(value, tuple):
metric_value, metric_reduction = value
else:
metric_value, metric_reduction = value, reduction
self._stored_metrics[train_eval][key]["values"].append(metric_value)
self._stored_metrics[train_eval][key]["reduction"] = metric_reduction
def _save_checkpoint(self, model, trial, **kwargs):
# make sure the checkpoint dir exists, since trainer is flakey

View File

@@ -1,19 +1,18 @@
"""
Module containing dataset functionality.
We want this to be a wrapper for an existing dataset that we have loaded. Let's use the
concept of middlewares to wrap each dataset. We'll use the collators later on to pad the
datasets.
"""
from typing import Any
"""Module containing Dataset functionality"""
import torch
from datasets import Dataset, IterableDataset
from axolotl.utils.logging import get_logger
from .prompt_tokenizers import PromptTokenizingStrategy
# We want this to be a wrapper for an existing dataset that we have loaded
# lets use the concept of middlewares to wrap each dataset, for example
# ConstantLengthDataset(ShuffledDataset([TokenizedPromptDataset(alpaca_dataset)]))
# let's check to ensure we don't truncate an item in the middle, we'll use
# the collators later on to pad the datasets
LOG = get_logger(__name__)
@@ -43,13 +42,10 @@ class TokenizedPromptDataset(Dataset):
**kwargs,
)
def process(self, dataset: Dataset | IterableDataset) -> Dataset | IterableDataset:
"""Apply filtering and tokenization."""
features = None
if not isinstance(dataset, IterableDataset):
features = dataset.features.keys()
def process(self, dataset):
features = dataset.features.keys()
map_kwargs: dict[str, Any] = {}
map_kwargs = {}
if self.prompt_tokenizer.supports_batched:
map_kwargs["batched"] = True
map_kwargs["batch_size"] = 1_000
@@ -58,28 +54,18 @@ class TokenizedPromptDataset(Dataset):
hasattr(self.prompt_tokenizer, "filter_rows")
and self.prompt_tokenizer.filter_rows
):
filter_kwargs: dict[str, Any] = {"desc": "Strategy Filtering Rows"}
if not isinstance(dataset, IterableDataset):
filter_kwargs["num_proc"] = self.process_count
dataset = dataset.filter(
self.prompt_tokenizer.filter_rows,
**filter_kwargs,
num_proc=self.process_count,
desc="Strategy Filtering Rows",
)
map_kwargs = {
**map_kwargs,
"desc": "Tokenizing Prompts",
}
# Only add remove_columns for regular datasets
if not isinstance(dataset, IterableDataset):
map_kwargs["remove_columns"] = features
map_kwargs["num_proc"] = self.process_count
map_kwargs["keep_in_memory"] = self.keep_in_memory
return dataset.map(
self.prompt_tokenizer.tokenize_prompt,
num_proc=self.process_count,
remove_columns=features,
keep_in_memory=self.keep_in_memory,
desc="Tokenizing Prompts",
**map_kwargs,
)
@@ -93,16 +79,140 @@ def wrap_dataset_for_tokenized_prompt(
map_kwargs = {}
if prompt_tokenizer.supports_batched:
map_kwargs["batched"] = True
# Map the dataset and remove original columns
# For IterableDataset, features might be None until first iteration
remove_columns = None
if dataset.features is not None:
remove_columns = list(dataset.features.keys())
features = list(dataset.features.keys())
return dataset.map(
prompt_tokenizer.tokenize_prompt,
remove_columns=remove_columns,
remove_columns=features,
**map_kwargs,
)
return TokenizedPromptDataset(prompt_tokenizer, dataset, **kwargs)
# TODO this isn't the best since it can't interleave datasets
class ConstantLengthDataset(IterableDataset):
"""Iterable dataset that returns constant length chunks of tokens from stream of
text files.
Args:
tokenizer: The processor used for processing the data.
dataset: Dataset with text files.
seq_length: Length of token sequences to return.
"""
def __init__( # pylint: disable=super-init-not-called
self,
tokenizer,
datasets,
seq_length=2048,
):
self.tokenizer = tokenizer
self.concat_token_id = tokenizer.eos_token_id
self.datasets: list[IterableDataset] = datasets
self.seq_length = seq_length
vocab_size = len(tokenizer.get_vocab())
if vocab_size <= torch.iinfo(torch.int16).max:
self.tokens_dtype = torch.int16
elif vocab_size <= torch.iinfo(torch.int32).max:
self.tokens_dtype = torch.int32
else:
self.tokens_dtype = torch.int64
def __iter__(self):
buffer = {
"input_ids": [],
"attention_mask": [],
"labels": [],
"position_ids": [],
}
buffer_len = 0
for dataset in self.datasets:
idx = 0
iterator = iter(dataset)
more_examples = True
while more_examples:
try:
example = next(iterator)
idx += 1
except StopIteration:
more_examples = False
example = None
add_concat_token = False
if example:
example_len = len(example["input_ids"])
add_concat_token = example["input_ids"][-1] != self.concat_token_id
else:
example_len = 0
if not example_len or (
buffer_len + int(add_concat_token) + example_len > self.seq_length
):
if buffer["input_ids"]:
input_ids = torch.cat(buffer["input_ids"], dim=-1)[
: self.seq_length
]
attention_mask = torch.cat(buffer["attention_mask"], dim=-1)[
: self.seq_length
]
position_ids = torch.cat(buffer["position_ids"], dim=-1)[
: self.seq_length
]
labels = torch.cat(buffer["labels"], dim=-1)[: self.seq_length]
if labels.size() == input_ids.size() and (
attention_mask.size() == input_ids.size()
):
yield {
"input_ids": input_ids,
"labels": labels,
"attention_mask": attention_mask,
"position_ids": position_ids,
}
else:
LOG.warning(
"Dropping batch due to tensor size mismatch "
f"input_ids: {input_ids.size()}, "
f"labels: {labels.size()}, "
f"attention_mask: {attention_mask.size()}"
)
buffer = {
"input_ids": [],
"attention_mask": [],
"labels": [],
"position_ids": [],
}
buffer_len = 0
idx = 1
if example:
# FIXME
# just going to drop data points that are too long
if len(example["input_ids"]) <= self.seq_length:
input_ids = example["input_ids"]
attention_mask = example["attention_mask"]
labels = example["labels"]
if add_concat_token:
input_ids.append(self.concat_token_id)
attention_mask.append(1)
labels.append(self.concat_token_id)
input_ids_with_concat = torch.tensor(
input_ids, dtype=self.tokens_dtype
)
attention_mask_with_concat = torch.tensor(
[idx * m for m in attention_mask], dtype=torch.int16
)
labels_with_concat = torch.tensor(
labels, dtype=self.tokens_dtype
)
position_ids = torch.arange(
len(input_ids), dtype=self.tokens_dtype
)
buffer["input_ids"].append(input_ids_with_concat)
buffer["attention_mask"].append(attention_mask_with_concat)
buffer["labels"].append(labels_with_concat)
buffer["position_ids"].append(position_ids)
buffer_len += len(input_ids)

View File

@@ -147,7 +147,7 @@ class BasePlugin:
"""
# pylint: disable=unused-argument
def get_trainer_cls(self, cfg: DictDefault) -> Trainer | None:
def get_trainer_cls(self, cfg: DictDefault) -> type[Trainer] | None:
"""Returns a custom class for the trainer.
Args:

View File

@@ -0,0 +1,125 @@
# Diffusion LM Training Plugin for Axolotl
This plugin enables diffusion language model training using the LLaDA (Large Language
Diffusion with mAsking) approach within the Axolotl framework.
## Overview
LLaDA is a diffusion-based approach to language model training that uses:
- **Random token masking** during training instead of next-token prediction
- **Bidirectional attention** to allow the model to see the full context
- **Importance weighting** based on masking probabilities for stable training
This approach can lead to more robust language models with better understanding of
bidirectional context.
## Installation
The plugin is included with Axolotl. To use it, simply add the plugin configuration to
your training config.
## Quickstart
### Basic Configuration
Add the following to your Axolotl configuration YAML:
```yaml
# Enable diffusion LM training plugin
plugins:
- axolotl.integrations.diffusion.DiffusionPlugin
# Diffusion-specific configuration
noise_schedule: linear # or "cosine"
min_mask_ratio: 0.1
max_mask_ratio: 0.9
num_diffusion_steps: 128
eps: 1e-3
importance_weighting: true
mask_token_id: 128002
# Sample generation (optional)
generate_samples: true
generation_interval: 100
num_generation_samples: 3
generation_steps: 128
generation_temperature: 0.0
generation_max_length: 100
# Model configuration
base_model: meta-llama/Llama-3.2-1B
model_type: llama
# Standard Axolotl configuration
datasets:
- path: your_dataset
...
# Other config
sequence_len: 1024
micro_batch_size: 8
gradient_accumulation_steps: 4
learning_rate: 3e-4
```
## Supported Models
Any models that support 4D attention masks should work out of the box. If not, please
create an [issue](https://github.com/axolotl-ai-cloud/axolotl/issues)!
## How It Works
### Random Masking
During training, tokens are randomly masked based on a sampled timestep:
- Sample timestep `t` uniformly from [0, 1]
- Calculate masking probability: `p = (1 - eps) * t + eps`
- Randomly mask tokens with probability `p`
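The steps above can be sketched as follows (a simplified illustration with hypothetical helper names, not the plugin's exact implementation; the `mask_token_id` default follows the config above):

```python
import torch


def forward_mask(input_ids, eps=1e-3, mask_token_id=128002):
    """LLaDA-style forward process: sample a timestep per sequence, then
    mask each token independently with probability p = (1 - eps) * t + eps."""
    batch_size, _ = input_ids.shape
    t = torch.rand(batch_size, 1)         # timestep t ~ U[0, 1], one per sequence
    p_mask = (1 - eps) * t + eps          # masking probability, floored at eps
    masked_indices = torch.rand(input_ids.shape) < p_mask
    noisy_ids = input_ids.masked_fill(masked_indices, mask_token_id)
    return noisy_ids, masked_indices, p_mask
```

Only positions where `masked_indices` is true contribute to the training loss.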
### Bidirectional Attention
The plugin uses native 4D attention masks to:
- Enable bidirectional attention without patches
- Allow all tokens to attend to all other tokens
- Maintain proper padding masks
- Work with modern `transformers` models out of the box
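For instance, a 2D padding mask can be expanded into such a 4D additive mask roughly like this (a sketch, not the plugin's exact code):

```python
import torch


def bidirectional_4d_mask(attention_mask, dtype=torch.float32):
    """Expand a (batch, seq) padding mask into a (batch, 1, seq, seq)
    additive mask: 0 where attention is allowed, a large negative value
    at padded key positions. No causal restriction is applied."""
    batch, seq = attention_mask.shape
    allowed = attention_mask[:, None, None, :].bool().expand(batch, 1, seq, seq)
    mask = torch.zeros(batch, 1, seq, seq, dtype=dtype)
    return mask.masked_fill(~allowed, torch.finfo(dtype).min)
```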
### Diffusion Loss
Loss is computed only on masked tokens with (optional) importance weighting:
```python
loss = sum(cross_entropy(pred, target) / p_mask) / total_tokens
```
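Concretely, that computation can be sketched as follows (simplified relative to the plugin's custom loss implementation):

```python
import torch
import torch.nn.functional as F


def diffusion_loss(logits, targets, masked_indices, p_mask):
    """Importance-weighted cross-entropy over masked positions only,
    normalized by the total token count (matching the formula above)."""
    # Per-token CE at masked positions: (num_masked,)
    ce = F.cross_entropy(
        logits[masked_indices], targets[masked_indices], reduction="none"
    )
    # Importance weight 1 / p_mask: lightly-masked tokens count more.
    weights = 1.0 / p_mask.expand_as(targets)[masked_indices]
    return (ce * weights).sum() / targets.numel()
```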
## Sample Generation
When `generate_samples: true`, the plugin generates samples during training:
```
Sample 1:
Original (45 tokens): The quick brown fox jumps over the lazy dog...
Masked (18/45 tokens, 40.0%): The [MASK] [MASK] fox [MASK] over [MASK] lazy [MASK]...
Generated: The quick brown fox jumps over the lazy dog...
```
Samples are logged to console and wandb (if enabled).
## Metrics and Monitoring
The plugin adds several metrics to track diffusion training:
- `train/loss`: Weighted diffusion loss
- `train/accuracy`: Accuracy on masked tokens
- `train/mask_ratio`: Average fraction of tokens masked
- `train/num_masked_tokens`: Number of tokens masked
- `train/avg_p_mask`: Average masking probability
- `train/ce_loss`: Unweighted cross-entropy loss
- `train/importance_weight_avg`: Average importance weight
## Limitations
- No flash attention support
## References
- [LLaDA Paper](https://arxiv.org/abs/2502.09992)
- [Axolotl Documentation](https://docs.axolotl.ai/)

View File

@@ -0,0 +1,6 @@
"""Diffusion LM training plugin init."""
from .args import DiffusionArgs
from .plugin import DiffusionPlugin
__all__ = ["DiffusionArgs", "DiffusionPlugin"]

View File

@@ -0,0 +1,70 @@
"""Config args for diffusion LM training."""
from typing import Literal
from pydantic import BaseModel, Field
class DiffusionArgs(BaseModel):
"""Arguments for diffusion LM training plugin."""
# Noise schedule config
noise_schedule: Literal["linear", "cosine"] = Field(
default="linear", description="Type of noise schedule for diffusion training"
)
min_mask_ratio: float = Field(
default=0.1,
ge=0.0,
le=1.0,
description="Minimum masking ratio for diffusion noise schedule",
)
max_mask_ratio: float = Field(
default=0.9,
ge=0.0,
le=1.0,
description="Maximum masking ratio for diffusion noise schedule",
)
num_diffusion_steps: int = Field(
default=128, ge=1, description="Number of diffusion timesteps"
)
eps: float = Field(
default=1e-3,
ge=0.0,
le=1.0,
description="Epsilon value for minimum masking probability in forward process",
)
# Training config
importance_weighting: bool = Field(
default=True,
description="Apply importance weighting to loss based on masking probability",
)
mask_token_id: int = Field(
default=128002,
description=(
"Token ID to use for masking. Default is 128002 "
"(<|reserved_special_token_0|> for Llama 3.2)"
),
)
# Sample generation config
generate_samples: bool = Field(
default=True, description="Enable sample generation during training"
)
generation_interval: int = Field(
default=100, ge=1, description="Generate samples every N steps"
)
num_generation_samples: int = Field(
default=3, ge=1, description="Number of samples to generate each time"
)
generation_steps: int = Field(
default=128, ge=1, description="Number of diffusion steps for generation"
)
generation_temperature: float = Field(
default=0.0,
ge=0.0,
description="Temperature for generation sampling (0.0 = deterministic)",
)
generation_max_length: int = Field(
default=100, ge=1, description="Maximum sequence length for generation"
)

View File

@@ -0,0 +1,113 @@
"""Callbacks for diffusion training."""
import wandb
from transformers.trainer_callback import TrainerCallback, TrainerControl, TrainerState
from transformers.training_args import TrainingArguments
from axolotl.utils.logging import get_logger
from .generation import generate_samples
LOG = get_logger(__name__)
class DiffusionGenerationCallback(TrainerCallback):
"""Callback for generating samples during diffusion training."""
def __init__(self, trainer):
self.trainer = trainer
# pylint: disable=unused-argument
def on_step_end(
self,
args: TrainingArguments,
state: TrainerState,
control: TrainerControl,
**kwargs,
):
"""Generate samples at specified intervals."""
if (
state.global_step > 0
and state.global_step % self.trainer.config.generation_interval == 0
):
# Use eval dataloader if available, otherwise use train dataloader
if (
hasattr(self.trainer, "eval_dataset")
and self.trainer.eval_dataset is not None
):
dataloader = self.trainer.callback_handler.eval_dataloader
else:
dataloader = self.trainer.callback_handler.train_dataloader
# Generate samples
samples = generate_samples(
model=self.trainer.model,
tokenizer=self.trainer.tokenizer,
dataloader=dataloader,
num_generation_samples=self.trainer.config.num_generation_samples,
max_length=self.trainer.config.generation_max_length,
num_diffusion_steps=self.trainer.config.generation_steps,
temperature=self.trainer.config.generation_temperature,
mask_token_id=self.trainer.config.mask_token_id,
)
# Log samples
self._log_samples(samples, state.global_step)
def _log_samples(self, samples: list, step: int):
"""Log generated samples."""
if not samples:
return
LOG.info("=" * 60)
LOG.info("GENERATED SAMPLES")
LOG.info("=" * 60)
for i, sample_data in enumerate(samples, 1):
original = sample_data["original"]
masked = sample_data["masked"]
generated = sample_data["generated"]
mask_ratio = sample_data["mask_ratio"]
masked_tokens = sample_data["masked_tokens"]
total_tokens = sample_data["total_tokens"]
LOG.info(f"\nSample {i}:")
LOG.info(f"\tOriginal ({total_tokens} tokens): {original}")
LOG.info(
f"\tMasked ({masked_tokens}/{total_tokens} tokens, "
f"{mask_ratio:.1%}): {masked}"
)
LOG.info(f"\tGenerated: {generated}")
LOG.info("=" * 60)
if self.trainer.config.use_wandb and self.trainer.state.is_world_process_zero:
if wandb.run is not None:
wandb.log(
{
"generated_samples": wandb.Table(
columns=[
"step",
"original",
"masked",
"generated",
"mask_ratio",
"masked_tokens",
"total_tokens",
],
data=[
[
step,
sample["original"],
sample["masked"],
sample["generated"],
f"{sample['mask_ratio']:.1%}",
sample["masked_tokens"],
sample["total_tokens"],
]
for sample in samples
],
)
},
step=step,
)

View File

@@ -0,0 +1,269 @@
"""Sample generation utilities for diffusion training."""
import logging
from typing import Any, List, Optional
import torch
logger = logging.getLogger(__name__)
def generate_samples(
model: torch.nn.Module,
tokenizer: Any,
dataloader: Optional[Any] = None,
num_generation_samples: int = 3,
max_length: int = 100,
num_diffusion_steps: int = 128,
temperature: float = 0.0,
mask_token_id: int = 32000,
) -> List[dict]:
"""
Generate text samples using the diffusion model by randomly masking sequences from
the given dataset and running the reverse diffusion process.
Args:
model: The wrapped or unwrapped model
tokenizer: Tokenizer for encoding/decoding
dataloader: Validation dataloader (for sampling sequences)
num_generation_samples: Number of samples to generate
max_length: Maximum length of sequences to use
num_diffusion_steps: Number of diffusion steps for generation
temperature: Temperature for sampling (0.0 = deterministic)
mask_token_id: Token ID used for masking
Returns:
List of dictionaries with original text, masked text, and generated text
"""
if dataloader is None:
logger.warning("No validation dataloader provided, cannot generate samples")
return []
# Get the actual model (unwrap if needed)
unwrapped_model = model.module if hasattr(model, "module") else model
unwrapped_model.eval()
generations = []
# Sample sequences from validation dataset
sampled_sequences = _sample_sequences_from_dataloader(
dataloader, num_generation_samples, max_length, unwrapped_model.device
)
logger.info(f"Sampled {len(sampled_sequences)} sequences from validation dataset")
# Generate samples using reverse diffusion process
with torch.no_grad():
for original_sequence in sampled_sequences:
generation_result = _generate(
unwrapped_model,
tokenizer,
original_sequence,
num_diffusion_steps,
temperature,
mask_token_id,
)
generations.append(generation_result)
unwrapped_model.train()
return generations
def _sample_sequences_from_dataloader(
dataloader: Any, num_samples: int, max_length: int, device: torch.device
) -> List[torch.Tensor]:
"""Sample sequences from validation dataloader."""
sampled_sequences = []
sample_count = 0
# Add randomness by skipping a random number of batches
skip_batches = torch.randint(0, 6, (1,)).item()
batch_count = 0
for batch in dataloader:
# Skip some batches for variety
if batch_count < skip_batches:
batch_count += 1
continue
if sample_count >= num_samples:
break
batch_count += 1
input_ids = batch["input_ids"]
attention_mask = batch.get("attention_mask")
# Randomly sample from sequences in this batch
batch_indices = torch.randperm(input_ids.size(0)).tolist()
for i in batch_indices:
if sample_count >= num_samples:
break
# Get actual sequence length (non-padded)
if attention_mask is not None:
seq_len = attention_mask[i].sum().item()
else:
seq_len = input_ids.size(1)
# Limit sequence length to max_length
actual_length = min(seq_len, max_length)
if actual_length < 10: # Skip very short sequences
continue
# Extract the sequence
sequence = input_ids[i][:actual_length].unsqueeze(0).to(device)
sampled_sequences.append(sequence)
sample_count += 1
return sampled_sequences
def _generate(
model: torch.nn.Module,
tokenizer: Any,
original_sequence: torch.Tensor,
num_diffusion_steps: int,
temperature: float,
mask_token_id: int,
) -> dict:
"""Generate a single sample using reverse diffusion."""
# Get original text for comparison
original_text = tokenizer.decode(
original_sequence[0].cpu(), skip_special_tokens=True
)
# Apply custom masking with random ratio (10% to 70%)
total_tokens = original_sequence.size(1)
min_ratio, max_ratio = 0.1, 0.7
target_mask_ratio = torch.rand(1).item() * (max_ratio - min_ratio) + min_ratio
target_masked_tokens = int(total_tokens * target_mask_ratio)
# Create random mask indices
mask_positions = torch.randperm(total_tokens)[:target_masked_tokens]
masked_indices = torch.zeros(
1, total_tokens, dtype=torch.bool, device=original_sequence.device
)
masked_indices[0, mask_positions] = True
# Create masked sequence
masked_sequence = original_sequence.clone()
masked_sequence[masked_indices] = mask_token_id
# Calculate actual mask ratio
masked_tokens = masked_indices.sum().item()
mask_ratio = masked_tokens / total_tokens
# Get masked text for comparison
masked_text = tokenizer.decode(masked_sequence[0].cpu(), skip_special_tokens=False)
# Clean up mask token representation
masked_text = _clean_masked_text(masked_text, tokenizer, mask_token_id)
# Run reverse diffusion process
sequence = masked_sequence.clone()
for step in range(num_diffusion_steps):
sequence = _diffusion_step(
model, sequence, step, num_diffusion_steps, temperature, mask_token_id
)
# Get final generated text
generated_text = tokenizer.decode(sequence[0].cpu(), skip_special_tokens=True)
return {
"original": original_text,
"masked": masked_text,
"generated": generated_text,
"mask_ratio": mask_ratio,
"masked_tokens": masked_tokens,
"total_tokens": total_tokens,
"formatted": (
f"Original: '{original_text}' → Masked: '{masked_text}' "
f"({mask_ratio:.1%}) → Generated: '{generated_text}'"
),
}
def _clean_masked_text(masked_text: str, tokenizer: Any, mask_token_id: int) -> str:
"""Clean up masked text for display."""
mask_token_repr = tokenizer.decode([mask_token_id], skip_special_tokens=False)
cleaned = masked_text.replace(mask_token_repr, "[MASK]")
if hasattr(tokenizer, "special_tokens_map"):
for token_value in tokenizer.special_tokens_map.values():
if token_value and isinstance(token_value, str):
cleaned = cleaned.replace(token_value, "")
cleaned = " ".join(cleaned.split()).strip()
return cleaned
def _diffusion_step(
model: torch.nn.Module,
sequence: torch.Tensor,
step: int,
num_diffusion_steps: int,
temperature: float,
mask_token_id: int,
) -> torch.Tensor:
"""Perform a single diffusion step with remasking."""
# Only process if there are masked tokens remaining
current_mask = sequence == mask_token_id
if not current_mask.any():
return sequence
# Create bidirectional attention mask for diffusion
batch_size, seq_len = sequence.shape
attention_mask = torch.ones(
batch_size, 1, seq_len, seq_len, dtype=torch.bool, device=sequence.device
)
# Forward pass
outputs = model(input_ids=sequence, attention_mask=attention_mask)
logits = outputs.logits
# Only sample at currently masked positions
masked_logits = logits[current_mask]
# Apply temperature scaling
if temperature > 0:
scaled_logits = masked_logits / temperature
else:
scaled_logits = masked_logits
# Suppress mask token in outputs
scaled_logits[:, mask_token_id] = -float("inf")
# Sample predictions
if temperature > 0:
# Add Gumbel noise for sampling
gumbel_noise = -torch.log(
-torch.log(torch.rand_like(scaled_logits, dtype=torch.float32))
)
gumbel_logits = scaled_logits + gumbel_noise
predicted_tokens = torch.argmax(gumbel_logits, dim=-1)
else:
# Deterministic sampling when temperature is 0
predicted_tokens = torch.argmax(scaled_logits, dim=-1)
# Calculate probabilities for confidence scoring
probs = torch.softmax(scaled_logits, dim=-1)
predicted_token_probs = probs[range(len(predicted_tokens)), predicted_tokens]
# Determine how many tokens to unmask this step
remaining_masked = current_mask.sum().item()
if step == num_diffusion_steps - 1:
num_to_unmask = remaining_masked
else:
unmask_ratio = 1.0 / (num_diffusion_steps - step)
num_to_unmask = max(1, int(remaining_masked * unmask_ratio))
# Select highest confidence predictions to unmask
if num_to_unmask >= remaining_masked:
sequence[current_mask] = predicted_tokens
else:
_, top_indices = predicted_token_probs.topk(num_to_unmask)
mask_positions = torch.where(current_mask)[1]
positions_to_unmask = mask_positions[top_indices]
sequence[0, positions_to_unmask] = predicted_tokens[top_indices]
return sequence
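The unmasking schedule in `_diffusion_step` can be checked in isolation. Below is a pure-Python sketch (the helper name `unmask_schedule` is hypothetical, not part of the plugin) showing that `unmask_ratio = 1 / (num_diffusion_steps - step)` reveals a roughly equal share of the remaining masked tokens each step, with the final step guaranteed to reveal whatever is left:

```python
# Sketch of the per-step unmasking schedule used in `_diffusion_step` above.
# With unmask_ratio = 1 / (num_diffusion_steps - step), a roughly equal number
# of the remaining masked tokens is revealed each step, and the final step
# always reveals the rest.
def unmask_schedule(total_masked: int, num_diffusion_steps: int) -> list[int]:
    remaining = total_masked
    revealed_per_step = []
    for step in range(num_diffusion_steps):
        if step == num_diffusion_steps - 1:
            num_to_unmask = remaining  # final step: reveal everything left
        else:
            unmask_ratio = 1.0 / (num_diffusion_steps - step)
            num_to_unmask = max(1, int(remaining * unmask_ratio))
        num_to_unmask = min(num_to_unmask, remaining)
        revealed_per_step.append(num_to_unmask)
        remaining -= num_to_unmask
    return revealed_per_step

print(unmask_schedule(10, 4))  # → [2, 2, 3, 3]; all 10 tokens revealed
```

The `min(..., remaining)` clamp mirrors the `num_to_unmask >= remaining_masked` branch in the real step function.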

View File

@@ -0,0 +1,115 @@
"""Diffusion LM loss function for integration with transformers LOSS_MAPPING."""
from typing import Optional
import torch
import torch.nn.functional as F
def ForDiffusionLMLoss(
logits: torch.Tensor,
labels: torch.Tensor,
vocab_size: int,
config: Optional[dict] = None,
inputs: Optional[dict] = None,
model: Optional[torch.nn.Module] = None,
**kwargs,
) -> torch.Tensor:
"""
Diffusion Language Modeling loss function.
This function computes cross-entropy loss only on masked tokens using
diffusion info stored by the model patch during forward pass.
Args:
logits: Model predictions [batch_size, seq_len, vocab_size]
labels: Ground truth tokens [batch_size, seq_len]
vocab_size: Size of vocabulary
config: Model configuration (contains diffusion parameters)
inputs: Input batch dictionary (contains input_ids, attention_mask)
model: The model instance (to access stored diffusion info)
**kwargs: Additional arguments
Returns:
loss: Computed diffusion loss
"""
# Get diffusion info stored by model patch
if model is None or not hasattr(model, "_diffusion_info"):
# Fallback to regular causal LM loss if no diffusion info
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
loss_fct = torch.nn.CrossEntropyLoss()
return loss_fct(
shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)
)
diffusion_info = model._diffusion_info
original_input_ids = diffusion_info["original_input_ids"]
masked_indices = diffusion_info["masked_indices"]
p_mask = diffusion_info["p_mask"]
# Get diffusion config parameters
diffusion_config = getattr(config, "diffusion_config", {})
importance_weighting = diffusion_config.get("importance_weighting", True)
# Check if we have any masked tokens
if not masked_indices.any():
return torch.tensor(0.0, device=logits.device, requires_grad=True)
# Get predictions and targets for masked positions only
masked_logits = logits[masked_indices]
masked_targets = original_input_ids[masked_indices] # Original unmasked tokens
# Compute cross-entropy loss without reduction
token_loss = F.cross_entropy(
masked_logits.float(), masked_targets, reduction="none"
)
if importance_weighting:
# Apply importance weighting: 1 / p_mask
masked_p_mask = p_mask.expand_as(masked_indices)[masked_indices]
weighted_loss = token_loss / masked_p_mask
if labels is not None:
# For SFT data: normalize by answer length per sample
answer_mask = labels != -100
answer_lengths = answer_mask.sum(dim=1).float()
# Group losses by batch sample
batch_indices = torch.arange(
original_input_ids.shape[0], device=original_input_ids.device
)
batch_indices = batch_indices.unsqueeze(1).expand_as(masked_indices)
masked_batch_indices = batch_indices[masked_indices]
# Sum losses per sample and normalize by answer length
loss_per_sample = torch.zeros(
original_input_ids.shape[0], device=original_input_ids.device
)
for i in range(original_input_ids.shape[0]):
sample_mask = masked_batch_indices == i
if sample_mask.any():
sample_loss = weighted_loss[sample_mask].sum()
loss_per_sample[i] = sample_loss / max(answer_lengths[i], 1)
loss = loss_per_sample.mean()
else:
# For completion data: simple average
loss = weighted_loss.mean()
else:
# No importance weighting
loss = token_loss.mean()
return loss
def register_diffusion_loss():
"""Register the diffusion loss function in transformers LOSS_MAPPING."""
try:
from transformers.loss.loss_utils import LOSS_MAPPING
LOSS_MAPPING["ForDiffusionLM"] = ForDiffusionLMLoss
return True
except ImportError:
# Fallback for older transformers versions
# Fallback for older transformers versions without LOSS_MAPPING
return False
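The importance weighting in `ForDiffusionLMLoss` divides each masked token's cross-entropy by its masking probability, so lightly-masked timesteps are not under-weighted in expectation. A minimal sketch (the helper `masked_ce` is hypothetical and operates on toy per-position probability distributions rather than logits):

```python
import math

# Importance-weighted masked cross-entropy, mirroring ForDiffusionLMLoss:
# only masked positions contribute, and each is scaled by 1 / p_mask.
def masked_ce(probs, targets, masked, p_mask):
    losses = [
        -math.log(dist[tgt]) / p_mask
        for dist, tgt, m in zip(probs, targets, masked)
        if m
    ]
    return sum(losses) / len(losses) if losses else 0.0

probs = [[0.7, 0.2, 0.1], [0.1, 0.8, 0.1]]  # toy per-position distributions
loss = masked_ce(probs, targets=[0, 1], masked=[True, True], p_mask=0.5)
# (-ln 0.7 - ln 0.8) / 0.5 / 2 ≈ 0.580
```

With `p_mask = 0.5`, each token's loss is doubled before averaging, compensating for the fact that only half the tokens are expected to contribute at that timestep.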

View File

@@ -0,0 +1,149 @@
"""Model patches for diffusion training."""
import torch
def patch_model_for_bidirectional_attention(model):
"""
Patch model to handle diffusion training with forward process and bidirectional
attention.
This monkey-patches the model's forward method to:
- Apply forward diffusion process (masking) during training
- Use bidirectional attention masks
- Store info for loss computation
"""
original_forward = model.forward
def diffusion_forward(
self,
input_ids: torch.Tensor | None = None,
attention_mask: torch.Tensor | None = None,
labels: torch.Tensor | None = None,
**kwargs,
):
# Check if this is diffusion training
if (
hasattr(self.config, "loss_type")
and self.config.loss_type == "ForDiffusionLM"
and self.training
):
# Store original input_ids for loss computation
original_input_ids = input_ids.clone()
# Apply forward diffusion process (masking)
diffusion_config = getattr(self.config, "diffusion_config", {})
noisy_input_ids, masked_indices, p_mask = _forward_process(
input_ids, attention_mask, labels, diffusion_config
)
# Use noisy input for model forward
input_ids = noisy_input_ids
# Convert attention mask to bidirectional
if attention_mask is not None:
attention_mask = _create_bidirectional_attention_mask(
input_ids, attention_mask
)
# Store diffusion info in the model for loss computation
self._diffusion_info = {
"original_input_ids": original_input_ids,
"masked_indices": masked_indices,
"p_mask": p_mask,
}
return original_forward(
input_ids=input_ids, attention_mask=attention_mask, labels=labels, **kwargs
)
# Replace the forward method
model.forward = diffusion_forward.__get__(model, model.__class__)
def _create_bidirectional_attention_mask(
input_ids: torch.Tensor, attention_mask: torch.Tensor
) -> torch.Tensor:
"""
Create bidirectional attention mask from 2D attention mask.
Args:
input_ids: Input token IDs [batch_size, seq_len]
attention_mask: 2D attention mask [batch_size, seq_len]
Returns:
bidirectional_mask: 4D attention mask [batch_size, 1, seq_len, seq_len]
"""
batch_size, seq_len = input_ids.shape
# Simple bidirectional mask - all tokens can attend to all valid tokens
# Expand 2D mask to 4D: [batch_size, seq_len] -> [batch_size, 1, seq_len, seq_len]
bidirectional_mask = attention_mask.unsqueeze(1).unsqueeze(2) # [B, 1, 1, S]
bidirectional_mask = bidirectional_mask.expand(batch_size, 1, seq_len, seq_len)
# Apply row-wise masking (padded tokens can't attend to anything)
row_mask = attention_mask.unsqueeze(1).unsqueeze(3) # [B, 1, S, 1]
bidirectional_mask = bidirectional_mask & row_mask
return bidirectional_mask
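The broadcast-and-AND logic above implements a simple pairwise rule: position `i` may attend to position `j` iff both are real (non-padding) tokens. A pure-Python sketch of that rule for a single sequence (hypothetical helper, no torch dependency):

```python
# Pure-Python sketch of the pairwise rule implemented above: position i may
# attend to position j iff both are real (non-padding) tokens. The torch
# version builds the same thing via broadcasting into a [B, 1, S, S] tensor.
def bidirectional_mask(attention_mask: list[int]) -> list[list[bool]]:
    n = len(attention_mask)
    return [
        [bool(attention_mask[i]) and bool(attention_mask[j]) for j in range(n)]
        for i in range(n)
    ]

mask = bidirectional_mask([1, 1, 0])  # last position is padding
# mask[0][1] is True; row 2 and column 2 are all False
```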
def _forward_process(
input_ids: torch.Tensor,
attention_mask: torch.Tensor | None = None,
labels: torch.Tensor | None = None,
diffusion_config: dict | None = None,
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
Apply forward diffusion process (random masking).
Args:
input_ids: Input token IDs [batch_size, seq_len]
attention_mask: Attention mask [batch_size, seq_len]
labels: Labels for SFT training [batch_size, seq_len]
diffusion_config: Diffusion configuration dict
Returns:
noisy_input_ids: Input with masked tokens
masked_indices: Boolean mask of which tokens were masked
p_mask: Masking probabilities used
"""
if diffusion_config is None:
diffusion_config = {}
batch_size, seq_len = input_ids.shape
device = input_ids.device
eps = diffusion_config.get("eps", 1e-3)
mask_token_id = diffusion_config.get("mask_token_id", 128002)
# Sample random timesteps for each sample
t = torch.rand(batch_size, device=device)
# Calculate masking probability with epsilon
p_mask = (1 - eps) * t + eps # [batch_size]
p_mask = p_mask.unsqueeze(1).expand(-1, seq_len) # [batch_size, seq_len]
# Don't mask padding tokens
if attention_mask is not None:
p_mask = p_mask * attention_mask.float()
# Create random mask based on p_mask
random_values = torch.rand_like(p_mask)
masked_indices = random_values < p_mask
# Apply attention mask constraints
if attention_mask is not None:
masked_indices = masked_indices & attention_mask.bool()
# For SFT data, only mask answer tokens (where labels != -100)
if labels is not None:
answer_mask = labels != -100
masked_indices = masked_indices & answer_mask
# Create noisy input by replacing masked tokens
noisy_input_ids = input_ids.clone()
noisy_input_ids[masked_indices] = mask_token_id
return noisy_input_ids, masked_indices, p_mask
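The forward process above samples a timestep `t` and masks each real token independently with probability `p = (1 - eps) * t + eps`; the `eps` floor keeps `p_mask` strictly positive so the `1 / p_mask` importance weights in the loss stay finite. A torch-free sketch for a single sequence (the helper `forward_mask` is hypothetical):

```python
import random

# Sketch of the forward diffusion process above: mask each real token
# independently with p = (1 - eps) * t + eps, never touching padding.
def forward_mask(tokens, attention_mask, t, eps=1e-3, mask_token_id=128002, rng=None):
    rng = rng or random.Random(0)  # seeded for reproducibility in this sketch
    p = (1 - eps) * t + eps
    noisy, masked = [], []
    for tok, is_real in zip(tokens, attention_mask):
        if is_real and rng.random() < p:
            noisy.append(mask_token_id)
            masked.append(True)
        else:
            noisy.append(tok)
            masked.append(False)
    return noisy, masked

tokens = [11, 12, 13, 14, 0, 0]
attn = [1, 1, 1, 1, 0, 0]  # last two positions are padding
noisy, masked = forward_mask(tokens, attn, t=0.9)
```

Padding positions are never masked, and as `t → 1` essentially every real token is replaced by the mask token, matching the tensorized version's behavior.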

View File

@@ -0,0 +1,96 @@
"""Diffusion LM training plugin for Axolotl."""
from peft import PeftModel
from transformers import PreTrainedModel
from axolotl.integrations.base import BasePlugin
from axolotl.utils.dict import DictDefault
from axolotl.utils.logging import get_logger
from .args import DiffusionArgs
from .callbacks import DiffusionGenerationCallback
from .loss import register_diffusion_loss
from .model_patch import patch_model_for_bidirectional_attention
LOG = get_logger(__name__)
class DiffusionPlugin(BasePlugin):
"""
Plugin for diffusion language model training.
This plugin enables diffusion-based training using the LLaDA approach, which uses
random masking and bidirectional attention to train language models.
"""
def __init__(self):
super().__init__()
self.cfg = None
if register_diffusion_loss():
LOG.info("Registered ForDiffusionLM loss function")
else:
LOG.warning(
"Failed to register diffusion loss - older transformers version"
)
def get_input_args(self) -> str:
"""Returns the pydantic model for LLaDA plugin arguments."""
return "axolotl.integrations.diffusion.DiffusionArgs"
def post_model_load(self, cfg: DictDefault, model: PreTrainedModel | PeftModel):
"""Configure model for diffusion training after loading."""
self.cfg = cfg
# Set loss type for diffusion training
if hasattr(model, "config"):
model.config.loss_type = "ForDiffusionLM"
# Store diffusion config in model config
model.config.diffusion_config = {
"eps": getattr(cfg, "eps", 1e-3),
"importance_weighting": getattr(cfg, "importance_weighting", True),
"mask_token_id": getattr(cfg, "mask_token_id", 128002),
}
LOG.info("Configured model for diffusion training with ForDiffusionLM loss")
# Patch model for bidirectional attention during training
patch_model_for_bidirectional_attention(model)
LOG.info("Applied bidirectional attention patch to model")
return model
def post_trainer_create(self, cfg: DictDefault, trainer):
"""Configure trainer after creation."""
# Create diffusion config from cfg
diffusion_config = DiffusionArgs(
noise_schedule=getattr(cfg, "noise_schedule", "linear"),
min_mask_ratio=getattr(cfg, "min_mask_ratio", 0.1),
max_mask_ratio=getattr(cfg, "max_mask_ratio", 0.9),
num_diffusion_steps=getattr(cfg, "num_diffusion_steps", 128),
eps=getattr(cfg, "eps", 1e-3),
importance_weighting=getattr(cfg, "importance_weighting", True),
mask_token_id=getattr(cfg, "mask_token_id", 128002),
generate_samples=getattr(cfg, "generate_samples", True),
generation_interval=getattr(cfg, "generation_interval", 100),
num_generation_samples=getattr(cfg, "num_generation_samples", 3),
generation_steps=getattr(cfg, "generation_steps", 128),
generation_temperature=getattr(cfg, "generation_temperature", 0.0),
generation_max_length=getattr(cfg, "generation_max_length", 100),
)
# Store diffusion config on trainer for callbacks to access
trainer.diffusion_config = diffusion_config
LOG.info("Stored diffusion config on trainer")
def add_callbacks_post_trainer(self, cfg: DictDefault, trainer):
"""Add diffusion generation callback if enabled."""
if (
hasattr(trainer, "diffusion_config")
and trainer.diffusion_config.generate_samples
):
generation_callback = DiffusionGenerationCallback(trainer)
LOG.info("Added diffusion generation callback")
return [generation_callback]
return []

View File

@@ -681,6 +681,23 @@ class ModelLoader:
return hf_ds_cfg
def _load_model_from_config(self) -> PreTrainedModel:
"""Load model with random initialization using from_config."""
if self.auto_model_loader in [AutoModelForCausalLM, AutoModelForVision2Seq]:
return self.auto_model_loader.from_config(config=self.model_config)
return self.auto_model_loader(config=self.model_config)
def _load_model_from_pretrained(self, model_loader_class=None) -> PreTrainedModel:
"""Load model from pretrained weights."""
loader = model_loader_class or self.auto_model_loader
kwargs = {
**self.model_kwargs,
"config": self.model_config,
"trust_remote_code": self.cfg.trust_remote_code or False,
}
return loader.from_pretrained(self.base_model, **kwargs)
def _build_model(self) -> bool:
"""Load model, with load strategy depending on config."""
skip_move_to_device = False
@@ -695,7 +712,8 @@ class ModelLoader:
if self.is_fsdp_enabled:
if self.cfg.fsdp_config.cpu_ram_efficient_loading:
skip_move_to_device = True
# Don't delete device_map for QLoRA + FSDP - it was set correctly in _set_device_map
# Don't delete device_map for QLoRA + FSDP - it was set correctly in
# _set_device_map
if (
"device_map" in self.model_kwargs
and not self.is_qlora_and_fsdp_enabled
@@ -724,6 +742,11 @@ class ModelLoader:
or self.cfg.qlora_sharded_model_loading
)
):
if self.cfg.reinit_weights:
LOG.warning(
"reinit_weights is not supported with sharded quantized loading. "
"Loading from pretrained weights instead."
)
quant_storage = self.cfg.torch_dtype
quantization_config = getattr(
self.model_config, "quantization_config", None
@@ -739,33 +762,12 @@ class ModelLoader:
quantization_config=quantization_config,
)
skip_move_to_device = True
elif (
self.model_config.model_type in ["llama", "llama4"]
and not self.cfg.trust_remote_code
and not self.cfg.gptq
):
# Please don't remove underscore binding without reading the fn docstring.
_ = self._configure_zero3_memory_efficient_loading()
# Load model with random initialization if specified
if self.cfg.random_init_weights:
# AutoModel classes support the from_config method
if self.auto_model_loader in [
AutoModelForCausalLM,
AutoModelForVision2Seq,
]:
self.model = self.auto_model_loader.from_config(
config=self.model_config,
)
else:
self.model = self.auto_model_loader(config=self.model_config)
else:
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
**self.model_kwargs,
)
elif self.model_type == "MambaLMHeadModel":
if self.cfg.reinit_weights:
LOG.warning(
"reinit_weights is not supported with MambaLMHeadModel. "
"Loading from pretrained weights instead."
)
# FIXME this is janky at best and hacked together to make it work
MambaLMHeadModel = fix_mamba_attn_for_loss() # pylint: disable=invalid-name
@@ -778,41 +780,27 @@ class ModelLoader:
self.base_model,
**self.model_kwargs,
)
elif (
self.model_type
and self.model_type != "AutoModelForCausalLM"
and not self.cfg.trust_remote_code
):
if self.cfg.gptq:
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
else:
self.model = getattr(transformers, self.model_type).from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
elif self.cfg.gptq:
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
else:
# Please don't remove underscore binding without reading the fn docstring.
# Please don't remove underscore binding without reading the fn docstring
_ = self._configure_zero3_memory_efficient_loading()
self.model = self.auto_model_loader.from_pretrained(
self.base_model,
config=self.model_config,
trust_remote_code=self.cfg.trust_remote_code or False,
**self.model_kwargs,
)
if (
self.model_type
and self.model_type != "AutoModelForCausalLM"
and not self.cfg.trust_remote_code
and not self.cfg.gptq
):
# Use model type from transformers
model_loader_class = getattr(transformers, self.model_type)
else:
# Use auto model loader (handles gptq and default cases)
model_loader_class = self.auto_model_loader
if self.cfg.reinit_weights:
self.model = self._load_model_from_config()
else:
self.model = self._load_model_from_pretrained(model_loader_class)
if is_deepspeed_zero3_enabled():
skip_move_to_device = True

View File

@@ -187,7 +187,7 @@ def _process_lora_module_for_fsdp(module, fsdp2_kwargs):
# Linear4Bit will keep its bias term in fp32. If the weight dtype is in bf16 we are not able to
# wrap this. Therefore we must ensure the bias has the same dtype as the weight
if hasattr(module.base_layer, "bias") and module.base_layer.bias is not None:
if module.base_layer.bias is not None:
if module.base_layer.weight.dtype != module.base_layer.bias.dtype:
log_bias_dtype_mismatch = True
module.base_layer.bias.data = module.base_layer.bias.data.to(

View File

@@ -75,7 +75,7 @@ class PromptTokenizingStrategy(abc.ABC):
) -> BatchEncoding:
empty = BatchEncoding(data={"input_ids": [], "attention_mask": []})
if not prompt:
LOG.warning("Empty text requested for tokenization.")
LOG.warning_once("Empty text requested for tokenization.")
return empty
result = self.tokenizer(

View File

@@ -253,9 +253,7 @@ def save_trained_model(
# final model weights have already been saved by `ReLoRACallback.on_train_end`
return
if ( # pylint: disable=too-many-nested-blocks
trainer.is_fsdp_enabled or cfg.fsdp_config
):
if trainer.is_fsdp_enabled or cfg.fsdp_config:
if cfg.fsdp_config or cfg.fsdp:
if cfg.fsdp_config.final_state_dict_type:
state_dict_type = cfg.fsdp_config.final_state_dict_type
@@ -287,8 +285,6 @@ def save_trained_model(
if trainer.accelerator.is_main_process:
# move all files in merged_path to cfg.output_dir
for merged_file in Path(merged_path).iterdir():
if (Path(cfg.output_dir) / merged_file.name).exists():
(Path(cfg.output_dir) / merged_file.name).unlink()
shutil.move(str(merged_file), cfg.output_dir)
shutil.rmtree(merged_path) # remove what should be an empty dir
# TODO(wing): see https://github.com/huggingface/transformers/pull/40207

View File

@@ -9,7 +9,6 @@ from datasets import (
Dataset,
DatasetDict,
IterableDataset,
IterableDatasetDict,
load_dataset,
)
from transformers import PreTrainedTokenizer, ProcessorMixin
@@ -44,24 +43,12 @@ from axolotl.utils.trainer import (
LOG = get_logger(__name__)
def _is_streaming_enabled(cfg: DictDefault) -> bool:
"""Check if streaming is enabled for a specific split."""
streaming = cfg.get("streaming")
if streaming is True:
return True
# Check if pretraining dataset exists (defaults to streaming)
has_pretraining = cfg.get("pretraining_dataset") is not None
streaming = has_pretraining and streaming is None
return streaming
@retry_on_request_exceptions(max_retries=3, delay=5)
def prepare_datasets(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[IterableDataset | Dataset, Dataset | None, int, list[Prompter | None]]:
"""Prepare training and evaluation datasets based on configuration.
@@ -69,19 +56,23 @@ def prepare_datasets(
cfg: Dictionary mapping `axolotl` config keys to values.
tokenizer: Tokenizer to use for processing text.
processor: Optional processor for multimodal datasets.
preprocess_iterable: Whether to use iterable preprocessing.
Returns:
Tuple of (train_dataset, eval_dataset, total_steps, prompters).
"""
if cfg.pretraining_dataset:
return _prepare_pretraining_dataset(cfg, tokenizer, processor)
return _prepare_standard_dataset(cfg, tokenizer, processor)
return _prepare_pretraining_dataset(
cfg, tokenizer, processor, preprocess_iterable
)
return _prepare_standard_dataset(cfg, tokenizer, processor, preprocess_iterable)
def _prepare_standard_dataset(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None,
preprocess_iterable: bool,
) -> tuple[Dataset, Dataset | None, int, list[Prompter | None]]:
"""Prepare standard (non-pretraining) datasets."""
@@ -92,6 +83,7 @@ def _prepare_standard_dataset(
cfg,
split="train",
processor=processor,
preprocess_iterable=preprocess_iterable,
)
# Overwrite eval_dataset if test data exists
@@ -101,6 +93,7 @@ def _prepare_standard_dataset(
cfg,
split="test",
processor=processor,
preprocess_iterable=preprocess_iterable,
)
return train_dataset, eval_dataset, prompters
@@ -116,12 +109,7 @@ def _prepare_standard_dataset(
return train_dataset, eval_dataset, -1, prompters
# Validate sample packing configuration for evaluation
if (
eval_dataset
and cfg.sample_packing
and cfg.eval_sample_packing is not False
and not isinstance(eval_dataset, IterableDataset)
):
if eval_dataset and cfg.sample_packing and cfg.eval_sample_packing is not False:
total_eval_steps = calculate_total_num_steps(cfg, eval_dataset, update=False)
if total_eval_steps == 0:
raise ValueError(
@@ -129,17 +117,13 @@ def _prepare_standard_dataset(
"You should set `eval_sample_packing: False` in your config."
)
# Set total_num_steps for training
if isinstance(train_dataset, IterableDataset):
total_num_steps = cfg.max_steps
# Calculate total number of training steps
if cfg.max_steps:
total_num_steps = min(
calculate_total_num_steps(cfg, train_dataset), cfg.max_steps
)
else:
if cfg.max_steps:
total_num_steps = min(
calculate_total_num_steps(cfg, train_dataset), cfg.max_steps
)
else:
total_num_steps = calculate_total_num_steps(cfg, train_dataset)
total_num_steps = calculate_total_num_steps(cfg, train_dataset)
LOG.info(f"Maximum number of steps set at {total_num_steps}")
return train_dataset, eval_dataset, total_num_steps, prompters
@@ -148,6 +132,7 @@ def _prepare_pretraining_dataset(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None,
preprocess_iterable: bool,
) -> tuple[IterableDataset, Dataset | None, int, list[Prompter | None]]:
"""
Prepare dataset for pretraining mode.
@@ -168,6 +153,7 @@ def _prepare_pretraining_dataset(
cfg,
split="test",
processor=processor,
preprocess_iterable=preprocess_iterable,
)
if cfg.dataset_exact_deduplication:
@@ -270,6 +256,7 @@ def _load_tokenized_prepared_datasets(
cfg: DictDefault,
split: Literal["train", "test"] = "train",
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset | DatasetDict, list[Prompter | None]]:
"""Load or create tokenized and prepared datasets for training or testing.
@@ -278,51 +265,39 @@ def _load_tokenized_prepared_datasets(
cfg: Configuration object.
split: Dataset split to load ('train' or 'test').
processor: Optional processor for multimodal datasets.
preprocess_iterable: Whether to use iterable preprocessing.
Returns:
Tuple of (dataset, prompters list).
"""
# Select correct dataset configuration based on split
datasets_configs = cfg.datasets if split == "train" else cfg.test_datasets
# Generate dataset hash for caching
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
# Try loading from hub if push_dataset_to_hub is configured
dataset = None
if cfg.push_dataset_to_hub:
dataset = try_load_from_hub(cfg, dataset_hash, split)
# If not found on hub, try loading from disk
if dataset is None:
dataset = load_preprocessed_dataset(cfg, dataset_hash)
# If not found on disk or skipping prepared dataset, load and process raw datasets
prompters: list[Prompter | None] = []
use_streaming = False
if split == "train":
use_streaming = _is_streaming_enabled(cfg)
if use_streaming:
# For streaming datasets, skip caching and load raw datasets directly
if dataset is None:
dataset, prompters = _load_raw_datasets(
cfg,
datasets_configs,
tokenizer,
split,
processor,
preprocess_iterable,
)
else:
# Generate dataset hash for caching
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
# Try loading from hub if push_dataset_to_hub is configured
dataset = None
if cfg.push_dataset_to_hub:
dataset = try_load_from_hub(cfg, dataset_hash, split)
# If not found on hub, try loading from disk
if dataset is None:
dataset = load_preprocessed_dataset(cfg, dataset_hash)
# If not found on disk or skipping prepared dataset, load and process raw
# datasets
if dataset is None:
dataset, prompters = _load_raw_datasets(
cfg,
datasets_configs,
tokenizer,
split,
processor,
)
return dataset, prompters
@@ -331,8 +306,9 @@ def _load_raw_datasets(
cfg: DictDefault,
datasets_configs: list,
tokenizer: PreTrainedTokenizer,
split: Literal["train", "test"],
split: str,
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset, list[Prompter | None]]:
"""Load, process, merge, and save raw datasets."""
LOG.info("Loading raw datasets...", main_process_only=False)
@@ -353,6 +329,7 @@ def _load_raw_datasets(
split=split,
seed=cfg.seed,
processor=processor,
preprocess_iterable=preprocess_iterable,
)
datasets.append(dataset_wrapper)
prompters.append(dataset_prompter)
@@ -368,12 +345,11 @@ def _load_raw_datasets(
if cfg.sample_packing:
dataset, _ = process_datasets_for_packing(cfg, dataset, None)
# Only save regular datasets to disk, not streaming datasets
if not isinstance(dataset, IterableDataset):
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
save_preprocessed_dataset(cfg, dataset, dataset_hash, split)
# Save the prepared dataset
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
save_preprocessed_dataset(cfg, dataset, dataset_hash, split)
return dataset, prompters
@@ -382,22 +358,22 @@ def _load_and_process_single_dataset(
dataset_config: DictDefault,
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
split: Literal["train", "test"],
split: str,
seed: int,
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset | IterableDataset, Prompter | None]:
"""Load and process a single dataset based on the passed config."""
use_streaming = False
if split == "train":
use_streaming = _is_streaming_enabled(cfg)
# Load the dataset
dataset = load_dataset_with_config(
dataset_config, cfg.hf_use_auth_token, use_streaming
dataset_config, cfg.hf_use_auth_token, streaming=preprocess_iterable
)
# Parse dataset type
d_base_type, d_prompt_style = _parse_dataset_type(dataset_config.type)
# Select the appropriate split
if isinstance(dataset, (DatasetDict, IterableDatasetDict)):
if isinstance(dataset, DatasetDict):
if dataset_config.split and dataset_config.split in dataset:
dataset = dataset[dataset_config.split]
elif split in dataset:
@@ -442,13 +418,11 @@ def _parse_dataset_type(d_type: str) -> tuple[str | None, str | None]:
def _handle_train_dataset_split(
dataset: Dataset | IterableDataset, cfg: DictDefault
) -> tuple[Dataset | IterableDataset, Dataset | IterableDataset | None]:
dataset: Dataset, cfg: DictDefault
) -> tuple[Dataset, Dataset | None]:
"""Handle processing for train split, including validation set creation."""
val_set_size = (
int(cfg.val_set_size)
if cfg.val_set_size and cfg.val_set_size > 1
else float(cfg.val_set_size or 0.0)
int(cfg.val_set_size) if cfg.val_set_size > 1 else float(cfg.val_set_size)
)
if val_set_size:
@@ -459,33 +433,27 @@ def _handle_train_dataset_split(
return train_dataset, eval_dataset
# No validation split - apply deduplication if needed and return as train dataset
if cfg.dataset_exact_deduplication and not isinstance(dataset, IterableDataset):
if cfg.dataset_exact_deduplication:
train_dataset, _ = deduplicate_and_log_datasets(dataset=dataset)
else:
if cfg.dataset_exact_deduplication and isinstance(dataset, IterableDataset):
LOG.info("Deduplication skipped for streaming datasets (not compatible)")
train_dataset = dataset
return train_dataset, None
def _handle_test_dataset_split(
dataset: Dataset | IterableDataset, cfg: DictDefault
) -> tuple[None, Dataset | IterableDataset | None]:
dataset: Dataset, cfg: DictDefault
) -> tuple[None, Dataset | None]:
"""Handle processing for test split."""
if cfg.dataset_exact_deduplication and not isinstance(dataset, IterableDataset):
if cfg.dataset_exact_deduplication:
eval_dataset, _ = deduplicate_and_log_datasets(dataset=dataset)
else:
if cfg.dataset_exact_deduplication and isinstance(dataset, IterableDataset):
LOG.info("Deduplication skipped for streaming datasets (not compatible)")
eval_dataset = dataset
return None, eval_dataset
def _apply_dataset_sharding(
dataset: Dataset | IterableDataset, cfg: DictDefault
) -> Dataset | IterableDataset:
def _apply_dataset_sharding(dataset: Dataset, cfg: DictDefault) -> Dataset:
"""Apply dataset sharding if configured.
Args:
@@ -511,6 +479,7 @@ def _load_and_prepare_datasets(
cfg: DictDefault,
split: Literal["train", "test"] = "train",
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset | None, Dataset | None, list[Prompter | None]]:
"""Load and prepare datasets with optional validation split and sharding.
@@ -519,6 +488,7 @@ def _load_and_prepare_datasets(
cfg: Configuration object.
split: Dataset split to load ('train' or 'test').
processor: Optional processor for multimodal datasets.
preprocess_iterable: Whether to use iterable preprocessing.
Returns:
Tuple of (train_dataset, eval_dataset, prompters).
@@ -529,6 +499,7 @@ def _load_and_prepare_datasets(
cfg,
split=split,
processor=processor,
preprocess_iterable=preprocess_iterable,
)
# Apply dataset sharding if configured using shared function

View File

@@ -13,7 +13,6 @@ from datasets import (
IterableDataset,
IterableDatasetDict,
concatenate_datasets,
interleave_datasets,
load_dataset,
load_from_disk,
)
@@ -525,9 +524,7 @@ def generate_dataset_hash_from_config(
return str(md5(config_str))
def merge_datasets(
datasets: list[Dataset | IterableDataset], cfg: DictDefault
) -> Dataset | IterableDataset:
def merge_datasets(datasets: list[Dataset], cfg: DictDefault) -> Dataset:
"""Merge multiple datasets into one with optional shuffling.
Args:
@@ -540,23 +537,23 @@ def merge_datasets(
if len(datasets) == 1:
ds = datasets[0]
if (
cfg.curriculum_sampling
or not cfg.shuffle_merged_datasets
or isinstance(ds, IterableDataset)
):
# Do not shuffle if curriculum sampling is enabled or
# shuffle_merged_datasets is disabled
if cfg.curriculum_sampling or not cfg.shuffle_merged_datasets:
return ds
return ds.shuffle(seed=cfg.seed)
if cfg.shuffle_before_merging_datasets and all(
isinstance(ds, Dataset) for ds in datasets
):
# If enabled, shuffle each dataset independently before merging.
# This allows curriculum learning strategies to be applied at the dataset level.
if cfg.shuffle_before_merging_datasets:
LOG.info("Shuffling each dataset individually before merging...")
datasets = [ds.shuffle(seed=cfg.seed) for ds in datasets]
merged_dataset = _merge_datasets_with_strategy(datasets, cfg)
LOG.info("Merging datasets...")
merged_dataset = concatenate_datasets(datasets)
if cfg.shuffle_merged_datasets and not isinstance(merged_dataset, IterableDataset):
if cfg.shuffle_merged_datasets:
LOG.debug("Shuffling merged datasets...")
if cfg.curriculum_sampling:
LOG.warning(
@@ -565,45 +562,6 @@ def merge_datasets(
)
merged_dataset = merged_dataset.shuffle(seed=cfg.seed)
else:
if isinstance(merged_dataset, IterableDataset):
LOG.debug("Skipping shuffle for streaming datasets.")
else:
LOG.debug("Not shuffling merged datasets.")
LOG.debug("Not shuffling merged datasets.")
return merged_dataset
def _merge_datasets_with_strategy(
datasets: list[Dataset | IterableDataset], cfg: DictDefault
) -> Dataset | IterableDataset:
"""
Merge datasets using the configured mixing strategy. Works with streaming and non-
streaming datasets.
Args:
datasets: List of datasets to merge.
cfg: Configuration object containing mixing settings.
Returns:
Merged dataset (Dataset or IterableDataset depending on inputs).
"""
strategy = cfg.get("dataset_mixing_strategy", "concatenate")
weights = cfg.get("mixing_weights", None)
LOG.info(f"Merging datasets with mixing strategy: {strategy}...")
if strategy == "concatenate":
if not all(isinstance(ds, Dataset) for ds in datasets):
raise ValueError(
"Cannot concatenate streaming datasets. Use 'round_robin', 'weighted', "
"or 'random' instead."
)
return concatenate_datasets(datasets)
if strategy == "round_robin":
return interleave_datasets(datasets, seed=cfg.seed)
if strategy == "weighted":
return interleave_datasets(datasets, probabilities=weights, seed=cfg.seed)
if strategy == "random":
equal_weights = [1.0 / len(datasets)] * len(datasets)
return interleave_datasets(datasets, probabilities=equal_weights, seed=cfg.seed)
raise ValueError(f"Unknown dataset mixing strategy: {strategy}")
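For reference, the strategy dispatch removed above maps onto interleaving behavior that can be sketched in plain Python. This is a stand-in for `datasets.interleave_datasets`, not the library implementation; in particular the stopping behavior here (halt when a drawn source is exhausted) is only one of the library's options:

```python
import random

def interleave(datasets, probabilities=None, seed=0):
    """Round-robin across sources when probabilities is None; otherwise
    sample a source per step by weight. Stops on an exhausted source."""
    rng = random.Random(seed)
    iters = [iter(ds) for ds in datasets]
    out = []
    if probabilities is None:
        while True:
            try:
                for it in iters:
                    out.append(next(it))
            except StopIteration:
                break
        return out
    while True:
        # Pick a source index by weight, then draw its next example.
        idx = rng.choices(range(len(iters)), weights=probabilities)[0]
        try:
            out.append(next(iters[idx]))
        except StopIteration:
            break
    return out
```

The `random` strategy is then just the weighted case with equal probabilities, which is why the removed code built `equal_weights` before delegating.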

View File

@@ -190,15 +190,11 @@ def handle_long_seq_in_dataset(
Returns:
Filtered dataset with long sequences removed.
"""
if hasattr(dataset, "column_names") and dataset.column_names:
if "input_ids" not in dataset.column_names:
LOG.warning(
"Dataset does not contain 'input_ids' column. Skip drop long seq. This "
"is expected for reward modeling."
)
return dataset
elif isinstance(dataset, IterableDataset):
LOG.info("Skipping drop_long_seq for streaming datasets (not compatible)")
if "input_ids" not in dataset.column_names:
LOG.warning(
"Dataset does not contain 'input_ids' column. Skip drop long seq. This is "
"expected for reward modeling."
)
return dataset
drop_long = functools.partial(

View File

@@ -109,6 +109,12 @@ class AxolotlInputConfig(
"description": "Don't upcast the embeddings to float32 when using PEFT. Useful for low-VRAM GPUs"
},
)
reinit_weights: bool | None = Field(
default=None,
json_schema_extra={
"description": "Reinitialize model weights randomly instead of loading pretrained weights"
},
)
trainer_cls: str | None = Field(
default=None,
@@ -932,27 +938,9 @@ class AxolotlInputConfig(
fix_untrained_tokens: int | list[int] | None = None
streaming: bool | None = Field(
default=None,
json_schema_extra={
"description": "Whether to use streaming datasets (IterableDataset) for training datasets. When True, data is loaded on-demand during training without upfront preprocessing. Requires max_steps to be set. Pre-training datasets default to streaming unless explicitly set to False."
},
)
dataset_mixing_strategy: str | None = Field(
default="round_robin",
json_schema_extra={
"description": "Strategy for mixing multiple datasets: 'concatenate', 'round_robin' (equal sampling), 'weighted' (use mixing_weights), or 'random' (random sampling with equal probability). Works for both streaming and non-streaming datasets."
},
)
mixing_weights: list[float] | None = Field(
default=None,
json_schema_extra={
"description": "Weights for weighted mixing strategy when using multiple datasets. Must sum to 1.0 and have same length as datasets list. Only used when dataset_mixing_strategy='weighted'."
},
)
# INTERNALS - document for now, generally not set externally
is_preprocess: bool | None = None
preprocess_iterable: bool | None = None
total_num_tokens: int | None = Field(
default=None,

View File

@@ -161,12 +161,7 @@ class HyperparametersConfig(BaseModel):
max_grad_norm: float | None = Field(
default=None, json_schema_extra={"description": "Gradient clipping max norm"}
)
num_epochs: float = Field(
default=1.0,
json_schema_extra={
"description": "Number of iterations over dataset for training"
},
)
num_epochs: float = Field(default=1.0)
@field_validator("batch_size")
@classmethod

View File

@@ -3,7 +3,6 @@
# pylint: disable=too-many-boolean-expressions
import json
import os
import sys
import tempfile
from pathlib import Path
@@ -193,7 +192,6 @@ class AttentionValidationMixin:
return data
# pylint: disable=too-many-public-methods
class TrainingValidationMixin:
"""Validation methods related to training configuration."""
@@ -510,58 +508,11 @@ class TrainingValidationMixin:
# combining these would raise `TypeError: cannot pickle 'dict_keys' object`
# due to trying to count the number of tokens total in the dataset
raise ValueError(
"pretraining_dataset and include_tokens_per_second cannot be used "
"together."
"pretraining_dataset and include_tokens_per_second cannot be used together."
)
return data
@model_validator(mode="before")
@classmethod
def check_max_steps_num_epochs_conflict(cls, data):
"""Handle max_steps and num_epochs configuration and auto-set defaults."""
max_steps = data.get("max_steps")
num_epochs = data.get("num_epochs")
# Auto-set num_epochs to 1 if neither max_steps nor num_epochs are set
if max_steps is None and num_epochs is None:
data["num_epochs"] = 1.0
return data
@model_validator(mode="before")
@classmethod
def check_saves_per_epoch_conflicts(cls, data):
"""Ensure saves_per_epoch is compatible with training configuration."""
saves_per_epoch = data.get("saves_per_epoch")
num_epochs = data.get("num_epochs")
if saves_per_epoch is not None:
# Check if saves_per_epoch is set but num_epochs is unset
if num_epochs is None:
raise ValueError(
"saves_per_epoch requires num_epochs to be set to calculate save "
"intervals."
)
return data
@model_validator(mode="before")
@classmethod
def check_evals_per_epoch_conflicts(cls, data):
"""Ensure evals_per_epoch is compatible with training configuration."""
evals_per_epoch = data.get("evals_per_epoch")
num_epochs = data.get("num_epochs")
if evals_per_epoch is not None:
if num_epochs is None:
raise ValueError(
"evals_per_epoch requires num_epochs to be set to calculate "
"evaluation intervals."
)
return data
class LoRAValidationMixin:
"""Validation methods related to LoRA/QLoRA configuration."""
@@ -1127,27 +1078,6 @@ class PretrainingValidationMixin:
data["accelerator_config"]["dispatch_batches"] = False
return data
@model_validator(mode="before")
@classmethod
def check_streaming_split_batches_accelerate(cls, data):
# Check if streaming is enabled for training
streaming = data.get("streaming", False)
# If streaming is enabled, configure accelerator
if streaming:
accelerator_config = data.get("accelerator_config", {})
if not accelerator_config:
data["accelerator_config"] = {
"split_batches": False,
"dispatch_batches": False,
}
else:
if accelerator_config.get("split_batches") is None:
data["accelerator_config"]["split_batches"] = False
if accelerator_config.get("dispatch_batches") is None:
data["accelerator_config"]["dispatch_batches"] = False
return data
class ModelCompatibilityValidationMixin:
"""Validation methods for specific model compatibility."""
@@ -1406,128 +1336,6 @@ class GRPOVllmValidationMixin:
return self
class StreamingValidationMixin:
"""Validation methods related to streaming datasets."""
def _is_streaming_enabled(self) -> bool:
"""Check if streaming is enabled."""
# Fall back to main streaming setting
streaming = getattr(self, "streaming", None)
if streaming is True:
return True
# Check if pretraining dataset exists (defaults to streaming)
has_pretraining = getattr(self, "pretraining_dataset", None) is not None
streaming = has_pretraining and streaming is None
return streaming
@model_validator(mode="after")
def check_streaming_requires_max_steps(self):
"""Ensure max_steps is set when using streaming datasets."""
# Check if streaming is enabled for training datasets
if self._is_streaming_enabled():
max_steps = getattr(self, "max_steps", None)
if not max_steps:
raise ValueError("max_steps must be set when using streaming datasets")
return self
@model_validator(mode="after")
def check_streaming_validation_splits_conflict(self):
"""Ensure validation splits are not used with streaming datasets."""
# Check if streaming is enabled for training datasets
if self._is_streaming_enabled():
val_set_size = getattr(self, "val_set_size", 0.0)
if val_set_size and val_set_size > 0:
raise ValueError(
"Validation splits not supported for streaming datasets, please "
"use test_datasets: ... instead"
)
return self
@model_validator(mode="after")
def check_streaming_preprocessing_conflict(self):
"""Ensure preprocessing is not enabled with streaming datasets."""
# Check if streaming is enabled for training datasets
if self._is_streaming_enabled():
if os.environ.get("AXOLOTL_IS_PREPROCESS") == "1":
raise ValueError("preprocess is not supported for streaming datasets")
return self
@model_validator(mode="after")
def check_dataset_mixing_weights(self):
"""Validate dataset mixing weights configuration."""
valid_strategies = ["concatenate", "round_robin", "weighted", "random"]
# Get datasets to validate length against
datasets = getattr(self, "datasets", None)
# Check main strategy and weights
strategy = getattr(self, "dataset_mixing_strategy", "concatenate")
weights = getattr(self, "mixing_weights", None)
dataset_count = len(datasets) if datasets else 0
self._validate_dataset_strategy_and_weights(
strategy,
weights,
"dataset_mixing_strategy",
"mixing_weights",
valid_strategies,
dataset_count,
)
return self
def _validate_dataset_strategy_and_weights(
self,
strategy,
weights,
strategy_field,
weights_field,
valid_strategies,
dataset_count,
):
"""Helper method to validate dataset mixing strategy and weights pair."""
if strategy not in valid_strategies:
raise ValueError(
f"{strategy_field} must be one of {valid_strategies}, "
f"got '{strategy}'"
)
if strategy == "weighted":
if weights is None:
raise ValueError(
f"{weights_field} must be provided when "
f"{strategy_field}='weighted'"
)
if not isinstance(weights, list) or not all(
isinstance(w, (int, float)) for w in weights
):
raise ValueError(f"{weights_field} must be a list of numbers")
if any(w < 0 for w in weights):
raise ValueError(f"{weights_field} must be non-negative")
if abs(sum(weights) - 1.0) > 1e-6:
raise ValueError(f"{weights_field} must sum to 1.0, got {sum(weights)}")
# Validate weights length against dataset count
if dataset_count > 0 and len(weights) != dataset_count:
raise ValueError(
f"{weights_field} length ({len(weights)}) must match number of datasets ({dataset_count})"
)
elif weights is not None and strategy != "weighted":
LOG.warning(
f"{weights_field} provided but {strategy_field} is '{strategy}'. "
"Weights will be ignored."
)
# pylint: disable=too-many-ancestors
class ValidationMixin(
DatasetValidationMixin,
@@ -1539,7 +1347,6 @@ class ValidationMixin(
SystemValidationMixin,
ChatTemplateValidationMixin,
PretrainingValidationMixin,
StreamingValidationMixin,
ModelCompatibilityValidationMixin,
ComplexValidationMixin,
GRPOVllmValidationMixin,

View File

@@ -10,6 +10,7 @@ from typing import List, Optional
import numpy as np
import torch
import torch.cuda
from datasets import IterableDataset, disable_caching, enable_caching
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from transformers.utils import is_torch_bf16_gpu_available
@@ -22,65 +23,6 @@ from axolotl.utils.samplers import MultipackBatchSampler, get_dataset_lengths
LOG = get_logger(__name__)
def _create_filtered_iterable_dataset(dataset, filter_fn, batched=False):
"""
Create a filtered IterableDataset that works around a HuggingFace datasets
limitation.
"""
def filtered_generator():
"""Generator that yields only samples that pass the filter function."""
if batched:
batch = []
batch_size = 1000 # Process in batches of 1000
for sample in dataset:
batch.append(sample)
if len(batch) >= batch_size:
# Create a batch dict from list of samples
batch_dict = {}
for key in batch[0].keys():
batch_dict[key] = [sample[key] for sample in batch]
# Apply filter function to batch
keep_mask = filter_fn(batch_dict)
# Yield samples that should be kept
for i, keep in enumerate(keep_mask):
if keep:
yield batch[i]
batch = []
# Process remaining samples in batch
if batch:
batch_dict = {}
for key in batch[0].keys():
batch_dict[key] = [sample[key] for sample in batch]
keep_mask = filter_fn(batch_dict)
for i, keep in enumerate(keep_mask):
if keep:
yield batch[i]
else:
# For non-batched filtering, apply filter to each sample individually
for sample in dataset:
if filter_fn(sample):
yield sample
# Create new IterableDataset from the filtered generator
filtered_dataset = IterableDataset.from_generator(filtered_generator)
# Preserve the original features if they exist
# pylint:disable=protected-access
if hasattr(dataset, "_info") and dataset._info.features is not None:
filtered_dataset._info.features = dataset._info.features
return filtered_dataset
@torch.jit.script
def weighted_cross_entropy(
logits: torch.Tensor, labels: torch.Tensor, weights: torch.Tensor
@@ -340,21 +282,12 @@ def process_datasets_for_packing(cfg, train_dataset, eval_dataset):
drop_long_kwargs = {}
if filter_map_kwargs:
drop_long_kwargs["desc"] = "Drop Samples with Zero Trainable Tokens"
# For IterableDatasets, always use custom filtering to avoid features issues
if isinstance(train_dataset, IterableDataset):
# IterableDatasets often have None features after transformations,
# so we use our custom filter implementation that doesn't rely on features
train_dataset = _create_filtered_iterable_dataset(
train_dataset, drop_no_trainable_tokens, batched=True
)
else:
train_dataset = train_dataset.filter(
drop_no_trainable_tokens,
batched=True,
**filter_map_kwargs,
**drop_long_kwargs,
)
train_dataset = train_dataset.filter(
drop_no_trainable_tokens,
batched=True,
**filter_map_kwargs,
**drop_long_kwargs,
)
if prior_len:
dropped = prior_len - len(train_dataset)
if dropped:
@@ -539,7 +472,7 @@ def calculate_total_num_steps(cfg, train_dataset, update=True):
)
data_loader = DataLoader(
train_dataset,
train_dataset.remove_columns(["length"]),
batch_sampler=sampler,
)
data_loader_len = len(data_loader) * cfg.micro_batch_size // cfg.batch_size
@@ -614,7 +547,7 @@ def setup_deepspeed_env(cfg, stage=None):
if stage == 3:
os.environ["ACCELERATE_DEEPSPEED_ZERO3_INIT"] = "true"
# NOTE(djsaunde): The distributed state cannot be initialized prior to the
    # NOTE(djsaunde): The distributed state cannot be initialized prior to the
# ACCELERATE_USE_DEEPSPEED assignment, but it must be initialized some time prior
# to model load.
if (

View File

@@ -25,7 +25,7 @@ def min_cfg(temp_dir):
"liger_rms_norm": True,
"liger_glu_activation": True,
"torch_compile": True,
"chat_template": "qwen3",
"chat_template": "llama3",
"kd_trainer": True,
"kd_ce_alpha": 0.1,
"kd_alpha": 0.9,

tests/e2e/test_diffusion.py Normal file
View File

@@ -0,0 +1,119 @@
"""E2E smoke test for diffusion training plugin."""
from axolotl.common.datasets import load_datasets
from axolotl.train import train
from axolotl.utils.config import normalize_config, validate_config
from axolotl.utils.dict import DictDefault
from tests.e2e.utils import check_model_output_exists
class TestDiffusion:
"""Test case for diffusion training plugin."""
def test_diffusion_smoke_test(self, temp_dir):
"""
Smoke test for diffusion training to ensure the plugin loads and trains without
error.
"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"tokenizer_type": "AutoTokenizer",
"trust_remote_code": True,
"sequence_len": 256,
"val_set_size": 0.1,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
],
"num_epochs": 1,
"max_steps": 3,
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"output_dir": temp_dir,
"learning_rate": 0.0001,
"optimizer": "adamw_torch",
"lr_scheduler": "cosine",
"bf16": True,
"save_safetensors": True,
"save_first_step": False,
"logging_steps": 1,
"eval_steps": 3,
# Diffusion-specific config
"plugins": ["axolotl.integrations.diffusion.DiffusionPlugin"],
"diffusion_mask_token_id": 16,
"diffusion_eps": 1e-3,
"diffusion_importance_weighting": False,
}
)
cfg = validate_config(cfg)
normalize_config(cfg)
dataset_meta = load_datasets(cfg=cfg)
train(cfg=cfg, dataset_meta=dataset_meta)
check_model_output_exists(temp_dir, cfg)
def test_diffusion_sft_labels(self, temp_dir):
"""Test that diffusion training properly handles SFT data with labels."""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"tokenizer_type": "AutoTokenizer",
"trust_remote_code": True,
"sequence_len": 256,
"val_set_size": 0.1,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
],
"num_epochs": 1,
"max_steps": 3,
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"output_dir": temp_dir,
"learning_rate": 0.0001,
"optimizer": "adamw_torch",
"lr_scheduler": "cosine",
"bf16": True,
"save_safetensors": True,
"save_first_step": False,
"logging_steps": 1,
"eval_steps": 2,
# Diffusion-specific config
"plugins": ["axolotl.integrations.diffusion.DiffusionPlugin"],
"diffusion_mask_token_id": 16,
"diffusion_eps": 1e-3,
"diffusion_importance_weighting": True,
# Ensure we have proper SFT labels
"train_on_inputs": False,
}
)
cfg = validate_config(cfg)
normalize_config(cfg)
dataset_meta = load_datasets(cfg=cfg)
# Verify that the dataset has labels
sample = dataset_meta.train_dataset[0]
assert "labels" in sample, "SFT dataset should have labels"
# Check that some labels are -100 (prompt tokens)
labels = sample["labels"]
if hasattr(labels, "tolist"):
labels = labels.tolist()
assert -100 in labels, "SFT dataset should have -100 labels for prompt tokens"
train(cfg=cfg, dataset_meta=dataset_meta)
check_model_output_exists(temp_dir, cfg)

View File

@@ -1,185 +0,0 @@
"""E2E tests for streaming dataset functionality"""
# pylint: disable=duplicate-code
import pytest
from axolotl.common.datasets import load_datasets
from axolotl.train import train
from axolotl.utils.config import normalize_config, validate_config
from axolotl.utils.dict import DictDefault
from .utils import check_model_output_exists, check_tensorboard
class TestStreamingDatasets:
"""Test case for streaming datasets with different mixing strategies"""
@pytest.mark.parametrize(
("dataset_mixing_strategy", "mixing_weights"),
[
("round_robin", None),
("weighted", [0.7, 0.3]),
("random", None),
],
)
def test_streaming_dataset_mixing_strategies(
self, temp_dir, dataset_mixing_strategy, mixing_weights
):
"""Test different mixing strategies with streaming datasets"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"flash_attention": True,
"sequence_len": 1024,
"sample_packing": False,
"dataset_processes": 1,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
},
],
# Streaming config
"streaming": True,
"max_steps": 3, # Very small for smoke test
"dataset_mixing_strategy": dataset_mixing_strategy,
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"val_set_size": 0.0,
"output_dir": temp_dir,
"learning_rate": 0.00001,
"optimizer": "adamw_torch_fused",
"lr_scheduler": "cosine",
"save_safetensors": True,
"bf16": "auto",
"use_tensorboard": True,
"save_first_step": False,
}
)
# Add mixing weights if specified
if mixing_weights:
cfg["mixing_weights"] = mixing_weights
cfg = validate_config(cfg)
normalize_config(cfg)
dataset_meta = load_datasets(cfg=cfg)
train(cfg=cfg, dataset_meta=dataset_meta)
check_model_output_exists(temp_dir, cfg)
# Verify training actually happened by checking loss decrease
check_tensorboard(
temp_dir + "/runs",
"train/train_loss",
2.5, # Loss should be reasonable for a smoke test (higher threshold for streaming)
"Train Loss (%s) is too high",
)
def test_streaming_validation_error(self, temp_dir):
"""Test that pydantic validation catches invalid streaming configs"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
},
],
"streaming": True,
"max_steps": 3,
# Invalid: wrong number of weights for datasets
"dataset_mixing_strategy": "weighted",
"mixing_weights": [1.0], # Should be [0.x, 0.y] for 2 datasets
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"output_dir": temp_dir,
"learning_rate": 0.00001,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
}
)
# This should raise a validation error
with pytest.raises(Exception) as exc_info:
validate_config(cfg)
# Verify it's the right validation error
assert "mixing_weights length" in str(exc_info.value)
assert "must match number of datasets" in str(exc_info.value)
def test_streaming_three_datasets_weighted(self, temp_dir):
"""Test weighted mixing with three datasets"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"flash_attention": True,
"sequence_len": 512,
"sample_packing": False,
"dataset_processes": 1,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
},
{
"path": "yahma/alpaca-cleaned",
"type": "alpaca",
},
],
# Streaming config
"streaming": True,
"max_steps": 3,
"dataset_mixing_strategy": "weighted",
"mixing_weights": [0.5, 0.3, 0.2],
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"val_set_size": 0.0,
"output_dir": temp_dir,
"learning_rate": 0.00001,
"optimizer": "adamw_torch_fused",
"lr_scheduler": "cosine",
"save_safetensors": True,
"bf16": "auto",
"use_tensorboard": True,
"save_first_step": False,
}
)
cfg = validate_config(cfg)
normalize_config(cfg)
dataset_meta = load_datasets(cfg=cfg)
train(cfg=cfg, dataset_meta=dataset_meta)
check_model_output_exists(temp_dir, cfg)
check_tensorboard(
temp_dir + "/runs",
"train/train_loss",
2.5,
"Train Loss (%s) is too high",
)

View File

@@ -0,0 +1,335 @@
"""Tests for diffusion trainer integration."""
# pylint: disable=redefined-outer-name,protected-access
from unittest.mock import Mock, patch
import pytest
import torch
from axolotl.integrations.diffusion.args import DiffusionArgs
from axolotl.integrations.diffusion.loss import (
ForDiffusionLMLoss,
register_diffusion_loss,
)
from axolotl.integrations.diffusion.model_patch import (
_create_bidirectional_attention_mask,
_forward_process,
patch_model_for_bidirectional_attention,
)
from axolotl.integrations.diffusion.plugin import DiffusionPlugin
@pytest.fixture
def diffusion_config():
"""Create a diffusion config."""
return DiffusionArgs(
eps=1e-3,
importance_weighting=False,
mask_token_id=32000,
generate_samples=False,
)
@pytest.fixture
def mock_model():
"""Create a mock model."""
model = Mock()
model.config = Mock()
model.config.loss_type = "ForDiffusionLM"
model.config.diffusion_config = {
"eps": 1e-3,
"importance_weighting": False,
"mask_token_id": 32000,
}
model.training = True
return model
class TestDiffusionLoss:
"""Test the ForDiffusionLMLoss function."""
def test_loss_with_diffusion_info(self, mock_model):
"""Test loss computation with stored diffusion info."""
batch_size, seq_len, vocab_size = 1, 5, 1000
# Mock stored diffusion info
original_input_ids = torch.tensor([[1, 10, 20, 30, 2]], dtype=torch.long)
masked_indices = torch.tensor(
[[False, True, True, False, False]], dtype=torch.bool
)
p_mask = torch.tensor([[0.5, 0.5, 0.5, 0.5, 0.5]], dtype=torch.float)
mock_model._diffusion_info = {
"original_input_ids": original_input_ids,
"masked_indices": masked_indices,
"p_mask": p_mask,
}
# Mock logits
logits = torch.randn(batch_size, seq_len, vocab_size, requires_grad=True)
labels = torch.tensor([[-100, -100, 20, 30, 2]], dtype=torch.long)
loss = ForDiffusionLMLoss(
logits=logits,
labels=labels,
vocab_size=vocab_size,
config=mock_model.config,
model=mock_model,
)
assert isinstance(loss, torch.Tensor)
assert loss.requires_grad
assert loss.item() >= 0
def test_loss_fallback_without_diffusion_info(self, mock_model):
"""Test fallback to causal LM loss when no diffusion info."""
batch_size, seq_len, vocab_size = 1, 5, 1000
# Remove diffusion info to trigger fallback
if hasattr(mock_model, "_diffusion_info"):
delattr(mock_model, "_diffusion_info")
logits = torch.randn(batch_size, seq_len, vocab_size, requires_grad=True)
labels = torch.tensor([[1, 10, 20, 30, 2]], dtype=torch.long)
loss = ForDiffusionLMLoss(
logits=logits,
labels=labels,
vocab_size=vocab_size,
config=mock_model.config,
model=mock_model,
)
assert isinstance(loss, torch.Tensor)
assert loss.requires_grad
def test_loss_no_masked_tokens(self, mock_model):
"""Test loss when no tokens are masked."""
batch_size, seq_len, vocab_size = 1, 3, 1000
# No masked tokens
original_input_ids = torch.tensor([[1, 10, 2]], dtype=torch.long)
masked_indices = torch.tensor([[False, False, False]], dtype=torch.bool)
p_mask = torch.tensor([[0.1, 0.1, 0.1]], dtype=torch.float)
mock_model._diffusion_info = {
"original_input_ids": original_input_ids,
"masked_indices": masked_indices,
"p_mask": p_mask,
}
logits = torch.randn(batch_size, seq_len, vocab_size)
labels = torch.tensor([[1, 10, 2]], dtype=torch.long)
loss = ForDiffusionLMLoss(
logits=logits,
labels=labels,
vocab_size=vocab_size,
config=mock_model.config,
model=mock_model,
)
assert loss.item() == 0.0
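The behavior these tests pin down matches the usual masked-diffusion objective: cross-entropy over masked positions, reweighted by `1 / p_mask` so the estimate is unbiased across noise levels, and zero when nothing is masked. A pure-Python sketch under those assumptions (illustrative names, not the plugin's actual reduction):

```python
import math

def diffusion_lm_loss(logprobs, targets, masked, p_mask):
    """Mean of -log p(target) / p_mask over masked positions; 0 if none."""
    total, count = 0.0, 0
    for lp, tgt, m, p in zip(logprobs, targets, masked, p_mask):
        if not m:
            continue
        total += -lp[tgt] / p
        count += 1
    return total / count if count else 0.0
```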
class TestModelPatch:
"""Test the model patching functionality."""
def test_forward_process_basic(self):
"""Test basic forward process."""
input_ids = torch.tensor([[1, 10, 20, 30, 2]], dtype=torch.long)
diffusion_config = {"eps": 0.1, "mask_token_id": 32000}
noisy_input_ids, masked_indices, p_mask = _forward_process(
input_ids, diffusion_config=diffusion_config
)
# Check shapes
assert noisy_input_ids.shape == input_ids.shape
assert masked_indices.shape == input_ids.shape
assert p_mask.shape == input_ids.shape
# Check that mask token is applied where masked
if masked_indices.any():
assert (noisy_input_ids[masked_indices] == 32000).all()
def test_forward_process_with_labels(self):
"""Test forward process with SFT labels."""
input_ids = torch.tensor([[1, 10, 20, 30, 2]], dtype=torch.long)
labels = torch.tensor([[-100, -100, 20, 30, 2]], dtype=torch.long)
diffusion_config = {"eps": 0.1, "mask_token_id": 32000}
_, masked_indices, _ = _forward_process(
input_ids, labels=labels, diffusion_config=diffusion_config
)
# Check that only answer tokens can be masked (where labels != -100)
non_answer_mask = labels == -100
assert not masked_indices[non_answer_mask].any()
def test_forward_process_with_attention_mask(self):
"""Test forward process with attention mask."""
input_ids = torch.tensor([[1, 10, 20, 0]], dtype=torch.long)
attention_mask = torch.tensor([[1, 1, 1, 0]], dtype=torch.long)
diffusion_config = {"eps": 0.1, "mask_token_id": 32000}
_, masked_indices, p_mask = _forward_process(
input_ids, attention_mask=attention_mask, diffusion_config=diffusion_config
)
# Check that padding tokens are not masked
padding_positions = attention_mask == 0
assert not masked_indices[padding_positions].any()
assert (p_mask[padding_positions] == 0).all()
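These tests exercise the standard masked-diffusion forward process: draw a timestep `t ~ U(0, 1)`, mask each eligible token independently with probability `p = (1 - eps) * t + eps`, and exempt prompt positions (`label == -100`) and padding. A minimal sketch under those assumptions, with plain lists instead of tensors (not the plugin's `_forward_process`):

```python
import random

def forward_process(input_ids, mask_token_id, eps=1e-3, labels=None, seed=0):
    """Return (noisy_ids, masked_flags, p_mask) for one sequence."""
    rng = random.Random(seed)
    t = rng.random()
    p = (1.0 - eps) * t + eps  # shared masking probability for this draw
    noisy, masked, p_mask = [], [], []
    for i, tok in enumerate(input_ids):
        # With SFT labels, only answer tokens (label != -100) are maskable.
        eligible = labels is None or labels[i] != -100
        hit = eligible and rng.random() < p
        noisy.append(mask_token_id if hit else tok)
        masked.append(hit)
        p_mask.append(p if eligible else 0.0)
    return noisy, masked, p_mask
```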
def test_bidirectional_attention_mask(self):
"""Test bidirectional attention mask creation."""
input_ids = torch.tensor([[1, 10, 20, 2]], dtype=torch.long)
attention_mask = torch.tensor([[1, 1, 1, 1]], dtype=torch.long)
mask = _create_bidirectional_attention_mask(input_ids, attention_mask)
# Should be all-to-all attention
expected_shape = (1, 1, 4, 4)
assert mask.shape == expected_shape
assert mask.all()
def test_bidirectional_attention_mask_with_padding(self):
"""Test bidirectional attention mask with padding."""
input_ids = torch.tensor([[1, 10, 20, 0]], dtype=torch.long)
attention_mask = torch.tensor([[1, 1, 1, 0]], dtype=torch.long)
mask = _create_bidirectional_attention_mask(input_ids, attention_mask)
# Padding positions should not attend or be attended to
assert not mask[0, 0, 3, :].any() # Padding can't attend to anything
assert not mask[0, 0, :, 3].any() # Nothing can attend to padding
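The mask these two tests expect is just an outer product of the padding mask: position `i` may attend to `j` iff both are real tokens, with no causal triangle. A sketch with nested lists standing in for the `[batch, 1, seq, seq]` tensor:

```python
def bidirectional_attention_mask(attention_mask):
    """Full (non-causal) attention among unpadded tokens.
    Returns a [1, 1, L, L] nested list of booleans."""
    length = len(attention_mask)
    grid = [
        [bool(attention_mask[i]) and bool(attention_mask[j]) for j in range(length)]
        for i in range(length)
    ]
    return [[grid]]
```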
def test_patch_model_for_bidirectional_attention(self):
"""Test that model patching works."""
mock_model = Mock()
mock_model.config = Mock()
mock_model.config.loss_type = "ForDiffusionLM"
mock_model.config.diffusion_config = {"eps": 1e-3, "mask_token_id": 32000}
mock_model.training = True
original_forward = Mock()
mock_model.forward = original_forward
# Patch the model
patch_model_for_bidirectional_attention(mock_model)
# Check that forward method was replaced
assert mock_model.forward != original_forward
class TestDiffusionPlugin:
"""Test the DiffusionPlugin."""
def test_plugin_registers_loss_function(self):
"""Test that plugin registers diffusion loss function."""
with patch(
"axolotl.integrations.diffusion.plugin.register_diffusion_loss",
return_value=True,
) as mock_register:
plugin = DiffusionPlugin()
mock_register.assert_called_once()
def test_post_model_load_configuration(self):
"""Test that post_model_load configures model correctly."""
plugin = DiffusionPlugin()
# Mock model and config
mock_model = Mock()
mock_model.config = Mock()
mock_cfg = Mock()
mock_cfg.eps = 1e-3
mock_cfg.importance_weighting = True
mock_cfg.mask_token_id = 32000
with patch(
"axolotl.integrations.diffusion.plugin.patch_model_for_bidirectional_attention"
) as mock_patch:
result = plugin.post_model_load(mock_cfg, mock_model)
# Check model configuration
assert mock_model.config.loss_type == "ForDiffusionLM"
assert mock_model.config.diffusion_config is not None
assert mock_model.config.diffusion_config["eps"] == 1e-3
# Check model was patched
mock_patch.assert_called_once_with(mock_model)
# Should return the model
assert result == mock_model
def test_post_trainer_create_stores_config(self, diffusion_config):
"""Test that post_trainer_create stores config on trainer."""
plugin = DiffusionPlugin()
mock_trainer = Mock()
mock_cfg = Mock()
# Set config attributes
for attr, value in diffusion_config.model_dump().items():
setattr(mock_cfg, attr, value)
plugin.post_trainer_create(mock_cfg, mock_trainer)
# Check that diffusion config was stored on trainer
assert hasattr(mock_trainer, "diffusion_config")
assert mock_trainer.diffusion_config.eps == diffusion_config.eps
def test_add_callbacks_post_trainer_with_generation_enabled(self):
"""Test callback addition when generation is enabled."""
plugin = DiffusionPlugin()
mock_trainer = Mock()
mock_cfg = Mock()
# Mock trainer with diffusion config that has generation enabled
mock_trainer.diffusion_config = DiffusionArgs(generate_samples=True)
with patch(
"axolotl.integrations.diffusion.plugin.DiffusionGenerationCallback"
) as mock_callback_class:
callbacks = plugin.add_callbacks_post_trainer(mock_cfg, mock_trainer)
# Should return one callback
assert len(callbacks) == 1
mock_callback_class.assert_called_once_with(mock_trainer)
def test_add_callbacks_post_trainer_with_generation_disabled(self):
"""Test callback addition when generation is disabled."""
plugin = DiffusionPlugin()
mock_trainer = Mock()
mock_cfg = Mock()
# Mock trainer with diffusion config that has generation disabled
mock_trainer.diffusion_config = DiffusionArgs(generate_samples=False)
callbacks = plugin.add_callbacks_post_trainer(mock_cfg, mock_trainer)
# Should return no callbacks
assert len(callbacks) == 0
class TestLossRegistration:
"""Test loss function registration."""
def test_register_diffusion_loss(self):
"""Test that loss function can be registered."""
with patch("transformers.loss.loss_utils.LOSS_MAPPING", {}) as mock_mapping:
result = register_diffusion_loss()
assert result is True
assert "ForDiffusionLM" in mock_mapping
assert mock_mapping["ForDiffusionLM"] == ForDiffusionLMLoss
def test_register_diffusion_loss_import_error(self):
"""Test fallback when LOSS_MAPPING import fails."""
# Patch the import to raise ImportError
with patch(
"builtins.__import__",
side_effect=ImportError("transformers.loss.loss_utils not found"),
):
result = register_diffusion_loss()
assert result is False
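Both registration tests exercise the same pattern: insert the custom loss under its `loss_type` key in transformers' `LOSS_MAPPING`, and return `False` when the mapping cannot be imported. A hedged sketch — the mapping is injected here for testability, and the real function's signature may differ:

```python
def register_loss(name, loss_fn, mapping=None):
    """Hypothetical sketch of the registration pattern under test: put a custom
    loss under its loss_type key so models configured with that loss_type
    resolve to it; fall back to False if the mapping is unavailable."""
    if mapping is None:
        try:
            from transformers.loss.loss_utils import LOSS_MAPPING
        except ImportError:
            return False
        mapping = LOSS_MAPPING
    mapping[name] = loss_fn
    return True

registry = {}
ok = register_loss("ForDiffusionLM", object, mapping=registry)
```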

View File

@@ -7,13 +7,13 @@ from typing import Any, Generator
from unittest.mock import patch
import pytest
from datasets import Dataset, IterableDataset
from datasets import Dataset
from huggingface_hub import snapshot_download
from transformers import PreTrainedTokenizer
from axolotl.loaders.tokenizer import load_tokenizer
from axolotl.utils.data.rl import prepare_preference_datasets
from axolotl.utils.data.sft import _load_tokenized_prepared_datasets, prepare_datasets
from axolotl.utils.data.sft import _load_tokenized_prepared_datasets
from axolotl.utils.dict import DictDefault
from tests.constants import (
@@ -24,7 +24,6 @@ from tests.constants import (
from tests.hf_offline_utils import enable_hf_offline
# pylint: disable=too-many-public-methods
class TestDatasetPreparation:
"""Test a configured dataloader."""
@@ -47,24 +46,6 @@ class TestDatasetPreparation:
]
)
@pytest.fixture
def streaming_dataset_fixture(self):
"""Create a streaming dataset fixture for testing."""
def generator():
yield {
"instruction": "Evaluate this sentence for spelling and grammar mistakes",
"input": "He finnished his meal and left the resturant",
"output": "He finished his meal and left the restaurant.",
}
yield {
"instruction": "What is the capital of France?",
"input": "",
"output": "The capital of France is Paris.",
}
return IterableDataset.from_generator(generator)
@pytest.mark.skip(reason="TODO: fix hf hub offline to work with HF rate limits")
@enable_hf_offline
def test_load_hub(self, tokenizer):
@@ -505,162 +486,3 @@ class TestDatasetPreparation:
assert "attention_mask" in dataset.features
assert "labels" in dataset.features
shutil.rmtree(tmp_ds_path)
def test_streaming_sft_dataset(self, tokenizer, streaming_dataset_fixture):
"""Test streaming SFT dataset preparation with IterableDataset."""
with patch("axolotl.utils.data.sft.load_dataset_with_config") as mock_load:
mock_load.return_value = streaming_dataset_fixture
cfg = DictDefault(
{
"tokenizer_config": "huggyllama/llama-7b",
"sequence_len": 256,
"streaming": True,
"max_steps": 100, # Required for streaming datasets
"datasets": [
{
"path": "dummy/path",
"type": "alpaca",
},
],
}
)
train_dataset, eval_dataset, total_num_steps, prompters = prepare_datasets(
cfg, tokenizer
)
# Verify it returns an IterableDataset
assert isinstance(train_dataset, IterableDataset)
assert eval_dataset is None # No eval split for streaming
assert total_num_steps == 100 # Should use max_steps
assert len(prompters) == 1
# Test that we can iterate through the dataset
sample_count = 0
for sample in train_dataset:
assert "input_ids" in sample
assert "attention_mask" in sample
assert "labels" in sample
sample_count += 1
if sample_count >= 2: # Just test first few samples
break
assert sample_count == 2
def test_dataset_mixing_strategy_validation(self):
"""Test validation of dataset mixing strategy configuration."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Test valid strategies work
valid_strategies = ["round_robin", "weighted", "random"]
dataset1 = Dataset.from_dict({"text": ["a"], "source": ["ds1"]})
dataset2 = Dataset.from_dict({"text": ["b"], "source": ["ds2"]})
for strategy in valid_strategies:
cfg = DictDefault(
{
"dataset_mixing_strategy": strategy,
"mixing_weights": [0.5, 0.5] if strategy == "weighted" else None,
"seed": 42,
}
)
# Should not raise an error
merged = _merge_datasets_with_strategy([dataset1, dataset2], cfg)
assert len(merged) >= 1
def test_regular_dataset_round_robin_mixing(self):
"""Test round-robin mixing for regular datasets."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Create test datasets
dataset1 = Dataset.from_dict(
{"text": ["ds1_item1", "ds1_item2"], "source": ["ds1", "ds1"]}
)
dataset2 = Dataset.from_dict(
{"text": ["ds2_item1", "ds2_item2"], "source": ["ds2", "ds2"]}
)
cfg = DictDefault({"dataset_mixing_strategy": "round_robin", "seed": 42})
merged = _merge_datasets_with_strategy([dataset1, dataset2], cfg)
# Should have all samples from both datasets
assert len(merged) == 4
assert isinstance(merged, Dataset)
# Check that samples are interleaved (not just concatenated)
sources = [sample["source"] for sample in merged]
# Round-robin should alternate between datasets
assert sources != ["ds1", "ds1", "ds2", "ds2"] # Not concatenated
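The interleaving property this test asserts — sources alternate rather than concatenate — can be sketched with a hypothetical round-robin helper (not the actual `_merge_datasets_with_strategy` logic):

```python
from itertools import chain, zip_longest

def round_robin(*lists):
    """Hypothetical sketch of round-robin mixing: take one item from each
    dataset in turn, skipping datasets that run out early."""
    sentinel = object()
    return [x for x in chain.from_iterable(zip_longest(*lists, fillvalue=sentinel))
            if x is not sentinel]

merged = round_robin(["ds1_item1", "ds1_item2"], ["ds2_item1", "ds2_item2"])
```

With equal-length inputs this yields a strict alternation, which is exactly why the test's `sources` list cannot equal the concatenated order.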
def test_regular_dataset_weighted_mixing(self):
"""Test weighted mixing for regular datasets."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Create test datasets
dataset1 = Dataset.from_dict(
{
"text": ["ds1_item1", "ds1_item2", "ds1_item3", "ds1_item4"],
"source": ["ds1"] * 4,
}
)
dataset2 = Dataset.from_dict(
{
"text": ["ds2_item1", "ds2_item2", "ds2_item3", "ds2_item4"],
"source": ["ds2"] * 4,
}
)
cfg = DictDefault(
{
"dataset_mixing_strategy": "weighted",
"mixing_weights": [0.75, 0.25], # 3:1 ratio
"seed": 42,
}
)
merged = _merge_datasets_with_strategy([dataset1, dataset2], cfg)
# Should have samples proportional to weights
assert len(merged) > 0
assert isinstance(merged, Dataset)
# Count samples from each dataset
sources = [sample["source"] for sample in merged]
ds1_count = sources.count("ds1")
ds2_count = sources.count("ds2")
# Should have samples from both datasets
assert ds1_count > 0 and ds2_count > 0 # Both datasets should be represented
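The weighted strategy this test exercises can be sketched as drawing each output item from dataset `i` with probability `weights[i]` — an assumed simplification, not the actual `_merge_datasets_with_strategy` implementation:

```python
import random

def weighted_mix(datasets, weights, num_samples, seed=42):
    """Hypothetical sketch of weighted mixing: pick a source dataset per
    output sample according to the weights, sampling items with replacement."""
    rng = random.Random(seed)
    out = []
    for _ in range(num_samples):
        idx = rng.choices(range(len(datasets)), weights=weights)[0]
        out.append(rng.choice(datasets[idx]))
    return out

ds1 = [("ds1", i) for i in range(4)]
ds2 = [("ds2", i) for i in range(4)]
mixed = weighted_mix([ds1, ds2], weights=[0.75, 0.25], num_samples=16)
```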
def test_streaming_dataset_mixing(self):
"""Test that streaming datasets use HuggingFace interleave_datasets."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Create test streaming datasets
def gen1():
yield {"text": "stream1_item1", "source": "stream1"}
yield {"text": "stream1_item2", "source": "stream1"}
def gen2():
yield {"text": "stream2_item1", "source": "stream2"}
yield {"text": "stream2_item2", "source": "stream2"}
stream1 = IterableDataset.from_generator(gen1)
stream2 = IterableDataset.from_generator(gen2)
cfg = DictDefault({"dataset_mixing_strategy": "round_robin", "seed": 42})
merged = _merge_datasets_with_strategy([stream1, stream2], cfg)
# Should return an IterableDataset
assert isinstance(merged, IterableDataset)
# Test that we can iterate and get samples
samples = list(merged.take(3))
assert len(samples) >= 2 # Should get at least 2 samples
# Should have samples from both datasets
sources = [sample["source"] for sample in samples]
assert len(set(sources)) >= 1 # At least one unique source
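For streaming datasets the code under test defers to Hugging Face's `interleave_datasets`; its default round-robin, first-exhausted behavior can be sketched with plain generators:

```python
def interleave_streams(*streams):
    """Hypothetical sketch of streaming round-robin interleaving: pull one
    item from each stream in turn, stopping as soon as any stream is
    exhausted (mirroring the default "first_exhausted" strategy)."""
    its = [iter(s) for s in streams]
    while True:
        for it in its:
            try:
                yield next(it)
            except StopIteration:
                return

samples = list(interleave_streams(["s1_a", "s1_b"], ["s2_a"]))
```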

View File

@@ -1,11 +1,16 @@
"""Module for testing dataset sequence packing"""
import unittest
from pathlib import Path
from datasets import Dataset, load_dataset
from transformers import AutoTokenizer
from axolotl.cli.args import TrainerCliArgs
from axolotl.common.datasets import load_datasets
from axolotl.datasets import ConstantLengthDataset, TokenizedPromptDataset
from axolotl.prompt_tokenizers import AlpacaPromptTokenizingStrategy
from axolotl.prompters import AlpacaPrompter
from axolotl.train import setup_model_and_trainer
from axolotl.utils.config import normalize_config, validate_config
from axolotl.utils.dict import DictDefault
@@ -31,6 +36,43 @@ class TestPacking(unittest.TestCase):
}
)
def test_increments_attention(self):
prompter = AlpacaPrompter("chat")
strat = AlpacaPromptTokenizingStrategy(
prompter,
self.tokenizer,
False,
2048,
)
dataset = load_dataset(
"json",
data_files=str(Path(__file__).parent / "fixtures/alpaca/alpaca.json"),
)["train"]
dataset = Dataset.from_list(list(TokenizedPromptDataset(strat, dataset)))
constant_len_dataset = ConstantLengthDataset(
self.tokenizer,
[dataset],
seq_length=2048,
)
packed_dataset = Dataset.from_list(list(constant_len_dataset))
example = packed_dataset[0]
next_bos_index = (
example["input_ids"][1:].index(self.tokenizer.bos_token_id) + 1
) # add one since we sliced
# first example doesn't have mask reset
assert example["input_ids"][0] == self.tokenizer.bos_token_id
assert example["attention_mask"][0] == 1
assert example["position_ids"][0] == 0
assert example["position_ids"][1] == 1
# but subsequent one does
assert example["input_ids"][next_bos_index] == self.tokenizer.bos_token_id
assert example["attention_mask"][next_bos_index] == 2
assert example["position_ids"][next_bos_index] == 0
assert example["position_ids"][next_bos_index + 1] == 1
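The packing invariants asserted above — a distinct integer per example in the attention mask and `position_ids` restarting at 0 — can be sketched with a hypothetical `pack_examples` helper (not the real `ConstantLengthDataset`):

```python
def pack_examples(examples):
    """Hypothetical sketch of sequence packing: concatenate examples into one
    row, number each example in the attention mask (1, 2, ...) so kernels can
    tell them apart, and restart position_ids at 0 per example."""
    input_ids, attention_mask, position_ids = [], [], []
    for seq_idx, seq in enumerate(examples, start=1):
        input_ids.extend(seq)
        attention_mask.extend([seq_idx] * len(seq))
        position_ids.extend(range(len(seq)))
    return {"input_ids": input_ids, "attention_mask": attention_mask,
            "position_ids": position_ids}

packed = pack_examples([[1, 5, 6], [1, 7]])  # two examples, bos token id = 1
```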
@with_temp_dir
def test_lora_packing(self, temp_dir):
# pylint: disable=duplicate-code