Files
odoo-ai/agent_service/agents/expenses_agent.py
Carlos Garcia 1536d83376 Improve OCR preprocessing and amount extraction robustness
Image preprocessing (receipt_parser.py):
- Add ImageOps.exif_transpose() — fixes portrait photos stored with EXIF
  rotation metadata (most phone photos); without this Tesseract reads a
  rotated image and produces garbage
- Upscale images < 600px wide for better character recognition
- Raise binarization threshold 140→160 for faint thermal-print receipts
- Try PSM 6 (single text block) before PSM 4, PSM 11 as fallbacks;
  PSM 6 is better suited to single-column receipt layout

Amount extraction (expenses_agent.py):
- Add Pass 2 bottom-of-receipt line scan when labeled Total: regex fails;
  reads lines bottom-to-top in the last 50% of text, skipping change/tip
  lines — handles 'T0TAL' OCR misread and amount-on-next-line layout
- Add _SKIP_LINE_RE and _ANY_DOLLAR_RE module-level patterns
- 8 new tests covering garbled total, change-skip, USD suffix, etc.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 23:33:38 -04:00

585 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import difflib
import json
import logging
import re
from datetime import date as _date
from .base_agent import BaseAgent, AgentReport, AgentDirective, SweepReport
from ..tools.expenses_tools import ExpensesTools
# ---------------------------------------------------------------------------
# Receipt OCR helpers — regex-based, deterministic extraction
# ---------------------------------------------------------------------------

# Matches an explicitly labeled total line.
# Handles "Total: $22.46", "GRAND TOTAL 22.46", "Amount Due: 22.46", etc.
# NOTE: the bare `total` alternative also matches the tail of "Subtotal" /
# "Sub Total"; _extract_amount_from_text rejects those hits using
# _SUBTOTAL_PREFIX_RE.
_TOTAL_RE = re.compile(
    r'(?:grand\s*total|total\s*due|amount\s*due|balance\s*due|'
    r'total\s*amount|total\s*charged|you\s*paid|amount\s*paid|total)'
    r'\s*[:\-]?\s*\$?\s*([\d,]+\.\d{2})',
    re.IGNORECASE,
)

# Applied to the few characters immediately *before* a _TOTAL_RE hit: if they
# end in "sub", "sub " or "sub-", the hit is a subtotal line, not the total.
_SUBTOTAL_PREFIX_RE = re.compile(r'sub[\s\-]*$', re.IGNORECASE)
# How many characters before the hit are inspected ("sub" + a few separators).
_SUBTOTAL_PREFIX_WINDOW = 8

# Lines printed AFTER the total (change given, tip, etc.) — skip these
# when doing the bottom-of-receipt scan so we don't mistake them for the total.
_SKIP_LINE_RE = re.compile(
    r'\b(?:change|cash\s*(?:paid|tendered)?|tip|gratuity|approved|'
    r'auth(?:orized)?|visa|mastercard|amex|discover)\b',
    re.IGNORECASE,
)

# Any standalone dollar-like amount (optional $, up to 6 digits, 2 decimals)
_ANY_DOLLAR_RE = re.compile(r'(?<!\d)\$?\s*([\d,]{1,6}\.\d{2})(?!\d)')

_DATE_ISO_RE = re.compile(r'\b(\d{4})[-/](\d{2})[-/](\d{2})\b')  # YYYY-MM-DD or YYYY/MM/DD
_DATE_US_RE = re.compile(r'\b(\d{1,2})[/\-](\d{1,2})[/\-](\d{4})\b')  # M/D/YYYY
_DATE_US_SHORT_RE = re.compile(r'\b(\d{1,2})[/\-](\d{1,2})[/\-](\d{2})\b')  # M/D/YY


def _parse_amount(raw: str) -> float:
    """Convert a matched amount such as '1,234.50' to a float (0.0 if malformed)."""
    try:
        return float(raw.replace(',', ''))
    except ValueError:
        return 0.0


def _valid_iso_date(year: int, month: int, day: int, *,
                    min_year: int = 2000, max_year: int = 2099) -> str | None:
    """Return 'YYYY-MM-DD' if the parts form a real calendar date in range, else None."""
    if not min_year <= year <= max_year:
        return None
    try:
        # date() rejects impossible combinations such as month 13 or Feb 31.
        return _date(year, month, day).isoformat()
    except ValueError:
        return None


def _extract_amount_from_text(text: str) -> float:
    """Return the final total from OCR receipt text, or 0.0 if not found.

    Pass 1 — labeled total: 'Total:', 'Grand Total:', 'Amount Due:', etc.
             Labeled matches are tried from the last one backwards; subtotal
             lines and non-positive amounts (e.g. 'Balance Due: 0.00') are
             skipped so an earlier valid total can still be used.
    Pass 2 — bottom scan: reads lines bottom-to-top over the last ~50% of the
             text, skipping change/cash/tip lines. Handles cases where
             Tesseract garbled 'TOTAL' (e.g. 'T0TAL') or placed the amount on
             its own line below the label. The scanned region starts at the
             beginning of the line that contains the midpoint, so no line (or
             number) is ever cut in half.
    """
    if not text:
        return 0.0

    # Pass 1: explicit label match, last label first
    for match in reversed(list(_TOTAL_RE.finditer(text))):
        label_start = match.start()
        window_start = max(0, label_start - _SUBTOTAL_PREFIX_WINDOW)
        # With an explicit endpos, `$` in the prefix pattern anchors at label_start.
        if _SUBTOTAL_PREFIX_RE.search(text, window_start, label_start):
            continue
        value = _parse_amount(match.group(1))
        if value > 0:
            return value

    # Pass 2: bottom-of-receipt line scan
    # Only search the bottom half so item prices (middle section) are excluded.
    midpoint = len(text) // 2
    region_start = text.rfind('\n', 0, midpoint) + 1  # 0 when no newline precedes
    for line in reversed(text[region_start:].splitlines()):
        if _SKIP_LINE_RE.search(line):
            continue
        match = _ANY_DOLLAR_RE.search(line)
        if match:
            value = _parse_amount(match.group(1))
            if value > 0:
                return value
    return 0.0


def _extract_date_from_text(text: str) -> str | None:
    """Return the first plausible date in OCR text as YYYY-MM-DD, or None.

    Formats are tried in order: ISO (YYYY-MM-DD or YYYY/MM/DD), US (M/D/YYYY),
    then US with a two-digit year (M/D/YY, where 00-49 means 20xx and 50-99
    means 19xx). Within each format every match is examined, and a match is
    accepted only if it is a real calendar date; four-digit years must fall
    in 2000-2099.
    """
    if not text:
        return None

    for m in _DATE_ISO_RE.finditer(text):
        found = _valid_iso_date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        if found:
            return found

    for m in _DATE_US_RE.finditer(text):
        found = _valid_iso_date(int(m.group(3)), int(m.group(1)), int(m.group(2)))
        if found:
            return found

    for m in _DATE_US_SHORT_RE.finditer(text):
        short_year = int(m.group(3))
        year = 2000 + short_year if short_year < 50 else 1900 + short_year
        found = _valid_iso_date(year, int(m.group(1)), int(m.group(2)),
                                min_year=1950)
        if found:
            return found
    return None
logger = logging.getLogger(__name__)

# Tool schemas for the expenses agent. Each entry names a tool handled by
# ExpensesAgent._dispatch_tool and lists its parameters; a parameter without
# 'optional': True is required.
EXPENSES_TOOLS = [
    {
        'name': 'get_expenses',
        'description': 'Retrieve expense records',
        'parameters': {
            'employee_id': {'type': 'integer', 'optional': True},
            'state': {'type': 'string', 'optional': True},
            'date_from': {'type': 'string', 'optional': True},
            'date_to': {'type': 'string', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_expense_sheets',
        'description': 'Get expense report sheets',
        'parameters': {
            'state': {'type': 'string', 'optional': True},
            'employee_id': {'type': 'integer', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_pending_approvals',
        'description': 'Get expense sheets pending approval',
        'parameters': {},
    },
    {
        'name': 'approve_expense_sheet',
        'description': 'Approve an expense sheet',
        'parameters': {
            'sheet_id': {'type': 'integer'},
        },
    },
    {
        'name': 'get_expenses_summary',
        'description': 'Get expense summary for a period',
        'parameters': {
            'date_from': {'type': 'string', 'optional': True},
            'date_to': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'get_expense_by_employee',
        'description': 'Get expenses for a specific employee',
        'parameters': {
            'employee_id': {'type': 'integer'},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'flag_for_review',
        'description': 'Flag an expense for review',
        'parameters': {
            'model': {'type': 'string'},
            'record_id': {'type': 'integer'},
            'reason': {'type': 'string'},
            'severity': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'post_chatter_note',
        'description': 'Post a note on a record',
        'parameters': {
            'model': {'type': 'string'},
            'record_id': {'type': 'integer'},
            'note': {'type': 'string'},
        },
    },
]
# Agent for the 'expenses' domain; subclasses BaseAgent.
class ExpensesAgent(BaseAgent):
# Identifier passed as `agent=` in the AgentReport / SweepReport objects this class builds.
name = 'expenses_agent'
# Domain label for this agent.
domain = 'expenses'
# Odoo module this agent relies on — presumably checked by BaseAgent; TODO confirm.
required_odoo_module = 'hr_expense'
# System-prompt file name — presumably loaded by BaseAgent; TODO confirm.
system_prompt_file = 'expenses_system.txt'
# Tool schemas for this agent (the module-level EXPENSES_TOOLS list).
tools = EXPENSES_TOOLS
def __init__(self, odoo, llm, peer_bus=None):
    """Initialise the base agent, the expenses tool wrapper and per-run state.

    `odoo`, `llm` and `peer_bus` are forwarded unchanged to BaseAgent;
    `odoo` is also handed to ExpensesTools.
    """
    super().__init__(odoo, llm, peer_bus)
    # Scratch state filled in by _gather / _act / _reason and read by _report.
    self._escalations_list: list = list()
    self._actions_taken: list = list()
    self._gathered_data: dict = dict()
    self._et = ExpensesTools(odoo)
async def _plan(self) -> dict:
    """Derive the execution plan from the current directive.

    Returns a dict with:
      mode              'create_from_receipts' when the directive context
                        carries receipts, otherwise 'read'
      user_dup_decision 'keep_all' only when the user explicitly asked to keep
                        duplicates; otherwise 'skip' (the default)
      user_confirmed    True when the message contains a confirmation phrase
      fetch_summary     task mentions summary/overview (read mode only)
      fetch_pending     task mentions pending/approve/approval (read mode only)
      employee_id, date_from, date_to
                        copied from the directive params (None when absent)

    A missing directive, or a directive whose task / context / peer_data /
    params is None, yields the neutral defaults instead of raising.
    """
    directive = self._directive
    context = directive.context if directive else None
    task = ((directive.task if directive else '') or '').lower()
    receipts = getattr(context, 'receipts', None) or []

    # The master LLM rewrites the user message into intent_summary (task).
    # Also check the original raw_message threaded through peer_data so
    # short replies like "skip duplicates" are detected even when rewritten.
    peer_data = getattr(context, 'peer_data', None) or {}
    raw_msg = (peer_data.get('raw_message') or '').lower()
    combined = f'{task} {raw_msg}'

    # Duplicates are skipped unless the user explicitly asks to keep them, so
    # only the "keep" phrases need detecting ("skip ..." is already the default).
    keep_keywords = ('keep all', 'keep both', 'include all', 'no skip', "don't skip")
    confirm_keywords = ('confirm', 'looks good', 'go ahead', 'proceed', 'create it', 'create them')
    user_dup_decision = 'keep_all' if any(k in combined for k in keep_keywords) else 'skip'
    user_confirmed = any(k in combined for k in confirm_keywords)

    params = (directive.params if directive else None) or {}
    has_receipts = bool(receipts)
    return {
        'mode': 'create_from_receipts' if has_receipts else 'read',
        'user_dup_decision': user_dup_decision,
        'user_confirmed': user_confirmed,
        'fetch_summary': any(k in task for k in ('summary', 'overview')) and not has_receipts,
        'fetch_pending': any(k in task for k in ('pending', 'approve', 'approval')) and not has_receipts,
        'employee_id': params.get('employee_id'),
        'date_from': params.get('date_from'),
        'date_to': params.get('date_to'),
    }
async def _gather(self, plan: dict) -> dict:
    """Collect the data the later phases need and cache it on the instance.

    Receipt-creation runs need nothing from Odoo at this stage, so only the
    plan's mode and user decisions are carried over. Read runs always fetch
    the expenses summary for the planned date range and, when the plan asks
    for it, the sheets pending approval.
    """
    gathered: dict = {
        'mode': plan.get('mode', 'read'),
        'user_dup_decision': plan.get('user_dup_decision', 'skip'),
        'user_confirmed': plan.get('user_confirmed', False),
    }

    is_read_run = plan.get('mode') != 'create_from_receipts'
    if is_read_run:
        gathered['summary'] = await self._et.get_expenses_summary(
            date_from=plan.get('date_from'),
            date_to=plan.get('date_to'),
        )
        if plan.get('fetch_pending'):
            gathered['pending'] = await self._et.get_pending_approvals()

    self._gathered_data = gathered
    return gathered
async def _reason(self) -> dict:
    """Turn the gathered data into escalations and flags.

    Receipt-creation runs produce no analysis here and start with an empty
    escalation list. Read runs escalate when more than 10 expense sheets are
    pending approval. The escalations are also stored on the instance.
    """
    gathered = self._gathered_data
    analysis: dict = {'escalations': [], 'flags': []}

    if gathered.get('mode') == 'create_from_receipts':
        self._escalations_list = []
        return analysis

    pending_count = gathered.get('summary', {}).get('pending_approval_count', 0)
    if pending_count > 10:
        analysis['escalations'].append(
            f'{pending_count} expense sheets pending approval.'
        )

    # Same list object as in the returned analysis.
    self._escalations_list = analysis['escalations']
    return analysis
async def _act(self, reasoning: dict) -> list:
    """Create a draft expense sheet with one expense per uploaded receipt.

    Only runs in 'create_from_receipts' mode; otherwise returns [].

    Steps:
      1. Resolve the requesting user's employee record (escalates and stops
         if there is none).
      2. Drop byte-identical receipts (same sha256).
      3. Parse every remaining receipt concurrently.
      4. Collapse semantic duplicates, keeping the copy with the longer OCR
         text, unless the user asked to keep all.
      5. Create the sheet, then one expense per receipt, attaching the
         original file when its base64 payload is present.

    `reasoning` is accepted for interface compatibility and not used.

    Returns the list of human-readable action strings, which is also stored
    in self._actions_taken. The number of semantic duplicates actually
    dropped is stored in self._gathered_data['n_skipped'].
    """
    # Start clean: _report decides success from _actions_taken, so results of
    # an earlier run on the same instance must not survive an early return.
    self._actions_taken = []

    if self._gathered_data.get('mode') != 'create_from_receipts':
        return []
    receipts = getattr(self._directive.context, 'receipts', []) if self._directive else []
    if not receipts:
        return []

    user_dup_decision = self._gathered_data.get('user_dup_decision', 'skip')
    user_id = (self._directive.context.peer_data.get('requesting_user_id')
               if self._directive else None)
    employee_id = await self._et.get_employee_id_for_user(user_id)
    if not employee_id:
        self._escalations_list.append(
            'No employee record found for the current user; cannot create expense report.')
        return []

    expense_products = (await self._et.get_expense_products()) or []
    default_product_id = expense_products[0]['id'] if expense_products else None
    product_map = {p['id']: p['name'] for p in expense_products}
    # Case-insensitive name -> id lookup; the first product wins on name clashes.
    product_id_by_name: dict = {}
    for p in expense_products:
        product_id_by_name.setdefault(p['name'].lower(), p['id'])
    logger.info('expenses_agent: %d receipts received, %d expense products available',
                len(receipts), len(expense_products))

    today = _date.today().isoformat()

    # Pass 1: byte-exact dedup
    seen_hashes: set = set()
    unique_receipts = []
    for r in receipts:
        h = r.get('sha256')
        if h and h in seen_hashes:
            logger.info('expenses_agent: skipping byte-identical receipt %s', r.get('filename'))
            continue
        if h:
            seen_hashes.add(h)
        unique_receipts.append(r)

    # Log OCR quality for each receipt so we can diagnose extraction failures
    for r in unique_receipts:
        raw_text = r.get('text', '') or ''
        ocr_len = len(raw_text)
        ocr_preview = raw_text[:120].replace('\n', '')
        logger.info('ocr filename=%r date_hint=%r ocr_len=%d text_preview=%r',
                    r.get('filename'), r.get('date_from_name'), ocr_len, ocr_preview)

    # Parse all receipts concurrently
    parse_tasks = [
        self._parse_receipt_text(
            r.get('text', ''), r.get('filename', 'receipt'),
            expense_products=expense_products,
            date_hint=r.get('date_from_name'),
        )
        for r in unique_receipts
    ]
    raw_parsed = await asyncio.gather(*parse_tasks, return_exceptions=True)

    paired: list[tuple[dict, dict]] = []
    for receipt, parsed in zip(unique_receipts, raw_parsed):
        if isinstance(parsed, Exception):
            logger.warning('expenses_agent: parse failed for %s: %s',
                           receipt.get('filename'), parsed)
            # Fall back to a placeholder so the receipt still gets an expense line.
            parsed = {'vendor': receipt.get('filename', 'Expense'), 'amount': 0.0,
                      'date': receipt.get('date_from_name') or today,
                      'time': None, 'product_name': ''}
        logger.info('parsed filename=%r → vendor=%r amount=%s date=%r product=%r',
                    receipt.get('filename'), parsed.get('vendor'),
                    parsed.get('amount'), parsed.get('date'), parsed.get('product_name'))
        paired.append((receipt, parsed))

    # Pass 2: semantic dedup
    deduped: list[tuple[dict, dict]] = []
    for receipt, parsed in paired:
        dup_idx = self._find_semantic_duplicate(parsed, deduped)
        if dup_idx is None:
            deduped.append((receipt, parsed))
            continue
        # Keep whichever copy has more OCR text; 'text' may be present but None.
        kept_text = deduped[dup_idx][0].get('text') or ''
        if len(receipt.get('text') or '') > len(kept_text):
            deduped[dup_idx] = (receipt, parsed)

    # Auto-skip semantic duplicates by default; keep_all only if user explicitly asked.
    # Receipts are only available in this single /upload request — there is no
    # persistent receipt store across turns, so a "confirm then create" flow would
    # always fail on the follow-up turn (no receipts in context). Creating
    # immediately in draft state is the correct approach: users review and
    # submit inside Odoo > Expenses.
    final_list = paired if user_dup_decision == 'keep_all' else deduped
    # Report only what was really dropped (nothing when the user kept all).
    self._gathered_data['n_skipped'] = len(paired) - len(final_list)

    sheet_name = f'Expense Report - {today}'
    sheet_result = await self._et.create_expense_sheet(sheet_name, employee_id)
    if not sheet_result.success:
        self._escalations_list.append(f'Failed to create expense sheet: {sheet_result.error}')
        return []
    sheet_id = sheet_result.record_id
    actions = [f'Created expense sheet "{sheet_name}" (ID {sheet_id})']

    for receipt, parsed in final_list:
        chosen_name = parsed.get('product_name', '')
        product_id = default_product_id
        if chosen_name:
            product_id = product_id_by_name.get(chosen_name.lower(), default_product_id)

        expense_result = await self._et.create_expense(
            sheet_id=sheet_id,
            employee_id=employee_id,
            name=str(parsed.get('vendor', receipt.get('filename', 'Expense')))[:64],
            total_amount=float(parsed.get('amount', 0.0)),
            date=str(parsed.get('date') or today),
            product_id=product_id,
        )
        if not expense_result.success:
            actions.append(
                f"Could not create expense for {receipt.get('filename', 'receipt')}: "
                f"{expense_result.error}"
            )
            continue

        cat = product_map.get(product_id, 'Expense')
        actions.append(
            f"Added: {parsed.get('vendor', 'Unknown vendor')} "
            f"${float(parsed.get('amount', 0)):.2f} "
            f"({cat}) on {parsed.get('date', 'today')}"
        )
        if receipt.get('b64'):
            await self._et.attach_receipt(
                'hr.expense', expense_result.record_id,
                receipt.get('filename', 'receipt'),
                receipt['b64'],
                receipt.get('mimetype', 'application/octet-stream'),
            )

    self._actions_taken = actions
    return actions
@staticmethod
def _find_semantic_duplicate(parsed: dict, candidates: list) -> int | None:
    """Find an already-accepted receipt that looks like the same physical receipt.

    `parsed` is a parsed-receipt dict ('vendor', 'amount', 'date', 'time').
    `candidates` is a list of (receipt, parsed) tuples. Returns the index in
    `candidates` of the first match, or None if no match is found.

    Both passes require the same date, both amounts non-zero, and transaction
    times within 30 minutes (checked only when both are present and parseable).

    Pass 1 — exact-amount match:
        amounts within $0.05 of each other, and either vendor similarity
        >= 60 % or at least one of the two vendors is a raw image filename.
    Pass 2 — OCR-error match (amount may differ due to misread):
        vendor similarity >= 80 % (stricter threshold compensates for the
        loose amount). Skipped when either vendor is a raw image filename or
        when the vendor is empty, because the vendor is the only evidence in
        this pass and two blank vendors would otherwise compare as identical.
    """
    # NOTE(review): only image extensions count as "raw filename" vendors;
    # other upload types (e.g. .pdf) are compared as ordinary vendor names.
    image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')

    def _vendor_key(record: dict) -> str:
        return str(record.get('vendor', '')).lower().strip()

    def _similarity(a: str, b: str) -> float:
        return difflib.SequenceMatcher(None, a, b).ratio()

    def _times_compatible(t1, t2) -> bool:
        """Return False only when both times are present and >30 min apart."""
        if not (t1 and t2):
            return True
        try:
            h1, m1 = (int(p) for p in t1.split(':')[:2])
            h2, m2 = (int(p) for p in t2.split(':')[:2])
        except (ValueError, AttributeError, TypeError):
            # Unparseable time: treat as unknown rather than as a mismatch.
            return True
        return abs((h1 * 60 + m1) - (h2 * 60 + m2)) <= 30

    amt = float(parsed.get('amount', 0))
    if amt == 0:
        # A receipt without an amount is never matched in either pass.
        return None
    date = parsed.get('date', '')
    time = parsed.get('time')  # HH:MM or None
    vendor = _vendor_key(parsed)
    is_filename = vendor.endswith(image_exts)

    # Candidates that satisfy the conditions shared by both passes.
    comparable = [
        (idx, other)
        for idx, (_, other) in enumerate(candidates)
        if float(other.get('amount', 0)) != 0
        and date == other.get('date', '')
        and _times_compatible(time, other.get('time'))
    ]

    # Pass 1: amount must match within $0.05
    for idx, other in comparable:
        if abs(amt - float(other.get('amount', 0))) > 0.05:
            continue
        other_vendor = _vendor_key(other)
        if is_filename or other_vendor.endswith(image_exts):
            return idx
        if _similarity(vendor, other_vendor) >= 0.6:
            return idx

    # Pass 2: same vendor + same date even when amounts differ (OCR misread)
    if vendor and not is_filename:
        for idx, other in comparable:
            other_vendor = _vendor_key(other)
            if other_vendor.endswith(image_exts):
                continue
            if _similarity(vendor, other_vendor) >= 0.80:
                return idx
    return None
async def _parse_receipt_text(self, text: str, filename: str,
                              expense_products: list = None,
                              date_hint: str = None) -> dict:
    """Turn one receipt's OCR text into vendor / amount / date / category.

    Each field comes from the most reliable source available:
      amount       -> regex over the OCR text (deterministic)
      date         -> `date_hint` (filename timestamp), else OCR regex, else today
      vendor       -> LLM, given only the first 600 characters of the receipt
      product_name -> LLM, choosing among the supplied expense products

    The LLM is deliberately never asked for the amount or the date, because
    the local model hallucinates those fields when OCR text is ambiguous.
    OCR text that is empty or starts with '[' is treated as unreadable; the
    LLM then only guesses a category from the filename (and is skipped
    entirely when no products are available). The vendor falls back to the
    filename and the category to '' whenever the LLM gives nothing usable.
    'time' is always None.
    """
    body = (text or '').strip()
    readable = bool(body) and not body.startswith('[')
    fallback_date = _date.today().isoformat()

    # Amount: deterministic regex extraction
    amount = _extract_amount_from_text(body) if readable else 0.0

    # Date: filename hint wins, then OCR text, then today
    if date_hint:
        receipt_date = date_hint
    elif readable:
        receipt_date = _extract_date_from_text(body) or fallback_date
    else:
        receipt_date = fallback_date

    def _result(vendor_name: str, category: str) -> dict:
        return {'vendor': vendor_name, 'amount': amount, 'date': receipt_date,
                'time': None, 'product_name': category}

    # Vendor + category: the only two fields requested from the LLM
    catalog = ', '.join(f'"{p["name"]}"' for p in (expense_products or []))
    if readable:
        # The vendor name sits in the first lines, so the header excerpt suffices.
        excerpt = body[:600]
        prompt = (
            'Return ONLY valid JSON with exactly two keys:\n'
            '"vendor": the store or restaurant name, copied exactly from the '
            'first 1-3 lines of the receipt. Use "" if no clear name.\n'
            f'"product_name": the single best match from [{catalog}] '
            'based on the type of business (restaurant→Meals, gas station→Fuel, '
            'hotel→Hotel, airline/transit→Transport, office store→Office Supplies). '
            'Use "" if none fit.\n\n'
            f'Receipt:\n{excerpt}\n\nJSON only:'
        )
    elif catalog:
        # Unreadable OCR: the filename is the only clue for the category.
        prompt = (
            f'A receipt file named "{filename}" could not be read. '
            f'Pick the most likely match from [{catalog}] based on the filename, '
            f'or "". Return ONLY: {{"vendor": "", "product_name": "..."}}'
        )
    else:
        return _result(filename, '')

    vendor, product_name = filename, ''
    try:
        resp = await self._llm.submit(
            [{'role': 'user', 'content': prompt}],
            caller='expenses_agent_receipt_parser',
        )
        reply = (resp.content or '').strip()
        # Tolerate chatter around the JSON: take the outermost {...} span.
        open_at = reply.find('{')
        close_at = reply.rfind('}')
        if open_at != -1 and close_at > open_at:
            payload = json.loads(reply[open_at:close_at + 1])
            llm_vendor = str(payload.get('vendor', '') or '').strip()
            if llm_vendor:
                vendor = llm_vendor
            product_name = str(payload.get('product_name', '') or '').strip()
    except Exception as exc:
        logger.warning('Receipt vendor/category parse failed for %s: %s', filename, exc)
    return _result(vendor, product_name)
async def _report(self) -> AgentReport:
    """Build the final AgentReport for the current run.

    Read runs summarise the gathered expenses summary, or state that the
    review is complete when there is none, and always report 'complete' with
    no actions. Receipt-creation runs report 'complete' with the list of
    actions (plus a note about skipped duplicates) when anything was done,
    and 'failed' with the collected escalations otherwise.
    """
    gathered = self._gathered_data
    directive_id = self._directive.directive_id if self._directive else ''

    if gathered.get('mode') != 'create_from_receipts':
        stats = gathered.get('summary', {})
        if stats:
            text = (
                f'Expenses: {stats.get("total_expenses", 0)} records, '
                f'total ${stats.get("total_amount", 0):.2f}. '
                f'{stats.get("pending_approval_count", 0)} pending approval.'
            )
        else:
            text = 'Expenses review complete.'
        return AgentReport(
            directive_id=directive_id, agent=self.name, status='complete',
            summary=text, data=gathered,
            escalations=self._escalations_list, actions_taken=[])

    if self._actions_taken:
        status = 'complete'
        action_lines = '\n'.join(str(action) for action in self._actions_taken)
        n_skipped = gathered.get('n_skipped', 0)
        if n_skipped:
            dup_note = f'\n({n_skipped} duplicate receipt(s) were automatically skipped.)'
        else:
            dup_note = ''
        text = (
            f'Expense report created successfully:\n{action_lines}{dup_note}\n\n'
            'The report is in draft — open Odoo Expenses, '
            'review the amounts, and click Submit to send for approval.'
        )
    else:
        status = 'failed'
        reasons = self._escalations_list or ['Unknown error']
        text = 'Could not create expense report. ' + '; '.join(reasons)

    return AgentReport(
        directive_id=directive_id, agent=self.name, status=status,
        summary=text, data=gathered,
        escalations=self._escalations_list, actions_taken=self._actions_taken)
async def _dispatch_tool(self, name: str, args: dict):
    """Invoke the expenses tool called `name` with keyword arguments `args`.

    Only the tools listed below can be reached. Raises ValueError for any
    other name; otherwise returns whatever the awaited tool returns.
    """
    tool_names = (
        'get_expenses',
        'get_expense_sheets',
        'get_pending_approvals',
        'approve_expense_sheet',
        'get_expenses_summary',
        'get_expense_by_employee',
        'flag_for_review',
        'post_chatter_note',
    )
    # Each tool is the ExpensesTools method of the same name.
    handlers = {tool: getattr(self._et, tool) for tool in tool_names}
    if name not in handlers:
        raise ValueError(f'Unknown tool: {name}')
    handler = handlers[name]
    return await handler(**args)
async def handle_peer_request(self, request_type: str, params: dict, directive_id: str) -> dict:
    """Answer a data request from a peer agent.

    Supported `request_type` values:
      'expenses_summary'  -> the expenses summary
      'employee_expenses' -> {'expenses': [...]} for params['employee_id']

    Never raises: any failure, and any unsupported request type, is returned
    as {'error': <message>}. `directive_id` is not used.
    """
    try:
        if request_type == 'expenses_summary':
            return await self._et.get_expenses_summary()
        if request_type == 'employee_expenses':
            expenses = await self._et.get_expense_by_employee(
                employee_id=params['employee_id'])
            return {'expenses': expenses}
    except Exception as exc:
        return {'error': str(exc)}
    return {'error': f'Unknown type: {request_type}'}
async def sweep(self) -> SweepReport:
    """Scan for expense sheets pending approval and report each as a finding.

    Every pending sheet becomes one low-severity 'pending_expense_approval'
    finding. If anything fails while fetching or building the findings, a
    SweepReport with no findings and the error message is returned instead.
    """
    def _employee_name(sheet: dict) -> str:
        # The name is read from position 1 when employee_id is a list.
        emp = sheet.get('employee_id', [0, ''])
        return emp[1] if isinstance(emp, list) else ''

    try:
        pending = await self._et.get_pending_approvals()
        findings = [
            {
                'type': 'pending_expense_approval',
                'sheet_id': sheet.get('id'),
                'employee': _employee_name(sheet),
                'amount': sheet.get('total_amount', 0),
                'severity': 'low',
            }
            for sheet in pending
        ]
    except Exception as exc:
        return SweepReport(agent=self.name, findings=[], error=str(exc))

    return SweepReport(agent=self.name, findings=findings, actions_taken=[],
                       summary=f'Expenses sweep: {len(findings)} pending approvals.')