Add Exact Deduplication Feature to Preprocessing Pipeline (#2072)

* Add example YAML file for training Mistral using DPO

* added deduplication code

* Add exact deduplication feature and update examples

* Improve deduplication for train/eval overlap

Changed the deduplication function to use a more memory-efficient hashing method. Applied Git suggestions to improve clarity and maintainability.\n\nThe deduplication now handles cases where train and eval datasets have overlapping elements.

* Improve deduplication for train/eval overlap

Changed the deduplication function to use a more memory-efficient hashing method. Applied Git suggestions to improve clarity and maintainability.\n\nThe deduplication now handles cases where train and eval datasets have overlapping elements.

* Apply suggestions from code review

To handle the original case where we do not do deduplication

Co-authored-by: Wing Lian <wing.lian@gmail.com>

* Improve false collision detection to ensure dataset integrity

- Added test cases to simulate and verify handling of forced hash collisions between datasets.
- Ensured that datasets with identical hashes but different content are correctly identified, preventing incorrect deduplication.
- Updated unit tests to include scenarios where collisions occur across both training and evaluation datasets, as well as within a single dataset.

* Moved the constants file to the tests folder

- Relocated `constants.py` to the `tests` folder to improve modularity and maintain a clear separation between source and test files.
- Renamed `cicd/tests.py` to `cicd/cicd_tests.py` to resolve a conflict with `tests/__init__.py`, which caused Mypy to fail due to duplicate module names.
- Updated all references to `cicd.tests` in the codebase to `cicd.cicd_tests` to reflect the renaming and ensure compatibility.
- These changes ensure Mypy passes the pre-commit hook and maintain alignment with the project's structure.

* revert some changes from previous commit and fix relative import

---------

Co-authored-by: Wing Lian <wing.lian@gmail.com>
Co-authored-by: Wing Lian <wing@axolotl.ai>
This commit is contained in:
Oliver Molenschot
2024-12-02 05:47:10 -08:00
committed by GitHub
parent 5f1d98e8fc
commit b620ed94d0
11 changed files with 767 additions and 51 deletions

View File

@@ -139,7 +139,7 @@ def check_remote_config(config: Union[str, Path]):
with open(output_path, "wb") as file:
file.write(content)
LOG.info(
f"Using the following config obtained from {config}:\n\n{content.decode('utf-8')}\n"
f"Using the following config obtained from {config}: \n\n{content.decode('utf-8')}\n"
)
return output_path

View File

@@ -624,6 +624,7 @@ class AxolotlInputConfig(
json_schema_extra={"description": "streaming dataset to use for pretraining"},
)
dataset_processes: Optional[int] = Field(default=os.cpu_count())
dataset_exact_deduplication: Optional[bool] = None
dataset_keep_in_memory: Optional[bool] = None
dataloader_pin_memory: Optional[bool] = None
dataloader_num_workers: Optional[int] = None

View File

@@ -13,7 +13,7 @@ from axolotl.common.const import DEFAULT_DATASET_PREPARED_PATH
from axolotl.prompt_strategies.dpo import load as load_dpo
from axolotl.prompt_strategies.kto import load as load_kto
from axolotl.prompt_strategies.orpo import load as load_orpo
from axolotl.utils.data.utils import md5
from axolotl.utils.data.utils import deduplicate_and_log_datasets, md5
from axolotl.utils.dict import DictDefault
from axolotl.utils.distributed import is_main_process, zero_first
from axolotl.utils.models import load_tokenizer
@@ -208,4 +208,9 @@ def load_prepare_dpo_datasets(cfg):
if eval_dataset and not eval_is_preprocessed:
_save_preprocessed_ds(cfg, cfg.test_datasets, eval_dataset)
if cfg.dataset_exact_deduplication:
train_dataset, eval_dataset, _ = deduplicate_and_log_datasets(
train_dataset=train_dataset, eval_dataset=eval_dataset
)
return train_dataset, eval_dataset

View File

@@ -44,7 +44,7 @@ from axolotl.prompters import (
UnsupportedPrompter,
)
from axolotl.utils.data.pretraining import wrap_pretraining_dataset
from axolotl.utils.data.utils import md5
from axolotl.utils.data.utils import deduplicate_and_log_datasets, md5
from axolotl.utils.dict import DictDefault
from axolotl.utils.distributed import is_local_main_process, zero_first
from axolotl.utils.trainer import (
@@ -136,8 +136,9 @@ def prepare_dataset(cfg, tokenizer, processor=None):
# https://discuss.huggingface.co/t/how-to-use-huggingface-trainer-streaming-datasets-without-wrapping-it-with-torchdatas-iterablewrapper/25230
train_dataset = train_dataset.with_format("torch")
eval_dataset = None
if cfg.dataset_exact_deduplication:
LOG.info("Deduplication not available for pretrained datasets")
return train_dataset, eval_dataset, cfg.max_steps, prompters
if eval_dataset and cfg.sample_packing and cfg.eval_sample_packing is not False:
total_eval_steps = calculate_total_num_steps(cfg, eval_dataset, update=False)
if total_eval_steps == 0:
@@ -178,7 +179,7 @@ def load_tokenized_prepared_datasets(
+ "|".join(
sorted(
[
f"{d.path}:{d.type}:{d.shards}:{d.conversation}{d.split}"
f"{d.path}: {d.type}: {d.shards}: {d.conversation}{d.split}"
for d in cfg_datasets
]
)
@@ -584,7 +585,8 @@ def load_prepare_datasets(
)
train_fingerprint = md5(to_hash_train)
test_fingerprint = md5(to_hash_test)
if cfg.dataset_exact_deduplication:
_, _, dataset = deduplicate_and_log_datasets(dataset=dataset)
dataset = dataset.train_test_split(
test_size=val_set_size,
shuffle=False,
@@ -596,12 +598,17 @@ def load_prepare_datasets(
train_dataset = dataset["train"]
eval_dataset = dataset["test"]
elif split == "test":
if cfg.dataset_exact_deduplication:
_, eval_dataset, _ = deduplicate_and_log_datasets(eval_dataset=dataset)
else:
eval_dataset = dataset
train_dataset = None
eval_dataset = dataset
else:
train_dataset = dataset
if cfg.dataset_exact_deduplication:
train_dataset, _, _ = deduplicate_and_log_datasets(train_dataset=dataset)
else:
train_dataset = dataset
eval_dataset = None
return train_dataset, eval_dataset, prompters

View File

@@ -1,6 +1,11 @@
"""data handling helpers"""
import hashlib
import logging
from datasets import Dataset
LOG = logging.getLogger("axolotl")
def md5(to_hash: str, encoding: str = "utf-8") -> str:
@@ -8,3 +13,96 @@ def md5(to_hash: str, encoding: str = "utf-8") -> str:
return hashlib.md5(to_hash.encode(encoding), usedforsecurity=False).hexdigest()
except TypeError:
return hashlib.md5(to_hash.encode(encoding)).hexdigest() # nosec
def sha256(to_hash: str, encoding: str = "utf-8") -> str:
return hashlib.sha256(to_hash.encode(encoding)).hexdigest()
def deduplicate_dataset(
dataset: Dataset, seen_hashes: dict[str, list[int]], other_dataset: Dataset = None
) -> Dataset:
unique_indices = []
for idx, row in enumerate(dataset):
row_hash = sha256(str(row)) # Using SHA256 for collision resistance.
if row_hash not in seen_hashes:
seen_hashes[row_hash] = [idx]
unique_indices.append(idx)
else:
# Check for collision by looking up the original dataset indices
original_indices = seen_hashes[row_hash]
is_duplicate = False
for original_idx in original_indices:
if (
not idx == original_idx
and original_idx < len(dataset)
and str(dataset[original_idx]) == str(row)
):
is_duplicate = True
break
# Check in the other dataset if provided
if other_dataset is not None:
if original_idx < len(other_dataset) and str(
other_dataset[original_idx]
) == str(row):
is_duplicate = True
break
if not is_duplicate:
seen_hashes[row_hash].append(idx)
unique_indices.append(idx)
continue
return dataset.select(unique_indices)
def deduplicate_and_log_datasets(
*,
train_dataset: Dataset = None,
eval_dataset: Dataset = None,
dataset: Dataset = None,
) -> tuple[Dataset, Dataset, Dataset]:
"""
Deduplicates train, eval, and an optional dataset if provided, logging original and new sizes.
Returns:
tuple: Deduplicated train, eval, and additional datasets.
"""
seen_hashes: dict[str, list[int]] = {}
# Handle cases where datasets are None
if train_dataset is not None:
LOG.info(
f"Starting deduplication for train dataset. Original size: {len(train_dataset)}"
)
train_dataset = deduplicate_dataset(
dataset=train_dataset, seen_hashes=seen_hashes
)
LOG.info(
f"Deduplication complete for train dataset. New size: {len(train_dataset)}"
)
else:
LOG.info("Train dataset is None. Skipping deduplication.")
if eval_dataset is not None:
LOG.info(
f"Starting deduplication for eval dataset. Original size: {len(eval_dataset)}"
)
eval_dataset = deduplicate_dataset(
dataset=eval_dataset, seen_hashes=seen_hashes, other_dataset=train_dataset
)
LOG.info(
f"Deduplication complete for eval dataset. New size: {len(eval_dataset)}"
)
else:
LOG.info("Eval dataset is None. Skipping deduplication.")
if dataset is not None and (eval_dataset is None and train_dataset is None):
LOG.info(
f"Starting deduplication for combined dataset. Original size: {len(dataset)}"
)
dataset = deduplicate_dataset(dataset=dataset, seen_hashes=seen_hashes)
LOG.info(
f"Deduplication complete for combined dataset. New size: {len(dataset)}"
)
return train_dataset, eval_dataset, dataset