From 6fcd830e6f76e5a7792a59e668e6028ee3e97123 Mon Sep 17 00:00:00 2001 From: Carlos Garcia Date: Sat, 16 May 2026 18:11:32 -0400 Subject: [PATCH] test: unit tests for expenses agent dedup, plan, act, and receipt parser - TestFindSemanticDuplicate: 18 cases covering Pass1 (amount match), Pass2 (OCR mismatch / high vendor similarity), time window, filenames, zero-amount exclusion, multi-candidate index correctness - test_plan_*: keyword detection for confirm/skip/keep-all, mode routing - test_act_*: confirmation gate, byte-dedup, no-employee escalation, confirmed creation with mocked Odoo tools - TestParseUpload: ZIP extraction, directory skipping, filename date parsing, SHA256 consistency, b64 round-trip - TestTextToHtml: escaping, newline to
, empty string Co-Authored-By: Claude Sonnet 4.6 --- tests/test_expenses_agent.py | 519 +++++++++++++++++++++++++++++++++++ 1 file changed, 519 insertions(+) create mode 100644 tests/test_expenses_agent.py diff --git a/tests/test_expenses_agent.py b/tests/test_expenses_agent.py new file mode 100644 index 0000000..671cc96 --- /dev/null +++ b/tests/test_expenses_agent.py @@ -0,0 +1,519 @@ +""" +Unit tests for ExpensesAgent logic. + +Covers: + - _find_semantic_duplicate (two-pass dedup algorithm) + - _plan() (keyword detection → user_confirmed, user_dup_decision) + - _act() confirmation gate (enters awaiting_confirmation before writing records) + - parse_upload (ZIP extraction, filename date parsing) + - _text_to_html (HTML escaping and newline conversion) +""" +from __future__ import annotations + +import asyncio +import base64 +import io +import zipfile +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# --------------------------------------------------------------------------- +# _find_semantic_duplicate +# --------------------------------------------------------------------------- + +from agent_service.agents.expenses_agent import ExpensesAgent + + +def _p(vendor='Acme', amount=10.00, date='2026-05-09', time=None): + """Shorthand for building a parsed-receipt dict.""" + return {'vendor': vendor, 'amount': amount, 'date': date, 'time': time, 'product_name': ''} + + +def _candidate(parsed): + """Wrap parsed dict as a (receipt, parsed) candidate tuple.""" + return ({}, parsed) + + +class TestFindSemanticDuplicate: + # ------------------------------------------------------------------ + # Pass 1: amount-based match + # ------------------------------------------------------------------ + + def test_exact_match(self): + candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-09'), candidates) + assert idx == 0 + + def test_amount_within_threshold(self): + candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.04, '2026-05-09'), candidates) + assert idx == 0 + + def test_amount_just_over_threshold(self): + candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.06, '2026-05-09'), candidates) + # Pass 1 should miss; Pass 2 should still catch it (same vendor + date) + assert idx == 0 # caught by Pass 2 + + def test_different_date_not_duplicate(self): + candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-10'), candidates) + assert idx is None + + def test_zero_amount_not_deduplicated(self): + """Zero-amount receipts are too ambiguous — never flagged as dups.""" + candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 0.0, '2026-05-09'), candidates) + assert idx is None + + def test_vendor_similarity_above_threshold(self): + candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('In-N-Out Houston', 8.55, '2026-05-09'), candidates) + assert idx == 0 + + def test_vendor_similarity_below_threshold_pass1(self): + """Completely different vendors with same amount+date → not a dup.""" + candidates = [_candidate(_p('McDonald\'s', 8.55, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('Starbucks', 8.55, '2026-05-09'), candidates) + assert idx is None + + def test_time_within_window_is_dup(self): + candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='14:00'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('Acme', 10.00, '2026-05-09', time='14:25'), candidates) + assert idx == 0 + + def test_time_outside_window_not_dup(self): + """Same vendor/amount/date but >30 min apart → different transactions.""" + candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('Acme', 10.00, '2026-05-09', time='14:00'), candidates) + assert idx is None + + def test_one_time_missing_does_not_exclude(self): + """If only one receipt has a time, the time check is skipped.""" + candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('Acme', 10.00, '2026-05-09', time=None), candidates) + assert idx == 0 + + def test_filename_vendor_same_amount_date_is_dup(self): + """Vendor looks like a filename → treated as dup if amount+date match.""" + candidates = [_candidate(_p('20260509_180857.jpg', 10.00, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('20260509_171757.jpg', 10.00, '2026-05-09'), candidates) + assert idx == 0 + + def test_no_candidates(self): + idx = ExpensesAgent._find_semantic_duplicate(_p(), []) + assert idx is None + + def test_returns_correct_index_multiple_candidates(self): + candidates = [ + _candidate(_p('Burger King', 5.00, '2026-05-09')), + _candidate(_p('Acme', 10.00, '2026-05-09')), + _candidate(_p('Starbucks', 4.50, '2026-05-08')), + ] + idx = ExpensesAgent._find_semantic_duplicate(_p('Acme Corp', 10.00, '2026-05-09'), candidates) + assert idx == 1 + + # ------------------------------------------------------------------ + # Pass 2: OCR amount mismatch (same vendor+date, different amount) + # ------------------------------------------------------------------ + + def test_pass2_catches_ocr_amount_mismatch(self): + """The In-N-Out $8.55 vs $15.00 bug: vendor ≥80%, same date, amounts far apart.""" + candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('In-N-Qut Houston', 15.00, '2026-05-09'), candidates) + assert idx == 0 + + def test_pass2_requires_high_vendor_similarity(self): + """Pass 2 threshold is 80% — a vague vendor name should not trigger it.""" + candidates = [_candidate(_p('Restaurant A', 8.55, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('Restaurant Z', 15.00, '2026-05-09'), candidates) + assert idx is None + + def test_pass2_same_date_required(self): + candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-08'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('In-N-Out Houston', 15.00, '2026-05-09'), candidates) + assert idx is None + + def test_pass2_respects_time_window(self): + """Even with high vendor similarity, >30 min apart means different visit.""" + candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09', time='12:00'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('In-N-Out Houston', 15.00, '2026-05-09', time='15:00'), candidates) + assert idx is None + + def test_pass2_skips_filename_vendors(self): + """Pass 2 does not apply when the vendor looks like a filename.""" + candidates = [_candidate(_p('20260509_180857.jpg', 8.55, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('20260509_171757.jpg', 15.00, '2026-05-09'), candidates) + # Filenames have different names so similarity will be low; + # Pass 2 explicitly skips filename vendors. + assert idx is None + + def test_pass2_zero_amount_not_deduplicated(self): + candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))] + idx = ExpensesAgent._find_semantic_duplicate( + _p('Acme Corp', 0.0, '2026-05-09'), candidates) + assert idx is None + + +# --------------------------------------------------------------------------- +# _plan() — keyword detection +# --------------------------------------------------------------------------- + +def _make_agent(): + """Return an ExpensesAgent with all dependencies mocked out.""" + agent = ExpensesAgent.__new__(ExpensesAgent) + agent._odoo = MagicMock() + agent._llm = MagicMock() + agent._peer_bus = None + agent._et = MagicMock() + agent._gathered_data = {} + agent._actions_taken = [] + agent._escalations_list = [] + return agent + + +def _make_directive(task='', raw_message='', receipts=None): + directive = MagicMock() + directive.task = task + directive.params = {} + directive.directive_id = 'test-dir' + directive.context.peer_data = {'raw_message': raw_message, 'requesting_user_id': 1} + directive.context.receipts = receipts or [] + return directive + + +async def _run_plan(task='', raw_message='', receipts=None): + agent = _make_agent() + agent._directive = _make_directive(task=task, raw_message=raw_message, receipts=receipts) + return await agent._plan() + + +@pytest.mark.asyncio +async def test_plan_confirm_keyword_sets_confirmed(): + plan = await _run_plan(raw_message='confirm') + assert plan['user_confirmed'] is True + + +@pytest.mark.asyncio +async def test_plan_looks_good_sets_confirmed(): + plan = await _run_plan(raw_message='looks good') + assert plan['user_confirmed'] is True + + +@pytest.mark.asyncio +async def test_plan_go_ahead_sets_confirmed(): + plan = await _run_plan(raw_message='go ahead') + assert plan['user_confirmed'] is True + + +@pytest.mark.asyncio +async def test_plan_no_keyword_not_confirmed(): + plan = await _run_plan(raw_message='create an expense report') + assert plan['user_confirmed'] is False + + +@pytest.mark.asyncio +async def test_plan_keep_all_sets_dup_decision(): + plan = await _run_plan(raw_message='confirm, keep all') + assert plan['user_confirmed'] is True + assert plan['user_dup_decision'] == 'keep_all' + + +@pytest.mark.asyncio +async def test_plan_skip_sets_dup_decision(): + plan = await _run_plan(raw_message='skip duplicates') + assert plan['user_dup_decision'] == 'skip' + + +@pytest.mark.asyncio +async def test_plan_default_dup_decision_is_skip(): + """When user says 'confirm' with no dup instruction, default to skip.""" + plan = await _run_plan(raw_message='confirm') + assert plan['user_dup_decision'] == 'skip' + + +@pytest.mark.asyncio +async def test_plan_mode_is_read_without_receipts(): + plan = await _run_plan(raw_message='show my expenses') + assert plan['mode'] == 'read' + + +@pytest.mark.asyncio +async def test_plan_mode_is_create_with_receipts(): + fake_receipt = {'filename': 'receipt.jpg', 'text': '', 'b64': '', 'sha256': 'abc'} + plan = await _run_plan(raw_message='create expense report', receipts=[fake_receipt]) + assert plan['mode'] == 'create_from_receipts' + + +@pytest.mark.asyncio +async def test_plan_task_field_also_checked(): + """master LLM writes intent_summary into task; confirm in task should work.""" + plan = await _run_plan(task='confirm the expense report creation', raw_message='') + assert plan['user_confirmed'] is True + + +# --------------------------------------------------------------------------- +# _act() confirmation gate +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_act_enters_awaiting_confirmation_on_first_pass(): + """First call with receipts and no confirm → mode becomes awaiting_confirmation.""" + agent = _make_agent() + + fake_receipt = { + 'filename': 'receipt.jpg', 'text': 'Acme $10.00', + 'b64': '', 'sha256': 'abc123', 'mimetype': 'image/jpeg', + 'date_from_name': None, + } + agent._directive = _make_directive(raw_message='create expense report', receipts=[fake_receipt]) + agent._gathered_data = { + 'mode': 'create_from_receipts', + 'user_confirmed': False, + 'user_dup_decision': 'skip', + } + + parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09', + 'time': None, 'product_name': ''} + + agent._et.get_employee_id_for_user = AsyncMock(return_value=1) + agent._et.get_expense_products = AsyncMock(return_value=[ + {'id': 1, 'name': 'Meals'} + ]) + + with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)): + result = await agent._act({}) + + assert result == [] + assert agent._gathered_data['mode'] == 'awaiting_confirmation' + assert len(agent._confirmation_items) == 1 + vendor, parsed, is_dup = agent._confirmation_items[0] + + +@pytest.mark.asyncio +async def test_act_creates_sheet_when_confirmed(): + """Second call with user_confirmed=True → expense sheet is created.""" + agent = _make_agent() + + fake_receipt = { + 'filename': 'receipt.jpg', 'text': 'Acme $10.00', + 'b64': base64.b64encode(b'imgdata').decode(), 'sha256': 'abc123', + 'mimetype': 'image/jpeg', 'date_from_name': None, + } + agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt]) + agent._gathered_data = { + 'mode': 'create_from_receipts', + 'user_confirmed': True, + 'user_dup_decision': 'skip', + } + + parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09', + 'time': None, 'product_name': 'Meals'} + + sheet_result = MagicMock(success=True, record_id=42) + expense_result = MagicMock(success=True, record_id=99) + + agent._et.get_employee_id_for_user = AsyncMock(return_value=1) + agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}]) + agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result) + agent._et.create_expense = AsyncMock(return_value=expense_result) + agent._et.attach_receipt = AsyncMock() + + with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)): + actions = await agent._act({}) + + assert any('Created expense sheet' in a for a in actions) + assert any('Acme' in a for a in actions) + agent._et.create_expense_sheet.assert_called_once() + agent._et.create_expense.assert_called_once() + + +@pytest.mark.asyncio +async def test_act_deduplicates_byte_identical_receipts(): + """Two receipts with the same SHA256 → only one expense created.""" + agent = _make_agent() + + receipt = { + 'filename': 'receipt.jpg', 'text': 'Acme $10.00', + 'b64': '', 'sha256': 'samehash', 'mimetype': 'image/jpeg', + 'date_from_name': None, + } + agent._directive = _make_directive(raw_message='confirm', receipts=[receipt, receipt]) + agent._gathered_data = { + 'mode': 'create_from_receipts', + 'user_confirmed': True, + 'user_dup_decision': 'skip', + } + + parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09', + 'time': None, 'product_name': ''} + + sheet_result = MagicMock(success=True, record_id=1) + expense_result = MagicMock(success=True, record_id=2) + + agent._et.get_employee_id_for_user = AsyncMock(return_value=1) + agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}]) + agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result) + agent._et.create_expense = AsyncMock(return_value=expense_result) + agent._et.attach_receipt = AsyncMock() + + with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)): + await agent._act({}) + + # Only one create_expense call despite two identical receipts + assert agent._et.create_expense.call_count == 1 + + +@pytest.mark.asyncio +async def test_act_no_employee_returns_empty_and_escalates(): + agent = _make_agent() + fake_receipt = {'filename': 'r.jpg', 'text': '', 'b64': '', 'sha256': 'x', + 'mimetype': 'image/jpeg', 'date_from_name': None} + agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt]) + agent._gathered_data = { + 'mode': 'create_from_receipts', + 'user_confirmed': True, + 'user_dup_decision': 'skip', + } + agent._et.get_employee_id_for_user = AsyncMock(return_value=None) + agent._et.get_expense_products = AsyncMock(return_value=[]) + + result = await agent._act({}) + assert result == [] + assert any('No employee record' in e for e in agent._escalations_list) + + +# --------------------------------------------------------------------------- +# parse_upload — receipt_parser.py +# --------------------------------------------------------------------------- + +from agent_service.tools.receipt_parser import parse_upload, _ext + + +class TestParseUpload: + def test_ext_helper(self): + assert _ext('receipt.JPG') == 'jpg' + assert _ext('file.tar.gz') == 'gz' + assert _ext('noextension') == '' + + def test_text_file_parsed(self): + results = parse_upload('receipt.txt', b'Acme Store\nTotal: $10.00') + assert len(results) == 1 + r = results[0] + assert r['filename'] == 'receipt.txt' + assert 'Acme Store' in r['text'] + assert r['mimetype'] == 'text/plain' + assert r['sha256'] # hash present + + def test_date_extracted_from_filename(self): + results = parse_upload('20260509_180857.jpg_compressed.JPEG', b'\xff\xd8\xff') + assert results[0]['date_from_name'] == '2026-05-09' + + def test_no_date_in_plain_filename(self): + results = parse_upload('receipt.txt', b'text') + assert results[0]['date_from_name'] is None + + def test_zip_extracted(self): + buf = io.BytesIO() + with zipfile.ZipFile(buf, 'w') as zf: + zf.writestr('receipt.txt', 'Vendor: Acme\nTotal: $5.00') + zf.writestr('other.txt', 'Another receipt') + results = parse_upload('bundle.zip', buf.getvalue()) + assert len(results) == 2 + filenames = {r['filename'] for r in results} + assert 'receipt.txt' in filenames + assert 'other.txt' in filenames + + def test_zip_skips_directories(self): + buf = io.BytesIO() + with zipfile.ZipFile(buf, 'w') as zf: + zf.writestr('subdir/', '') # directory entry + zf.writestr('subdir/file.txt', 'content') + results = parse_upload('bundle.zip', buf.getvalue()) + assert len(results) == 1 + assert results[0]['filename'] == 'file.txt' + + def test_empty_zip_returns_empty(self): + buf = io.BytesIO() + with zipfile.ZipFile(buf, 'w'): + pass + results = parse_upload('empty.zip', buf.getvalue()) + assert results == [] + + def test_sha256_is_consistent(self): + data = b'some receipt bytes' + r1 = parse_upload('a.txt', data)[0] + r2 = parse_upload('b.txt', data)[0] + assert r1['sha256'] == r2['sha256'] + + def test_b64_decodes_to_original(self): + data = b'receipt content here' + result = parse_upload('r.txt', data)[0] + assert base64.b64decode(result['b64']) == data + + +# --------------------------------------------------------------------------- +# _text_to_html — ab_ai_mail.py +# --------------------------------------------------------------------------- + +from agent_service.agents.expenses_agent import ExpensesAgent # already imported + +# Import from Odoo addon directly +import sys, importlib + +def _get_text_to_html(): + """Import _text_to_html without triggering Odoo module loading.""" + import importlib.util, pathlib + path = pathlib.Path(__file__).parent.parent / 'addons' / 'activeblue_ai' / 'models' / 'ab_ai_mail.py' + spec = importlib.util.spec_from_file_location('ab_ai_mail', path) + mod = importlib.util.module_from_spec(spec) + # Stub out the odoo imports so the module loads without Odoo installed + sys.modules.setdefault('odoo', MagicMock()) + sys.modules.setdefault('odoo.SUPERUSER_ID', MagicMock()) + sys.modules.setdefault('markupsafe', __import__('markupsafe')) + spec.loader.exec_module(mod) + return mod._text_to_html + + +class TestTextToHtml: + @pytest.fixture(autouse=True) + def fn(self): + try: + self._fn = _get_text_to_html() + except Exception: + pytest.skip('ab_ai_mail could not be imported without Odoo environment') + + def test_plain_text_unchanged(self): + result = str(self._fn('hello world')) + assert 'hello world' in result + + def test_newline_becomes_br(self): + result = str(self._fn('line one\nline two')) + assert '
' in result + assert 'line one' in result + assert 'line two' in result + + def test_html_special_chars_escaped(self): + result = str(self._fn('')) + assert '