# Structured logging helpers: a JSON formatter for Loki/structured log
# ingestion, root-logger configuration, directive-scoped logging helpers and a
# best-effort client for the Loki push endpoint.
from __future__ import annotations

import json
import logging
import sys
import time
from typing import Any, Optional

# Base URL of the Loki instance; None disables push_to_loki(). Set by
# configure_logging().
_loki_url: Optional[str] = None
_service_name = 'activeblue-ai'

# Record attributes (normally supplied through ``extra=``) that JSONFormatter
# copies into its output when they are present and not None.
_CONTEXT_KEYS = ('directive_id', 'agent', 'user_id', 'action', 'duration_ms')

# Level names accepted by log_directive_event() mapped to the Logger method
# that emits them. Anything outside this table is never looked up on the
# logger via getattr, so arbitrary attributes such as ``name`` or ``setLevel``
# can no longer be picked up by accident.
_LEVEL_METHODS = {
    'debug': 'debug',
    'info': 'info',
    'warning': 'warning',
    'warn': 'warning',
    'error': 'error',
    'exception': 'exception',
    'critical': 'critical',
    'fatal': 'critical',
}


class JSONFormatter(logging.Formatter):
    """Emit log records as single-line JSON for Loki/structured log ingestion."""

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a JSON object string.

        The object always carries ``ts`` (UTC, second resolution), ``level``,
        ``logger``, ``msg`` and ``service``. ``exception`` is added when the
        record has exception info, and each key in ``_CONTEXT_KEYS`` is added
        when the record has a non-None attribute of that name.
        """
        log_entry: dict[str, Any] = {
            'ts': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)),
            'level': record.levelname,
            'logger': record.name,
            'msg': record.getMessage(),
            'service': _service_name,
        }
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        for key in _CONTEXT_KEYS:
            val = getattr(record, key, None)
            if val is not None:
                log_entry[key] = val
        # default=str keeps formatting from failing on non-JSON-native values.
        return json.dumps(log_entry, default=str)


def configure_logging(level: str = 'INFO', fmt: str = 'json', loki_url: str = '') -> None:
    """Reset the root logger to a single stdout handler.

    Args:
        level: Name of a ``logging`` level constant, case-insensitive.
            Unknown names fall back to INFO.
        fmt: ``'json'`` selects JSONFormatter; any other value selects a
            plain-text formatter.
        loki_url: Base URL used by push_to_loki(); an empty string disables
            pushing.

    Side effects: replaces all existing root handlers, stores the Loki URL in
    module state, and raises the ``uvicorn.access`` and ``asyncpg`` loggers to
    WARNING.
    """
    global _loki_url
    _loki_url = loki_url or None

    # getattr alone can hand back non-level attributes (for example the
    # string constant logging.BASIC_FORMAT), which setLevel() would reject.
    numeric_level = getattr(logging, level.upper(), None)
    if not isinstance(numeric_level, int):
        numeric_level = logging.INFO

    root = logging.getLogger()
    root.setLevel(numeric_level)
    root.handlers.clear()

    handler = logging.StreamHandler(sys.stdout)
    if fmt == 'json':
        handler.setFormatter(JSONFormatter())
    else:
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)-20s %(levelname)-8s %(message)s',
        ))
    root.addHandler(handler)

    logging.getLogger('uvicorn.access').setLevel(logging.WARNING)
    logging.getLogger('asyncpg').setLevel(logging.WARNING)


def get_logger(name: str) -> logging.Logger:
    """Return the standard-library logger registered under *name*."""
    return logging.getLogger(name)


class _DirectiveAdapter(logging.LoggerAdapter):
    """LoggerAdapter that merges its own ``extra`` mapping into every call."""

    def process(self, msg, kwargs):
        """Return *msg* and *kwargs* with the adapter's extra merged in.

        Adapter keys take precedence over keys supplied by the caller. A new
        dict is built so the caller's ``extra`` mapping is never mutated, and
        a missing or None ``extra`` on either side is treated as empty.
        """
        merged = dict(kwargs.get('extra') or {})
        merged.update(self.extra or {})
        kwargs['extra'] = merged
        return msg, kwargs


def log_directive_event(
    logger: logging.Logger,
    level: str,
    msg: str,
    directive_id: str = '',
    agent: str = '',
    user_id: str = '',
    action: str = '',
    duration_ms: int = 0,
) -> None:
    """Log *msg* on *logger* with directive context attached as ``extra``.

    Args:
        logger: Logger to emit on.
        level: Level name, case-insensitive. Standard method names
            (``debug`` ... ``critical``, plus ``warn``, ``fatal`` and
            ``exception``) and level names registered with
            ``logging.addLevelName`` are honoured; anything else is logged
            at INFO.
        msg: Log message.
        directive_id, agent, user_id, action, duration_ms: Context fields.
            Falsy values (empty string, 0) are omitted from ``extra``, so a
            duration of 0 is not recorded.
    """
    candidates = {
        'directive_id': directive_id,
        'agent': agent,
        'user_id': user_id,
        'action': action,
        'duration_ms': duration_ms,
    }
    extra = {key: value for key, value in candidates.items() if value}

    method_name = _LEVEL_METHODS.get(level.lower())
    if method_name is not None:
        getattr(logger, method_name)(msg, extra=extra)
        return

    # Not a standard method name: accept custom levels registered via
    # logging.addLevelName(); getLevelName() returns an int only for those.
    numeric_level = logging.getLevelName(level.upper())
    if isinstance(numeric_level, int) and numeric_level > logging.NOTSET:
        logger.log(numeric_level, msg, extra=extra)
    else:
        logger.info(msg, extra=extra)


async def push_to_loki(labels: dict, entries: list[dict]) -> None:
    """Best-effort push of *entries* to the configured Loki instance.

    Args:
        labels: Stream labels sent as the ``stream`` object.
        entries: Dicts with ``ts`` (epoch seconds, numeric) and ``line``
            (the log line). ``ts`` is sent as an integer-nanosecond string.

    Does nothing when no Loki URL is configured or *entries* is empty. Any
    failure (httpx missing, malformed entry, network error, non-2xx response)
    is logged at DEBUG and never raised to the caller.
    """
    if not _loki_url or not entries:
        return
    try:
        # Imported lazily so the module works without httpx unless pushing
        # is actually enabled.
        import httpx

        values = [[str(int(entry['ts'] * 1e9)), entry['line']] for entry in entries]
        payload = {'streams': [{'stream': labels, 'values': values}]}
        # Tolerate a configured base URL that ends with a slash.
        url = f"{_loki_url.rstrip('/')}/loki/api/v1/push"
        async with httpx.AsyncClient(timeout=5) as client:
            response = await client.post(url, json=payload)
            # Surface HTTP-level rejections instead of silently dropping them.
            response.raise_for_status()
    except Exception as exc:
        logging.getLogger(__name__).debug('Loki push failed: %s', exc)