Files
odoo-ai/agent_service/agents/expenses_agent.py
Carlos Garcia beac16a6a9 expenses_agent: fix OCR '$→8' misread inflating receipt totals
Add _fix_ocr_dollar_as_8() which strips a spurious leading '8' when it
sits at a word boundary before a non-zero digit + 1–3 more digits + .dd
(covers $10–$9999).  Applied at the top of _extract_amount_from_text so
both the labeled-total pass and the max-scan pass benefit.

  845.00  → 45.00   ($45 misread as 845)
  885.00  → 85.00   ($85 misread as 885)
  8150.00 → 150.00  ($150 misread as 8150)
  85.00   → 85.00   UNCHANGED (real $85 correctly read)
  8.50    → 8.50    UNCHANGED (real $8.50 correctly read)

12 new tests covering fix cases, non-fix cases, and end-to-end extraction
(110 tests total, all passing).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 16:08:39 -04:00

949 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import difflib
import json
import logging
import re
from datetime import date as _date
from .base_agent import BaseAgent, AgentReport, AgentDirective, SweepReport
from ..tools.expenses_tools import ExpensesTools
# ---------------------------------------------------------------------------
# Receipt OCR helpers — regex-based, deterministic extraction
# ---------------------------------------------------------------------------
# Matches an explicitly labeled total line and captures the amount in group 1.
# Handles "Total: $22.46", "GRAND TOTAL 22.46", "Amount Due: 22.46",
# "Total Sale $58.75" (gas stations), "Net Sale $X", etc.
#
# The negative lookahead (?!\s*tax) prevents "Total Tax" / "Total Taxes"
# (a sub-total line present on restaurant receipts) from being confused
# with the final total when Tesseract splits a two-column label+amount
# layout across lines.
#
# The separators between label and amount are \s*, which also matches a
# newline, so a label on one line and its amount on the next still match.
#
# NOTE(review): the bare `total` alternative carries no word boundary, so it
# also matches inside "Subtotal" — confirm that is acceptable for callers.
_TOTAL_RE = re.compile(
    r'(?:grand\s*total|total\s*due|amount\s*due|balance\s*due|'
    r'total\s*amount|total\s*charged|total\s*sale|net\s*sale|'
    r'sale\s*total|you\s*paid|amount\s*paid|net\s*fee|total)'
    r'(?!\s*tax)' # exclude "Total Tax / Total Taxes"
    r'\s*[:\-]?\s*\$?\s*([\d,]+\.\d{2})',
    re.IGNORECASE,
)
# OCR artefact: the '$' glyph is often misclassified as '8', turning
# 'Total: $45.00' into 'Total: 845.00'. We strip the spurious leading '8'
# when it sits at a word boundary and is followed by a non-zero digit then
# 1-3 more digits + two decimal places. This covers the $10$9999 range.
#
# 845.00 → 45.00 (was $45, OCR gave 845)
# 885.00 → 85.00 (was $85, OCR gave 885)
# 8150.00 → 150.00 (was $150, OCR gave 8150)
# 85.00 → 85.00 UNCHANGED — real $85 correctly read
# 8.50 → 8.50 UNCHANGED — real $8.50 correctly read
# 12845.00→ 12845.00 UNCHANGED — digit before the 8 blocks lookbehind
# Edge case: a real $8xx amount correctly read (e.g. 840.00) may be reduced
# to $40; this is rare compared to the misread and obvious on human review.
_OCR_DOLLAR_MISREAD_RE = re.compile(r'(?<!\d)8([1-9]\d{1,3}\.\d{2})\b')
def _fix_ocr_dollar_as_8(text: str) -> str:
"""Strip a spurious leading '8' that is an OCR misread of '$'."""
return _OCR_DOLLAR_MISREAD_RE.sub(r'\1', text)
# Lines that should never be treated as the total — change given back,
# tip added after the fact, etc. Card-brand lines like "VISA USD$ 36.78"
# are intentionally NOT listed here: the amount on those lines IS the charge.
# Matching is case-insensitive and on whole words; a bare "cash" also matches
# because the "(paid|tendered)" suffix is optional.
_SKIP_LINE_RE = re.compile(
    r'\b(?:change|cash\s*(?:paid|tendered)?|tip|gratuity)\b',
    re.IGNORECASE,
)
# Any standalone dollar-like amount (optional $, up to 6 digits, 2 decimals)
_ANY_DOLLAR_RE = re.compile(r'(?<!\d)\$?\s*([\d,]{1,6}\.\d{2})(?!\d)')
# A single receipt has at most ~10 lines with dollar amounts (items + tax + total).
# Bank / credit-card statements have far more (one per transaction).
_STMT_AMOUNT_LINE_THRESHOLD = 10
def _is_likely_bank_statement(text: str) -> bool:
"""Return True when the OCR text has too many amount-bearing lines to be a receipt.
Single receipts: typically 1-9 lines with dollar values.
Bank/card statements: 10-50+ lines (one per transaction).
"""
count = sum(1 for line in text.splitlines() if _ANY_DOLLAR_RE.search(line))
return count >= _STMT_AMOUNT_LINE_THRESHOLD
# Image MIME types the vision LLM can process. PDF/HTML/TXT use text-only path.
_VISION_MIMETYPES = frozenset({
'image/jpeg', 'image/png', 'image/gif',
'image/bmp', 'image/tiff', 'image/webp',
})
def _get_vision_mode() -> str:
"""Return the configured receipt_vision_mode ('vision' | 'text').
Wraps get_settings() so tests can patch this single symbol instead of
fighting the lru_cache on Settings. Defaults to 'vision' on any error.
"""
try:
from ..config import get_settings
return get_settings().receipt_vision_mode
except Exception:
return 'vision'
# Date patterns used by _extract_date_from_text.  Each numeric pattern
# captures its three components in the order they appear in the text.
_DATE_ISO_RE = re.compile(r'\b(\d{4})[-/](\d{2})[-/](\d{2})\b') # YYYY-MM-DD or YYYY/MM/DD
_DATE_US_RE = re.compile(r'\b(\d{1,2})[/\-](\d{1,2})[/\-](\d{4})\b') # M/D/YYYY
_DATE_US_SHORT_RE = re.compile(r'\b(\d{1,2})[/\-](\d{1,2})[/\-](\d{2})\b') # M/D/YY
# "05 MAY 2026" or "MAY 05 2026" or "05 May, 2026" (airline / hotel receipts)
# Groups 1-3 are filled by the DD MON YYYY branch, groups 4-6 by MON DD YYYY.
_DATE_MON_RE = re.compile(
    r'\b(\d{1,2})\s+([A-Za-z]{3,9})[,\s]+(\d{4})\b' # DD MON YYYY
    r'|\b([A-Za-z]{3,9})\s+(\d{1,2})[,\s]+(\d{4})\b', # MON DD YYYY
)
# Lower-case month name → month number.  _extract_date_from_text looks up only
# the first three letters of the matched word, so the abbreviated keys are the
# ones that lookup reaches.
_MONTH_MAP: dict[str, int] = {
    'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
    'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12,
    'january': 1, 'february': 2, 'march': 3, 'april': 4,
    'june': 6, 'july': 7, 'august': 8, 'september': 9,
    'october': 10, 'november': 11, 'december': 12,
}
def _extract_amount_from_text(text: str) -> float:
    """Pick the receipt's final total out of OCR text; 0.0 when none is found.

    Two passes are made over the text, after the '$' → '8' OCR misread has
    been normalised:

    1. Labeled totals ('Total:', 'Grand Total:', 'Amount Due:', ...).  The
       LARGEST labeled amount wins rather than the last one, so a garbled
       "Total Taxes" line ("Total\\n$2.80") appearing before the real
       "Total\\n$42.90" cannot shadow it.
    2. Maximum scan: the first amount on every line that is not a
       change/tip line is collected and the largest is returned.  This covers
       display-style receipts that print the charge with no label and
       card-terminal lines such as "VISA USD$ 36.78".  The heuristic relies on
       the total being at least as large as any single item; pass 1 handles
       the discounted receipts where that does not hold.
    """
    if not text:
        return 0.0

    # Undo the '$' → '8' misread before any pattern is applied.
    cleaned = _fix_ocr_dollar_as_8(text)

    def _to_amount(token):
        """Parse a captured amount such as '1,234.56'; None if unparsable."""
        try:
            return float(token.replace(',', ''))
        except ValueError:
            return None

    # Pass 1: largest explicitly labeled amount.
    labeled = (_to_amount(hit.group(1)) for hit in _TOTAL_RE.finditer(cleaned))
    top_labeled = max((value for value in labeled if value is not None), default=0.0)
    if top_labeled > 0:
        return top_labeled

    # Pass 2: largest amount on any line that is not change / tip / cash.
    scanned = []
    for row in cleaned.splitlines():
        if _SKIP_LINE_RE.search(row):
            continue
        hit = _ANY_DOLLAR_RE.search(row)
        if not hit:
            continue
        value = _to_amount(hit.group(1))
        if value is not None:
            scanned.append(value)

    largest = max(scanned, default=0.0)
    return largest if largest > 0 else 0.0
def _extract_date_from_text(text: str) -> str | None:
    """Return the first plausible date in OCR text as YYYY-MM-DD, or None.

    Formats are tried in this priority order, and within one format every
    match is inspected until a valid one is found:

    1. ISO            YYYY-MM-DD / YYYY/MM/DD   (year 2000-2099)
    2. US             M/D/YYYY                  (year >= 2000)
    3. US short       M/D/YY                    (YY < 50 → 20YY, else 19YY)
    4. Month name     "05 MAY 2026", "MAY 05 2026", "05 May, 2026"
                      (year 2000-2099; common on airline, hotel and formal
                      business receipts)

    A candidate is accepted only when it is a real calendar date, so OCR noise
    such as "02/31/2026" is rejected instead of being returned as the
    impossible "2026-02-31".
    """
    if not text:
        return None

    def _iso(year, month, day):
        """Format the triple as YYYY-MM-DD; None when no such day exists."""
        try:
            return _date(year, month, day).isoformat()
        except ValueError:
            return None

    # 1. ISO: YYYY-MM-DD or YYYY/MM/DD
    for m in _DATE_ISO_RE.finditer(text):
        y, mo, d = (int(part) for part in m.groups())
        if 2000 <= y <= 2099:
            found = _iso(y, mo, d)
            if found:
                return found

    # 2. US: M/D/YYYY
    for m in _DATE_US_RE.finditer(text):
        mo, d, y = (int(part) for part in m.groups())
        if y >= 2000:
            found = _iso(y, mo, d)
            if found:
                return found

    # 3. US short: M/D/YY — two-digit years pivot at 50.
    for m in _DATE_US_SHORT_RE.finditer(text):
        mo, d, yr = (int(part) for part in m.groups())
        y = 2000 + yr if yr < 50 else 1900 + yr
        found = _iso(y, mo, d)
        if found:
            return found

    # 4. Month-name formats.
    for m in _DATE_MON_RE.finditer(text):
        if m.group(1):  # DD MON YYYY branch
            d_s, mon_s, y_s = m.group(1), m.group(2), m.group(3)
        else:           # MON DD YYYY branch
            mon_s, d_s, y_s = m.group(4), m.group(5), m.group(6)
        # Only the first three letters are looked up, so "Sept" resolves too.
        mo = _MONTH_MAP.get(mon_s.lower()[:3])
        if not mo:
            continue
        y_i = int(y_s)
        if 2000 <= y_i <= 2099:
            found = _iso(y_i, mo, int(d_s))
            if found:
                return found

    return None
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Tool catalogue assigned to ExpensesAgent.tools.  Each entry gives the tool
# name, a short description and its parameters; a parameter marked
# 'optional': True may be omitted.  The names correspond to the keys of the
# dispatch table in ExpensesAgent._dispatch_tool.
EXPENSES_TOOLS = [
    {'name': 'get_expenses', 'description': 'Retrieve expense records',
     'parameters': {'employee_id': {'type': 'integer', 'optional': True},
                    'state': {'type': 'string', 'optional': True},
                    'date_from': {'type': 'string', 'optional': True},
                    'date_to': {'type': 'string', 'optional': True},
                    'limit': {'type': 'integer', 'optional': True}}},
    {'name': 'get_expense_sheets', 'description': 'Get expense report sheets',
     'parameters': {'state': {'type': 'string', 'optional': True},
                    'employee_id': {'type': 'integer', 'optional': True},
                    'limit': {'type': 'integer', 'optional': True}}},
    {'name': 'get_pending_approvals', 'description': 'Get expense sheets pending approval',
     'parameters': {}},
    {'name': 'approve_expense_sheet', 'description': 'Approve an expense sheet',
     'parameters': {'sheet_id': {'type': 'integer'}}},
    {'name': 'get_expenses_summary', 'description': 'Get expense summary for a period',
     'parameters': {'date_from': {'type': 'string', 'optional': True},
                    'date_to': {'type': 'string', 'optional': True}}},
    {'name': 'get_expense_by_employee', 'description': 'Get expenses for a specific employee',
     'parameters': {'employee_id': {'type': 'integer'},
                    'limit': {'type': 'integer', 'optional': True}}},
    {'name': 'flag_for_review', 'description': 'Flag an expense for review',
     'parameters': {'model': {'type': 'string'}, 'record_id': {'type': 'integer'},
                    'reason': {'type': 'string'},
                    'severity': {'type': 'string', 'optional': True}}},
    {'name': 'post_chatter_note', 'description': 'Post a note on a record',
     'parameters': {'model': {'type': 'string'}, 'record_id': {'type': 'integer'},
                    'note': {'type': 'string'}}},
]
class ExpensesAgent(BaseAgent):
name = 'expenses_agent'
domain = 'expenses'
required_odoo_module = 'hr_expense'
system_prompt_file = 'expenses_system.txt'
tools = EXPENSES_TOOLS
auto_rag = False # Receipt processing needs no RAG docs; skip the 30s peer-bus call
def __init__(self, odoo, llm, peer_bus=None):
    """Set up the agent and its per-run scratch state.

    ``odoo``, ``llm`` and ``peer_bus`` are forwarded to the base class;
    ``odoo`` is additionally wrapped in an ExpensesTools helper.
    """
    super().__init__(odoo, llm, peer_bus)
    # Per-run state, filled in by the _gather / _reason / _act phases.
    self._gathered_data: dict = {}
    self._actions_taken: list = []
    self._escalations_list: list = []
    # Expense-specific Odoo helper.
    self._et = ExpensesTools(odoo)
async def _plan(self) -> dict:
task = (self._directive.task if self._directive else '').lower()
receipts = getattr(self._directive.context, 'receipts', []) if self._directive else []
# The master LLM rewrites the user message into intent_summary (task).
# Also check the original raw_message threaded through peer_data so
# short replies like "skip duplicates" are detected even when rewritten.
raw_msg = ''
if self._directive and self._directive.context:
raw_msg = (self._directive.context.peer_data.get('raw_message') or '').lower()
combined = task + ' ' + raw_msg
# Detect whether the user is responding to a duplicate-approval request
skip_keywords = ('skip', 'remove duplicate', 'exclude duplicate', 'drop duplicate')
keep_keywords = ('keep all', 'keep both', 'include all', 'no skip', "don't skip")
confirm_keywords = ('confirm', 'looks good', 'go ahead', 'proceed', 'create it', 'create them')
if any(k in combined for k in keep_keywords):
user_dup_decision = 'keep_all'
elif any(k in combined for k in skip_keywords):
user_dup_decision = 'skip'
else:
user_dup_decision = 'skip' # default: skip duplicates when confirmed
user_confirmed = any(k in combined for k in confirm_keywords)
return {
'mode': 'create_from_receipts' if receipts else 'read',
'user_dup_decision': user_dup_decision,
'user_confirmed': user_confirmed,
'fetch_summary': any(k in task for k in ('summary', 'overview')) and not receipts,
'fetch_pending': any(k in task for k in ('pending', 'approve', 'approval')) and not receipts,
'employee_id': self._directive.params.get('employee_id') if self._directive else None,
'date_from': self._directive.params.get('date_from') if self._directive else None,
'date_to': self._directive.params.get('date_to') if self._directive else None,
}
async def _gather(self, plan: dict) -> dict:
data: dict = {'mode': plan.get('mode', 'read'),
'user_dup_decision': plan.get('user_dup_decision', 'skip'),
'user_confirmed': plan.get('user_confirmed', False)}
if plan.get('mode') == 'create_from_receipts':
self._gathered_data = data
return data
data['summary'] = await self._et.get_expenses_summary(
date_from=plan.get('date_from'), date_to=plan.get('date_to'),
)
if plan.get('fetch_pending'):
data['pending'] = await self._et.get_pending_approvals()
self._gathered_data = data
return data
async def _reason(self) -> dict:
data = self._gathered_data
analysis: dict = {'escalations': [], 'flags': []}
if data.get('mode') == 'create_from_receipts':
self._escalations_list = []
return analysis
summary = data.get('summary', {})
if summary.get('pending_approval_count', 0) > 10:
analysis['escalations'].append(
f'{summary["pending_approval_count"]} expense sheets pending approval.'
)
self._escalations_list = analysis['escalations']
return analysis
async def _act(self, reasoning: dict) -> list:
    """Create one draft expense sheet from the uploaded receipts.

    Runs only in 'create_from_receipts' mode.  Steps:

    1. Resolve the employee of the requesting user; escalate and stop if
       there is none.
    2. Load the expense products ("Meals" is the preferred fallback category).
    3. Drop byte-identical uploads (same sha256).
    4. Parse every remaining receipt; files that look like bank/card
       statements are skipped with an escalation.
    5. Collapse semantic duplicates unless the user asked to keep them all.
    6. Create the sheet, one expense per receipt, and attach the file.

    ``reasoning`` is accepted for interface compatibility and not read here.

    Returns the list of human-readable action strings (also stored on
    ``self._actions_taken``), or ``[]`` when nothing was created.  Problems
    are appended to ``self._escalations_list``.

    NOTE(review): when every receipt is skipped as a statement the sheet is
    still created with no lines — confirm whether that is intended.
    """
    if self._gathered_data.get('mode') != 'create_from_receipts':
        return []
    receipts = getattr(self._directive.context, 'receipts', []) if self._directive else []
    if not receipts:
        return []

    user_dup_decision = self._gathered_data.get('user_dup_decision', 'skip')
    user_id = (self._directive.context.peer_data.get('requesting_user_id')
               if self._directive else None)
    employee_id = await self._et.get_employee_id_for_user(user_id)
    if not employee_id:
        self._escalations_list.append(
            'No employee record found for the current user; cannot create expense report.')
        return []

    expense_products = await self._et.get_expense_products()
    # Prefer "Meals" as the fallback category — most receipts are food.
    # Avoid blindly defaulting to whatever Odoo returns first (often "Communication").
    _meals = next((p for p in expense_products
                   if p['name'].lower() == 'meals'), None)
    default_product_id = (
        _meals['id'] if _meals
        else (expense_products[0]['id'] if expense_products else None)
    )
    product_map = {p['id']: p['name'] for p in expense_products}
    logger.info('expenses_agent: %d receipts received, %d expense products available',
                len(receipts), len(expense_products))

    # Pass 1: byte-exact dedup (receipts without a hash are always kept).
    seen_hashes: set = set()
    unique_receipts = []
    for r in receipts:
        h = r.get('sha256')
        if h and h in seen_hashes:
            logger.info('expenses_agent: skipping byte-identical receipt %s', r.get('filename'))
            continue
        if h:
            seen_hashes.add(h)
        unique_receipts.append(r)

    # Log OCR quality for each receipt so we can diagnose extraction failures
    for r in unique_receipts:
        raw_text = r.get('text', '') or ''
        ocr_len = len(raw_text)
        ocr_preview = raw_text[:120].replace('\n', '')
        logger.info('ocr filename=%r date_hint=%r ocr_len=%d text_preview=%r',
                    r.get('filename'), r.get('date_from_name'), ocr_len, ocr_preview)

    # Parse all receipts: regex phase is instant; LLM phase is batched into
    # a single call so N receipts cost 1 LLM round-trip instead of N.
    raw_parsed = await self._batch_parse_receipts(unique_receipts, expense_products)
    paired: list[tuple[dict, dict]] = []
    for receipt, parsed in zip(unique_receipts, raw_parsed):
        if isinstance(parsed, Exception):
            logger.warning('expenses_agent: parse failed for %s: %s',
                           receipt.get('filename'), parsed)
            parsed = {'vendor': receipt.get('filename', 'Expense'), 'amount': 0.0,
                      'date': receipt.get('date_from_name') or _date.today().isoformat(),
                      'time': None, 'product_name': ''}
        if parsed.get('skip'):
            logger.info('expenses_agent: skipping bank/card statement: %s',
                        receipt.get('filename'))
            self._escalations_list.append(
                f"Skipped \"{receipt.get('filename')}\": "
                'looks like a bank or card statement, not a single receipt.'
            )
            continue
        logger.info('parsed filename=%r → vendor=%r amount=%s date=%r product=%r',
                    receipt.get('filename'), parsed.get('vendor'),
                    parsed.get('amount'), parsed.get('date'), parsed.get('product_name'))
        paired.append((receipt, parsed))

    def _ocr_len(r: dict) -> int:
        """Length of a receipt's OCR text; a missing or None text counts as 0."""
        return len(r.get('text') or '')

    # Pass 2: semantic dedup.  Of two copies of the same receipt, keep the one
    # with the longer OCR text.
    deduped: list[tuple[dict, dict]] = []
    for receipt, parsed in paired:
        dup_idx = self._find_semantic_duplicate(parsed, deduped)
        if dup_idx is None:
            deduped.append((receipt, parsed))
        elif _ocr_len(receipt) > _ocr_len(deduped[dup_idx][0]):
            deduped[dup_idx] = (receipt, parsed)

    # Auto-skip semantic duplicates by default; keep_all only if user explicitly asked.
    # Receipts are only available in this single /upload request — there is no
    # persistent receipt store across turns, so a "confirm then create" flow would
    # always fail on the follow-up turn (no receipts in context). Creating
    # immediately in draft state is the correct approach: users review and
    # submit inside Odoo > Expenses.
    final_list = paired if user_dup_decision == 'keep_all' else deduped
    # Count only what was really dropped, so the report never claims that
    # duplicates were skipped when the user asked to keep them all.
    self._gathered_data['n_skipped'] = len(paired) - len(final_list)

    sheet_name = f'Expense Report - {_date.today().isoformat()}'
    sheet_result = await self._et.create_expense_sheet(sheet_name, employee_id)
    if not sheet_result.success:
        self._escalations_list.append(f'Failed to create expense sheet: {sheet_result.error}')
        return []
    sheet_id = sheet_result.record_id
    actions = [f'Created expense sheet "{sheet_name}" (ID {sheet_id})']

    for receipt, parsed in final_list:
        # Use the category the parser chose when it names a known product
        # (case-insensitive, first match wins); otherwise the default.
        chosen_name = (parsed.get('product_name') or '').lower()
        product_id = default_product_id
        if chosen_name:
            product_id = next(
                (p['id'] for p in expense_products if p['name'].lower() == chosen_name),
                default_product_id,
            )
        expense_result = await self._et.create_expense(
            sheet_id=sheet_id,
            employee_id=employee_id,
            name=str(parsed.get('vendor', receipt.get('filename', 'Expense')))[:64],
            total_amount=float(parsed.get('amount', 0.0)),
            date=str(parsed.get('date') or _date.today().isoformat()),
            product_id=product_id,
        )
        if not expense_result.success:
            actions.append(
                f"Could not create expense for {receipt.get('filename', 'receipt')}: "
                f"{expense_result.error}"
            )
            continue
        cat = product_map.get(product_id, 'Expense')
        actions.append(
            f"Added: {parsed.get('vendor', 'Unknown vendor')} "
            f"${float(parsed.get('amount', 0)):.2f} "
            f"({cat}) on {parsed.get('date', 'today')}"
        )
        if receipt.get('b64'):
            await self._et.attach_receipt(
                'hr.expense', expense_result.record_id,
                receipt.get('filename', 'receipt'),
                receipt['b64'],
                receipt.get('mimetype', 'application/octet-stream'),
            )

    self._actions_taken = actions
    return actions
@staticmethod
def _find_semantic_duplicate(parsed: dict, candidates: list) -> int | None:
"""
Return the index in `candidates` of a receipt that appears to be the
same physical receipt as `parsed`, or None if no match found.
Pass 1 — exact-amount match (all must pass):
1. Same date
2. Amount > 0 and within $0.05 of each other
3. Transaction times within 30 min (if both present)
4. Vendor similarity >= 60 % (or both vendors are raw filenames)
Pass 2 — OCR-error match (amount may differ due to misread):
1. Same date
2. Both amounts > 0
3. Vendor similarity >= 80 % (stricter threshold compensates for loose amount)
4. Times within 30 min (if both present)
"""
amt = float(parsed.get('amount', 0))
date = parsed.get('date', '')
time = parsed.get('time') # HH:MM or None
vendor = str(parsed.get('vendor', '')).lower().strip()
is_filename = vendor.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'))
def _times_compatible(t1, t2) -> bool:
"""Return False only when both times are present and >30 min apart."""
if not (t1 and t2):
return True
try:
h1, m1 = (int(p) for p in t1.split(':')[:2])
h2, m2 = (int(p) for p in t2.split(':')[:2])
return abs((h1 * 60 + m1) - (h2 * 60 + m2)) <= 30
except Exception:
return True
# Pass 1: amount must match within $0.05
for idx, (_, other) in enumerate(candidates):
other_amt = float(other.get('amount', 0))
if amt == 0 or other_amt == 0:
continue
if abs(amt - other_amt) > 0.05:
continue
if date != other.get('date', ''):
continue
if not _times_compatible(time, other.get('time')):
continue
other_vendor = str(other.get('vendor', '')).lower().strip()
other_is_filename = other_vendor.endswith(
('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'))
if is_filename or other_is_filename:
return idx
if difflib.SequenceMatcher(None, vendor, other_vendor).ratio() >= 0.6:
return idx
# Pass 2: same vendor + same date even when amounts differ (OCR misread)
if not is_filename:
for idx, (_, other) in enumerate(candidates):
other_amt = float(other.get('amount', 0))
if amt == 0 or other_amt == 0:
continue
if date != other.get('date', ''):
continue
if not _times_compatible(time, other.get('time')):
continue
other_vendor = str(other.get('vendor', '')).lower().strip()
if other_vendor.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')):
continue
if difflib.SequenceMatcher(None, vendor, other_vendor).ratio() >= 0.80:
return idx
return None
async def _batch_parse_receipts(self, receipts: list, expense_products: list) -> list:
    """Parse every receipt, spending at most one LLM call in text mode.

    Phase 1 — regex only, no LLM:
      • amount    : _extract_amount_from_text
      • date      : filename hint > OCR regex > today
      • skip flag : bank/card statement detection

    Phase 2 — vendor + product_name from the LLM:
      • Vision mode (RECEIPT_VISION_MODE=vision) with at least one supported
        image: one _parse_receipt_text call per receipt, run concurrently,
        because images can't be batched in a single Ollama message.
      • Otherwise a single batched prompt covering all non-skipped receipts;
        on any failure it falls back to individual _parse_receipt_text calls.

    Returns a list of parsed dicts in the same order as `receipts`.
    Each dict: {vendor, amount, date, time, product_name, skip?}
    """
    today = _date.today().isoformat()
    results: list[dict] = []
    needs_llm: list[int] = []  # positions in `results` still lacking vendor/category

    def _finalise() -> list:
        """Remove the underscore-prefixed bookkeeping keys and return results."""
        for entry in results:
            for key in [k for k in entry if k.startswith('_')]:
                del entry[key]
        return results

    # ---- Phase 1: per-receipt regex extraction ----
    for receipt in receipts:
        filename = receipt.get('filename', 'receipt')
        stripped = (receipt.get('text', '') or '').strip()
        ocr_failed = not stripped or stripped.startswith('[')
        date_hint = receipt.get('date_from_name')

        if not ocr_failed and _is_likely_bank_statement(stripped):
            n = sum(1 for line in stripped.splitlines() if _ANY_DOLLAR_RE.search(line))
            logger.warning('receipt %s: bank statement (%d amount lines) — skip', filename, n)
            results.append({'vendor': filename, 'amount': 0.0,
                            'date': date_hint or today, 'time': None,
                            'product_name': '', 'skip': True})
            continue

        if ocr_failed:
            amount = 0.0
            ocr_date = None
        else:
            amount = _extract_amount_from_text(stripped)
            # The OCR date is only consulted when the filename gave no hint.
            ocr_date = None if date_hint else _extract_date_from_text(stripped)

        results.append({
            'vendor': filename, 'amount': amount,
            'date': date_hint or ocr_date or today,
            'time': None, 'product_name': '',
            # internal keys, removed again by _finalise()
            '_ocr_failed': ocr_failed, '_stripped': stripped,
            '_b64': receipt.get('b64'), '_mimetype': receipt.get('mimetype'),
            '_filename': filename,
        })
        needs_llm.append(len(results) - 1)

    product_list = ', '.join(f'"{p["name"]}"' for p in expense_products)
    if not needs_llm or not product_list:
        return _finalise()

    # ---- Phase 2a: vision mode → one call per receipt ----
    has_image = any(
        results[i].get('_b64') and results[i].get('_mimetype') in _VISION_MIMETYPES
        for i in needs_llm
    )
    if _get_vision_mode() == 'vision' and has_image:
        tasks = []
        for i in needs_llm:
            pending = results[i]
            tasks.append(self._parse_receipt_text(
                pending['_stripped'], pending['_filename'],
                expense_products=expense_products,
                b64=pending.get('_b64'),
                mimetype=pending.get('_mimetype'),
            ))
        individual = await asyncio.gather(*tasks, return_exceptions=True)
        for i, parsed in zip(needs_llm, individual):
            if isinstance(parsed, dict) and not isinstance(parsed, Exception):
                results[i]['vendor'] = parsed.get('vendor', results[i]['_filename'])
                results[i]['product_name'] = parsed.get('product_name', '')
        return _finalise()

    # ---- Phase 2b: text mode → one batched LLM call ----
    _cat_guide = (
        'restaurant/cafe/food court/bar → food/meal product; '
        'airline/airport/transit/taxi/parking/rental car → travel product; '
        'gas station/petrol/fuel → fuel product; '
        'hotel/motel/lodging → accommodation product; '
        'hardware/home improvement/tech/office supply → supplies product; '
        'return "" if nothing fits'
    )
    sections = []
    for seq, i in enumerate(needs_llm, 1):
        entry = results[i]
        if entry['_ocr_failed']:
            excerpt = f'[filename: {entry["_filename"]}]'
        else:
            excerpt = entry['_stripped'][:300]
        sections.append(f'\n=== Receipt {seq} ({entry["_filename"]}) ===\n{excerpt}\n')
    receipts_block = ''.join(sections)

    n = len(needs_llm)
    batch_prompt = (
        f'Return ONLY a JSON array with exactly {n} objects, one per receipt below.\n'
        f'Each object must have exactly two keys:\n'
        f'"vendor": business name from the receipt header '
        f'(first 1-3 lines; ignore slogans and item names; '
        f'do NOT substitute a brand not clearly present).\n'
        f'"product_name": single best match from [{product_list}].\n'
        f'Category guide: {_cat_guide}\n'
        f'JSON array only:\n{receipts_block}'
    )
    try:
        resp = await self._llm.submit(
            [{'role': 'user', 'content': batch_prompt}],
            caller='expenses_agent_receipt_parser',
        )
        raw = (resp.content or '').strip()
        first, last = raw.find('['), raw.rfind(']')
        if first == -1 or last <= first:
            raise ValueError(f'No JSON array in response: {raw[:200]}')
        batch_data = json.loads(raw[first:last + 1])
        if len(batch_data) != n:
            raise ValueError(f'Expected {n} items, got {len(batch_data)}')
        for i, item in zip(needs_llm, batch_data):
            v = str(item.get('vendor', '') or '').strip()
            if v:
                results[i]['vendor'] = v
            results[i]['product_name'] = str(item.get('product_name', '') or '').strip()
        logger.info('expenses_agent: batch LLM parsed %d receipts in 1 call', n)
    except Exception as exc:
        logger.warning('expenses_agent: batch LLM failed (%s) — falling back to individual calls', exc)
        fallback_tasks = []
        for i in needs_llm:
            fallback_tasks.append(self._parse_receipt_text(
                results[i]['_stripped'], results[i]['_filename'],
                expense_products=expense_products,
            ))
        fallback = await asyncio.gather(*fallback_tasks, return_exceptions=True)
        for i, parsed in zip(needs_llm, fallback):
            if isinstance(parsed, Exception) or not isinstance(parsed, dict):
                continue
            v = str(parsed.get('vendor', '') or '').strip()
            if v:
                results[i]['vendor'] = v
            results[i]['product_name'] = str(parsed.get('product_name', '') or '').strip()

    return _finalise()
async def _parse_receipt_text(self, text: str, filename: str,
                              expense_products: list = None,
                              date_hint: str = None,
                              b64: str = None,
                              mimetype: str = None) -> dict:
    """Parse a single receipt into structured fields.

    Strategy (most-reliable first):
      amount       → regex on OCR text (deterministic, never ask LLM)
      date         → filename timestamp > OCR regex > today
      vendor       → vision LLM (image) > text LLM (OCR excerpt) > filename
      product_name → same LLM call as vendor

    Vision mode (RECEIPT_VISION_MODE=vision, default):
      When the upload is a JPEG/PNG/etc., the raw image is sent to the
      vision-capable LLM so it can read logos and stylised fonts that
      Tesseract OCR mangles.  If the vision call fails for any reason
      (model error, timeout, bad or missing JSON) the text path is used
      as fallback.

    Text mode (RECEIPT_VISION_MODE=text):
      Classic behaviour — only Tesseract OCR text is forwarded to the LLM.
      Set in .env to instantly revert without rebuilding the container.

    Args:
      text: OCR text of the receipt; empty or starting with '[' means OCR failed.
      filename: upload name, used as the vendor fallback.
      expense_products: product dicts with a 'name' key; with none configured
        no LLM call is made.
      date_hint: date derived from the filename, if any.
      b64 / mimetype: the raw upload, used only for the vision path.

    Returns:
      {vendor, amount, date, time, product_name}, plus 'skip': True when the
      text looks like a bank/card statement.
    """
    today = _date.today().isoformat()
    stripped = (text or '').strip()
    ocr_failed = not stripped or stripped.startswith('[')

    # ── Bank / card statement detection ──────────────────────────────────
    # A statement screenshot has many amount-bearing lines; running the
    # max-scan on it returns a random large transaction, not a total.
    # Skip these files so they don't produce a wildly wrong expense.
    if not ocr_failed and _is_likely_bank_statement(stripped):
        n = sum(1 for line in stripped.splitlines() if _ANY_DOLLAR_RE.search(line))
        logger.warning(
            'receipt %s: looks like a bank/card statement (%d amount lines) — skip',
            filename, n,
        )
        return {'vendor': filename, 'amount': 0.0,
                'date': date_hint or today, 'time': None,
                'product_name': '', 'skip': True}

    # ── Amount: regex (deterministic) ────────────────────────────────────
    amount = _extract_amount_from_text(stripped) if not ocr_failed else 0.0

    # ── Date: filename > OCR regex > today ───────────────────────────────
    if date_hint:
        date = date_hint
    elif not ocr_failed:
        date = _extract_date_from_text(stripped) or today
    else:
        date = today

    def _result(vendor: str, product_name: str) -> dict:
        """Assemble the return value around the regex-derived amount/date."""
        return {'vendor': vendor, 'amount': amount, 'date': date,
                'time': None, 'product_name': product_name}

    def _vendor_and_product(raw: str):
        """Read (vendor, product_name) from the outermost {...} in `raw`.

        Returns None when `raw` holds no JSON object; json.loads errors
        propagate to the caller's try block.
        """
        first, last = raw.find('{'), raw.rfind('}')
        if first == -1 or last <= first:
            return None
        data = json.loads(raw[first:last + 1])
        return (str(data.get('vendor', '') or '').strip(),
                str(data.get('product_name', '') or '').strip())

    # ── Vendor + Category: LLM ───────────────────────────────────────────
    product_list = ', '.join(f'"{p["name"]}"' for p in (expense_products or []))
    if not product_list:
        # No expense products configured — nothing to categorise
        return _result(filename, '')

    # Shared category guidance used in both prompt paths
    _cat_guide = (
        'Guide: restaurant / cafe / fast food / food court → food/meal product; '
        'airline / airport / transit / taxi / parking / rental car → travel product; '
        'gas station / petrol / fuel → fuel product; '
        'hotel / motel / lodging → accommodation product; '
        'hardware / home improvement / tech / office supply store → supplies product. '
        'Return "" if nothing fits.'
    )

    # ── Path A: vision LLM ───────────────────────────────────────────────
    # Use when: vision mode is enabled AND the file is a supported image type.
    # The model sees the actual receipt image — no OCR garbling, reads logos
    # and stylised fonts directly. Falls through to Path B on any failure.
    use_vision = (
        _get_vision_mode() == 'vision'
        and bool(b64)
        and mimetype in _VISION_MIMETYPES
    )
    if use_vision:
        vision_prompt = (
            'Return ONLY valid JSON with exactly two keys:\n'
            '"vendor": the business name printed at the top of this receipt '
            '(first 1-3 lines; ignore slogans, product item names, '
            'and payment-processor logos).\n'
            f'"product_name": pick the single best match from [{product_list}]. '
            f'{_cat_guide}\n'
            'JSON only:'
        )
        try:
            resp = await self._llm.submit(
                [{'role': 'user', 'content': vision_prompt, 'images': [b64]}],
                caller='expenses_agent_receipt_parser',
            )
            fields = _vendor_and_product((resp.content or '').strip())
            if fields is None:
                # A reply without any JSON object is a failed vision call:
                # raise so the text path below still gets its chance.
                raise ValueError('no JSON object in vision response')
            vendor = fields[0] or filename
            product_name = fields[1]
            logger.debug('vision vendor=%r product=%r for %s', vendor, product_name, filename)
            return _result(vendor, product_name)
        except Exception as exc:
            logger.warning(
                'Vision LLM failed for %s: %s — falling back to text path',
                filename, exc,
            )

    # ── Path B: text-only (OCR excerpt) ─────────────────────────────────
    # Used when: vision mode is off, mimetype is not an image (PDF/TXT/HTML),
    # or the vision call failed.  Starts from the filename as vendor.
    vendor = filename
    product_name = ''
    if not ocr_failed:
        excerpt = stripped[:600]
        text_prompt = (
            'Return ONLY valid JSON with exactly two keys:\n'
            '"vendor": the business name printed at the TOP of the receipt '
            '(usually the first 1-3 lines). '
            'Ignore slogans ("How doers get more done"), product item names, '
            'and payment-processor logos. '
            'OCR often substitutes look-alike characters — correct obvious '
            'errors (e.g. "LRYAL"→"LAYAL", "Subwey"→"Subway", '
            '"H0ME DEP0T"→"HOME DEPOT", "W4LMART"→"WALMART"). '
            'IMPORTANT: only use a brand name that is clearly present in the '
            'text — do NOT substitute a different well-known brand if the '
            'name is merely unclear. '
            'If this looks like a bank or credit-card statement listing '
            'multiple transactions rather than a single merchant receipt, '
            'use "". Use "" if no clear business name is visible.\n'
            f'"product_name": pick the single best match from [{product_list}]. '
            f'{_cat_guide}\n\n'
            f'Receipt text:\n{excerpt}\n\nJSON only:'
        )
    else:
        # OCR failed entirely — guess category from filename only
        text_prompt = (
            f'A receipt file named "{filename}" could not be read. '
            f'Pick the most likely match from [{product_list}] based on the filename, '
            f'or "". Return ONLY: {{"vendor": "", "product_name": "..."}}'
        )
    try:
        resp = await self._llm.submit(
            [{'role': 'user', 'content': text_prompt}],
            caller='expenses_agent_receipt_parser',
        )
        fields = _vendor_and_product((resp.content or '').strip())
        if fields is not None:
            if fields[0]:
                vendor = fields[0]
            product_name = fields[1]
    except Exception as exc:
        # Best effort: keep the filename as vendor and leave the category empty.
        logger.warning('Receipt vendor/category parse failed for %s: %s', filename, exc)
    return _result(vendor, product_name)
async def _report(self) -> AgentReport:
    """Assemble the AgentReport describing the outcome of the current run.

    Two report shapes are produced, selected by ``mode`` in the gathered data:

    * ``create_from_receipts`` -- status ``complete`` with a listing of the
      recorded actions (plus duplicate-skip and statement-skip notes) when at
      least one action was taken; otherwise status ``failed`` with the
      escalations joined into the summary.
    * anything else -- status ``complete`` with a one-line overview built
      from the gathered ``summary`` figures, or a generic sentence when no
      figures are present.
    """
    gathered = self._gathered_data
    current_directive_id = self._directive.directive_id if self._directive else ''

    # Generic review path: handled first so the receipt flow reads linearly.
    if gathered.get('mode') != 'create_from_receipts':
        stats = gathered.get('summary', {})
        if stats:
            record_count = stats.get("total_expenses", 0)
            total_amount = stats.get("total_amount", 0)
            pending_count = stats.get("pending_approval_count", 0)
            overview = (
                f'Expenses: {record_count} records, '
                f'total ${total_amount:.2f}. '
                f'{pending_count} pending approval.'
            )
        else:
            overview = 'Expenses review complete.'
        return AgentReport(
            directive_id=current_directive_id, agent=self.name, status='complete',
            summary=overview, data=gathered,
            escalations=self._escalations_list, actions_taken=[])

    # Receipt-creation path.
    if not self._actions_taken:
        # Nothing was created: surface the escalations as the failure reason.
        outcome = 'failed'
        reasons = self._escalations_list or ['Unknown error']
        message = 'Could not create expense report. ' + '; '.join(reasons)
    else:
        outcome = 'complete'
        action_lines = '\n'.join(f'{action}' for action in self._actions_taken)

        skipped = gathered.get('n_skipped', 0)
        duplicate_note = ''
        if skipped:
            duplicate_note = (
                f'\n({skipped} duplicate receipt(s) were automatically skipped.)'
            )

        # Escalations that mention "statement" are echoed into the summary.
        statement_lines = [
            note for note in self._escalations_list if 'statement' in note.lower()
        ]
        statement_note = ('\n' + '\n'.join(statement_lines)) if statement_lines else ''

        message = (
            f'Expense report created successfully:\n{action_lines}{duplicate_note}{statement_note}\n\n'
            'The report is in draft — open Odoo Expenses, '
            'review the amounts, and click Submit to send for approval.'
        )

    return AgentReport(
        directive_id=current_directive_id, agent=self.name, status=outcome,
        summary=message, data=gathered,
        escalations=self._escalations_list, actions_taken=self._actions_taken)
async def _dispatch_tool(self, name: str, args: dict):
    """Invoke the expenses tool registered under *name* with keyword *args*.

    Only the tools listed in the table below can be reached through this
    method; any other name raises ``ValueError``.  The awaited result of the
    selected tool is returned unchanged.
    """
    tools = self._et
    handlers = {
        'get_expenses': tools.get_expenses,
        'get_expense_sheets': tools.get_expense_sheets,
        'get_pending_approvals': tools.get_pending_approvals,
        'approve_expense_sheet': tools.approve_expense_sheet,
        'get_expenses_summary': tools.get_expenses_summary,
        'get_expense_by_employee': tools.get_expense_by_employee,
        'flag_for_review': tools.flag_for_review,
        'post_chatter_note': tools.post_chatter_note,
    }
    handler = handlers.get(name)
    if handler is None:
        raise ValueError(f'Unknown tool: {name}')
    result = await handler(**args)
    return result
async def handle_peer_request(self, request_type: str, params: dict, directive_id: str) -> dict:
    """Answer a request sent by another agent.

    Supported ``request_type`` values:

    * ``'expenses_summary'`` -- returns the result of
      ``get_expenses_summary()`` as-is.
    * ``'employee_expenses'`` -- requires ``params['employee_id']`` and
      returns ``{'expenses': ...}``.

    Never raises: every failure is reported as ``{'error': <message>}``.
    The message is guaranteed to be non-empty so callers that test
    ``result.get('error')`` for truthiness cannot mistake a failure for a
    success.  ``directive_id`` is accepted for interface compatibility and
    is not used here.
    """
    try:
        if request_type == 'expenses_summary':
            return await self._et.get_expenses_summary()
        if request_type == 'employee_expenses':
            # Report a missing parameter explicitly instead of leaking the
            # bare KeyError text ("'employee_id'") to the requesting agent.
            if not params or 'employee_id' not in params:
                return {'error': "Missing required param 'employee_id' "
                                 "for request type 'employee_expenses'"}
            expenses = await self._et.get_expense_by_employee(
                employee_id=params['employee_id'])
            return {'expenses': expenses}
        return {'error': f'Unknown type: {request_type}'}
    except Exception as exc:
        # Some exceptions (e.g. TimeoutError()) stringify to '', which would
        # yield a falsy error value; fall back to the exception class name.
        return {'error': str(exc) or type(exc).__name__}
async def sweep(self) -> SweepReport:
    """Collect one low-severity finding per pending expense-sheet approval.

    If fetching or converting the pending sheets raises, a SweepReport with
    no findings and the exception text in ``error`` is returned instead.
    """

    def _to_finding(sheet):
        # 'employee_id' is only indexed when it is a list; any other value
        # produces an empty employee name.
        employee = sheet.get('employee_id', [0, ''])
        return {
            'type': 'pending_expense_approval',
            'sheet_id': sheet.get('id'),
            'employee': employee[1] if isinstance(employee, list) else '',
            'amount': sheet.get('total_amount', 0),
            'severity': 'low',
        }

    try:
        pending_sheets = await self._et.get_pending_approvals()
        collected = [_to_finding(sheet) for sheet in pending_sheets]
    except Exception as exc:
        return SweepReport(agent=self.name, findings=[], error=str(exc))

    return SweepReport(agent=self.name, findings=collected, actions_taken=[],
                       summary=f'Expenses sweep: {len(collected)} pending approvals.')