fix: increase timeout and parallelize receipt processing
- ab_ai_bot: raise requests.post timeout 120s -> 600s so long OCR+LLM runs don't silently drop the reply in Discuss - upload: run parse_upload in ThreadPoolExecutor so tesseract OCR doesn't block the FastAPI event loop - expenses_agent: parse all receipts concurrently with asyncio.gather (Ollama semaphore caps parallelism at 2); reduces 13-receipt LLM time from ~39s sequential to ~20s parallel Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -102,7 +102,7 @@ class AbAiBot(models.Model):
|
|||||||
if session_id:
|
if session_id:
|
||||||
payload['session_id'] = session_id
|
payload['session_id'] = session_id
|
||||||
try:
|
try:
|
||||||
resp = requests.post(url, json=payload, headers=self._build_headers(), timeout=120)
|
resp = requests.post(url, json=payload, headers=self._build_headers(), timeout=600)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
return resp.json()
|
return resp.json()
|
||||||
except requests.exceptions.Timeout:
|
except requests.exceptions.Timeout:
|
||||||
@@ -140,7 +140,7 @@ class AbAiBot(models.Model):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
resp = requests.post(url, data=form_data, files=files or [('files', ('empty', b'', 'text/plain'))],
|
resp = requests.post(url, data=form_data, files=files or [('files', ('empty', b'', 'text/plain'))],
|
||||||
headers=headers, timeout=120)
|
headers=headers, timeout=600)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
return resp.json()
|
return resp.json()
|
||||||
except requests.exceptions.Timeout:
|
except requests.exceptions.Timeout:
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from datetime import date as _date
|
from datetime import date as _date
|
||||||
@@ -133,12 +134,25 @@ class ExpensesAgent(BaseAgent):
|
|||||||
seen_hashes.add(h)
|
seen_hashes.add(h)
|
||||||
unique_receipts.append(r)
|
unique_receipts.append(r)
|
||||||
|
|
||||||
for receipt in unique_receipts:
|
# Parse all receipts concurrently (bounded by Ollama semaphore)
|
||||||
parsed = await self._parse_receipt_text(
|
parse_tasks = [
|
||||||
receipt.get('text', ''), receipt.get('filename', 'receipt'),
|
self._parse_receipt_text(
|
||||||
|
r.get('text', ''), r.get('filename', 'receipt'),
|
||||||
expense_products=expense_products,
|
expense_products=expense_products,
|
||||||
date_hint=receipt.get('date_from_name'),
|
date_hint=r.get('date_from_name'),
|
||||||
)
|
)
|
||||||
|
for r in unique_receipts
|
||||||
|
]
|
||||||
|
parsed_list = await asyncio.gather(*parse_tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
for receipt, parsed in zip(unique_receipts, parsed_list):
|
||||||
|
if isinstance(parsed, Exception):
|
||||||
|
logger.warning('expenses_agent: parse failed for %s: %s',
|
||||||
|
receipt.get('filename'), parsed)
|
||||||
|
parsed = {'vendor': receipt.get('filename', 'Expense'), 'amount': 0.0,
|
||||||
|
'date': receipt.get('date_from_name') or _date.today().isoformat(),
|
||||||
|
'product_name': ''}
|
||||||
|
|
||||||
# Pick product by name match returned from LLM, fall back to default
|
# Pick product by name match returned from LLM, fall back to default
|
||||||
product_id = default_product_id
|
product_id = default_product_id
|
||||||
chosen_name = parsed.get('product_name', '')
|
chosen_name = parsed.get('product_name', '')
|
||||||
|
|||||||
@@ -31,12 +31,18 @@ async def upload(
|
|||||||
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||||
detail='Agent service not ready')
|
detail='Agent service not ready')
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
_ocr_executor = ThreadPoolExecutor(max_workers=2)
|
||||||
|
|
||||||
receipts: list[dict] = []
|
receipts: list[dict] = []
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
for f in files:
|
for f in files:
|
||||||
data = await f.read()
|
data = await f.read()
|
||||||
filename = f.filename or 'receipt'
|
filename = f.filename or 'receipt'
|
||||||
try:
|
try:
|
||||||
parsed = parse_upload(filename, data)
|
# parse_upload may run OCR (CPU-bound) — offload to thread pool
|
||||||
|
parsed = await loop.run_in_executor(_ocr_executor, parse_upload, filename, data)
|
||||||
receipts.extend(parsed)
|
receipts.extend(parsed)
|
||||||
logger.info('upload: parsed %s → %d receipt(s)', filename, len(parsed))
|
logger.info('upload: parsed %s → %d receipt(s)', filename, len(parsed))
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
|||||||
Reference in New Issue
Block a user