The classifier was silently falling back to a clarification prompt every
time the LLM wrapped its JSON in markdown fences, prefixed it with
'json', or added surrounding prose. The bot then asked 'Could you
clarify what you need?' to every message regardless of clarity.
Now: strip code fences, slice to the first {...} block, and on parse
failure log the raw content (truncated) and treat the message as 'no
specialist agent' so the direct-answer fallback responds instead of
looping on clarification.
318 lines
16 KiB
Python
318 lines
16 KiB
Python
from __future__ import annotations
|
|
import asyncio, json, logging, os, time, uuid
|
|
from dataclasses import dataclass, field
|
|
from .base_agent import AgentDirective, DirectiveContext, AgentReport
|
|
from .peer_bus import PeerBus
|
|
from ..memory.memory_manager import MemoryManager, MasterContext
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class IntentResult:
|
|
needs_clarification: bool = False
|
|
clarification_question: object = None
|
|
is_continuation: bool = False
|
|
agents: list = field(default_factory=list)
|
|
intent_summary: str = ''
|
|
params: dict = field(default_factory=dict)
|
|
context_hints: list = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class AccessResult:
|
|
allowed: bool
|
|
denied_agents: list = field(default_factory=list)
|
|
reason: str = ''
|
|
|
|
|
|
@dataclass
|
|
class MasterResponse:
|
|
directive_id: str
|
|
response: str
|
|
status: str
|
|
escalations: list = field(default_factory=list)
|
|
actions_taken: list = field(default_factory=list)
|
|
|
|
|
|
def _load_prompt(filename):
|
|
base = os.path.dirname(__file__)
|
|
path = os.path.join(base, '..', 'prompts', filename)
|
|
try:
|
|
with open(path) as f:
|
|
return f.read()
|
|
except FileNotFoundError:
|
|
logger.warning('Prompt file not found: %s', path)
|
|
return ''
|
|
|
|
|
|
AGENT_ACCESS_GROUPS = {
|
|
'finance_agent': 'account.group_account_user',
|
|
'accounting_agent': 'account.group_account_user',
|
|
'sales_agent': 'sales_team.group_sale_salesman',
|
|
'project_agent': 'project.group_project_user',
|
|
'expenses_agent': 'hr_expense.group_hr_expense_user',
|
|
'employees_agent': 'hr.group_hr_user',
|
|
}
|
|
|
|
|
|
class MasterAgent:
|
|
def __init__(self, odoo, llm, memory: MemoryManager, registry):
|
|
self._odoo = odoo
|
|
self._llm = llm
|
|
self._memory = memory
|
|
self._registry = registry
|
|
|
|
def build_system_prompt(self, active_agents):
|
|
template = _load_prompt('master_system.txt')
|
|
agent_list = chr(10).join(
|
|
f'- {a["agent_key"]}: {a["capabilities_summary"]}' for a in active_agents)
|
|
# Plain substitution — the template contains literal { } from a JSON
|
|
# example block, so str.format would treat them as fields.
|
|
return template.replace('{agent_list}', agent_list)
|
|
|
|
async def handle_message(self, user_id, channel_id, message, directive_id) -> MasterResponse:
|
|
try:
|
|
user_id = int(user_id)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
logger.info('MasterAgent.handle_message user_id=%s directive=%s', user_id, directive_id)
|
|
t0 = time.monotonic()
|
|
await self._log_directive_start(directive_id, user_id, channel_id, message)
|
|
try:
|
|
# Persist the user message FIRST so it's part of context on every
|
|
# branch (clarification, access-denied, happy path). Without this,
|
|
# clarification turns lose the original question and the bot can't
|
|
# connect follow-up replies to the in-flight conversation.
|
|
await self._memory.append_message(user_id, 'user', message, directive_id)
|
|
context = await self._build_context(user_id, message)
|
|
intent = await self._classify_intent(context, message)
|
|
if intent.needs_clarification:
|
|
q = intent.clarification_question or 'Could you provide more details?'
|
|
await self._memory.append_message(user_id, 'assistant', q, directive_id)
|
|
await self._log_directive_complete(directive_id, 'awaiting_clarification', q)
|
|
return MasterResponse(directive_id=directive_id, response=q,
|
|
status='awaiting_clarification')
|
|
if not intent.agents:
|
|
# No specialist agent applies — answer directly with the LLM.
|
|
response_text = await self._direct_answer(context, message)
|
|
await self._memory.append_message(user_id, 'assistant', response_text, directive_id)
|
|
await self._log_directive_complete(directive_id, 'complete', response_text)
|
|
return MasterResponse(directive_id=directive_id, response=response_text,
|
|
status='complete')
|
|
access = await self._check_access(user_id, intent.agents)
|
|
if not access.allowed:
|
|
denied = ', '.join(access.denied_agents)
|
|
msg = "You don't have access to: " + denied + '.'
|
|
await self._memory.append_message(user_id, 'assistant', msg, directive_id)
|
|
await self._log_directive_complete(directive_id, 'failed', msg)
|
|
return MasterResponse(directive_id=directive_id, response=msg, status='failed')
|
|
directives = await self._build_directives(intent, context, directive_id)
|
|
reports = await self._dispatch_agents(directives)
|
|
response_text = await self._synthesize(reports, context)
|
|
await self._update_memory(user_id, message, response_text, reports, directive_id)
|
|
all_escalations = []
|
|
all_actions = []
|
|
status = 'complete'
|
|
for r in reports:
|
|
all_escalations.extend(r.escalations)
|
|
all_actions.extend(r.actions_taken)
|
|
if r.status in ('failed', 'escalated'):
|
|
status = 'partial' if status == 'complete' else status
|
|
if all_escalations:
|
|
status = 'awaiting_approval'
|
|
await self._log_directive_complete(directive_id, status, response_text, all_actions, all_escalations)
|
|
ms = int((time.monotonic() - t0) * 1000)
|
|
logger.info('MasterAgent complete directive=%s status=%s ms=%d', directive_id, status, ms)
|
|
return MasterResponse(directive_id=directive_id, response=response_text,
|
|
status=status, escalations=all_escalations, actions_taken=all_actions)
|
|
except Exception as exc:
|
|
logger.exception('MasterAgent FAILED directive=%s: %s', directive_id, exc)
|
|
err_msg = 'I encountered an error processing your request. Please try again or contact your administrator.'
|
|
await self._log_directive_complete(directive_id, 'failed', err_msg, error=str(exc))
|
|
return MasterResponse(directive_id=directive_id, response=err_msg, status='failed')
|
|
|
|
async def _build_context(self, user_id, message) -> MasterContext:
|
|
hint = message[:40] if message else None
|
|
return await self._memory.build_context(user_id, intent_hint=hint)
|
|
|
|
async def _classify_intent(self, context: MasterContext, message) -> IntentResult:
|
|
active_agents = await self._registry.get_active_agents()
|
|
system = self.build_system_prompt(active_agents)
|
|
history = [
|
|
{'role': m['role'] if m['role'] != 'system' else 'user', 'content': m['content']}
|
|
for m in context.conversation[-20:]
|
|
if m['role'] in ('user', 'assistant')
|
|
]
|
|
msgs = [{'role': 'system', 'content': system}, *history,
|
|
{'role': 'user', 'content': message}]
|
|
resp = await self._llm.submit(msgs, caller='master')
|
|
raw = (resp.content or '').strip()
|
|
# Strip markdown fences like ```json ... ```
|
|
if raw.startswith('```'):
|
|
raw = raw.strip('`')
|
|
if raw.lower().startswith('json'):
|
|
raw = raw[4:]
|
|
raw = raw.strip()
|
|
# Pull out the first {...} block if there's surrounding prose.
|
|
first = raw.find('{')
|
|
last = raw.rfind('}')
|
|
if first != -1 and last != -1 and last > first:
|
|
raw = raw[first:last + 1]
|
|
try:
|
|
data = json.loads(raw)
|
|
if not isinstance(data, dict):
|
|
raise ValueError(f'expected JSON object, got {type(data).__name__}')
|
|
return IntentResult(
|
|
needs_clarification=bool(data.get('needs_clarification', False)),
|
|
clarification_question=data.get('clarification_question'),
|
|
is_continuation=bool(data.get('is_continuation', False)),
|
|
agents=data.get('agents') or [],
|
|
intent_summary=data.get('intent_summary', '') or '',
|
|
params=data.get('params') or {},
|
|
context_hints=data.get('context_hints') or [])
|
|
except Exception as exc:
|
|
logger.warning('Intent classification parse failed (%s); raw=%r',
|
|
exc, (resp.content or '')[:300])
|
|
# Treat unparseable LLM output as 'no specialist agent applies' so
|
|
# the direct-answer fallback can handle it instead of looping on
|
|
# clarification.
|
|
return IntentResult(needs_clarification=False, agents=[])
|
|
|
|
async def _check_access(self, user_id, agents) -> AccessResult:
|
|
denied = []
|
|
try:
|
|
user_data = await self._odoo.call('res.users', 'read', [[user_id]], {'fields': ['groups_id']})
|
|
group_ids = user_data[0].get('groups_id', []) if user_data else []
|
|
group_rows = await self._odoo.search_read('res.groups', [['id', 'in', group_ids]], ['full_name'])
|
|
user_group_names = {r['full_name'] for r in group_rows}
|
|
except Exception as exc:
|
|
logger.warning('Access check failed, permitting: %s', exc)
|
|
return AccessResult(allowed=True)
|
|
for agent_key in agents:
|
|
required = AGENT_ACCESS_GROUPS.get(agent_key)
|
|
if required and required not in user_group_names:
|
|
denied.append(agent_key)
|
|
if denied:
|
|
return AccessResult(allowed=False, denied_agents=denied)
|
|
return AccessResult(allowed=True)
|
|
|
|
async def _build_directives(self, intent: IntentResult, context: MasterContext, directive_id) -> list:
|
|
directives = []
|
|
for agent_key in intent.agents:
|
|
ctx = DirectiveContext(
|
|
client_profile=context.knowledge,
|
|
recent_findings=context.operational_findings,
|
|
conversation_summary=chr(10).join(
|
|
m['content'] for m in context.conversation[-5:] if m['role'] == 'assistant'),
|
|
peer_data={})
|
|
d = AgentDirective(
|
|
directive_id=directive_id, agent=agent_key, task=intent.intent_summary,
|
|
params=intent.params, context=ctx,
|
|
authorized_actions=['read', 'search', 'report', 'post_chatter',
|
|
'send_email', 'create_non_financial', 'write_non_financial'],
|
|
constraints={'max_amount': 5000})
|
|
directives.append(d)
|
|
return directives
|
|
|
|
async def _dispatch_agents(self, directives) -> list:
|
|
if len(directives) == 1:
|
|
d = directives[0]
|
|
agent = self._registry.get_agent_instance(d.agent)
|
|
if agent is None:
|
|
return [AgentReport(directive_id=d.directive_id, agent=d.agent,
|
|
status='failed', summary=f'Agent {d.agent} not loaded')]
|
|
return [await agent.execute(d)]
|
|
tasks = [self._registry.get_agent_instance(d.agent).execute(d)
|
|
for d in directives if self._registry.get_agent_instance(d.agent)]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
reports = []
|
|
for i, r in enumerate(results):
|
|
if isinstance(r, Exception):
|
|
d = directives[i]
|
|
reports.append(AgentReport(directive_id=d.directive_id, agent=d.agent,
|
|
status='failed', summary=str(r)))
|
|
else:
|
|
reports.append(r)
|
|
return reports
|
|
|
|
async def _direct_answer(self, context: MasterContext, message) -> str:
|
|
"""Answer general / off-topic messages directly without dispatching agents."""
|
|
history = [
|
|
{'role': m['role'], 'content': m['content']}
|
|
for m in context.conversation[-10:]
|
|
if m.get('role') in ('user', 'assistant')
|
|
]
|
|
system = (
|
|
"You are ActiveBlue AI, the assistant for the Active Blue Odoo "
|
|
"instance. Answer the user directly and concisely. Use any prior "
|
|
"conversation for context. Plain text only — no JSON, no markdown "
|
|
"code fences."
|
|
)
|
|
msgs = [{'role': 'system', 'content': system}, *history,
|
|
{'role': 'user', 'content': message}]
|
|
resp = await self._llm.submit(msgs, caller='master_direct')
|
|
return (resp.content or '').strip() or 'Sorry, I have no answer for that.'
|
|
|
|
async def _synthesize(self, reports, context: MasterContext) -> str:
|
|
if not reports:
|
|
return 'No agent responses received.'
|
|
if len(reports) == 1 and reports[0].status == 'complete':
|
|
return reports[0].summary
|
|
summaries = chr(10).join(f'{r.agent}: {r.summary}' for r in reports)
|
|
msg = ('Synthesize these agent reports into one coherent response. '
|
|
'Business language only. No internal IDs. '
|
|
'Separate: actions completed, items pending approval, recommendations.'
|
|
+ chr(10) + summaries)
|
|
resp = await self._llm.submit(
|
|
[{'role': 'system', 'content': 'You are a business intelligence assistant.'},
|
|
{'role': 'user', 'content': msg}],
|
|
caller='master_synthesis')
|
|
return resp.content
|
|
|
|
async def _update_memory(self, user_id, message, response, reports, directive_id):
|
|
# User message is persisted at the top of handle_message — only save
|
|
# the assistant reply here.
|
|
await self._memory.append_message(user_id, 'assistant', response, directive_id)
|
|
for report in reports:
|
|
if report.data:
|
|
await self._memory.store_findings(
|
|
scope=report.agent.replace('_agent', ''),
|
|
summary=report.summary, raw_data=report.data,
|
|
source_directive_id=directive_id)
|
|
|
|
async def handle_approval(self, directive_id, item_id, approved, approver_uid) -> str:
|
|
if approved:
|
|
return f'Approval recorded. Re-executing approved action for {directive_id}.'
|
|
return 'Action rejected and recorded.'
|
|
|
|
async def is_continuation(self, user_id, message) -> bool:
|
|
recent = await self._memory.get_conversation(user_id, limit=3)
|
|
return bool(recent)
|
|
|
|
async def _log_directive_start(self, directive_id, user_id, channel_id, message):
|
|
try:
|
|
pool = self._odoo.pg_pool
|
|
if not pool: return
|
|
sql = ('INSERT INTO ab_directive_log '
|
|
'(directive_id, user_id, channel_id, raw_message, status) '
|
|
'VALUES ($1, $2, $3, $4, $5) ON CONFLICT (directive_id) DO NOTHING')
|
|
async with pool.acquire(timeout=10) as conn:
|
|
await conn.execute(sql, directive_id, user_id, channel_id, message, 'processing')
|
|
except Exception as exc:
|
|
logger.warning('_log_directive_start failed: %s', exc)
|
|
|
|
async def _log_directive_complete(self, directive_id, status, response,
|
|
actions=None, escalations=None, error=None):
|
|
try:
|
|
pool = self._odoo.pg_pool
|
|
if not pool: return
|
|
sql = ('UPDATE ab_directive_log SET status=$2, final_response=$3, '
|
|
'actions_taken=$4, escalations=$5, completed_at=NOW(), error=$6 '
|
|
'WHERE directive_id=$1')
|
|
async with pool.acquire(timeout=10) as conn:
|
|
await conn.execute(sql, directive_id, status, response,
|
|
json.dumps(actions or []), json.dumps(escalations or []), error)
|
|
except Exception as exc:
|
|
logger.warning('_log_directive_complete failed: %s', exc)
|