Files
odoo-ai/agent_service/tools/receipt_parser.py
Carlos Garcia 4b7223a139 feat: file upload + expense report creation from Discuss attachments
- Discuss bot now reads ir.attachment from incoming messages; file-only
  messages no longer silently dropped
- ZIP files are described (contents listed) and bot asks clarifying
  question before acting; user's follow-up reply looks back for pending
  attachments so files don't need to be re-uploaded
- receipt_parser: extracts text from ZIP (recursive), JPG/PNG/etc (OCR),
  PDF (pdfplumber), HTML, TXT
- expenses_agent: full rewrite fixing broken method signatures; adds
  create_expense_sheet / create_expense / attach_receipt flow driven by
  LLM receipt parsing (Ollama, HIPAA-locked)
- master_agent: extra_context threads receipts + user_id into directives
- FastAPI /upload multipart endpoint; registered in main.py
- Odoo /ai/upload controller proxies files to agent service
- ab_ai_bot: dispatch_message_with_files() for multipart uploads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 01:02:24 -04:00

134 lines
4.5 KiB
Python

from __future__ import annotations
import base64
import io
import logging
import zipfile
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lower-cased file extension (leading dot included) -> MIME type.
_MIME = {
    # Raster images
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.bmp': 'image/bmp',
    '.tiff': 'image/tiff',
    '.tif': 'image/tiff',
    '.webp': 'image/webp',
    # Documents and plain text
    '.pdf': 'application/pdf',
    '.html': 'text/html',
    '.htm': 'text/html',
    '.txt': 'text/plain',
    # Archives
    '.zip': 'application/zip',
}

# Image extensions: exactly the entries of _MIME whose type is image/*.
_IMAGE_EXTS = {ext for ext, mime in _MIME.items() if mime.startswith('image/')}
def parse_upload(filename: str, data: bytes) -> list[dict]:
    """Parse one uploaded file into a list of receipt dicts.

    The handler is selected from the lower-cased extension of *filename*:

    * ``.zip``: handed to ``_extract_zip``, whose result is returned as-is
      (ZIP files are recursively unpacked, so the list may hold any number
      of entries, including none).
    * image extensions in ``_IMAGE_EXTS``: text comes from ``_ocr_image``.
    * ``.pdf``: text comes from ``_extract_pdf``.
    * ``.html`` / ``.htm``: text comes from ``_extract_html``.
    * ``.txt``: decoded as UTF-8, undecodable bytes replaced with U+FFFD.
    * anything else: decoded as strict UTF-8; if the bytes are not valid
      UTF-8 the text is the placeholder ``[Binary file: <filename>]``.

    Args:
        filename: Name of the uploaded file; only its suffix is inspected.
        data: Raw bytes of the file.

    Returns:
        A list of dicts with the keys ``filename``, ``text``, ``b64``
        (base64 of *data* as ``str``) and ``mimetype`` (looked up in
        ``_MIME``, defaulting to ``application/octet-stream``). Every
        non-ZIP input yields exactly one entry.
    """
    ext = Path(filename).suffix.lower()
    if ext == '.zip':
        return _extract_zip(filename, data)

    if ext in _IMAGE_EXTS:
        text = _ocr_image(data, filename)
    elif ext == '.pdf':
        text = _extract_pdf(data, filename)
    elif ext in ('.html', '.htm'):
        text = _extract_html(data, filename)
    elif ext == '.txt':
        text = data.decode('utf-8', errors='replace')
    else:
        # Unknown extension. The decode must be strict here: with
        # errors='replace' it can never raise, which would make the binary
        # placeholder unreachable and turn binary payloads into a stream of
        # replacement characters.
        try:
            text = data.decode('utf-8')
        except UnicodeDecodeError:
            text = f'[Binary file: {filename}]'

    return [{
        'filename': filename,
        'text': text,
        'b64': base64.b64encode(data).decode(),
        'mimetype': _MIME.get(ext, 'application/octet-stream'),
    }]
# Safety limits for archives coming from untrusted uploads (zip-bomb guard).
_MAX_ZIP_DEPTH = 3                          # nesting levels below the uploaded archive
_MAX_ZIP_MEMBER_BYTES = 50 * 1024 * 1024    # uncompressed size of a single member
_MAX_ZIP_TOTAL_BYTES = 200 * 1024 * 1024    # uncompressed bytes read per archive


def _extract_zip(zip_filename: str, data: bytes, _depth: int = 0) -> list[dict]:
    """Unpack a ZIP archive and parse each file member into receipt dicts.

    Directory entries are skipped. Every other member is read into memory
    and parsed under its base name (directory components are dropped):
    nested ``.zip`` members recurse into this function, everything else goes
    through ``parse_upload``. Extraction is best-effort: a member that fails
    is logged and skipped, and an unreadable archive is logged and yields
    whatever was collected so far.

    Because the archive is untrusted input, extraction is bounded:
    archives nested more than ``_MAX_ZIP_DEPTH`` levels deep are skipped,
    members whose declared uncompressed size exceeds
    ``_MAX_ZIP_MEMBER_BYTES`` are skipped, and reading stops once
    ``_MAX_ZIP_TOTAL_BYTES`` would be exceeded for one archive.

    Args:
        zip_filename: Name of the archive, used only in log messages.
        data: Raw bytes of the archive.
        _depth: Internal nesting counter; callers leave it at the default.

    Returns:
        The concatenated receipt dicts of all parsed members (possibly empty).
    """
    results: list[dict] = []
    if _depth > _MAX_ZIP_DEPTH:
        logger.warning(
            'receipt_parser: zip %s nested deeper than %d levels; skipped',
            zip_filename, _MAX_ZIP_DEPTH,
        )
        return results

    try:
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            remaining = _MAX_ZIP_TOTAL_BYTES
            for info in zf.infolist():
                if info.is_dir():
                    continue
                member = info.filename
                # Check the declared size *before* decompressing anything.
                if info.file_size > _MAX_ZIP_MEMBER_BYTES:
                    logger.warning(
                        'receipt_parser: zip member %s skipped: %d bytes exceeds limit',
                        member, info.file_size,
                    )
                    continue
                if info.file_size > remaining:
                    logger.warning(
                        'receipt_parser: zip %s exceeds total size limit; stopping at %s',
                        zip_filename, member,
                    )
                    break
                try:
                    member_data = zf.read(info)
                    remaining -= len(member_data)
                    name = Path(member).name
                    if Path(name).suffix.lower() == '.zip':
                        # Recurse directly so the nesting depth is tracked.
                        results.extend(_extract_zip(name, member_data, _depth + 1))
                    else:
                        results.extend(parse_upload(name, member_data))
                except Exception as exc:
                    logger.warning('receipt_parser: zip member %s failed: %s', member, exc)
    except Exception as exc:
        logger.error('receipt_parser: zip %s failed: %s', zip_filename, exc)
    return results
def _ocr_image(data: bytes, filename: str) -> str:
    """Run OCR over image bytes and return the recognised text, stripped.

    Pillow and pytesseract are imported lazily. This function does not
    raise for import or OCR problems: if either import fails, or if opening
    or recognising the image fails, a warning is logged and a bracketed
    placeholder string describing the problem is returned instead.

    Args:
        data: Raw bytes of the image file.
        filename: Name used in log messages and placeholder strings.
    """
    try:
        from PIL import Image
        import pytesseract

        picture = Image.open(io.BytesIO(data))
        recognised = pytesseract.image_to_string(picture)
        return recognised.strip()
    except ImportError:
        logger.warning('pytesseract/Pillow not installed — OCR unavailable for %s', filename)
        return f'[Image: {filename} — install pytesseract+Pillow for OCR]'
    except Exception as exc:
        logger.warning('OCR failed for %s: %s', filename, exc)
        return f'[Image: {filename} — OCR failed: {exc}]'
def _extract_pdf(data: bytes, filename: str) -> str:
    """Extract the text of a PDF, one page after another.

    pdfplumber is imported lazily. Pages for which ``extract_text()``
    returns nothing are left out; the remaining page texts are joined with
    newlines and the result is stripped. This function does not raise for
    import or extraction problems: a warning is logged and a bracketed
    placeholder string is returned instead.

    Args:
        data: Raw bytes of the PDF file.
        filename: Name used in log messages and placeholder strings.
    """
    try:
        import pdfplumber

        with pdfplumber.open(io.BytesIO(data)) as document:
            page_texts = [page.extract_text() for page in document.pages]
        # filter(None, ...) drops pages that produced None or an empty string.
        return '\n'.join(filter(None, page_texts)).strip()
    except ImportError:
        logger.warning('pdfplumber not installed — PDF extraction unavailable for %s', filename)
        return f'[PDF: {filename} — install pdfplumber for text extraction]'
    except Exception as exc:
        logger.warning('PDF extraction failed for %s: %s', filename, exc)
        return f'[PDF: {filename} — extraction failed: {exc}]'
def _extract_html(data: bytes, filename: str) -> str:
    """Return the visible text of an HTML document as one string.

    The bytes are decoded as UTF-8 (undecodable bytes are replaced), parsed
    with the standard-library ``HTMLParser``, and every non-blank text node
    outside ``<script>`` / ``<style>`` is stripped and joined with single
    spaces. If parsing raises, a warning is logged and a bracketed
    placeholder string is returned instead.

    Args:
        data: Raw bytes of the HTML file.
        filename: Name used in log messages and placeholder strings.
    """
    try:
        from html.parser import HTMLParser

        class _TextExtractor(HTMLParser):
            """Collect text nodes, ignoring script and style content."""

            def __init__(self):
                super().__init__()
                self._parts: list[str] = []
                self._skip = False  # True while inside <script> or <style>

            def handle_starttag(self, tag, attrs):
                if tag in ('script', 'style'):
                    self._skip = True

            def handle_endtag(self, tag):
                if tag in ('script', 'style'):
                    self._skip = False

            def handle_data(self, data):
                if self._skip:
                    return
                s = data.strip()
                if s:
                    self._parts.append(s)

            def text(self):
                return ' '.join(self._parts)

        parser = _TextExtractor()
        parser.feed(data.decode('utf-8', errors='replace'))
        # feed() may hold back trailing text it cannot classify yet (e.g. a
        # document ending in "AT&T" or an unfinished tag); close() flushes
        # that buffer so the end of the document is not silently dropped.
        parser.close()
        return parser.text()
    except Exception as exc:
        logger.warning('HTML extraction failed for %s: %s', filename, exc)
        return f'[HTML: {filename} — extraction failed: {exc}]'