Compare commits

..

13 Commits

Author SHA1 Message Date
Dan Saunders
7eba3795fe fixes 2025-08-22 16:02:30 +00:00
Dan Saunders
1b7b67d06e smoke test 2025-08-22 16:02:30 +00:00
Dan Saunders
0843dc678a separate out train and eval datasets streaming; cleanup 2025-08-22 16:02:30 +00:00
Dan Saunders
067158e24a nits 2025-08-22 16:02:30 +00:00
Dan Saunders
aa5a497a2c nits 2025-08-22 16:02:30 +00:00
Dan Saunders
2176962231 separate out train and eval dataset streaming 2025-08-22 16:02:30 +00:00
Dan Saunders
10335d5df9 add multidata strats 2025-08-22 16:02:30 +00:00
Dan Saunders
e4e8ffd40c nits 2025-08-22 16:02:30 +00:00
Dan Saunders
846aa41baa nits 2025-08-22 16:02:30 +00:00
Dan Saunders
7bb52d00bb progress on streaming 2025-08-22 16:02:30 +00:00
Dan Saunders
3b2dd05798 remove iterable CLI arg 2025-08-22 16:02:30 +00:00
Dan Saunders
b6431083be nit 2025-08-22 16:02:30 +00:00
Dan Saunders
16ff01df85 separate streaming and pretraining 2025-08-22 16:02:30 +00:00
17 changed files with 996 additions and 207 deletions

View File

@@ -13,12 +13,6 @@ class PreprocessCliArgs:
debug_num_examples: int = field(default=1)
prompter: Optional[str] = field(default=None)
download: Optional[bool] = field(default=True)
iterable: Optional[bool] = field(
default=None,
metadata={
"help": "Use IterableDataset for streaming processing of large datasets"
},
)
@dataclass

View File

@@ -6,6 +6,7 @@ from dataclasses import dataclass
from datasets import Dataset
import axolotl.monkeypatch.data.batch_dataset_fetcher # pylint: disable=unused-import # noqa: F401
from axolotl.cli.args import PreprocessCliArgs, TrainerCliArgs
from axolotl.loaders import load_processor, load_tokenizer
from axolotl.utils.data import prepare_datasets, prepare_preference_datasets
@@ -54,13 +55,11 @@ def load_datasets(
"""
tokenizer = load_tokenizer(cfg)
processor = load_processor(cfg, tokenizer=tokenizer) if cfg.processor_type else None
preprocess_iterable = getattr(cli_args, "iterable", False)
train_dataset, eval_dataset, total_num_steps, prompters = prepare_datasets(
cfg,
tokenizer,
processor=processor,
preprocess_iterable=preprocess_iterable,
)
if (

View File

@@ -476,8 +476,6 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
)
):
collator = V2BatchSamplerDataCollatorForSeq2Seq
if self.cfg.squash_position_ids:
kwargs["squash_position_ids"] = True
else:
collator = BatchSamplerDataCollatorForSeq2Seq
else:

View File

@@ -1,4 +1,14 @@
"""Module containing Dataset functionality"""
"""
Module containing dataset functionality.
We want this to be a wrapper for an existing dataset that we have loaded. Lets use the
concept of middlewares to wrap each dataset, for example:
ConstantLengthDataset(ShuffledDataset([TokenizedPromptDataset(alpaca_dataset)])).
Let's check to ensure we don't truncate an item in the middle. We'll use the collators
later on to pad the datasets.
"""
from typing import Any
import torch
from datasets import Dataset, IterableDataset
@@ -7,12 +17,6 @@ from axolotl.utils.logging import get_logger
from .prompt_tokenizers import PromptTokenizingStrategy
# We want this to be a wrapper for an existing dataset that we have loaded
# lets use the concept of middlewares to wrap each dataset, for example
# ConstantLengthDataset(ShuffledDataset([TokenizedPromptDataset(alpaca_dataset)]))
# let's check to ensure we don't truncate an item in the middle, we'll use
# the collators later on to pad the datasets
LOG = get_logger(__name__)
@@ -42,10 +46,15 @@ class TokenizedPromptDataset(Dataset):
**kwargs,
)
def process(self, dataset):
features = dataset.features.keys()
def process(self, dataset: Dataset | IterableDataset) -> Dataset | IterableDataset:
"""Apply filtering and tokenization."""
# For IterableDataset, we can't access features up front. Anyways, we don't care
# to remove unused columns from streaming datasets.
features = None
if not isinstance(dataset, IterableDataset):
features = dataset.features.keys()
map_kwargs = {}
map_kwargs: dict[str, Any] = {}
if self.prompt_tokenizer.supports_batched:
map_kwargs["batched"] = True
map_kwargs["batch_size"] = 1_000
@@ -54,18 +63,28 @@ class TokenizedPromptDataset(Dataset):
hasattr(self.prompt_tokenizer, "filter_rows")
and self.prompt_tokenizer.filter_rows
):
filter_kwargs: dict[str, Any] = {"desc": "Strategy Filtering Rows"}
if not isinstance(dataset, IterableDataset):
filter_kwargs["num_proc"] = self.process_count
dataset = dataset.filter(
self.prompt_tokenizer.filter_rows,
num_proc=self.process_count,
desc="Strategy Filtering Rows",
**filter_kwargs,
)
map_kwargs = {
**map_kwargs,
"desc": "Tokenizing Prompts",
}
# Only add remove_columns for regular datasets
if not isinstance(dataset, IterableDataset):
map_kwargs["remove_columns"] = features
map_kwargs["num_proc"] = self.process_count
map_kwargs["keep_in_memory"] = self.keep_in_memory
return dataset.map(
self.prompt_tokenizer.tokenize_prompt,
num_proc=self.process_count,
remove_columns=features,
keep_in_memory=self.keep_in_memory,
desc="Tokenizing Prompts",
**map_kwargs,
)
@@ -79,16 +98,15 @@ def wrap_dataset_for_tokenized_prompt(
map_kwargs = {}
if prompt_tokenizer.supports_batched:
map_kwargs["batched"] = True
features = list(dataset.features.keys())
return dataset.map(
prompt_tokenizer.tokenize_prompt,
remove_columns=features,
**map_kwargs,
)
return TokenizedPromptDataset(prompt_tokenizer, dataset, **kwargs)
# TODO this isn't the best since it can't interleave datasets
# TODO: this isn't the best since it can't interleave datasets.
# NOTE: this is only used in a test. Can it be deleted?
class ConstantLengthDataset(IterableDataset):
"""Iterable dataset that returns constant length chunks of tokens from stream of
text files.

View File

@@ -277,14 +277,6 @@ class PatchManager:
has_remote_code=has_remote_code,
)
if self.cfg.sample_packing:
from axolotl.monkeypatch.data.batch_dataset_fetcher import (
apply_multipack_dataloader_patch,
)
LOG.info("Applying multipack dataloader patch for sample packing...")
apply_multipack_dataloader_patch()
def _apply_fsdp2_bnb_patches(self):
"""Apply FSDP2 BNB patches."""
if (

View File

@@ -1,4 +1,4 @@
"""Monkey patches for the dataset fetcher to handle batches of packed indexes."""
"""monkey patches for the dataset fetcher to handle batches of packed indexes"""
# pylint: disable=protected-access
@@ -6,20 +6,10 @@ import torch
from torch.utils.data._utils.fetch import _BaseDatasetFetcher
from torch.utils.data._utils.worker import _worker_loop
_ORIGINAL_MAP_DATASET_FETCHER = None
_ORIGINAL_WORKER_LOOP = None
_IS_PATCHED = False
class _MapDatasetFetcher(_BaseDatasetFetcher):
"""
Custom dataset fetcher that handles nested batch structures from
MultipackBatchSampler.
"""
def fetch(self, possibly_batched_index):
if isinstance(possibly_batched_index[0], list):
# Handle nested structure from MultipackBatchSampler
data = [None for i in possibly_batched_index]
for i, possibly_batched_index_ in enumerate(possibly_batched_index):
if self.auto_collation:
@@ -33,7 +23,6 @@ class _MapDatasetFetcher(_BaseDatasetFetcher):
else:
data[i] = self.dataset[possibly_batched_index_]
else:
# Standard batch handling
if self.auto_collation:
if hasattr(self.dataset, "__getitems__") and self.dataset.__getitems__:
data = self.dataset.__getitems__(possibly_batched_index)
@@ -45,54 +34,14 @@ class _MapDatasetFetcher(_BaseDatasetFetcher):
def patch_fetchers():
"""Apply patches to PyTorch's DataLoader components."""
torch.utils.data._utils.fetch._MapDatasetFetcher = _MapDatasetFetcher
torch.utils.data.dataloader._utils.fetch._MapDatasetFetcher = _MapDatasetFetcher
def patched_worker_loop(*args, **kwargs):
"""Worker loop that ensures patches are applied in worker processes."""
patch_fetchers()
return _worker_loop(*args, **kwargs)
def apply_multipack_dataloader_patch():
"""
This patch allows DataLoader to correctly process batches that contain multiple bins
of packed sequences.
"""
# pylint: disable=global-statement
global _ORIGINAL_MAP_DATASET_FETCHER, _ORIGINAL_WORKER_LOOP, _IS_PATCHED
if _IS_PATCHED:
return
# Store original implementations
_ORIGINAL_MAP_DATASET_FETCHER = torch.utils.data._utils.fetch._MapDatasetFetcher
_ORIGINAL_WORKER_LOOP = torch.utils.data._utils.worker._worker_loop
# Apply patches
patch_fetchers()
torch.utils.data._utils.worker._worker_loop = patched_worker_loop
_IS_PATCHED = True
def remove_multipack_dataloader_patch():
"""Remove the monkeypatch and restore original PyTorch DataLoader behavior."""
# pylint: disable=global-statement
global _IS_PATCHED
if not _IS_PATCHED:
return
if _ORIGINAL_MAP_DATASET_FETCHER:
torch.utils.data._utils.fetch._MapDatasetFetcher = _ORIGINAL_MAP_DATASET_FETCHER
torch.utils.data.dataloader._utils.fetch._MapDatasetFetcher = (
_ORIGINAL_MAP_DATASET_FETCHER
)
if _ORIGINAL_WORKER_LOOP:
torch.utils.data._utils.worker._worker_loop = _ORIGINAL_WORKER_LOOP
_IS_PATCHED = False
torch.utils.data._utils.worker._worker_loop = patched_worker_loop
patch_fetchers()

View File

@@ -9,6 +9,7 @@ from datasets import (
Dataset,
DatasetDict,
IterableDataset,
IterableDatasetDict,
load_dataset,
)
from transformers import PreTrainedTokenizer, ProcessorMixin
@@ -43,12 +44,53 @@ from axolotl.utils.trainer import (
LOG = get_logger(__name__)
def _is_streaming_enabled_for_split(
cfg: DictDefault, split: Literal["train", "test"]
) -> bool:
"""Check if streaming is enabled for a specific split."""
if split == "test":
# For eval datasets, check eval_streaming first, then fall back to streaming
eval_streaming = cfg.get("eval_streaming")
if eval_streaming is not None:
return eval_streaming
# Fall back to main streaming setting
streaming = cfg.get("streaming")
if streaming is True:
return True
# Check if pretraining dataset exists (defaults to streaming)
has_pretraining = cfg.get("pretraining_dataset") is not None
streaming_default_for_pretraining = has_pretraining and streaming is None
return streaming_default_for_pretraining
def _get_streaming_config_for_split(
cfg: DictDefault, split: Literal["train", "test"]
) -> DictDefault:
"""Get a modified config object with split-specific streaming settings."""
if split != "test":
return cfg
# Override with eval-specific configs if they exist
streaming_cfg = DictDefault(cfg)
eval_strategy = cfg.get("eval_dataset_mixing_strategy")
eval_weights = cfg.get("eval_mixing_weights")
if eval_strategy is not None:
streaming_cfg["dataset_mixing_strategy"] = eval_strategy
if eval_weights is not None:
streaming_cfg["mixing_weights"] = eval_weights
return streaming_cfg
@retry_on_request_exceptions(max_retries=3, delay=5)
def prepare_datasets(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[IterableDataset | Dataset, Dataset | None, int, list[Prompter | None]]:
"""Prepare training and evaluation datasets based on configuration.
@@ -56,23 +98,19 @@ def prepare_datasets(
cfg: Dictionary mapping `axolotl` config keys to values.
tokenizer: Tokenizer to use for processing text.
processor: Optional processor for multimodal datasets.
preprocess_iterable: Whether to use iterable preprocessing.
Returns:
Tuple of (train_dataset, eval_dataset, total_steps, prompters).
"""
if cfg.pretraining_dataset:
return _prepare_pretraining_dataset(
cfg, tokenizer, processor, preprocess_iterable
)
return _prepare_standard_dataset(cfg, tokenizer, processor, preprocess_iterable)
return _prepare_pretraining_dataset(cfg, tokenizer, processor)
return _prepare_standard_dataset(cfg, tokenizer, processor)
def _prepare_standard_dataset(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None,
preprocess_iterable: bool,
) -> tuple[Dataset, Dataset | None, int, list[Prompter | None]]:
"""Prepare standard (non-pretraining) datasets."""
@@ -83,7 +121,6 @@ def _prepare_standard_dataset(
cfg,
split="train",
processor=processor,
preprocess_iterable=preprocess_iterable,
)
# Overwrite eval_dataset if test data exists
@@ -93,7 +130,6 @@ def _prepare_standard_dataset(
cfg,
split="test",
processor=processor,
preprocess_iterable=preprocess_iterable,
)
return train_dataset, eval_dataset, prompters
@@ -109,7 +145,13 @@ def _prepare_standard_dataset(
return train_dataset, eval_dataset, -1, prompters
# Validate sample packing configuration for evaluation
if eval_dataset and cfg.sample_packing and cfg.eval_sample_packing is not False:
# Skip validation for streaming eval datasets since theWhat hy don't have a calculable length
if (
eval_dataset
and cfg.sample_packing
and cfg.eval_sample_packing is not False
and not isinstance(eval_dataset, IterableDataset)
):
total_eval_steps = calculate_total_num_steps(cfg, eval_dataset, update=False)
if total_eval_steps == 0:
raise ValueError(
@@ -117,13 +159,17 @@ def _prepare_standard_dataset(
"You should set `eval_sample_packing: False` in your config."
)
# Calculate total number of training steps
if cfg.max_steps:
total_num_steps = min(
calculate_total_num_steps(cfg, train_dataset), cfg.max_steps
)
# Set total_num_steps for training
if isinstance(train_dataset, IterableDataset):
total_num_steps = cfg.max_steps
else:
total_num_steps = calculate_total_num_steps(cfg, train_dataset)
if cfg.max_steps:
total_num_steps = min(
calculate_total_num_steps(cfg, train_dataset), cfg.max_steps
)
else:
total_num_steps = calculate_total_num_steps(cfg, train_dataset)
LOG.info(f"Maximum number of steps set at {total_num_steps}")
return train_dataset, eval_dataset, total_num_steps, prompters
@@ -132,7 +178,6 @@ def _prepare_pretraining_dataset(
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
processor: ProcessorMixin | None,
preprocess_iterable: bool,
) -> tuple[IterableDataset, Dataset | None, int, list[Prompter | None]]:
"""
Prepare dataset for pretraining mode.
@@ -153,7 +198,6 @@ def _prepare_pretraining_dataset(
cfg,
split="test",
processor=processor,
preprocess_iterable=preprocess_iterable,
)
if cfg.dataset_exact_deduplication:
@@ -256,7 +300,6 @@ def _load_tokenized_prepared_datasets(
cfg: DictDefault,
split: Literal["train", "test"] = "train",
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset | DatasetDict, list[Prompter | None]]:
"""Load or create tokenized and prepared datasets for training or testing.
@@ -265,39 +308,51 @@ def _load_tokenized_prepared_datasets(
cfg: Configuration object.
split: Dataset split to load ('train' or 'test').
processor: Optional processor for multimodal datasets.
preprocess_iterable: Whether to use iterable preprocessing.
Returns:
Tuple of (dataset, prompters list).
"""
# Select correct dataset configuration based on split
datasets_configs = cfg.datasets if split == "train" else cfg.test_datasets
# Generate dataset hash for caching
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
# Try loading from hub if push_dataset_to_hub is configured
dataset = None
if cfg.push_dataset_to_hub:
dataset = try_load_from_hub(cfg, dataset_hash, split)
# If not found on hub, try loading from disk
if dataset is None:
dataset = load_preprocessed_dataset(cfg, dataset_hash)
# If not found on disk or skipping prepared dataset, load and process raw datasets
prompters: list[Prompter | None] = []
if dataset is None:
# Check if streaming is enabled for this split
use_streaming = _is_streaming_enabled_for_split(cfg, split)
if use_streaming:
# For streaming datasets, skip caching and load raw datasets directly
streaming_cfg = _get_streaming_config_for_split(cfg, split)
dataset, prompters = _load_raw_datasets(
cfg,
streaming_cfg,
datasets_configs,
tokenizer,
split,
processor,
preprocess_iterable,
)
else:
# Generate dataset hash for caching
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
# Try loading from hub if push_dataset_to_hub is configured
dataset = None
if cfg.push_dataset_to_hub:
dataset = try_load_from_hub(cfg, dataset_hash, split)
# If not found on hub, try loading from disk
if dataset is None:
dataset = load_preprocessed_dataset(cfg, dataset_hash)
# If not found on disk or skipping prepared dataset, load and process raw
# datasets
if dataset is None:
dataset, prompters = _load_raw_datasets(
cfg,
datasets_configs,
tokenizer,
split,
processor,
)
return dataset, prompters
@@ -306,9 +361,8 @@ def _load_raw_datasets(
cfg: DictDefault,
datasets_configs: list,
tokenizer: PreTrainedTokenizer,
split: str,
split: Literal["train", "test"],
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset, list[Prompter | None]]:
"""Load, process, merge, and save raw datasets."""
LOG.info("Loading raw datasets...", main_process_only=False)
@@ -329,7 +383,6 @@ def _load_raw_datasets(
split=split,
seed=cfg.seed,
processor=processor,
preprocess_iterable=preprocess_iterable,
)
datasets.append(dataset_wrapper)
prompters.append(dataset_prompter)
@@ -345,11 +398,12 @@ def _load_raw_datasets(
if cfg.sample_packing:
dataset, _ = process_datasets_for_packing(cfg, dataset, None)
# Save the prepared dataset
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
save_preprocessed_dataset(cfg, dataset, dataset_hash, split)
# Only save regular datasets to disk, not streaming datasets
if not isinstance(dataset, IterableDataset):
dataset_hash = generate_dataset_hash_from_config(
cfg, datasets_configs, tokenizer.name_or_path
)
save_preprocessed_dataset(cfg, dataset, dataset_hash, split)
return dataset, prompters
@@ -358,22 +412,19 @@ def _load_and_process_single_dataset(
dataset_config: DictDefault,
cfg: DictDefault,
tokenizer: PreTrainedTokenizer,
split: str,
split: Literal["train", "test"],
seed: int,
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset | IterableDataset, Prompter | None]:
"""Load and process a single dataset based on the passed config."""
# Load the dataset
use_streaming_for_split = _is_streaming_enabled_for_split(cfg, split)
dataset = load_dataset_with_config(
dataset_config, cfg.hf_use_auth_token, streaming=preprocess_iterable
dataset_config, cfg.hf_use_auth_token, use_streaming_for_split
)
# Parse dataset type
d_base_type, d_prompt_style = _parse_dataset_type(dataset_config.type)
# Select the appropriate split
if isinstance(dataset, DatasetDict):
if isinstance(dataset, (DatasetDict, IterableDatasetDict)):
if dataset_config.split and dataset_config.split in dataset:
dataset = dataset[dataset_config.split]
elif split in dataset:
@@ -418,11 +469,13 @@ def _parse_dataset_type(d_type: str) -> tuple[str | None, str | None]:
def _handle_train_dataset_split(
dataset: Dataset, cfg: DictDefault
) -> tuple[Dataset, Dataset | None]:
dataset: Dataset | IterableDataset, cfg: DictDefault
) -> tuple[Dataset | IterableDataset, Dataset | IterableDataset | None]:
"""Handle processing for train split, including validation set creation."""
val_set_size = (
int(cfg.val_set_size) if cfg.val_set_size > 1 else float(cfg.val_set_size)
int(cfg.val_set_size)
if cfg.val_set_size and cfg.val_set_size > 1
else float(cfg.val_set_size or 0.0)
)
if val_set_size:
@@ -433,27 +486,33 @@ def _handle_train_dataset_split(
return train_dataset, eval_dataset
# No validation split - apply deduplication if needed and return as train dataset
if cfg.dataset_exact_deduplication:
if cfg.dataset_exact_deduplication and not isinstance(dataset, IterableDataset):
train_dataset, _ = deduplicate_and_log_datasets(dataset=dataset)
else:
if cfg.dataset_exact_deduplication and isinstance(dataset, IterableDataset):
LOG.info("Deduplication skipped for streaming datasets (not compatible)")
train_dataset = dataset
return train_dataset, None
def _handle_test_dataset_split(
dataset: Dataset, cfg: DictDefault
) -> tuple[None, Dataset | None]:
dataset: Dataset | IterableDataset, cfg: DictDefault
) -> tuple[None, Dataset | IterableDataset | None]:
"""Handle processing for test split."""
if cfg.dataset_exact_deduplication:
if cfg.dataset_exact_deduplication and not isinstance(dataset, IterableDataset):
eval_dataset, _ = deduplicate_and_log_datasets(dataset=dataset)
else:
if cfg.dataset_exact_deduplication and isinstance(dataset, IterableDataset):
LOG.info("Deduplication skipped for streaming datasets (not compatible)")
eval_dataset = dataset
return None, eval_dataset
def _apply_dataset_sharding(dataset: Dataset, cfg: DictDefault) -> Dataset:
def _apply_dataset_sharding(
dataset: Dataset | IterableDataset, cfg: DictDefault
) -> Dataset | IterableDataset:
"""Apply dataset sharding if configured.
Args:
@@ -479,7 +538,6 @@ def _load_and_prepare_datasets(
cfg: DictDefault,
split: Literal["train", "test"] = "train",
processor: ProcessorMixin | None = None,
preprocess_iterable: bool = False,
) -> tuple[Dataset | None, Dataset | None, list[Prompter | None]]:
"""Load and prepare datasets with optional validation split and sharding.
@@ -488,7 +546,6 @@ def _load_and_prepare_datasets(
cfg: Configuration object.
split: Dataset split to load ('train' or 'test').
processor: Optional processor for multimodal datasets.
preprocess_iterable: Whether to use iterable preprocessing.
Returns:
Tuple of (train_dataset, eval_dataset, prompters).
@@ -499,7 +556,6 @@ def _load_and_prepare_datasets(
cfg,
split=split,
processor=processor,
preprocess_iterable=preprocess_iterable,
)
# Apply dataset sharding if configured using shared function

View File

@@ -13,6 +13,7 @@ from datasets import (
IterableDataset,
IterableDatasetDict,
concatenate_datasets,
interleave_datasets,
load_dataset,
load_from_disk,
)
@@ -524,7 +525,9 @@ def generate_dataset_hash_from_config(
return str(md5(config_str))
def merge_datasets(datasets: list[Dataset], cfg: DictDefault) -> Dataset:
def merge_datasets(
datasets: list[Dataset | IterableDataset], cfg: DictDefault
) -> Dataset | IterableDataset:
"""Merge multiple datasets into one with optional shuffling.
Args:
@@ -537,23 +540,23 @@ def merge_datasets(datasets: list[Dataset], cfg: DictDefault) -> Dataset:
if len(datasets) == 1:
ds = datasets[0]
# Do not shuffle if curriculum sampling is enabled or
# shuffle_merged_datasets is disabled
if cfg.curriculum_sampling or not cfg.shuffle_merged_datasets:
if (
cfg.curriculum_sampling
or not cfg.shuffle_merged_datasets
or isinstance(ds, IterableDataset)
):
return ds
return ds.shuffle(seed=cfg.seed)
# If enabled, shuffle each dataset independently before merging.
# This allows curriculum learning strategies to be applied at the dataset level.
if cfg.shuffle_before_merging_datasets:
if cfg.shuffle_before_merging_datasets and all(
isinstance(ds, Dataset) for ds in datasets
):
LOG.info("Shuffling each dataset individually before merging...")
datasets = [ds.shuffle(seed=cfg.seed) for ds in datasets]
LOG.info("Merging datasets...")
merged_dataset = concatenate_datasets(datasets)
merged_dataset = _merge_datasets_with_strategy(datasets, cfg)
if cfg.shuffle_merged_datasets:
if cfg.shuffle_merged_datasets and not isinstance(merged_dataset, IterableDataset):
LOG.debug("Shuffling merged datasets...")
if cfg.curriculum_sampling:
LOG.warning(
@@ -562,6 +565,47 @@ def merge_datasets(datasets: list[Dataset], cfg: DictDefault) -> Dataset:
)
merged_dataset = merged_dataset.shuffle(seed=cfg.seed)
else:
LOG.debug("Not shuffling merged datasets.")
if isinstance(merged_dataset, IterableDataset):
LOG.debug("Skipping shuffle for streaming datasets.")
else:
LOG.debug("Not shuffling merged datasets.")
return merged_dataset
def _merge_datasets_with_strategy(
datasets: list[Dataset | IterableDataset], cfg: DictDefault
) -> Dataset | IterableDataset:
"""
Merge datasets using the configured mixing strategy. Works with streaming and non-
streaming datasets.
Args:
datasets: List of datasets to merge.
cfg: Configuration object containing mixing settings.
Returns:
Merged dataset (Dataset or IterableDataset depending on inputs).
"""
strategy = cfg.get("dataset_mixing_strategy", "concatenate")
weights = cfg.get("mixing_weights", None)
LOG.info(f"Merging datasets with mixing strategy: {strategy}...")
if strategy == "concatenate":
# Concatenate only works with non-iterable datasets
if not all(isinstance(ds, Dataset) for ds in datasets):
raise ValueError(
"Cannot concatenate streaming datasets. Use 'round_robin', 'weighted', "
"or 'random' instead."
)
return concatenate_datasets(datasets)
if strategy == "round_robin":
return interleave_datasets(datasets, seed=cfg.seed)
if strategy == "weighted":
return interleave_datasets(datasets, probabilities=weights, seed=cfg.seed)
if strategy == "random":
# Random sampling with equal probability
equal_weights = [1.0 / len(datasets)] * len(datasets)
return interleave_datasets(datasets, probabilities=equal_weights, seed=cfg.seed)
raise ValueError(f"Unknown dataset mixing strategy: {strategy}")

View File

@@ -190,11 +190,15 @@ def handle_long_seq_in_dataset(
Returns:
Filtered dataset with long sequences removed.
"""
if "input_ids" not in dataset.column_names:
LOG.warning(
"Dataset does not contain 'input_ids' column. Skip drop long seq. This is "
"expected for reward modeling."
)
if hasattr(dataset, "column_names") and dataset.column_names:
if "input_ids" not in dataset.column_names:
LOG.warning(
"Dataset does not contain 'input_ids' column. Skip drop long seq. This "
"is expected for reward modeling."
)
return dataset
elif isinstance(dataset, IterableDataset):
LOG.info("Skipping drop_long_seq for streaming datasets (not compatible)")
return dataset
drop_long = functools.partial(

View File

@@ -100,10 +100,6 @@ def get_dataset_wrapper(
dataset_config, tokenizer, cfg, dataset, dataset_kwargs
)
# Skip preparation if configured
if cfg.skip_prepare_dataset:
return dataset, None
# Bradley-Terry dataset
if dataset_config.type.startswith("bradley_terry"):
return _handle_bradley_terry_dataset(

View File

@@ -459,12 +459,6 @@ class AxolotlInputConfig(
"description": "The multiprocessing start method to use for packing. Should be 'fork', 'spawn' or 'forkserver'"
},
)
squash_position_ids: bool | None = Field(
default=None,
json_schema_extra={
"description": "Whether to squash position_ids for packing, effectively extending context length."
},
)
eval_sample_packing: bool | None = Field(
default=None,
json_schema_extra={
@@ -938,9 +932,45 @@ class AxolotlInputConfig(
fix_untrained_tokens: int | list[int] | None = None
streaming: bool | None = Field(
default=None,
json_schema_extra={
"description": "Whether to use streaming datasets (IterableDataset) for training datasets. When True, data is loaded on-demand during training without upfront preprocessing. Requires max_steps to be set. Pre-training datasets default to streaming unless explicitly set to False."
},
)
eval_streaming: bool | None = Field(
default=None,
json_schema_extra={
"description": "Whether to use streaming datasets for evaluation datasets. If not set, falls back to the 'streaming' setting. Useful for streaming large training data while keeping smaller eval datasets in memory."
},
)
dataset_mixing_strategy: str | None = Field(
default="round_robin",
json_schema_extra={
"description": "Strategy for mixing multiple datasets: 'concatenate', 'round_robin' (equal sampling), 'weighted' (use mixing_weights), or 'random' (random sampling with equal probability). Works for both streaming and non-streaming datasets."
},
)
mixing_weights: list[float] | None = Field(
default=None,
json_schema_extra={
"description": "Weights for weighted mixing strategy when using multiple datasets. Must sum to 1.0 and have same length as datasets list. Only used when dataset_mixing_strategy='weighted'."
},
)
eval_dataset_mixing_strategy: str | None = Field(
default=None,
json_schema_extra={
"description": "Strategy for mixing multiple evaluation datasets. If not set, falls back to dataset_mixing_strategy. Options: 'concatenate', 'round_robin', 'weighted', 'random'."
},
)
eval_mixing_weights: list[float] | None = Field(
default=None,
json_schema_extra={
"description": "Weights for weighted mixing strategy for evaluation datasets. Must sum to 1.0 and have same length as evaluation datasets list."
},
)
# INTERNALS - document for now, generally not set externally
is_preprocess: bool | None = None
preprocess_iterable: bool | None = None
total_num_tokens: int | None = Field(
default=None,

View File

@@ -161,7 +161,12 @@ class HyperparametersConfig(BaseModel):
max_grad_norm: float | None = Field(
default=None, json_schema_extra={"description": "Gradient clipping max norm"}
)
num_epochs: float = Field(default=1.0)
num_epochs: float = Field(
default=1.0,
json_schema_extra={
"description": "Number of iterations over dataset for training"
},
)
@field_validator("batch_size")
@classmethod

View File

@@ -3,6 +3,7 @@
# pylint: disable=too-many-boolean-expressions
import json
import os
import sys
import tempfile
from pathlib import Path
@@ -192,6 +193,7 @@ class AttentionValidationMixin:
return data
# pylint: disable=too-many-public-methods
class TrainingValidationMixin:
"""Validation methods related to training configuration."""
@@ -508,11 +510,58 @@ class TrainingValidationMixin:
# combining these would raise `TypeError: cannot pickle 'dict_keys' object`
# due to trying to count the number of tokens total in the dataset
raise ValueError(
"pretraining_dataset and include_tokens_per_second cannot be used together."
"pretraining_dataset and include_tokens_per_second cannot be used "
"together."
)
return data
@model_validator(mode="before")
@classmethod
def check_max_steps_num_epochs_conflict(cls, data):
"""Handle max_steps and num_epochs configuration and auto-set defaults."""
max_steps = data.get("max_steps")
num_epochs = data.get("num_epochs")
# Auto-set num_epochs to 1 if neither max_steps nor num_epochs are set
if max_steps is None and num_epochs is None:
data["num_epochs"] = 1.0
return data
@model_validator(mode="before")
@classmethod
def check_saves_per_epoch_conflicts(cls, data):
"""Ensure saves_per_epoch is compatible with training configuration."""
saves_per_epoch = data.get("saves_per_epoch")
num_epochs = data.get("num_epochs")
if saves_per_epoch is not None:
# Check if saves_per_epoch is set but num_epochs is unset
if num_epochs is None:
raise ValueError(
"saves_per_epoch requires num_epochs to be set to calculate save "
"intervals."
)
return data
@model_validator(mode="before")
@classmethod
def check_evals_per_epoch_conflicts(cls, data):
"""Ensure evals_per_epoch is compatible with training configuration."""
evals_per_epoch = data.get("evals_per_epoch")
num_epochs = data.get("num_epochs")
if evals_per_epoch is not None:
if num_epochs is None:
raise ValueError(
"evals_per_epoch requires num_epochs to be set to calculate "
"evaluation intervals."
)
return data
class LoRAValidationMixin:
"""Validation methods related to LoRA/QLoRA configuration."""
@@ -1078,6 +1127,30 @@ class PretrainingValidationMixin:
data["accelerator_config"]["dispatch_batches"] = False
return data
@model_validator(mode="before")
@classmethod
def check_streaming_split_batches_accelerate(cls, data):
# Check if either training or eval uses streaming
streaming = data.get("streaming", False)
eval_streaming = data.get("eval_streaming")
if eval_streaming is None:
eval_streaming = streaming
# If either training or eval uses streaming, configure accelerator
if streaming or eval_streaming:
accelerator_config = data.get("accelerator_config", {})
if not accelerator_config:
data["accelerator_config"] = {
"split_batches": False,
"dispatch_batches": False,
}
else:
if accelerator_config.get("split_batches") is None:
data["accelerator_config"]["split_batches"] = False
if accelerator_config.get("dispatch_batches") is None:
data["accelerator_config"]["dispatch_batches"] = False
return data
class ModelCompatibilityValidationMixin:
"""Validation methods for specific model compatibility."""
@@ -1336,6 +1409,168 @@ class GRPOVllmValidationMixin:
return self
class StreamingValidationMixin:
"""Validation methods related to streaming datasets."""
def _is_streaming_enabled(self, context: str = "train") -> bool:
"""Check if streaming is enabled for a given context (train or eval)."""
if context == "eval":
eval_streaming = getattr(self, "eval_streaming", None)
if eval_streaming is not None:
return eval_streaming
# Fall back to main streaming setting
streaming = getattr(self, "streaming", None)
if streaming is True:
return True
# Check if pretraining dataset exists (defaults to streaming)
has_pretraining = getattr(self, "pretraining_dataset", None) is not None
streaming_default_for_pretraining = has_pretraining and streaming is None
return streaming_default_for_pretraining
@model_validator(mode="after")
def check_streaming_requires_max_steps(self):
"""Ensure max_steps is set when using streaming datasets."""
# Check if streaming is enabled for training datasets
if self._is_streaming_enabled("train"):
max_steps = getattr(self, "max_steps", None)
if not max_steps:
raise ValueError("max_steps must be set when using streaming datasets")
return self
@model_validator(mode="after")
def check_streaming_validation_splits_conflict(self):
"""Ensure validation splits are not used with streaming datasets."""
# Check if streaming is enabled for training datasets
if self._is_streaming_enabled("train"):
val_set_size = getattr(self, "val_set_size", 0.0)
if val_set_size and val_set_size > 0:
raise ValueError(
"Validation splits not supported for streaming datasets, skipping"
)
return self
@model_validator(mode="after")
def check_streaming_preprocessing_conflict(self):
"""Ensure preprocessing is not enabled with streaming datasets."""
# Check if streaming is enabled for training or eval datasets
if self._is_streaming_enabled("train") or self._is_streaming_enabled("eval"):
if os.environ.get("AXOLOTL_IS_PREPROCESS") == "1":
raise ValueError("preprocess is not supported for streaming datasets")
return self
@model_validator(mode="after")
def check_streaming_skip_prepare_dataset(self):
"""Ensure skip_prepare_dataset is set for streaming datasets."""
# Check if streaming is enabled for training or eval datasets
if self._is_streaming_enabled("train") or self._is_streaming_enabled("eval"):
skip_prepare = getattr(self, "skip_prepare_dataset", None)
if skip_prepare is False:
LOG.warning(
"skip_prepare_dataset=False is not compatible with streaming "
"datasets. Setting skip_prepare_dataset=True."
)
self.skip_prepare_dataset = True
return self
@model_validator(mode="after")
def check_dataset_mixing_weights(self):
"""Validate dataset mixing weights configuration."""
valid_strategies = ["concatenate", "round_robin", "weighted", "random"]
# Get datasets to validate length against
datasets = getattr(self, "datasets", None)
test_datasets = getattr(self, "test_datasets", None)
# Check main strategy and weights
strategy = getattr(self, "dataset_mixing_strategy", "concatenate")
weights = getattr(self, "mixing_weights", None)
dataset_count = len(datasets) if datasets else 0
self._validate_dataset_strategy_and_weights(
strategy,
weights,
"dataset_mixing_strategy",
"mixing_weights",
valid_strategies,
dataset_count,
)
# Check eval-specific strategy and weights
eval_strategy = getattr(self, "eval_dataset_mixing_strategy", None)
eval_weights = getattr(self, "eval_mixing_weights", None)
if eval_strategy is not None:
eval_dataset_count = len(test_datasets) if test_datasets else dataset_count
self._validate_dataset_strategy_and_weights(
eval_strategy,
eval_weights,
"eval_dataset_mixing_strategy",
"eval_mixing_weights",
valid_strategies,
eval_dataset_count,
)
elif eval_weights is not None:
LOG.warning(
"eval_mixing_weights provided but eval_dataset_mixing_strategy is not set. "
"Weights will be ignored unless eval_dataset_mixing_strategy='weighted'."
)
return self
def _validate_dataset_strategy_and_weights(
self,
strategy,
weights,
strategy_field,
weights_field,
valid_strategies,
dataset_count,
):
"""Helper method to validate dataset mixing strategy and weights pair."""
if strategy not in valid_strategies:
raise ValueError(
f"{strategy_field} must be one of {valid_strategies}, "
f"got '{strategy}'"
)
if strategy == "weighted":
if weights is None:
raise ValueError(
f"{weights_field} must be provided when "
f"{strategy_field}='weighted'"
)
if not isinstance(weights, list) or not all(
isinstance(w, (int, float)) for w in weights
):
raise ValueError(f"{weights_field} must be a list of numbers")
if any(w < 0 for w in weights):
raise ValueError(f"{weights_field} must be non-negative")
if abs(sum(weights) - 1.0) > 1e-6:
raise ValueError(f"{weights_field} must sum to 1.0, got {sum(weights)}")
# Validate weights length against dataset count
if dataset_count > 0 and len(weights) != dataset_count:
raise ValueError(
f"{weights_field} length ({len(weights)}) must match number of datasets ({dataset_count})"
)
elif weights is not None and strategy != "weighted":
LOG.warning(
f"{weights_field} provided but {strategy_field} is '{strategy}'. "
"Weights will be ignored."
)
# pylint: disable=too-many-ancestors
class ValidationMixin(
DatasetValidationMixin,
@@ -1347,6 +1582,7 @@ class ValidationMixin(
SystemValidationMixin,
ChatTemplateValidationMixin,
PretrainingValidationMixin,
StreamingValidationMixin,
ModelCompatibilityValidationMixin,
ComplexValidationMixin,
GRPOVllmValidationMixin,

View File

@@ -547,7 +547,7 @@ def setup_deepspeed_env(cfg, stage=None):
if stage == 3:
os.environ["ACCELERATE_DEEPSPEED_ZERO3_INIT"] = "true"
# NOTE(djsaunde): The distribued state cannot be initialized prior to the
# NOTE(djsaunde): The distributed state cannot be initialized prior to the
# ACCELERATE_USE_DEEPSPEED assignment, but it must be initialized some time prior
# to model load.
if (

261
tests/e2e/test_streaming.py Normal file
View File

@@ -0,0 +1,261 @@
"""E2E tests for streaming dataset functionality"""
import pytest
from axolotl.common.datasets import load_datasets
from axolotl.train import train
from axolotl.utils.config import normalize_config, validate_config
from axolotl.utils.dict import DictDefault
from .utils import check_model_output_exists, check_tensorboard
class TestStreamingDatasets:
"""Test case for streaming datasets with different mixing strategies"""
@pytest.mark.parametrize(
("dataset_mixing_strategy", "mixing_weights"),
[
("round_robin", None),
("weighted", [0.7, 0.3]),
("random", None),
],
)
def test_streaming_dataset_mixing_strategies(
self, temp_dir, dataset_mixing_strategy, mixing_weights
):
"""Test different mixing strategies with streaming datasets"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"flash_attention": True,
"sequence_len": 1024,
"sample_packing": False,
"dataset_processes": 1,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
},
],
# Streaming config
"streaming": True,
"max_steps": 3, # Very small for smoke test
"dataset_mixing_strategy": dataset_mixing_strategy,
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"val_set_size": 0.0,
"output_dir": temp_dir,
"learning_rate": 0.00001,
"optimizer": "adamw_torch_fused",
"lr_scheduler": "cosine",
"save_safetensors": True,
"bf16": "auto",
"use_tensorboard": True,
"save_first_step": False,
}
)
# Add mixing weights if specified
if mixing_weights:
cfg["mixing_weights"] = mixing_weights
cfg = validate_config(cfg)
normalize_config(cfg)
dataset_meta = load_datasets(cfg=cfg)
train(cfg=cfg, dataset_meta=dataset_meta)
check_model_output_exists(temp_dir, cfg)
# Verify training actually happened by checking loss decrease
check_tensorboard(
temp_dir + "/runs",
"train/train_loss",
2.5, # Loss should be reasonable for a smoke test (higher threshold for streaming)
"Train Loss (%s) is too high",
)
def test_streaming_eval_specific_mixing(self, temp_dir):
"""Test eval-specific mixing strategy override"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"flash_attention": True,
"sequence_len": 512,
"sample_packing": False,
"dataset_processes": 1,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
},
],
"test_datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
"split": "train", # Specify train split for eval dataset
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
"split": "train", # Specify train split for eval dataset
},
],
# Streaming config
"streaming": True,
"eval_streaming": True,
"max_steps": 3,
# Different mixing for train vs eval
"dataset_mixing_strategy": "round_robin",
"eval_dataset_mixing_strategy": "weighted",
"eval_mixing_weights": [0.6, 0.4],
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"output_dir": temp_dir,
"learning_rate": 0.00001,
"optimizer": "adamw_torch_fused",
"lr_scheduler": "cosine",
"save_safetensors": True,
"bf16": "auto",
"use_tensorboard": True,
"save_first_step": False,
"eval_steps": 3, # Eval at the end
}
)
cfg = validate_config(cfg)
normalize_config(cfg)
dataset_meta = load_datasets(cfg=cfg)
train(cfg=cfg, dataset_meta=dataset_meta)
check_model_output_exists(temp_dir, cfg)
# Check both train and eval losses
check_tensorboard(
temp_dir + "/runs",
"train/train_loss",
2.5,
"Train Loss (%s) is too high",
)
check_tensorboard(
temp_dir + "/runs",
"eval/eval_loss",
2.5,
"Eval Loss (%s) is too high",
)
def test_streaming_validation_error(self, temp_dir):
"""Test that pydantic validation catches invalid streaming configs"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
},
],
"streaming": True,
"max_steps": 3,
# Invalid: wrong number of weights for datasets
"dataset_mixing_strategy": "weighted",
"mixing_weights": [1.0], # Should be [0.x, 0.y] for 2 datasets
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"output_dir": temp_dir,
"learning_rate": 0.00001,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
}
)
# This should raise a validation error
with pytest.raises(Exception) as exc_info:
validate_config(cfg)
# Verify it's the right validation error
assert "mixing_weights length" in str(exc_info.value)
assert "must match number of datasets" in str(exc_info.value)
def test_streaming_three_datasets_weighted(self, temp_dir):
"""Test weighted mixing with three datasets"""
cfg = DictDefault(
{
"base_model": "HuggingFaceTB/SmolLM2-135M",
"flash_attention": True,
"sequence_len": 512,
"sample_packing": False,
"dataset_processes": 1,
"special_tokens": {
"pad_token": "<|endoftext|>",
},
"datasets": [
{
"path": "mhenrichsen/alpaca_2k_test",
"type": "alpaca",
},
{
"path": "tatsu-lab/alpaca",
"type": "alpaca",
},
{
"path": "yahma/alpaca-cleaned",
"type": "alpaca",
},
],
# Streaming config
"streaming": True,
"max_steps": 3,
"dataset_mixing_strategy": "weighted",
"mixing_weights": [0.5, 0.3, 0.2],
"micro_batch_size": 1,
"gradient_accumulation_steps": 1,
"val_set_size": 0.0,
"output_dir": temp_dir,
"learning_rate": 0.00001,
"optimizer": "adamw_torch_fused",
"lr_scheduler": "cosine",
"save_safetensors": True,
"bf16": "auto",
"use_tensorboard": True,
"save_first_step": False,
}
)
cfg = validate_config(cfg)
normalize_config(cfg)
dataset_meta = load_datasets(cfg=cfg)
train(cfg=cfg, dataset_meta=dataset_meta)
check_model_output_exists(temp_dir, cfg)
check_tensorboard(
temp_dir + "/runs",
"train/train_loss",
2.5,
"Train Loss (%s) is too high",
)

View File

@@ -7,13 +7,13 @@ from typing import Any, Generator
from unittest.mock import patch
import pytest
from datasets import Dataset
from datasets import Dataset, IterableDataset
from huggingface_hub import snapshot_download
from transformers import PreTrainedTokenizer
from axolotl.loaders.tokenizer import load_tokenizer
from axolotl.utils.data.rl import prepare_preference_datasets
from axolotl.utils.data.sft import _load_tokenized_prepared_datasets
from axolotl.utils.data.sft import _load_tokenized_prepared_datasets, prepare_datasets
from axolotl.utils.dict import DictDefault
from tests.constants import (
@@ -24,6 +24,7 @@ from tests.constants import (
from tests.hf_offline_utils import enable_hf_offline
# pylint: disable=too-many-public-methods
class TestDatasetPreparation:
"""Test a configured dataloader."""
@@ -46,6 +47,24 @@ class TestDatasetPreparation:
]
)
@pytest.fixture
def streaming_dataset_fixture(self):
"""Create a streaming dataset fixture for testing."""
def generator():
yield {
"instruction": "Evaluate this sentence for spelling and grammar mistakes",
"input": "He finnished his meal and left the resturant",
"output": "He finished his meal and left the restaurant.",
}
yield {
"instruction": "What is the capital of France?",
"input": "",
"output": "The capital of France is Paris.",
}
return IterableDataset.from_generator(generator)
@pytest.mark.skip(reason="TODO: fix hf hub offline to work with HF rate limits")
@enable_hf_offline
def test_load_hub(self, tokenizer):
@@ -486,3 +505,201 @@ class TestDatasetPreparation:
assert "attention_mask" in dataset.features
assert "labels" in dataset.features
shutil.rmtree(tmp_ds_path)
def test_streaming_sft_dataset(self, tokenizer, streaming_dataset_fixture):
"""Test streaming SFT dataset preparation with IterableDataset."""
with patch("axolotl.utils.data.sft.load_dataset_with_config") as mock_load:
mock_load.return_value = streaming_dataset_fixture
cfg = DictDefault(
{
"tokenizer_config": "huggyllama/llama-7b",
"sequence_len": 256,
"streaming": True,
"max_steps": 100, # Required for streaming datasets
"datasets": [
{
"path": "dummy/path",
"type": "alpaca",
},
],
}
)
train_dataset, eval_dataset, total_num_steps, prompters = prepare_datasets(
cfg, tokenizer
)
# Verify it returns an IterableDataset
assert isinstance(train_dataset, IterableDataset)
assert eval_dataset is None # No eval split for streaming
assert total_num_steps == 100 # Should use max_steps
assert len(prompters) == 1
# Test that we can iterate through the dataset
sample_count = 0
for sample in train_dataset:
assert "input_ids" in sample
assert "attention_mask" in sample
assert "labels" in sample
sample_count += 1
if sample_count >= 2: # Just test first few samples
break
assert sample_count == 2
def test_dataset_mixing_strategy_validation(self):
"""Test validation of dataset mixing strategy configuration."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Test valid strategies work
valid_strategies = ["round_robin", "weighted", "random"]
dataset1 = Dataset.from_dict({"text": ["a"], "source": ["ds1"]})
dataset2 = Dataset.from_dict({"text": ["b"], "source": ["ds2"]})
for strategy in valid_strategies:
cfg = DictDefault(
{
"dataset_mixing_strategy": strategy,
"mixing_weights": [0.5, 0.5] if strategy == "weighted" else None,
"seed": 42,
}
)
# Should not raise an error
merged = _merge_datasets_with_strategy([dataset1, dataset2], cfg)
assert len(merged) >= 1
def test_regular_dataset_round_robin_mixing(self):
"""Test round-robin mixing for regular datasets."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Create test datasets
dataset1 = Dataset.from_dict(
{"text": ["ds1_item1", "ds1_item2"], "source": ["ds1", "ds1"]}
)
dataset2 = Dataset.from_dict(
{"text": ["ds2_item1", "ds2_item2"], "source": ["ds2", "ds2"]}
)
cfg = DictDefault({"dataset_mixing_strategy": "round_robin", "seed": 42})
merged = _merge_datasets_with_strategy([dataset1, dataset2], cfg)
# Should have all samples from both datasets
assert len(merged) == 4
assert isinstance(merged, Dataset)
# Check that samples are interleaved (not just concatenated)
sources = [sample["source"] for sample in merged]
# Round-robin should alternate between datasets
assert sources != ["ds1", "ds1", "ds2", "ds2"] # Not concatenated
def test_regular_dataset_weighted_mixing(self):
"""Test weighted mixing for regular datasets."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Create test datasets
dataset1 = Dataset.from_dict(
{
"text": ["ds1_item1", "ds1_item2", "ds1_item3", "ds1_item4"],
"source": ["ds1"] * 4,
}
)
dataset2 = Dataset.from_dict(
{
"text": ["ds2_item1", "ds2_item2", "ds2_item3", "ds2_item4"],
"source": ["ds2"] * 4,
}
)
cfg = DictDefault(
{
"dataset_mixing_strategy": "weighted",
"mixing_weights": [0.75, 0.25], # 3:1 ratio
"seed": 42,
}
)
merged = _merge_datasets_with_strategy([dataset1, dataset2], cfg)
# Should have samples proportional to weights
assert len(merged) > 0
assert isinstance(merged, Dataset)
# Count samples from each dataset
sources = [sample["source"] for sample in merged]
ds1_count = sources.count("ds1")
ds2_count = sources.count("ds2")
# Should have samples from both datasets
assert ds1_count > 0 and ds2_count > 0 # Both datasets should be represented
def test_streaming_dataset_mixing(self):
"""Test that streaming datasets use HuggingFace interleave_datasets."""
from axolotl.utils.data.shared import _merge_datasets_with_strategy
# Create test streaming datasets
def gen1():
yield {"text": "stream1_item1", "source": "stream1"}
yield {"text": "stream1_item2", "source": "stream1"}
def gen2():
yield {"text": "stream2_item1", "source": "stream2"}
yield {"text": "stream2_item2", "source": "stream2"}
stream1 = IterableDataset.from_generator(gen1)
stream2 = IterableDataset.from_generator(gen2)
cfg = DictDefault({"dataset_mixing_strategy": "round_robin", "seed": 42})
merged = _merge_datasets_with_strategy([stream1, stream2], cfg)
# Should return an IterableDataset
assert isinstance(merged, IterableDataset)
# Test that we can iterate and get samples
samples = list(merged.take(3))
assert len(samples) >= 2 # Should get at least 2 samples
# Should have samples from both datasets
sources = [sample["source"] for sample in samples]
assert len(set(sources)) >= 1 # At least one unique source
def test_eval_streaming_config(self):
"""Test eval_streaming separate from streaming config."""
from axolotl.utils.data.sft import _is_streaming_enabled_for_split
# Test train streaming enabled, eval streaming disabled
cfg = DictDefault({"streaming": True, "eval_streaming": False})
assert _is_streaming_enabled_for_split(cfg, "train")
assert _is_streaming_enabled_for_split(cfg, "test")
# Test train streaming disabled, eval streaming enabled
cfg2 = DictDefault({"streaming": False, "eval_streaming": True})
assert _is_streaming_enabled_for_split(cfg2, "train")
assert _is_streaming_enabled_for_split(cfg2, "test")
def test_eval_specific_mixing_configs(self):
"""Test eval-specific mixing configs override main configs."""
from axolotl.utils.data.sft import _get_streaming_config_for_split
cfg = DictDefault(
{
"dataset_mixing_strategy": "round_robin",
"mixing_weights": [0.5, 0.5],
"eval_dataset_mixing_strategy": "weighted",
"eval_mixing_weights": [0.8, 0.2],
}
)
# Train split should use main config
train_cfg = _get_streaming_config_for_split(cfg, "train")
assert train_cfg["dataset_mixing_strategy"] == "round_robin"
assert train_cfg["mixing_weights"] == [0.5, 0.5]
# Test split should use eval-specific config
test_cfg = _get_streaming_config_for_split(cfg, "test")
assert test_cfg["dataset_mixing_strategy"] == "weighted"
assert test_cfg["mixing_weights"] == [0.8, 0.2]

View File

@@ -48,13 +48,7 @@ class TestBatchedSamplerPacking:
max_seq_length,
sequential,
):
from axolotl.monkeypatch.data.batch_dataset_fetcher import (
apply_multipack_dataloader_patch,
remove_multipack_dataloader_patch,
)
# Apply the patch for multipack handling
apply_multipack_dataloader_patch()
import axolotl.monkeypatch.data.batch_dataset_fetcher # pylint: disable=unused-import # noqa: F401
dataset = dataset_winglian_tiny_shakespeare["train"]
@@ -107,14 +101,10 @@ class TestBatchedSamplerPacking:
for pack in batch:
batch_idxs.extend(pack)
try:
for batch in loader:
assert batch["input_ids"].numel() <= batch_size * max_seq_length
assert batch["input_ids"].shape[1] == max_seq_length
for batch in loader:
assert batch["input_ids"].numel() <= batch_size * max_seq_length
assert batch["input_ids"].shape[1] == max_seq_length
original_idxs = set(range(len(train_dataset)))
assert original_idxs == set(batch_idxs)
assert len(batch_idxs) == len(set(batch_idxs))
finally:
# Clean up: remove the patch after the test
remove_multipack_dataloader_patch()
original_idxs = set(range(len(train_dataset)))
assert original_idxs == set(batch_idxs)
assert len(batch_idxs) == len(set(batch_idxs))