Files
odoo-ai/tests/test_expenses_agent.py
Carlos Garcia aea2fa02b8 expenses_agent: batch LLM calls + skip RAG to fix timeout on large uploads
- auto_rag=False: skip PeerBus odoo_doc_agent call on every execute();
  eliminates 30s Ollama semaphore contention before parsing even starts
- _batch_parse_receipts(): Phase 1 regex (instant per-receipt: amount,
  date, bank-statement skip); Phase 2 single batched LLM call for all
  vendor+product_name instead of N individual calls; vision mode falls
  back to per-receipt calls (can't batch images); LLM fallback on bad
  JSON or wrong item count
- _act() updated to use _batch_parse_receipts()
- 7 new tests covering batch happy path, regex-only amounts, private-key
  cleanup, bank-statement skip, malformed-JSON fallback, wrong-count
  fallback, no-products short-circuit (99 tests total, all passing)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 06:36:22 -04:00

1160 lines
44 KiB
Python

"""
ActiveBlue AI — Expenses Agent Unit Tests
==========================================
Suite: test_expenses_agent.py
Module: agent_service/agents/expenses_agent.py
agent_service/tools/receipt_parser.py
addons/activeblue_ai/models/ab_ai_mail.py
Purpose
-------
Verify the core business logic of the expenses agent without requiring
a live Odoo instance, database, or LLM. All external dependencies
(ORM, HTTP, Ollama) are mocked. Tests run in < 1 second.
Run
---
source .venv-test/bin/activate
python -m pytest tests/test_expenses_agent.py -v
Test groups
-----------
TestFindSemanticDuplicate — two-pass duplicate-detection algorithm
test_plan_* — intent keyword → user_confirmed / user_dup_decision
test_act_* — _act() confirmation gate and expense creation
TestParseUpload — receipt_parser ZIP handling and metadata
TestTextToHtml — HTML escaping (skipped without Odoo env)
See tests/TEST_EXPENSES_AGENT.md for full documentation.
"""
from __future__ import annotations
import asyncio
import base64
import io
import zipfile
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# _find_semantic_duplicate
# ---------------------------------------------------------------------------
from agent_service.agents.expenses_agent import ExpensesAgent
def _p(vendor='Acme', amount=10.00, date='2026-05-09', time=None):
"""Shorthand for building a parsed-receipt dict."""
return {'vendor': vendor, 'amount': amount, 'date': date, 'time': time, 'product_name': ''}
def _candidate(parsed):
"""Wrap parsed dict as a (receipt, parsed) candidate tuple."""
return ({}, parsed)
class TestFindSemanticDuplicate:
# ------------------------------------------------------------------
# Pass 1: amount-based match
# ------------------------------------------------------------------
def test_exact_match(self):
candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-09'), candidates)
assert idx == 0
def test_amount_within_threshold(self):
candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.04, '2026-05-09'), candidates)
assert idx == 0
def test_amount_just_over_threshold(self):
candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.06, '2026-05-09'), candidates)
# Pass 1 should miss; Pass 2 should still catch it (same vendor + date)
assert idx == 0 # caught by Pass 2
def test_different_date_not_duplicate(self):
candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-10'), candidates)
assert idx is None
def test_zero_amount_not_deduplicated(self):
"""Zero-amount receipts are too ambiguous — never flagged as dups."""
candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 0.0, '2026-05-09'), candidates)
assert idx is None
def test_vendor_similarity_above_threshold(self):
candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('In-N-Out Houston', 8.55, '2026-05-09'), candidates)
assert idx == 0
def test_vendor_similarity_below_threshold_pass1(self):
"""Completely different vendors with same amount+date → not a dup."""
candidates = [_candidate(_p('McDonald\'s', 8.55, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('Starbucks', 8.55, '2026-05-09'), candidates)
assert idx is None
def test_time_within_window_is_dup(self):
candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='14:00'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('Acme', 10.00, '2026-05-09', time='14:25'), candidates)
assert idx == 0
def test_time_outside_window_not_dup(self):
"""Same vendor/amount/date but >30 min apart → different transactions."""
candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('Acme', 10.00, '2026-05-09', time='14:00'), candidates)
assert idx is None
def test_one_time_missing_does_not_exclude(self):
"""If only one receipt has a time, the time check is skipped."""
candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('Acme', 10.00, '2026-05-09', time=None), candidates)
assert idx == 0
def test_filename_vendor_same_amount_date_is_dup(self):
"""Vendor looks like a filename → treated as dup if amount+date match."""
candidates = [_candidate(_p('20260509_180857.jpg', 10.00, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('20260509_171757.jpg', 10.00, '2026-05-09'), candidates)
assert idx == 0
def test_no_candidates(self):
idx = ExpensesAgent._find_semantic_duplicate(_p(), [])
assert idx is None
def test_returns_correct_index_multiple_candidates(self):
candidates = [
_candidate(_p('Burger King', 5.00, '2026-05-09')),
_candidate(_p('Acme', 10.00, '2026-05-09')),
_candidate(_p('Starbucks', 4.50, '2026-05-08')),
]
idx = ExpensesAgent._find_semantic_duplicate(_p('Acme Corp', 10.00, '2026-05-09'), candidates)
assert idx == 1
# ------------------------------------------------------------------
# Pass 2: OCR amount mismatch (same vendor+date, different amount)
# ------------------------------------------------------------------
def test_pass2_catches_ocr_amount_mismatch(self):
"""The In-N-Out $8.55 vs $15.00 bug: vendor ≥80%, same date, amounts far apart."""
candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('In-N-Qut Houston', 15.00, '2026-05-09'), candidates)
assert idx == 0
def test_pass2_requires_high_vendor_similarity(self):
"""Pass 2 threshold is 80% — clearly different vendors should not trigger it."""
# "Starbucks Coffee" vs "McDonalds Burger" share very few characters (~25%)
candidates = [_candidate(_p('Starbucks Coffee', 8.55, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('McDonalds Burger', 15.00, '2026-05-09'), candidates)
assert idx is None
def test_pass2_same_date_required(self):
candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-08'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('In-N-Out Houston', 15.00, '2026-05-09'), candidates)
assert idx is None
def test_pass2_respects_time_window(self):
"""Even with high vendor similarity, >30 min apart means different visit."""
candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09', time='12:00'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('In-N-Out Houston', 15.00, '2026-05-09', time='15:00'), candidates)
assert idx is None
def test_pass2_skips_filename_vendors(self):
"""Pass 2 does not apply when the vendor looks like a filename."""
candidates = [_candidate(_p('20260509_180857.jpg', 8.55, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('20260509_171757.jpg', 15.00, '2026-05-09'), candidates)
# Filenames have different names so similarity will be low;
# Pass 2 explicitly skips filename vendors.
assert idx is None
def test_pass2_zero_amount_not_deduplicated(self):
candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))]
idx = ExpensesAgent._find_semantic_duplicate(
_p('Acme Corp', 0.0, '2026-05-09'), candidates)
assert idx is None
# ---------------------------------------------------------------------------
# _plan() — keyword detection
# ---------------------------------------------------------------------------
def _make_agent():
"""Return an ExpensesAgent with all dependencies mocked out."""
agent = ExpensesAgent.__new__(ExpensesAgent)
agent._odoo = MagicMock()
agent._llm = MagicMock()
agent._peer_bus = None
agent._et = MagicMock()
agent._gathered_data = {}
agent._actions_taken = []
agent._escalations_list = []
return agent
def _make_directive(task='', raw_message='', receipts=None):
directive = MagicMock()
directive.task = task
directive.params = {}
directive.directive_id = 'test-dir'
directive.context.peer_data = {'raw_message': raw_message, 'requesting_user_id': 1}
directive.context.receipts = receipts or []
return directive
async def _run_plan(task='', raw_message='', receipts=None):
agent = _make_agent()
agent._directive = _make_directive(task=task, raw_message=raw_message, receipts=receipts)
return await agent._plan()
@pytest.mark.asyncio
async def test_plan_confirm_keyword_sets_confirmed():
plan = await _run_plan(raw_message='confirm')
assert plan['user_confirmed'] is True
@pytest.mark.asyncio
async def test_plan_looks_good_sets_confirmed():
plan = await _run_plan(raw_message='looks good')
assert plan['user_confirmed'] is True
@pytest.mark.asyncio
async def test_plan_go_ahead_sets_confirmed():
plan = await _run_plan(raw_message='go ahead')
assert plan['user_confirmed'] is True
@pytest.mark.asyncio
async def test_plan_no_keyword_not_confirmed():
plan = await _run_plan(raw_message='create an expense report')
assert plan['user_confirmed'] is False
@pytest.mark.asyncio
async def test_plan_keep_all_sets_dup_decision():
plan = await _run_plan(raw_message='confirm, keep all')
assert plan['user_confirmed'] is True
assert plan['user_dup_decision'] == 'keep_all'
@pytest.mark.asyncio
async def test_plan_skip_sets_dup_decision():
plan = await _run_plan(raw_message='skip duplicates')
assert plan['user_dup_decision'] == 'skip'
@pytest.mark.asyncio
async def test_plan_default_dup_decision_is_skip():
"""When user says 'confirm' with no dup instruction, default to skip."""
plan = await _run_plan(raw_message='confirm')
assert plan['user_dup_decision'] == 'skip'
@pytest.mark.asyncio
async def test_plan_mode_is_read_without_receipts():
plan = await _run_plan(raw_message='show my expenses')
assert plan['mode'] == 'read'
@pytest.mark.asyncio
async def test_plan_mode_is_create_with_receipts():
fake_receipt = {'filename': 'receipt.jpg', 'text': '', 'b64': '', 'sha256': 'abc'}
plan = await _run_plan(raw_message='create expense report', receipts=[fake_receipt])
assert plan['mode'] == 'create_from_receipts'
@pytest.mark.asyncio
async def test_plan_task_field_also_checked():
"""master LLM writes intent_summary into task; confirm in task should work."""
plan = await _run_plan(task='confirm the expense report creation', raw_message='')
assert plan['user_confirmed'] is True
# ---------------------------------------------------------------------------
# _act() confirmation gate
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_act_creates_expenses_immediately():
"""Expenses are created in draft immediately — no confirmation gate.
The old two-step confirm flow was removed because receipts are only
available in the initial /upload request, making a follow-up confirmation
turn impossible. _act() now creates draft expenses straight away.
"""
agent = _make_agent()
fake_receipt = {
'filename': 'receipt.jpg', 'text': 'Acme $10.00',
'b64': '', 'sha256': 'abc123', 'mimetype': 'image/jpeg',
'date_from_name': None,
}
agent._directive = _make_directive(raw_message='create expense report', receipts=[fake_receipt])
agent._gathered_data = {
'mode': 'create_from_receipts',
'user_confirmed': False,
'user_dup_decision': 'skip',
}
parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09',
'time': None, 'product_name': ''}
sheet_result = MagicMock(success=True, record_id=42)
expense_result = MagicMock(success=True, record_id=99)
agent._et.get_employee_id_for_user = AsyncMock(return_value=1)
agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}])
agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result)
agent._et.create_expense = AsyncMock(return_value=expense_result)
with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)):
actions = await agent._act({})
assert any('Created expense sheet' in a for a in actions)
agent._et.create_expense_sheet.assert_called_once()
agent._et.create_expense.assert_called_once()
@pytest.mark.asyncio
async def test_act_creates_sheet_when_confirmed():
"""Second call with user_confirmed=True → expense sheet is created."""
agent = _make_agent()
fake_receipt = {
'filename': 'receipt.jpg', 'text': 'Acme $10.00',
'b64': base64.b64encode(b'imgdata').decode(), 'sha256': 'abc123',
'mimetype': 'image/jpeg', 'date_from_name': None,
}
agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt])
agent._gathered_data = {
'mode': 'create_from_receipts',
'user_confirmed': True,
'user_dup_decision': 'skip',
}
parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09',
'time': None, 'product_name': 'Meals'}
sheet_result = MagicMock(success=True, record_id=42)
expense_result = MagicMock(success=True, record_id=99)
agent._et.get_employee_id_for_user = AsyncMock(return_value=1)
agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}])
agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result)
agent._et.create_expense = AsyncMock(return_value=expense_result)
agent._et.attach_receipt = AsyncMock()
with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)):
actions = await agent._act({})
assert any('Created expense sheet' in a for a in actions)
assert any('Acme' in a for a in actions)
agent._et.create_expense_sheet.assert_called_once()
agent._et.create_expense.assert_called_once()
@pytest.mark.asyncio
async def test_act_deduplicates_byte_identical_receipts():
"""Two receipts with the same SHA256 → only one expense created."""
agent = _make_agent()
receipt = {
'filename': 'receipt.jpg', 'text': 'Acme $10.00',
'b64': '', 'sha256': 'samehash', 'mimetype': 'image/jpeg',
'date_from_name': None,
}
agent._directive = _make_directive(raw_message='confirm', receipts=[receipt, receipt])
agent._gathered_data = {
'mode': 'create_from_receipts',
'user_confirmed': True,
'user_dup_decision': 'skip',
}
parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09',
'time': None, 'product_name': ''}
sheet_result = MagicMock(success=True, record_id=1)
expense_result = MagicMock(success=True, record_id=2)
agent._et.get_employee_id_for_user = AsyncMock(return_value=1)
agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}])
agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result)
agent._et.create_expense = AsyncMock(return_value=expense_result)
agent._et.attach_receipt = AsyncMock()
with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)):
await agent._act({})
# Only one create_expense call despite two identical receipts
assert agent._et.create_expense.call_count == 1
@pytest.mark.asyncio
async def test_act_no_employee_returns_empty_and_escalates():
agent = _make_agent()
fake_receipt = {'filename': 'r.jpg', 'text': '', 'b64': '', 'sha256': 'x',
'mimetype': 'image/jpeg', 'date_from_name': None}
agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt])
agent._gathered_data = {
'mode': 'create_from_receipts',
'user_confirmed': True,
'user_dup_decision': 'skip',
}
agent._et.get_employee_id_for_user = AsyncMock(return_value=None)
agent._et.get_expense_products = AsyncMock(return_value=[])
result = await agent._act({})
assert result == []
assert any('No employee record' in e for e in agent._escalations_list)
# ---------------------------------------------------------------------------
# _extract_amount_from_text / _extract_date_from_text — regex helpers
# ---------------------------------------------------------------------------
from agent_service.agents.expenses_agent import (
_extract_amount_from_text, _extract_date_from_text, _is_likely_bank_statement,
_MONTH_MAP, _get_vision_mode,
)
class TestExtractAmount:
def test_simple_total(self):
assert _extract_amount_from_text('Acme\nTotal: $9.99') == 9.99
def test_grand_total(self):
assert _extract_amount_from_text('Subtotal: $20.00\nGrand Total: $22.46') == 22.46
def test_amount_due(self):
assert _extract_amount_from_text('Amount Due: 198.40') == 198.40
def test_no_dollar_sign(self):
assert _extract_amount_from_text('TOTAL 15.75') == 15.75
def test_last_match_wins(self):
# Grand total should beat subtotal
text = 'Subtotal 18.00\nTax 1.50\nTotal 19.50'
assert _extract_amount_from_text(text) == 19.50
def test_empty_text(self):
assert _extract_amount_from_text('') == 0.0
def test_no_total_line(self):
assert _extract_amount_from_text('No price here') == 0.0
def test_comma_in_amount(self):
assert _extract_amount_from_text('Grand Total: $1,234.56') == 1234.56
def test_bottom_scan_garbled_total(self):
# OCR garbled "TOTAL" — bottom-scan fallback should find the amount
text = 'Burger 5.99\nFries 2.50\nT0TAL 8.49'
assert _extract_amount_from_text(text) == 8.49
def test_bottom_scan_skips_change(self):
# Should return the total (8.49), not the change (1.51)
text = 'TOTAL 8.49\nCash 10.00\nChange 1.51'
assert _extract_amount_from_text(text) == 8.49
def test_bottom_scan_amount_on_own_line(self):
# Amount printed on a separate line below the label
text = 'Items 5.00\nTax 0.50\nTotal\n5.50'
assert _extract_amount_from_text(text) == 5.50
def test_total_taxes_excluded(self):
# "Total Taxes $2.80" must NOT be confused with the receipt total;
# the labeled-total regex excludes 'total tax/taxes' via lookahead.
text = 'Subtotal $40.10\nTotal Taxes $2.80\nTotal $42.90'
assert _extract_amount_from_text(text) == 42.90
def test_pass1_returns_max_not_last(self):
# If OCR garbles "Total Taxes" into "Total\n$2.80", _TOTAL_RE would
# accidentally match twice. max() must win over positional [-1].
# Simulate by giving two labeled totals where smaller appears second.
text = 'Grand Total $42.90\nTotal $2.80'
assert _extract_amount_from_text(text) == 42.90
def test_total_sale_gas_station(self):
# Costco / Shell gas receipts say "Total Sale $X.XX", not "Total: $X.XX"
text = 'Pump 9 16.189 Gal\nRegular $ 58.75\nTotal Sale $ 58.75'
assert _extract_amount_from_text(text) == 58.75
def test_net_sale(self):
text = 'Items 22.00\nNet Sale $22.00'
assert _extract_amount_from_text(text) == 22.00
def test_amount_due_with_usd_suffix(self):
# "Total Charged" is in _TOTAL_RE — Pass 1 catches it
text = 'Total Charged: $198.40 USD'
assert _extract_amount_from_text(text) == 198.40
def test_top_amount_returned_by_max(self):
# Display-style receipt: charge shown at top, no 'Total' label.
# Pass 2 (max) must find $40.10 even though it is before the item list.
text = 'LAYAL CAFE\n$40.10\n--------\nBreakfast 37.30\nCoffee 2.80'
assert _extract_amount_from_text(text) == 40.10
def test_card_terminal_visa_line(self):
# Card terminal: amount on a line prefixed with card-brand text.
# VISA must NOT be in the skip list so the amount is captured.
text = 'MERCHANT XYZ\nYHOOMHXAKKKEO4S VISA USD$ 36.78\nAuth 123456'
assert _extract_amount_from_text(text) == 36.78
def test_max_beats_item_prices(self):
# Receipt with several item prices — max should return the largest
# (the total), not an item that appears last in the text.
text = 'Burger 12.99\nFries 4.50\nDrink 2.99\nT0TAL 20.48'
assert _extract_amount_from_text(text) == 20.48
def test_change_line_excluded_from_max(self):
# Change-due line must be skipped so it never inflates the max.
text = 'Items 8.49\nCash Tendered 20.00\nChange 11.51'
assert _extract_amount_from_text(text) == 8.49
def test_net_fee_parking(self):
# Parking kiosk receipts (e.g. MIA) use "net fee: 150.00 USD" format.
# _TOTAL_RE must include "net fee" so Pass 1 catches it and avoids
# the max-scan accidentally picking up a larger line like entry/exit fees.
text = (
'MIAMI AIRPORT PARKING\n'
'Entry 05/09 08:00\n'
'Exit 05/10 14:30\n'
'net fee: 150.00 USD'
)
assert _extract_amount_from_text(text) == 150.00
class TestBankStatementDetection:
def _stmt(self, n: int) -> str:
"""Generate fake bank statement with n transaction lines."""
lines = [f'05/{i+1:02d} MERCHANT {i} $1{i}.99' for i in range(n)]
return '\n'.join(lines)
def test_receipt_not_flagged(self):
# A typical restaurant receipt has < 10 amount-bearing lines
text = 'Acme Cafe\nBurger 12.99\nFries 4.50\nDrink 2.99\nTax 1.65\nTotal 22.13'
assert _is_likely_bank_statement(text) is False
def test_statement_flagged(self):
# 10 transaction lines → flagged as statement
assert _is_likely_bank_statement(self._stmt(10)) is True
def test_threshold_boundary(self):
assert _is_likely_bank_statement(self._stmt(9)) is False
assert _is_likely_bank_statement(self._stmt(10)) is True
def test_empty_text(self):
assert _is_likely_bank_statement('') is False
def test_no_amounts(self):
assert _is_likely_bank_statement('Hello world\nNo prices here') is False
@pytest.mark.asyncio
async def test_parse_bank_statement_returns_skip():
"""Bank statement image must be skipped — no amount, skip=True returned."""
agent = _make_agent()
# Build fake OCR text with 12 transaction lines
stmt_text = '\n'.join(
f'05/{i+1:02d} SOME MERCHANT {i} ${10 + i}.99' for i in range(12)
)
result = await agent._parse_receipt_text(
stmt_text, '2026-05-15_bank.png',
expense_products=[{'id': 1, 'name': 'Meals'}],
)
assert result.get('skip') is True
assert result['amount'] == 0.0
class TestExtractDate:
def test_iso_format(self):
assert _extract_date_from_text('Date: 2026-05-09') == '2026-05-09'
def test_slash_iso(self):
assert _extract_date_from_text('2026/05/09') == '2026-05-09'
def test_us_format(self):
assert _extract_date_from_text('05/09/2026') == '2026-05-09'
def test_us_short_year(self):
assert _extract_date_from_text('05/09/26') == '2026-05-09'
def test_dd_mon_yyyy(self):
# Airline / hotel receipts: "05 MAY 2026", "Issue Date: 05 May 2026"
assert _extract_date_from_text('Issue Date: 05 MAY 2026 MIA A70') == '2026-05-05'
def test_mon_dd_yyyy(self):
assert _extract_date_from_text('MAY 05 2026') == '2026-05-05'
def test_mon_dd_comma_yyyy(self):
assert _extract_date_from_text('May 5, 2026') == '2026-05-05'
def test_month_map_completeness(self):
# All twelve three-letter abbreviations must be present
assert len({k for k in _MONTH_MAP if len(k) == 3}) == 12
def test_no_date(self):
assert _extract_date_from_text('No date here') is None
def test_empty(self):
assert _extract_date_from_text('') is None
# ---------------------------------------------------------------------------
# _parse_receipt_text — combined extraction
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_parse_plain_ocr_text_uses_llm_for_vendor():
"""Regex extracts amount; LLM called only for vendor + product_name."""
agent = _make_agent()
llm_resp = MagicMock()
# LLM now only returns vendor + product_name
llm_resp.content = '{"vendor":"Acme","product_name":"Meals"}'
agent._llm.submit = AsyncMock(return_value=llm_resp)
result = await agent._parse_receipt_text(
'Acme Store\nTotal: $9.99', 'receipt.jpg',
expense_products=[{'id': 1, 'name': 'Meals'}],
)
assert result['vendor'] == 'Acme'
assert result['amount'] == 9.99 # from regex, not LLM
assert result['product_name'] == 'Meals'
agent._llm.submit.assert_called_once()
@pytest.mark.asyncio
async def test_parse_date_hint_overrides_ocr_date():
"""date_hint from filename must be used; LLM date should be ignored."""
agent = _make_agent()
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"Shell","product_name":"Fuel"}'
agent._llm.submit = AsyncMock(return_value=llm_resp)
result = await agent._parse_receipt_text(
'Shell Gas\n05/09/2021\nTotal: $45.00', 'shell.jpg',
date_hint='2026-05-09',
)
assert result['date'] == '2026-05-09' # filename timestamp wins
assert result['amount'] == 45.00
@pytest.mark.asyncio
async def test_parse_ocr_failed_skips_llm_amount():
"""When OCR fails, amount=0 and date comes from hint or today."""
agent = _make_agent()
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"","product_name":"Meals"}'
agent._llm.submit = AsyncMock(return_value=llm_resp)
result = await agent._parse_receipt_text(
'[Image: broken.jpg — OCR failed]', 'broken.jpg',
date_hint='2026-05-10',
expense_products=[{'id': 1, 'name': 'Meals'}],
)
assert result['amount'] == 0.0
assert result['date'] == '2026-05-10'
@pytest.mark.asyncio
async def test_vendor_prompt_does_not_contain_mcdonalds():
"""The text-path vendor prompt must not reference 'McDonald' — it biases
the model toward returning McDonald's whenever OCR text is unclear.
Pinned to text mode so vision path (which has its own cleaner prompt) does
not interfere.
"""
agent = _make_agent()
captured: list[str] = []
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"The Home Depot","product_name":"Supplies"}'
async def _capture(messages, caller=None):
for m in messages:
captured.append(m.get('content', ''))
return llm_resp
agent._llm.submit = _capture
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
await agent._parse_receipt_text(
'THE HOME DEPOT\nHow doers get more done\nWAGNER FURNO 300HG 36.78\nVISA USD$ 36.78',
'homedepot.jpg',
expense_products=[{'id': 1, 'name': 'Meals'}, {'id': 2, 'name': 'Supplies'}],
)
full_prompt = ' '.join(captured)
assert 'McDonald' not in full_prompt, (
"Text-path prompt must not contain 'McDonald' — it biases the model."
)
@pytest.mark.asyncio
async def test_vendor_prompt_instructs_not_to_guess_absent_brand():
"""Text-path prompt must tell LLM not to substitute a brand not in the OCR text."""
agent = _make_agent()
captured: list[str] = []
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"SERGIO\'S MIAMI AIRPORT","product_name":"Meals"}'
async def _capture(messages, caller=None):
for m in messages:
captured.append(m.get('content', ''))
return llm_resp
agent._llm.submit = _capture
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
await agent._parse_receipt_text(
'(((HMSHost ByAvolta\nSERGIO\'S MIAMI AIRPORT\nCHK 9745\nPayment $16.29',
'sergios.jpg',
expense_products=[{'id': 1, 'name': 'Meals'}],
)
full_prompt = ' '.join(captured)
assert 'only use a brand name' in full_prompt.lower() or \
'do not' in full_prompt.lower() or \
'not substitute' in full_prompt.lower(), (
"Prompt must instruct the LLM not to substitute a different brand name."
)
# ---------------------------------------------------------------------------
# Vision LLM path — _parse_receipt_text with b64/mimetype
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_vision_path_sends_image_to_llm():
"""In vision mode, the LLM call includes an 'images' key with the b64 data."""
agent = _make_agent()
captured_messages: list = []
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"Home Depot","product_name":"Supplies"}'
async def _capture(messages, caller=None):
captured_messages.extend(messages)
return llm_resp
agent._llm.submit = _capture
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='vision'):
result = await agent._parse_receipt_text(
'THE HOME DEPOT\nTotal: $36.78', 'homedepot.jpg',
expense_products=[{'id': 1, 'name': 'Supplies'}],
b64='FAKEBASE64DATA',
mimetype='image/jpeg',
)
assert result['vendor'] == 'Home Depot'
assert result['amount'] == 36.78
assert len(captured_messages) == 1
msg = captured_messages[0]
assert 'images' in msg, "Vision path must include 'images' in LLM message"
assert msg['images'] == ['FAKEBASE64DATA']
@pytest.mark.asyncio
async def test_text_mode_skips_vision_even_with_image():
"""When RECEIPT_VISION_MODE=text, b64 is ignored and no images are sent."""
agent = _make_agent()
captured_messages: list = []
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"Home Depot","product_name":"Supplies"}'
async def _capture(messages, caller=None):
captured_messages.extend(messages)
return llm_resp
agent._llm.submit = _capture
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
await agent._parse_receipt_text(
'THE HOME DEPOT\nTotal: $36.78', 'homedepot.jpg',
expense_products=[{'id': 1, 'name': 'Supplies'}],
b64='FAKEBASE64DATA',
mimetype='image/jpeg',
)
assert len(captured_messages) == 1
assert 'images' not in captured_messages[0], (
"Text mode must NOT send images to the LLM."
)
@pytest.mark.asyncio
async def test_vision_falls_back_to_text_on_llm_error():
"""If the vision LLM call raises, the text path is tried as fallback."""
agent = _make_agent()
call_count = [0]
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"Shell","product_name":"Fuel"}'
async def _first_fails(messages, caller=None):
call_count[0] += 1
if call_count[0] == 1:
raise RuntimeError('simulated vision model error')
return llm_resp
agent._llm.submit = _first_fails
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='vision'):
result = await agent._parse_receipt_text(
'SHELL GAS STATION\nTotal Sale $55.00', 'shell.jpg',
expense_products=[{'id': 1, 'name': 'Fuel'}],
b64='FAKEBASE64DATA',
mimetype='image/jpeg',
)
assert call_count[0] == 2, "Must make exactly 2 LLM calls (vision failed, text succeeded)"
assert result['vendor'] == 'Shell'
assert result['amount'] == 55.00
@pytest.mark.asyncio
async def test_non_image_mimetype_uses_text_path_in_vision_mode():
"""PDFs and text files must always use the text path even in vision mode."""
agent = _make_agent()
captured_messages: list = []
llm_resp = MagicMock()
llm_resp.content = '{"vendor":"United Airlines","product_name":"Travel"}'
async def _capture(messages, caller=None):
captured_messages.extend(messages)
return llm_resp
agent._llm.submit = _capture
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='vision'):
await agent._parse_receipt_text(
'United Airlines\nBaggage Fee\nTotal: $45.00', 'ticket.pdf',
expense_products=[{'id': 1, 'name': 'Travel'}],
b64='FAKEBASE64DATA',
mimetype='application/pdf', # NOT an image — no vision
)
assert len(captured_messages) == 1
assert 'images' not in captured_messages[0], (
"PDF receipts must not be sent as images even in vision mode."
)
# ---------------------------------------------------------------------------
# _batch_parse_receipts — batched LLM call for vendor + product_name
# ---------------------------------------------------------------------------
def _make_receipt(filename='receipt.jpg', text='Acme\nTotal: $10.00',
b64='', mimetype='image/jpeg', date_from_name=None):
"""Build a minimal receipt dict as produced by parse_upload."""
return {'filename': filename, 'text': text, 'b64': b64,
'mimetype': mimetype, 'date_from_name': date_from_name,
'sha256': 'abc'}
@pytest.mark.asyncio
async def test_batch_parse_single_llm_call_for_multiple_receipts():
"""N text receipts must result in exactly 1 LLM call (batched prompt)."""
agent = _make_agent()
receipts = [
_make_receipt('a.txt', 'Shell Gas\nTotal: $45.00'),
_make_receipt('b.txt', 'Marriott Hotel\nAmount Due: $180.00'),
_make_receipt('c.txt', 'Chipotle\nTotal: $12.75'),
]
products = [{'id': 1, 'name': 'Meals'}, {'id': 2, 'name': 'Travel'}, {'id': 3, 'name': 'Fuel'}]
llm_resp = MagicMock()
llm_resp.content = (
'[{"vendor":"Shell","product_name":"Fuel"},'
'{"vendor":"Marriott","product_name":"Travel"},'
'{"vendor":"Chipotle","product_name":"Meals"}]'
)
agent._llm.submit = AsyncMock(return_value=llm_resp)
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
results = await agent._batch_parse_receipts(receipts, products)
agent._llm.submit.assert_called_once()
assert len(results) == 3
assert results[0]['vendor'] == 'Shell'
assert results[1]['vendor'] == 'Marriott'
assert results[2]['vendor'] == 'Chipotle'
@pytest.mark.asyncio
async def test_batch_parse_amounts_from_regex_not_llm():
"""Amounts must come from regex (Phase 1), not from the LLM batch response."""
agent = _make_agent()
receipts = [_make_receipt('r.txt', 'Acme Store\nTotal: $99.99')]
products = [{'id': 1, 'name': 'Supplies'}]
llm_resp = MagicMock()
llm_resp.content = '[{"vendor":"Acme","product_name":"Supplies"}]'
agent._llm.submit = AsyncMock(return_value=llm_resp)
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
results = await agent._batch_parse_receipts(receipts, products)
assert results[0]['amount'] == 99.99
@pytest.mark.asyncio
async def test_batch_parse_no_private_keys_in_results():
"""Internal _-prefixed keys must be stripped from every result dict."""
agent = _make_agent()
receipts = [_make_receipt('r.txt', 'Acme\nTotal: $10.00')]
products = [{'id': 1, 'name': 'Meals'}]
llm_resp = MagicMock()
llm_resp.content = '[{"vendor":"Acme","product_name":"Meals"}]'
agent._llm.submit = AsyncMock(return_value=llm_resp)
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
results = await agent._batch_parse_receipts(receipts, products)
for entry in results:
private = [k for k in entry if k.startswith('_')]
assert private == [], f'Private keys not cleaned up: {private}'
@pytest.mark.asyncio
async def test_batch_parse_bank_statement_skipped_no_llm():
"""Bank statements inside a batch must be skipped; no LLM call for them."""
agent = _make_agent()
# 12 transaction lines → flagged as bank statement
stmt = '\n'.join(f'05/{i+1:02d} MERCHANT {i} ${10 + i}.99' for i in range(12))
receipts = [
_make_receipt('stmt.pdf', stmt),
_make_receipt('real.txt', 'Shell Gas\nTotal: $45.00'),
]
products = [{'id': 1, 'name': 'Fuel'}]
llm_resp = MagicMock()
llm_resp.content = '[{"vendor":"Shell","product_name":"Fuel"}]'
agent._llm.submit = AsyncMock(return_value=llm_resp)
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
results = await agent._batch_parse_receipts(receipts, products)
# Only 1 item sent to LLM (the real receipt, not the statement)
agent._llm.submit.assert_called_once()
# Statement entry has skip=True
assert results[0].get('skip') is True
assert results[0]['amount'] == 0.0
# Real receipt parsed normally
assert results[1]['vendor'] == 'Shell'
@pytest.mark.asyncio
async def test_batch_parse_falls_back_on_malformed_json():
"""When the batch LLM returns malformed JSON, falls back to individual calls."""
agent = _make_agent()
receipts = [
_make_receipt('a.txt', 'Shell\nTotal: $45.00'),
_make_receipt('b.txt', 'Marriott\nTotal: $180.00'),
]
products = [{'id': 1, 'name': 'Travel'}]
call_count = [0]
individual_resp = MagicMock()
individual_resp.content = '{"vendor":"Shell","product_name":"Travel"}'
async def _side_effect(messages, caller=None):
call_count[0] += 1
if call_count[0] == 1:
bad = MagicMock()
bad.content = 'not valid json at all'
return bad
return individual_resp
agent._llm.submit = _side_effect
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
results = await agent._batch_parse_receipts(receipts, products)
# 1 batch attempt + 2 individual fallback calls = 3
assert call_count[0] == 3
assert len(results) == 2
@pytest.mark.asyncio
async def test_batch_parse_falls_back_on_wrong_item_count():
"""When the LLM returns a JSON array with wrong length, falls back."""
agent = _make_agent()
receipts = [
_make_receipt('a.txt', 'Shell\nTotal: $45.00'),
_make_receipt('b.txt', 'Marriott\nTotal: $180.00'),
]
products = [{'id': 1, 'name': 'Travel'}]
call_count = [0]
fallback_resp = MagicMock()
fallback_resp.content = '{"vendor":"Shell","product_name":"Travel"}'
async def _side_effect(messages, caller=None):
call_count[0] += 1
if call_count[0] == 1:
# Returns only 1 item, expected 2
wrong = MagicMock()
wrong.content = '[{"vendor":"Shell","product_name":"Travel"}]'
return wrong
return fallback_resp
agent._llm.submit = _side_effect
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
results = await agent._batch_parse_receipts(receipts, products)
# 1 batch attempt + 2 individual fallback calls = 3
assert call_count[0] == 3
assert len(results) == 2
@pytest.mark.asyncio
async def test_batch_parse_no_products_skips_llm():
"""When there are no expense products, the LLM is not called."""
agent = _make_agent()
receipts = [_make_receipt('r.txt', 'Acme\nTotal: $10.00')]
agent._llm.submit = AsyncMock()
with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'):
results = await agent._batch_parse_receipts(receipts, [])
agent._llm.submit.assert_not_called()
assert len(results) == 1
# ---------------------------------------------------------------------------
# parse_upload — receipt_parser.py
# ---------------------------------------------------------------------------
from agent_service.tools.receipt_parser import parse_upload
class TestParseUpload:
def test_text_file_parsed(self):
results = parse_upload('receipt.txt', b'Acme Store\nTotal: $10.00')
assert len(results) == 1
r = results[0]
assert r['filename'] == 'receipt.txt'
assert 'Acme Store' in r['text']
assert r['mimetype'] == 'text/plain'
assert r['sha256'] # hash present
def test_date_extracted_from_filename(self):
results = parse_upload('20260509_180857.jpg_compressed.JPEG', b'\xff\xd8\xff')
assert results[0]['date_from_name'] == '2026-05-09'
def test_no_date_in_plain_filename(self):
results = parse_upload('receipt.txt', b'text')
assert results[0]['date_from_name'] is None
def test_zip_extracted(self):
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
zf.writestr('receipt.txt', 'Vendor: Acme\nTotal: $5.00')
zf.writestr('other.txt', 'Another receipt')
results = parse_upload('bundle.zip', buf.getvalue())
assert len(results) == 2
filenames = {r['filename'] for r in results}
assert 'receipt.txt' in filenames
assert 'other.txt' in filenames
def test_zip_skips_directories(self):
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
zf.writestr('subdir/', '') # directory entry
zf.writestr('subdir/file.txt', 'content')
results = parse_upload('bundle.zip', buf.getvalue())
assert len(results) == 1
assert results[0]['filename'] == 'file.txt'
def test_empty_zip_returns_empty(self):
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w'):
pass
results = parse_upload('empty.zip', buf.getvalue())
assert results == []
def test_sha256_is_consistent(self):
data = b'some receipt bytes'
r1 = parse_upload('a.txt', data)[0]
r2 = parse_upload('b.txt', data)[0]
assert r1['sha256'] == r2['sha256']
def test_b64_decodes_to_original(self):
data = b'receipt content here'
result = parse_upload('r.txt', data)[0]
assert base64.b64decode(result['b64']) == data
# ---------------------------------------------------------------------------
# _text_to_html — ab_ai_mail.py
# ---------------------------------------------------------------------------
from agent_service.agents.expenses_agent import ExpensesAgent # already imported
# Import from Odoo addon directly
import sys, importlib
def _get_text_to_html():
"""Import _text_to_html without triggering Odoo module loading."""
import importlib.util, pathlib
path = pathlib.Path(__file__).parent.parent / 'addons' / 'activeblue_ai' / 'models' / 'ab_ai_mail.py'
spec = importlib.util.spec_from_file_location('ab_ai_mail', path)
mod = importlib.util.module_from_spec(spec)
# Stub out the odoo imports so the module loads without Odoo installed
sys.modules.setdefault('odoo', MagicMock())
sys.modules.setdefault('odoo.SUPERUSER_ID', MagicMock())
sys.modules.setdefault('markupsafe', __import__('markupsafe'))
spec.loader.exec_module(mod)
return mod._text_to_html
class TestTextToHtml:
@pytest.fixture(autouse=True)
def fn(self):
try:
self._fn = _get_text_to_html()
except Exception:
pytest.skip('ab_ai_mail could not be imported without Odoo environment')
def test_plain_text_unchanged(self):
result = str(self._fn('hello world'))
assert 'hello world' in result
def test_newline_becomes_br(self):
result = str(self._fn('line one\nline two'))
assert '<br>' in result
assert 'line one' in result
assert 'line two' in result
def test_html_special_chars_escaped(self):
result = str(self._fn('<script>alert("xss")</script>'))
assert '<script>' not in result
assert '&lt;script&gt;' in result
def test_ampersand_escaped(self):
result = str(self._fn('Layal Cafe & Banquet'))
assert '&amp;' in result
def test_empty_string(self):
result = str(self._fn(''))
assert result == ''