feat: OCR via tesseract, dedup, category selection for expense receipts

- Dockerfile: install tesseract-ocr so Pillow+pytesseract can OCR receipt images
- operational_store: JSON-serialize raw_data before passing to asyncpg JSONB
- receipt_parser: add SHA256 hash + date extracted from filename timestamps
- expenses_agent: deduplicate receipts by hash before creating expense records
- expenses_agent: fetch all expensable Odoo products, pass list to LLM for
  category selection (Meals, Flights, etc.) per receipt
- expenses_agent: pass date_hint from filename (e.g. 20260509_180857.jpg -> 2026-05-09)
  as fallback when OCR text is unavailable
- expenses_tools: add get_expense_products() to fetch all expensable products

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Carlos Garcia
2026-05-16 01:40:32 -04:00
parent 6ab9624ec6
commit ef6dad5a81
5 changed files with 96 additions and 21 deletions

View File

@@ -7,6 +7,7 @@ WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libpq-dev \
tesseract-ocr \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .

View File

@@ -115,11 +115,39 @@ class ExpensesAgent(BaseAgent):
sheet_id = sheet_result.record_id
actions = [f'Created expense sheet "{sheet_name}" (ID {sheet_id})']
product_id = await self._et.get_default_expense_product()
# Fetch all expensable products once for category selection
expense_products = await self._et.get_expense_products()
default_product_id = expense_products[0]['id'] if expense_products else None
product_map = {p['id']: p['name'] for p in expense_products}
for receipt in receipts:
# Deduplicate receipts by SHA256 hash — same image uploaded twice
seen_hashes: set = set()
unique_receipts = []
for r in receipts:
h = r.get('sha256')
if h:
if h in seen_hashes:
logger.info('expenses_agent: skipping duplicate receipt %s', r.get('filename'))
actions.append(f"Skipped duplicate: {r.get('filename', 'receipt')}")
continue
seen_hashes.add(h)
unique_receipts.append(r)
for receipt in unique_receipts:
parsed = await self._parse_receipt_text(
receipt.get('text', ''), receipt.get('filename', 'receipt'))
receipt.get('text', ''), receipt.get('filename', 'receipt'),
expense_products=expense_products,
date_hint=receipt.get('date_from_name'),
)
# Pick product by name match returned from LLM, fall back to default
product_id = default_product_id
chosen_name = parsed.get('product_name', '')
if chosen_name:
for p in expense_products:
if p['name'].lower() == chosen_name.lower():
product_id = p['id']
break
expense_result = await self._et.create_expense(
sheet_id=sheet_id,
employee_id=employee_id,
@@ -127,13 +155,13 @@ class ExpensesAgent(BaseAgent):
total_amount=float(parsed.get('amount', 0.0)),
date=str(parsed.get('date') or _date.today().isoformat()),
product_id=product_id,
description=str(parsed.get('description', '')),
)
if expense_result.success:
cat = product_map.get(product_id, 'Expense')
actions.append(
f"Added: {parsed.get('vendor', 'Unknown vendor')} "
f"${float(parsed.get('amount', 0)):.2f} "
f"on {parsed.get('date', 'today')}"
f"({cat}) on {parsed.get('date', 'today')}"
)
if receipt.get('b64'):
await self._et.attach_receipt(
@@ -151,20 +179,39 @@ class ExpensesAgent(BaseAgent):
self._actions_taken = actions
return actions
async def _parse_receipt_text(self, text: str, filename: str) -> dict:
async def _parse_receipt_text(self, text: str, filename: str,
expense_products: list = None,
date_hint: str = None) -> dict:
today = _date.today().isoformat()
fallback = {'vendor': filename, 'amount': 0.0,
'date': _date.today().isoformat(), 'description': filename}
if not text or text.startswith('['):
return fallback
'date': date_hint or today, 'product_name': ''}
ocr_failed = not text or text.startswith('[')
prompt = (
'Extract expense details from the following receipt text. '
'Return ONLY valid JSON with these keys: '
'"vendor" (string), "amount" (number, the total charged), '
'"date" (string YYYY-MM-DD, use today if absent), '
'"description" (string, brief expense type).\n\n'
f'Receipt text (first 2000 chars):\n{text[:2000]}\n\nJSON only:'
)
product_list = ''
if expense_products:
names = [p['name'] for p in expense_products]
product_list = ', '.join(f'"{n}"' for n in names)
if ocr_failed:
# No OCR text — still try to classify category from filename/date
if not product_list:
return fallback
prompt = (
f'A receipt photo named "{filename}" could not be read by OCR. '
f'Based only on the filename, pick the most likely expense category '
f'from this list: [{product_list}]. '
f'Return ONLY valid JSON: {{"product_name": "..."}}'
)
else:
prompt = (
'Extract expense details from the following receipt text. '
'Return ONLY valid JSON with these keys:\n'
'"vendor" (string, merchant name),\n'
'"amount" (number, the total amount charged — look for "Total", "Amount Due", "Grand Total"),\n'
f'"date" (string YYYY-MM-DD, use {date_hint or today} if not found),\n'
f'"product_name" (string, pick the best match from [{product_list}] or empty string).\n\n'
f'Receipt text (first 2000 chars):\n{text[:2000]}\n\nJSON only:'
)
try:
resp = await self._llm.submit(
[{'role': 'user', 'content': prompt}],
@@ -177,8 +224,8 @@ class ExpensesAgent(BaseAgent):
return {
'vendor': str(data.get('vendor', filename)),
'amount': float(data.get('amount', 0.0)),
'date': str(data.get('date', _date.today().isoformat())),
'description': str(data.get('description', '')),
'date': str(data.get('date') or date_hint or today),
'product_name': str(data.get('product_name', '')),
}
except Exception as exc:
logger.warning('Receipt parse failed for %s: %s', filename, exc)

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta
@@ -11,12 +12,14 @@ class OperationalStore:
async def store(self, scope, summary, raw_data=None, ttl_days=90, source_directive_id=None):
expires_at = datetime.utcnow() + timedelta(days=ttl_days)
# asyncpg JSONB column expects a JSON string, not a Python dict
raw_data_json = json.dumps(raw_data) if raw_data is not None else None
async with self._pool.acquire(timeout=10) as conn:
await conn.execute(
"""INSERT INTO ab_operational_memory
(scope, summary, raw_data, source_directive_id, expires_at)
VALUES ($1, $2, $3, $4, $5)""",
scope, summary, raw_data, source_directive_id, expires_at)
scope, summary, raw_data_json, source_directive_id, expires_at)
async def get_recent(self, scope, limit=10):
async with self._pool.acquire(timeout=10) as conn:

View File

@@ -107,6 +107,17 @@ class ExpensesTools:
logger.warning('get_default_expense_product failed: %s', exc)
return None
async def get_expense_products(self) -> list:
"""Return all expensable products for category selection."""
try:
return await self._o.search_read(
'product.product',
[('can_be_expensed', '=', True)],
['id', 'name'], limit=100)
except Exception as exc:
logger.warning('get_expense_products failed: %s', exc)
return []
async def create_expense_sheet(self, name: str, employee_id: int):
return await self._o.create('hr.expense.sheet', {
'name': name,

View File

@@ -1,12 +1,17 @@
from __future__ import annotations
import base64
import hashlib
import io
import logging
import re
import zipfile
from pathlib import Path
logger = logging.getLogger(__name__)
# Extract YYYYMMDD from filenames like 20260509_180857.jpg
_DATE_PATTERN = re.compile(r'(\d{4})(\d{2})(\d{2})_\d{6}')
_MIME = {
'.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
'.png': 'image/png', '.gif': 'image/gif',
@@ -31,6 +36,13 @@ def parse_upload(filename: str, data: bytes) -> list[dict]:
b64 = base64.b64encode(data).decode()
mimetype = _MIME.get(ext, 'application/octet-stream')
sha256 = hashlib.sha256(data).hexdigest()
# Extract date from timestamp-style filenames (e.g. 20260509_180857.jpg)
date_from_name = None
m = _DATE_PATTERN.search(filename)
if m:
date_from_name = f'{m.group(1)}-{m.group(2)}-{m.group(3)}'
if ext in _IMAGE_EXTS:
text = _ocr_image(data, filename)
@@ -46,7 +58,8 @@ def parse_upload(filename: str, data: bytes) -> list[dict]:
except Exception:
text = f'[Binary file: {filename}]'
return [{'filename': filename, 'text': text, 'b64': b64, 'mimetype': mimetype}]
return [{'filename': filename, 'text': text, 'b64': b64, 'mimetype': mimetype,
'sha256': sha256, 'date_from_name': date_from_name}]
def _extract_zip(zip_filename: str, data: bytes) -> list[dict]: