Before writing any expense records the bot now posts a numbered table of parsed vendor/amount/date for every receipt, with duplicate entries flagged inline. User replies 'confirm' (skips dups) or 'confirm, keep all'. This catches OCR amount misreads before they land in Odoo. Also removes the separate awaiting_dup_approval step; duplicate review is now part of the single confirmation table. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
495 lines
23 KiB
Python
495 lines
23 KiB
Python
from __future__ import annotations
|
||
import asyncio
|
||
import difflib
|
||
import json
|
||
import logging
|
||
from datetime import date as _date
|
||
from .base_agent import BaseAgent, AgentReport, AgentDirective, SweepReport
|
||
from ..tools.expenses_tools import ExpensesTools
|
||
|
||
logger = logging.getLogger(__name__)


# Tool schemas advertised by the expenses agent. Each entry gives the tool
# name, a short description and its parameters; a parameter carrying
# ``'optional': True`` may be omitted by the caller.
EXPENSES_TOOLS = [
    {
        'name': 'get_expenses',
        'description': 'Retrieve expense records',
        'parameters': {
            'employee_id': {'type': 'integer', 'optional': True},
            'state': {'type': 'string', 'optional': True},
            'date_from': {'type': 'string', 'optional': True},
            'date_to': {'type': 'string', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_expense_sheets',
        'description': 'Get expense report sheets',
        'parameters': {
            'state': {'type': 'string', 'optional': True},
            'employee_id': {'type': 'integer', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_pending_approvals',
        'description': 'Get expense sheets pending approval',
        'parameters': {},
    },
    {
        'name': 'approve_expense_sheet',
        'description': 'Approve an expense sheet',
        'parameters': {
            'sheet_id': {'type': 'integer'},
        },
    },
    {
        'name': 'get_expenses_summary',
        'description': 'Get expense summary for a period',
        'parameters': {
            'date_from': {'type': 'string', 'optional': True},
            'date_to': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'get_expense_by_employee',
        'description': 'Get expenses for a specific employee',
        'parameters': {
            'employee_id': {'type': 'integer'},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'flag_for_review',
        'description': 'Flag an expense for review',
        'parameters': {
            'model': {'type': 'string'},
            'record_id': {'type': 'integer'},
            'reason': {'type': 'string'},
            'severity': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'post_chatter_note',
        'description': 'Post a note on a record',
        'parameters': {
            'model': {'type': 'string'},
            'record_id': {'type': 'integer'},
            'note': {'type': 'string'},
        },
    },
]
|
||
|
||
|
||
class ExpensesAgent(BaseAgent):
    """Agent for the ``expenses`` domain (Odoo module ``hr_expense``).

    The agent works in one of two modes, selected in :meth:`_plan`:

    * ``read`` - fetch the expense summary (plus pending approvals when the
      task asks for them) and report on it.
    * ``create_from_receipts`` - parse the receipts attached to the
      directive, show the user a numbered confirmation table with suspected
      duplicates flagged, and only after the user has confirmed create an
      expense sheet with one expense line per receipt.

    NOTE(review): the ``_plan`` / ``_gather`` / ``_reason`` / ``_act`` /
    ``_report`` hooks are presumably driven in that order by ``BaseAgent``;
    confirm against the base class.
    """

    name = 'expenses_agent'
    domain = 'expenses'
    required_odoo_module = 'hr_expense'
    system_prompt_file = 'expenses_system.txt'
    tools = EXPENSES_TOOLS

    def __init__(self, odoo, llm, peer_bus=None):
        """Set up the Odoo expense tool wrapper and the per-run scratch state."""
        super().__init__(odoo, llm, peer_bus)
        self._et = ExpensesTools(odoo)
        self._gathered_data: dict = {}
        self._actions_taken: list = []
        self._escalations_list: list = []
        # (receipt, parsed, is_duplicate) rows for the confirmation table.
        self._confirmation_items: list = []
        # (receipt, parsed) pairs that survive semantic de-duplication.
        self._deduped: list = []

    # ------------------------------------------------------------------
    # Planning / gathering / reasoning
    # ------------------------------------------------------------------

    async def _plan(self) -> dict:
        """Derive the run mode and the user's confirmation/duplicate choices.

        Keywords are searched as substrings in the lower-cased directive task
        combined with the original ``raw_message`` from ``peer_data``, because
        the task text may be a rewritten summary of what the user typed.

        Returns:
            A plan dict with ``mode``, ``user_dup_decision`` (``'keep_all'`` or
            ``'skip'``), ``user_confirmed``, the ``fetch_*`` flags and the
            optional ``employee_id`` / ``date_from`` / ``date_to`` filters.
        """
        directive = self._directive
        task = (directive.task if directive else '').lower()
        receipts = getattr(directive.context, 'receipts', []) if directive else []

        # The master LLM rewrites the user message into intent_summary (task).
        # Also check the original raw_message threaded through peer_data so
        # short replies like "skip duplicates" are detected even when rewritten.
        raw_msg = ''
        if directive and directive.context:
            raw_msg = (directive.context.peer_data.get('raw_message') or '').lower()
        combined = task + ' ' + raw_msg

        keep_keywords = ('keep all', 'keep both', 'include all', 'no skip', "don't skip")
        confirm_keywords = ('confirm', 'looks good', 'go ahead', 'proceed', 'create it', 'create them')
        negations = ("don't ", 'don\u2019t ', 'dont ', 'not ')

        # Duplicates are skipped unless the user explicitly asks to keep them;
        # an explicit "skip ..." reply and no reply at all mean the same thing.
        if any(k in combined for k in keep_keywords):
            user_dup_decision = 'keep_all'
        else:
            user_dup_decision = 'skip'

        # A negated confirmation ("don't confirm", "do not proceed") contains a
        # confirm keyword as a substring; it must not trigger record creation.
        declined = any(neg + k in combined
                       for k in confirm_keywords for neg in negations)
        user_confirmed = (not declined
                          and any(k in combined for k in confirm_keywords))

        return {
            'mode': 'create_from_receipts' if receipts else 'read',
            'user_dup_decision': user_dup_decision,
            'user_confirmed': user_confirmed,
            'fetch_summary': any(k in task for k in ('summary', 'overview')) and not receipts,
            'fetch_pending': any(k in task for k in ('pending', 'approve', 'approval')) and not receipts,
            'employee_id': directive.params.get('employee_id') if directive else None,
            'date_from': directive.params.get('date_from') if directive else None,
            'date_to': directive.params.get('date_to') if directive else None,
        }

    async def _gather(self, plan: dict) -> dict:
        """Collect the data the later phases need and cache it on the agent.

        In ``create_from_receipts`` mode nothing is fetched here. In read mode
        the expense summary is always fetched, and pending approvals are added
        when ``plan['fetch_pending']`` is set.
        """
        data: dict = {
            'mode': plan.get('mode', 'read'),
            'user_dup_decision': plan.get('user_dup_decision', 'skip'),
            'user_confirmed': plan.get('user_confirmed', False),
        }
        if plan.get('mode') != 'create_from_receipts':
            data['summary'] = await self._et.get_expenses_summary(
                date_from=plan.get('date_from'), date_to=plan.get('date_to'),
            )
            if plan.get('fetch_pending'):
                data['pending'] = await self._et.get_pending_approvals()
        self._gathered_data = data
        return data

    async def _reason(self) -> dict:
        """Build the escalation list for this run.

        Read mode escalates when more than 10 sheets are pending approval.
        Receipt mode starts with an empty escalation list; ``_act`` appends to
        it when something goes wrong.
        """
        data = self._gathered_data
        analysis: dict = {'escalations': [], 'flags': []}
        if data.get('mode') == 'create_from_receipts':
            self._escalations_list = []
            return analysis
        summary = data.get('summary', {})
        if summary.get('pending_approval_count', 0) > 10:
            analysis['escalations'].append(
                f'{summary["pending_approval_count"]} expense sheets pending approval.'
            )
        self._escalations_list = analysis['escalations']
        return analysis

    # ------------------------------------------------------------------
    # Acting: receipt parsing, de-duplication and record creation
    # ------------------------------------------------------------------

    async def _act(self, reasoning: dict) -> list:
        """Parse receipts, then either request confirmation or create records.

        Steps in receipt mode:
          1. resolve the requesting user's employee record,
          2. drop byte-identical receipts (same ``sha256``),
          3. parse every remaining receipt concurrently,
          4. detect semantic duplicates,
          5. if the user has not confirmed yet, stash the confirmation table
             rows, switch the mode to ``awaiting_confirmation`` and stop,
          6. otherwise create one expense sheet and one expense per receipt,
             attaching the receipt file when its base64 content is available.

        Returns:
            Human-readable action strings (empty when nothing was created).
        """
        # Reset per-run outputs so a failed run on a reused agent instance can
        # never report a previous run's actions as its own.
        self._actions_taken = []
        self._confirmation_items = []
        self._deduped = []

        if self._gathered_data.get('mode') != 'create_from_receipts':
            return []
        receipts = getattr(self._directive.context, 'receipts', []) if self._directive else []
        if not receipts:
            return []

        user_dup_decision = self._gathered_data.get('user_dup_decision', 'skip')
        user_confirmed = self._gathered_data.get('user_confirmed', False)

        user_id = (self._directive.context.peer_data.get('requesting_user_id')
                   if self._directive else None)
        employee_id = await self._et.get_employee_id_for_user(user_id)
        if not employee_id:
            self._escalations_list.append(
                'No employee record found for the current user; cannot create expense report.')
            return []

        expense_products = await self._et.get_expense_products()
        default_product_id = expense_products[0]['id'] if expense_products else None
        product_map = {p['id']: p['name'] for p in expense_products}

        unique_receipts = self._drop_byte_identical(receipts)
        paired = await self._parse_receipts(unique_receipts, expense_products)
        deduped, dup_indices = self._split_semantic_duplicates(paired)

        # Always show confirmation summary before creating - lets user verify
        # parsed amounts and review flagged duplicates in one step.
        if not user_confirmed:
            self._gathered_data['mode'] = 'awaiting_confirmation'
            self._confirmation_items = [
                (receipt, parsed, i in dup_indices)
                for i, (receipt, parsed) in enumerate(paired)
            ]
            self._deduped = deduped
            return []

        # User confirmed - apply dup decision
        final_list = paired if user_dup_decision == 'keep_all' else deduped

        sheet_name = f'Expense Report - {_date.today().isoformat()}'
        sheet_result = await self._et.create_expense_sheet(sheet_name, employee_id)
        if not sheet_result.success:
            self._escalations_list.append(f'Failed to create expense sheet: {sheet_result.error}')
            return []

        sheet_id = sheet_result.record_id
        actions = [f'Created expense sheet "{sheet_name}" (ID {sheet_id})']

        for receipt, parsed in final_list:
            product_id = self._resolve_product_id(
                parsed.get('product_name', ''), expense_products, default_product_id)

            expense_result = await self._et.create_expense(
                sheet_id=sheet_id,
                employee_id=employee_id,
                name=str(parsed.get('vendor', receipt.get('filename', 'Expense')))[:64],
                total_amount=float(parsed.get('amount', 0.0)),
                date=str(parsed.get('date') or _date.today().isoformat()),
                product_id=product_id,
            )
            if not expense_result.success:
                actions.append(
                    f"Could not create expense for {receipt.get('filename', 'receipt')}: "
                    f"{expense_result.error}"
                )
                continue

            cat = product_map.get(product_id, 'Expense')
            actions.append(
                f"Added: {parsed.get('vendor', 'Unknown vendor')} "
                f"${float(parsed.get('amount', 0)):.2f} "
                f"({cat}) on {parsed.get('date', 'today')}"
            )
            if receipt.get('b64'):
                await self._et.attach_receipt(
                    'hr.expense', expense_result.record_id,
                    receipt.get('filename', 'receipt'),
                    receipt['b64'],
                    receipt.get('mimetype', 'application/octet-stream'),
                )

        self._actions_taken = actions
        return actions

    @staticmethod
    def _drop_byte_identical(receipts: list) -> list:
        """Return ``receipts`` without repeats of an already-seen ``sha256``.

        Receipts that carry no hash are always kept.
        """
        seen_hashes: set = set()
        unique_receipts = []
        for r in receipts:
            h = r.get('sha256')
            if h and h in seen_hashes:
                logger.info('expenses_agent: skipping byte-identical receipt %s', r.get('filename'))
                continue
            if h:
                seen_hashes.add(h)
            unique_receipts.append(r)
        return unique_receipts

    async def _parse_receipts(self, receipts: list, expense_products: list) -> list:
        """Parse all receipts concurrently and pair each with its parsed dict.

        A receipt whose parse task raised is paired with a zero-amount
        placeholder so it still shows up in the confirmation table.
        """
        raw_parsed = await asyncio.gather(
            *(
                self._parse_receipt_text(
                    r.get('text', ''), r.get('filename', 'receipt'),
                    expense_products=expense_products,
                    date_hint=r.get('date_from_name'),
                )
                for r in receipts
            ),
            return_exceptions=True,
        )

        paired: list[tuple[dict, dict]] = []
        for receipt, parsed in zip(receipts, raw_parsed):
            # BaseException also covers a CancelledError handed back by gather.
            if isinstance(parsed, BaseException):
                logger.warning('expenses_agent: parse failed for %s: %s',
                               receipt.get('filename'), parsed)
                parsed = {'vendor': receipt.get('filename', 'Expense'), 'amount': 0.0,
                          'date': receipt.get('date_from_name') or _date.today().isoformat(),
                          'time': None, 'product_name': ''}
            paired.append((receipt, parsed))
        return paired

    def _split_semantic_duplicates(self, paired: list) -> tuple[list, set[int]]:
        """Separate ``paired`` into kept entries and duplicate indices.

        When two entries describe the same receipt, the one with the longer
        OCR text is kept. The returned index set always names the entries that
        are *not* in the kept list, so the confirmation table flags exactly
        the items that will be excluded on a plain "confirm".

        Returns:
            ``(deduped, dup_indices)`` where ``dup_indices`` indexes ``paired``.
        """
        deduped: list[tuple[dict, dict]] = []
        kept_from: list[int] = []  # index into `paired` backing each deduped entry
        dup_indices: set[int] = set()
        for i, (receipt, parsed) in enumerate(paired):
            match = self._find_semantic_duplicate(parsed, deduped)
            if match is None:
                deduped.append((receipt, parsed))
                kept_from.append(i)
                continue
            new_len = len(receipt.get('text') or '')
            old_len = len(deduped[match][0].get('text') or '')
            if new_len > old_len:
                # The newcomer replaces the kept entry, so the *old* one is
                # the duplicate that gets dropped.
                dup_indices.add(kept_from[match])
                deduped[match] = (receipt, parsed)
                kept_from[match] = i
            else:
                dup_indices.add(i)
        return deduped, dup_indices

    @staticmethod
    def _resolve_product_id(product_name, expense_products: list, default_id):
        """Return the id of the product named ``product_name`` (case-insensitive).

        Falls back to ``default_id`` when the name is empty or unknown.
        """
        if product_name:
            wanted = product_name.lower()
            for p in expense_products:
                if p['name'].lower() == wanted:
                    return p['id']
        return default_id

    @staticmethod
    def _find_semantic_duplicate(parsed: dict, candidates: list) -> int | None:
        """
        Return the index in `candidates` of a receipt that appears to be the
        same physical receipt as `parsed`, or None if no match found.

        `candidates` is a list of ``(receipt, parsed)`` pairs. An entry with a
        zero amount never matches anything.

        Pass 1 - exact-amount match (all must pass):
          1. Same date
          2. Amount > 0 and within $0.05 of each other
          3. Transaction times within 30 min (if both present)
          4. Vendor similarity >= 60 % (or either vendor is a raw image filename)

        Pass 2 - OCR-error match (amount may differ due to misread); only run
        when no candidate matched in pass 1:
          1. Same date
          2. Both amounts > 0
          3. Vendor similarity >= 80 % (stricter threshold compensates for loose
             amount); neither vendor may be a raw image filename
          4. Times within 30 min (if both present)
        """
        image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')

        def _vendor_of(entry: dict) -> str:
            return str(entry.get('vendor', '')).lower().strip()

        def _times_compatible(t1, t2) -> bool:
            """Return False only when both times are present and >30 min apart."""
            if not (t1 and t2):
                return True
            try:
                h1, m1 = (int(p) for p in t1.split(':')[:2])
                h2, m2 = (int(p) for p in t2.split(':')[:2])
            except (AttributeError, TypeError, ValueError):
                # Unparseable times give no evidence either way.
                return True
            return abs((h1 * 60 + m1) - (h2 * 60 + m2)) <= 30

        amt = float(parsed.get('amount') or 0)
        if amt == 0:
            # Both passes require a positive amount on each side.
            return None
        date = parsed.get('date', '')
        time = parsed.get('time')  # HH:MM or None
        vendor = _vendor_of(parsed)
        is_filename = vendor.endswith(image_exts)

        def _same_occasion(other: dict) -> bool:
            """Checks shared by both passes: amount present, same date, close time."""
            return (float(other.get('amount') or 0) != 0
                    and date == other.get('date', '')
                    and _times_compatible(time, other.get('time')))

        # Pass 1: amount must match within $0.05
        for idx, (_, other) in enumerate(candidates):
            if not _same_occasion(other):
                continue
            if abs(amt - float(other.get('amount') or 0)) > 0.05:
                continue
            other_vendor = _vendor_of(other)
            if is_filename or other_vendor.endswith(image_exts):
                return idx
            if difflib.SequenceMatcher(None, vendor, other_vendor).ratio() >= 0.6:
                return idx

        # Pass 2: same vendor + same date even when amounts differ (OCR misread)
        if not is_filename:
            for idx, (_, other) in enumerate(candidates):
                if not _same_occasion(other):
                    continue
                other_vendor = _vendor_of(other)
                if other_vendor.endswith(image_exts):
                    continue
                if difflib.SequenceMatcher(None, vendor, other_vendor).ratio() >= 0.80:
                    return idx

        return None

    @staticmethod
    def _coerce_amount(value) -> float:
        """Convert an LLM-supplied amount to a finite float, or 0.0 if unusable.

        0.0 is the value the parsing prompt already uses for "no clear total".
        """
        if value is None:
            return 0.0
        try:
            amount = float(value)
        except (TypeError, ValueError):
            return 0.0
        # json.loads accepts NaN/Infinity; neither is a usable expense amount.
        if amount != amount or amount in (float('inf'), float('-inf')):
            return 0.0
        return amount

    async def _parse_receipt_text(self, text: str, filename: str,
                                  expense_products: list | None = None,
                                  date_hint: str | None = None) -> dict:
        """Ask the LLM to extract vendor/amount/date/time/category from a receipt.

        Args:
            text: OCR text of the receipt. Empty text, or text starting with
                ``[``, is treated as a failed OCR run.
            filename: Receipt file name; used as the vendor when none is found.
            expense_products: Product dicts with a ``name`` key; the names are
                offered to the LLM as the allowed categories.
            date_hint: Date to use when the receipt text carries none.

        Returns:
            A dict with keys ``vendor``, ``amount``, ``date``, ``time`` and
            ``product_name``. Any LLM or JSON failure yields a fallback with
            the filename as vendor and a zero amount; a malformed amount alone
            only zeroes the amount and keeps the other parsed fields.
        """
        today = _date.today().isoformat()
        fallback = {'vendor': filename, 'amount': 0.0,
                    'date': date_hint or today, 'time': None, 'product_name': ''}
        ocr_failed = not text or text.startswith('[')

        product_list = ''
        if expense_products:
            product_list = ', '.join(f'"{p["name"]}"' for p in expense_products)

        if ocr_failed:
            # No OCR text - still try to classify category from filename/date
            if not product_list:
                return fallback
            prompt = (
                f'A receipt photo named "{filename}" could not be read by OCR. '
                f'Based only on the filename, pick the most likely expense category '
                f'from this list: [{product_list}]. '
                f'Return ONLY valid JSON: {{"product_name": "..."}}'
            )
        else:
            prompt = (
                'Extract expense details from the following receipt text. '
                'Return ONLY valid JSON with these keys:\n'
                '"vendor" (string, merchant or restaurant name),\n'
                '"amount" (number — the FINAL total the customer paid; '
                'look for a line explicitly labeled "Total", "Grand Total", "Amount Due", or "Balance Due"; '
                'do NOT use subtotal, tax, tip, or individual line items; '
                'if the label is ambiguous choose the bottom-most total on the receipt; '
                'return 0 if no clear total is found),\n'
                f'"date" (string YYYY-MM-DD, use {date_hint or today} if not found in text),\n'
                '"time" (string HH:MM in 24-hour format — the transaction time printed on the receipt; '
                'null if not present),\n'
                f'"product_name" (string, pick the best match from [{product_list}] or empty string).\n\n'
                f'Receipt text:\n{text[:2000]}\n\nJSON only:'
            )

        # Best-effort boundary around the LLM call: any failure degrades to the
        # fallback so one bad receipt never aborts the whole batch.
        try:
            resp = await self._llm.submit(
                [{'role': 'user', 'content': prompt}],
                caller='expenses_agent_receipt_parser',
            )
            raw = (resp.content or '').strip()
            # Tolerate prose or code fences around the JSON object.
            first, last = raw.find('{'), raw.rfind('}')
            if first != -1 and last > first:
                data = json.loads(raw[first:last + 1])
                if isinstance(data, dict):
                    return {
                        # `or` (not a .get default) so JSON null falls back
                        # instead of becoming the string 'None'.
                        'vendor': str(data.get('vendor') or filename),
                        'amount': self._coerce_amount(data.get('amount')),
                        'date': str(data.get('date') or date_hint or today),
                        'time': data.get('time') or None,
                        'product_name': str(data.get('product_name') or ''),
                    }
        except Exception as exc:
            logger.warning('Receipt parse failed for %s: %s', filename, exc)
        return fallback

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    async def _report(self) -> AgentReport:
        """Build the user-facing report for the current mode.

        * ``awaiting_confirmation`` - numbered table of parsed receipts with
          duplicates flagged, plus instructions on how to confirm.
        * ``create_from_receipts`` - list of actions taken, or the failure
          reasons when nothing was created.
        * otherwise - one-line expense summary.
        """
        data = self._gathered_data
        directive_id = self._directive.directive_id if self._directive else ''
        mode = data.get('mode')

        if mode == 'awaiting_confirmation':
            items = self._confirmation_items
            n_dups = sum(1 for _, _, is_dup in items if is_dup)
            lines = [f'I parsed {len(items)} receipt(s). Please review before I create the expense report:\n']
            # Two spaces after the index column mirror the ". " used in rows.
            lines.append(f' {"#":>3}  {"Vendor":<30} {"Amount":>8} {"Date":<12}')
            lines.append(f' {"---":>3}  {"-"*30} {"-"*8} {"-"*12}')
            for i, (receipt, parsed, is_dup) in enumerate(items, 1):
                vendor = str(parsed.get('vendor') or receipt.get('filename', '?'))[:30]
                amt = float(parsed.get('amount') or 0)
                dt = str(parsed.get('date') or '')
                flag = ' !! duplicate' if is_dup else ''
                lines.append(f' {i:>3}. {vendor:<30} ${amt:>7.2f} {dt}{flag}')
            lines.append('')
            if n_dups:
                lines.append(
                    f'{n_dups} item(s) marked "!! duplicate" appear to be the same receipt '
                    f'as another entry (possibly an OCR amount mismatch).'
                )
                lines.append(
                    'Reply "confirm" to create the report and exclude duplicates (recommended).'
                )
                lines.append(
                    'Reply "confirm, keep all" to include every item even if duplicated.'
                )
            else:
                lines.append('Reply "confirm" to create the expense report.')
            return AgentReport(
                directive_id=directive_id, agent=self.name, status='complete',
                summary='\n'.join(lines), data=data,
                escalations=[], actions_taken=[])

        if mode == 'create_from_receipts':
            if self._actions_taken:
                lines = '\n'.join(f' • {a}' for a in self._actions_taken)
                summary = (
                    f'Expense report created successfully:\n{lines}\n\n'
                    'The report is in draft. Please open Odoo › Expenses, '
                    'review the entries, and click Submit to send for approval.'
                )
                status = 'complete'
            else:
                summary = ('Could not create expense report. ' +
                           '; '.join(self._escalations_list or ['Unknown error']))
                status = 'failed'
            return AgentReport(
                directive_id=directive_id, agent=self.name, status=status,
                summary=summary, data=data,
                escalations=self._escalations_list, actions_taken=self._actions_taken)

        summary_data = data.get('summary', {})
        parts = []
        if summary_data:
            # `or 0` keeps the :.2f format from failing on a None total.
            total_amount = summary_data.get("total_amount") or 0
            parts.append(
                f'Expenses: {summary_data.get("total_expenses", 0)} records, '
                f'total ${total_amount:.2f}. '
                f'{summary_data.get("pending_approval_count", 0)} pending approval.'
            )
        if not parts:
            parts.append('Expenses review complete.')
        return AgentReport(
            directive_id=directive_id, agent=self.name, status='complete',
            summary='\n'.join(parts), data=data,
            escalations=self._escalations_list, actions_taken=[])

    # ------------------------------------------------------------------
    # Tool dispatch, peer requests and sweeps
    # ------------------------------------------------------------------

    async def _dispatch_tool(self, name: str, args: dict):
        """Call the ``ExpensesTools`` method registered under ``name``.

        Raises:
            ValueError: if ``name`` is not one of the known tools.
        """
        dispatch = {
            'get_expenses': self._et.get_expenses,
            'get_expense_sheets': self._et.get_expense_sheets,
            'get_pending_approvals': self._et.get_pending_approvals,
            'approve_expense_sheet': self._et.approve_expense_sheet,
            'get_expenses_summary': self._et.get_expenses_summary,
            'get_expense_by_employee': self._et.get_expense_by_employee,
            'flag_for_review': self._et.flag_for_review,
            'post_chatter_note': self._et.post_chatter_note,
        }
        if name not in dispatch:
            raise ValueError(f'Unknown tool: {name}')
        return await dispatch[name](**args)

    async def handle_peer_request(self, request: dict) -> dict:
        """Answer a request from another agent.

        Supported ``type`` values are ``expenses_summary`` and
        ``employee_expenses`` (requires ``employee_id``). Unknown types and
        any raised exception are reported as ``{'error': ...}`` rather than
        propagated.
        """
        req_type = request.get('type', '')
        try:
            if req_type == 'expenses_summary':
                return await self._et.get_expenses_summary()
            if req_type == 'employee_expenses':
                return {'expenses': await self._et.get_expense_by_employee(
                    employee_id=request['employee_id'])}
            return {'error': f'Unknown type: {req_type}'}
        except Exception as exc:
            return {'error': str(exc)}

    async def sweep(self) -> SweepReport:
        """List every expense sheet pending approval as a low-severity finding.

        Any exception while fetching is returned in the report's ``error``
        field with no findings.
        """
        findings = []
        try:
            pending = await self._et.get_pending_approvals()
            for sheet in pending:
                emp = sheet.get('employee_id', [0, ''])
                # Expected shape is an (id, name) pair; anything else yields ''.
                has_name = isinstance(emp, (list, tuple)) and len(emp) > 1
                findings.append({
                    'type': 'pending_expense_approval',
                    'sheet_id': sheet.get('id'),
                    'employee': emp[1] if has_name else '',
                    'amount': sheet.get('total_amount', 0),
                    'severity': 'low',
                })
        except Exception as exc:
            return SweepReport(agent=self.name, findings=[], error=str(exc))
        return SweepReport(agent=self.name, findings=findings, actions_taken=[],
                           summary=f'Expenses sweep: {len(findings)} pending approvals.')
|