feat: add Master AI with directive builder and synthesis
This commit is contained in:
265
agent_service/agents/master_agent.py
Normal file
265
agent_service/agents/master_agent.py
Normal file
@@ -0,0 +1,265 @@
|
||||
from __future__ import annotations
|
||||
import asyncio, json, logging, os, time, uuid
|
||||
from dataclasses import dataclass, field
|
||||
from .base_agent import AgentDirective, DirectiveContext, AgentReport
|
||||
from .peer_bus import PeerBus
|
||||
from ..memory.memory_manager import MemoryManager, MasterContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IntentResult:
|
||||
needs_clarification: bool = False
|
||||
clarification_question: object = None
|
||||
is_continuation: bool = False
|
||||
agents: list = field(default_factory=list)
|
||||
intent_summary: str = ''
|
||||
params: dict = field(default_factory=dict)
|
||||
context_hints: list = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AccessResult:
|
||||
allowed: bool
|
||||
denied_agents: list = field(default_factory=list)
|
||||
reason: str = ''
|
||||
|
||||
|
||||
@dataclass
|
||||
class MasterResponse:
|
||||
directive_id: str
|
||||
response: str
|
||||
status: str
|
||||
escalations: list = field(default_factory=list)
|
||||
actions_taken: list = field(default_factory=list)
|
||||
|
||||
|
||||
def _load_prompt(filename):
|
||||
base = os.path.dirname(__file__)
|
||||
path = os.path.join(base, '..', 'prompts', filename)
|
||||
try:
|
||||
with open(path) as f:
|
||||
return f.read()
|
||||
except FileNotFoundError:
|
||||
logger.warning('Prompt file not found: %s', path)
|
||||
return ''
|
||||
|
||||
|
||||
AGENT_ACCESS_GROUPS = {
|
||||
'finance_agent': 'account.group_account_user',
|
||||
'accounting_agent': 'account.group_account_user',
|
||||
'sales_agent': 'sales_team.group_sale_salesman',
|
||||
'project_agent': 'project.group_project_user',
|
||||
'expenses_agent': 'hr_expense.group_hr_expense_user',
|
||||
'employees_agent': 'hr.group_hr_user',
|
||||
}
|
||||
|
||||
|
||||
class MasterAgent:
|
||||
def __init__(self, odoo, llm, memory: MemoryManager, registry):
|
||||
self._odoo = odoo
|
||||
self._llm = llm
|
||||
self._memory = memory
|
||||
self._registry = registry
|
||||
|
||||
def build_system_prompt(self, active_agents):
|
||||
template = _load_prompt('master_system.txt')
|
||||
agent_list = chr(10).join(
|
||||
f'- {a["agent_key"]}: {a["capabilities_summary"]}' for a in active_agents)
|
||||
return template.format(agent_list=agent_list)
|
||||
|
||||
async def handle_message(self, user_id, channel_id, message, directive_id) -> MasterResponse:
|
||||
logger.info('MasterAgent.handle_message user_id=%s directive=%s', user_id, directive_id)
|
||||
t0 = time.monotonic()
|
||||
await self._log_directive_start(directive_id, user_id, channel_id, message)
|
||||
try:
|
||||
context = await self._build_context(user_id, message)
|
||||
intent = await self._classify_intent(context, message)
|
||||
if intent.needs_clarification:
|
||||
q = intent.clarification_question or 'Could you provide more details?'
|
||||
await self._memory.append_message(user_id, 'assistant', q, directive_id)
|
||||
await self._log_directive_complete(directive_id, 'awaiting_clarification', q)
|
||||
return MasterResponse(directive_id=directive_id, response=q,
|
||||
status='awaiting_clarification')
|
||||
access = await self._check_access(user_id, intent.agents)
|
||||
if not access.allowed:
|
||||
msg = f'You don' + chr(39) + 't have access to: {chr(44).join(access.denied_agents)}.'
|
||||
await self._memory.append_message(user_id, 'assistant', msg, directive_id)
|
||||
await self._log_directive_complete(directive_id, 'failed', msg)
|
||||
return MasterResponse(directive_id=directive_id, response=msg, status='failed')
|
||||
directives = await self._build_directives(intent, context, directive_id)
|
||||
reports = await self._dispatch_agents(directives)
|
||||
response_text = await self._synthesize(reports, context)
|
||||
await self._update_memory(user_id, message, response_text, reports, directive_id)
|
||||
all_escalations = []
|
||||
all_actions = []
|
||||
status = 'complete'
|
||||
for r in reports:
|
||||
all_escalations.extend(r.escalations)
|
||||
all_actions.extend(r.actions_taken)
|
||||
if r.status in ('failed', 'escalated'):
|
||||
status = 'partial' if status == 'complete' else status
|
||||
if all_escalations:
|
||||
status = 'awaiting_approval'
|
||||
await self._log_directive_complete(directive_id, status, response_text, all_actions, all_escalations)
|
||||
ms = int((time.monotonic() - t0) * 1000)
|
||||
logger.info('MasterAgent complete directive=%s status=%s ms=%d', directive_id, status, ms)
|
||||
return MasterResponse(directive_id=directive_id, response=response_text,
|
||||
status=status, escalations=all_escalations, actions_taken=all_actions)
|
||||
except Exception as exc:
|
||||
logger.error('MasterAgent FAILED directive=%s: %s', directive_id, exc)
|
||||
err_msg = 'I encountered an error processing your request. Please try again or contact your administrator.'
|
||||
await self._log_directive_complete(directive_id, 'failed', err_msg, error=str(exc))
|
||||
return MasterResponse(directive_id=directive_id, response=err_msg, status='failed')
|
||||
|
||||
async def _build_context(self, user_id, message) -> MasterContext:
|
||||
hint = message[:40] if message else None
|
||||
return await self._memory.build_context(user_id, intent_hint=hint)
|
||||
|
||||
async def _classify_intent(self, context: MasterContext, message) -> IntentResult:
|
||||
active_agents = await self._registry.get_active_agents()
|
||||
system = self.build_system_prompt(active_agents)
|
||||
history = [
|
||||
{'role': m['role'] if m['role'] != 'system' else 'user', 'content': m['content']}
|
||||
for m in context.conversation[-20:]
|
||||
if m['role'] in ('user', 'assistant')
|
||||
]
|
||||
msgs = [{'role': 'system', 'content': system}, *history,
|
||||
{'role': 'user', 'content': message}]
|
||||
resp = await self._llm.submit(msgs, caller='master')
|
||||
try:
|
||||
raw = resp.content.strip()
|
||||
if raw.startswith(chr(96)*3):
|
||||
raw = raw.split(chr(10), 1)[1].rsplit(chr(10), 1)[0]
|
||||
data = json.loads(raw)
|
||||
return IntentResult(
|
||||
needs_clarification=data.get('needs_clarification', False),
|
||||
clarification_question=data.get('clarification_question'),
|
||||
is_continuation=data.get('is_continuation', False),
|
||||
agents=data.get('agents', []),
|
||||
intent_summary=data.get('intent_summary', ''),
|
||||
params=data.get('params', {}),
|
||||
context_hints=data.get('context_hints', []))
|
||||
except (json.JSONDecodeError, KeyError) as exc:
|
||||
logger.warning('Intent classification parse failed: %s', exc)
|
||||
return IntentResult(needs_clarification=True,
|
||||
clarification_question='Could you clarify what you need?')
|
||||
|
||||
async def _check_access(self, user_id, agents) -> AccessResult:
|
||||
denied = []
|
||||
try:
|
||||
user_data = await self._odoo.call('res.users', 'read', [[user_id]], {'fields': ['groups_id']})
|
||||
group_ids = user_data[0].get('groups_id', []) if user_data else []
|
||||
group_rows = await self._odoo.search_read('res.groups', [['id', 'in', group_ids]], ['full_name'])
|
||||
user_group_names = {r['full_name'] for r in group_rows}
|
||||
except Exception as exc:
|
||||
logger.warning('Access check failed, permitting: %s', exc)
|
||||
return AccessResult(allowed=True)
|
||||
for agent_key in agents:
|
||||
required = AGENT_ACCESS_GROUPS.get(agent_key)
|
||||
if required and required not in user_group_names:
|
||||
denied.append(agent_key)
|
||||
if denied:
|
||||
return AccessResult(allowed=False, denied_agents=denied)
|
||||
return AccessResult(allowed=True)
|
||||
|
||||
async def _build_directives(self, intent: IntentResult, context: MasterContext, directive_id) -> list:
|
||||
directives = []
|
||||
for agent_key in intent.agents:
|
||||
ctx = DirectiveContext(
|
||||
client_profile=context.knowledge,
|
||||
recent_findings=context.operational_findings,
|
||||
conversation_summary=chr(10).join(
|
||||
m['content'] for m in context.conversation[-5:] if m['role'] == 'assistant'),
|
||||
peer_data={})
|
||||
d = AgentDirective(
|
||||
directive_id=directive_id, agent=agent_key, task=intent.intent_summary,
|
||||
params=intent.params, context=ctx,
|
||||
authorized_actions=['read', 'search', 'report', 'post_chatter',
|
||||
'send_email', 'create_non_financial', 'write_non_financial'],
|
||||
constraints={'max_amount': 5000})
|
||||
directives.append(d)
|
||||
return directives
|
||||
|
||||
async def _dispatch_agents(self, directives) -> list:
|
||||
if len(directives) == 1:
|
||||
d = directives[0]
|
||||
agent = self._registry.get_agent_instance(d.agent)
|
||||
if agent is None:
|
||||
return [AgentReport(directive_id=d.directive_id, agent=d.agent,
|
||||
status='failed', summary=f'Agent {d.agent} not loaded')]
|
||||
return [await agent.execute(d)]
|
||||
tasks = [self._registry.get_agent_instance(d.agent).execute(d)
|
||||
for d in directives if self._registry.get_agent_instance(d.agent)]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
reports = []
|
||||
for i, r in enumerate(results):
|
||||
if isinstance(r, Exception):
|
||||
d = directives[i]
|
||||
reports.append(AgentReport(directive_id=d.directive_id, agent=d.agent,
|
||||
status='failed', summary=str(r)))
|
||||
else:
|
||||
reports.append(r)
|
||||
return reports
|
||||
|
||||
async def _synthesize(self, reports, context: MasterContext) -> str:
|
||||
if not reports:
|
||||
return 'No agent responses received.'
|
||||
if len(reports) == 1 and reports[0].status == 'complete':
|
||||
return reports[0].summary
|
||||
summaries = chr(10).join(f'{r.agent}: {r.summary}' for r in reports)
|
||||
msg = ('Synthesize these agent reports into one coherent response. '
|
||||
'Business language only. No internal IDs. '
|
||||
'Separate: actions completed, items pending approval, recommendations.'
|
||||
+ chr(10) + summaries)
|
||||
resp = await self._llm.submit(
|
||||
[{'role': 'system', 'content': 'You are a business intelligence assistant.'},
|
||||
{'role': 'user', 'content': msg}],
|
||||
caller='master_synthesis')
|
||||
return resp.content
|
||||
|
||||
async def _update_memory(self, user_id, message, response, reports, directive_id):
|
||||
await self._memory.append_message(user_id, 'user', message, directive_id)
|
||||
await self._memory.append_message(user_id, 'assistant', response, directive_id)
|
||||
for report in reports:
|
||||
if report.data:
|
||||
await self._memory.store_findings(
|
||||
scope=report.agent.replace('_agent', ''),
|
||||
summary=report.summary, raw_data=report.data,
|
||||
source_directive_id=directive_id)
|
||||
|
||||
async def handle_approval(self, directive_id, item_id, approved, approver_uid) -> str:
|
||||
if approved:
|
||||
return f'Approval recorded. Re-executing approved action for {directive_id}.'
|
||||
return 'Action rejected and recorded.'
|
||||
|
||||
async def is_continuation(self, user_id, message) -> bool:
|
||||
recent = await self._memory.get_conversation(user_id, limit=3)
|
||||
return bool(recent)
|
||||
|
||||
async def _log_directive_start(self, directive_id, user_id, channel_id, message):
|
||||
try:
|
||||
pool = self._odoo.pg_pool
|
||||
if not pool: return
|
||||
sql = ('INSERT INTO ab_directive_log '
|
||||
'(directive_id, user_id, channel_id, raw_message, status) '
|
||||
'VALUES ($1, $2, $3, $4, $5) ON CONFLICT (directive_id) DO NOTHING')
|
||||
async with pool.acquire(timeout=10) as conn:
|
||||
await conn.execute(sql, directive_id, user_id, channel_id, message, 'processing')
|
||||
except Exception as exc:
|
||||
logger.warning('_log_directive_start failed: %s', exc)
|
||||
|
||||
async def _log_directive_complete(self, directive_id, status, response,
|
||||
actions=None, escalations=None, error=None):
|
||||
try:
|
||||
pool = self._odoo.pg_pool
|
||||
if not pool: return
|
||||
sql = ('UPDATE ab_directive_log SET status=$2, final_response=$3, '
|
||||
'actions_taken=$4, escalations=$5, completed_at=NOW(), error=$6 '
|
||||
'WHERE directive_id=$1')
|
||||
async with pool.acquire(timeout=10) as conn:
|
||||
await conn.execute(sql, directive_id, status, response,
|
||||
json.dumps(actions or []), json.dumps(escalations or []), error)
|
||||
except Exception as exc:
|
||||
logger.warning('_log_directive_complete failed: %s', exc)
|
||||
35
agent_service/prompts/master_system.txt
Normal file
35
agent_service/prompts/master_system.txt
Normal file
@@ -0,0 +1,35 @@
|
||||
You are ActiveBlue AI - the central intelligence for Active Blue LLC Odoo instance.
|
||||
Active Blue is an MSP serving medical and dental practices at 8 locations across
|
||||
Miami, Dadeland, Tomball, Hollywood, and Miami Lakes (FL and TX).
|
||||
|
||||
Your role:
|
||||
1. Understand intent from natural language using full conversation context
|
||||
2. Build precise directives for specialist agents with injected memory context
|
||||
3. Synthesize agent reports into one coherent user response
|
||||
4. Update memory with new findings after every interaction
|
||||
|
||||
You are the ONLY entity that communicates with users.
|
||||
You do NOT act on Odoo directly.
|
||||
|
||||
Active specialist agents:
|
||||
{agent_list}
|
||||
|
||||
If a user requests something for an agent not listed, tell them the Odoo module is not installed.
|
||||
|
||||
Rules:
|
||||
- Ask ONE clarifying question if intent is ambiguous - then dispatch
|
||||
- Confirm multi-step plans before executing
|
||||
- Surface escalations with approve/reject options
|
||||
- Never expose agent names, tool names, or system internals to users
|
||||
- HIPAA: Never include patient names, MRN, DOB, or any PHI in responses
|
||||
|
||||
Classify intent in JSON only:
|
||||
{
|
||||
"needs_clarification": false,
|
||||
"clarification_question": null,
|
||||
"is_continuation": false,
|
||||
"agents": ["finance_agent"],
|
||||
"intent_summary": "...",
|
||||
"params": {},
|
||||
"context_hints": []
|
||||
}
|
||||
Reference in New Issue
Block a user