Compare commits

..

3 Commits

Author SHA1 Message Date
Wing Lian
ee489d16bf wip 2025-02-27 11:42:46 -05:00
Wing Lian
d88e071120 sequence parallelism 2025-02-23 12:19:34 -05:00
Wing Lian
a4170030ab don't install extraneous old version of pydantic in ci and make sre to run multigpu ci (#2355) 2025-02-21 22:06:29 -05:00
13 changed files with 126 additions and 147 deletions

View File

@@ -4,6 +4,10 @@ on:
pull_request:
paths:
- 'tests/e2e/multigpu/*.py'
- 'requirements.txt'
- 'setup.py'
- 'pyproject.toml'
- '.github/workflows/multi-gpu-e2e.yml'
workflow_dispatch:
schedule:
- cron: '0 0 * * 1,4' # Runs at 00:00 UTC every monday & thursday

View File

@@ -37,15 +37,11 @@ temp_dir = tempfile.mkdtemp()
with open(pathlib.Path(temp_dir) / "Dockerfile", "w", encoding="utf-8") as f:
f.write(dockerfile_contents)
cicd_image = (
Image.from_dockerfile(
pathlib.Path(temp_dir) / "Dockerfile",
force_build=True,
gpu="A10G",
)
.env(df_args)
.pip_install("fastapi==0.110.0", "pydantic==2.6.3")
)
cicd_image = Image.from_dockerfile(
pathlib.Path(temp_dir) / "Dockerfile",
force_build=True,
gpu="A10G",
).env(df_args)
app = App("Axolotl CI/CD", secrets=[])

View File

@@ -123,8 +123,6 @@ class ModalCloud(Cloud):
if env := self.get_env():
image = image.env(env)
image = image.pip_install("fastapi==0.110.0", "pydantic==2.6.3")
return image
def get_secrets(self):

View File

@@ -59,6 +59,7 @@ from axolotl.core.training_args import (
AxolotlTrainingArguments,
)
from axolotl.integrations.base import PluginManager
from axolotl.monkeypatch.attention.sequence_parallel import USPRingAttnType, get_extract_fn
from axolotl.monkeypatch.multipack import SUPPORTED_MULTIPACK_MODEL_TYPES
from axolotl.monkeypatch.relora import ReLoRACallback
from axolotl.utils import is_comet_available, is_mlflow_available
@@ -746,6 +747,11 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
# A100 is best at 64, while others at 8. Let's use the larger so we don't have to check
# https://docs.nvidia.com/deeplearning/performance/dl-performance-matrix-multiplication/index.html
data_collator_kwargs["pad_to_multiple_of"] = 64
if self.cfg.sp_ulysses_degree:
data_collator_kwargs["sp_extract_fn"] = get_extract_fn(
USPRingAttnType.ZIGZAG,
sp_ulysses_degree=self.cfg.sp_ulysses_degree
)
if self.cfg.reward_model:
data_collator_kwargs["max_length"] = self.cfg.sequence_len
@@ -831,9 +837,7 @@ class HFCausalTrainerBuilder(TrainerBuilderBase):
if "max_length" in kwargs:
kwargs.pop("max_length")
elif use_batch_sampler_collator:
if self.cfg.flex_attention is True:
collator = V2BatchSamplerDataCollatorForSeq2Seq
elif self.cfg.model_config_type in SUPPORTED_MULTIPACK_MODEL_TYPES:
if self.cfg.model_config_type in SUPPORTED_MULTIPACK_MODEL_TYPES:
collator = V2BatchSamplerDataCollatorForSeq2Seq
elif (
self.cfg.model_config_type in ["llama"]

View File

@@ -206,6 +206,16 @@ class AxolotlTrainingMixins:
},
)
sp_ulysses_degree: Optional[int] = field(
default=None,
metadata={"help": "Ulysses parallelism for hybrid sequence parallel long context attn"},
)
sp_ring_degree: Optional[int] = field(
default=None,
metadata={"help": "Ring attention parallelism for sequence parallel long context attn"},
)
@dataclass
class AxolotlTrainingArguments(AxolotlTrainingMixins, TrainingArguments):

View File

@@ -0,0 +1,45 @@
from enum import Enum
from functools import partial
from transformers.modeling_utils import ALL_ATTENTION_FUNCTIONS
from yunchang import set_seq_parallel_pg, EXTRACT_FUNC_DICT
from axolotl.utils.distributed import get_world_size, get_rank
class USPRingAttnType(Enum):
BASIC = "basic"
ZIGZAG = "zigzag"
STRIPE = "stripe"
def apply_usp_attn_patch(ring_impl_type: USPRingAttnType):
from axolotl.monkeypatch.attention.sequence_parallel.usp import build_usp_fa_forward
fa_forward = build_usp_fa_forward(ring_impl_type)
ALL_ATTENTION_FUNCTIONS["flash_attention_2"] = fa_forward
def get_extract_fn(ring_impl_type: USPRingAttnType, sp_ulysses_degree: int):
fn = EXTRACT_FUNC_DICT["basic"]
if ring_impl_type.value in EXTRACT_FUNC_DICT.keys():
fn = EXTRACT_FUNC_DICT[ring_impl_type.value]
# map bad key upstream
elif ring_impl_type == USPRingAttnType.STRIPE:
fn = EXTRACT_FUNC_DICT["strip"]
world_size = get_world_size()
rd = world_size // sp_ulysses_degree
return partial(fn, rank=get_rank(), world_size=world_size, rd=rd, ud=sp_ulysses_degree)
def set_usp_parallel_group(sp_ulysses_degree):
"""
setup distributed parallel group for USP attention
make sure this gets called before building any USP attention modules
:param sp_ulysses_degree:
:return:
"""
world_size = get_world_size()
rank = get_rank()
sp_ring_degree = world_size // sp_ulysses_degree
set_seq_parallel_pg(sp_ulysses_degree, sp_ring_degree, rank, world_size)

View File

@@ -0,0 +1,36 @@
from enum import Enum
from typing import Optional, Tuple, Callable
import torch
from yunchang import LongContextAttention
from axolotl.monkeypatch.attention.sequence_parallel import USPRingAttnType
def build_usp_fa_forward(ring_impl_type: USPRingAttnType) -> Callable:
usp_attn = LongContextAttention(ring_impl_type.value)
def flash_attention_forward(
module: torch.nn.Module, # pylint: disable=unused-argument
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
attention_mask: Optional[torch.Tensor], # pylint: disable=unused-argument
dropout: float = 0.0,
scaling: Optional[float] = None,
sliding_window: Optional[int] = None, # pylint: disable=unused-argument
softcap: Optional[float] = None,
**kwargs, # pylint: disable=unused-argument
) -> Tuple[torch.Tensor, None]:
attn_output = usp_attn(
query,
key,
value,
dropout_p=dropout,
softmax_scale=scaling,
causal=True,
softcap=softcap,
)
return attn_output, None
return flash_attention_forward

View File

@@ -95,103 +95,6 @@ def get_cu_seqlens(attn_mask):
return torch.stack(results).to(dtype=torch.int32), torch.stack(max_seq_lens)
def get_packed_mask_from_pos_ids(position_ids):
if len(position_ids.shape) == 1:
position_ids = position_ids.unsqueeze(0)
device = position_ids.device
results = []
for i, row in enumerate(position_ids):
# Count the number of consecutive zeros from the right side
padding_length = (row == 0).int().flip(dims=[0]).cumprod(dim=0).sum().item()
# Adjust the row to exclude padding
adjusted_row = row[:-padding_length] if padding_length else row.clone()
# Find where the position resets to 0 (indicating a new sequence)
seq_starts = torch.cat(
[
torch.tensor([True], dtype=torch.bool, device=device),
adjusted_row[1:] == 0,
]
)
# Get the indices where the sequence starts
start_indices = torch.cat(
[
torch.nonzero(seq_starts).unbind(dim=1)[0],
torch.tensor([len(adjusted_row)], dtype=torch.int32, device=device),
]
)
# Calculate the sequence lengths
seq_lengths = start_indices[1:] - start_indices[:-1]
# Append the padding length to the sequence lengths
doc_mask = torch.ones(len(row), dtype=torch.int32, device=device)
for i, seq_len in enumerate(seq_lengths):
start_id = start_indices[i]
doc_mask[start_id : start_id + seq_len] = (
(i+1) * doc_mask[start_id : start_id + seq_len]
)
if padding_length:
doc_mask[len(adjusted_row) :] = 0 * doc_mask[len(adjusted_row) :]
results.append(doc_mask)
return torch.stack(results)
def get_seqlens_from_pos_ids(position_ids):
"""generate a sequence length set using pos ids for doc mask creation in flex attention"""
if len(position_ids.shape) == 1:
position_ids = position_ids.unsqueeze(0)
max_seq_len = position_ids.shape[1]
device = position_ids.device
results = []
totalseqlens = []
for row in position_ids:
# Count the number of consecutive zeros from the right side
padding_length = (row == 0).int().flip(dims=[0]).cumprod(dim=0).sum().item()
# Adjust the row to exclude padding
adjusted_row = row[:-padding_length] if padding_length else row.clone()
# Find where the position resets to 0 (indicating a new sequence)
seq_starts = torch.cat(
[
torch.tensor([True], dtype=torch.bool, device=device),
adjusted_row[1:] == 0,
]
)
# Get the indices where the sequence starts
start_indices = torch.cat(
[
torch.nonzero(seq_starts).unbind(dim=1)[0],
torch.tensor([len(adjusted_row)], dtype=torch.int32, device=device),
]
)
# Calculate the sequence lengths
seq_lengths = start_indices[1:] - start_indices[:-1]
# Append the padding length to the sequence lengths
if padding_length:
seq_lengths = torch.cat(
[
seq_lengths,
torch.tensor(
[len(row) - torch.sum(seq_lengths)],
dtype=torch.int32,
device=device,
),
]
)
results.append(seq_lengths)
totalseqlens.append(len(adjusted_row))
return results, torch.tensor(totalseqlens, dtype=torch.int32, device=device)
def get_cu_seqlens_from_pos_ids(position_ids):
"""generate a cumulative sequence length mask for flash attention using pos ids"""
if len(position_ids.shape) == 1:
@@ -273,10 +176,7 @@ def mask_2d_to_4d(
when they attend to each other within that sequence.
This expansion transforms the mask to lower triangular form to prevent future peeking.
"""
if len(mask.size()) == 4:
return mask
bsz, src_len = int(mask.size()[0]), int(mask.size()[1])
bsz, src_len = mask.size()
tgt_len = tgt_len if tgt_len is not None else src_len
mask = mask.unsqueeze(1).unsqueeze(2)

View File

@@ -3,7 +3,7 @@ DataCollator for axolotl to pad labels and position_ids for packed sequences
"""
from dataclasses import dataclass
from typing import Any, Optional, Union
from typing import Any, Optional, Union, Callable
import numpy as np
from transformers import PreTrainedTokenizerBase
@@ -53,6 +53,7 @@ class DataCollatorForSeq2Seq:
label_pad_token_id: int = -100
position_pad_token_id: int = 0
return_tensors: str = "pt"
sp_extract_fn: Optional[Callable] = None
def __call__(self, features, return_tensors=None):
labels = None
@@ -121,6 +122,10 @@ class DataCollatorForSeq2Seq:
return features
def seq_parallel_split(self, features):
if self.sp_extract_fn:
pass
return features
@dataclass
class BatchSamplerDataCollatorForSeq2Seq(DataCollatorForSeq2Seq):

View File

@@ -823,7 +823,6 @@ class AxolotlInputConfig(
xformers_attention: Optional[bool] = None
sdp_attention: Optional[bool] = None
s2_attention: Optional[bool] = None
flex_attention: Optional[bool] = None
flash_attention: Optional[bool] = None
flash_attn_cross_entropy: Optional[bool] = None
flash_attn_rms_norm: Optional[bool] = None
@@ -833,6 +832,8 @@ class AxolotlInputConfig(
eager_attention: Optional[bool] = None
sp_ulysses_degree: Optional[int] = None
unsloth_cross_entropy_loss: Optional[bool] = None
unsloth_lora_mlp: Optional[bool] = None
unsloth_lora_qkv: Optional[bool] = None
@@ -1790,26 +1791,6 @@ class AxolotlConfigWCapabilities(AxolotlInputConfig):
)
return data
@model_validator(mode="before")
@classmethod
def check_flex_torch_version(cls, data):
if (data.get("flex_attention") is not None) and (
data.get("flex_attention") is True
):
env_capabilities = data.get("env_capabilities", {})
torch_version = env_capabilities.get("torch_version")
if torch_version is None:
import torch
torch_version = str(torch.__version__).split("+", maxsplit=1)[0]
if version.parse(torch_version) < version.parse("2.5.1"):
raise ValueError(
"Flex attention is not supported on torch version < 2.5.1"
)
return data
@model_validator(mode="before")
@classmethod
def check_torch_compile_auto(cls, data):

View File

@@ -86,6 +86,12 @@ def get_world_size():
return int(os.getenv("WORLD_SIZE", "1"))
def get_rank():
if not is_distributed():
return 0
return dist.get_rank()
@contextmanager
def zero_only():
"""

View File

@@ -403,7 +403,7 @@ class ModelLoader:
if (
self.cfg.model_config_type in SUPPORTED_MULTIPACK_MODEL_TYPES
and (self.cfg.flash_attention or self.cfg.flex_attention)
and self.cfg.flash_attention
and self.cfg.sample_packing
):
if "auto_map" in self.model_config:
@@ -707,13 +707,7 @@ class ModelLoader:
"""
sample packing uses custom FA2 patch
"""
if self.cfg.flex_attention:
self.model_kwargs["attn_implementation"] = "flex_attention"
self.model_config._attn_implementation = ( # pylint: disable=protected-access
"flex_attention"
)
elif self.cfg.flash_attention:
if self.cfg.flash_attention:
if not self.cfg.sample_packing and self.cfg.s2_attention:
pass
self.model_kwargs["attn_implementation"] = "flash_attention_2"
@@ -1119,7 +1113,7 @@ class ModelLoader:
should_convert = (
# LlamaRMSNorm layers are in fp32 after kbit_training or full finetune, so we need to
# convert them back to fp16/bf16 for flash-attn compatibility.
((needs_fa2_dtype or self.cfg.flash_attention or self.cfg.flex_attention) and not qlora_fsdp)
((needs_fa2_dtype or self.cfg.flash_attention) and not qlora_fsdp)
or self.cfg.cut_cross_entropy # Cut cross entropy requires embedding layers to be in fp16/bf16 for backward pass
)

View File

@@ -346,7 +346,7 @@ def process_datasets_for_packing(cfg, train_dataset, eval_dataset):
load_from_cache_file=not cfg.is_preprocess,
desc="Add position_id column (PoSE)",
)
elif cfg.sample_packing:
elif cfg.sample_packing or cfg.sp_ulysses_degree:
drop_long_kwargs = {}
if filter_map_kwargs:
drop_long_kwargs["desc"] = "Add position_id column (Sample Packing)"