Files
odoo-ai/agent_service/tools/receipt_parser.py
2026-05-17 11:59:11 -04:00

222 lines
8.1 KiB
Python

from __future__ import annotations
import base64
import hashlib
import io
import logging
import re
import zipfile
from pathlib import Path
logger = logging.getLogger(__name__)
# Extract YYYYMMDD from filenames like 20260509_180857.jpg
_DATE_PATTERN = re.compile(r'(\d{4})(\d{2})(\d{2})_\d{6}')
_MIME = {
'.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
'.png': 'image/png', '.gif': 'image/gif',
'.bmp': 'image/bmp', '.tiff': 'image/tiff', '.tif': 'image/tiff',
'.webp': 'image/webp', '.pdf': 'application/pdf',
'.html': 'text/html', '.htm': 'text/html',
'.txt': 'text/plain', '.zip': 'application/zip',
}
_IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp'}
def parse_upload(filename: str, data: bytes) -> list[dict]:
    """
    Parse one uploaded file into a list of receipt dicts.

    ZIP files are recursively unpacked (one entry per parsed member); every
    other type returns a single-entry list.

    Each dict has the keys:
      filename        -- the name passed in
      text            -- extracted text (OCR for images, text layer for PDFs,
                         tag-stripped text for HTML, decoded bytes otherwise)
                         or a bracketed placeholder when extraction is not
                         possible
      b64             -- base64 encoding of the raw bytes
      mimetype        -- looked up from the extension, defaulting to
                         'application/octet-stream'
      sha256          -- hex SHA-256 digest of the raw bytes
      date_from_name  -- 'YYYY-MM-DD' taken from a timestamp-style filename
                         (e.g. 20260509_180857.jpg), or None when the name
                         has no such timestamp or the digits are not a real
                         calendar date
    """
    ext = Path(filename).suffix.lower()
    if ext == '.zip':
        return _extract_zip(filename, data)

    # Function-scope import: only this function needs it.
    from datetime import date

    b64 = base64.b64encode(data).decode()
    mimetype = _MIME.get(ext, 'application/octet-stream')
    sha256 = hashlib.sha256(data).hexdigest()

    # Extract date from timestamp-style filenames (e.g. 20260509_180857.jpg).
    # The regex only checks digit counts, so reject impossible dates such as
    # month 13 instead of handing callers a string that is not a date.
    date_from_name = None
    m = _DATE_PATTERN.search(filename)
    if m:
        try:
            date_from_name = date(
                int(m.group(1)), int(m.group(2)), int(m.group(3))
            ).isoformat()
        except ValueError:
            date_from_name = None

    if ext in _IMAGE_EXTS:
        text = _ocr_image(data, filename)
    elif ext == '.pdf':
        text = _extract_pdf(data, filename)
    elif ext in ('.html', '.htm'):
        text = _extract_html(data, filename)
    elif ext == '.txt':
        text = data.decode('utf-8', errors='replace')
    elif b'\x00' in data:
        # Unknown extension with NUL bytes: treat as binary. Decoding with
        # errors='replace' never raises, so it cannot be used to detect
        # binary content; it would just yield replacement-character noise.
        text = f'[Binary file: {filename}]'
    else:
        # Unknown extension that looks like text: decode leniently.
        text = data.decode('utf-8', errors='replace')

    return [{'filename': filename, 'text': text, 'b64': b64, 'mimetype': mimetype,
             'sha256': sha256, 'date_from_name': date_from_name}]
def _extract_zip(zip_filename: str, data: bytes, _depth: int = 0) -> list[dict]:
    """Unpack a ZIP archive and parse each file member as an upload.

    Directory entries are skipped. Each remaining member is read and parsed
    under its base name (directory components dropped); members that are
    themselves ZIP archives are unpacked recursively. Failures are logged and
    never raised: a bad member is skipped, and an unreadable archive yields
    whatever was collected so far (possibly an empty list).

    Because the archive comes from an upload, two guards are applied:
      * a member whose declared uncompressed size exceeds the per-member cap
        is skipped rather than decompressed into memory;
      * archives nested deeper than the nesting cap are not opened.

    ``_depth`` is internal bookkeeping for the nesting guard; callers should
    leave it at its default.
    """
    max_member_bytes = 50 * 1024 * 1024  # per-member uncompressed size cap
    max_depth = 5                        # levels of zip-inside-zip allowed

    results: list[dict] = []
    if _depth > max_depth:
        logger.warning('receipt_parser: zip %s skipped: nested more than %d levels deep',
                       zip_filename, max_depth)
        return results
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            for info in zf.infolist():
                if info.is_dir():
                    continue
                member = info.filename
                if info.file_size > max_member_bytes:
                    logger.warning('receipt_parser: zip member %s skipped: %d bytes exceeds limit',
                                   member, info.file_size)
                    continue
                try:
                    member_data = zf.read(info)
                    member_name = Path(member).name
                    if Path(member_name).suffix.lower() == '.zip':
                        # Same routing parse_upload would do for a .zip, but
                        # carrying the nesting depth along.
                        results.extend(_extract_zip(member_name, member_data, _depth + 1))
                    else:
                        results.extend(parse_upload(member_name, member_data))
                except Exception as exc:
                    logger.warning('receipt_parser: zip member %s failed: %s', member, exc)
    except Exception as exc:
        logger.error('receipt_parser: zip %s failed: %s', zip_filename, exc)
    return results
def _ocr_image(data: bytes, filename: str) -> str:
    """Return the text read from a receipt image.

    If the settings object has a non-empty ``vision_ocr_model``, the
    vision-model OCR is attempted first and its result returned when it is
    non-empty. An empty vision result is logged and the Tesseract pipeline is
    used instead. With no vision model configured, Tesseract is used directly.
    """
    from agent_service.config import get_settings

    cfg = get_settings()
    if not cfg.vision_ocr_model:
        return _ocr_image_tesseract(data, filename)

    vision_text = _ocr_image_vision(
        data,
        filename,
        cfg.ollama_url,
        cfg.vision_ocr_model,
    )
    if vision_text:
        return vision_text

    logger.warning('Vision OCR returned empty for %s — falling back to Tesseract', filename)
    return _ocr_image_tesseract(data, filename)
def _ocr_image_vision(data: bytes, filename: str, ollama_url: str, model: str) -> str:
    """Transcribe a receipt image with an Ollama vision model.

    Sends one user message (a fixed transcription prompt plus the raw image
    bytes) to ``model`` through an Ollama client pointed at ``ollama_url``.

    Returns the reply text with surrounding whitespace stripped. Returns an
    empty string when the ``ollama`` package cannot be imported or when the
    request fails for any reason; both cases are logged as warnings.
    """
    prompt = (
        'This is a photo of a paper receipt. '
        'Transcribe ALL text exactly as it appears on the receipt. '
        'Preserve every line in order: store name, address, date, time, '
        'each line item with price, subtotal, tax, tip if present, and '
        'the final total. Output the raw text only — no commentary, '
        'no markdown, no explanations.'
    )
    try:
        import ollama as _ollama

        user_message = {'role': 'user', 'content': prompt, 'images': [data]}
        reply = _ollama.Client(host=ollama_url).chat(
            model=model,
            messages=[user_message],
        )
        # The reply is handled both as a plain dict and as an object with
        # attribute access.
        if isinstance(reply, dict):
            content = reply.get('message', {}).get('content')
        else:
            content = reply.message.content
        transcript = (content or '').strip()
        logger.debug('Vision OCR %s (%s): %d chars', filename, model, len(transcript))
        return transcript
    except ImportError:
        logger.warning('ollama package not installed — vision OCR unavailable for %s', filename)
        return ''
    except Exception as err:
        logger.warning('Vision OCR failed for %s: %s', filename, err)
        return ''
def _ocr_image_tesseract(data: bytes, filename: str) -> str:
    """Run the Tesseract OCR pipeline on an image (fallback path).

    The image is downscaled to at most 1800 px wide, converted to grayscale,
    contrast-stretched, binarised at a fixed threshold and sharpened before
    being handed to Tesseract.

    Returns the recognised text, stripped. If Pillow or pytesseract is not
    importable, or anything else goes wrong, a bracketed placeholder string
    describing the problem is returned instead; nothing is raised.
    """
    try:
        from PIL import Image, ImageFilter, ImageOps
        import pytesseract

        picture = Image.open(io.BytesIO(data))

        # Phone-camera resolution makes Tesseract slower and less accurate;
        # 1800 px of width is enough for receipt text.
        width_cap = 1800
        if picture.width > width_cap:
            ratio = width_cap / picture.width
            new_size = (width_cap, int(picture.height * ratio))
            picture = picture.resize(new_size, Image.LANCZOS)

        # Grayscale -> autocontrast -> hard threshold at 140 -> sharpen.
        picture = ImageOps.autocontrast(ImageOps.grayscale(picture))
        picture = picture.point(lambda level: 0 if level < 140 else 255)
        picture = picture.filter(ImageFilter.SHARPEN)

        # First attempt uses psm 1 (automatic page segmentation with OSD, for
        # rotated receipts); if that raises, retry with psm 6.
        primary_cfg = '--oem 3 --psm 1'
        fallback_cfg = '--oem 3 --psm 6'
        try:
            recognised = pytesseract.image_to_string(picture, config=primary_cfg).strip()
        except Exception:
            recognised = pytesseract.image_to_string(picture, config=fallback_cfg).strip()

        logger.debug('Tesseract OCR %s: %d chars', filename, len(recognised))
        return recognised
    except ImportError:
        logger.warning('pytesseract/Pillow not installed — OCR unavailable for %s', filename)
        return f'[Image: {filename} — install pytesseract+Pillow for OCR]'
    except Exception as err:
        logger.warning('Tesseract OCR failed for %s: %s', filename, err)
        return f'[Image: {filename} — OCR failed: {err}]'
def _extract_pdf(data: bytes, filename: str) -> str:
    """Extract the text of a PDF with pdfplumber.

    Text is taken page by page; pages that yield no text are left out and the
    rest are joined with newlines, then stripped.

    Returns a bracketed placeholder string (and logs a warning) when
    pdfplumber is not importable or extraction fails; nothing is raised.
    """
    try:
        import pdfplumber

        with pdfplumber.open(io.BytesIO(data)) as document:
            page_texts = [page.extract_text() for page in document.pages]
        non_empty = [chunk for chunk in page_texts if chunk]
        return '\n'.join(non_empty).strip()
    except ImportError:
        logger.warning('pdfplumber not installed — PDF extraction unavailable for %s', filename)
        return f'[PDF: {filename} — install pdfplumber for text extraction]'
    except Exception as err:
        logger.warning('PDF extraction failed for %s: %s', filename, err)
        return f'[PDF: {filename} — extraction failed: {err}]'
def _extract_html(data: bytes, filename: str) -> str:
try:
from html.parser import HTMLParser
class _TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self._parts: list[str] = []
self._skip = False
def handle_starttag(self, tag, attrs):
if tag in ('script', 'style'):
self._skip = True
def handle_endtag(self, tag):
if tag in ('script', 'style'):
self._skip = False
def handle_data(self, data):
if not self._skip:
s = data.strip()
if s:
self._parts.append(s)
def text(self):
return ' '.join(self._parts)
parser = _TextExtractor()
parser.feed(data.decode('utf-8', errors='replace'))
return parser.text()
except Exception as exc:
logger.warning('HTML extraction failed for %s: %s', filename, exc)
return f'[HTML: {filename} — extraction failed: {exc}]'