from __future__ import annotations import asyncio, json, logging, os, time, uuid from dataclasses import dataclass, field from .base_agent import AgentDirective, DirectiveContext, AgentReport from .peer_bus import PeerBus from ..memory.memory_manager import MemoryManager, MasterContext logger = logging.getLogger(__name__) @dataclass class IntentResult: needs_clarification: bool = False clarification_question: object = None is_continuation: bool = False agents: list = field(default_factory=list) intent_summary: str = '' params: dict = field(default_factory=dict) context_hints: list = field(default_factory=list) @dataclass class AccessResult: allowed: bool denied_agents: list = field(default_factory=list) reason: str = '' @dataclass class MasterResponse: directive_id: str response: str status: str escalations: list = field(default_factory=list) actions_taken: list = field(default_factory=list) def _load_prompt(filename): base = os.path.dirname(__file__) path = os.path.join(base, '..', 'prompts', filename) try: with open(path) as f: return f.read() except FileNotFoundError: logger.warning('Prompt file not found: %s', path) return '' AGENT_ACCESS_GROUPS = { 'finance_agent': 'account.group_account_user', 'accounting_agent': 'account.group_account_user', 'sales_agent': 'sales_team.group_sale_salesman', 'project_agent': 'project.group_project_user', 'expenses_agent': 'hr_expense.group_hr_expense_user', 'employees_agent': 'hr.group_hr_user', } class MasterAgent: def __init__(self, odoo, llm, memory: MemoryManager, registry): self._odoo = odoo self._llm = llm self._memory = memory self._registry = registry def build_system_prompt(self, active_agents): template = _load_prompt('master_system.txt') agent_list = chr(10).join( f'- {a["agent_key"]}: {a["capabilities_summary"]}' for a in active_agents) # Plain substitution — the template contains literal { } from a JSON # example block, so str.format would treat them as fields. return template.replace('{agent_list}', agent_list) async def handle_message(self, user_id, channel_id, message, directive_id, extra_context: dict = None) -> MasterResponse: try: user_id = int(user_id) except (TypeError, ValueError): pass logger.info('MasterAgent.handle_message user_id=%s directive=%s', user_id, directive_id) t0 = time.monotonic() await self._log_directive_start(directive_id, user_id, channel_id, message) try: # Persist the user message FIRST so it's part of context on every # branch (clarification, access-denied, happy path). Without this, # clarification turns lose the original question and the bot can't # connect follow-up replies to the in-flight conversation. await self._memory.append_message(user_id, 'user', message, directive_id) context = await self._build_context(user_id, message) intent = await self._classify_intent(context, message) if intent.needs_clarification: q = intent.clarification_question or 'Could you provide more details?' await self._memory.append_message(user_id, 'assistant', q, directive_id) await self._log_directive_complete(directive_id, 'awaiting_clarification', q) return MasterResponse(directive_id=directive_id, response=q, status='awaiting_clarification') if not intent.agents: # No specialist agent applies — answer directly with the LLM. response_text = await self._direct_answer(context, message) await self._memory.append_message(user_id, 'assistant', response_text, directive_id) await self._log_directive_complete(directive_id, 'complete', response_text) return MasterResponse(directive_id=directive_id, response=response_text, status='complete') access = await self._check_access(user_id, intent.agents) if not access.allowed: denied = ', '.join(access.denied_agents) msg = "You don't have access to: " + denied + '.' await self._memory.append_message(user_id, 'assistant', msg, directive_id) await self._log_directive_complete(directive_id, 'failed', msg) return MasterResponse(directive_id=directive_id, response=msg, status='failed') directives = await self._build_directives(intent, context, directive_id, user_id=user_id, extra_context=extra_context) reports = await self._dispatch_agents(directives) response_text = await self._synthesize(reports, context) await self._update_memory(user_id, message, response_text, reports, directive_id) all_escalations = [] all_actions = [] status = 'complete' for r in reports: all_escalations.extend(r.escalations) all_actions.extend(r.actions_taken) if r.status in ('failed', 'escalated'): status = 'partial' if status == 'complete' else status if all_escalations: status = 'awaiting_approval' await self._log_directive_complete(directive_id, status, response_text, all_actions, all_escalations) ms = int((time.monotonic() - t0) * 1000) logger.info('MasterAgent complete directive=%s status=%s ms=%d', directive_id, status, ms) return MasterResponse(directive_id=directive_id, response=response_text, status=status, escalations=all_escalations, actions_taken=all_actions) except Exception as exc: logger.exception('MasterAgent FAILED directive=%s: %s', directive_id, exc) err_msg = 'I encountered an error processing your request. Please try again or contact your administrator.' await self._log_directive_complete(directive_id, 'failed', err_msg, error=str(exc)) return MasterResponse(directive_id=directive_id, response=err_msg, status='failed') async def _build_context(self, user_id, message) -> MasterContext: hint = message[:40] if message else None return await self._memory.build_context(user_id, intent_hint=hint) async def _classify_intent(self, context: MasterContext, message) -> IntentResult: active_agents = await self._registry.get_active_agents() system = self.build_system_prompt(active_agents) history = [ {'role': m['role'] if m['role'] != 'system' else 'user', 'content': m['content']} for m in context.conversation[-20:] if m['role'] in ('user', 'assistant') ] msgs = [{'role': 'system', 'content': system}, *history, {'role': 'user', 'content': message}] resp = await self._llm.submit(msgs, caller='master') raw = (resp.content or '').strip() # Strip markdown fences like ```json ... ``` if raw.startswith('```'): raw = raw.strip('`') if raw.lower().startswith('json'): raw = raw[4:] raw = raw.strip() # Pull out the first {...} block if there's surrounding prose. first = raw.find('{') last = raw.rfind('}') if first != -1 and last != -1 and last > first: raw = raw[first:last + 1] try: data = json.loads(raw) if not isinstance(data, dict): raise ValueError(f'expected JSON object, got {type(data).__name__}') return IntentResult( needs_clarification=bool(data.get('needs_clarification', False)), clarification_question=data.get('clarification_question'), is_continuation=bool(data.get('is_continuation', False)), agents=data.get('agents') or [], intent_summary=data.get('intent_summary', '') or '', params=data.get('params') or {}, context_hints=data.get('context_hints') or []) except Exception as exc: logger.warning('Intent classification parse failed (%s); raw=%r', exc, (resp.content or '')[:300]) # Treat unparseable LLM output as 'no specialist agent applies' so # the direct-answer fallback can handle it instead of looping on # clarification. return IntentResult(needs_clarification=False, agents=[]) async def _check_access(self, user_id, agents) -> AccessResult: denied = [] try: user_data = await self._odoo.call('res.users', 'read', [[user_id]], {'fields': ['groups_id']}) user_group_ids = set(user_data[0].get('groups_id', [])) if user_data else set() except Exception as exc: logger.warning('Access check failed, permitting: %s', exc) return AccessResult(allowed=True) for agent_key in agents: required_xml_id = AGENT_ACCESS_GROUPS.get(agent_key) if not required_xml_id: continue try: module, name = required_xml_id.split('.', 1) imd = await self._odoo.search_read( 'ir.model.data', [['module', '=', module], ['name', '=', name], ['model', '=', 'res.groups']], ['res_id']) if not imd or imd[0]['res_id'] not in user_group_ids: denied.append(agent_key) except Exception as exc: logger.warning('Group lookup failed for %s, permitting: %s', required_xml_id, exc) if denied: return AccessResult(allowed=False, denied_agents=denied) return AccessResult(allowed=True) async def _build_directives(self, intent: IntentResult, context: MasterContext, directive_id, user_id=None, extra_context: dict = None) -> list: receipts = (extra_context or {}).get('receipts', []) directives = [] for agent_key in intent.agents: authorized = ['read', 'search', 'report', 'post_chatter', 'send_email', 'create_non_financial', 'write_non_financial'] if receipts: authorized.append('create_expense') ctx = DirectiveContext( client_profile=context.knowledge, recent_findings=context.operational_findings, conversation_summary=chr(10).join( m['content'] for m in context.conversation[-5:] if m['role'] == 'assistant'), peer_data={'requesting_user_id': user_id}, receipts=receipts) d = AgentDirective( directive_id=directive_id, agent=agent_key, task=intent.intent_summary, params=intent.params, context=ctx, authorized_actions=authorized, constraints={'max_amount': 5000}) directives.append(d) return directives async def _dispatch_agents(self, directives) -> list: if len(directives) == 1: d = directives[0] agent = self._registry.get_agent_instance(d.agent) if agent is None: return [AgentReport(directive_id=d.directive_id, agent=d.agent, status='failed', summary=f'Agent {d.agent} not loaded')] return [await agent.execute(d)] tasks = [self._registry.get_agent_instance(d.agent).execute(d) for d in directives if self._registry.get_agent_instance(d.agent)] results = await asyncio.gather(*tasks, return_exceptions=True) reports = [] for i, r in enumerate(results): if isinstance(r, Exception): d = directives[i] reports.append(AgentReport(directive_id=d.directive_id, agent=d.agent, status='failed', summary=str(r))) else: reports.append(r) return reports async def _direct_answer(self, context: MasterContext, message) -> str: """Answer general / off-topic messages directly without dispatching agents.""" history = [ {'role': m['role'], 'content': m['content']} for m in context.conversation[-10:] if m.get('role') in ('user', 'assistant') ] system = ( "You are ActiveBlue AI, the assistant for the Active Blue Odoo " "instance. Answer the user directly and concisely. Use any prior " "conversation for context. Plain text only — no JSON, no markdown " "code fences." ) msgs = [{'role': 'system', 'content': system}, *history, {'role': 'user', 'content': message}] resp = await self._llm.submit(msgs, caller='master_direct') return (resp.content or '').strip() or 'Sorry, I have no answer for that.' async def _synthesize(self, reports, context: MasterContext) -> str: if not reports: return 'No agent responses received.' if len(reports) == 1: return reports[0].summary or '(no summary)' summaries = chr(10).join(f'{r.agent}: {r.summary}' for r in reports) msg = ('Synthesize these agent reports into one coherent response. ' 'Business language only. No internal IDs. ' 'Separate: actions completed, items pending approval, recommendations.' + chr(10) + summaries) try: resp = await self._llm.submit( [{'role': 'system', 'content': 'You are a business intelligence assistant.'}, {'role': 'user', 'content': msg}], caller='master_synthesis') return resp.content or summaries except Exception as exc: logger.warning('_synthesize LLM call failed, falling back to raw summaries: %s', exc) return summaries async def _update_memory(self, user_id, message, response, reports, directive_id): try: await self._memory.append_message(user_id, 'assistant', response or '', directive_id) except Exception as exc: logger.warning('_update_memory: append_message failed: %s', exc) for report in reports: if report.data: try: await self._memory.store_findings( scope=report.agent.replace('_agent', ''), summary=report.summary, raw_data=report.data, source_directive_id=directive_id) except Exception as exc: logger.warning('_update_memory: store_findings failed agent=%s: %s', report.agent, exc) async def handle_approval(self, directive_id, item_id, approved, approver_uid) -> str: if approved: return f'Approval recorded. Re-executing approved action for {directive_id}.' return 'Action rejected and recorded.' async def is_continuation(self, user_id, message) -> bool: recent = await self._memory.get_conversation(user_id, limit=3) return bool(recent) async def _log_directive_start(self, directive_id, user_id, channel_id, message): try: pool = self._odoo.pg_pool if not pool: return sql = ('INSERT INTO ab_directive_log ' '(directive_id, user_id, channel_id, raw_message, status) ' 'VALUES ($1, $2, $3, $4, $5) ON CONFLICT (directive_id) DO NOTHING') async with pool.acquire(timeout=10) as conn: await conn.execute(sql, directive_id, user_id, channel_id or 0, message, 'processing') except Exception as exc: logger.warning('_log_directive_start failed: %s', exc) async def _log_directive_complete(self, directive_id, status, response, actions=None, escalations=None, error=None): try: pool = self._odoo.pg_pool if not pool: return sql = ('UPDATE ab_directive_log SET status=$2, final_response=$3, ' 'actions_taken=$4, escalations=$5, completed_at=NOW(), error=$6 ' 'WHERE directive_id=$1') async with pool.acquire(timeout=10) as conn: await conn.execute(sql, directive_id, status, response, json.dumps(actions or []), json.dumps(escalations or []), error) except Exception as exc: logger.warning('_log_directive_complete failed: %s', exc)