From 4ca62ee54b01e04292206b3b7e009218879193c2 Mon Sep 17 00:00:00 2001 From: ActiveBlue Build Date: Sun, 12 Apr 2026 17:17:44 -0400 Subject: [PATCH] feat: add Master AI with directive builder and synthesis --- agent_service/agents/master_agent.py | 265 ++++++++++++++++++++++++ agent_service/prompts/master_system.txt | 35 ++++ 2 files changed, 300 insertions(+) create mode 100644 agent_service/agents/master_agent.py create mode 100644 agent_service/prompts/master_system.txt diff --git a/agent_service/agents/master_agent.py b/agent_service/agents/master_agent.py new file mode 100644 index 0000000..075e14a --- /dev/null +++ b/agent_service/agents/master_agent.py @@ -0,0 +1,265 @@ +from __future__ import annotations +import asyncio, json, logging, os, time, uuid +from dataclasses import dataclass, field +from .base_agent import AgentDirective, DirectiveContext, AgentReport +from .peer_bus import PeerBus +from ..memory.memory_manager import MemoryManager, MasterContext + +logger = logging.getLogger(__name__) + + +@dataclass +class IntentResult: + needs_clarification: bool = False + clarification_question: object = None + is_continuation: bool = False + agents: list = field(default_factory=list) + intent_summary: str = '' + params: dict = field(default_factory=dict) + context_hints: list = field(default_factory=list) + + +@dataclass +class AccessResult: + allowed: bool + denied_agents: list = field(default_factory=list) + reason: str = '' + + +@dataclass +class MasterResponse: + directive_id: str + response: str + status: str + escalations: list = field(default_factory=list) + actions_taken: list = field(default_factory=list) + + +def _load_prompt(filename): + base = os.path.dirname(__file__) + path = os.path.join(base, '..', 'prompts', filename) + try: + with open(path) as f: + return f.read() + except FileNotFoundError: + logger.warning('Prompt file not found: %s', path) + return '' + + +AGENT_ACCESS_GROUPS = { + 'finance_agent': 'account.group_account_user', + 'accounting_agent': 'account.group_account_user', + 'sales_agent': 'sales_team.group_sale_salesman', + 'project_agent': 'project.group_project_user', + 'expenses_agent': 'hr_expense.group_hr_expense_user', + 'employees_agent': 'hr.group_hr_user', +} + + +class MasterAgent: + def __init__(self, odoo, llm, memory: MemoryManager, registry): + self._odoo = odoo + self._llm = llm + self._memory = memory + self._registry = registry + + def build_system_prompt(self, active_agents): + template = _load_prompt('master_system.txt') + agent_list = chr(10).join( + f'- {a["agent_key"]}: {a["capabilities_summary"]}' for a in active_agents) + return template.format(agent_list=agent_list) + + async def handle_message(self, user_id, channel_id, message, directive_id) -> MasterResponse: + logger.info('MasterAgent.handle_message user_id=%s directive=%s', user_id, directive_id) + t0 = time.monotonic() + await self._log_directive_start(directive_id, user_id, channel_id, message) + try: + context = await self._build_context(user_id, message) + intent = await self._classify_intent(context, message) + if intent.needs_clarification: + q = intent.clarification_question or 'Could you provide more details?' + await self._memory.append_message(user_id, 'assistant', q, directive_id) + await self._log_directive_complete(directive_id, 'awaiting_clarification', q) + return MasterResponse(directive_id=directive_id, response=q, + status='awaiting_clarification') + access = await self._check_access(user_id, intent.agents) + if not access.allowed: + msg = f'You don' + chr(39) + 't have access to: {chr(44).join(access.denied_agents)}.' + await self._memory.append_message(user_id, 'assistant', msg, directive_id) + await self._log_directive_complete(directive_id, 'failed', msg) + return MasterResponse(directive_id=directive_id, response=msg, status='failed') + directives = await self._build_directives(intent, context, directive_id) + reports = await self._dispatch_agents(directives) + response_text = await self._synthesize(reports, context) + await self._update_memory(user_id, message, response_text, reports, directive_id) + all_escalations = [] + all_actions = [] + status = 'complete' + for r in reports: + all_escalations.extend(r.escalations) + all_actions.extend(r.actions_taken) + if r.status in ('failed', 'escalated'): + status = 'partial' if status == 'complete' else status + if all_escalations: + status = 'awaiting_approval' + await self._log_directive_complete(directive_id, status, response_text, all_actions, all_escalations) + ms = int((time.monotonic() - t0) * 1000) + logger.info('MasterAgent complete directive=%s status=%s ms=%d', directive_id, status, ms) + return MasterResponse(directive_id=directive_id, response=response_text, + status=status, escalations=all_escalations, actions_taken=all_actions) + except Exception as exc: + logger.error('MasterAgent FAILED directive=%s: %s', directive_id, exc) + err_msg = 'I encountered an error processing your request. Please try again or contact your administrator.' + await self._log_directive_complete(directive_id, 'failed', err_msg, error=str(exc)) + return MasterResponse(directive_id=directive_id, response=err_msg, status='failed') + + async def _build_context(self, user_id, message) -> MasterContext: + hint = message[:40] if message else None + return await self._memory.build_context(user_id, intent_hint=hint) + + async def _classify_intent(self, context: MasterContext, message) -> IntentResult: + active_agents = await self._registry.get_active_agents() + system = self.build_system_prompt(active_agents) + history = [ + {'role': m['role'] if m['role'] != 'system' else 'user', 'content': m['content']} + for m in context.conversation[-20:] + if m['role'] in ('user', 'assistant') + ] + msgs = [{'role': 'system', 'content': system}, *history, + {'role': 'user', 'content': message}] + resp = await self._llm.submit(msgs, caller='master') + try: + raw = resp.content.strip() + if raw.startswith(chr(96)*3): + raw = raw.split(chr(10), 1)[1].rsplit(chr(10), 1)[0] + data = json.loads(raw) + return IntentResult( + needs_clarification=data.get('needs_clarification', False), + clarification_question=data.get('clarification_question'), + is_continuation=data.get('is_continuation', False), + agents=data.get('agents', []), + intent_summary=data.get('intent_summary', ''), + params=data.get('params', {}), + context_hints=data.get('context_hints', [])) + except (json.JSONDecodeError, KeyError) as exc: + logger.warning('Intent classification parse failed: %s', exc) + return IntentResult(needs_clarification=True, + clarification_question='Could you clarify what you need?') + + async def _check_access(self, user_id, agents) -> AccessResult: + denied = [] + try: + user_data = await self._odoo.call('res.users', 'read', [[user_id]], {'fields': ['groups_id']}) + group_ids = user_data[0].get('groups_id', []) if user_data else [] + group_rows = await self._odoo.search_read('res.groups', [['id', 'in', group_ids]], ['full_name']) + user_group_names = {r['full_name'] for r in group_rows} + except Exception as exc: + logger.warning('Access check failed, permitting: %s', exc) + return AccessResult(allowed=True) + for agent_key in agents: + required = AGENT_ACCESS_GROUPS.get(agent_key) + if required and required not in user_group_names: + denied.append(agent_key) + if denied: + return AccessResult(allowed=False, denied_agents=denied) + return AccessResult(allowed=True) + + async def _build_directives(self, intent: IntentResult, context: MasterContext, directive_id) -> list: + directives = [] + for agent_key in intent.agents: + ctx = DirectiveContext( + client_profile=context.knowledge, + recent_findings=context.operational_findings, + conversation_summary=chr(10).join( + m['content'] for m in context.conversation[-5:] if m['role'] == 'assistant'), + peer_data={}) + d = AgentDirective( + directive_id=directive_id, agent=agent_key, task=intent.intent_summary, + params=intent.params, context=ctx, + authorized_actions=['read', 'search', 'report', 'post_chatter', + 'send_email', 'create_non_financial', 'write_non_financial'], + constraints={'max_amount': 5000}) + directives.append(d) + return directives + + async def _dispatch_agents(self, directives) -> list: + if len(directives) == 1: + d = directives[0] + agent = self._registry.get_agent_instance(d.agent) + if agent is None: + return [AgentReport(directive_id=d.directive_id, agent=d.agent, + status='failed', summary=f'Agent {d.agent} not loaded')] + return [await agent.execute(d)] + tasks = [self._registry.get_agent_instance(d.agent).execute(d) + for d in directives if self._registry.get_agent_instance(d.agent)] + results = await asyncio.gather(*tasks, return_exceptions=True) + reports = [] + for i, r in enumerate(results): + if isinstance(r, Exception): + d = directives[i] + reports.append(AgentReport(directive_id=d.directive_id, agent=d.agent, + status='failed', summary=str(r))) + else: + reports.append(r) + return reports + + async def _synthesize(self, reports, context: MasterContext) -> str: + if not reports: + return 'No agent responses received.' + if len(reports) == 1 and reports[0].status == 'complete': + return reports[0].summary + summaries = chr(10).join(f'{r.agent}: {r.summary}' for r in reports) + msg = ('Synthesize these agent reports into one coherent response. ' + 'Business language only. No internal IDs. ' + 'Separate: actions completed, items pending approval, recommendations.' + + chr(10) + summaries) + resp = await self._llm.submit( + [{'role': 'system', 'content': 'You are a business intelligence assistant.'}, + {'role': 'user', 'content': msg}], + caller='master_synthesis') + return resp.content + + async def _update_memory(self, user_id, message, response, reports, directive_id): + await self._memory.append_message(user_id, 'user', message, directive_id) + await self._memory.append_message(user_id, 'assistant', response, directive_id) + for report in reports: + if report.data: + await self._memory.store_findings( + scope=report.agent.replace('_agent', ''), + summary=report.summary, raw_data=report.data, + source_directive_id=directive_id) + + async def handle_approval(self, directive_id, item_id, approved, approver_uid) -> str: + if approved: + return f'Approval recorded. Re-executing approved action for {directive_id}.' + return 'Action rejected and recorded.' + + async def is_continuation(self, user_id, message) -> bool: + recent = await self._memory.get_conversation(user_id, limit=3) + return bool(recent) + + async def _log_directive_start(self, directive_id, user_id, channel_id, message): + try: + pool = self._odoo.pg_pool + if not pool: return + sql = ('INSERT INTO ab_directive_log ' + '(directive_id, user_id, channel_id, raw_message, status) ' + 'VALUES ($1, $2, $3, $4, $5) ON CONFLICT (directive_id) DO NOTHING') + async with pool.acquire(timeout=10) as conn: + await conn.execute(sql, directive_id, user_id, channel_id, message, 'processing') + except Exception as exc: + logger.warning('_log_directive_start failed: %s', exc) + + async def _log_directive_complete(self, directive_id, status, response, + actions=None, escalations=None, error=None): + try: + pool = self._odoo.pg_pool + if not pool: return + sql = ('UPDATE ab_directive_log SET status=$2, final_response=$3, ' + 'actions_taken=$4, escalations=$5, completed_at=NOW(), error=$6 ' + 'WHERE directive_id=$1') + async with pool.acquire(timeout=10) as conn: + await conn.execute(sql, directive_id, status, response, + json.dumps(actions or []), json.dumps(escalations or []), error) + except Exception as exc: + logger.warning('_log_directive_complete failed: %s', exc) diff --git a/agent_service/prompts/master_system.txt b/agent_service/prompts/master_system.txt new file mode 100644 index 0000000..85f3938 --- /dev/null +++ b/agent_service/prompts/master_system.txt @@ -0,0 +1,35 @@ +You are ActiveBlue AI - the central intelligence for Active Blue LLC Odoo instance. +Active Blue is an MSP serving medical and dental practices at 8 locations across +Miami, Dadeland, Tomball, Hollywood, and Miami Lakes (FL and TX). + +Your role: +1. Understand intent from natural language using full conversation context +2. Build precise directives for specialist agents with injected memory context +3. Synthesize agent reports into one coherent user response +4. Update memory with new findings after every interaction + +You are the ONLY entity that communicates with users. +You do NOT act on Odoo directly. + +Active specialist agents: +{agent_list} + +If a user requests something for an agent not listed, tell them the Odoo module is not installed. + +Rules: +- Ask ONE clarifying question if intent is ambiguous - then dispatch +- Confirm multi-step plans before executing +- Surface escalations with approve/reject options +- Never expose agent names, tool names, or system internals to users +- HIPAA: Never include patient names, MRN, DOB, or any PHI in responses + +Classify intent in JSON only: +{ + "needs_clarification": false, + "clarification_question": null, + "is_continuation": false, + "agents": ["finance_agent"], + "intent_summary": "...", + "params": {}, + "context_hints": [] +} \ No newline at end of file