diff --git a/tests/test_expenses_agent.py b/tests/test_expenses_agent.py
new file mode 100644
index 0000000..671cc96
--- /dev/null
+++ b/tests/test_expenses_agent.py
@@ -0,0 +1,519 @@
+"""
+Unit tests for ExpensesAgent logic.
+
+Covers:
+ - _find_semantic_duplicate (two-pass dedup algorithm)
+ - _plan() (keyword detection → user_confirmed, user_dup_decision)
+ - _act() confirmation gate (enters awaiting_confirmation before writing records)
+ - parse_upload (ZIP extraction, filename date parsing)
+ - _text_to_html (HTML escaping and newline conversion)
+"""
+from __future__ import annotations
+
+import asyncio
+import base64
+import io
+import zipfile
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+# ---------------------------------------------------------------------------
+# _find_semantic_duplicate
+# ---------------------------------------------------------------------------
+
+from agent_service.agents.expenses_agent import ExpensesAgent
+
+
+def _p(vendor='Acme', amount=10.00, date='2026-05-09', time=None):
+ """Shorthand for building a parsed-receipt dict."""
+ return {'vendor': vendor, 'amount': amount, 'date': date, 'time': time, 'product_name': ''}
+
+
+def _candidate(parsed):
+ """Wrap parsed dict as a (receipt, parsed) candidate tuple."""
+ return ({}, parsed)
+
+
+class TestFindSemanticDuplicate:
+ # ------------------------------------------------------------------
+ # Pass 1: amount-based match
+ # ------------------------------------------------------------------
+
+ def test_exact_match(self):
+ candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-09'), candidates)
+ assert idx == 0
+
+ def test_amount_within_threshold(self):
+ candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.04, '2026-05-09'), candidates)
+ assert idx == 0
+
+ def test_amount_just_over_threshold(self):
+ candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.06, '2026-05-09'), candidates)
+ # Pass 1 should miss; Pass 2 should still catch it (same vendor + date)
+ assert idx == 0 # caught by Pass 2
+
+ def test_different_date_not_duplicate(self):
+ candidates = [_candidate(_p('Acme', 10.00, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 10.00, '2026-05-10'), candidates)
+ assert idx is None
+
+ def test_zero_amount_not_deduplicated(self):
+ """Zero-amount receipts are too ambiguous — never flagged as dups."""
+ candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(_p('Acme', 0.0, '2026-05-09'), candidates)
+ assert idx is None
+
+ def test_vendor_similarity_above_threshold(self):
+ candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('In-N-Out Houston', 8.55, '2026-05-09'), candidates)
+ assert idx == 0
+
+ def test_vendor_similarity_below_threshold_pass1(self):
+ """Completely different vendors with same amount+date → not a dup."""
+ candidates = [_candidate(_p('McDonald\'s', 8.55, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('Starbucks', 8.55, '2026-05-09'), candidates)
+ assert idx is None
+
+ def test_time_within_window_is_dup(self):
+ candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='14:00'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('Acme', 10.00, '2026-05-09', time='14:25'), candidates)
+ assert idx == 0
+
+ def test_time_outside_window_not_dup(self):
+ """Same vendor/amount/date but >30 min apart → different transactions."""
+ candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('Acme', 10.00, '2026-05-09', time='14:00'), candidates)
+ assert idx is None
+
+ def test_one_time_missing_does_not_exclude(self):
+ """If only one receipt has a time, the time check is skipped."""
+ candidates = [_candidate(_p('Acme', 10.00, '2026-05-09', time='12:00'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('Acme', 10.00, '2026-05-09', time=None), candidates)
+ assert idx == 0
+
+ def test_filename_vendor_same_amount_date_is_dup(self):
+ """Vendor looks like a filename → treated as dup if amount+date match."""
+ candidates = [_candidate(_p('20260509_180857.jpg', 10.00, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('20260509_171757.jpg', 10.00, '2026-05-09'), candidates)
+ assert idx == 0
+
+ def test_no_candidates(self):
+ idx = ExpensesAgent._find_semantic_duplicate(_p(), [])
+ assert idx is None
+
+ def test_returns_correct_index_multiple_candidates(self):
+ candidates = [
+ _candidate(_p('Burger King', 5.00, '2026-05-09')),
+ _candidate(_p('Acme', 10.00, '2026-05-09')),
+ _candidate(_p('Starbucks', 4.50, '2026-05-08')),
+ ]
+ idx = ExpensesAgent._find_semantic_duplicate(_p('Acme Corp', 10.00, '2026-05-09'), candidates)
+ assert idx == 1
+
+ # ------------------------------------------------------------------
+ # Pass 2: OCR amount mismatch (same vendor+date, different amount)
+ # ------------------------------------------------------------------
+
+ def test_pass2_catches_ocr_amount_mismatch(self):
+ """The In-N-Out $8.55 vs $15.00 bug: vendor ≥80%, same date, amounts far apart."""
+ candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('In-N-Qut Houston', 15.00, '2026-05-09'), candidates)
+ assert idx == 0
+
+ def test_pass2_requires_high_vendor_similarity(self):
+ """Pass 2 threshold is 80% — a vague vendor name should not trigger it."""
+ candidates = [_candidate(_p('Restaurant A', 8.55, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('Restaurant Z', 15.00, '2026-05-09'), candidates)
+ assert idx is None
+
+ def test_pass2_same_date_required(self):
+ candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-08'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('In-N-Out Houston', 15.00, '2026-05-09'), candidates)
+ assert idx is None
+
+ def test_pass2_respects_time_window(self):
+ """Even with high vendor similarity, >30 min apart means different visit."""
+ candidates = [_candidate(_p('IN-N-OUT HOUSTON', 8.55, '2026-05-09', time='12:00'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('In-N-Out Houston', 15.00, '2026-05-09', time='15:00'), candidates)
+ assert idx is None
+
+ def test_pass2_skips_filename_vendors(self):
+ """Pass 2 does not apply when the vendor looks like a filename."""
+ candidates = [_candidate(_p('20260509_180857.jpg', 8.55, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('20260509_171757.jpg', 15.00, '2026-05-09'), candidates)
+ # Filenames have different names so similarity will be low;
+ # Pass 2 explicitly skips filename vendors.
+ assert idx is None
+
+ def test_pass2_zero_amount_not_deduplicated(self):
+ candidates = [_candidate(_p('Acme', 0.0, '2026-05-09'))]
+ idx = ExpensesAgent._find_semantic_duplicate(
+ _p('Acme Corp', 0.0, '2026-05-09'), candidates)
+ assert idx is None
+
+
+# ---------------------------------------------------------------------------
+# _plan() — keyword detection
+# ---------------------------------------------------------------------------
+
+def _make_agent():
+ """Return an ExpensesAgent with all dependencies mocked out."""
+ agent = ExpensesAgent.__new__(ExpensesAgent)
+ agent._odoo = MagicMock()
+ agent._llm = MagicMock()
+ agent._peer_bus = None
+ agent._et = MagicMock()
+ agent._gathered_data = {}
+ agent._actions_taken = []
+ agent._escalations_list = []
+ return agent
+
+
+def _make_directive(task='', raw_message='', receipts=None):
+ directive = MagicMock()
+ directive.task = task
+ directive.params = {}
+ directive.directive_id = 'test-dir'
+ directive.context.peer_data = {'raw_message': raw_message, 'requesting_user_id': 1}
+ directive.context.receipts = receipts or []
+ return directive
+
+
+async def _run_plan(task='', raw_message='', receipts=None):
+ agent = _make_agent()
+ agent._directive = _make_directive(task=task, raw_message=raw_message, receipts=receipts)
+ return await agent._plan()
+
+
+@pytest.mark.asyncio
+async def test_plan_confirm_keyword_sets_confirmed():
+ plan = await _run_plan(raw_message='confirm')
+ assert plan['user_confirmed'] is True
+
+
+@pytest.mark.asyncio
+async def test_plan_looks_good_sets_confirmed():
+ plan = await _run_plan(raw_message='looks good')
+ assert plan['user_confirmed'] is True
+
+
+@pytest.mark.asyncio
+async def test_plan_go_ahead_sets_confirmed():
+ plan = await _run_plan(raw_message='go ahead')
+ assert plan['user_confirmed'] is True
+
+
+@pytest.mark.asyncio
+async def test_plan_no_keyword_not_confirmed():
+ plan = await _run_plan(raw_message='create an expense report')
+ assert plan['user_confirmed'] is False
+
+
+@pytest.mark.asyncio
+async def test_plan_keep_all_sets_dup_decision():
+ plan = await _run_plan(raw_message='confirm, keep all')
+ assert plan['user_confirmed'] is True
+ assert plan['user_dup_decision'] == 'keep_all'
+
+
+@pytest.mark.asyncio
+async def test_plan_skip_sets_dup_decision():
+ plan = await _run_plan(raw_message='skip duplicates')
+ assert plan['user_dup_decision'] == 'skip'
+
+
+@pytest.mark.asyncio
+async def test_plan_default_dup_decision_is_skip():
+ """When user says 'confirm' with no dup instruction, default to skip."""
+ plan = await _run_plan(raw_message='confirm')
+ assert plan['user_dup_decision'] == 'skip'
+
+
+@pytest.mark.asyncio
+async def test_plan_mode_is_read_without_receipts():
+ plan = await _run_plan(raw_message='show my expenses')
+ assert plan['mode'] == 'read'
+
+
+@pytest.mark.asyncio
+async def test_plan_mode_is_create_with_receipts():
+ fake_receipt = {'filename': 'receipt.jpg', 'text': '', 'b64': '', 'sha256': 'abc'}
+ plan = await _run_plan(raw_message='create expense report', receipts=[fake_receipt])
+ assert plan['mode'] == 'create_from_receipts'
+
+
+@pytest.mark.asyncio
+async def test_plan_task_field_also_checked():
+ """master LLM writes intent_summary into task; confirm in task should work."""
+ plan = await _run_plan(task='confirm the expense report creation', raw_message='')
+ assert plan['user_confirmed'] is True
+
+
+# ---------------------------------------------------------------------------
+# _act() confirmation gate
+# ---------------------------------------------------------------------------
+
+@pytest.mark.asyncio
+async def test_act_enters_awaiting_confirmation_on_first_pass():
+ """First call with receipts and no confirm → mode becomes awaiting_confirmation."""
+ agent = _make_agent()
+
+ fake_receipt = {
+ 'filename': 'receipt.jpg', 'text': 'Acme $10.00',
+ 'b64': '', 'sha256': 'abc123', 'mimetype': 'image/jpeg',
+ 'date_from_name': None,
+ }
+ agent._directive = _make_directive(raw_message='create expense report', receipts=[fake_receipt])
+ agent._gathered_data = {
+ 'mode': 'create_from_receipts',
+ 'user_confirmed': False,
+ 'user_dup_decision': 'skip',
+ }
+
+ parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09',
+ 'time': None, 'product_name': ''}
+
+ agent._et.get_employee_id_for_user = AsyncMock(return_value=1)
+ agent._et.get_expense_products = AsyncMock(return_value=[
+ {'id': 1, 'name': 'Meals'}
+ ])
+
+ with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)):
+ result = await agent._act({})
+
+ assert result == []
+ assert agent._gathered_data['mode'] == 'awaiting_confirmation'
+ assert len(agent._confirmation_items) == 1
+ vendor, parsed, is_dup = agent._confirmation_items[0]
+
+
+@pytest.mark.asyncio
+async def test_act_creates_sheet_when_confirmed():
+ """Second call with user_confirmed=True → expense sheet is created."""
+ agent = _make_agent()
+
+ fake_receipt = {
+ 'filename': 'receipt.jpg', 'text': 'Acme $10.00',
+ 'b64': base64.b64encode(b'imgdata').decode(), 'sha256': 'abc123',
+ 'mimetype': 'image/jpeg', 'date_from_name': None,
+ }
+ agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt])
+ agent._gathered_data = {
+ 'mode': 'create_from_receipts',
+ 'user_confirmed': True,
+ 'user_dup_decision': 'skip',
+ }
+
+ parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09',
+ 'time': None, 'product_name': 'Meals'}
+
+ sheet_result = MagicMock(success=True, record_id=42)
+ expense_result = MagicMock(success=True, record_id=99)
+
+ agent._et.get_employee_id_for_user = AsyncMock(return_value=1)
+ agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}])
+ agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result)
+ agent._et.create_expense = AsyncMock(return_value=expense_result)
+ agent._et.attach_receipt = AsyncMock()
+
+ with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)):
+ actions = await agent._act({})
+
+ assert any('Created expense sheet' in a for a in actions)
+ assert any('Acme' in a for a in actions)
+ agent._et.create_expense_sheet.assert_called_once()
+ agent._et.create_expense.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_act_deduplicates_byte_identical_receipts():
+ """Two receipts with the same SHA256 → only one expense created."""
+ agent = _make_agent()
+
+ receipt = {
+ 'filename': 'receipt.jpg', 'text': 'Acme $10.00',
+ 'b64': '', 'sha256': 'samehash', 'mimetype': 'image/jpeg',
+ 'date_from_name': None,
+ }
+ agent._directive = _make_directive(raw_message='confirm', receipts=[receipt, receipt])
+ agent._gathered_data = {
+ 'mode': 'create_from_receipts',
+ 'user_confirmed': True,
+ 'user_dup_decision': 'skip',
+ }
+
+ parsed_result = {'vendor': 'Acme', 'amount': 10.00, 'date': '2026-05-09',
+ 'time': None, 'product_name': ''}
+
+ sheet_result = MagicMock(success=True, record_id=1)
+ expense_result = MagicMock(success=True, record_id=2)
+
+ agent._et.get_employee_id_for_user = AsyncMock(return_value=1)
+ agent._et.get_expense_products = AsyncMock(return_value=[{'id': 1, 'name': 'Meals'}])
+ agent._et.create_expense_sheet = AsyncMock(return_value=sheet_result)
+ agent._et.create_expense = AsyncMock(return_value=expense_result)
+ agent._et.attach_receipt = AsyncMock()
+
+ with patch.object(agent, '_parse_receipt_text', new=AsyncMock(return_value=parsed_result)):
+ await agent._act({})
+
+ # Only one create_expense call despite two identical receipts
+ assert agent._et.create_expense.call_count == 1
+
+
+@pytest.mark.asyncio
+async def test_act_no_employee_returns_empty_and_escalates():
+ agent = _make_agent()
+ fake_receipt = {'filename': 'r.jpg', 'text': '', 'b64': '', 'sha256': 'x',
+ 'mimetype': 'image/jpeg', 'date_from_name': None}
+ agent._directive = _make_directive(raw_message='confirm', receipts=[fake_receipt])
+ agent._gathered_data = {
+ 'mode': 'create_from_receipts',
+ 'user_confirmed': True,
+ 'user_dup_decision': 'skip',
+ }
+ agent._et.get_employee_id_for_user = AsyncMock(return_value=None)
+ agent._et.get_expense_products = AsyncMock(return_value=[])
+
+ result = await agent._act({})
+ assert result == []
+ assert any('No employee record' in e for e in agent._escalations_list)
+
+
+# ---------------------------------------------------------------------------
+# parse_upload — receipt_parser.py
+# ---------------------------------------------------------------------------
+
+from agent_service.tools.receipt_parser import parse_upload, _ext
+
+
+class TestParseUpload:
+ def test_ext_helper(self):
+ assert _ext('receipt.JPG') == 'jpg'
+ assert _ext('file.tar.gz') == 'gz'
+ assert _ext('noextension') == ''
+
+ def test_text_file_parsed(self):
+ results = parse_upload('receipt.txt', b'Acme Store\nTotal: $10.00')
+ assert len(results) == 1
+ r = results[0]
+ assert r['filename'] == 'receipt.txt'
+ assert 'Acme Store' in r['text']
+ assert r['mimetype'] == 'text/plain'
+ assert r['sha256'] # hash present
+
+ def test_date_extracted_from_filename(self):
+ results = parse_upload('20260509_180857.jpg_compressed.JPEG', b'\xff\xd8\xff')
+ assert results[0]['date_from_name'] == '2026-05-09'
+
+ def test_no_date_in_plain_filename(self):
+ results = parse_upload('receipt.txt', b'text')
+ assert results[0]['date_from_name'] is None
+
+ def test_zip_extracted(self):
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, 'w') as zf:
+ zf.writestr('receipt.txt', 'Vendor: Acme\nTotal: $5.00')
+ zf.writestr('other.txt', 'Another receipt')
+ results = parse_upload('bundle.zip', buf.getvalue())
+ assert len(results) == 2
+ filenames = {r['filename'] for r in results}
+ assert 'receipt.txt' in filenames
+ assert 'other.txt' in filenames
+
+ def test_zip_skips_directories(self):
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, 'w') as zf:
+ zf.writestr('subdir/', '') # directory entry
+ zf.writestr('subdir/file.txt', 'content')
+ results = parse_upload('bundle.zip', buf.getvalue())
+ assert len(results) == 1
+ assert results[0]['filename'] == 'file.txt'
+
+ def test_empty_zip_returns_empty(self):
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, 'w'):
+ pass
+ results = parse_upload('empty.zip', buf.getvalue())
+ assert results == []
+
+ def test_sha256_is_consistent(self):
+ data = b'some receipt bytes'
+ r1 = parse_upload('a.txt', data)[0]
+ r2 = parse_upload('b.txt', data)[0]
+ assert r1['sha256'] == r2['sha256']
+
+ def test_b64_decodes_to_original(self):
+ data = b'receipt content here'
+ result = parse_upload('r.txt', data)[0]
+ assert base64.b64decode(result['b64']) == data
+
+
+# ---------------------------------------------------------------------------
+# _text_to_html — ab_ai_mail.py
+# ---------------------------------------------------------------------------
+
+from agent_service.agents.expenses_agent import ExpensesAgent # already imported
+
+# Import from Odoo addon directly
+import sys, importlib
+
+def _get_text_to_html():
+ """Import _text_to_html without triggering Odoo module loading."""
+ import importlib.util, pathlib
+ path = pathlib.Path(__file__).parent.parent / 'addons' / 'activeblue_ai' / 'models' / 'ab_ai_mail.py'
+ spec = importlib.util.spec_from_file_location('ab_ai_mail', path)
+ mod = importlib.util.module_from_spec(spec)
+ # Stub out the odoo imports so the module loads without Odoo installed
+ sys.modules.setdefault('odoo', MagicMock())
+ sys.modules.setdefault('odoo.SUPERUSER_ID', MagicMock())
+ sys.modules.setdefault('markupsafe', __import__('markupsafe'))
+ spec.loader.exec_module(mod)
+ return mod._text_to_html
+
+
+class TestTextToHtml:
+ @pytest.fixture(autouse=True)
+ def fn(self):
+ try:
+ self._fn = _get_text_to_html()
+ except Exception:
+ pytest.skip('ab_ai_mail could not be imported without Odoo environment')
+
+ def test_plain_text_unchanged(self):
+ result = str(self._fn('hello world'))
+ assert 'hello world' in result
+
+ def test_newline_becomes_br(self):
+ result = str(self._fn('line one\nline two'))
+ assert '
' in result
+ assert 'line one' in result
+ assert 'line two' in result
+
+ def test_html_special_chars_escaped(self):
+ result = str(self._fn(''))
+ assert '