from __future__ import annotations import base64 import hashlib import io import logging import os import re import zipfile from pathlib import Path logger = logging.getLogger(__name__) # Extract YYYYMMDD from filenames like 20260509_180857.jpg _DATE_PATTERN = re.compile(r'(\d{4})(\d{2})(\d{2})_\d{6}') # --------------------------------------------------------------------------- # OCR engine selection # --------------------------------------------------------------------------- # Set OCR_ENGINE=tesseract in .env to revert to the old Tesseract pipeline. # Default is easyocr which handles phone photos and difficult fonts better. def _get_ocr_engine() -> str: return os.environ.get('OCR_ENGINE', 'easyocr').lower() # EasyOCR Reader is expensive to initialise (~10-30s on first call while it # loads model weights). Cache it as a module-level singleton so the cost is # paid once per container start, not once per receipt. _easyocr_reader = None def _get_easyocr_reader(): global _easyocr_reader if _easyocr_reader is None: import easyocr logger.info('EasyOCR: initialising reader (first use — loading model weights)') _easyocr_reader = easyocr.Reader(['en'], verbose=False) logger.info('EasyOCR: reader ready') return _easyocr_reader _MIME = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.bmp': 'image/bmp', '.tiff': 'image/tiff', '.tif': 'image/tiff', '.webp': 'image/webp', '.pdf': 'application/pdf', '.html': 'text/html', '.htm': 'text/html', '.txt': 'text/plain', '.zip': 'application/zip', } _IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp'} def parse_upload(filename: str, data: bytes) -> list[dict]: """ Parse one uploaded file into a list of receipt dicts. ZIP files are recursively unpacked; all other types return a single entry. Each dict: {filename, text, b64, mimetype} """ ext = Path(filename).suffix.lower() if ext == '.zip': return _extract_zip(filename, data) b64 = base64.b64encode(data).decode() mimetype = _MIME.get(ext, 'application/octet-stream') sha256 = hashlib.sha256(data).hexdigest() # Extract date from timestamp-style filenames (e.g. 20260509_180857.jpg) date_from_name = None m = _DATE_PATTERN.search(filename) if m: date_from_name = f'{m.group(1)}-{m.group(2)}-{m.group(3)}' if ext in _IMAGE_EXTS: text = _ocr_image(data, filename) elif ext == '.pdf': text = _extract_pdf(data, filename) elif ext in ('.html', '.htm'): text = _extract_html(data, filename) elif ext == '.txt': text = data.decode('utf-8', errors='replace') else: try: text = data.decode('utf-8', errors='replace') except Exception: text = f'[Binary file: {filename}]' return [{'filename': filename, 'text': text, 'b64': b64, 'mimetype': mimetype, 'sha256': sha256, 'date_from_name': date_from_name}] def _extract_zip(zip_filename: str, data: bytes) -> list[dict]: results = [] try: with zipfile.ZipFile(io.BytesIO(data)) as zf: for member in zf.namelist(): if member.endswith('/'): continue try: member_data = zf.read(member) results.extend(parse_upload(Path(member).name, member_data)) except Exception as exc: logger.warning('receipt_parser: zip member %s failed: %s', member, exc) except Exception as exc: logger.error('receipt_parser: zip %s failed: %s', zip_filename, exc) return results def _ocr_image(data: bytes, filename: str) -> str: """Dispatch to the configured OCR engine (EasyOCR or Tesseract).""" if _get_ocr_engine() == 'easyocr': return _ocr_image_easyocr(data, filename) return _ocr_image_tesseract(data, filename) def _easyocr_to_text(results: list) -> str: """Convert EasyOCR result list to a single text string. EasyOCR returns a list of (bbox, text, confidence) tuples. We filter low-confidence detections, sort top-to-bottom then left-to-right, and join with newlines. Receipt images are typically single-column so a simple y-sort produces a clean reading order. Adjacent words on the same horizontal band (y within 40% of the tallest box's height in that group) are merged onto one line — this keeps a label like "TOTAL 42.90" on a single line instead of two lines, which is important for the labeled-total regex in expenses_agent.py. """ if not results: return '' # Filter and extract geometry boxes = [] for bbox, text, conf in results: if conf < 0.3 or not text.strip(): continue ys = [pt[1] for pt in bbox] xs = [pt[0] for pt in bbox] boxes.append({ 'y_top': min(ys), 'y_bot': max(ys), 'x_left': min(xs), 'text': text.strip(), }) if not boxes: return '' boxes.sort(key=lambda b: (b['y_top'], b['x_left'])) # Group into visual lines lines: list[list[dict]] = [] current: list[dict] = [boxes[0]] for box in boxes[1:]: # Compute the current line's y-span cy_top = min(b['y_top'] for b in current) cy_bot = max(b['y_bot'] for b in current) height = max(cy_bot - cy_top, 1) # This box belongs to the same line if its top overlaps the current band if box['y_top'] < cy_bot - height * 0.3: current.append(box) else: lines.append(sorted(current, key=lambda b: b['x_left'])) current = [box] lines.append(sorted(current, key=lambda b: b['x_left'])) return '\n'.join(' '.join(b['text'] for b in line) for line in lines) def _ocr_image_easyocr(data: bytes, filename: str) -> str: """EasyOCR pipeline — better than Tesseract on phone photos, thermal paper, dot-matrix, and rotated receipts. Falls back to Tesseract on any error. """ try: import numpy as np from PIL import Image, ImageOps reader = _get_easyocr_reader() img = Image.open(io.BytesIO(data)) # EXIF rotation — same fix as the Tesseract pipeline try: img = ImageOps.exif_transpose(img) except Exception: pass # Resize very large images for speed; EasyOCR is accurate but slow on # images wider than ~2000px (typical 12MP phone photo is ~4000px wide). max_w = 2000 if img.width > max_w: scale = max_w / img.width img = img.resize((max_w, int(img.height * scale)), Image.LANCZOS) # EasyOCR accepts a numpy array directly img_array = np.array(img) results = reader.readtext(img_array) text = _easyocr_to_text(results) logger.debug('EasyOCR %s: %d chars', filename, len(text)) if len(text) >= 20: return text # Very short result — try Tesseract as fallback before giving up logger.warning('EasyOCR %s: only %d chars, trying Tesseract fallback', filename, len(text)) tess = _ocr_image_tesseract(data, filename) return tess if len(tess) > len(text) else text except ImportError: logger.warning('easyocr/numpy not installed — falling back to Tesseract for %s', filename) return _ocr_image_tesseract(data, filename) except Exception as exc: logger.warning('EasyOCR failed for %s: %s — falling back to Tesseract', filename, exc) return _ocr_image_tesseract(data, filename) def _ocr_image_tesseract(data: bytes, filename: str) -> str: """Tesseract-based OCR pipeline with phone-photo preprocessing.""" try: from PIL import Image, ImageFilter, ImageOps import pytesseract img = Image.open(io.BytesIO(data)) # ── Step 1: EXIF rotation correction ───────────────────────────────── # Phone photos are stored with EXIF orientation metadata but the pixel # data is not actually rotated. Without this fix Tesseract reads a # portrait receipt as a landscape image and produces garbage. try: img = ImageOps.exif_transpose(img) except Exception: pass # exif_transpose requires Pillow >= 6.0 # ── Step 1b: Content-based rotation correction ─────────────────────── # EXIF transpose (Step 1) only corrects for phone-tilt metadata. # If the receipt was physically laid sideways in the frame (e.g. a # landscape receipt photographed with the phone upright), the pixels # are genuinely rotated and EXIF can't help. Ask Tesseract's OSD # engine to detect the text orientation and rotate to correct it. try: osd = pytesseract.image_to_osd(img, config='--psm 0') _am = re.search(r'Rotate:\s*(\d+)', osd) if _am: _angle = int(_am.group(1)) if _angle: img = img.rotate(_angle, expand=True) logger.debug('OSD: rotated %s by %d°', filename, _angle) except Exception: pass # OSD unavailable or not enough text — proceed without correction # ── Step 2: Resize to working width (1800px) ────────────────────────── max_w = 1800 if img.width > max_w: scale = max_w / img.width img = img.resize((max_w, int(img.height * scale)), Image.LANCZOS) # Upscale very small images — Tesseract accuracy drops below ~600px elif img.width < 600: scale = 600 / img.width img = img.resize((600, int(img.height * scale)), Image.LANCZOS) # ── Step 3: Grayscale + contrast ───────────────────────────────────── img = ImageOps.grayscale(img) img = ImageOps.autocontrast(img) img_gray = img # save grayscale for fallback — before binarization # ── Step 4: Sharpen then binarize ───────────────────────────────────── # Sharpen first so edges are crisp before thresholding. # Threshold 160 (was 140) — gentler for faint thermal-print receipts # where light gray text would be wiped out by the stricter threshold. img = img.filter(ImageFilter.SHARPEN) img = img.point(lambda x: 0 if x < 160 else 255) # ── Step 5: OCR — try PSM modes best-suited for receipt layout ──────── # PSM 6 = single uniform text block (best for single-column receipts) # PSM 4 = single column, variable text sizes (wider fallback) # PSM 11 = sparse text — last resort for badly segmented images for psm in (6, 4, 11): try: text = pytesseract.image_to_string( img, config=f'--oem 3 --psm {psm}').strip() if len(text) >= 20: logger.debug('Tesseract OCR %s: psm=%d %d chars', filename, psm, len(text)) return text except Exception: pass # ── Step 5b: Grayscale fallback ─────────────────────────────────────── # Binarization at threshold 160 can destroy dot-matrix and certain # thermal-print fonts (e.g. parking kiosk receipts) where character # pixels are close to the threshold and get wiped to white. If every # binarized attempt failed, retry on the plain grayscale image — # Tesseract handles grey-level input reasonably well for these cases. for psm in (6, 4, 11): try: text = pytesseract.image_to_string( img_gray, config=f'--oem 3 --psm {psm}').strip() if len(text) >= 20: logger.debug('Tesseract grayscale fallback %s: psm=%d %d chars', filename, psm, len(text)) return text except Exception: pass logger.warning('Tesseract OCR %s: all PSM modes returned < 20 chars', filename) return '' except ImportError: logger.warning('pytesseract/Pillow not installed — OCR unavailable for %s', filename) return f'[Image: {filename} — install pytesseract+Pillow for OCR]' except Exception as exc: logger.warning('Tesseract OCR failed for %s: %s', filename, exc) return f'[Image: {filename} — OCR failed: {exc}]' def _extract_pdf(data: bytes, filename: str) -> str: try: import pdfplumber parts = [] with pdfplumber.open(io.BytesIO(data)) as pdf: for page in pdf.pages: t = page.extract_text() if t: parts.append(t) return '\n'.join(parts).strip() except ImportError: logger.warning('pdfplumber not installed — PDF extraction unavailable for %s', filename) return f'[PDF: {filename} — install pdfplumber for text extraction]' except Exception as exc: logger.warning('PDF extraction failed for %s: %s', filename, exc) return f'[PDF: {filename} — extraction failed: {exc}]' def _extract_html(data: bytes, filename: str) -> str: try: from html.parser import HTMLParser class _TextExtractor(HTMLParser): def __init__(self): super().__init__() self._parts: list[str] = [] self._skip = False def handle_starttag(self, tag, attrs): if tag in ('script', 'style'): self._skip = True def handle_endtag(self, tag): if tag in ('script', 'style'): self._skip = False def handle_data(self, data): if not self._skip: s = data.strip() if s: self._parts.append(s) def text(self): return ' '.join(self._parts) parser = _TextExtractor() parser.feed(data.decode('utf-8', errors='replace')) return parser.text() except Exception as exc: logger.warning('HTML extraction failed for %s: %s', filename, exc) return f'[HTML: {filename} — extraction failed: {exc}]'