from __future__ import annotations

import base64
import datetime
import hashlib
import io
import logging
import re
import zipfile
from html.parser import HTMLParser
from pathlib import Path

logger = logging.getLogger(__name__)

# Extract YYYYMMDD from filenames like 20260509_180857.jpg
_DATE_PATTERN = re.compile(r'(\d{4})(\d{2})(\d{2})_\d{6}')

_MIME = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.bmp': 'image/bmp',
    '.tiff': 'image/tiff',
    '.tif': 'image/tiff',
    '.webp': 'image/webp',
    '.pdf': 'application/pdf',
    '.html': 'text/html',
    '.htm': 'text/html',
    '.txt': 'text/plain',
    '.zip': 'application/zip',
}

_IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp'}


def _date_from_filename(filename: str) -> str | None:
    """Return the ``YYYY-MM-DD`` date embedded in *filename*, or ``None``.

    Only names matching ``_DATE_PATTERN`` (``YYYYMMDD_HHMMSS``) are considered.
    Digit runs that do not form a real calendar date (e.g. month 99) yield
    ``None`` rather than an impossible date string.
    """
    match = _DATE_PATTERN.search(filename)
    if match is None:
        return None
    year, month, day = (int(group) for group in match.groups())
    try:
        return datetime.date(year, month, day).isoformat()
    except ValueError:
        return None


def parse_upload(filename: str, data: bytes) -> list[dict]:
    """Parse one uploaded file into a list of receipt dicts.

    ZIP files are recursively unpacked (one entry per successfully parsed
    member); all other types return a single entry.

    Each dict has the keys:
        filename        the name passed in (basename only for ZIP members)
        text            extracted text, or a bracketed placeholder
        b64             base64 of the raw bytes
        mimetype        looked up from the extension, default
                        ``application/octet-stream``
        sha256          hex digest of the raw bytes
        date_from_name  ``YYYY-MM-DD`` taken from a timestamp-style
                        filename, else ``None``
    """
    ext = Path(filename).suffix.lower()
    if ext == '.zip':
        return _extract_zip(filename, data)

    if ext in _IMAGE_EXTS:
        text = _ocr_image(data, filename)
    elif ext == '.pdf':
        text = _extract_pdf(data, filename)
    elif ext in ('.html', '.htm'):
        text = _extract_html(data, filename)
    elif ext == '.txt':
        # Known text type: be lenient about stray bytes.
        text = data.decode('utf-8', errors='replace')
    else:
        # Unknown type: decode strictly so genuinely binary payloads get the
        # placeholder instead of a string of replacement characters.
        try:
            text = data.decode('utf-8')
        except UnicodeDecodeError:
            text = f'[Binary file: {filename}]'

    return [{
        'filename': filename,
        'text': text,
        'b64': base64.b64encode(data).decode(),
        'mimetype': _MIME.get(ext, 'application/octet-stream'),
        'sha256': hashlib.sha256(data).hexdigest(),
        'date_from_name': _date_from_filename(filename),
    }]


def _extract_zip(zip_filename: str, data: bytes) -> list[dict]:
    """Parse every file member of an in-memory ZIP archive.

    Members are passed to ``parse_upload`` under their basename, so nested
    archives are unpacked recursively. A member that fails is logged and
    skipped; an unreadable archive is logged and yields whatever was
    collected so far (usually an empty list).
    """
    # NOTE(review): no limit on member size, member count or nesting depth is
    # enforced here — consider adding one if uploads are untrusted.
    results = []
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            for member in zf.namelist():
                if member.endswith('/'):
                    continue  # directory entry, nothing to parse
                try:
                    member_data = zf.read(member)
                    results.extend(parse_upload(Path(member).name, member_data))
                except Exception as exc:
                    logger.warning('receipt_parser: zip member %s failed: %s', member, exc)
    except Exception as exc:
        logger.error('receipt_parser: zip %s failed: %s', zip_filename, exc)
    return results


def _ocr_image(data: bytes, filename: str) -> str:
    """Extract text from a receipt image.

    Tries vision-model OCR first when ``settings.vision_ocr_model`` is set,
    then falls back to the Tesseract pipeline if that returns nothing.
    """
    from agent_service.config import get_settings

    settings = get_settings()
    if settings.vision_ocr_model:
        result = _ocr_image_vision(data, filename, settings.ollama_url, settings.vision_ocr_model)
        if result:
            return result
        logger.warning('Vision OCR returned empty for %s — falling back to Tesseract', filename)
    return _ocr_image_tesseract(data, filename)


def _ocr_image_vision(data: bytes, filename: str, ollama_url: str, model: str) -> str:
    """Use an Ollama vision model to extract receipt data directly as JSON.

    Returns the JSON object substring of the model reply (expected fields:
    vendor, amount, date, time, category) so the expenses agent can skip the
    second LLM extraction step entirely. Returns an empty string on any
    failure — missing package, request error, prose reply, invalid JSON, or a
    missing ``amount`` field — so the caller falls back to Tesseract.
    """
    import json as _json

    try:
        import ollama as _ollama

        client = _ollama.Client(host=ollama_url)
        response = client.chat(
            model=model,
            messages=[{
                'role': 'user',
                'content': (
                    'This is a photo of a receipt. Extract these fields:\n'
                    '- vendor: the store or restaurant name\n'
                    '- amount: the FINAL total the customer paid. Look for a line '
                    'labeled "Total", "Grand Total", "Amount Due", or "Balance Due". '
                    'Do NOT use subtotal, tax, or tip. Return 0 if you cannot find '
                    'a clear final total.\n'
                    '- date: transaction date in YYYY-MM-DD format\n'
                    '- time: transaction time in HH:MM 24-hour format, or null\n'
                    '- category: one word describing the expense type — one of: '
                    'meals, fuel, hotel, office, transport, other\n\n'
                    'Return ONLY a valid JSON object, no commentary, no markdown:\n'
                    '{"vendor":"...","amount":0.00,"date":"YYYY-MM-DD",'
                    '"time":"HH:MM or null","category":"..."}'
                ),
                'images': [data],
            }],
        )
        # The client may hand back either a plain dict or a response object.
        if isinstance(response, dict):
            raw = (response.get('message', {}).get('content') or '').strip()
        else:
            raw = (response.message.content or '').strip()

        # Must contain a JSON object, not prose
        first, last = raw.find('{'), raw.rfind('}')
        if first == -1 or last <= first:
            logger.warning('Vision OCR %s: model returned prose, falling back to Tesseract', filename)
            return ''
        json_str = raw[first:last + 1]
        parsed = _json.loads(json_str)  # invalid JSON raises -> handled below
        if 'amount' not in parsed:
            logger.warning('Vision OCR %s: JSON missing amount field, falling back', filename)
            return ''
        logger.debug('Vision OCR %s (%s): extracted JSON ok', filename, model)
        return json_str
    except ImportError:
        logger.warning('ollama package not installed — vision OCR unavailable for %s', filename)
        return ''
    except Exception as exc:
        logger.warning('Vision OCR failed for %s: %s', filename, exc)
        return ''


def _ocr_image_tesseract(data: bytes, filename: str) -> str:
    """Tesseract-based OCR pipeline (fallback).

    Returns the recognised text, or a bracketed placeholder string when the
    OCR libraries are missing or processing fails.
    """
    try:
        from PIL import Image, ImageFilter, ImageOps
        import pytesseract

        img = Image.open(io.BytesIO(data))

        # Resize very large images — tesseract is slower and less accurate at
        # phone-camera resolution; 1800px wide is plenty for receipt text.
        max_w = 1800
        if img.width > max_w:
            scale = max_w / img.width
            img = img.resize((max_w, int(img.height * scale)), Image.LANCZOS)

        # Grayscale + autocontrast + fixed-threshold binarisation + sharpen
        img = ImageOps.grayscale(img)
        img = ImageOps.autocontrast(img)
        img = img.point(lambda x: 0 if x < 140 else 255)
        img = img.filter(ImageFilter.SHARPEN)

        # psm 1 = automatic page segmentation + OSD (handles rotated receipts).
        # Fall back to psm 6 if OSD data is missing.
        try:
            text = pytesseract.image_to_string(img, config='--oem 3 --psm 1').strip()
        except Exception:
            text = pytesseract.image_to_string(img, config='--oem 3 --psm 6').strip()

        logger.debug('Tesseract OCR %s: %d chars', filename, len(text))
        return text
    except ImportError:
        logger.warning('pytesseract/Pillow not installed — OCR unavailable for %s', filename)
        return f'[Image: {filename} — install pytesseract+Pillow for OCR]'
    except Exception as exc:
        logger.warning('Tesseract OCR failed for %s: %s', filename, exc)
        return f'[Image: {filename} — OCR failed: {exc}]'


def _extract_pdf(data: bytes, filename: str) -> str:
    """Return the text of every page of a PDF, joined by newlines.

    Pages with no extractable text are skipped. Returns a bracketed
    placeholder string when pdfplumber is missing or extraction fails.
    """
    try:
        import pdfplumber

        parts = []
        with pdfplumber.open(io.BytesIO(data)) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    parts.append(page_text)
        return '\n'.join(parts).strip()
    except ImportError:
        logger.warning('pdfplumber not installed — PDF extraction unavailable for %s', filename)
        return f'[PDF: {filename} — install pdfplumber for text extraction]'
    except Exception as exc:
        logger.warning('PDF extraction failed for %s: %s', filename, exc)
        return f'[PDF: {filename} — extraction failed: {exc}]'


class _TextExtractor(HTMLParser):
    """Collect visible text from HTML, ignoring <script> and <style> bodies."""

    def __init__(self):
        super().__init__()
        self._parts: list[str] = []
        self._skip = False

    def handle_starttag(self, tag, attrs):
        if tag in ('script', 'style'):
            self._skip = True

    def handle_endtag(self, tag):
        if tag in ('script', 'style'):
            self._skip = False

    def handle_data(self, data):
        if not self._skip:
            s = data.strip()
            if s:
                self._parts.append(s)

    def text(self):
        """Return the collected text fragments joined by single spaces."""
        return ' '.join(self._parts)


def _extract_html(data: bytes, filename: str) -> str:
    """Return the visible text of an HTML document as one space-joined string.

    The bytes are decoded as UTF-8 with replacement of invalid sequences.
    Returns a bracketed placeholder string if parsing fails.
    """
    try:
        parser = _TextExtractor()
        parser.feed(data.decode('utf-8', errors='replace'))
        # feed() may hold back trailing text that ends in an unterminated '&'
        # (it could be a split character reference); close() flushes it.
        parser.close()
        return parser.text()
    except Exception as exc:
        logger.warning('HTML extraction failed for %s: %s', filename, exc)
        return f'[HTML: {filename} — extraction failed: {exc}]'