Compare commits


3 Commits

Author         SHA1          Message                                     Date
Dan Saunders   4870638734    initial impl of streaming preprocessing    2025-08-19 23:10:54 +00:00
Dan Saunders   b25078397c    nit                                         2025-08-19 18:12:09 +00:00
Dan Saunders   ba681125d7    separate streaming and pretraining          2025-08-19 18:05:05 +00:00
23 changed files with 409 additions and 73 deletions

View File

@@ -12,6 +12,5 @@ reviews:
auto_review:
enabled: true
drafts: false
auto_incremental_review: true
chat:
auto_reply: true

View File

@@ -41,12 +41,6 @@ model, and final model output, you may need at least 3TB of free disk space to k
axolotl train examples/gpt-oss/gpt-oss-120b-fft-fsdp2-offload.yaml
```
To simplify fine-tuning across 2 nodes × 8x H100 (80GB) GPUs, we've partnered with [Baseten](https://baseten.co) to showcase multi-node
training of the 120B model using Baseten Truss. You can read more about this recipe on
[Baseten's blog](https://www.baseten.co/blog/how-to-fine-tune-gpt-oss-120b-with-baseten-and-axolotl/). The recipe can
be found on their
[GitHub](https://github.com/basetenlabs/ml-cookbook/tree/main/examples/oss-gpt-120b-axolotl/training).
ERRATA: Transformers saves the model architecture prefixed with `FSDP`, which needs to be manually renamed in `config.json`.
See https://github.com/huggingface/transformers/pull/40207 for the status of this issue.
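
The manual rename described in the errata can be scripted. A minimal sketch, assuming the `FSDP`-prefixed class name appears in the `architectures` list of the saved `config.json` (the file path and class name below are assumptions, check your own output directory):

```python
# Hedged sketch: strip the "FSDP" prefix from the architectures entry in config.json.
import json

config_path = "./outputs/gpt-oss-out/config.json"  # assumed output path
with open(config_path) as f:
    config = json.load(f)

# e.g. ["FSDPGptOssForCausalLM"] -> ["GptOssForCausalLM"]
config["architectures"] = [
    arch.removeprefix("FSDP") for arch in config.get("architectures", [])
]

with open(config_path, "w") as f:
    json.dump(config, f, indent=2)
```
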
@@ -67,23 +61,9 @@ mv ./outputs/gpt-oss-out/merged/* ./outputs/gpt-oss-out/
### Inferencing your fine-tuned model
#### vLLM
GPT-OSS support in vLLM does not exist in a stable release yet. See https://x.com/MaziyarPanahi/status/1955741905515323425
for more information about using a special vllm-openai docker image for inferencing with vLLM.
Optionally, vLLM can be installed from nightly:
```bash
pip install --no-build-isolation --pre -U vllm --extra-index-url https://wheels.vllm.ai/nightly
```
and the vLLM server can be started with the following command (modify `--tensor-parallel-size 8` to match your environment):
```bash
vllm serve ./outputs/gpt-oss-out/ --served-model-name axolotl/gpt-oss-20b --host 0.0.0.0 --port 8888 --tensor-parallel-size 8
```
#### SGLang
SGLang has 0-day support in main, see https://github.com/sgl-project/sglang/issues/8833 for information on installing
SGLang from source. Once you've installed SGLang, run the following command to launch an SGLang server:

View File

@@ -44,7 +44,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -40,7 +40,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -15,7 +15,7 @@ datasets:
field_thinking: thinking
template_thinking_key: thinking
dataset_prepared_path: ./outputs/last_run_prepared
dataset_prepared_path: last_run_prepared
val_set_size: 0
output_dir: ./outputs/gpt-oss-out/
@@ -41,7 +41,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -15,7 +15,7 @@ datasets:
field_thinking: thinking
template_thinking_key: thinking
dataset_prepared_path: ./outputs/last_run_prepared
dataset_prepared_path: last_run_prepared
val_set_size: 0
output_dir: ./outputs/gpt-oss-out/
@@ -40,7 +40,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -53,7 +53,7 @@ bf16: true
tf32: true
flash_attention: true
attn_implementation: kernels-community/vllm-flash-attn3 # this is not needed if using flash_attn >= 2.8.3
attn_implementation: kernels-community/vllm-flash-attn3
gradient_checkpointing: true
activation_offloading: true

View File

@@ -118,9 +118,9 @@ def get_package_version():
extras_require = {
"flash-attn": ["flash-attn==2.8.3"],
"flash-attn": ["flash-attn==2.8.2"],
"ring-flash-attn": [
"flash-attn==2.8.3",
"flash-attn==2.8.2",
"ring-flash-attn>=0.1.7",
"yunchang==0.6.0",
],

View File

@@ -82,7 +82,7 @@ class ModalCloud(Cloud):
return res
def get_image(self):
docker_tag = "main-py3.11-cu126-2.7.1"
docker_tag = "main-py3.11-cu124-2.6.0"
if self.config.docker_tag:
docker_tag = self.config.docker_tag
docker_image = f"axolotlai/axolotl:{docker_tag}"
@@ -200,7 +200,7 @@ class ModalCloud(Cloud):
if family in ["a10", "a10g"]:
return modal.gpu.A10G(count=count)
if family == "h100":
return f"H100:{count}"
return modal.gpu.H100(count=count)
if family == "t4":
return modal.gpu.T4(count=count)
if family == "l4":
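
For context on the two GPU specs in this hunk: recent Modal releases accept a plain string such as `"H100:8"` wherever a GPU is requested, while `modal.gpu.H100(count=...)` is the older object-based form. A minimal sketch of the string form, with placeholder app and function names (not part of the axolotl codebase):

```python
# Hedged sketch: requesting GPUs from Modal via the string spec "FAMILY:COUNT".
import modal

app = modal.App("axolotl-train")  # placeholder name

@app.function(gpu="H100:8")  # equivalent intent to modal.gpu.H100(count=8)
def train():
    ...
```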

View File

@@ -64,7 +64,7 @@ def do_inference(
importlib.import_module("axolotl.prompters"), prompter
)
elif cfg.chat_template:
chat_template_str = get_chat_template(cfg.chat_template, tokenizer=tokenizer)
chat_template_str = get_chat_template(cfg.chat_template)
elif cfg.datasets[0].type == "chat_template":
chat_template_str = get_chat_template_from_config(
cfg=cfg, ds_cfg=cfg.datasets[0], tokenizer=tokenizer

View File

@@ -97,8 +97,7 @@ def do_cli(
"""
# pylint: disable=duplicate-code
os.environ["AXOLOTL_IS_PREPROCESS"] = "1"
is_preprocess = kwargs.pop("is_preprocess", True)
parsed_cfg = load_cfg(config, is_preprocess=is_preprocess, **kwargs)
parsed_cfg = load_cfg(config, **kwargs)
parsed_cfg.is_preprocess = True
parser = transformers.HfArgumentParser(PreprocessCliArgs)
parsed_cli_args, _ = parser.parse_args_into_dataclasses(

View File

@@ -3,12 +3,11 @@
import random
from copy import deepcopy
from itertools import product
from typing import Any
def generate_sweep_configs(
base_config: dict[str, list], sweeps_config: dict[str, list]
) -> list[dict[str, Any]]:
) -> list[dict[str, list]]:
"""
Recursively generates all possible configurations by applying sweeps to the base config.
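
A hypothetical usage sketch of the sweep expansion, assuming each sweep key maps to a list of candidate values and the function returns one merged config per combination (the parameter names and values below are made up):

```python
# Hedged sketch: illustrative values only, not taken from this change.
base_config = {"base_model": "openai/gpt-oss-20b", "micro_batch_size": 1}
sweeps_config = {"learning_rate": [1e-5, 2e-5], "num_epochs": [1, 3]}

configs = generate_sweep_configs(base_config, sweeps_config)

# 2 learning rates x 2 epoch counts -> 4 configs, each a copy of
# base_config with one combination of the swept values applied.
assert len(configs) == 4
```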

View File

@@ -4,7 +4,6 @@ import os
import subprocess # nosec
import sys
import tempfile
from pathlib import Path
from typing import Any, Iterator, Literal
import yaml
@@ -89,12 +88,7 @@ def generate_config_files(config: str, sweep: str | None) -> Iterator[tuple[str,
# Generate all possible configurations
permutations = generate_sweep_configs(base_config, sweep_config)
is_group = len(permutations) > 1
base_output_dir = base_config.get("output_dir", "./model-out")
for idx, permutation in enumerate(permutations, start=1):
permutation_dir = Path(permutation.get("output_dir", base_output_dir))
permutation_id = f"sweep{idx:04d}"
permutation["output_dir"] = str(permutation_dir / permutation_id)
for permutation in permutations:
# pylint: disable=consider-using-with
temp_file = tempfile.NamedTemporaryFile(
mode="w",

View File

@@ -40,6 +40,7 @@ from axolotl.utils.collators import (
BatchSamplerDataCollatorForSeq2Seq,
DataCollatorForSeq2Seq,
MambaDataCollator,
StreamingDataCollator,
V2BatchSamplerDataCollatorForSeq2Seq,
)
from axolotl.utils.collators.mm_chat import MultiModalChatDataCollator
@@ -422,6 +423,17 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
is_eval=False,
**kwargs,
):
from datasets import IterableDataset
if isinstance(self.train_dataset, IterableDataset) and not is_eval:
LOG.info("Using StreamingDataCollator")
return StreamingDataCollator(
tokenizer=self.tokenizer,
cfg=self.cfg,
prompter=None,
**kwargs,
)
if training_args.pretraining:
if (
self.cfg.pretraining_sample_concatenation is False

View File

@@ -43,7 +43,11 @@ class TokenizedPromptDataset(Dataset):
)
def process(self, dataset):
features = dataset.features.keys()
# For IterableDataset, we can't access features upfront
# We'll need to infer from the first batch
features = None
if hasattr(dataset, "features") and dataset.features:
features = dataset.features.keys()
map_kwargs = {}
if self.prompt_tokenizer.supports_batched:
@@ -54,18 +58,29 @@ class TokenizedPromptDataset(Dataset):
hasattr(self.prompt_tokenizer, "filter_rows")
and self.prompt_tokenizer.filter_rows
):
filter_kwargs = {"desc": "Strategy Filtering Rows"}
# Only add num_proc for regular datasets
if features is not None:
filter_kwargs["num_proc"] = self.process_count
dataset = dataset.filter(
self.prompt_tokenizer.filter_rows,
num_proc=self.process_count,
desc="Strategy Filtering Rows",
**filter_kwargs,
)
map_kwargs = {
**map_kwargs,
"desc": "Tokenizing Prompts",
}
# Only add remove_columns for regular datasets
if features is not None:
map_kwargs["remove_columns"] = features
map_kwargs["num_proc"] = self.process_count
map_kwargs["keep_in_memory"] = self.keep_in_memory
return dataset.map(
self.prompt_tokenizer.tokenize_prompt,
num_proc=self.process_count,
remove_columns=features,
keep_in_memory=self.keep_in_memory,
desc="Tokenizing Prompts",
**map_kwargs,
)
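
The conditional kwargs above exist because regular `Dataset` and streaming `IterableDataset` objects expose `map`/`filter` with different capabilities. A rough sketch of the distinction, using a placeholder dataset (the exact supported kwargs depend on your `datasets` version):

```python
# Hedged sketch: eager map on a Dataset vs. lazy map on an IterableDataset.
from datasets import load_dataset

regular = load_dataset("imdb", split="train")                   # Dataset
streamed = load_dataset("imdb", split="train", streaming=True)  # IterableDataset

def add_length(example):
    return {"n_chars": len(example["text"])}

# Regular datasets map eagerly and accept multiprocessing/progress kwargs.
regular = regular.map(add_length, num_proc=4, desc="Measuring", remove_columns=["text"])

# IterableDatasets apply the map lazily while iterating; num_proc, desc and
# keep_in_memory are not accepted, hence the conditional kwargs in process().
streamed = streamed.map(add_length, remove_columns=["text"])
```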

View File

@@ -187,7 +187,7 @@ def _process_lora_module_for_fsdp(module, fsdp2_kwargs):
# Linear4Bit will keep its bias term in fp32. If the weight dtype is in bf16 we are not able to
# wrap this. Therefore we must ensure the bias has the same dtype as the weight
if hasattr(module.base_layer, "bias") and module.base_layer.bias is not None:
if module.base_layer.bias is not None:
if module.base_layer.weight.dtype != module.base_layer.bias.dtype:
log_bias_dtype_mismatch = True
module.base_layer.bias.data = module.base_layer.bias.data.to(

View File

@@ -253,9 +253,7 @@ def save_trained_model(
# final model weights have already been saved by `ReLoRACallback.on_train_end`
return
if ( # pylint: disable=too-many-nested-blocks
trainer.is_fsdp_enabled or cfg.fsdp_config
):
if trainer.is_fsdp_enabled or cfg.fsdp_config:
if cfg.fsdp_config or cfg.fsdp:
if cfg.fsdp_config.final_state_dict_type:
state_dict_type = cfg.fsdp_config.final_state_dict_type
@@ -287,8 +285,6 @@ def save_trained_model(
if trainer.accelerator.is_main_process:
# move all files in merged_path to cfg.output_dir
for merged_file in Path(merged_path).iterdir():
if (Path(cfg.output_dir) / merged_file.name).exists():
(Path(cfg.output_dir) / merged_file.name).unlink()
shutil.move(str(merged_file), cfg.output_dir)
shutil.rmtree(merged_path) # remove what should be an empty dir
# TODO(wing):see https://github.com/huggingface/transformers/pull/40207

View File

@@ -1,11 +1,19 @@
"""
shared axolotl collators for multipack, mamba, multimodal
"""
"""Shared axolotl collators for multipack, mamba, multimodal, etc."""
from .batching import ( # noqa: F401
from .batching import (
BatchSamplerDataCollatorForSeq2Seq,
DataCollatorForSeq2Seq,
PretrainingBatchSamplerDataCollatorForSeq2Seq,
V2BatchSamplerDataCollatorForSeq2Seq,
)
from .mamba import MambaDataCollator # noqa: F401
from .mamba import MambaDataCollator
from .streaming import StreamingDataCollator
__all__ = [
"BatchSamplerDataCollatorForSeq2Seq",
"DataCollatorForSeq2Seq",
"PretrainingBatchSamplerDataCollatorForSeq2Seq",
"V2BatchSamplerDataCollatorForSeq2Seq",
"MambaDataCollator",
"StreamingDataCollator",
]

View File

@@ -0,0 +1,146 @@
from dataclasses import dataclass
from typing import Any, List
import torch
from transformers import PreTrainedTokenizerBase, default_data_collator
from transformers.utils import PaddingStrategy
from axolotl.prompters import Prompter
from axolotl.utils.dict import DictDefault
@dataclass
class StreamingDataCollator:
tokenizer: PreTrainedTokenizerBase
cfg: DictDefault
prompter: Prompter | None = None
padding: bool | str | PaddingStrategy = True
max_length: int | None = None
pad_to_multiple_of: int | None = None
label_pad_token_id: int = -100
return_tensors: str = "pt"
def __post_init__(self):
if self.max_length is None:
self.max_length = self.cfg.sequence_len
def __call__(self, raw_batch: List[dict]) -> dict[str, Any]:
processed_samples = []
for raw_sample in raw_batch:
formatted_sample = raw_sample
if self.prompter:
formatted_sample = self._apply_prompt_formatting(raw_sample)
tokenized_sample = self._tokenize_sample(formatted_sample)
if len(tokenized_sample["input_ids"]) > self.max_length:
tokenized_sample = self._truncate_sample(tokenized_sample)
if tokenized_sample.get("input_ids"):
processed_samples.append(tokenized_sample)
return self._pad_and_batch(processed_samples)
def _apply_prompt_formatting(self, raw_sample: dict) -> dict:
formatted_text = self.prompter.build_prompt(
instruction=raw_sample.get("instruction", ""),
input=raw_sample.get("input", ""),
output=raw_sample.get("output", ""),
)
return {"text": formatted_text}
def _tokenize_sample(self, sample: dict) -> dict:
text = sample.get("text", sample.get("content", ""))
if not text:
instruction = sample.get("instruction", "")
input_text = sample.get("input", "")
output_text = sample.get("output", "")
parts = []
if instruction:
parts.append(f"Instruction: {instruction}")
if input_text:
parts.append(f"Input: {input_text}")
if output_text:
parts.append(f"Output: {output_text}")
text = "\n".join(parts)
if not text:
return {"input_ids": [], "attention_mask": [], "labels": []}
tokenized = self.tokenizer(
text,
truncation=False,
padding=False,
return_tensors=None,
)
tokenized["labels"] = tokenized["input_ids"].copy()
return tokenized
def _truncate_sample(self, tokenized_sample: dict) -> dict:
max_len = self.max_length
for key in ["input_ids", "attention_mask", "labels"]:
if key in tokenized_sample:
tokenized_sample[key] = tokenized_sample[key][:max_len]
return tokenized_sample
def _pad_and_batch(self, processed_samples: List[dict]) -> dict[str, Any]:
if not processed_samples:
processed_samples = [
{
"input_ids": [self.tokenizer.eos_token_id],
"attention_mask": [1],
"labels": [self.tokenizer.eos_token_id],
}
]
batch_samples = []
for sample in processed_samples:
batch_sample = {}
for key, value in sample.items():
if key in ["input_ids", "attention_mask", "labels"]:
batch_sample[key] = torch.tensor(value, dtype=torch.long)
batch_samples.append(batch_sample)
if self.padding:
max_len_in_batch = max(len(sample["input_ids"]) for sample in batch_samples)
for sample in batch_samples:
current_len = len(sample["input_ids"])
pad_len = max_len_in_batch - current_len
if pad_len > 0:
pad_token_id = (
self.tokenizer.pad_token_id or self.tokenizer.eos_token_id
)
sample["input_ids"] = torch.cat(
[
sample["input_ids"],
torch.full((pad_len,), pad_token_id, dtype=torch.long),
]
)
sample["attention_mask"] = torch.cat(
[
sample["attention_mask"],
torch.zeros(pad_len, dtype=torch.long),
]
)
sample["labels"] = torch.cat(
[
sample["labels"],
torch.full(
(pad_len,), self.label_pad_token_id, dtype=torch.long
),
]
)
batch = {}
for key in ["input_ids", "attention_mask", "labels"]:
if key in batch_samples[0]:
batch[key] = torch.stack([sample[key] for sample in batch_samples])
return batch
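
A hypothetical usage sketch of the new collator outside of a training run (the tokenizer and samples are made up; during training the trainer builder wires it up automatically when the train dataset is an `IterableDataset`):

```python
# Hedged sketch: exercising StreamingDataCollator directly.
from transformers import AutoTokenizer

from axolotl.utils.collators import StreamingDataCollator
from axolotl.utils.dict import DictDefault

tokenizer = AutoTokenizer.from_pretrained("openai-community/gpt2")  # placeholder model
collator = StreamingDataCollator(tokenizer=tokenizer, cfg=DictDefault({"sequence_len": 512}))

batch = collator([
    {"text": "The quick brown fox jumps over the lazy dog."},
    {"instruction": "Say hello", "output": "Hello!"},
])

# Samples are tokenized on the fly, truncated to sequence_len, and right-padded
# to the longest sample in the batch.
print(batch["input_ids"].shape, batch["labels"].shape)
```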

View File

@@ -9,6 +9,7 @@ from datasets import (
Dataset,
DatasetDict,
IterableDataset,
IterableDatasetDict,
load_dataset,
)
from transformers import PreTrainedTokenizer, ProcessorMixin
@@ -43,6 +44,18 @@ from axolotl.utils.trainer import (
LOG = get_logger(__name__)
def _determine_streaming_mode(cfg: DictDefault) -> bool:
"""Determine if we should use streaming mode based on config."""
if cfg.streaming is not None:
return cfg.streaming
# Default to streaming for pretraining datasets
if cfg.pretraining_dataset:
return True
return False
@retry_on_request_exceptions(max_retries=3, delay=5)
def prepare_datasets(
cfg: DictDefault,
@@ -61,11 +74,52 @@ def prepare_datasets(
Returns:
Tuple of (train_dataset, eval_dataset, total_steps, prompters).
"""
if cfg.pretraining_dataset:
return _prepare_pretraining_dataset(
cfg, tokenizer, processor, preprocess_iterable
streaming_mode = _determine_streaming_mode(cfg)
if streaming_mode:
if cfg.pretraining_dataset:
return _prepare_streaming_pretraining_dataset(cfg, tokenizer, processor)
else:
return _prepare_streaming_sft_dataset(cfg, tokenizer, processor)
else:
if cfg.pretraining_dataset:
return _prepare_pretraining_dataset(
cfg, tokenizer, processor, preprocess_iterable=False
)
else:
return _prepare_standard_dataset(
cfg, tokenizer, processor, preprocess_iterable=False
)
def _prepare_streaming_sft_dataset(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None,
) -> tuple[IterableDataset, Dataset | None, int, list[Prompter | None]]:
LOG.info("Loading streaming datasets")
raw_datasets = _load_raw_datasets_for_streaming(cfg, split="train")
eval_dataset = None
if cfg.test_datasets:
eval_raw_datasets = _load_raw_datasets_for_streaming(
cfg, split="test", dataset_configs=cfg.test_datasets
)
return _prepare_standard_dataset(cfg, tokenizer, processor, preprocess_iterable)
eval_dataset = _process_eval_dataset_minimal(
eval_raw_datasets, cfg, tokenizer, processor
)
elif cfg.val_set_size:
LOG.info("Validation splits not supported for streaming datasets")
if not cfg.max_steps:
raise ValueError("max_steps must be set when using streaming datasets")
total_num_steps = cfg.max_steps
LOG.info(f"Maximum steps: {total_num_steps}")
prompters = [None] * len(cfg.datasets) if cfg.datasets else []
return raw_datasets, eval_dataset, total_num_steps, prompters
def _prepare_standard_dataset(
@@ -373,7 +427,7 @@ def _load_and_process_single_dataset(
d_base_type, d_prompt_style = _parse_dataset_type(dataset_config.type)
# Select the appropriate split
if isinstance(dataset, DatasetDict):
if isinstance(dataset, (DatasetDict, IterableDatasetDict)):
if dataset_config.split and dataset_config.split in dataset:
dataset = dataset[dataset_config.split]
elif split in dataset:
@@ -512,3 +566,78 @@ def _load_and_prepare_datasets(
train_dataset, eval_dataset = _handle_test_dataset_split(dataset, cfg)
return train_dataset, eval_dataset, prompters
def _load_raw_datasets_for_streaming(
cfg: DictDefault, split: str = "train", dataset_configs: list | None = None
) -> IterableDataset:
configs = (
dataset_configs
if dataset_configs is not None
else (cfg.datasets if split == "train" else cfg.test_datasets)
)
if not configs:
raise ValueError(f"No dataset configurations found for split '{split}'")
datasets = []
for dataset_config in datasets_with_name_generator(configs):
raw_dataset = load_dataset_with_config(
dataset_config, cfg.hf_use_auth_token, streaming=True
)
if isinstance(raw_dataset, (DatasetDict, IterableDatasetDict)):
if dataset_config.split and dataset_config.split in raw_dataset:
raw_dataset = raw_dataset[dataset_config.split]
elif split in raw_dataset:
raw_dataset = raw_dataset[split]
else:
raise ValueError(
f"no {split} split found for dataset {dataset_config.path}, "
"you may specify a split with 'split: ...'"
)
datasets.append(raw_dataset)
if len(datasets) == 1:
return datasets[0]
else:
return merge_datasets(datasets, cfg)
def _process_eval_dataset_minimal(
raw_dataset: IterableDataset,
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None,
) -> Dataset | None:
LOG.info("Eval dataset processing skipped for streaming")
return None
def _prepare_streaming_pretraining_dataset(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None,
) -> tuple[IterableDataset, Dataset | None, int, list[Prompter | None]]:
pretraining_config = _extract_pretraining_config(cfg)
train_dataset = load_dataset_with_config(
pretraining_config, cfg.hf_use_auth_token, streaming=True
)
if isinstance(train_dataset, (DatasetDict, IterableDatasetDict)):
if pretraining_config.split and pretraining_config.split in train_dataset:
train_dataset = train_dataset[pretraining_config.split]
elif "train" in train_dataset:
train_dataset = train_dataset["train"]
else:
raise ValueError("no train split found for pretraining dataset")
if not cfg.max_steps:
raise ValueError("max_steps must be set when using streaming datasets")
total_num_steps = cfg.max_steps
LOG.info(f"Maximum steps: {total_num_steps}")
return train_dataset, None, total_num_steps, []
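
For reference, the streaming loading path above ultimately resolves to `datasets.load_dataset(..., streaming=True)`. A minimal standalone sketch with an arbitrary example dataset (not taken from this change), which also shows why `max_steps` has to be supplied:

```python
# Hedged sketch: a streamed dataset has no length, so total steps can't be inferred.
from datasets import load_dataset

ds = load_dataset("allenai/c4", "en", split="train", streaming=True)
print(type(ds).__name__)            # IterableDataset -- no __len__()
print(next(iter(ds))["text"][:80])  # rows are fetched on demand
```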

View File

@@ -190,12 +190,18 @@ def handle_long_seq_in_dataset(
Returns:
Filtered dataset with long sequences removed.
"""
if "input_ids" not in dataset.column_names:
LOG.warning(
"Dataset does not contain 'input_ids' column. Skip drop long seq. This is "
"expected for reward modeling."
)
return dataset
if hasattr(dataset, "column_names") and dataset.column_names:
if "input_ids" not in dataset.column_names:
LOG.warning(
"Dataset does not contain 'input_ids' column. Skip drop long seq. This is "
"expected for reward modeling."
)
return dataset
else:
# For IterableDataset, we can't check columns upfront, so skip for streaming
if isinstance(dataset, IterableDataset):
LOG.info("Skipping drop_long_seq for streaming datasets (not compatible)")
return dataset
drop_long = functools.partial(
drop_long_seq,

View File

@@ -932,6 +932,34 @@ class AxolotlInputConfig(
fix_untrained_tokens: int | list[int] | None = None
streaming: bool | None = Field(
default=None,
json_schema_extra={
"description": "Whether to use streaming datasets (IterableDataset) for processing large datasets that don't fit in memory. When True, data is loaded on-demand during training without upfront preprocessing. Requires max_steps to be set. Pre-training datasets default to streaming unless explicitly set to False."
},
)
streaming_dataset_mixing_strategy: str | None = Field(
default="round_robin",
json_schema_extra={
"description": "Strategy for mixing multiple streaming datasets: 'round_robin' (equal sampling), 'weighted' (use streaming_mixing_weights), or 'random' (random sampling with equal probability)."
},
)
streaming_mixing_weights: list[float] | None = Field(
default=None,
json_schema_extra={
"description": "Weights for weighted mixing strategy when using multiple streaming datasets. Must sum to 1.0 and have same length as datasets list. Only used when streaming_dataset_mixing_strategy='weighted'."
},
)
streaming_buffer_per_dataset: int | None = Field(
default=1000,
json_schema_extra={
"description": "Buffer size per dataset when mixing multiple streaming datasets. Higher values may improve mixing quality but use more memory."
},
)
# INTERNALS - document for now, generally not set externally
is_preprocess: bool | None = None
preprocess_iterable: bool | None = None
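
The mixing fields above describe behavior rather than mechanism; one plausible backend for the `'weighted'` strategy is `datasets.interleave_datasets` (an assumption on my part, not confirmed by this diff):

```python
# Hedged sketch: weighted mixing of two streaming datasets; dataset names are arbitrary examples.
from datasets import interleave_datasets, load_dataset

ds_a = load_dataset("allenai/c4", "en", split="train", streaming=True)
ds_b = load_dataset("wikimedia/wikipedia", "20231101.en", split="train", streaming=True)

mixed = interleave_datasets(
    [ds_a, ds_b],
    probabilities=[0.7, 0.3],  # analogous to streaming_mixing_weights
    seed=42,
)
```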

View File

@@ -1337,6 +1337,30 @@ class GRPOVllmValidationMixin:
# pylint: disable=too-many-ancestors
class StreamingValidationMixin:
"""Validation methods related to streaming datasets."""
@model_validator(mode="after")
def check_streaming_requires_max_steps(self):
"""Ensure max_steps is set when using streaming datasets."""
# Check if streaming is explicitly enabled
streaming_enabled = getattr(self, "streaming", None) is True
# Check if pretraining dataset exists (defaults to streaming)
has_pretraining = getattr(self, "pretraining_dataset", None) is not None
streaming_default_for_pretraining = (
has_pretraining and getattr(self, "streaming", None) is None
)
# If streaming is enabled (explicitly or by default for pretraining)
if streaming_enabled or streaming_default_for_pretraining:
max_steps = getattr(self, "max_steps", None)
if not max_steps:
raise ValueError("max_steps must be set when using streaming datasets")
return self
class ValidationMixin(
DatasetValidationMixin,
AttentionValidationMixin,
@@ -1347,6 +1371,7 @@ class ValidationMixin(
SystemValidationMixin,
ChatTemplateValidationMixin,
PretrainingValidationMixin,
StreamingValidationMixin,
ModelCompatibilityValidationMixin,
ComplexValidationMixin,
GRPOVllmValidationMixin,