Three receipts per batch were failing with JSONDecodeError (e.g. "Expecting ':' delimiter: line 1 column 90") because activeblue-chat (llama3.2-vision) occasionally outputs near-JSON with trailing commas, single-quoted strings, or unquoted keys. Two-layer fix: 1. Add format='json' to the Ollama chat call — Ollama JSON mode forces syntactically valid output at the sampler level, eliminating most structural errors. 2. Add _repair_json() fallback that runs on any remaining JSONDecodeError: strips trailing commas, converts single→double quotes, and quotes unquoted keys. If repair succeeds, the result is re-serialised as canonical JSON before being returned. Also re-serialise with json.dumps() on success so the fast path in _parse_receipt_text always receives clean, canonical JSON regardless of whitespace or key ordering in the model's original output. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
286 lines
11 KiB
Python
286 lines
11 KiB
Python
from __future__ import annotations
|
|
import base64
|
|
import hashlib
|
|
import io
|
|
import logging
|
|
import re
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Captures the YYYY, MM and DD digit groups of timestamp-style filenames
# such as 20260509_180857.jpg (eight digits, an underscore, six more digits).
_DATE_PATTERN = re.compile(r'(\d{4})(\d{2})(\d{2})_\d{6}')

# Lower-cased file extension -> MIME type recorded for the upload.
_MIME = {
    # raster images
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.bmp': 'image/bmp',
    '.tiff': 'image/tiff',
    '.tif': 'image/tiff',
    '.webp': 'image/webp',
    # documents
    '.pdf': 'application/pdf',
    '.html': 'text/html',
    '.htm': 'text/html',
    '.txt': 'text/plain',
    # archives
    '.zip': 'application/zip',
}

# Extensions that are sent through the image OCR path.
_IMAGE_EXTS = {
    '.jpg', '.jpeg',
    '.png',
    '.gif',
    '.bmp',
    '.tiff', '.tif',
    '.webp',
}
|
|
|
|
|
|
def parse_upload(filename: str, data: bytes) -> list[dict]:
    """Parse one uploaded file into a list of receipt dicts.

    ZIP archives are delegated to ``_extract_zip``, which feeds every member
    back through this function; every other file type yields a single-entry
    list.

    Each dict has the keys ``filename``, ``text``, ``b64``, ``mimetype``,
    ``sha256`` and ``date_from_name``:

    - ``text``: OCR / extracted text, chosen by file extension.
    - ``b64``: the raw bytes, base64-encoded.
    - ``mimetype``: looked up from the extension, defaulting to
      ``application/octet-stream``.
    - ``sha256``: hex digest of the raw bytes.
    - ``date_from_name``: ``YYYY-MM-DD`` taken from a timestamp-style
      filename (e.g. ``20260509_180857.jpg``), or ``None`` when the name
      carries no such timestamp or its digits are not a real calendar date.
    """
    from datetime import date as _date

    ext = Path(filename).suffix.lower()
    if ext == '.zip':
        return _extract_zip(filename, data)

    b64 = base64.b64encode(data).decode()
    mimetype = _MIME.get(ext, 'application/octet-stream')
    sha256 = hashlib.sha256(data).hexdigest()

    # Extract date from timestamp-style filenames (e.g. 20260509_180857.jpg).
    # The pattern only checks digit counts, so the digits are validated as a
    # calendar date; something like 20261399_... must not become "2026-13-99".
    date_from_name = None
    m = _DATE_PATTERN.search(filename)
    if m:
        try:
            date_from_name = _date(
                int(m.group(1)), int(m.group(2)), int(m.group(3)),
            ).isoformat()
        except ValueError:
            date_from_name = None

    if ext in _IMAGE_EXTS:
        text = _ocr_image(data, filename)
    elif ext == '.pdf':
        text = _extract_pdf(data, filename)
    elif ext in ('.html', '.htm'):
        text = _extract_html(data, filename)
    elif ext == '.txt':
        text = data.decode('utf-8', errors='replace')
    elif b'\x00' in data:
        # Unknown extension with NUL bytes: treat as binary. Decoding with
        # errors='replace' never raises, so the placeholder has to be chosen
        # explicitly instead of from an exception handler.
        text = f'[Binary file: {filename}]'
    else:
        # Unknown extension but no NUL bytes: best-effort text decode.
        text = data.decode('utf-8', errors='replace')

    return [{
        'filename': filename,
        'text': text,
        'b64': b64,
        'mimetype': mimetype,
        'sha256': sha256,
        'date_from_name': date_from_name,
    }]
|
|
|
|
|
|
def _extract_zip(zip_filename: str, data: bytes) -> list[dict]:
    """Unpack an in-memory ZIP archive and parse every file inside it.

    Each non-directory member is read and passed to ``parse_upload`` under
    its base name (any directory part is dropped). A member that fails is
    logged as a warning and skipped. If the archive itself cannot be
    processed, the error is logged and whatever was collected so far
    (possibly nothing) is returned.
    """
    receipts: list[dict] = []
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as archive:
            # Names ending in '/' are directory entries, not files.
            file_names = [n for n in archive.namelist() if not n.endswith('/')]
            for name in file_names:
                try:
                    payload = archive.read(name)
                    receipts += parse_upload(Path(name).name, payload)
                except Exception as exc:
                    logger.warning('receipt_parser: zip member %s failed: %s', name, exc)
    except Exception as exc:
        logger.error('receipt_parser: zip %s failed: %s', zip_filename, exc)
    return receipts
|
|
|
|
|
|
def _ocr_image(data: bytes, filename: str) -> str:
    """Return the text of a receipt image.

    When the settings name a vision OCR model, that model is asked first and
    its answer is used if it is non-empty. Otherwise, or when the vision pass
    comes back empty, the Tesseract pipeline produces the result.
    """
    from agent_service.config import get_settings

    cfg = get_settings()
    if not cfg.vision_ocr_model:
        # No vision model configured: Tesseract is the only option.
        return _ocr_image_tesseract(data, filename)

    vision_text = _ocr_image_vision(
        data,
        filename,
        cfg.ollama_url,
        cfg.vision_ocr_model,
    )
    if vision_text:
        return vision_text

    logger.warning('Vision OCR returned empty for %s — falling back to Tesseract', filename)
    return _ocr_image_tesseract(data, filename)
|
|
|
|
|
|
# Tokeniser for _repair_json. The text is scanned once, left to right, and the
# first alternative that matches at a position wins. Double-quoted strings are
# listed first so that their content is consumed whole and never rewritten.
_JSON_REPAIR_TOKEN = re.compile(
    r'''
      (?P<dq>"(?:\\.|[^"\\])*")            # valid JSON string: keep untouched
    | (?P<sq>'(?:\\.|[^'\\])*')            # single-quoted string: re-quote
    | (?P<comma>,(?=\s*[}\]]))             # trailing comma before } or ]: drop
    | (?P<key>(?<!["\w])\w+(?=\s*:))       # bare word before a colon: quote
    ''',
    re.VERBOSE | re.DOTALL,
)

# Inside a single-quoted string: either a backslash escape or a bare double quote.
_JSON_REPAIR_SQ_INNER = re.compile(r'\\.|"', re.DOTALL)

# Instruction sent to the vision model together with the receipt image.
_VISION_OCR_PROMPT = (
    'You are a receipt data extractor. '
    'Read this receipt image and extract the following fields. '
    'Copy values EXACTLY as printed — do NOT guess, infer, or '
    'invent values you cannot clearly see.\n\n'
    'Fields to extract:\n'
    '- vendor: the store or restaurant name exactly as printed; '
    'empty string if not clearly visible\n'
    '- amount: the FINAL total the customer paid; find a line '
    'labeled "Total", "Grand Total", "Amount Due", or "Balance Due"; '
    'copy the number exactly; do NOT use subtotal, tax, or tip; '
    'return 0 if no clearly labeled final total is visible\n'
    '- date: transaction date in YYYY-MM-DD format; '
    'null if not clearly visible\n'
    '- time: transaction time in HH:MM 24-hour format; '
    'null if not clearly visible\n'
    '- category: one of: meals, fuel, hotel, office, transport, other\n\n'
    'Return ONLY a valid JSON object, no commentary, no markdown:\n'
    '{"vendor":"...","amount":0.00,"date":"YYYY-MM-DD or null",'
    '"time":"HH:MM or null","category":"..."}'
)


def _repair_json(s: str) -> str:
    """Fix the most common LLM JSON formatting mistakes.

    Handles:
    - trailing commas before } or ]   {"a":1,}   becomes {"a":1}
    - single-quoted strings           {'a':'b'}  becomes {"a":"b"}
    - unquoted keys                   {a: "b"}   becomes {"a": "b"}

    Text inside a well-formed double-quoted string is never modified, so
    apostrophes, colons and commas in values (``"Joe's & Ann's"``,
    ``"Store #12: Main St"``) survive the repair.

    The result is not guaranteed to be valid JSON; callers must still parse it.
    """
    def _requote_inner(m):
        piece = m.group(0)
        if piece == '"':
            return '\\"'      # a bare " must be escaped once the delimiters change
        if piece == "\\'":
            return "'"        # \' is not a JSON escape; a plain apostrophe is fine
        return piece          # any other backslash escape is kept as written

    def _fix_token(m):
        kind = m.lastgroup
        tok = m.group(0)
        if kind == 'dq':
            return tok
        if kind == 'sq':
            return '"' + _JSON_REPAIR_SQ_INNER.sub(_requote_inner, tok[1:-1]) + '"'
        if kind == 'comma':
            return ''
        return '"' + tok + '"'

    return _JSON_REPAIR_TOKEN.sub(_fix_token, s)


def _ocr_image_vision(data: bytes, filename: str, ollama_url: str, model: str) -> str:
    """Use an Ollama vision model to extract receipt data directly as JSON.

    The model is prompted for the fields vendor, amount, date, time and
    category. Its reply is cut down to the outermost ``{...}`` span, parsed
    (with one ``_repair_json`` retry if the first parse fails) and
    re-serialised with ``json.dumps`` so the caller receives canonical JSON.
    Only the presence of the ``amount`` key is checked; other keys are passed
    through as the model produced them.

    Args:
        data: raw image bytes, passed to the model as the message image.
        filename: used in log messages only.
        ollama_url: host passed to ``ollama.Client``.
        model: name of the Ollama model to query.

    Returns:
        A JSON object string, or an empty string on any failure (package not
        installed, request error, prose reply, unparseable JSON, missing
        ``amount``) so the caller can fall back to Tesseract.
    """
    import json as _json

    try:
        import ollama as _ollama

        client = _ollama.Client(host=ollama_url)
        response = client.chat(
            model=model,
            format='json',  # Ollama JSON mode — forces syntactically valid output
            messages=[{
                'role': 'user',
                'content': _VISION_OCR_PROMPT,
                'images': [data],
            }],
        )
        # The reply is read either as a plain dict or via attributes.
        if isinstance(response, dict):
            raw = (response.get('message', {}).get('content') or '').strip()
        else:
            raw = (response.message.content or '').strip()

        # Must contain a JSON object, not prose
        first, last = raw.find('{'), raw.rfind('}')
        if first == -1 or last <= first:
            logger.warning('Vision OCR %s: model returned prose, falling back to Tesseract',
                           filename)
            return ''
        json_str = raw[first:last + 1]

        # Parse — on failure attempt common repairs then retry once
        try:
            parsed = _json.loads(json_str)
        except _json.JSONDecodeError as json_err:
            repaired = _repair_json(json_str)
            try:
                parsed = _json.loads(repaired)
                logger.debug('Vision OCR %s: JSON repaired successfully', filename)
            except _json.JSONDecodeError:
                # Report the original error: it describes what the model sent.
                logger.warning('Vision OCR %s: JSON parse failed (%s), falling back',
                               filename, json_err)
                return ''

        if 'amount' not in parsed:
            logger.warning('Vision OCR %s: JSON missing amount field, falling back', filename)
            return ''
        logger.debug('Vision OCR %s (%s): extracted JSON ok', filename, model)
        # Re-serialise so downstream always gets clean, canonical JSON
        return _json.dumps(parsed)
    except ImportError:
        logger.warning('ollama package not installed — vision OCR unavailable for %s', filename)
        return ''
    except Exception as exc:
        logger.warning('Vision OCR failed for %s: %s', filename, exc)
        return ''
|
|
|
|
|
|
def _ocr_image_tesseract(data: bytes, filename: str) -> str:
    """Run the Tesseract OCR pipeline on an image and return the text found.

    Never raises: if Pillow or pytesseract cannot be imported, or any step of
    the pipeline fails, a bracketed placeholder string describing the problem
    is returned instead (and a warning is logged).
    """
    try:
        from PIL import Image, ImageFilter, ImageOps
        import pytesseract

        picture = Image.open(io.BytesIO(data))

        # Phone-camera resolution makes tesseract slower and less accurate;
        # cap the width at 1800px and scale the height by the same ratio.
        width_cap = 1800
        if picture.width > width_cap:
            ratio = width_cap / picture.width
            target_size = (width_cap, int(picture.height * ratio))
            picture = picture.resize(target_size, Image.LANCZOS)

        def _black_or_white(level):
            # Fixed threshold: anything darker than 140 becomes black.
            return 0 if level < 140 else 255

        # Grayscale -> stretch contrast -> threshold -> sharpen.
        picture = ImageOps.autocontrast(ImageOps.grayscale(picture))
        picture = picture.point(_black_or_white)
        picture = picture.filter(ImageFilter.SHARPEN)

        # First try psm 1 (automatic page segmentation with OSD, which copes
        # with rotated receipts); if that call raises, retry with psm 6.
        try:
            text = pytesseract.image_to_string(picture, config='--oem 3 --psm 1').strip()
        except Exception:
            text = pytesseract.image_to_string(picture, config='--oem 3 --psm 6').strip()

        logger.debug('Tesseract OCR %s: %d chars', filename, len(text))
        return text
    except ImportError:
        logger.warning('pytesseract/Pillow not installed — OCR unavailable for %s', filename)
        return f'[Image: {filename} — install pytesseract+Pillow for OCR]'
    except Exception as exc:
        logger.warning('Tesseract OCR failed for %s: %s', filename, exc)
        return f'[Image: {filename} — OCR failed: {exc}]'
|
|
|
|
|
|
def _extract_pdf(data: bytes, filename: str) -> str:
    """Return the text of a PDF, one page after another, joined by newlines.

    Pages for which pdfplumber yields no text are left out. Never raises: if
    pdfplumber is not installed or extraction fails, a bracketed placeholder
    string is returned and a warning is logged.
    """
    try:
        import pdfplumber

        with pdfplumber.open(io.BytesIO(data)) as document:
            page_texts = [page.extract_text() for page in document.pages]
        # Drop pages that produced None or an empty string.
        return '\n'.join(chunk for chunk in page_texts if chunk).strip()
    except ImportError:
        logger.warning('pdfplumber not installed — PDF extraction unavailable for %s', filename)
        return f'[PDF: {filename} — install pdfplumber for text extraction]'
    except Exception as exc:
        logger.warning('PDF extraction failed for %s: %s', filename, exc)
        return f'[PDF: {filename} — extraction failed: {exc}]'
|
|
|
|
|
|
def _extract_html(data: bytes, filename: str) -> str:
    """Return the visible text of an HTML document as one space-joined string.

    The bytes are decoded as UTF-8 (invalid sequences replaced), tags are
    discarded, and the content of ``<script>`` and ``<style>`` elements is
    skipped. Each text run is stripped; empty runs are dropped.

    Never raises: on failure a bracketed placeholder string is returned and a
    warning is logged.
    """
    try:
        from html.parser import HTMLParser

        class _TextExtractor(HTMLParser):
            """Collects text nodes that are outside <script>/<style>."""

            def __init__(self):
                super().__init__()
                self._parts: list[str] = []
                self._skip = False  # True while inside <script> or <style>

            def handle_starttag(self, tag, attrs):
                if tag in ('script', 'style'):
                    self._skip = True

            def handle_endtag(self, tag):
                if tag in ('script', 'style'):
                    self._skip = False

            def handle_data(self, data):
                if not self._skip:
                    s = data.strip()
                    if s:
                        self._parts.append(s)

            def text(self):
                return ' '.join(self._parts)

        parser = _TextExtractor()
        parser.feed(data.decode('utf-8', errors='replace'))
        # feed() may hold back trailing input it considers incomplete (for
        # example text ending in "AT&T", which could be a cut-off character
        # reference). close() forces that buffered tail through handle_data
        # so the end of the document is not silently lost.
        parser.close()
        return parser.text()
    except Exception as exc:
        logger.warning('HTML extraction failed for %s: %s', filename, exc)
        return f'[HTML: {filename} — extraction failed: {exc}]'
|