Files
odoo-ai/agent_service/agents/expenses_agent.py
Carlos Garcia 4b7223a139 feat: file upload + expense report creation from Discuss attachments
- Discuss bot now reads ir.attachment from incoming messages; file-only
  messages no longer silently dropped
- ZIP files are described (contents listed) and bot asks clarifying
  question before acting; user's follow-up reply looks back for pending
  attachments so files don't need to be re-uploaded
- receipt_parser: extracts text from ZIP (recursive), JPG/PNG/etc (OCR),
  PDF (pdfplumber), HTML, TXT
- expenses_agent: full rewrite fixing broken method signatures; adds
  create_expense_sheet / create_expense / attach_receipt flow driven by
  LLM receipt parsing (Ollama, HIPAA-locked)
- master_agent: extra_context threads receipts + user_id into directives
- FastAPI /upload multipart endpoint; registered in main.py
- Odoo /ai/upload controller proxies files to agent service
- ab_ai_bot: dispatch_message_with_files() for multipart uploads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 01:02:24 -04:00

268 lines
12 KiB
Python

from __future__ import annotations
import json
import logging
from datetime import date as _date
from .base_agent import BaseAgent, AgentReport, AgentDirective, SweepReport
from ..tools.expenses_tools import ExpensesTools
logger = logging.getLogger(__name__)

# Tool descriptors for the expenses agent. Each entry carries the tool name,
# a short human-readable description, and a mapping of parameter name ->
# {'type': ..., 'optional': True?}. Parameters without an 'optional' key
# have no such marker in this table.
EXPENSES_TOOLS = [
    # --- read-only queries -------------------------------------------------
    {
        'name': 'get_expenses',
        'description': 'Retrieve expense records',
        'parameters': {
            'employee_id': {'type': 'integer', 'optional': True},
            'state': {'type': 'string', 'optional': True},
            'date_from': {'type': 'string', 'optional': True},
            'date_to': {'type': 'string', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_expense_sheets',
        'description': 'Get expense report sheets',
        'parameters': {
            'state': {'type': 'string', 'optional': True},
            'employee_id': {'type': 'integer', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_pending_approvals',
        'description': 'Get expense sheets pending approval',
        'parameters': {},
    },
    # --- approval ----------------------------------------------------------
    {
        'name': 'approve_expense_sheet',
        'description': 'Approve an expense sheet',
        'parameters': {
            'sheet_id': {'type': 'integer'},
        },
    },
    # --- aggregates --------------------------------------------------------
    {
        'name': 'get_expenses_summary',
        'description': 'Get expense summary for a period',
        'parameters': {
            'date_from': {'type': 'string', 'optional': True},
            'date_to': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'get_expense_by_employee',
        'description': 'Get expenses for a specific employee',
        'parameters': {
            'employee_id': {'type': 'integer'},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    # --- annotations on records --------------------------------------------
    {
        'name': 'flag_for_review',
        'description': 'Flag an expense for review',
        'parameters': {
            'model': {'type': 'string'},
            'record_id': {'type': 'integer'},
            'reason': {'type': 'string'},
            'severity': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'post_chatter_note',
        'description': 'Post a note on a record',
        'parameters': {
            'model': {'type': 'string'},
            'record_id': {'type': 'integer'},
            'note': {'type': 'string'},
        },
    },
]
class ExpensesAgent(BaseAgent):
    """Agent for the ``expenses`` domain.

    A directive is handled in one of two modes, chosen by ``_plan``:

    * ``read`` - fetch the expense summary (plus the pending approvals when
      the task text mentions them) and report on it.
    * ``create_from_receipts`` - when the directive context carries receipts,
      create one expense sheet and one expense per receipt, using the LLM to
      extract vendor / amount / date / description from each receipt's text.
    """

    name = 'expenses_agent'
    domain = 'expenses'
    required_odoo_module = 'hr_expense'
    system_prompt_file = 'expenses_system.txt'
    tools = EXPENSES_TOOLS

    # More pending sheets than this raises an escalation in ``_reason``.
    _PENDING_ESCALATION_THRESHOLD = 10

    def __init__(self, odoo, llm, peer_bus=None):
        """Set up the expenses tool wrapper and the per-run scratch state."""
        super().__init__(odoo, llm, peer_bus)
        self._et = ExpensesTools(odoo)
        self._gathered_data: dict = {}
        self._actions_taken: list = []
        self._escalations_list: list = []

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _directive_receipts(self) -> list:
        """Return the receipts on the current directive's context, or ``[]``."""
        if not self._directive:
            return []
        return getattr(self._directive.context, 'receipts', None) or []

    @staticmethod
    def _coerce_amount(value) -> float:
        """Best-effort conversion of an LLM-supplied amount to a finite float.

        Accepts numbers and numeric strings with an optional leading ``$``.
        Anything else (``None``, booleans, free text, NaN/inf) yields ``0.0``
        so that one bad field does not discard the rest of the parse.
        """
        if isinstance(value, bool):
            return 0.0
        if isinstance(value, str):
            value = value.strip().lstrip('$').strip()
        try:
            amount = float(value)
        except (TypeError, ValueError):
            return 0.0
        # Reject NaN (never equal to itself) and infinities.
        if amount != amount or amount in (float('inf'), float('-inf')):
            return 0.0
        return amount

    @staticmethod
    def _coerce_date(value) -> str:
        """Return ``value`` as a ``YYYY-MM-DD`` string, or today's date.

        The LLM may answer with ``null``, an empty string, a timestamp, or a
        non-ISO date. Only the first ten characters are considered, and
        anything that does not parse falls back to today.
        """
        today = _date.today().isoformat()
        if not value:
            return today
        try:
            return _date.fromisoformat(str(value).strip()[:10]).isoformat()
        except ValueError:
            return today

    # ------------------------------------------------------------------
    # Directive pipeline
    # ------------------------------------------------------------------
    async def _plan(self) -> dict:
        """Derive the execution plan from the current directive.

        Returns a dict with ``mode`` (``'create_from_receipts'`` when the
        directive context has receipts, else ``'read'``), the boolean flags
        ``fetch_summary`` / ``fetch_pending`` (always ``False`` when receipts
        are present), and the ``employee_id`` / ``date_from`` / ``date_to``
        values taken from the directive params (``None`` without a directive).
        """
        directive = self._directive
        task = ((directive.task if directive else '') or '').lower()
        params = (directive.params if directive else None) or {}
        has_receipts = bool(self._directive_receipts())

        def mentions(*keywords: str) -> bool:
            return any(keyword in task for keyword in keywords)

        return {
            'mode': 'create_from_receipts' if has_receipts else 'read',
            'fetch_summary': mentions('summary', 'overview') and not has_receipts,
            'fetch_pending': (mentions('pending', 'approve', 'approval')
                              and not has_receipts),
            'employee_id': params.get('employee_id'),
            'date_from': params.get('date_from'),
            'date_to': params.get('date_to'),
        }

    async def _gather(self, plan: dict) -> dict:
        """Collect the data the plan calls for and cache it on the instance.

        In ``create_from_receipts`` mode nothing is fetched. Otherwise the
        expense summary is always fetched, and the pending approvals are added
        when ``plan['fetch_pending']`` is set.
        """
        data: dict = {'mode': plan.get('mode', 'read')}
        if plan.get('mode') != 'create_from_receipts':
            data['summary'] = await self._et.get_expenses_summary(
                date_from=plan.get('date_from'), date_to=plan.get('date_to'),
            )
            if plan.get('fetch_pending'):
                data['pending'] = await self._et.get_pending_approvals()
        self._gathered_data = data
        return data

    async def _reason(self) -> dict:
        """Analyse the gathered data and reset the escalation list for this run.

        Returns ``{'escalations': [...], 'flags': []}``. An escalation is added
        when the summary reports more pending approvals than
        ``_PENDING_ESCALATION_THRESHOLD``.
        """
        data = self._gathered_data
        analysis: dict = {'escalations': [], 'flags': []}
        if data.get('mode') == 'create_from_receipts':
            self._escalations_list = []
            return analysis
        summary = data.get('summary', {})
        # ``or 0`` guards against an explicit None in the summary payload.
        pending_count = summary.get('pending_approval_count', 0) or 0
        if pending_count > self._PENDING_ESCALATION_THRESHOLD:
            analysis['escalations'].append(
                f'{summary["pending_approval_count"]} expense sheets pending approval.'
            )
        self._escalations_list = analysis['escalations']
        return analysis

    async def _act(self, reasoning: dict) -> list:
        """Create an expense sheet and one expense per receipt.

        Only acts in ``create_from_receipts`` mode. Failures that prevent the
        sheet from being created are appended to ``self._escalations_list``
        and an empty list is returned. Otherwise returns (and stores in
        ``self._actions_taken``) one human-readable line per action, including
        a line for every receipt whose expense could not be created.
        """
        # Reset up front: every early return below must leave no actions from
        # a previous run behind, otherwise ``_report`` would announce success.
        self._actions_taken = []
        if self._gathered_data.get('mode') != 'create_from_receipts':
            return []
        receipts = self._directive_receipts()
        if not receipts:
            return []

        peer_data = getattr(self._directive.context, 'peer_data', None) or {}
        user_id = peer_data.get('requesting_user_id')
        employee_id = await self._et.get_employee_id_for_user(user_id)
        if not employee_id:
            self._escalations_list.append(
                'No employee record found for the current user; cannot create expense report.')
            return []

        today = _date.today().isoformat()
        sheet_name = f'Expense Report - {today}'
        sheet_result = await self._et.create_expense_sheet(sheet_name, employee_id)
        if not sheet_result.success:
            self._escalations_list.append(
                f'Failed to create expense sheet: {sheet_result.error}')
            return []

        sheet_id = sheet_result.record_id
        actions = [f'Created expense sheet "{sheet_name}" (ID {sheet_id})']
        product_id = await self._et.get_default_expense_product()

        for receipt in receipts:
            filename = receipt.get('filename', 'receipt')
            parsed = await self._parse_receipt_text(receipt.get('text', ''), filename)
            vendor = str(parsed.get('vendor') or filename)
            amount = self._coerce_amount(parsed.get('amount'))
            expense_date = str(parsed.get('date') or today)

            expense_result = await self._et.create_expense(
                sheet_id=sheet_id,
                employee_id=employee_id,
                name=vendor[:64],
                total_amount=amount,
                date=expense_date,
                product_id=product_id,
                description=str(parsed.get('description', '')),
            )
            if not expense_result.success:
                actions.append(
                    f"Could not create expense for {filename}: "
                    f"{expense_result.error}"
                )
                continue

            actions.append(f"Added: {vendor} ${amount:.2f} on {expense_date}")
            # Attach the original file only when its base64 payload is present.
            if receipt.get('b64'):
                await self._et.attach_receipt(
                    'hr.expense', expense_result.record_id,
                    filename,
                    receipt['b64'],
                    receipt.get('mimetype', 'application/octet-stream'),
                )

        self._actions_taken = actions
        return actions

    async def _parse_receipt_text(self, text: str, filename: str) -> dict:
        """Ask the LLM to extract expense fields from receipt text.

        Always returns a dict with ``vendor`` (str), ``amount`` (float),
        ``date`` (``YYYY-MM-DD`` str) and ``description`` (str). When the text
        is empty, starts with ``'['``, or the LLM call / JSON decoding fails,
        a fallback built from ``filename`` and today's date is returned.
        Individual fields that are missing or malformed are repaired on their
        own rather than discarding the whole parse.
        """
        today = _date.today().isoformat()
        fallback = {'vendor': filename, 'amount': 0.0,
                    'date': today, 'description': filename}
        # NOTE(review): a leading '[' is presumably a placeholder emitted by the
        # upstream text extractor (e.g. "[unreadable]") - confirm against it.
        if not text or text.startswith('['):
            return fallback

        prompt = (
            'Extract expense details from the following receipt text. '
            'Return ONLY valid JSON with these keys: '
            '"vendor" (string), "amount" (number, the total charged), '
            '"date" (string YYYY-MM-DD, use today if absent), '
            '"description" (string, brief expense type).\n\n'
            f'Receipt text (first 2000 chars):\n{text[:2000]}\n\nJSON only:'
        )
        try:
            resp = await self._llm.submit(
                [{'role': 'user', 'content': prompt}],
                caller='expenses_agent_receipt_parser',
            )
            raw = (resp.content or '').strip()
            # Tolerate prose or code fences around the JSON object.
            first, last = raw.find('{'), raw.rfind('}')
            if first == -1 or last <= first:
                return fallback
            data = json.loads(raw[first:last + 1])
        except Exception as exc:
            # Best-effort boundary: a parsing problem must not abort the run.
            logger.warning('Receipt parse failed for %s: %s', filename, exc)
            return fallback

        if not isinstance(data, dict):
            return fallback
        return {
            'vendor': str(data.get('vendor') or filename),
            'amount': self._coerce_amount(data.get('amount')),
            'date': self._coerce_date(data.get('date')),
            'description': str(data.get('description') or ''),
        }

    async def _report(self) -> AgentReport:
        """Build the ``AgentReport`` for the current run.

        In ``create_from_receipts`` mode the status is ``'complete'`` when any
        action was recorded and ``'failed'`` otherwise. In read mode the
        status is always ``'complete'`` and the summary is built from the
        gathered expense totals.
        """
        data = self._gathered_data
        directive_id = self._directive.directive_id if self._directive else ''

        if data.get('mode') == 'create_from_receipts':
            if self._actions_taken:
                lines = '\n'.join(f'{a}' for a in self._actions_taken)
                summary = (
                    f'Expense report created successfully:\n{lines}\n\n'
                    'The report is in draft. Please open Odoo > Expenses, '
                    'review the entries, and click Submit to send for approval.'
                )
                status = 'complete'
            else:
                summary = ('Could not create expense report. ' +
                           '; '.join(self._escalations_list or ['Unknown error']))
                status = 'failed'
            return AgentReport(
                directive_id=directive_id, agent=self.name, status=status,
                summary=summary, data=data,
                escalations=self._escalations_list,
                actions_taken=self._actions_taken)

        summary_data = data.get('summary', {})
        parts = []
        if summary_data:
            # ``or 0`` keeps the ``:.2f`` format from failing on an explicit None.
            total_amount = summary_data.get('total_amount', 0) or 0
            parts.append(
                f'Expenses: {summary_data.get("total_expenses", 0)} records, '
                f'total ${total_amount:.2f}. '
                f'{summary_data.get("pending_approval_count", 0)} pending approval.'
            )
        if not parts:
            parts.append('Expenses review complete.')
        return AgentReport(
            directive_id=directive_id, agent=self.name, status='complete',
            summary='\n'.join(parts), data=data,
            escalations=self._escalations_list, actions_taken=[])

    # ------------------------------------------------------------------
    # Tool dispatch, peer requests, sweep
    # ------------------------------------------------------------------
    async def _dispatch_tool(self, name: str, args: dict):
        """Invoke the expenses tool called ``name`` with keyword ``args``.

        Raises:
            ValueError: if ``name`` is not one of the known tool names.
        """
        dispatch = {
            'get_expenses': self._et.get_expenses,
            'get_expense_sheets': self._et.get_expense_sheets,
            'get_pending_approvals': self._et.get_pending_approvals,
            'approve_expense_sheet': self._et.approve_expense_sheet,
            'get_expenses_summary': self._et.get_expenses_summary,
            'get_expense_by_employee': self._et.get_expense_by_employee,
            'flag_for_review': self._et.flag_for_review,
            'post_chatter_note': self._et.post_chatter_note,
        }
        handler = dispatch.get(name)
        if handler is None:
            raise ValueError(f'Unknown tool: {name}')
        return await handler(**args)

    async def handle_peer_request(self, request: dict) -> dict:
        """Answer a request from another agent.

        Supported ``request['type']`` values are ``'expenses_summary'`` and
        ``'employee_expenses'`` (which requires ``request['employee_id']``).
        Never raises: any problem is returned as ``{'error': <message>}``.
        """
        req_type = request.get('type', '')
        try:
            if req_type == 'expenses_summary':
                return await self._et.get_expenses_summary()
            if req_type == 'employee_expenses':
                employee_id = request.get('employee_id')
                if employee_id is None:
                    return {'error': 'employee_id is required for employee_expenses'}
                return {'expenses': await self._et.get_expense_by_employee(
                    employee_id=employee_id)}
            return {'error': f'Unknown type: {req_type}'}
        except Exception as exc:
            return {'error': str(exc)}

    async def sweep(self) -> SweepReport:
        """List every pending expense sheet as a low-severity finding.

        If fetching or processing the pending approvals raises, a
        ``SweepReport`` with no findings and the error text is returned.
        """
        findings = []
        try:
            pending = await self._et.get_pending_approvals()
            for sheet in pending or []:
                emp = sheet.get('employee_id', [0, ''])
                # The name is read from index 1 of a list/tuple value; any
                # other shape (e.g. False or a short list) yields ''.
                has_name = isinstance(emp, (list, tuple)) and len(emp) > 1
                findings.append({
                    'type': 'pending_expense_approval',
                    'sheet_id': sheet.get('id'),
                    'employee': emp[1] if has_name else '',
                    'amount': sheet.get('total_amount', 0),
                    'severity': 'low',
                })
        except Exception as exc:
            return SweepReport(agent=self.name, findings=[], error=str(exc))
        return SweepReport(
            agent=self.name, findings=findings, actions_taken=[],
            summary=f'Expenses sweep: {len(findings)} pending approvals.')