Compare commits

..

4 Commits

Author SHA1 Message Date
Wing Lian
9f1d548534 don't use zero first context for loading datasets 2025-05-23 10:38:32 -04:00
Wing Lian
a27b909c5c GRPO fixes (peft) (#2676)
* don't set peft_config on grpo to prevent double peft wrap

* remove overrides needed to support bug

* fix grpo tests

* require more CPU for multigpu to help with torch compile for vllm
2025-05-16 15:47:03 -04:00
xzuyn
6cb07b9d12 Fix for setting adam_beta3 and adam_epsilon2 for CAME Optimizer (#2654) [skip ci]
* make setting `adam_beta3` and `adam_epsilon2` work correctly

* update config docs so users know args are specific to CAME optim

---------

Co-authored-by: Wing Lian <wing@axolotl.ai>
2025-05-16 15:46:50 -04:00
C080
288653adb6 Fix: Make MLflow config artifact logging respect hf_mlflow_log_artifacts setting (#2675) [skip ci]
* Fix: Make MLflow config artifact logging respect hf_mlflow_log_artifacts setting

* cleanup and lint

---------

Co-authored-by: Wing Lian <wing@axolotl.ai>
2025-05-16 15:46:31 -04:00
11 changed files with 215 additions and 170 deletions

View File

@@ -70,7 +70,7 @@ def run_cmd(cmd: str, run_folder: str):
image=cicd_image, image=cicd_image,
gpu=GPU_CONFIG, gpu=GPU_CONFIG,
timeout=90 * 60, timeout=90 * 60,
cpu=8.0, cpu=16.0,
memory=131072 * N_GPUS, memory=131072 * N_GPUS,
volumes=VOLUME_CONFIG, volumes=VOLUME_CONFIG,
) )

View File

@@ -633,7 +633,9 @@ weight_decay:
# adamw hyperparams # adamw hyperparams
adam_beta1: adam_beta1:
adam_beta2: adam_beta2:
adam_beta3: # only used for CAME Optimizer
adam_epsilon: adam_epsilon:
adam_epsilon2: # only used for CAME Optimizer
# Gradient clipping max norm # Gradient clipping max norm
max_grad_norm: max_grad_norm:

View File

@@ -387,8 +387,12 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
training_arguments_kwargs["adam_beta1"] = self.cfg.adam_beta1 training_arguments_kwargs["adam_beta1"] = self.cfg.adam_beta1
if self.cfg.adam_beta2: if self.cfg.adam_beta2:
training_arguments_kwargs["adam_beta2"] = self.cfg.adam_beta2 training_arguments_kwargs["adam_beta2"] = self.cfg.adam_beta2
if self.cfg.adam_beta3:
training_arguments_kwargs["adam_beta3"] = self.cfg.adam_beta3
if self.cfg.adam_epsilon: if self.cfg.adam_epsilon:
training_arguments_kwargs["adam_epsilon"] = self.cfg.adam_epsilon training_arguments_kwargs["adam_epsilon"] = self.cfg.adam_epsilon
if self.cfg.adam_epsilon2:
training_arguments_kwargs["adam_epsilon2"] = self.cfg.adam_epsilon2
if self.cfg.max_grad_norm: if self.cfg.max_grad_norm:
training_arguments_kwargs["max_grad_norm"] = self.cfg.max_grad_norm training_arguments_kwargs["max_grad_norm"] = self.cfg.max_grad_norm
@@ -713,7 +717,7 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
beta1 = training_arguments_kwargs.get("adam_beta1", 0.9) beta1 = training_arguments_kwargs.get("adam_beta1", 0.9)
beta2 = training_arguments_kwargs.get("adam_beta2", 0.999) beta2 = training_arguments_kwargs.get("adam_beta2", 0.999)
beta3 = training_arguments_kwargs.get("adam_beta2", 0.9999) beta3 = training_arguments_kwargs.get("adam_beta3", 0.9999)
eps1 = training_arguments_kwargs.get("adam_epsilon", 1e-30) eps1 = training_arguments_kwargs.get("adam_epsilon", 1e-30)
eps2 = training_arguments_kwargs.get("adam_epsilon2", 1e-16) eps2 = training_arguments_kwargs.get("adam_epsilon2", 1e-16)
adam_kwargs["betas"] = (beta1, beta2, beta3) adam_kwargs["betas"] = (beta1, beta2, beta3)
@@ -1170,7 +1174,8 @@ class HFRLTrainerBuilder(TrainerBuilderBase):
if self.eval_dataset: if self.eval_dataset:
trainer_kwargs["eval_dataset"] = self.eval_dataset trainer_kwargs["eval_dataset"] = self.eval_dataset
if self.cfg.adapter and self.peft_config: if self.cfg.adapter and self.peft_config:
trainer_kwargs["peft_config"] = self.peft_config if self.cfg.rl is not RLType.GRPO:
trainer_kwargs["peft_config"] = self.peft_config
if self.cfg.precompute_ref_log_probs is not None: if self.cfg.precompute_ref_log_probs is not None:
trainer_kwargs["precompute_ref_log_probs"] = ( trainer_kwargs["precompute_ref_log_probs"] = (
self.cfg.precompute_ref_log_probs self.cfg.precompute_ref_log_probs

View File

@@ -156,9 +156,6 @@ class AxolotlTrainer(
Helper method to get the sampler for evaluation. Handles sequence parallelism Helper method to get the sampler for evaluation. Handles sequence parallelism
and sample packing cases. and sample packing cases.
Args:
eval_dataset: Evaluation dataset.
Returns: Returns:
If the dataset is non-empty, a sampler is returned, the type of which If the dataset is non-empty, a sampler is returned, the type of which
depends on the passed training args. depends on the passed training args.
@@ -240,6 +237,9 @@ class AxolotlTrainer(
self.accelerator.even_batches = False self.accelerator.even_batches = False
# Return unprepared dataloader if using sequence parallelism # Return unprepared dataloader if using sequence parallelism
# TODO(djsaunde): We might be able to use `accelerate`'s dataloader preparation
# if we use `dispatch_batches` and `slice_fn_for_dispatch` properly (i.e.,
# slice each batch along the sequence dimension).
if self.args.sequence_parallel_degree > 1: if self.args.sequence_parallel_degree > 1:
return dataloader return dataloader

View File

@@ -1,25 +1,33 @@
"""DPO trainer for Axolotl""" """
DPO trainer for axolotl
"""
import gc import gc
import random
from functools import wraps from functools import wraps
from typing import Any, Dict, Union from typing import Any, Dict, Optional, Union
import pandas as pd
import torch import torch
from datasets import Dataset import wandb
from accelerate import PartialState
from datasets import Dataset, IterableDataset
from peft.optimizers import create_loraplus_optimizer from peft.optimizers import create_loraplus_optimizer
from torch import nn from torch import nn
from torch.utils.data import Sampler from torch.utils.data import DataLoader
from transformers import ( from transformers import (
BaseImageProcessor,
FeatureExtractionMixin,
PreTrainedTokenizerBase,
ProcessorMixin,
Trainer, Trainer,
) )
from transformers.trainer_utils import EvalLoopOutput
from transformers.utils import is_sagemaker_mp_enabled from transformers.utils import is_sagemaker_mp_enabled
from trl import DPOTrainer from trl import DPOConfig, DPOTrainer, maybe_apply_chat_template, maybe_extract_prompt
from trl.trainer.utils import log_table_to_comet_experiment
from axolotl.core.trainers.mixins import ( from axolotl.core.trainers.mixins import RngLoaderMixin, SchedulerMixin
RngLoaderMixin,
SchedulerMixin,
SequenceParallelMixin,
)
from axolotl.core.trainers.utils import ( from axolotl.core.trainers.utils import (
sanitize_kwargs_for_ds_tagging, sanitize_kwargs_for_ds_tagging,
sanitize_kwargs_for_tagging, sanitize_kwargs_for_tagging,
@@ -29,10 +37,10 @@ if is_sagemaker_mp_enabled():
import smdistributed.modelparallel.torch as smp import smdistributed.modelparallel.torch as smp
class AxolotlDPOTrainer( class AxolotlDPOTrainer(RngLoaderMixin, SchedulerMixin, DPOTrainer):
RngLoaderMixin, SchedulerMixin, SequenceParallelMixin, DPOTrainer """
): Extend the base DPOTrainer for axolotl helpers
"""Extend the base DPOTrainer for axolotl helpers""" """
tag_names = ["axolotl", "dpo"] tag_names = ["axolotl", "dpo"]
@@ -87,6 +95,64 @@ class AxolotlDPOTrainer(
return super().push_to_hub(*args, **kwargs) return super().push_to_hub(*args, **kwargs)
# TODO: remove this once https://github.com/huggingface/trl/pull/3377 is in a release
def _prepare_dataset(
    self,
    dataset: Union[Dataset, IterableDataset],
    processing_class: Union[
        PreTrainedTokenizerBase,
        BaseImageProcessor,
        FeatureExtractionMixin,
        ProcessorMixin,
    ],
    args: DPOConfig,
    dataset_name: str,
) -> Union[Dataset, IterableDataset]:
    """
    Run the DPO preprocessing passes over `dataset` and return the mapped result.

    Three `map` passes are applied in order, all inside
    `PartialState().main_process_first()`:

    1. prompt extraction via `maybe_extract_prompt`
    2. chat templating via `maybe_apply_chat_template`
    3. tokenization via `self.tokenize_row`, or `self.process_row` when
       `self.is_vision_model` is truthy; the `chosen` and `rejected` columns
       are removed in this pass

    The same `map_kwargs` dict is reused for every pass; only its `desc` entry
    is overwritten between passes, and only for map-style `Dataset` inputs.

    NOTE(review): this looks like a local copy of the upstream TRL method whose
    main difference is `writer_batch_size=10` -- confirm against the TRL PR
    referenced in the TODO above this method before removing it.

    Args:
        dataset: Map-style or iterable dataset to preprocess.
        processing_class: Passed as `tokenizer` to the chat-template pass and
            as `processing_class` to the tokenization pass.
        args: Supplies `dataset_num_proc`, `tools`, `max_prompt_length` and
            `max_completion_length`.
        dataset_name: Only interpolated into the `desc` strings given to `map`.

    Returns:
        The dataset after all three passes.
    """
    # Build the kwargs for the `map` function
    map_kwargs: Dict[str, Any] = {"writer_batch_size": 10}
    if isinstance(dataset, Dataset):  # IterableDataset does not support num_proc
        map_kwargs["num_proc"] = args.dataset_num_proc

    # Main process runs the block first; presumably the other ranks then reuse
    # the cached `map` output -- confirm against the accelerate docs.
    with PartialState().main_process_first():
        # Extract prompt if needed
        if isinstance(
            dataset, Dataset
        ):  # `IterableDataset.map` does not support `desc`
            map_kwargs["desc"] = f"Extracting prompt in {dataset_name} dataset"
        dataset = dataset.map(maybe_extract_prompt, **map_kwargs)

        # Apply the chat template if needed
        if isinstance(
            dataset, Dataset
        ):  # `IterableDataset.map` does not support `desc`
            map_kwargs["desc"] = f"Applying chat template to {dataset_name} dataset"
        dataset = dataset.map(
            maybe_apply_chat_template,
            fn_kwargs={"tokenizer": processing_class, "tools": args.tools},
            **map_kwargs,
        )

        # Tokenize the dataset
        if isinstance(
            dataset, Dataset
        ):  # `IterableDataset.map` does not support `desc`
            map_kwargs["desc"] = f"Tokenizing {dataset_name} dataset"
        dataset = dataset.map(
            self.tokenize_row if not self.is_vision_model else self.process_row,
            remove_columns=["chosen", "rejected"],
            fn_kwargs={
                "processing_class": processing_class,
                "max_prompt_length": args.max_prompt_length,
                "max_completion_length": args.max_completion_length,
                # for enc-dec, we add the special tokens ([bos_token] + prompt + [eos_token]; completion + [eos_token])
                "add_special_tokens": False,
            },
            **map_kwargs,
        )

    return dataset
@staticmethod @staticmethod
def tokenize_row( def tokenize_row(
features, features,
@@ -127,48 +193,68 @@ class AxolotlDPOTrainer(
torch.cuda.empty_cache() torch.cuda.empty_cache()
return loss return loss
def _get_train_sampler(self) -> Sampler | None: # TODO: remove this once https://github.com/huggingface/trl/pull/3377 is in a release
def evaluation_loop(
self,
dataloader: DataLoader,
description: str,
prediction_loss_only: Optional[bool] = None,
ignore_keys: Optional[list[str]] = None,
metric_key_prefix: str = "eval",
) -> EvalLoopOutput:
""" """
Helper method to get the sampler for training. Handles cases for sequence Overriding built-in evaluation loop to store metrics for each batch.
parallelism, sample packing, and curriculum sampling (sequential). Prediction/evaluation loop, shared by `Trainer.evaluate()` and `Trainer.predict()`.
Returns: Works both with or without labels.
If the dataset is non-empty, a sampler is returned, the type of which
depends on the passed training args.
""" """
import torch.distributed as dist
if dist.get_rank() == 0: # Sample and save to game log if requested (for one batch to save time)
import ipdb if self.generate_during_eval:
# Generate random indices within the range of the total number of samples
num_samples = len(dataloader.dataset)
random_indices = random.sample(
range(num_samples), k=self.args.eval_batch_size
)
ipdb.set_trace() # Use dataloader.dataset.select to get the random batch without iterating over the DataLoader
dist.barrier() random_batch_dataset = dataloader.dataset.select(random_indices)
if dist.get_rank() == 1: random_batch = self.data_collator(random_batch_dataset)
import ipdb random_batch = self._prepare_inputs(random_batch)
ipdb.set_trace() policy_output_decoded, ref_output_decoded = (
dist.barrier() self.generate_from_model_and_ref(self.model, random_batch)
)
if self.args.sequence_parallel_degree > 1: table = pd.DataFrame(
return self._sp_get_train_sampler(self.train_dataset) columns=["Prompt", "Policy", "Ref Model"],
data=[
[prompt, pol[len(prompt) :], ref[len(prompt) :]]
for prompt, pol, ref in zip(
random_batch_dataset["prompt"],
policy_output_decoded,
ref_output_decoded,
)
],
)
if "wandb" in self.args.report_to and self.accelerator.is_main_process:
wandb.log({"game_log": wandb.Table(data=table)})
return super()._get_train_sampler() if "comet_ml" in self.args.report_to:
log_table_to_comet_experiment(
name="game_log.csv",
table=table,
)
def _get_eval_sampler(self, eval_dataset: Dataset | None = None) -> Sampler | None: # Base evaluation
""" initial_output = super( # pylint: disable=bad-super-call
Helper method to get the sampler for evaluation. Handles sequence parallelism DPOTrainer, self
and sample packing cases. ).evaluation_loop(
dataloader,
description,
prediction_loss_only,
ignore_keys,
metric_key_prefix,
)
Args: return initial_output
eval_dataset: Evaluation dataset.
Returns:
If the dataset is non-empty, a sampler is returned, the type of which
depends on the passed training args.
"""
eval_dataset = eval_dataset if eval_dataset is not None else self.eval_dataset
if self.args.sequence_parallel_degree > 1:
return self._sp_get_eval_sampler(eval_dataset)
return super()._get_eval_sampler(eval_dataset)

View File

@@ -3,7 +3,6 @@
# pylint: disable=too-many-lines,duplicate-code,protected-access,no-member # pylint: disable=too-many-lines,duplicate-code,protected-access,no-member
import warnings import warnings
from contextlib import nullcontext
from typing import Any from typing import Any
import datasets import datasets
@@ -14,7 +13,7 @@ from accelerate.utils import (
broadcast_object_list, broadcast_object_list,
gather, gather,
gather_object, gather_object,
is_peft_model, is_peft_available,
) )
from datasets import Dataset, IterableDataset from datasets import Dataset, IterableDataset
from torch import nn from torch import nn
@@ -30,15 +29,13 @@ from transformers import (
TrainerCallback, TrainerCallback,
) )
from transformers.trainer_utils import seed_worker from transformers.trainer_utils import seed_worker
from transformers.utils import is_peft_available
from trl import GRPOTrainer from trl import GRPOTrainer
from trl.data_utils import ( from trl.data_utils import (
apply_chat_template, apply_chat_template,
is_conversational, is_conversational,
maybe_apply_chat_template, maybe_apply_chat_template,
) )
from trl.extras.profiling import profiling_context, profiling_decorator from trl.extras.profiling import profiling_context
from trl.import_utils import is_deepspeed_available
from trl.models import unwrap_model_for_generation from trl.models import unwrap_model_for_generation
from trl.trainer.grpo_config import GRPOConfig from trl.trainer.grpo_config import GRPOConfig
from trl.trainer.grpo_trainer import RewardFunc, nanstd from trl.trainer.grpo_trainer import RewardFunc, nanstd
@@ -52,62 +49,12 @@ if is_peft_available():
# pylint: disable=unused-import # pylint: disable=unused-import
from peft import PeftConfig from peft import PeftConfig
if is_deepspeed_available():
import deepspeed
class AxolotlGRPOTrainer(RngLoaderMixin, SchedulerMixin, GRPOTrainer): class AxolotlGRPOTrainer(RngLoaderMixin, SchedulerMixin, GRPOTrainer):
"""Extend the base GRPOTrainer for axolotl helpers""" """Extend the base GRPOTrainer for axolotl helpers"""
_tag_names = ["trl", "grpo", "axolotl"] _tag_names = ["trl", "grpo", "axolotl"]
@profiling_decorator
def _move_model_to_vllm(self):
    """
    Push the current policy weights to the vLLM client.

    Every named parameter of `self.model` is sent through
    `self.vllm_client.update_named_param`, from the main process only. For PEFT
    models the adapter is merged before the push and unmerged afterwards, and
    parameter names are rewritten to drop the PEFT wrapper prefixes. Under
    DeepSpeed ZeRO stage 3 the parameters are gathered with
    `deepspeed.zero.GatheredParameters` first; otherwise `nullcontext` is used.
    Finally the vLLM prefix cache is reset on the main process.
    """
    # For DeepSpeed ZeRO-3, we need to gather all parameters before operations
    deepspeed_plugin = self.accelerator.state.deepspeed_plugin
    zero_stage_3 = deepspeed_plugin is not None and deepspeed_plugin.zero_stage == 3
    gather_if_zero3 = (
        deepspeed.zero.GatheredParameters if zero_stage_3 else nullcontext
    )

    if is_peft_model(self.model):
        # With PEFT and DeepSpeed ZeRO Stage 3, we must gather the full model at once before merging, as merging
        # adapters in a sharded manner is not supported.
        with gather_if_zero3(list(self.model.parameters())):
            self.model.merge_adapter()

            # Update vLLM weights while parameters are gathered
            for name, param in self.model.named_parameters():
                # When using PEFT, we need to recover the original parameter name and discard some parameters
                # NOTE(review): the "base_model.model." prefix is stripped twice --
                # presumably to handle a doubly PEFT-wrapped model; confirm before simplifying.
                name = (
                    name.removeprefix("base_model.model.")
                    .removeprefix("base_model.model.")
                    .replace(".base_layer", "")
                )
                # Skip parameters whose name contains the PEFT prefix
                # (presumably the adapter weights themselves -- confirm).
                if self.model.prefix in name:
                    continue
                # When module to save, remove its prefix and discard the original module
                if "original_module" in name:
                    continue
                name = name.replace("modules_to_save.default.", "")

                if self.accelerator.is_main_process:
                    self.vllm_client.update_named_param(name, param.data)

            # Unmerge adapters while parameters are still gathered
            self.model.unmerge_adapter()
            # Parameters will automatically be repartitioned when exiting the context
    else:
        # For non-PEFT models, simply gather and update each parameter individually.
        for name, param in self.model.named_parameters():
            with gather_if_zero3([param]):
                if self.accelerator.is_main_process:
                    self.vllm_client.update_named_param(name, param.data)

    # Reset cache on main process
    if self.accelerator.is_main_process:
        self.vllm_client.reset_prefix_cache()
class AxolotlGRPOSequenceParallelTrainer(AxolotlGRPOTrainer): class AxolotlGRPOSequenceParallelTrainer(AxolotlGRPOTrainer):
"""Extend the base GRPOTrainer for sequence parallelism handling""" """Extend the base GRPOTrainer for sequence parallelism handling"""
@@ -266,6 +213,9 @@ class AxolotlGRPOSequenceParallelTrainer(AxolotlGRPOTrainer):
self.accelerator.even_batches = False self.accelerator.even_batches = False
# Return unprepared dataloader if using sequence parallelism # Return unprepared dataloader if using sequence parallelism
# TODO(djsaunde): We might be able to use `accelerate`'s dataloader preparation
# if we use `dispatch_batches` and `slice_fn_for_dispatch` properly (i.e.,
# slice each batch along the sequence dimension).
if self.args.sequence_parallel_degree > 1: if self.args.sequence_parallel_degree > 1:
return dataloader return dataloader

View File

@@ -227,6 +227,19 @@ class AxolotlTrainingMixins:
}, },
) )
adam_beta3: Optional[float] = field(
default=None,
metadata={
"help": "The beta3 hyperparameter used in some optimizers such as CAME"
},
)
adam_epsilon2: Optional[float] = field(
default=None,
metadata={
"help": "The epsilon2 hyperparameter used in some optimizers such as CAME"
},
)
# multi-modal section # multi-modal section
image_size: int | tuple[int, int] | None = field( image_size: int | tuple[int, int] | None = field(

View File

@@ -1,6 +1,7 @@
"""MLFlow module for trainer callbacks""" """MLFlow module for trainer callbacks"""
import logging import logging
import os
from shutil import copyfile from shutil import copyfile
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@@ -16,6 +17,11 @@ if TYPE_CHECKING:
LOG = logging.getLogger("axolotl.callbacks") LOG = logging.getLogger("axolotl.callbacks")
def should_log_artifacts() -> bool:
    """
    Report whether MLflow artifact logging is enabled via the environment.

    Reads `HF_MLFLOW_LOG_ARTIFACTS` (default `"FALSE"`) and compares it
    case-insensitively against the accepted truthy spellings. `ON` is accepted
    alongside `TRUE`, `1` and `YES` so that this check agrees with the set of
    truthy values Transformers uses for the same variable; otherwise
    `HF_MLFLOW_LOG_ARTIFACTS=ON` would enable artifact logging in the HF
    callback while the axolotl config artifact is silently skipped.

    Returns:
        True if the variable is set to a truthy value, False otherwise
        (including when it is unset).
    """
    truths = {"TRUE", "1", "YES", "ON"}
    return os.getenv("HF_MLFLOW_LOG_ARTIFACTS", "FALSE").upper() in truths
class SaveAxolotlConfigtoMlflowCallback(TrainerCallback): class SaveAxolotlConfigtoMlflowCallback(TrainerCallback):
# pylint: disable=duplicate-code # pylint: disable=duplicate-code
"""Callback to save axolotl config to mlflow""" """Callback to save axolotl config to mlflow"""
@@ -32,13 +38,18 @@ class SaveAxolotlConfigtoMlflowCallback(TrainerCallback):
): ):
if is_main_process(): if is_main_process():
try: try:
with NamedTemporaryFile( if should_log_artifacts():
mode="w", delete=False, suffix=".yml", prefix="axolotl_config_" with NamedTemporaryFile(
) as temp_file: mode="w", delete=False, suffix=".yml", prefix="axolotl_config_"
copyfile(self.axolotl_config_path, temp_file.name) ) as temp_file:
mlflow.log_artifact(temp_file.name, artifact_path="") copyfile(self.axolotl_config_path, temp_file.name)
mlflow.log_artifact(temp_file.name, artifact_path="")
LOG.info(
"The Axolotl config has been saved to the MLflow artifacts."
)
else:
LOG.info( LOG.info(
"The Axolotl config has been saved to the MLflow artifacts." "Skipping logging artifacts to MLflow (hf_mlflow_log_artifacts is false)"
) )
except (FileNotFoundError, ConnectionError) as err: except (FileNotFoundError, ConnectionError) as err:
LOG.warning(f"Error while saving Axolotl config to MLflow: {err}") LOG.warning(f"Error while saving Axolotl config to MLflow: {err}")

View File

@@ -1,7 +1,6 @@
"""Module for Axolotl trainer sequence parallelism manager and utilities""" """Module for Axolotl trainer sequence parallelism manager and utilities"""
import functools import functools
import inspect
import torch import torch
import torch.distributed as dist import torch.distributed as dist
@@ -33,7 +32,7 @@ def apply_sequence_parallelism(
to only keep the last N tokens in the sequence during generation. to only keep the last N tokens in the sequence during generation.
Args: Args:
batch: Dictionary of model arguments (e.g., input_ids, attention_mask, etc.). batch: Batch dictionary (e.g., input_ids, attention_mask, etc.).
local_rank: Local rank in the sequence parallel group. local_rank: Local rank in the sequence parallel group.
local_world_size: World size of the sequence parallel group. local_world_size: World size of the sequence parallel group.
gradient_accumulation_steps: Number of steps to accumulate gradients over. gradient_accumulation_steps: Number of steps to accumulate gradients over.
@@ -207,26 +206,12 @@ class SequenceParallelContextManager:
def __enter__(self): def __enter__(self):
# Forward pre-hook to apply sequence parallelism # Forward pre-hook to apply sequence parallelism
def sequence_parallel_pre_hook(_, args, kwargs): def sequence_parallel_pre_hook(_, args, kwargs):
# Convert all args to kwargs using the model's forward function signature # Apply sequence parallelism to kwargs and get original sequence length and padding info
updated_kwargs = kwargs.copy() kwargs, self.original_seq_len, self.pad_len = (
self.apply_sequence_parallelism(batch=kwargs)
# Get parameter names from the model's forward function
forward_params = list(
inspect.signature(self.models[0].forward).parameters.keys()
) )
# Map args to their parameter names return args, kwargs
for i, arg in enumerate(args):
if i < len(forward_params):
param_name = forward_params[i]
updated_kwargs[param_name] = arg
# Apply sequence parallelism to empty args and updated kwargs
updated_kwargs, self.original_seq_len, self.pad_len = (
self.apply_sequence_parallelism(updated_kwargs)
)
return (), updated_kwargs
# Forward post-hook to gather outputs # Forward post-hook to gather outputs
def sequence_parallel_post_hook(_, __, output: ModelOutput) -> ModelOutput: def sequence_parallel_post_hook(_, __, output: ModelOutput) -> ModelOutput:

View File

@@ -53,7 +53,7 @@ from axolotl.utils.data.utils import (
retry_on_request_exceptions, retry_on_request_exceptions,
) )
from axolotl.utils.dict import DictDefault from axolotl.utils.dict import DictDefault
from axolotl.utils.distributed import is_local_main_process, zero_first from axolotl.utils.distributed import is_local_main_process
from axolotl.utils.trainer import ( from axolotl.utils.trainer import (
calculate_total_num_steps, calculate_total_num_steps,
process_datasets_for_packing, process_datasets_for_packing,
@@ -66,32 +66,31 @@ LOG = logging.getLogger(__name__)
def prepare_dataset(cfg, tokenizer, processor=None, preprocess_iterable=None): def prepare_dataset(cfg, tokenizer, processor=None, preprocess_iterable=None):
prompters = [] prompters = []
if not cfg.pretraining_dataset: if not cfg.pretraining_dataset:
with zero_first(is_local_main_process()): if cfg.test_datasets:
if cfg.test_datasets: train_dataset, _, prompters = load_prepare_datasets(
train_dataset, _, prompters = load_prepare_datasets( tokenizer,
tokenizer, cfg,
cfg, DEFAULT_DATASET_PREPARED_PATH,
DEFAULT_DATASET_PREPARED_PATH, split="train",
split="train", processor=processor,
processor=processor, preprocess_iterable=preprocess_iterable,
preprocess_iterable=preprocess_iterable, )
) _, eval_dataset, _ = load_prepare_datasets(
_, eval_dataset, _ = load_prepare_datasets( tokenizer,
tokenizer, cfg,
cfg, DEFAULT_DATASET_PREPARED_PATH,
DEFAULT_DATASET_PREPARED_PATH, split="test",
split="test", processor=processor,
processor=processor, preprocess_iterable=preprocess_iterable,
preprocess_iterable=preprocess_iterable, )
) else:
else: train_dataset, eval_dataset, prompters = load_prepare_datasets(
train_dataset, eval_dataset, prompters = load_prepare_datasets( tokenizer,
tokenizer, cfg,
cfg, DEFAULT_DATASET_PREPARED_PATH,
DEFAULT_DATASET_PREPARED_PATH, processor=processor,
processor=processor, preprocess_iterable=preprocess_iterable,
preprocess_iterable=preprocess_iterable, )
)
else: else:
# Load streaming dataset if pretraining_dataset is given # Load streaming dataset if pretraining_dataset is given
path = cfg.pretraining_dataset path = cfg.pretraining_dataset
@@ -272,7 +271,7 @@ def load_tokenized_prepared_datasets(
LOG.info("Loading raw datasets...") LOG.info("Loading raw datasets...")
if not cfg.is_preprocess: if not cfg.is_preprocess:
LOG.warning( LOG.warning(
"Processing datasets during training can lead to VRAM instability. Please pre-process your dataset." "Processing datasets during training can lead to VRAM instability. Please use `axolotl preprocess` to prepare your dataset."
) )
if cfg.seed: if cfg.seed:

View File

@@ -166,7 +166,6 @@ def oai_gsm8k_transform(cfg, *args, **kwargs):
""" """
) )
@pytest.mark.skip(reason="flaky test")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"num_gpus", "num_gpus",
[1, 2], [1, 2],
@@ -231,8 +230,6 @@ def oai_gsm8k_transform(cfg, *args, **kwargs):
"NCCL_P2P_LEVEL": "LOC", "NCCL_P2P_LEVEL": "LOC",
**current_env, **current_env,
"CUDA_VISIBLE_DEVICES": "1", "CUDA_VISIBLE_DEVICES": "1",
"VLLM_DISABLE_COMPILE_CACHE": "1",
# "VLLM_USE_V1": "0",
} }
vllm_process = start_vllm( vllm_process = start_vllm(
cfg.base_model, cfg.base_model,
@@ -266,7 +263,6 @@ def oai_gsm8k_transform(cfg, *args, **kwargs):
finally: finally:
recursive_kill(vllm_process) recursive_kill(vllm_process)
@pytest.mark.skip(reason="flaky test")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"num_gpus", "num_gpus",
[1, 2], [1, 2],
@@ -325,8 +321,6 @@ def oai_gsm8k_transform(cfg, *args, **kwargs):
"NCCL_P2P_LEVEL": "LOC", # nccl can be brittle, assume P2P isn't reliable "NCCL_P2P_LEVEL": "LOC", # nccl can be brittle, assume P2P isn't reliable
**current_env, **current_env,
"CUDA_VISIBLE_DEVICES": "1", "CUDA_VISIBLE_DEVICES": "1",
"VLLM_DISABLE_COMPILE_CACHE": "1",
# "VLLM_USE_V1": "0",
} }
vllm_process = start_vllm( vllm_process = start_vllm(
cfg.base_model, cfg.base_model,