feat: Add opt-out Telemetry (#3237)
* initial telemetry manager impl
* adding todo
* updates
* updates
* progress on telemetry: config load, process, model load, train start / end, error tracking
* update error file path sanitization function; adding more error tracking
* updated sanitization logic, tests
* adding runtime metrics (cpu + gpu memory, steps/s, etc.)
* tests for runtime metrics telemetry and assoc. callback
* small update / fix
* simplifying path redaction
* sleep on all ranks in distributed setting
* adding back in base_model redaction w/ whitelist
* fix
* doc update
* improved redaction, send system info during model config load telemetry, etc.
* adding runtime metrics / system info additional accelerator support, etc.
* adding runtime metrics / system info additional accelerator support, etc.
* remove duplicate info
* fixes
* fix issue with tests in ci
* distributed fix
* opt-in version of telemetry
* enable / disable logic update
* docs fix
* doc update
* minor fixes
* simplifying
* slight changes
* fix
* lint
* update posthog dep
* coderabbit comments
* fix: opt-in model
* fix: increase time since last
* fix: increase whitelist orgs
* fix: posthog init and shutdown
* fix: imports
* fix: also check grad norm
* fix: duplicate plugin_manager calls
* fix: bad merge
* chore: update docs
* fix: cache process per comment
* fix: error handling
* fix: tests
* Revert "fix: error handling"
This reverts commit 22d1ea5755.
* fix: test telemetry error_handled bool
* fix: revert test
* chore: final doc fixes
---------
Co-authored-by: Dan Saunders <danjsaund@gmail.com>
Co-authored-by: Dan Saunders <dan@axolotl.ai>
This commit is contained in:
@@ -14,6 +14,8 @@ import yaml
|
||||
from transformers.utils import is_torch_bf16_gpu_available
|
||||
|
||||
from axolotl.integrations.base import PluginManager
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.telemetry.manager import TelemetryManager
|
||||
from axolotl.utils.comet_ import setup_comet_env_vars
|
||||
from axolotl.utils.config import (
|
||||
normalize_cfg_datasets,
|
||||
@@ -31,6 +33,8 @@ LOG = get_logger(__name__)
|
||||
|
||||
API_KEY_FIELDS = {"comet_api_key"}
|
||||
|
||||
TELEMETRY_MANAGER = TelemetryManager.get_instance()
|
||||
|
||||
|
||||
def check_remote_config(config: Union[str, Path]) -> Union[str, Path]:
|
||||
"""
|
||||
@@ -164,6 +168,7 @@ def plugin_set_cfg(cfg: DictDefault):
|
||||
plugin_manager.cfg = cfg
|
||||
|
||||
|
||||
@send_errors
|
||||
def load_cfg(
|
||||
config: str | Path | DictDefault = Path("examples/"), **kwargs
|
||||
) -> DictDefault:
|
||||
@@ -197,6 +202,8 @@ def load_cfg(
|
||||
temp_file.close()
|
||||
cfg.axolotl_config_path = temp_file.name
|
||||
|
||||
TELEMETRY_MANAGER.send_event(event_type="config-loaded", properties=cfg)
|
||||
|
||||
# If there are any options passed in the cli, if it is something that seems valid
|
||||
# from the yaml, then overwrite the value
|
||||
cfg_keys = cfg.keys()
|
||||
@@ -240,6 +247,7 @@ def load_cfg(
|
||||
setup_comet_env_vars(cfg)
|
||||
plugin_set_cfg(cfg)
|
||||
|
||||
TELEMETRY_MANAGER.send_event(event_type="config-processed", properties=cfg)
|
||||
cfg_to_log = {
|
||||
k: "[REDACTED]" if k in API_KEY_FIELDS else v
|
||||
for k, v in cfg.items()
|
||||
|
||||
@@ -19,7 +19,10 @@ from axolotl.cli.utils.diffusion import (
|
||||
launch_diffusion_gradio_ui,
|
||||
)
|
||||
from axolotl.integrations.base import PluginManager
|
||||
from axolotl.utils.chat_templates import get_chat_template_from_config
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.chat_templates import (
|
||||
get_chat_template_from_config,
|
||||
)
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.logging import get_logger
|
||||
|
||||
@@ -43,6 +46,7 @@ def get_multi_line_input() -> str:
|
||||
return instruction
|
||||
|
||||
|
||||
@send_errors
|
||||
def do_inference(
|
||||
*,
|
||||
cfg: DictDefault,
|
||||
@@ -160,6 +164,7 @@ def do_inference(
|
||||
print(tokenizer.decode(generated["sequences"].cpu().tolist()[0]))
|
||||
|
||||
|
||||
@send_errors
|
||||
def do_inference_gradio(
|
||||
*,
|
||||
cfg: DictDefault,
|
||||
|
||||
@@ -7,12 +7,14 @@ import fire
|
||||
|
||||
from axolotl.cli.config import load_cfg
|
||||
from axolotl.cli.utils import load_model_and_tokenizer
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.logging import get_logger
|
||||
|
||||
LOG = get_logger(__name__)
|
||||
|
||||
|
||||
@send_errors
|
||||
def do_merge_lora(*, cfg: DictDefault) -> None:
|
||||
"""
|
||||
Calls `transformers`' `merge_and_unload` on the model given in the `axolotl` config
|
||||
|
||||
@@ -23,6 +23,7 @@ from safetensors.torch import save_file as safe_save_file
|
||||
from torch.distributed.checkpoint.format_utils import _EmptyStateDictLoadPlanner
|
||||
|
||||
from axolotl.cli.config import load_cfg
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.logging import get_logger
|
||||
from axolotl.utils.train import determine_last_checkpoint
|
||||
|
||||
@@ -118,6 +119,7 @@ def _distributed_checkpoint_to_merged_weights(
|
||||
return save_path_
|
||||
|
||||
|
||||
@send_errors
|
||||
def merge_fsdp_weights(
|
||||
checkpoint_dir: str,
|
||||
output_path: str,
|
||||
|
||||
@@ -17,6 +17,7 @@ from axolotl.cli.config import load_cfg
|
||||
from axolotl.common.const import DEFAULT_DATASET_PREPARED_PATH
|
||||
from axolotl.common.datasets import load_datasets, load_preference_datasets
|
||||
from axolotl.integrations.base import PluginManager
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.logging import get_logger
|
||||
from axolotl.utils.trainer import disable_datasets_caching
|
||||
@@ -24,6 +25,7 @@ from axolotl.utils.trainer import disable_datasets_caching
|
||||
LOG = get_logger(__name__)
|
||||
|
||||
|
||||
@send_errors
|
||||
def do_preprocess(cfg: DictDefault, cli_args: PreprocessCliArgs) -> None:
|
||||
"""
|
||||
Preprocesses dataset specified in axolotl config.
|
||||
|
||||
@@ -9,6 +9,7 @@ from datasets import Dataset
|
||||
import axolotl.monkeypatch.data.batch_dataset_fetcher # noqa: F401
|
||||
from axolotl.cli.args import PreprocessCliArgs, TrainerCliArgs
|
||||
from axolotl.loaders import load_processor, load_tokenizer
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.data import prepare_datasets, prepare_preference_datasets
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.logging import get_logger
|
||||
@@ -34,6 +35,7 @@ def sample_dataset(dataset: Dataset, num_samples: int) -> Dataset:
|
||||
)
|
||||
|
||||
|
||||
@send_errors
|
||||
def load_datasets(
|
||||
*,
|
||||
cfg: DictDefault,
|
||||
@@ -96,6 +98,7 @@ def load_datasets(
|
||||
)
|
||||
|
||||
|
||||
@send_errors
|
||||
def load_preference_datasets(
|
||||
*, cfg: DictDefault, cli_args: PreprocessCliArgs | TrainerCliArgs | None = None
|
||||
) -> TrainDatasetMeta:
|
||||
|
||||
@@ -29,6 +29,8 @@ from transformers.trainer_pt_utils import AcceleratorConfig
|
||||
|
||||
from axolotl.integrations.base import PluginManager
|
||||
from axolotl.monkeypatch.trainer.lr import patch_trainer_get_lr
|
||||
from axolotl.telemetry.callbacks import TelemetryCallback
|
||||
from axolotl.telemetry.manager import TelemetryManager
|
||||
from axolotl.utils import (
|
||||
is_comet_available,
|
||||
is_mlflow_available,
|
||||
@@ -162,6 +164,10 @@ class TrainerBuilderBase(abc.ABC):
|
||||
)
|
||||
)
|
||||
|
||||
telemetry_manager = TelemetryManager.get_instance()
|
||||
if telemetry_manager.enabled:
|
||||
callbacks.append(TelemetryCallback())
|
||||
|
||||
return callbacks
|
||||
|
||||
def get_post_trainer_create_callbacks(self, trainer):
|
||||
|
||||
@@ -10,6 +10,7 @@ import torch
|
||||
from datasets import Dataset
|
||||
from transformers.trainer import Trainer
|
||||
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.train import (
|
||||
TrainDatasetMeta,
|
||||
setup_model_and_tokenizer,
|
||||
@@ -63,6 +64,7 @@ def evaluate_dataset(
|
||||
return metrics
|
||||
|
||||
|
||||
@send_errors
|
||||
def evaluate(*, cfg: DictDefault, dataset_meta: TrainDatasetMeta) -> Dict[str, float]:
|
||||
"""
|
||||
Evaluate a model on training and validation datasets.
|
||||
|
||||
@@ -20,6 +20,7 @@ from peft import (
|
||||
from transformers import PreTrainedModel
|
||||
|
||||
from axolotl.loaders.utils import get_linear_embedding_layers
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.logging import get_logger
|
||||
|
||||
@@ -172,6 +173,7 @@ def load_lora(
|
||||
return model, lora_config
|
||||
|
||||
|
||||
@send_errors
|
||||
def load_adapter(
|
||||
model: PreTrainedModel,
|
||||
cfg: DictDefault,
|
||||
|
||||
@@ -49,6 +49,7 @@ from axolotl.loaders.utils import (
|
||||
load_model_config,
|
||||
)
|
||||
from axolotl.models.mamba import fix_mamba_attn_for_loss
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.bench import log_gpu_memory_usage
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.distributed import (
|
||||
@@ -158,6 +159,7 @@ class ModelLoader:
|
||||
"""Property that determines if FSDP with QLoRA is enabled."""
|
||||
return self.is_fsdp_enabled and self.cfg.adapter == "qlora"
|
||||
|
||||
@send_errors
|
||||
def load(self) -> tuple[PreTrainedModel | PeftModelForCausalLM, PeftConfig | None]:
|
||||
"""Load and prepare the model with all configurations and patches.
|
||||
|
||||
|
||||
@@ -6,12 +6,14 @@ from transformers import (
|
||||
PreTrainedTokenizerBase,
|
||||
)
|
||||
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.logging import get_logger
|
||||
|
||||
LOG = get_logger(__name__)
|
||||
|
||||
|
||||
@send_errors
|
||||
def load_processor(cfg: DictDefault, tokenizer: PreTrainedTokenizerBase):
|
||||
processor_cls = AutoProcessor
|
||||
if cfg.processor_type:
|
||||
|
||||
@@ -13,6 +13,7 @@ from transformers import (
|
||||
from axolotl.integrations.base import PluginManager
|
||||
from axolotl.loaders.utils import get_linear_embedding_layers, load_model_config
|
||||
from axolotl.prompt_tokenizers import LLAMA_DEFAULT_EOS_TOKEN
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.utils.chat_templates import get_chat_template_from_config
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.distributed import (
|
||||
@@ -119,6 +120,7 @@ def modify_tokenizer_files(
|
||||
return tokenizer_dir
|
||||
|
||||
|
||||
@send_errors
|
||||
def load_tokenizer(cfg: DictDefault) -> PreTrainedTokenizer:
|
||||
"""Load and configure the tokenizer based on the provided config."""
|
||||
|
||||
|
||||
0
src/axolotl/telemetry/__init__.py
Normal file
0
src/axolotl/telemetry/__init__.py
Normal file
165
src/axolotl/telemetry/callbacks.py
Normal file
165
src/axolotl/telemetry/callbacks.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Trainer callbacks for reporting runtime metrics at regular intervals."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
from transformers import (
|
||||
TrainerCallback,
|
||||
TrainerControl,
|
||||
TrainerState,
|
||||
TrainingArguments,
|
||||
)
|
||||
|
||||
from axolotl.telemetry.manager import TelemetryManager
|
||||
from axolotl.telemetry.runtime_metrics import RuntimeMetricsTracker
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
TIME_SINCE_LAST = 60
|
||||
|
||||
|
||||
class TelemetryCallback(TrainerCallback):
|
||||
"""
|
||||
Trainer callback for tracking and reporting runtime metrics.
|
||||
|
||||
This callback tracks training progress, runtime, and memory usage,
|
||||
sending telemetry at configurable intervals.
|
||||
"""
|
||||
|
||||
report_interval_steps: int = 100
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the metrics callback."""
|
||||
self.tracker = RuntimeMetricsTracker()
|
||||
self.telemetry_manager = TelemetryManager.get_instance()
|
||||
self.current_epoch = -1
|
||||
self.start_time = time.time()
|
||||
self.last_report_time = None
|
||||
self.last_report_step = 0
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def on_train_begin(
|
||||
self,
|
||||
args: TrainingArguments,
|
||||
state: TrainerState,
|
||||
control: TrainerControl,
|
||||
**kwargs,
|
||||
):
|
||||
"""Handle training start."""
|
||||
self.telemetry_manager.send_event(event_type="train-start")
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def on_train_end(
|
||||
self,
|
||||
args: TrainingArguments,
|
||||
state: TrainerState,
|
||||
control: TrainerControl,
|
||||
**kwargs,
|
||||
):
|
||||
"""Handle training end."""
|
||||
# Send training completion event
|
||||
self.telemetry_manager.send_event(
|
||||
event_type="train-end",
|
||||
properties=self._extract_last_metrics(state)
|
||||
| self.tracker.metrics.to_dict(),
|
||||
)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def on_epoch_begin(
|
||||
self,
|
||||
args: TrainingArguments,
|
||||
state: TrainerState,
|
||||
control: TrainerControl,
|
||||
**kwargs,
|
||||
):
|
||||
"""Handle epoch start."""
|
||||
self.current_epoch += 1
|
||||
self.tracker.start_epoch(self.current_epoch)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def on_epoch_end(
|
||||
self,
|
||||
args: TrainingArguments,
|
||||
state: TrainerState,
|
||||
control: TrainerControl,
|
||||
**kwargs,
|
||||
):
|
||||
"""Handle epoch end."""
|
||||
self.tracker.end_epoch(self.current_epoch)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def on_step_end(
|
||||
self,
|
||||
args: TrainingArguments,
|
||||
state: TrainerState,
|
||||
control: TrainerControl,
|
||||
**kwargs,
|
||||
):
|
||||
"""Handle step end."""
|
||||
step = state.global_step
|
||||
self.tracker.update_step(step)
|
||||
|
||||
# Check if we should report metrics
|
||||
should_report = (
|
||||
step % self.report_interval_steps == 0
|
||||
or step == 1 # Always report first step
|
||||
or step - self.last_report_step >= self.report_interval_steps
|
||||
)
|
||||
|
||||
if should_report:
|
||||
current_time = time.time()
|
||||
if self.last_report_time is not None:
|
||||
time_since_last_report = current_time - self.last_report_time
|
||||
else:
|
||||
time_since_last_report = current_time - self.start_time
|
||||
steps_since_last_report = step - self.last_report_step
|
||||
|
||||
# Only report if enough time has passed
|
||||
if (
|
||||
step == 1
|
||||
or time_since_last_report >= TIME_SINCE_LAST
|
||||
or steps_since_last_report >= self.report_interval_steps
|
||||
):
|
||||
# Calculate steps per second for this interval
|
||||
if time_since_last_report > 0 and steps_since_last_report > 0:
|
||||
steps_per_second = steps_since_last_report / time_since_last_report
|
||||
else:
|
||||
steps_per_second = 0
|
||||
|
||||
# Update memory metrics
|
||||
self.tracker.update_memory_metrics()
|
||||
|
||||
# Prepare metrics to report
|
||||
metrics = self._extract_last_metrics(state) | {
|
||||
"step": step,
|
||||
"epoch": self.current_epoch,
|
||||
"progress": state.epoch, # Fractional epoch progress
|
||||
"steps_per_second": steps_per_second,
|
||||
"elapsed_time": current_time - self.start_time,
|
||||
"time_since_last_report": time_since_last_report,
|
||||
}
|
||||
|
||||
# Add memory metrics
|
||||
memory_metrics = self.tracker.get_memory_metrics()
|
||||
metrics.update({"memory": memory_metrics})
|
||||
|
||||
# Send telemetry
|
||||
self.telemetry_manager.send_event(
|
||||
event_type="train-progress", properties=metrics
|
||||
)
|
||||
|
||||
# Update last report time and step
|
||||
self.last_report_time = current_time
|
||||
self.last_report_step = step
|
||||
|
||||
def _extract_last_metrics(self, state: TrainerState) -> dict:
|
||||
"""Extract last loss, learning_rate, and grad_norm from log history."""
|
||||
if not state.log_history:
|
||||
return {"loss": 0, "learning_rate": 0, "grad_norm": 0}
|
||||
|
||||
last_log = state.log_history[-1]
|
||||
return {
|
||||
"loss": last_log.get("loss", 0),
|
||||
"learning_rate": last_log.get("learning_rate", 0),
|
||||
"grad_norm": last_log.get("grad_norm", 0),
|
||||
}
|
||||
160
src/axolotl/telemetry/errors.py
Normal file
160
src/axolotl/telemetry/errors.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""Telemetry utilities for exception and traceback information."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import traceback
|
||||
from functools import wraps
|
||||
from inspect import getmodule
|
||||
from typing import Any, Callable
|
||||
|
||||
from axolotl.telemetry.manager import TelemetryManager
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
ERROR_HANDLED = False
|
||||
|
||||
|
||||
def sanitize_stack_trace(stack_trace: str) -> str:
|
||||
"""
|
||||
Remove personal information from stack trace messages while keeping Python package codepaths.
|
||||
|
||||
This function identifies Python packages by looking for common patterns in virtual environment
|
||||
and site-packages directories, preserving the package path while removing user-specific paths.
|
||||
|
||||
Args:
|
||||
stack_trace: The original stack trace string.
|
||||
|
||||
Returns:
|
||||
A sanitized version of the stack trace with Python package paths preserved.
|
||||
"""
|
||||
# Split the stack trace into lines to process each file path separately
|
||||
lines = stack_trace.split("\n")
|
||||
sanitized_lines = []
|
||||
|
||||
# Regular expression to find file paths in the stack trace
|
||||
path_pattern = re.compile(r'(?:File ")(.*?)(?:")')
|
||||
|
||||
# Regular expression to identify paths in site-packages or dist-packages
|
||||
# This matches path segments like "site-packages/package_name" or "dist-packages/package_name"
|
||||
site_packages_pattern = re.compile(
|
||||
r"(?:site-packages|dist-packages)[/\\]([\w\-\.]+)"
|
||||
)
|
||||
|
||||
# Additional common virtual environment patterns
|
||||
venv_lib_pattern = re.compile(
|
||||
r"(?:lib|Lib)[/\\](?:python\d+(?:\.\d+)?[/\\])?(?:site-packages|dist-packages)[/\\]([\w\-\.]+)"
|
||||
)
|
||||
|
||||
for line in lines:
|
||||
# Check if this line contains a file path
|
||||
path_match = path_pattern.search(line)
|
||||
|
||||
if path_match:
|
||||
full_path = path_match.group(1)
|
||||
sanitized_path = ""
|
||||
|
||||
# Try to match site-packages pattern
|
||||
site_packages_match = site_packages_pattern.search(full_path)
|
||||
venv_lib_match = venv_lib_pattern.search(full_path)
|
||||
|
||||
if site_packages_match:
|
||||
# Find the index where the matched pattern starts
|
||||
idx = full_path.find("site-packages")
|
||||
if idx == -1:
|
||||
idx = full_path.find("dist-packages")
|
||||
|
||||
# Keep from 'site-packages' onward
|
||||
if idx >= 0:
|
||||
sanitized_path = full_path[idx:]
|
||||
elif venv_lib_match:
|
||||
# For other virtual environment patterns, find the package directory
|
||||
match_idx = venv_lib_match.start(1)
|
||||
if match_idx > 0:
|
||||
# Keep from the package name onward
|
||||
package_name = venv_lib_match.group(1)
|
||||
idx = full_path.rfind(
|
||||
package_name, 0, match_idx + len(package_name)
|
||||
)
|
||||
if idx >= 0:
|
||||
sanitized_path = full_path[idx:]
|
||||
|
||||
# If we couldn't identify a package pattern but path contains 'axolotl'
|
||||
elif "axolotl" in full_path:
|
||||
idx = full_path.rfind("axolotl")
|
||||
if idx >= 0:
|
||||
sanitized_path = full_path[idx:]
|
||||
|
||||
# Apply the sanitization to the line
|
||||
if sanitized_path:
|
||||
line = line.replace(full_path, sanitized_path)
|
||||
else:
|
||||
# If we couldn't identify a package pattern, just keep the filename
|
||||
filename = os.path.basename(full_path)
|
||||
if filename:
|
||||
line = line.replace(full_path, filename)
|
||||
else:
|
||||
line = line.replace(full_path, "")
|
||||
|
||||
sanitized_lines.append(line)
|
||||
|
||||
return "\n".join(sanitized_lines)
|
||||
|
||||
|
||||
def send_errors(func: Callable) -> Callable:
|
||||
"""
|
||||
Decorator to send exception info in a function. If an exception is raised, we send
|
||||
telemetry containing the stack trace and error message.
|
||||
|
||||
If an error occurs in a decorated function that is called by another decorated
|
||||
function, we'll only send telemetry corresponding to the lower-level function.
|
||||
|
||||
Args:
|
||||
func: Function to decorate.
|
||||
|
||||
Returns:
|
||||
Decorated function.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs) -> Any:
|
||||
telemetry_manager = TelemetryManager.get_instance()
|
||||
|
||||
if not telemetry_manager.enabled:
|
||||
return func(*args, **kwargs)
|
||||
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as exception:
|
||||
# Only track if we're not already handling an error. This prevents us from
|
||||
# capturing an error more than once in nested decorated function calls.
|
||||
global ERROR_HANDLED # pylint: disable=global-statement
|
||||
if not ERROR_HANDLED:
|
||||
ERROR_HANDLED = True
|
||||
|
||||
# Get function module path
|
||||
module = getmodule(func)
|
||||
module_path = (
|
||||
f"{module.__name__}.{func.__name__}" if module else func.__name__
|
||||
)
|
||||
|
||||
# Get stack trace
|
||||
stack_trace = "".join(
|
||||
traceback.format_exception(
|
||||
type(exception), exception, exception.__traceback__
|
||||
)
|
||||
)
|
||||
stack_trace = sanitize_stack_trace(stack_trace)
|
||||
|
||||
# Send error telemetry
|
||||
telemetry_manager.send_event(
|
||||
event_type=f"{module_path}-error",
|
||||
properties={
|
||||
"exception": str(exception),
|
||||
"stack_trace": stack_trace,
|
||||
},
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
416
src/axolotl/telemetry/manager.py
Normal file
416
src/axolotl/telemetry/manager.py
Normal file
@@ -0,0 +1,416 @@
|
||||
"""Telemetry manager and associated utilities."""
|
||||
|
||||
import atexit
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import time
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import posthog
|
||||
import psutil
|
||||
import torch
|
||||
import yaml
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
POSTHOG_HOST = "https://app.posthog.com"
|
||||
POSTHOG_WRITE_KEY = "phc_1kUR0o04oJKKTTeSsIz2Mfm5mpiVsQEf2WOlzljMD7y"
|
||||
|
||||
OPT_OUT_WARNING_SLEEP_SECONDS = 10
|
||||
OPT_OUT_WARNING = (
|
||||
"\nTelemetry is now enabled by default to help improve Axolotl. "
|
||||
"If you'd like to disable it, set AXOLOTL_DO_NOT_TRACK=1 in your environment.\n\n"
|
||||
"Telemetry data helps us understand:\n"
|
||||
"- Which features are most used\n"
|
||||
"- What hardware configurations to prioritize\n"
|
||||
"- Where users encounter errors\n\n"
|
||||
"Personally identifiable information (PII) is not collected.\n\n"
|
||||
"To remove this warning, explicitly set AXOLOTL_DO_NOT_TRACK=0 (enable telemetry) "
|
||||
"or AXOLOTL_DO_NOT_TRACK=1 (disable telemetry).\n\n"
|
||||
"For details, see: https://docs.axolotl.ai/docs/telemetry.html\n\n"
|
||||
f"Sleeping for {OPT_OUT_WARNING_SLEEP_SECONDS}s..."
|
||||
)
|
||||
|
||||
WHITELIST_PATH = str(Path(__file__).parent / "whitelist.yaml")
|
||||
|
||||
# NOTE: Need to keep these up to date with any config schema changes
|
||||
FIELDS_TO_REDACT = {
|
||||
"base_model",
|
||||
"tokenizer_config",
|
||||
"base_model_config",
|
||||
"pretraining_dataset", # NOTE: this field may be a string or a dictionary
|
||||
"resume_from_checkpoint",
|
||||
"hub_model_id",
|
||||
}
|
||||
PREFIXES_TO_REDACT = {"wandb_", "comet_", "mlflow_", "gradio_"}
|
||||
PATH_INDICATORS = {"path", "dir"}
|
||||
|
||||
# pylint: disable=duplicate-code
|
||||
RELEVANT_PACKAGES = {
|
||||
"torch",
|
||||
"transformers",
|
||||
"trl",
|
||||
"datasets",
|
||||
"peft",
|
||||
"bitsandbytes",
|
||||
"accelerate",
|
||||
"optimum",
|
||||
"deepspeed",
|
||||
"ray",
|
||||
"axolotl",
|
||||
"triton",
|
||||
"mamba-ssm",
|
||||
"flash-attn",
|
||||
"xformers",
|
||||
"autoawq",
|
||||
"tokenizers",
|
||||
"sentencepiece",
|
||||
"torchao",
|
||||
"lm_eval",
|
||||
}
|
||||
|
||||
|
||||
def is_main_process() -> bool:
|
||||
"""
|
||||
Check whether we're running in the main process.
|
||||
|
||||
Note:
|
||||
We're using this function instead of `torch.utils.distributed.is_main_process`
|
||||
causes issues with DeepSpeed world_size since. This function avoids that issue
|
||||
by checking env vars that are set by various launchers.
|
||||
|
||||
Returns:
|
||||
Whether we're running in the main process.
|
||||
"""
|
||||
# If PyTorch distributed is already initialized, use it
|
||||
if torch.distributed.is_initialized():
|
||||
return torch.distributed.get_rank() == 0
|
||||
|
||||
# Otherwise check environment variables for global rank
|
||||
# NOTE: need to verify this in SLURM / OpenMPI environments
|
||||
global_rank = int(
|
||||
os.environ.get(
|
||||
"RANK",
|
||||
os.environ.get(
|
||||
"GLOBAL_RANK",
|
||||
os.environ.get(
|
||||
"SLURM_PROCID",
|
||||
os.environ.get(
|
||||
"OMPI_COMM_WORLD_RANK",
|
||||
"0",
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
return global_rank == 0
|
||||
|
||||
|
||||
class TelemetryManager:
|
||||
"""Manages telemetry collection and transmission"""
|
||||
|
||||
_instance = None
|
||||
_initialized = False
|
||||
|
||||
def __new__(cls):
|
||||
"""
|
||||
Telemetry manager constructor. Creates the singleton instance of this class if
|
||||
it doesn't already exist.
|
||||
"""
|
||||
if cls._instance is None:
|
||||
cls._instance = super(TelemetryManager, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
"""Telemetry manager initializer"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
self.enabled = self._check_telemetry_enabled()
|
||||
|
||||
if self.enabled:
|
||||
self.run_id = str(uuid.uuid4())
|
||||
self.whitelist = self._load_whitelist()
|
||||
|
||||
try:
|
||||
self.system_info = self._get_system_info()
|
||||
except Exception as e: # pylint: disable=broad-exception-caught
|
||||
LOG.warning(f"Error during system info collection: {e}")
|
||||
self.system_info = None
|
||||
|
||||
self._init_posthog()
|
||||
|
||||
# Register shutdown method to flush posthog telemetry
|
||||
atexit.register(self.shutdown)
|
||||
|
||||
self._initialized = True
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls) -> "TelemetryManager":
|
||||
if cls._instance is None:
|
||||
cls._instance = TelemetryManager()
|
||||
|
||||
return cls._instance
|
||||
|
||||
def _check_telemetry_enabled(self) -> bool:
|
||||
"""
|
||||
Check if telemetry is enabled based on environment variables. We also check
|
||||
whether this is the main process (for the distributed setting and to avoid
|
||||
sending duplicate PostHog events per GPU).
|
||||
|
||||
Note: This is enabled by default on an opt-out basis. Set
|
||||
`AXOLOTL_DO_NOT_TRACK=1` to disable telemetry. For more details, see
|
||||
https://axolotl-ai-cloud.github.io/axolotl/docs/telemetry.html.
|
||||
|
||||
Returns:
|
||||
Boolean denoting whether telemetry is enabled or not.
|
||||
"""
|
||||
# Parse relevant env vars
|
||||
axolotl_do_not_track = os.getenv("AXOLOTL_DO_NOT_TRACK")
|
||||
do_not_track = os.getenv("DO_NOT_TRACK")
|
||||
|
||||
# Default to enabled (opt-out model)
|
||||
if axolotl_do_not_track is None or axolotl_do_not_track.lower() not in (
|
||||
"0",
|
||||
"1",
|
||||
"false",
|
||||
"true",
|
||||
):
|
||||
# Print opt-out info message for main process only
|
||||
if is_main_process():
|
||||
LOG.warning(OPT_OUT_WARNING)
|
||||
time.sleep(OPT_OUT_WARNING_SLEEP_SECONDS)
|
||||
|
||||
return True
|
||||
|
||||
# Only rank 0 will send telemetry
|
||||
if not is_main_process():
|
||||
return False
|
||||
|
||||
if do_not_track is None:
|
||||
do_not_track = "0"
|
||||
|
||||
# Respect AXOLOTL_DO_NOT_TRACK, DO_NOT_TRACK if enabled
|
||||
enabled = axolotl_do_not_track.lower() not in (
|
||||
"1",
|
||||
"true",
|
||||
) and do_not_track.lower() not in ("1", "true")
|
||||
|
||||
return enabled
|
||||
|
||||
def _load_whitelist(self) -> dict:
|
||||
"""Load HuggingFace Hub organization whitelist"""
|
||||
with open(WHITELIST_PATH, encoding="utf-8") as f:
|
||||
whitelist = yaml.safe_load(f)
|
||||
|
||||
# Send org strings to lowercase since model names are case insensitive
|
||||
whitelist["organizations"] = {
|
||||
org.lower() for org in whitelist["organizations"]
|
||||
}
|
||||
|
||||
return whitelist
|
||||
|
||||
def _is_whitelisted(self, value: str) -> bool:
|
||||
"""
|
||||
Check if model / dataset / etc. org is in whitelist.
|
||||
|
||||
Args:
|
||||
value: Value for one of `axolotl.telemetry.manager.FIELDS_WITH_ORGS`
|
||||
("base_model", etc.).
|
||||
|
||||
Returns:
|
||||
Boolean indicating whitelist membership.
|
||||
"""
|
||||
# NOTE: This membership-checking logic can be improved.
|
||||
# What happens when a local model path matches a whitelisted org?
|
||||
parts = value.split("/")
|
||||
if len(parts) < 2:
|
||||
return False
|
||||
org = parts[0]
|
||||
whitelisted = org.lower() in self.whitelist["organizations"]
|
||||
|
||||
return whitelisted
|
||||
|
||||
def _init_posthog(self):
|
||||
"""Initialize PostHog client"""
|
||||
posthog.api_key = POSTHOG_WRITE_KEY
|
||||
posthog.project_api_key = POSTHOG_WRITE_KEY
|
||||
posthog.host = POSTHOG_HOST
|
||||
|
||||
def _redact_paths(self, properties: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
Redact properties to remove any paths, so as to avoid inadvertently collecting
|
||||
private or personally identifiable information (PII). We also remove
|
||||
information related to Wandb, MLflow, etc. configuration.
|
||||
|
||||
Args:
|
||||
properties: Dictionary of properties to redact.
|
||||
|
||||
Returns:
|
||||
Properties dictionary with redaction applied.
|
||||
"""
|
||||
if not properties:
|
||||
return {}
|
||||
|
||||
def redact_value(value: Any, key: str = "") -> Any:
|
||||
"""Recursively sanitize values, redacting those with path-like keys"""
|
||||
if isinstance(key, str) and isinstance(value, str):
|
||||
# Other redaction special cases
|
||||
if (
|
||||
key in FIELDS_TO_REDACT
|
||||
or any(prefix in key for prefix in PREFIXES_TO_REDACT)
|
||||
or any(indicator in key.lower() for indicator in PATH_INDICATORS)
|
||||
):
|
||||
# Fields with whitelisted orgs don't need to be redacted
|
||||
if not self._is_whitelisted(value):
|
||||
return "[REDACTED]"
|
||||
|
||||
# Handle nested values
|
||||
if isinstance(value, dict):
|
||||
return {k: redact_value(v, k) for k, v in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [redact_value(item) for item in value]
|
||||
|
||||
return value
|
||||
|
||||
# Create new dict with redacted values
|
||||
redacted = {k: redact_value(v, k) for k, v in properties.items()}
|
||||
|
||||
return redacted
|
||||
|
||||
def _get_system_info(self) -> dict[str, Any]:
|
||||
"""Collect system information for various hardware accelerators"""
|
||||
gpu_info = []
|
||||
accelerator_type = "none"
|
||||
|
||||
# NVIDIA GPUs
|
||||
if torch.cuda.is_available():
|
||||
accelerator_type = "cuda"
|
||||
for i in range(torch.cuda.device_count()):
|
||||
gpu_info.append(
|
||||
{
|
||||
"name": torch.cuda.get_device_name(i),
|
||||
"memory": torch.cuda.get_device_properties(i).total_memory,
|
||||
}
|
||||
)
|
||||
|
||||
# AMD GPUs
|
||||
elif hasattr(torch, "hip") and torch.hip.is_available():
|
||||
accelerator_type = "hip"
|
||||
for i in range(torch.hip.device_count()):
|
||||
gpu_info.append(
|
||||
{
|
||||
"name": torch.hip.get_device_name(i),
|
||||
"memory": (
|
||||
torch.hip.get_device_properties(i).total_memory
|
||||
if hasattr(torch.hip, "get_device_properties")
|
||||
else None
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
# Apple Silicon
|
||||
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
|
||||
accelerator_type = "mps"
|
||||
gpu_info.append(
|
||||
{
|
||||
"name": "Apple Silicon",
|
||||
# NOTE: this is memory allocated to this process, not total memory
|
||||
"memory": torch.mps.driver_allocated_memory(),
|
||||
}
|
||||
)
|
||||
|
||||
# Intel GPUs
|
||||
elif hasattr(torch, "xpu") and torch.xpu.is_available():
|
||||
accelerator_type = "xpu"
|
||||
for i in range(torch.xpu.device_count()):
|
||||
memory = None
|
||||
if hasattr(torch.xpu, "get_device_properties"):
|
||||
memory = torch.xpu.get_device_properties(i).total_memory
|
||||
|
||||
gpu_info.append(
|
||||
{
|
||||
"name": torch.xpu.get_device_name(i),
|
||||
"memory": memory,
|
||||
}
|
||||
)
|
||||
|
||||
# NPUs
|
||||
elif hasattr(torch, "npu") and torch.npu.is_available():
|
||||
accelerator_type = "npu"
|
||||
for i in range(torch.npu.device_count()):
|
||||
memory = None
|
||||
if hasattr(torch.npu, "get_device_properties"):
|
||||
memory = torch.npu.get_device_properties(i).total_memory
|
||||
|
||||
gpu_info.append(
|
||||
{
|
||||
"name": torch.npu.get_device_name(i),
|
||||
"memory": memory,
|
||||
}
|
||||
)
|
||||
|
||||
# Get relevant package versions
|
||||
installed_packages = {}
|
||||
for package in RELEVANT_PACKAGES:
|
||||
try:
|
||||
version = importlib.metadata.version(package)
|
||||
installed_packages[f"{package}_version"] = version
|
||||
except importlib.metadata.PackageNotFoundError:
|
||||
pass
|
||||
|
||||
return {
|
||||
"os": platform.system(),
|
||||
"python_version": platform.python_version(),
|
||||
"cpu_count": psutil.cpu_count(),
|
||||
"memory_total": psutil.virtual_memory().total,
|
||||
"accelerator_type": accelerator_type,
|
||||
"accelerator_count": len(gpu_info),
|
||||
"accelerator_info": gpu_info,
|
||||
**installed_packages,
|
||||
}
|
||||
|
||||
def send_event(self, event_type: str, properties: dict[str, Any] | None = None):
|
||||
"""Send a telemetry event"""
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
if properties is None:
|
||||
properties = {}
|
||||
|
||||
# Sanitize properties to remove PII
|
||||
properties = self._redact_paths(properties)
|
||||
|
||||
# Wrap PostHog errors in try / except to not raise errors during Axolotl usage
|
||||
try:
|
||||
# Send event via PostHog
|
||||
posthog.capture(
|
||||
distinct_id=self.run_id,
|
||||
event=event_type,
|
||||
properties=properties,
|
||||
disable_geoip=True,
|
||||
)
|
||||
except Exception as e: # pylint: disable=broad-exception-caught
|
||||
LOG.warning(f"Failed to send telemetry event: {e}")
|
||||
|
||||
# Additionally, send system info telemetry when loading config.
|
||||
# NOTE: Is this the best place for this?
|
||||
if event_type == "config-loaded":
|
||||
self.send_system_info()
|
||||
|
||||
def send_system_info(self):
|
||||
"""Helper method for sending system info"""
|
||||
if self.system_info is not None:
|
||||
self.send_event(event_type="system-info", properties=self.system_info)
|
||||
|
||||
def shutdown(self):
|
||||
"""Ensure all queued events are processed before shutdown"""
|
||||
if self.enabled:
|
||||
posthog.shutdown()
|
||||
210
src/axolotl/telemetry/runtime_metrics.py
Normal file
210
src/axolotl/telemetry/runtime_metrics.py
Normal file
@@ -0,0 +1,210 @@
|
||||
"""Telemetry utilities for runtime and memory metrics."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import psutil
|
||||
import torch
|
||||
|
||||
from axolotl.telemetry.manager import TelemetryManager
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RuntimeMetrics:
|
||||
"""Container for runtime metrics to be tracked throughout training."""
|
||||
|
||||
# Timing metrics
|
||||
start_time: float
|
||||
epoch_start_times: dict[int, float] = field(init=False)
|
||||
epoch_end_times: dict[int, float] = field(init=False)
|
||||
|
||||
# Memory metrics
|
||||
peak_cpu_memory: int = 0
|
||||
peak_gpu_memory: dict[int, int] = field(init=False)
|
||||
|
||||
# Progress metrics
|
||||
total_steps: int = 0
|
||||
current_epoch: int = 0
|
||||
current_step: int = 0
|
||||
|
||||
def __post_init__(self):
|
||||
"""Initialize empty metric mappings."""
|
||||
self.epoch_start_times = {}
|
||||
self.epoch_end_times = {}
|
||||
self.peak_gpu_memory = {}
|
||||
|
||||
@property
|
||||
def elapsed_time(self) -> float:
|
||||
"""Calculate total elapsed time in seconds."""
|
||||
return time.time() - self.start_time
|
||||
|
||||
def epoch_time(self, epoch: int) -> float | None:
|
||||
"""Calculate time taken for a specific epoch in seconds."""
|
||||
if epoch in self.epoch_start_times and epoch in self.epoch_end_times:
|
||||
return self.epoch_end_times[epoch] - self.epoch_start_times[epoch]
|
||||
|
||||
return None
|
||||
|
||||
def average_epoch_time(self) -> float | None:
|
||||
"""Calculate average time per epoch in seconds."""
|
||||
completed_epochs = [
|
||||
epoch for epoch in self.epoch_start_times if epoch in self.epoch_end_times
|
||||
]
|
||||
if not completed_epochs:
|
||||
return None
|
||||
|
||||
total_time = 0.0
|
||||
for epoch in completed_epochs:
|
||||
epoch_time = self.epoch_time(epoch)
|
||||
if epoch_time is not None: # Check to avoid mypy warning
|
||||
total_time += epoch_time
|
||||
|
||||
return total_time / len(completed_epochs)
|
||||
|
||||
def steps_per_second(self) -> float | None:
|
||||
"""Calculate average steps per second across all training."""
|
||||
if self.total_steps == 0 or self.elapsed_time == 0:
|
||||
return None
|
||||
|
||||
return self.total_steps / self.elapsed_time
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert metrics to a dictionary for telemetry reporting."""
|
||||
metrics = {
|
||||
"total_time_seconds": self.elapsed_time,
|
||||
"total_steps": self.total_steps,
|
||||
"steps_per_second": self.steps_per_second(),
|
||||
"epochs_completed": len(
|
||||
[
|
||||
epoch
|
||||
for epoch in self.epoch_start_times
|
||||
if epoch in self.epoch_end_times
|
||||
]
|
||||
),
|
||||
"peak_cpu_memory_bytes": self.peak_cpu_memory,
|
||||
}
|
||||
|
||||
# Add per-epoch timing if available
|
||||
epoch_times: dict[str, float] = {}
|
||||
for epoch in sorted(self.epoch_end_times.keys()):
|
||||
time_taken = self.epoch_time(epoch)
|
||||
if time_taken is not None:
|
||||
epoch_times[f"epoch_{epoch}_seconds"] = time_taken
|
||||
|
||||
if epoch_times:
|
||||
metrics["epoch_times"] = epoch_times # type: ignore
|
||||
metrics["average_epoch_time_seconds"] = self.average_epoch_time()
|
||||
|
||||
# Add GPU memory metrics if available
|
||||
if self.peak_gpu_memory:
|
||||
gpu_metrics: dict[str, int] = {}
|
||||
for gpu_id, memory in self.peak_gpu_memory.items():
|
||||
gpu_metrics[f"gpu_{gpu_id}_peak_memory_bytes"] = memory
|
||||
metrics["gpu_memory"] = gpu_metrics # type: ignore
|
||||
|
||||
return metrics
|
||||
|
||||
|
||||
class RuntimeMetricsTracker:
|
||||
"""Tracker for runtime metrics during training."""
|
||||
|
||||
update_interval = 100
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the runtime metrics tracker."""
|
||||
self.metrics = RuntimeMetrics(start_time=time.time())
|
||||
self.telemetry_manager = TelemetryManager.get_instance()
|
||||
self._process = psutil.Process()
|
||||
|
||||
def start_epoch(self, epoch: int):
|
||||
"""Record the start of a new epoch."""
|
||||
self.metrics.current_epoch = epoch
|
||||
self.metrics.epoch_start_times[epoch] = time.time()
|
||||
self.update_memory_metrics()
|
||||
|
||||
def end_epoch(self, epoch: int):
|
||||
"""Record the end of an epoch."""
|
||||
self.metrics.epoch_end_times[epoch] = time.time()
|
||||
|
||||
def update_step(self, step: int):
|
||||
"""Update the current step count."""
|
||||
self.metrics.current_step = step
|
||||
self.metrics.total_steps += 1
|
||||
|
||||
# Periodically update memory metrics
|
||||
if step % self.update_interval == 0:
|
||||
self.update_memory_metrics()
|
||||
|
||||
def _get_allocated_memory(self) -> dict[int, int]:
|
||||
"""
|
||||
Helper function for getting accelerator-agnostic allocated memory.
|
||||
|
||||
Returns:
|
||||
A dictionary mapping device IDs to allocated memory in bytes
|
||||
"""
|
||||
memory_used: dict[int, int] = {}
|
||||
|
||||
# NVIDIA GPUs
|
||||
if torch.cuda.is_available():
|
||||
for i in range(torch.cuda.device_count()):
|
||||
memory_used[i] = torch.cuda.memory_allocated(i)
|
||||
|
||||
# AMD GPUs
|
||||
elif hasattr(torch, "hip") and torch.hip.is_available():
|
||||
for i in range(torch.hip.device_count()):
|
||||
if hasattr(torch.hip, "memory_allocated"):
|
||||
memory_used[i] = torch.hip.memory_allocated(i)
|
||||
|
||||
# Apple Silicon
|
||||
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
|
||||
# MPS doesn't have per-device memory stats since there's only one device
|
||||
if hasattr(torch.mps, "current_allocated_memory"):
|
||||
memory_used[0] = torch.mps.current_allocated_memory()
|
||||
|
||||
# Intel GPUs
|
||||
elif hasattr(torch, "xpu") and torch.xpu.is_available():
|
||||
for i in range(torch.xpu.device_count()):
|
||||
if hasattr(torch.xpu, "memory_allocated"):
|
||||
memory_used[i] = torch.xpu.memory_allocated(i)
|
||||
|
||||
# NPUs
|
||||
elif hasattr(torch, "npu") and torch.npu.is_available():
|
||||
for i in range(torch.npu.device_count()):
|
||||
if hasattr(torch.npu, "memory_allocated"):
|
||||
memory_used[i] = torch.npu.memory_allocated(i)
|
||||
|
||||
return memory_used
|
||||
|
||||
def update_memory_metrics(self):
|
||||
"""Update peak memory usage metrics."""
|
||||
# CPU memory
|
||||
cpu_memory = self._process.memory_info().rss
|
||||
self.metrics.peak_cpu_memory = max(self.metrics.peak_cpu_memory, cpu_memory)
|
||||
|
||||
# GPU memory (if available)
|
||||
memory_used = self._get_allocated_memory()
|
||||
for i, memory in memory_used.items():
|
||||
self.metrics.peak_gpu_memory[i] = max(
|
||||
self.metrics.peak_gpu_memory.get(i, 0), memory
|
||||
)
|
||||
|
||||
def get_memory_metrics(self) -> dict[str, Any]:
|
||||
"""Get the current memory metrics as a dictionary."""
|
||||
memory_metrics = {
|
||||
"cpu_memory_bytes": self._process.memory_info().rss,
|
||||
"peak_cpu_memory_bytes": self.metrics.peak_cpu_memory,
|
||||
}
|
||||
|
||||
# GPU memory (if available)
|
||||
memory_used = self._get_allocated_memory()
|
||||
for i, memory in memory_used.items():
|
||||
memory_metrics[f"gpu_{i}_memory_bytes"] = memory
|
||||
memory_metrics[f"gpu_{i}_peak_memory_bytes"] = (
|
||||
self.metrics.peak_gpu_memory.get(i, 0)
|
||||
)
|
||||
|
||||
return memory_metrics
|
||||
33
src/axolotl/telemetry/whitelist.yaml
Normal file
33
src/axolotl/telemetry/whitelist.yaml
Normal file
@@ -0,0 +1,33 @@
|
||||
organizations:
|
||||
- "axolotl-ai-co"
|
||||
- "meta-llama"
|
||||
- "huggingface"
|
||||
- "nvidia"
|
||||
- "facebook"
|
||||
- "google"
|
||||
- "microsoft"
|
||||
- "deepseek-ai"
|
||||
- "HuggingFaceTB"
|
||||
- "mistralai"
|
||||
- "Qwen"
|
||||
- "unsloth"
|
||||
- "NousResearch"
|
||||
- "allenai"
|
||||
- "amd"
|
||||
- "tiiuae"
|
||||
- "tencent"
|
||||
- "zai-org"
|
||||
- "openai"
|
||||
- "ibm-granite"
|
||||
- "arcee-ai"
|
||||
- "swiss-ai"
|
||||
- "CohereForAI"
|
||||
- "deepcogito"
|
||||
- "THUDM"
|
||||
- "ai21labs"
|
||||
- "LiquidAI"
|
||||
- "canopylabs"
|
||||
- "state-spaces"
|
||||
- "mistral-community"
|
||||
- "llava-hf"
|
||||
- "ByteDance-Seed"
|
||||
@@ -31,6 +31,8 @@ from axolotl.contribs.lgpl import ( # pylint: disable = no-name-in-module
|
||||
)
|
||||
from axolotl.integrations.base import PluginManager
|
||||
from axolotl.loaders import ModelLoader, load_processor, load_tokenizer
|
||||
from axolotl.telemetry.errors import send_errors
|
||||
from axolotl.telemetry.manager import TelemetryManager
|
||||
from axolotl.utils.ctx_managers.sequence_parallel import SequenceParallelContextManager
|
||||
from axolotl.utils.dict import DictDefault
|
||||
from axolotl.utils.distributed import cleanup_distributed
|
||||
@@ -45,6 +47,9 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
LOG = get_logger(__name__)
|
||||
|
||||
TELEMETRY_MANAGER = TelemetryManager.get_instance()
|
||||
PLUGIN_MANAGER = PluginManager.get_instance()
|
||||
|
||||
|
||||
def setup_model_and_tokenizer(
|
||||
cfg: DictDefault,
|
||||
@@ -62,7 +67,10 @@ def setup_model_and_tokenizer(
|
||||
`None`), and processor (if multimodal, else `None`).
|
||||
"""
|
||||
# Load tokenizer
|
||||
LOG.debug(f"Loading tokenizer... {cfg.tokenizer_config or cfg.base_model_config}")
|
||||
LOG.debug(
|
||||
f"loading tokenizer... {cfg.tokenizer_config or cfg.base_model_config}",
|
||||
main_process_only=True,
|
||||
)
|
||||
tokenizer = load_tokenizer(cfg)
|
||||
|
||||
# Load processor for multimodal models if needed
|
||||
@@ -78,6 +86,14 @@ def setup_model_and_tokenizer(
|
||||
if model.generation_config is not None:
|
||||
model.generation_config.do_sample = True
|
||||
|
||||
TELEMETRY_MANAGER.send_event(
|
||||
event_type="model-load", properties=model.config.to_dict()
|
||||
)
|
||||
if peft_config:
|
||||
TELEMETRY_MANAGER.send_event(
|
||||
event_type="peft-config-load", properties=peft_config.to_dict()
|
||||
)
|
||||
|
||||
# Apply freezing if specified
|
||||
if cfg.unfrozen_parameters:
|
||||
freeze_layers_except(model, cfg.unfrozen_parameters)
|
||||
@@ -196,8 +212,7 @@ def execute_training(
|
||||
LOG.info("Starting trainer...")
|
||||
trainer.train(resume_from_checkpoint=resume_from_checkpoint)
|
||||
|
||||
plugin_manager = PluginManager.get_instance()
|
||||
plugin_manager.post_train(cfg, trainer.model)
|
||||
PLUGIN_MANAGER.post_train(cfg, trainer.model)
|
||||
|
||||
|
||||
def save_trained_model(
|
||||
@@ -521,9 +536,7 @@ def setup_model_and_trainer(
|
||||
model_ref=model_ref,
|
||||
peft_config=peft_config,
|
||||
)
|
||||
|
||||
plugin_manager = PluginManager.get_instance()
|
||||
plugin_manager.post_trainer_create(cfg, trainer)
|
||||
PLUGIN_MANAGER.post_trainer_create(cfg, trainer)
|
||||
|
||||
if cfg.use_ray:
|
||||
try:
|
||||
@@ -545,6 +558,7 @@ def setup_model_and_trainer(
|
||||
)
|
||||
|
||||
|
||||
@send_errors
|
||||
def train(
|
||||
cfg: DictDefault, dataset_meta: TrainDatasetMeta
|
||||
) -> tuple[PeftModel | PreTrainedModel, PreTrainedTokenizer, Trainer]:
|
||||
@@ -595,5 +609,6 @@ def train(
|
||||
create_model_card(cfg, trainer)
|
||||
if not cfg.use_ray:
|
||||
cleanup_distributed()
|
||||
PLUGIN_MANAGER.post_train(cfg, model)
|
||||
|
||||
return model, tokenizer, trainer
|
||||
|
||||
@@ -1069,7 +1069,7 @@ class AxolotlInputConfig(
|
||||
|
||||
|
||||
class AxolotlConfigWCapabilities(AxolotlInputConfig):
|
||||
"""wrapper to valdiate GPU capabilities with the configured options"""
|
||||
"""Wrapper to valdiate GPU capabilities with the configured options"""
|
||||
|
||||
capabilities: GPUCapabilities
|
||||
env_capabilities: EnvCapabilities
|
||||
|
||||
Reference in New Issue
Block a user