The prompt template contains a literal JSON example block ({"needs_clarification": ...})
which str.format() tried to interpret as format fields, raising KeyError on every
Discuss DM. Switch to .replace() so braces in the template are taken literally.
272 lines
13 KiB
Python
272 lines
13 KiB
Python
from __future__ import annotations
|
|
import asyncio, json, logging, os, time, uuid
|
|
from dataclasses import dataclass, field
|
|
from .base_agent import AgentDirective, DirectiveContext, AgentReport
|
|
from .peer_bus import PeerBus
|
|
from ..memory.memory_manager import MemoryManager, MasterContext
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class IntentResult:
|
|
needs_clarification: bool = False
|
|
clarification_question: object = None
|
|
is_continuation: bool = False
|
|
agents: list = field(default_factory=list)
|
|
intent_summary: str = ''
|
|
params: dict = field(default_factory=dict)
|
|
context_hints: list = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class AccessResult:
|
|
allowed: bool
|
|
denied_agents: list = field(default_factory=list)
|
|
reason: str = ''
|
|
|
|
|
|
@dataclass
|
|
class MasterResponse:
|
|
directive_id: str
|
|
response: str
|
|
status: str
|
|
escalations: list = field(default_factory=list)
|
|
actions_taken: list = field(default_factory=list)
|
|
|
|
|
|
def _load_prompt(filename):
|
|
base = os.path.dirname(__file__)
|
|
path = os.path.join(base, '..', 'prompts', filename)
|
|
try:
|
|
with open(path) as f:
|
|
return f.read()
|
|
except FileNotFoundError:
|
|
logger.warning('Prompt file not found: %s', path)
|
|
return ''
|
|
|
|
|
|
AGENT_ACCESS_GROUPS = {
|
|
'finance_agent': 'account.group_account_user',
|
|
'accounting_agent': 'account.group_account_user',
|
|
'sales_agent': 'sales_team.group_sale_salesman',
|
|
'project_agent': 'project.group_project_user',
|
|
'expenses_agent': 'hr_expense.group_hr_expense_user',
|
|
'employees_agent': 'hr.group_hr_user',
|
|
}
|
|
|
|
|
|
class MasterAgent:
|
|
def __init__(self, odoo, llm, memory: MemoryManager, registry):
|
|
self._odoo = odoo
|
|
self._llm = llm
|
|
self._memory = memory
|
|
self._registry = registry
|
|
|
|
def build_system_prompt(self, active_agents):
|
|
template = _load_prompt('master_system.txt')
|
|
agent_list = chr(10).join(
|
|
f'- {a["agent_key"]}: {a["capabilities_summary"]}' for a in active_agents)
|
|
# Plain substitution — the template contains literal { } from a JSON
|
|
# example block, so str.format would treat them as fields.
|
|
return template.replace('{agent_list}', agent_list)
|
|
|
|
async def handle_message(self, user_id, channel_id, message, directive_id) -> MasterResponse:
|
|
try:
|
|
user_id = int(user_id)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
logger.info('MasterAgent.handle_message user_id=%s directive=%s', user_id, directive_id)
|
|
t0 = time.monotonic()
|
|
await self._log_directive_start(directive_id, user_id, channel_id, message)
|
|
try:
|
|
context = await self._build_context(user_id, message)
|
|
intent = await self._classify_intent(context, message)
|
|
if intent.needs_clarification:
|
|
q = intent.clarification_question or 'Could you provide more details?'
|
|
await self._memory.append_message(user_id, 'assistant', q, directive_id)
|
|
await self._log_directive_complete(directive_id, 'awaiting_clarification', q)
|
|
return MasterResponse(directive_id=directive_id, response=q,
|
|
status='awaiting_clarification')
|
|
access = await self._check_access(user_id, intent.agents)
|
|
if not access.allowed:
|
|
msg = f'You don' + chr(39) + 't have access to: {chr(44).join(access.denied_agents)}.'
|
|
await self._memory.append_message(user_id, 'assistant', msg, directive_id)
|
|
await self._log_directive_complete(directive_id, 'failed', msg)
|
|
return MasterResponse(directive_id=directive_id, response=msg, status='failed')
|
|
directives = await self._build_directives(intent, context, directive_id)
|
|
reports = await self._dispatch_agents(directives)
|
|
response_text = await self._synthesize(reports, context)
|
|
await self._update_memory(user_id, message, response_text, reports, directive_id)
|
|
all_escalations = []
|
|
all_actions = []
|
|
status = 'complete'
|
|
for r in reports:
|
|
all_escalations.extend(r.escalations)
|
|
all_actions.extend(r.actions_taken)
|
|
if r.status in ('failed', 'escalated'):
|
|
status = 'partial' if status == 'complete' else status
|
|
if all_escalations:
|
|
status = 'awaiting_approval'
|
|
await self._log_directive_complete(directive_id, status, response_text, all_actions, all_escalations)
|
|
ms = int((time.monotonic() - t0) * 1000)
|
|
logger.info('MasterAgent complete directive=%s status=%s ms=%d', directive_id, status, ms)
|
|
return MasterResponse(directive_id=directive_id, response=response_text,
|
|
status=status, escalations=all_escalations, actions_taken=all_actions)
|
|
except Exception as exc:
|
|
logger.exception('MasterAgent FAILED directive=%s: %s', directive_id, exc)
|
|
err_msg = 'I encountered an error processing your request. Please try again or contact your administrator.'
|
|
await self._log_directive_complete(directive_id, 'failed', err_msg, error=str(exc))
|
|
return MasterResponse(directive_id=directive_id, response=err_msg, status='failed')
|
|
|
|
async def _build_context(self, user_id, message) -> MasterContext:
|
|
hint = message[:40] if message else None
|
|
return await self._memory.build_context(user_id, intent_hint=hint)
|
|
|
|
async def _classify_intent(self, context: MasterContext, message) -> IntentResult:
|
|
active_agents = await self._registry.get_active_agents()
|
|
system = self.build_system_prompt(active_agents)
|
|
history = [
|
|
{'role': m['role'] if m['role'] != 'system' else 'user', 'content': m['content']}
|
|
for m in context.conversation[-20:]
|
|
if m['role'] in ('user', 'assistant')
|
|
]
|
|
msgs = [{'role': 'system', 'content': system}, *history,
|
|
{'role': 'user', 'content': message}]
|
|
resp = await self._llm.submit(msgs, caller='master')
|
|
try:
|
|
raw = resp.content.strip()
|
|
if raw.startswith(chr(96)*3):
|
|
raw = raw.split(chr(10), 1)[1].rsplit(chr(10), 1)[0]
|
|
data = json.loads(raw)
|
|
return IntentResult(
|
|
needs_clarification=data.get('needs_clarification', False),
|
|
clarification_question=data.get('clarification_question'),
|
|
is_continuation=data.get('is_continuation', False),
|
|
agents=data.get('agents', []),
|
|
intent_summary=data.get('intent_summary', ''),
|
|
params=data.get('params', {}),
|
|
context_hints=data.get('context_hints', []))
|
|
except (json.JSONDecodeError, KeyError) as exc:
|
|
logger.warning('Intent classification parse failed: %s', exc)
|
|
return IntentResult(needs_clarification=True,
|
|
clarification_question='Could you clarify what you need?')
|
|
|
|
async def _check_access(self, user_id, agents) -> AccessResult:
|
|
denied = []
|
|
try:
|
|
user_data = await self._odoo.call('res.users', 'read', [[user_id]], {'fields': ['groups_id']})
|
|
group_ids = user_data[0].get('groups_id', []) if user_data else []
|
|
group_rows = await self._odoo.search_read('res.groups', [['id', 'in', group_ids]], ['full_name'])
|
|
user_group_names = {r['full_name'] for r in group_rows}
|
|
except Exception as exc:
|
|
logger.warning('Access check failed, permitting: %s', exc)
|
|
return AccessResult(allowed=True)
|
|
for agent_key in agents:
|
|
required = AGENT_ACCESS_GROUPS.get(agent_key)
|
|
if required and required not in user_group_names:
|
|
denied.append(agent_key)
|
|
if denied:
|
|
return AccessResult(allowed=False, denied_agents=denied)
|
|
return AccessResult(allowed=True)
|
|
|
|
async def _build_directives(self, intent: IntentResult, context: MasterContext, directive_id) -> list:
|
|
directives = []
|
|
for agent_key in intent.agents:
|
|
ctx = DirectiveContext(
|
|
client_profile=context.knowledge,
|
|
recent_findings=context.operational_findings,
|
|
conversation_summary=chr(10).join(
|
|
m['content'] for m in context.conversation[-5:] if m['role'] == 'assistant'),
|
|
peer_data={})
|
|
d = AgentDirective(
|
|
directive_id=directive_id, agent=agent_key, task=intent.intent_summary,
|
|
params=intent.params, context=ctx,
|
|
authorized_actions=['read', 'search', 'report', 'post_chatter',
|
|
'send_email', 'create_non_financial', 'write_non_financial'],
|
|
constraints={'max_amount': 5000})
|
|
directives.append(d)
|
|
return directives
|
|
|
|
async def _dispatch_agents(self, directives) -> list:
|
|
if len(directives) == 1:
|
|
d = directives[0]
|
|
agent = self._registry.get_agent_instance(d.agent)
|
|
if agent is None:
|
|
return [AgentReport(directive_id=d.directive_id, agent=d.agent,
|
|
status='failed', summary=f'Agent {d.agent} not loaded')]
|
|
return [await agent.execute(d)]
|
|
tasks = [self._registry.get_agent_instance(d.agent).execute(d)
|
|
for d in directives if self._registry.get_agent_instance(d.agent)]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
reports = []
|
|
for i, r in enumerate(results):
|
|
if isinstance(r, Exception):
|
|
d = directives[i]
|
|
reports.append(AgentReport(directive_id=d.directive_id, agent=d.agent,
|
|
status='failed', summary=str(r)))
|
|
else:
|
|
reports.append(r)
|
|
return reports
|
|
|
|
async def _synthesize(self, reports, context: MasterContext) -> str:
|
|
if not reports:
|
|
return 'No agent responses received.'
|
|
if len(reports) == 1 and reports[0].status == 'complete':
|
|
return reports[0].summary
|
|
summaries = chr(10).join(f'{r.agent}: {r.summary}' for r in reports)
|
|
msg = ('Synthesize these agent reports into one coherent response. '
|
|
'Business language only. No internal IDs. '
|
|
'Separate: actions completed, items pending approval, recommendations.'
|
|
+ chr(10) + summaries)
|
|
resp = await self._llm.submit(
|
|
[{'role': 'system', 'content': 'You are a business intelligence assistant.'},
|
|
{'role': 'user', 'content': msg}],
|
|
caller='master_synthesis')
|
|
return resp.content
|
|
|
|
async def _update_memory(self, user_id, message, response, reports, directive_id):
|
|
await self._memory.append_message(user_id, 'user', message, directive_id)
|
|
await self._memory.append_message(user_id, 'assistant', response, directive_id)
|
|
for report in reports:
|
|
if report.data:
|
|
await self._memory.store_findings(
|
|
scope=report.agent.replace('_agent', ''),
|
|
summary=report.summary, raw_data=report.data,
|
|
source_directive_id=directive_id)
|
|
|
|
async def handle_approval(self, directive_id, item_id, approved, approver_uid) -> str:
|
|
if approved:
|
|
return f'Approval recorded. Re-executing approved action for {directive_id}.'
|
|
return 'Action rejected and recorded.'
|
|
|
|
async def is_continuation(self, user_id, message) -> bool:
|
|
recent = await self._memory.get_conversation(user_id, limit=3)
|
|
return bool(recent)
|
|
|
|
async def _log_directive_start(self, directive_id, user_id, channel_id, message):
|
|
try:
|
|
pool = self._odoo.pg_pool
|
|
if not pool: return
|
|
sql = ('INSERT INTO ab_directive_log '
|
|
'(directive_id, user_id, channel_id, raw_message, status) '
|
|
'VALUES ($1, $2, $3, $4, $5) ON CONFLICT (directive_id) DO NOTHING')
|
|
async with pool.acquire(timeout=10) as conn:
|
|
await conn.execute(sql, directive_id, user_id, channel_id, message, 'processing')
|
|
except Exception as exc:
|
|
logger.warning('_log_directive_start failed: %s', exc)
|
|
|
|
async def _log_directive_complete(self, directive_id, status, response,
|
|
actions=None, escalations=None, error=None):
|
|
try:
|
|
pool = self._odoo.pg_pool
|
|
if not pool: return
|
|
sql = ('UPDATE ab_directive_log SET status=$2, final_response=$3, '
|
|
'actions_taken=$4, escalations=$5, completed_at=NOW(), error=$6 '
|
|
'WHERE directive_id=$1')
|
|
async with pool.acquire(timeout=10) as conn:
|
|
await conn.execute(sql, directive_id, status, response,
|
|
json.dumps(actions or []), json.dumps(escalations or []), error)
|
|
except Exception as exc:
|
|
logger.warning('_log_directive_complete failed: %s', exc)
|