""" ActiveBlue AI — Expenses Agent Unit Tests ========================================== Suite: test_expenses_agent.py Module: agent_service/agents/expenses_agent.py agent_service/tools/receipt_parser.py addons/activeblue_ai/models/ab_ai_mail.py Purpose ------- Verify the core business logic of the expenses agent without requiring a live Odoo instance, database, or LLM. All external dependencies (ORM, HTTP, Ollama) are mocked. Tests run in < 1 second. Run --- source .venv-test/bin/activate python -m pytest tests/test_expenses_agent.py -v Test groups ----------- TestFindSemanticDuplicate — two-pass duplicate-detection algorithm test_plan_* — intent keyword → user_confirmed / user_dup_decision test_act_* — _act() confirmation gate and expense creation TestParseUpload — receipt_parser ZIP handling and metadata TestTextToHtml — HTML escaping (skipped without Odoo env) See tests/TEST_EXPENSES_AGENT.md for full documentation. """ from __future__ import annotations import asyncio import base64 import io import zipfile from unittest.mock import AsyncMock, MagicMock, patch import pytest # --------------------------------------------------------------------------- # _find_semantic_duplicate # --------------------------------------------------------------------------- from agent_service.agents.expenses_agent import ExpensesAgent def _p(vendor='Acme', amount=10.00, date='2026-05-09', time=None): """Shorthand for building a parsed-receipt dict.""" return {'vendor': vendor, 'amount': amount, 'date': date, 'time': time, 'product_name': ''} def _candidate(parsed): """Wrap parsed dict as a (receipt, parsed) candidate tuple.""" return ({}, parsed) class TestFindSemanticDuplicate: # ------------------------------------------------------------------ # Pass 1: amount-based match # ------------------------------------------------------------------ def test_exact_match(self): candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-09'), candidates) assert idx == 0 def test_amount_within_threshold(self): candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.04, '2026-05-09'), candidates) assert idx == 0 def test_amount_just_over_threshold(self): candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.06, '2026-05-09'), candidates) # Pass 1 should miss; Pass 2 should still catch it (same vendor + date) assert idx == 0 # caught by Pass 2 def test_different_date_not_duplicate(self): candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-10'), candidates) assert idx is None def test_zero_amount_not_deduplicated(self): """Zero-amount receipts are too ambiguous — never flagged as dups.""" candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 0.0, '2026-05-09'), candidates) assert idx is None def test_vendor_similarity_above_threshold(self): candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate( _p('In-N-Out Houston', 8.55, '2026-05-09'), candidates) assert idx == 0 def test_vendor_similarity_below_threshold_pass1(self): """Completely different vendors with same amount+date → not a dup.""" candidates = [_candidate(_p('McDonald\'s', 8.55, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate( _p('Starbucks', 8.55, '2026-05-09'), candidates) assert idx is None def test_time_within_window_is_dup(self): candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='14:00'))] idx = ExpensesAgent._find_semantic_duplicate( _p('Acme', 10.00, '2026-05-09', time='14:25'), candidates) assert idx == 0 def test_time_outside_window_not_dup(self): """Same vendor/amount/date but >30 min apart → different transactions.""" candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))] idx = ExpensesAgent._find_semantic_duplicate( _p('Acme', 10.00, '2026-05-09', time='14:00'), candidates) assert idx is None def test_one_time_missing_does_not_exclude(self): """If only one receipt has a time, the time check is skipped.""" candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))] idx = ExpensesAgent._find_semantic_duplicate( _p('Acme', 10.00, '2026-05-09', time=None), candidates) assert idx == 0 def test_filename_vendor_same_amount_date_is_dup(self): """Vendor looks like a filename → treated as dup if amount+date match.""" candidates = [_candidate(_p('20260509_180857.jpg', 10.00, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate( _p('20260509_171757.jpg', 10.00, '2026-05-09'), candidates) assert idx == 0 def test_no_candidates(self): idx = ExpensesAgent._find_semantic_duplicate(_p(), []) assert idx is None def test_returns_correct_index_multiple_candidates(self): candidates = [ _candidate(_p('Burger King', 5.00, '2026-05-09')), _candidate(_p('Acme', 10.00, '2026-05-09')), _candidate(_p('Starbucks', 4.50, '2026-05-08')), ] idx = ExpensesAgent._find_semantic_duplicate(_p('Acme Corp', 10.00, '2026-05-09'), candidates) assert idx == 1 # ------------------------------------------------------------------ # Pass 2: OCR amount mismatch (same vendor+date, different amount) # ------------------------------------------------------------------ def test_pass2_catches_ocr_amount_mismatch(self): """The In-N-Out $8.55 vs $15.00 bug: vendor ≥80%, same date, amounts far apart.""" candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate( _p('In-N-Qut Houston', 15.00, '2026-05-09'), candidates) assert idx == 0 def test_pass2_requires_high_vendor_similarity(self): """Pass 2 threshold is 80% — clearly different vendors should not trigger it.""" # "Starbucks Coffee" vs "McDonalds Burger" share very few characters (~25%) candidates = [_candidate(_p('Starbucks Coffee', 8.55, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate( _p('McDonalds Burger', 15.00, '2026-05-09'), candidates) assert idx is None def test_pass2_same_date_required(self): candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-08'))] idx = ExpensesAgent._find_semantic_duplicate( _p('In-N-Out Houston', 15.00, '2026-05-09'), candidates) assert idx is None def test_pass2_respects_time_window(self): """Even with high vendor similarity, >30 min apart means different visit.""" candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09', time='12:00'))] idx = ExpensesAgent._find_semantic_duplicate( _p('In-N-Out Houston', 15.00, '2026-05-09', time='15:00'), candidates) assert idx is None def test_pass2_skips_filename_vendors(self): """Pass 2 does not apply when the vendor looks like a filename.""" candidates = [_candidate(_p('20260509_180857.jpg', 8.55, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate( _p('20260509_171757.jpg', 15.00, '2026-05-09'), candidates) # Filenames have different names so similarity will be low; # Pass 2 explicitly skips filename vendors. assert idx is None def test_pass2_zero_amount_not_deduplicated(self): candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))] idx = ExpensesAgent._find_semantic_duplicate( _p('Acme Corp', 0.0, '2026-05-09'), candidates) assert idx is None # --------------------------------------------------------------------------- # _plan() — keyword detection # --------------------------------------------------------------------------- def _make_agent(): """Return an ExpensesAgent with all dependencies mocked out.""" agent = ExpensesAgent.__new__(ExpensesAgent) agent._odoo = MagicMock() agent._llm = MagicMock() agent._peer_bus = None agent._et = MagicMock() agent._gathered_data = {} agent._actions_taken = [] agent._escalations_list = [] return agent def _make_directive(task='', raw_message='', receipts=None): directive = MagicMock() directive.task = task directive.params = {} directive.directive_id = 'test-dir' directive.context.peer_data = {'raw_message': raw_message, 'requesting_user_id': 1} directive.context.receipts = receipts or [] return directive async def _run_plan(task='', raw_message='', receipts=None): agent = _make_agent() agent._directive = _make_directive(task=task, raw_message=raw_message, receipts=receipts) return await agent._plan() @pytest.mark.asyncio async def test_plan_confirm_keyword_sets_confirmed(): plan = await _run_plan(raw_message='confirm') assert plan['user_confirmed'] is True @pytest.mark.asyncio async def test_plan_looks_good_sets_confirmed(): plan = await _run_plan(raw_message='looks good') assert plan['user_confirmed'] is True @pytest.mark.asyncio async def test_plan_go_ahead_sets_confirmed(): plan = await _run_plan(raw_message='go ahead') assert plan['user_confirmed'] is True @pytest.mark.asyncio async def test_plan_no_keyword_not_confirmed(): plan = await _run_plan(raw_message='create an expense report') assert plan['user_confirmed'] is False @pytest.mark.asyncio async def test_plan_keep_all_sets_dup_decision(): plan = await _run_plan(raw_message='confirm, keep all') assert plan['user_confirmed'] is True assert plan['user_dup_decision'] == 'keep_all' @pytest.mark.asyncio async def test_plan_skip_sets_dup_decision(): plan = await _run_plan(raw_message='skip duplicates') assert plan['user_dup_decision'] == 'skip' @pytest.mark.asyncio async def test_plan_default_dup_decision_is_skip(): """When user says 'confirm' with no dup instruction, default to skip.""" plan = await _run_plan(raw_message='confirm') assert plan['user_dup_decision'] == 'skip' @pytest.mark.asyncio async def test_plan_mode_is_read_without_receipts(): plan = await _run_plan(raw_message='show my expenses') assert plan['mode'] == 'read' @pytest.mark.asyncio async def test_plan_mode_is_create_with_receipts(): fake_receipt = {'filename': 'receipt.jpg', 'text': '', 'b64': '', 'sha256': 'abc'} plan = await _run_plan(raw_message='create expense report', receipts=[fake_receipt]) assert plan['mode'] == 'create_from_receipts' @pytest.mark.asyncio async def test_plan_task_field_also_checked(): """master LLM writes intent_summary into task; confirm in task should work.""" plan = await _run_plan(task='confirm the expense report creation', raw_message='') assert plan['user_confirmed'] is True # --------------------------------------------------------------------------- # _act() confirmation gate # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_act_creates_expenses_immediately(): """Expenses are created in draft immediately — no confirmation gate. The old two-step confirm flow was removed because receipts are only available in the initial /upload request, making a follow-up confirmation turn impossible. _act() now creates draft expenses straight away. """ agent = _make_agent() fake_receipt = { 'filename': 'receipt.jpg', 'text': 'Acme $10.00', 'b64': '', 'sha256': 'abc123', 'mimetype': 'image/jpeg', 'date_from_name': None, } agent._directive = _make_directive(raw_message='create expense report', receipts=[fake_receipt]) agent._gathered_data = { 'mode': 'create_from_receipts', 'user_confirmed': False, 'user_dup_decision': 'skip', } parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09', 'time': None, 'product_name': ''} sheet_result = MagicMock(success=True, record_id=42) expense_result = MagicMock(success=True, record_id=99) agent._et.get_employee_id_for_user = AsyncMock(return_value=1) agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}]) agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result) agent._et.create_expense = AsyncMock(return_value=expense_result) with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)): actions = await agent._act({}) assert any('Created expense sheet' in a for a in actions) agent._et.create_expense_sheet.assert_called_once() agent._et.create_expense.assert_called_once() @pytest.mark.asyncio async def test_act_creates_sheet_when_confirmed(): """Second call with user_confirmed=True → expense sheet is created.""" agent = _make_agent() fake_receipt = { 'filename': 'receipt.jpg', 'text': 'Acme $10.00', 'b64': base64.b64encode(b'imgdata').decode(), 'sha256': 'abc123', 'mimetype': 'image/jpeg', 'date_from_name': None, } agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt]) agent._gathered_data = { 'mode': 'create_from_receipts', 'user_confirmed': True, 'user_dup_decision': 'skip', } parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09', 'time': None, 'product_name': 'Meals'} sheet_result = MagicMock(success=True, record_id=42) expense_result = MagicMock(success=True, record_id=99) agent._et.get_employee_id_for_user = AsyncMock(return_value=1) agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}]) agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result) agent._et.create_expense = AsyncMock(return_value=expense_result) agent._et.attach_receipt = AsyncMock() with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)): actions = await agent._act({}) assert any('Created expense sheet' in a for a in actions) assert any('Acme' in a for a in actions) agent._et.create_expense_sheet.assert_called_once() agent._et.create_expense.assert_called_once() @pytest.mark.asyncio async def test_act_deduplicates_byte_identical_receipts(): """Two receipts with the same SHA256 → only one expense created.""" agent = _make_agent() receipt = { 'filename': 'receipt.jpg', 'text': 'Acme $10.00', 'b64': '', 'sha256': 'samehash', 'mimetype': 'image/jpeg', 'date_from_name': None, } agent._directive = _make_directive(raw_message='confirm', receipts=[receipt, receipt]) agent._gathered_data = { 'mode': 'create_from_receipts', 'user_confirmed': True, 'user_dup_decision': 'skip', } parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09', 'time': None, 'product_name': ''} sheet_result = MagicMock(success=True, record_id=1) expense_result = MagicMock(success=True, record_id=2) agent._et.get_employee_id_for_user = AsyncMock(return_value=1) agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}]) agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result) agent._et.create_expense = AsyncMock(return_value=expense_result) agent._et.attach_receipt = AsyncMock() with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)): await agent._act({}) # Only one create_expense call despite two identical receipts assert agent._et.create_expense.call_count == 1 @pytest.mark.asyncio async def test_act_no_employee_returns_empty_and_escalates(): agent = _make_agent() fake_receipt = {'filename': 'r.jpg', 'text': '', 'b64': '', 'sha256': 'x', 'mimetype': 'image/jpeg', 'date_from_name': None} agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt]) agent._gathered_data = { 'mode': 'create_from_receipts', 'user_confirmed': True, 'user_dup_decision': 'skip', } agent._et.get_employee_id_for_user = AsyncMock(return_value=None) agent._et.get_expense_products = AsyncMock(return_value=[]) result = await agent._act({}) assert result == [] assert any('No employee record' in e for e in agent._escalations_list) # --------------------------------------------------------------------------- # _extract_amount_from_text / _extract_date_from_text — regex helpers # --------------------------------------------------------------------------- from agent_service.agents.expenses_agent import ( _extract_amount_from_text, _extract_date_from_text, _is_likely_bank_statement, _MONTH_MAP, _get_vision_mode, ) class TestExtractAmount: def test_simple_total(self): assert _extract_amount_from_text('Acme\nTotal: $9.99') == 9.99 def test_grand_total(self): assert _extract_amount_from_text('Subtotal: $20.00\nGrand Total: $22.46') == 22.46 def test_amount_due(self): assert _extract_amount_from_text('Amount Due: 198.40') == 198.40 def test_no_dollar_sign(self): assert _extract_amount_from_text('TOTAL 15.75') == 15.75 def test_last_match_wins(self): # Grand total should beat subtotal text = 'Subtotal 18.00\nTax 1.50\nTotal 19.50' assert _extract_amount_from_text(text) == 19.50 def test_empty_text(self): assert _extract_amount_from_text('') == 0.0 def test_no_total_line(self): assert _extract_amount_from_text('No price here') == 0.0 def test_comma_in_amount(self): assert _extract_amount_from_text('Grand Total: $1,234.56') == 1234.56 def test_bottom_scan_garbled_total(self): # OCR garbled "TOTAL" — bottom-scan fallback should find the amount text = 'Burger 5.99\nFries 2.50\nT0TAL 8.49' assert _extract_amount_from_text(text) == 8.49 def test_bottom_scan_skips_change(self): # Should return the total (8.49), not the change (1.51) text = 'TOTAL 8.49\nCash 10.00\nChange 1.51' assert _extract_amount_from_text(text) == 8.49 def test_bottom_scan_amount_on_own_line(self): # Amount printed on a separate line below the label text = 'Items 5.00\nTax 0.50\nTotal\n5.50' assert _extract_amount_from_text(text) == 5.50 def test_total_taxes_excluded(self): # "Total Taxes $2.80" must NOT be confused with the receipt total; # the labeled-total regex excludes 'total tax/taxes' via lookahead. text = 'Subtotal $40.10\nTotal Taxes $2.80\nTotal $42.90' assert _extract_amount_from_text(text) == 42.90 def test_pass1_returns_max_not_last(self): # If OCR garbles "Total Taxes" into "Total\n$2.80", _TOTAL_RE would # accidentally match twice. max() must win over positional [-1]. # Simulate by giving two labeled totals where smaller appears second. text = 'Grand Total $42.90\nTotal $2.80' assert _extract_amount_from_text(text) == 42.90 def test_total_sale_gas_station(self): # Costco / Shell gas receipts say "Total Sale $X.XX", not "Total: $X.XX" text = 'Pump 9 16.189 Gal\nRegular $ 58.75\nTotal Sale $ 58.75' assert _extract_amount_from_text(text) == 58.75 def test_net_sale(self): text = 'Items 22.00\nNet Sale $22.00' assert _extract_amount_from_text(text) == 22.00 def test_amount_due_with_usd_suffix(self): # "Total Charged" is in _TOTAL_RE — Pass 1 catches it text = 'Total Charged: $198.40 USD' assert _extract_amount_from_text(text) == 198.40 def test_top_amount_returned_by_max(self): # Display-style receipt: charge shown at top, no 'Total' label. # Pass 2 (max) must find $40.10 even though it is before the item list. text = 'LAYAL CAFE\n$40.10\n--------\nBreakfast 37.30\nCoffee 2.80' assert _extract_amount_from_text(text) == 40.10 def test_card_terminal_visa_line(self): # Card terminal: amount on a line prefixed with card-brand text. # VISA must NOT be in the skip list so the amount is captured. text = 'MERCHANT XYZ\nYHOOMHXAKKKEO4S VISA USD$ 36.78\nAuth 123456' assert _extract_amount_from_text(text) == 36.78 def test_max_beats_item_prices(self): # Receipt with several item prices — max should return the largest # (the total), not an item that appears last in the text. text = 'Burger 12.99\nFries 4.50\nDrink 2.99\nT0TAL 20.48' assert _extract_amount_from_text(text) == 20.48 def test_change_line_excluded_from_max(self): # Change-due line must be skipped so it never inflates the max. text = 'Items 8.49\nCash Tendered 20.00\nChange 11.51' assert _extract_amount_from_text(text) == 8.49 def test_net_fee_parking(self): # Parking kiosk receipts (e.g. MIA) use "net fee: 150.00 USD" format. # _TOTAL_RE must include "net fee" so Pass 1 catches it and avoids # the max-scan accidentally picking up a larger line like entry/exit fees. text = ( 'MIAMI AIRPORT PARKING\n' 'Entry 05/09 08:00\n' 'Exit 05/10 14:30\n' 'net fee: 150.00 USD' ) assert _extract_amount_from_text(text) == 150.00 class TestBankStatementDetection: def _stmt(self, n: int) -> str: """Generate fake bank statement with n transaction lines.""" lines = [f'05/{i+1:02d} MERCHANT {i} $1{i}.99' for i in range(n)] return '\n'.join(lines) def test_receipt_not_flagged(self): # A typical restaurant receipt has < 10 amount-bearing lines text = 'Acme Cafe\nBurger 12.99\nFries 4.50\nDrink 2.99\nTax 1.65\nTotal 22.13' assert _is_likely_bank_statement(text) is False def test_statement_flagged(self): # 10 transaction lines → flagged as statement assert _is_likely_bank_statement(self._stmt(10)) is True def test_threshold_boundary(self): assert _is_likely_bank_statement(self._stmt(9)) is False assert _is_likely_bank_statement(self._stmt(10)) is True def test_empty_text(self): assert _is_likely_bank_statement('') is False def test_no_amounts(self): assert _is_likely_bank_statement('Hello world\nNo prices here') is False @pytest.mark.asyncio async def test_parse_bank_statement_returns_skip(): """Bank statement image must be skipped — no amount, skip=True returned.""" agent = _make_agent() # Build fake OCR text with 12 transaction lines stmt_text = '\n'.join( f'05/{i+1:02d} SOME MERCHANT {i} ${10 + i}.99' for i in range(12) ) result = await agent._parse_receipt_text( stmt_text, '2026-05-15_bank.png', expense_products=[{'id': 1, 'name': 'Meals'}], ) assert result.get('skip') is True assert result['amount'] == 0.0 class TestExtractDate: def test_iso_format(self): assert _extract_date_from_text('Date: 2026-05-09') == '2026-05-09' def test_slash_iso(self): assert _extract_date_from_text('2026/05/09') == '2026-05-09' def test_us_format(self): assert _extract_date_from_text('05/09/2026') == '2026-05-09' def test_us_short_year(self): assert _extract_date_from_text('05/09/26') == '2026-05-09' def test_dd_mon_yyyy(self): # Airline / hotel receipts: "05 MAY 2026", "Issue Date: 05 May 2026" assert _extract_date_from_text('Issue Date: 05 MAY 2026 MIA A70') == '2026-05-05' def test_mon_dd_yyyy(self): assert _extract_date_from_text('MAY 05 2026') == '2026-05-05' def test_mon_dd_comma_yyyy(self): assert _extract_date_from_text('May 5, 2026') == '2026-05-05' def test_month_map_completeness(self): # All twelve three-letter abbreviations must be present assert len({k for k in _MONTH_MAP if len(k) == 3}) == 12 def test_no_date(self): assert _extract_date_from_text('No date here') is None def test_empty(self): assert _extract_date_from_text('') is None # --------------------------------------------------------------------------- # _parse_receipt_text — combined extraction # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_parse_plain_ocr_text_uses_llm_for_vendor(): """Regex extracts amount; LLM called only for vendor + product_name.""" agent = _make_agent() llm_resp = MagicMock() # LLM now only returns vendor + product_name llm_resp.content = '{"vendor":"Acme","product_name":"Meals"}' agent._llm.submit = AsyncMock(return_value=llm_resp) result = await agent._parse_receipt_text( 'Acme Store\nTotal: $9.99', 'receipt.jpg', expense_products=[{'id': 1, 'name': 'Meals'}], ) assert result['vendor'] == 'Acme' assert result['amount'] == 9.99 # from regex, not LLM assert result['product_name'] == 'Meals' agent._llm.submit.assert_called_once() @pytest.mark.asyncio async def test_parse_date_hint_overrides_ocr_date(): """date_hint from filename must be used; LLM date should be ignored.""" agent = _make_agent() llm_resp = MagicMock() llm_resp.content = '{"vendor":"Shell","product_name":"Fuel"}' agent._llm.submit = AsyncMock(return_value=llm_resp) result = await agent._parse_receipt_text( 'Shell Gas\n05/09/2021\nTotal: $45.00', 'shell.jpg', date_hint='2026-05-09', ) assert result['date'] == '2026-05-09' # filename timestamp wins assert result['amount'] == 45.00 @pytest.mark.asyncio async def test_parse_ocr_failed_skips_llm_amount(): """When OCR fails, amount=0 and date comes from hint or today.""" agent = _make_agent() llm_resp = MagicMock() llm_resp.content = '{"vendor":"","product_name":"Meals"}' agent._llm.submit = AsyncMock(return_value=llm_resp) result = await agent._parse_receipt_text( '[Image: broken.jpg — OCR failed]', 'broken.jpg', date_hint='2026-05-10', expense_products=[{'id': 1, 'name': 'Meals'}], ) assert result['amount'] == 0.0 assert result['date'] == '2026-05-10' @pytest.mark.asyncio async def test_vendor_prompt_does_not_contain_mcdonalds(): """The text-path vendor prompt must not reference 'McDonald' — it biases the model toward returning McDonald's whenever OCR text is unclear. Pinned to text mode so vision path (which has its own cleaner prompt) does not interfere. """ agent = _make_agent() captured: list[str] = [] llm_resp = MagicMock() llm_resp.content = '{"vendor":"The Home Depot","product_name":"Supplies"}' async def _capture(messages, caller=None): for m in messages: captured.append(m.get('content', '')) return llm_resp agent._llm.submit = _capture with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'): await agent._parse_receipt_text( 'THE HOME DEPOT\nHow doers get more done\nWAGNER FURNO 300HG 36.78\nVISA USD$ 36.78', 'homedepot.jpg', expense_products=[{'id': 1, 'name': 'Meals'}, {'id': 2, 'name': 'Supplies'}], ) full_prompt = ' '.join(captured) assert 'McDonald' not in full_prompt, ( "Text-path prompt must not contain 'McDonald' — it biases the model." ) @pytest.mark.asyncio async def test_vendor_prompt_instructs_not_to_guess_absent_brand(): """Text-path prompt must tell LLM not to substitute a brand not in the OCR text.""" agent = _make_agent() captured: list[str] = [] llm_resp = MagicMock() llm_resp.content = '{"vendor":"SERGIO\'S MIAMI AIRPORT","product_name":"Meals"}' async def _capture(messages, caller=None): for m in messages: captured.append(m.get('content', '')) return llm_resp agent._llm.submit = _capture with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'): await agent._parse_receipt_text( '(((HMSHost ByAvolta\nSERGIO\'S MIAMI AIRPORT\nCHK 9745\nPayment $16.29', 'sergios.jpg', expense_products=[{'id': 1, 'name': 'Meals'}], ) full_prompt = ' '.join(captured) assert 'only use a brand name' in full_prompt.lower() or \ 'do not' in full_prompt.lower() or \ 'not substitute' in full_prompt.lower(), ( "Prompt must instruct the LLM not to substitute a different brand name." ) # --------------------------------------------------------------------------- # Vision LLM path — _parse_receipt_text with b64/mimetype # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_vision_path_sends_image_to_llm(): """In vision mode, the LLM call includes an 'images' key with the b64 data.""" agent = _make_agent() captured_messages: list = [] llm_resp = MagicMock() llm_resp.content = '{"vendor":"Home Depot","product_name":"Supplies"}' async def _capture(messages, caller=None): captured_messages.extend(messages) return llm_resp agent._llm.submit = _capture with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='vision'): result = await agent._parse_receipt_text( 'THE HOME DEPOT\nTotal: $36.78', 'homedepot.jpg', expense_products=[{'id': 1, 'name': 'Supplies'}], b64='FAKEBASE64DATA', mimetype='image/jpeg', ) assert result['vendor'] == 'Home Depot' assert result['amount'] == 36.78 assert len(captured_messages) == 1 msg = captured_messages[0] assert 'images' in msg, "Vision path must include 'images' in LLM message" assert msg['images'] == ['FAKEBASE64DATA'] @pytest.mark.asyncio async def test_text_mode_skips_vision_even_with_image(): """When RECEIPT_VISION_MODE=text, b64 is ignored and no images are sent.""" agent = _make_agent() captured_messages: list = [] llm_resp = MagicMock() llm_resp.content = '{"vendor":"Home Depot","product_name":"Supplies"}' async def _capture(messages, caller=None): captured_messages.extend(messages) return llm_resp agent._llm.submit = _capture with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='text'): await agent._parse_receipt_text( 'THE HOME DEPOT\nTotal: $36.78', 'homedepot.jpg', expense_products=[{'id': 1, 'name': 'Supplies'}], b64='FAKEBASE64DATA', mimetype='image/jpeg', ) assert len(captured_messages) == 1 assert 'images' not in captured_messages[0], ( "Text mode must NOT send images to the LLM." ) @pytest.mark.asyncio async def test_vision_falls_back_to_text_on_llm_error(): """If the vision LLM call raises, the text path is tried as fallback.""" agent = _make_agent() call_count = [0] llm_resp = MagicMock() llm_resp.content = '{"vendor":"Shell","product_name":"Fuel"}' async def _first_fails(messages, caller=None): call_count[0] += 1 if call_count[0] == 1: raise RuntimeError('simulated vision model error') return llm_resp agent._llm.submit = _first_fails with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='vision'): result = await agent._parse_receipt_text( 'SHELL GAS STATION\nTotal Sale $55.00', 'shell.jpg', expense_products=[{'id': 1, 'name': 'Fuel'}], b64='FAKEBASE64DATA', mimetype='image/jpeg', ) assert call_count[0] == 2, "Must make exactly 2 LLM calls (vision failed, text succeeded)" assert result['vendor'] == 'Shell' assert result['amount'] == 55.00 @pytest.mark.asyncio async def test_non_image_mimetype_uses_text_path_in_vision_mode(): """PDFs and text files must always use the text path even in vision mode.""" agent = _make_agent() captured_messages: list = [] llm_resp = MagicMock() llm_resp.content = '{"vendor":"United Airlines","product_name":"Travel"}' async def _capture(messages, caller=None): captured_messages.extend(messages) return llm_resp agent._llm.submit = _capture with patch('agent_service.agents.expenses_agent._get_vision_mode', return_value='vision'): await agent._parse_receipt_text( 'United Airlines\nBaggage Fee\nTotal: $45.00', 'ticket.pdf', expense_products=[{'id': 1, 'name': 'Travel'}], b64='FAKEBASE64DATA', mimetype='application/pdf', # NOT an image — no vision ) assert len(captured_messages) == 1 assert 'images' not in captured_messages[0], ( "PDF receipts must not be sent as images even in vision mode." ) # --------------------------------------------------------------------------- # parse_upload — receipt_parser.py # --------------------------------------------------------------------------- from agent_service.tools.receipt_parser import parse_upload class TestParseUpload: def test_text_file_parsed(self): results = parse_upload('receipt.txt', b'Acme Store\nTotal: $10.00') assert len(results) == 1 r = results[0] assert r['filename'] == 'receipt.txt' assert 'Acme Store' in r['text'] assert r['mimetype'] == 'text/plain' assert r['sha256'] # hash present def test_date_extracted_from_filename(self): results = parse_upload('20260509_180857.jpg_compressed.JPEG', b'\xff\xd8\xff') assert results[0]['date_from_name'] == '2026-05-09' def test_no_date_in_plain_filename(self): results = parse_upload('receipt.txt', b'text') assert results[0]['date_from_name'] is None def test_zip_extracted(self): buf = io.BytesIO() with zipfile.ZipFile(buf, 'w') as zf: zf.writestr('receipt.txt', 'Vendor: Acme\nTotal: $5.00') zf.writestr('other.txt', 'Another receipt') results = parse_upload('bundle.zip', buf.getvalue()) assert len(results) == 2 filenames = {r['filename'] for r in results} assert 'receipt.txt' in filenames assert 'other.txt' in filenames def test_zip_skips_directories(self): buf = io.BytesIO() with zipfile.ZipFile(buf, 'w') as zf: zf.writestr('subdir/', '') # directory entry zf.writestr('subdir/file.txt', 'content') results = parse_upload('bundle.zip', buf.getvalue()) assert len(results) == 1 assert results[0]['filename'] == 'file.txt' def test_empty_zip_returns_empty(self): buf = io.BytesIO() with zipfile.ZipFile(buf, 'w'): pass results = parse_upload('empty.zip', buf.getvalue()) assert results == [] def test_sha256_is_consistent(self): data = b'some receipt bytes' r1 = parse_upload('a.txt', data)[0] r2 = parse_upload('b.txt', data)[0] assert r1['sha256'] == r2['sha256'] def test_b64_decodes_to_original(self): data = b'receipt content here' result = parse_upload('r.txt', data)[0] assert base64.b64decode(result['b64']) == data # --------------------------------------------------------------------------- # _text_to_html — ab_ai_mail.py # --------------------------------------------------------------------------- from agent_service.agents.expenses_agent import ExpensesAgent # already imported # Import from Odoo addon directly import sys, importlib def _get_text_to_html(): """Import _text_to_html without triggering Odoo module loading.""" import importlib.util, pathlib path = pathlib.Path(__file__).parent.parent / 'addons' / 'activeblue_ai' / 'models' / 'ab_ai_mail.py' spec = importlib.util.spec_from_file_location('ab_ai_mail', path) mod = importlib.util.module_from_spec(spec) # Stub out the odoo imports so the module loads without Odoo installed sys.modules.setdefault('odoo', MagicMock()) sys.modules.setdefault('odoo.SUPERUSER_ID', MagicMock()) sys.modules.setdefault('markupsafe', __import__('markupsafe')) spec.loader.exec_module(mod) return mod._text_to_html class TestTextToHtml: @pytest.fixture(autouse=True) def fn(self): try: self._fn = _get_text_to_html() except Exception: pytest.skip('ab_ai_mail could not be imported without Odoo environment') def test_plain_text_unchanged(self): result = str(self._fn('hello world')) assert 'hello world' in result def test_newline_becomes_br(self): result = str(self._fn('line one\nline two')) assert '
' in result assert 'line one' in result assert 'line two' in result def test_html_special_chars_escaped(self): result = str(self._fn('')) assert '