"""Facade over the three memory tiers (conversation, operational, knowledge).

``MemoryManager`` wraps one store per tier and assembles a ``MasterContext``
snapshot from them.
"""
from __future__ import annotations

import logging
from dataclasses import dataclass, field

from .conversation_store import ConversationStore, HARD_CAP
from .operational_store import OperationalStore
from .knowledge_store import KnowledgeStore

logger = logging.getLogger(__name__)

# Message count at which a conversation is summarized.
SUMMARIZE_THRESHOLD = 50

# How many of the fetched messages are sent to the LLM for summarization.
_SUMMARY_INPUT_MESSAGES = 20
# ``keep`` value handed to ``ConversationStore.prune_old`` after summarizing.
_KEEP_AFTER_SUMMARY = 20
# Warn once the count is within this many messages of ``HARD_CAP``.
_HARD_CAP_WARN_MARGIN = 10

_PENDING_APPROVALS_SQL = """
    SELECT directive_id, escalations
    FROM ab_directive_log
    WHERE user_id = $1 AND status = 'awaiting_approval'
    ORDER BY started_at DESC
    LIMIT 10
"""

_ACTIVE_DIRECTIVES_SQL = """
    SELECT directive_id, intent_summary, status, started_at
    FROM ab_directive_log
    WHERE user_id = $1 AND status IN ('pending', 'processing')
    ORDER BY started_at DESC
    LIMIT 5
"""

_LONG_CONVERSATIONS_SQL = """
    SELECT user_id, COUNT(*) as n
    FROM ab_conversation_memory
    WHERE is_summary = false
    GROUP BY user_id
    HAVING COUNT(*) > $1
"""


@dataclass
class MasterContext:
    """Snapshot of everything ``MemoryManager.build_context`` gathers."""

    user_id: int
    # Result of ``ConversationStore.get`` for the user.
    conversation: list = field(default_factory=list)
    # Result of ``OperationalStore.get_recent`` for the chosen scope.
    operational_findings: list = field(default_factory=list)
    # Result of ``KnowledgeStore.get('user', ...)``; empty when not loaded.
    knowledge: dict = field(default_factory=dict)
    # Rows from ab_directive_log with status 'awaiting_approval'.
    pending_approvals: list = field(default_factory=list)
    # Rows from ab_directive_log with status 'pending' or 'processing'.
    active_directives: list = field(default_factory=list)


class MemoryManager:
    """Single entry point to the conversation, operational and knowledge stores."""

    def __init__(self, pool, llm_router=None):
        """Create the three stores on top of *pool*.

        Args:
            pool: Connection pool shared by all stores; must support
                ``async with pool.acquire(timeout=...)`` yielding a
                connection with an awaitable ``fetch``.
            llm_router: Optional object exposing an awaitable
                ``submit(messages, caller=...)``. When falsy, conversations
                are never summarized.
        """
        self._pool = pool
        self._llm = llm_router
        self._conv = ConversationStore(pool)
        self._ops = OperationalStore(pool)
        self._know = KnowledgeStore(pool)

    # ------------------------------------------------------------------
    # Tier 1 — Conversation
    # ------------------------------------------------------------------

    async def append_message(self, user_id, role, content, directive_id=None):
        """Store one message, then summarize the conversation if it is long."""
        await self._conv.append(user_id, role, content, directive_id)
        await self.summarize_if_needed(user_id)

    async def get_conversation(self, user_id, limit=50):
        """Return up to *limit* conversation entries for *user_id*."""
        return await self._conv.get(user_id, limit)

    async def summarize_if_needed(self, user_id):
        """Summarize and prune the conversation once it reaches the threshold.

        Does nothing below ``SUMMARIZE_THRESHOLD`` messages or when no LLM
        router is configured. Failures of the LLM call, of storing the
        summary, or of pruning are logged and swallowed; failures while
        counting or fetching messages propagate to the caller.
        """
        count = await self._conv.count(user_id)
        if count < SUMMARIZE_THRESHOLD:
            return

        if count >= HARD_CAP - _HARD_CAP_WARN_MARGIN:
            logger.warning(
                'Conversation memory near hard cap for user_id=%s count=%d',
                user_id, count,
            )

        if not self._llm:
            return

        recent = await self._conv.get(user_id, limit=SUMMARIZE_THRESHOLD)
        if not recent:
            return

        # NOTE(review): only the first _SUMMARY_INPUT_MESSAGES of the fetched
        # entries are summarized before pruning; whether those are the oldest
        # or the newest depends on ConversationStore.get ordering — confirm.
        # The f-string tolerates non-string content instead of raising
        # TypeError outside the try block.
        history_text = chr(10).join(
            f"{m['role']}: {m['content']}"
            for m in recent[:_SUMMARY_INPUT_MESSAGES]
        )
        messages = [
            {
                'role': 'system',
                'content': 'Summarize this conversation history in 3-5 sentences, preserving key decisions and context.',
            },
            {'role': 'user', 'content': history_text},
        ]
        try:
            resp = await self._llm.submit(messages, caller='memory_manager')
            summary = resp.content
            await self._conv.append(
                user_id, 'system', f'[SUMMARY] {summary}', is_summary=True
            )
            await self._conv.prune_old(user_id, keep=_KEEP_AFTER_SUMMARY)
            logger.info('Conversation summarized for user_id=%s', user_id)
        except Exception as exc:
            # Summarization is best-effort: never fail the caller over it.
            logger.error(
                'Conversation summarization failed user_id=%s: %s', user_id, exc
            )

    # ------------------------------------------------------------------
    # Tier 2 — Operational
    # ------------------------------------------------------------------

    async def store_findings(self, scope, summary, raw_data=None, ttl_days=90,
                             source_directive_id=None):
        """Forward a finding to the operational store."""
        await self._ops.store(scope, summary, raw_data, ttl_days, source_directive_id)

    async def get_recent_findings(self, scope, limit=10):
        """Return up to *limit* recent findings for *scope*."""
        return await self._ops.get_recent(scope, limit)

    async def prune_expired_operational(self):
        """Ask the operational store to drop expired entries."""
        await self._ops.prune_expired()

    # ------------------------------------------------------------------
    # Tier 3 — Long-term Knowledge
    # ------------------------------------------------------------------

    async def upsert_knowledge(self, entity_type, entity_key, facts):
        """Forward *facts* for an entity to the knowledge store."""
        await self._know.upsert(entity_type, entity_key, facts)

    async def get_knowledge(self, entity_type, entity_key):
        """Return what the knowledge store holds for the given entity."""
        return await self._know.get(entity_type, entity_key)

    async def get_client_profile(self, partner_id):
        """Return the knowledge store's client profile for *partner_id*."""
        return await self._know.get_client_profile(partner_id)

    # ------------------------------------------------------------------
    # Context assembly — called before every Master LLM call
    # ------------------------------------------------------------------

    async def build_context(self, user_id, intent_hint=None):
        """Gather all memory tiers for *user_id* into a ``MasterContext``.

        Args:
            user_id: User whose context is assembled.
            intent_hint: Used as the operational scope (``'general'`` when
                falsy). User knowledge is only looked up when it is truthy.

        Returns:
            A populated ``MasterContext``. A failed knowledge lookup is
            logged and yields an empty dict; failed directive queries yield
            empty lists.
        """
        conversation = await self._conv.get(user_id, limit=50)
        ops_scope = intent_hint or 'general'
        operational_findings = await self._ops.get_recent(ops_scope, limit=5)

        knowledge = {}
        # NOTE(review): the lookup key does not depend on intent_hint, yet the
        # lookup is skipped without one — confirm this gating is intended.
        if intent_hint:
            try:
                knowledge = await self._know.get('user', str(user_id))
            except Exception as exc:
                # Best-effort: continue without knowledge, but leave a trace.
                logger.warning(
                    'Knowledge lookup failed for user_id=%s: %s', user_id, exc
                )

        pending_approvals = await self._get_pending_approvals(user_id)
        active_directives = await self._get_active_directives(user_id)
        return MasterContext(
            user_id=user_id,
            conversation=conversation,
            operational_findings=operational_findings,
            knowledge=knowledge,
            pending_approvals=pending_approvals,
            active_directives=active_directives,
        )

    async def _fetch_directive_rows(self, query, user_id, label):
        """Run *query* with *user_id* and return rows as dicts, or [] on error."""
        try:
            async with self._pool.acquire(timeout=10) as conn:
                rows = await conn.fetch(query, user_id)
            return [dict(r) for r in rows]
        except Exception as exc:
            logger.warning(
                'Fetching %s failed for user_id=%s: %s', label, user_id, exc
            )
            return []

    async def _get_pending_approvals(self, user_id):
        """Return up to 10 directives awaiting approval, newest first."""
        return await self._fetch_directive_rows(
            _PENDING_APPROVALS_SQL, user_id, 'pending approvals'
        )

    async def _get_active_directives(self, user_id):
        """Return up to 5 pending or processing directives, newest first."""
        return await self._fetch_directive_rows(
            _ACTIVE_DIRECTIVES_SQL, user_id, 'active directives'
        )

    async def summarize_long_conversations(self):
        """Run ``summarize_if_needed`` for every user over the threshold.

        Never raises: a failed lookup is logged and ends the run, and a
        failure for one user is logged without stopping the others.
        """
        try:
            async with self._pool.acquire(timeout=10) as conn:
                rows = await conn.fetch(_LONG_CONVERSATIONS_SQL, SUMMARIZE_THRESHOLD)
        except Exception as exc:
            logger.error('summarize_long_conversations failed: %s', exc)
            return

        # The connection is released before this loop so it is not held
        # while each user's LLM call and store operations run.
        for row in rows:
            user_id = row['user_id']
            try:
                await self.summarize_if_needed(user_id)
            except Exception as exc:
                logger.error(
                    'summarize_long_conversations failed for user_id=%s: %s',
                    user_id, exc,
                )