Files
odoo-ai/agent_service/agents/expenses_agent.py
Carlos Garcia 9e3fe974dc Fix dup approval flow: preserve raw message, force expenses routing, fix HTML rendering
- master_agent: thread raw user message into extra_context and peer_data so
  expenses_agent can check it directly without relying on LLM intent_summary
- master_agent: when receipts are in extra_context always route to expenses_agent,
  so replies like 'skip duplicates' still trigger expense processing
- expenses_agent: _plan() checks peer_data raw_message alongside task so
  skip/keep keywords are detected even when master rewrites the intent
- ab_ai_mail: wrap clarification message HTML in Markup() so Odoo does not
  re-escape the tags; use <br> instead of <br/>
- ab_ai_mail: convert agent plain-text replies newlines to <br> for proper
  line-break rendering in Discuss

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 11:55:46 -04:00

467 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import difflib
import json
import logging
from datetime import date as _date
from .base_agent import BaseAgent, AgentReport, AgentDirective, SweepReport
from ..tools.expenses_tools import ExpensesTools
logger = logging.getLogger(__name__)
EXPENSES_TOOLS = [
{'name': 'get_expenses', 'description': 'Retrieve expense records',
'parameters': {'employee_id': {'type': 'integer', 'optional': True},
'state': {'type': 'string', 'optional': True},
'date_from': {'type': 'string', 'optional': True},
'date_to': {'type': 'string', 'optional': True},
'limit': {'type': 'integer', 'optional': True}}},
{'name': 'get_expense_sheets', 'description': 'Get expense report sheets',
'parameters': {'state': {'type': 'string', 'optional': True},
'employee_id': {'type': 'integer', 'optional': True},
'limit': {'type': 'integer', 'optional': True}}},
{'name': 'get_pending_approvals', 'description': 'Get expense sheets pending approval',
'parameters': {}},
{'name': 'approve_expense_sheet', 'description': 'Approve an expense sheet',
'parameters': {'sheet_id': {'type': 'integer'}}},
{'name': 'get_expenses_summary', 'description': 'Get expense summary for a period',
'parameters': {'date_from': {'type': 'string', 'optional': True},
'date_to': {'type': 'string', 'optional': True}}},
{'name': 'get_expense_by_employee', 'description': 'Get expenses for a specific employee',
'parameters': {'employee_id': {'type': 'integer'},
'limit': {'type': 'integer', 'optional': True}}},
{'name': 'flag_for_review', 'description': 'Flag an expense for review',
'parameters': {'model': {'type': 'string'}, 'record_id': {'type': 'integer'},
'reason': {'type': 'string'},
'severity': {'type': 'string', 'optional': True}}},
{'name': 'post_chatter_note', 'description': 'Post a note on a record',
'parameters': {'model': {'type': 'string'}, 'record_id': {'type': 'integer'},
'note': {'type': 'string'}}},
]
class ExpensesAgent(BaseAgent):
name = 'expenses_agent'
domain = 'expenses'
required_odoo_module = 'hr_expense'
system_prompt_file = 'expenses_system.txt'
tools = EXPENSES_TOOLS
def __init__(self, odoo, llm, peer_bus=None):
super().__init__(odoo, llm, peer_bus)
self._et = ExpensesTools(odoo)
self._gathered_data: dict = {}
self._actions_taken: list = []
self._escalations_list: list = []
async def _plan(self) -> dict:
task = (self._directive.task if self._directive else '').lower()
receipts = getattr(self._directive.context, 'receipts', []) if self._directive else []
# The master LLM rewrites the user message into intent_summary (task).
# Also check the original raw_message threaded through peer_data so
# short replies like "skip duplicates" are detected even when rewritten.
raw_msg = ''
if self._directive and self._directive.context:
raw_msg = (self._directive.context.peer_data.get('raw_message') or '').lower()
combined = task + ' ' + raw_msg
# Detect whether the user is responding to a duplicate-approval request
skip_keywords = ('skip', 'yes', 'remove duplicate', 'exclude duplicate', 'drop duplicate')
keep_keywords = ('keep all', 'keep both', 'include all', 'no skip', "don't skip")
if any(k in combined for k in skip_keywords):
user_dup_decision = 'skip'
elif any(k in combined for k in keep_keywords):
user_dup_decision = 'keep_all'
else:
user_dup_decision = 'none' # first time through — will ask if dups found
return {
'mode': 'create_from_receipts' if receipts else 'read',
'user_dup_decision': user_dup_decision,
'fetch_summary': any(k in task for k in ('summary', 'overview')) and not receipts,
'fetch_pending': any(k in task for k in ('pending', 'approve', 'approval')) and not receipts,
'employee_id': self._directive.params.get('employee_id') if self._directive else None,
'date_from': self._directive.params.get('date_from') if self._directive else None,
'date_to': self._directive.params.get('date_to') if self._directive else None,
}
async def _gather(self, plan: dict) -> dict:
data: dict = {'mode': plan.get('mode', 'read'),
'user_dup_decision': plan.get('user_dup_decision', 'none')}
if plan.get('mode') == 'create_from_receipts':
self._gathered_data = data
return data
data['summary'] = await self._et.get_expenses_summary(
date_from=plan.get('date_from'), date_to=plan.get('date_to'),
)
if plan.get('fetch_pending'):
data['pending'] = await self._et.get_pending_approvals()
self._gathered_data = data
return data
async def _reason(self) -> dict:
data = self._gathered_data
analysis: dict = {'escalations': [], 'flags': []}
if data.get('mode') == 'create_from_receipts':
self._escalations_list = []
return analysis
summary = data.get('summary', {})
if summary.get('pending_approval_count', 0) > 10:
analysis['escalations'].append(
f'{summary["pending_approval_count"]} expense sheets pending approval.'
)
self._escalations_list = analysis['escalations']
return analysis
async def _act(self, reasoning: dict) -> list:
if self._gathered_data.get('mode') != 'create_from_receipts':
return []
receipts = getattr(self._directive.context, 'receipts', []) if self._directive else []
if not receipts:
return []
user_dup_decision = self._gathered_data.get('user_dup_decision', 'none')
user_id = (self._directive.context.peer_data.get('requesting_user_id')
if self._directive else None)
employee_id = await self._et.get_employee_id_for_user(user_id)
if not employee_id:
self._escalations_list.append(
'No employee record found for the current user; cannot create expense report.')
return []
# Fetch all expensable products once for category selection
expense_products = await self._et.get_expense_products()
default_product_id = expense_products[0]['id'] if expense_products else None
product_map = {p['id']: p['name'] for p in expense_products}
# Pass 1: byte-exact dedup (same file uploaded twice)
seen_hashes: set = set()
unique_receipts = []
for r in receipts:
h = r.get('sha256')
if h and h in seen_hashes:
logger.info('expenses_agent: skipping byte-identical receipt %s', r.get('filename'))
continue
if h:
seen_hashes.add(h)
unique_receipts.append(r)
# Parse all receipts concurrently (bounded by Ollama semaphore)
parse_tasks = [
self._parse_receipt_text(
r.get('text', ''), r.get('filename', 'receipt'),
expense_products=expense_products,
date_hint=r.get('date_from_name'),
)
for r in unique_receipts
]
raw_parsed = await asyncio.gather(*parse_tasks, return_exceptions=True)
# Normalise exceptions to fallback dicts
paired: list[tuple[dict, dict]] = []
for receipt, parsed in zip(unique_receipts, raw_parsed):
if isinstance(parsed, Exception):
logger.warning('expenses_agent: parse failed for %s: %s',
receipt.get('filename'), parsed)
parsed = {'vendor': receipt.get('filename', 'Expense'), 'amount': 0.0,
'date': receipt.get('date_from_name') or _date.today().isoformat(),
'time': None, 'product_name': ''}
paired.append((receipt, parsed))
# Pass 2: semantic dedup — detect multiple photos of the same receipt
deduped: list[tuple[dict, dict]] = []
dup_pairs: list[tuple[int, dict, dict]] = [] # (kept_idx, dup_receipt, dup_parsed)
for receipt, parsed in paired:
dup_idx = self._find_semantic_duplicate(parsed, deduped)
if dup_idx is not None:
dup_pairs.append((dup_idx, receipt, parsed))
# Tentatively keep whichever photo had more OCR text
if len(receipt.get('text', '')) > len(deduped[dup_idx][0].get('text', '')):
deduped[dup_idx] = (receipt, parsed)
else:
deduped.append((receipt, parsed))
# If duplicates were found and user hasn't decided yet, pause and ask
if dup_pairs and user_dup_decision == 'none':
self._gathered_data['mode'] = 'awaiting_dup_approval'
self._pending_dup_pairs = dup_pairs
self._deduped = deduped
return []
# Apply user's decision
if user_dup_decision == 'keep_all':
final_list = paired
else:
final_list = deduped # default: skip semantic duplicates
# Create the sheet now that we know what to include
sheet_name = f'Expense Report - {_date.today().isoformat()}'
sheet_result = await self._et.create_expense_sheet(sheet_name, employee_id)
if not sheet_result.success:
self._escalations_list.append(f'Failed to create expense sheet: {sheet_result.error}')
return []
sheet_id = sheet_result.record_id
actions = [f'Created expense sheet "{sheet_name}" (ID {sheet_id})']
for receipt, parsed in final_list:
# Pick product by name match returned from LLM, fall back to default
product_id = default_product_id
chosen_name = parsed.get('product_name', '')
if chosen_name:
for p in expense_products:
if p['name'].lower() == chosen_name.lower():
product_id = p['id']
break
expense_result = await self._et.create_expense(
sheet_id=sheet_id,
employee_id=employee_id,
name=str(parsed.get('vendor', receipt.get('filename', 'Expense')))[:64],
total_amount=float(parsed.get('amount', 0.0)),
date=str(parsed.get('date') or _date.today().isoformat()),
product_id=product_id,
)
if expense_result.success:
cat = product_map.get(product_id, 'Expense')
actions.append(
f"Added: {parsed.get('vendor', 'Unknown vendor')} "
f"${float(parsed.get('amount', 0)):.2f} "
f"({cat}) on {parsed.get('date', 'today')}"
)
if receipt.get('b64'):
await self._et.attach_receipt(
'hr.expense', expense_result.record_id,
receipt.get('filename', 'receipt'),
receipt['b64'],
receipt.get('mimetype', 'application/octet-stream'),
)
else:
actions.append(
f"Could not create expense for {receipt.get('filename', 'receipt')}: "
f"{expense_result.error}"
)
self._actions_taken = actions
return actions
@staticmethod
def _find_semantic_duplicate(parsed: dict, candidates: list) -> int | None:
"""
Return the index in `candidates` of a receipt that appears to be the
same physical receipt as `parsed`, or None if no match found.
Match criteria (all must pass):
1. Same date
2. Amount > 0 and within $0.05 of each other
3. Transaction times within 30 min of each other (if both present);
times > 30 min apart rule out a duplicate
4. Vendor name similarity >= 60 % (or both vendors are raw filenames)
"""
amt = float(parsed.get('amount', 0))
date = parsed.get('date', '')
time = parsed.get('time') # HH:MM or None
vendor = str(parsed.get('vendor', '')).lower().strip()
is_filename = vendor.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'))
for idx, (_, other) in enumerate(candidates):
other_amt = float(other.get('amount', 0))
# Skip zero-amount receipts — too ambiguous to dedup
if amt == 0 or other_amt == 0:
continue
if abs(amt - other_amt) > 0.05:
continue
if date != other.get('date', ''):
continue
# Time check: if both receipts have a transaction time and they are
# more than 30 minutes apart they are different transactions.
other_time = other.get('time')
if time and other_time:
try:
h1, m1 = (int(p) for p in time.split(':')[:2])
h2, m2 = (int(p) for p in other_time.split(':')[:2])
if abs((h1 * 60 + m1) - (h2 * 60 + m2)) > 30:
continue
except Exception:
pass # unparseable time — ignore the signal
other_vendor = str(other.get('vendor', '')).lower().strip()
other_is_filename = other_vendor.endswith(
('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'))
if is_filename or other_is_filename:
# Same amount + date, no vendor text to compare — treat as dup
return idx
ratio = difflib.SequenceMatcher(None, vendor, other_vendor).ratio()
if ratio >= 0.6:
return idx
return None
async def _parse_receipt_text(self, text: str, filename: str,
expense_products: list = None,
date_hint: str = None) -> dict:
today = _date.today().isoformat()
fallback = {'vendor': filename, 'amount': 0.0,
'date': date_hint or today, 'time': None, 'product_name': ''}
ocr_failed = not text or text.startswith('[')
product_list = ''
if expense_products:
names = [p['name'] for p in expense_products]
product_list = ', '.join(f'"{n}"' for n in names)
if ocr_failed:
# No OCR text — still try to classify category from filename/date
if not product_list:
return fallback
prompt = (
f'A receipt photo named "{filename}" could not be read by OCR. '
f'Based only on the filename, pick the most likely expense category '
f'from this list: [{product_list}]. '
f'Return ONLY valid JSON: {{"product_name": "..."}}'
)
else:
prompt = (
'Extract expense details from the following receipt text. '
'Return ONLY valid JSON with these keys:\n'
'"vendor" (string, merchant or restaurant name),\n'
'"amount" (number — the FINAL total the customer paid; '
'this is labeled "Total", "Amount Due", "Grand Total", or the last dollar figure; '
'do NOT use subtotal, tax, or tip separately; '
'if multiple totals appear pick the largest one labeled as the final total),\n'
f'"date" (string YYYY-MM-DD, use {date_hint or today} if not found in text),\n'
'"time" (string HH:MM in 24-hour format — the transaction time printed on the receipt; '
'null if not present),\n'
f'"product_name" (string, pick the best match from [{product_list}] or empty string).\n\n'
f'Receipt text:\n{text[:2000]}\n\nJSON only:'
)
try:
resp = await self._llm.submit(
[{'role': 'user', 'content': prompt}],
caller='expenses_agent_receipt_parser',
)
raw = (resp.content or '').strip()
first, last = raw.find('{'), raw.rfind('}')
if first != -1 and last > first:
data = json.loads(raw[first:last + 1])
return {
'vendor': str(data.get('vendor', filename)),
'amount': float(data.get('amount', 0.0)),
'date': str(data.get('date') or date_hint or today),
'time': data.get('time') or None,
'product_name': str(data.get('product_name', '')),
}
except Exception as exc:
logger.warning('Receipt parse failed for %s: %s', filename, exc)
return fallback
async def _report(self) -> AgentReport:
data = self._gathered_data
directive_id = self._directive.directive_id if self._directive else ''
if data.get('mode') == 'awaiting_dup_approval':
dup_pairs = getattr(self, '_pending_dup_pairs', [])
deduped = getattr(self, '_deduped', [])
lines = [f'I found {len(dup_pairs)} suspected duplicate receipt photo(s). '
f'Please review before I create the expense report:\n']
for kept_idx, dup_receipt, dup_parsed in dup_pairs:
kept_receipt, kept_parsed = deduped[kept_idx]
vendor = (dup_parsed.get('vendor') or kept_parsed.get('vendor', 'Unknown'))
amount = float(dup_parsed.get('amount', 0))
dt = dup_parsed.get('date', '')
time_a = kept_parsed.get('time') or ''
time_b = dup_parsed.get('time') or ''
line = f'{vendor} ${amount:.2f} on {dt}'
if time_a or time_b:
line += f' (Photo A at {time_a or "?"}, Photo B at {time_b or "?"})'
line += (f'\n Photo A: {kept_receipt.get("filename", "?")}'
f'\n Photo B: {dup_receipt.get("filename", "?")}')
lines.append(line)
lines.append(
'\nReply "skip duplicates" to keep the clearest photo of each, '
'or "keep all" to include every photo as a separate expense.'
)
return AgentReport(
directive_id=directive_id, agent=self.name, status='complete',
summary='\n'.join(lines), data=data,
escalations=[], actions_taken=[])
if data.get('mode') == 'create_from_receipts':
if self._actions_taken:
lines = '\n'.join(f'{a}' for a in self._actions_taken)
summary = (
f'Expense report created successfully:\n{lines}\n\n'
'The report is in draft. Please open Odoo Expenses, '
'review the entries, and click Submit to send for approval.'
)
status = 'complete'
else:
summary = ('Could not create expense report. ' +
'; '.join(self._escalations_list or ['Unknown error']))
status = 'failed'
return AgentReport(
directive_id=directive_id, agent=self.name, status=status,
summary=summary, data=data,
escalations=self._escalations_list, actions_taken=self._actions_taken)
summary_data = data.get('summary', {})
parts = []
if summary_data:
parts.append(
f'Expenses: {summary_data.get("total_expenses", 0)} records, '
f'total ${summary_data.get("total_amount", 0):.2f}. '
f'{summary_data.get("pending_approval_count", 0)} pending approval.'
)
if not parts:
parts.append('Expenses review complete.')
return AgentReport(
directive_id=directive_id, agent=self.name, status='complete',
summary='\n'.join(parts), data=data,
escalations=self._escalations_list, actions_taken=[])
async def _dispatch_tool(self, name: str, args: dict):
dispatch = {
'get_expenses': self._et.get_expenses,
'get_expense_sheets': self._et.get_expense_sheets,
'get_pending_approvals': self._et.get_pending_approvals,
'approve_expense_sheet': self._et.approve_expense_sheet,
'get_expenses_summary': self._et.get_expenses_summary,
'get_expense_by_employee': self._et.get_expense_by_employee,
'flag_for_review': self._et.flag_for_review,
'post_chatter_note': self._et.post_chatter_note,
}
if name not in dispatch:
raise ValueError(f'Unknown tool: {name}')
return await dispatch[name](**args)
async def handle_peer_request(self, request: dict) -> dict:
req_type = request.get('type', '')
try:
if req_type == 'expenses_summary':
return await self._et.get_expenses_summary()
if req_type == 'employee_expenses':
return {'expenses': await self._et.get_expense_by_employee(
employee_id=request['employee_id'])}
return {'error': f'Unknown type: {req_type}'}
except Exception as exc:
return {'error': str(exc)}
async def sweep(self) -> SweepReport:
findings = []
try:
pending = await self._et.get_pending_approvals()
for sheet in pending:
emp = sheet.get('employee_id', [0, ''])
findings.append({
'type': 'pending_expense_approval',
'sheet_id': sheet.get('id'),
'employee': emp[1] if isinstance(emp, list) else '',
'amount': sheet.get('total_amount', 0),
'severity': 'low',
})
except Exception as exc:
return SweepReport(agent=self.name, findings=[], error=str(exc))
return SweepReport(agent=self.name, findings=findings, actions_taken=[],
summary=f'Expenses sweep: {len(findings)} pending approvals.')