from __future__ import annotations import asyncio import difflib import json import logging import re from datetime import date as _date from .base_agent import BaseAgent, AgentReport, AgentDirective, SweepReport from ..tools.expenses_tools import ExpensesTools # --------------------------------------------------------------------------- # Receipt OCR helpers — regex-based, deterministic extraction # --------------------------------------------------------------------------- # Matches an explicitly labeled total line. # Handles "Total: $22.46", "GRAND TOTAL 22.46", "Amount Due: 22.46", # "Total Sale $58.75" (gas stations), "Net Sale $X", etc. # # The negative lookahead (?!\s*tax) prevents "Total Tax" / "Total Taxes" # (a sub-total line present on restaurant receipts) from being confused # with the final total when Tesseract splits a two-column label+amount # layout across lines. _TOTAL_RE = re.compile( r'(?:grand\s*total|total\s*due|amount\s*due|balance\s*due|' r'total\s*amount|total\s*charged|total\s*sale|net\s*sale|' r'sale\s*total|you\s*paid|amount\s*paid|total)' r'(?!\s*tax)' # exclude "Total Tax / Total Taxes" r'\s*[:\-]?\s*\$?\s*([\d,]+\.\d{2})', re.IGNORECASE, ) # Lines that should never be treated as the total — change given back, # tip added after the fact, etc. Card-brand lines like "VISA USD$ 36.78" # are intentionally NOT listed here: the amount on those lines IS the charge. _SKIP_LINE_RE = re.compile( r'\b(?:change|cash\s*(?:paid|tendered)?|tip|gratuity)\b', re.IGNORECASE, ) # Any standalone dollar-like amount (optional $, up to 6 digits, 2 decimals) _ANY_DOLLAR_RE = re.compile(r'(? bool: """Return True when the OCR text has too many amount-bearing lines to be a receipt. Single receipts: typically 1-9 lines with dollar values. Bank/card statements: 10-50+ lines (one per transaction). """ count = sum(1 for line in text.splitlines() if _ANY_DOLLAR_RE.search(line)) return count >= _STMT_AMOUNT_LINE_THRESHOLD _DATE_ISO_RE = re.compile(r'\b(\d{4})[-/](\d{2})[-/](\d{2})\b') # YYYY-MM-DD or YYYY/MM/DD _DATE_US_RE = re.compile(r'\b(\d{1,2})[/\-](\d{1,2})[/\-](\d{4})\b') # M/D/YYYY _DATE_US_SHORT_RE = re.compile(r'\b(\d{1,2})[/\-](\d{1,2})[/\-](\d{2})\b') # M/D/YY # "05 MAY 2026" or "MAY 05 2026" or "05 May, 2026" (airline / hotel receipts) _DATE_MON_RE = re.compile( r'\b(\d{1,2})\s+([A-Za-z]{3,9})[,\s]+(\d{4})\b' # DD MON YYYY r'|\b([A-Za-z]{3,9})\s+(\d{1,2})[,\s]+(\d{4})\b', # MON DD YYYY ) _MONTH_MAP: dict[str, int] = { 'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12, 'january': 1, 'february': 2, 'march': 3, 'april': 4, 'june': 6, 'july': 7, 'august': 8, 'september': 9, 'october': 10, 'november': 11, 'december': 12, } def _extract_amount_from_text(text: str) -> float: """Return the final total from OCR receipt text, or 0.0 if not found. Pass 1 — labeled total: 'Total:', 'Grand Total:', 'Amount Due:', etc. Pass 2 — full-text maximum: scan every line for a dollar amount (skipping change/tip lines) and return the largest value found. This handles: • display-style receipts that show the charge at the top with no label (e.g. LAYAL CAFE — "$40.10" printed before the item list) • card-terminal printouts with lines like "VISA USD$ 36.78" that carry no 'Total' keyword The maximum heuristic works because the receipt total is always ≥ any individual item price; Pass 1 (labeled total) catches the rare cases where a discount makes the total less than a line item. """ if not text: return 0.0 # Pass 1: explicit label match — return the LARGEST labeled amount. # Using max() rather than the last positional match handles the common # OCR artefact where "Total\n$2.80" (garbled "Total Taxes") appears # before "Total\n$42.90" in the text; the actual total wins on value. matches = list(_TOTAL_RE.finditer(text)) if matches: best_labeled = 0.0 for m in matches: try: val = float(m.group(1).replace(',', '')) if val > best_labeled: best_labeled = val except ValueError: pass if best_labeled > 0: return best_labeled # Pass 2: maximum dollar amount across the full text best = 0.0 for line in text.splitlines(): if _SKIP_LINE_RE.search(line): continue m = _ANY_DOLLAR_RE.search(line) if m: try: val = float(m.group(1).replace(',', '')) if val > best: best = val except ValueError: pass if best > 0: return best return 0.0 def _extract_date_from_text(text: str) -> str | None: """Return the first plausible date in OCR text as YYYY-MM-DD, or None.""" if not text: return None m = _DATE_ISO_RE.search(text) if m: y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3)) if 2000 <= y <= 2099 and 1 <= mo <= 12 and 1 <= d <= 31: return f'{y}-{mo:02d}-{d:02d}' m = _DATE_US_RE.search(text) if m: mo, d, y = int(m.group(1)), int(m.group(2)), int(m.group(3)) if 1 <= mo <= 12 and 1 <= d <= 31 and y >= 2000: return f'{y}-{mo:02d}-{d:02d}' m = _DATE_US_SHORT_RE.search(text) if m: mo, d, yr = int(m.group(1)), int(m.group(2)), int(m.group(3)) if 1 <= mo <= 12 and 1 <= d <= 31: y = 2000 + yr if yr < 50 else 1900 + yr return f'{y}-{mo:02d}-{d:02d}' # Month-name formats: "05 MAY 2026", "MAY 05 2026", "05 May, 2026" # Common on airline, hotel, and formal business receipts. m = _DATE_MON_RE.search(text) if m: if m.group(1): # DD MON YYYY branch d_s, mon_s, y_s = m.group(1), m.group(2), m.group(3) else: # MON DD YYYY branch mon_s, d_s, y_s = m.group(4), m.group(5), m.group(6) mo = _MONTH_MAP.get(mon_s.lower()[:3]) if mo: d_i, y_i = int(d_s), int(y_s) if 1 <= d_i <= 31 and 2000 <= y_i <= 2099: return f'{y_i}-{mo:02d}-{d_i:02d}' return None logger = logging.getLogger(__name__) EXPENSES_TOOLS = [ {'name': 'get_expenses', 'description': 'Retrieve expense records', 'parameters': {'employee_id': {'type': 'integer', 'optional': True}, 'state': {'type': 'string', 'optional': True}, 'date_from': {'type': 'string', 'optional': True}, 'date_to': {'type': 'string', 'optional': True}, 'limit': {'type': 'integer', 'optional': True}}}, {'name': 'get_expense_sheets', 'description': 'Get expense report sheets', 'parameters': {'state': {'type': 'string', 'optional': True}, 'employee_id': {'type': 'integer', 'optional': True}, 'limit': {'type': 'integer', 'optional': True}}}, {'name': 'get_pending_approvals', 'description': 'Get expense sheets pending approval', 'parameters': {}}, {'name': 'approve_expense_sheet', 'description': 'Approve an expense sheet', 'parameters': {'sheet_id': {'type': 'integer'}}}, {'name': 'get_expenses_summary', 'description': 'Get expense summary for a period', 'parameters': {'date_from': {'type': 'string', 'optional': True}, 'date_to': {'type': 'string', 'optional': True}}}, {'name': 'get_expense_by_employee', 'description': 'Get expenses for a specific employee', 'parameters': {'employee_id': {'type': 'integer'}, 'limit': {'type': 'integer', 'optional': True}}}, {'name': 'flag_for_review', 'description': 'Flag an expense for review', 'parameters': {'model': {'type': 'string'}, 'record_id': {'type': 'integer'}, 'reason': {'type': 'string'}, 'severity': {'type': 'string', 'optional': True}}}, {'name': 'post_chatter_note', 'description': 'Post a note on a record', 'parameters': {'model': {'type': 'string'}, 'record_id': {'type': 'integer'}, 'note': {'type': 'string'}}}, ] class ExpensesAgent(BaseAgent): name = 'expenses_agent' domain = 'expenses' required_odoo_module = 'hr_expense' system_prompt_file = 'expenses_system.txt' tools = EXPENSES_TOOLS def __init__(self, odoo, llm, peer_bus=None): super().__init__(odoo, llm, peer_bus) self._et = ExpensesTools(odoo) self._gathered_data: dict = {} self._actions_taken: list = [] self._escalations_list: list = [] async def _plan(self) -> dict: task = (self._directive.task if self._directive else '').lower() receipts = getattr(self._directive.context, 'receipts', []) if self._directive else [] # The master LLM rewrites the user message into intent_summary (task). # Also check the original raw_message threaded through peer_data so # short replies like "skip duplicates" are detected even when rewritten. raw_msg = '' if self._directive and self._directive.context: raw_msg = (self._directive.context.peer_data.get('raw_message') or '').lower() combined = task + ' ' + raw_msg # Detect whether the user is responding to a duplicate-approval request skip_keywords = ('skip', 'remove duplicate', 'exclude duplicate', 'drop duplicate') keep_keywords = ('keep all', 'keep both', 'include all', 'no skip', "don't skip") confirm_keywords = ('confirm', 'looks good', 'go ahead', 'proceed', 'create it', 'create them') if any(k in combined for k in keep_keywords): user_dup_decision = 'keep_all' elif any(k in combined for k in skip_keywords): user_dup_decision = 'skip' else: user_dup_decision = 'skip' # default: skip duplicates when confirmed user_confirmed = any(k in combined for k in confirm_keywords) return { 'mode': 'create_from_receipts' if receipts else 'read', 'user_dup_decision': user_dup_decision, 'user_confirmed': user_confirmed, 'fetch_summary': any(k in task for k in ('summary', 'overview')) and not receipts, 'fetch_pending': any(k in task for k in ('pending', 'approve', 'approval')) and not receipts, 'employee_id': self._directive.params.get('employee_id') if self._directive else None, 'date_from': self._directive.params.get('date_from') if self._directive else None, 'date_to': self._directive.params.get('date_to') if self._directive else None, } async def _gather(self, plan: dict) -> dict: data: dict = {'mode': plan.get('mode', 'read'), 'user_dup_decision': plan.get('user_dup_decision', 'skip'), 'user_confirmed': plan.get('user_confirmed', False)} if plan.get('mode') == 'create_from_receipts': self._gathered_data = data return data data['summary'] = await self._et.get_expenses_summary( date_from=plan.get('date_from'), date_to=plan.get('date_to'), ) if plan.get('fetch_pending'): data['pending'] = await self._et.get_pending_approvals() self._gathered_data = data return data async def _reason(self) -> dict: data = self._gathered_data analysis: dict = {'escalations': [], 'flags': []} if data.get('mode') == 'create_from_receipts': self._escalations_list = [] return analysis summary = data.get('summary', {}) if summary.get('pending_approval_count', 0) > 10: analysis['escalations'].append( f'{summary["pending_approval_count"]} expense sheets pending approval.' ) self._escalations_list = analysis['escalations'] return analysis async def _act(self, reasoning: dict) -> list: if self._gathered_data.get('mode') != 'create_from_receipts': return [] receipts = getattr(self._directive.context, 'receipts', []) if self._directive else [] if not receipts: return [] user_dup_decision = self._gathered_data.get('user_dup_decision', 'skip') user_confirmed = self._gathered_data.get('user_confirmed', False) user_id = (self._directive.context.peer_data.get('requesting_user_id') if self._directive else None) employee_id = await self._et.get_employee_id_for_user(user_id) if not employee_id: self._escalations_list.append( 'No employee record found for the current user; cannot create expense report.') return [] expense_products = await self._et.get_expense_products() # Prefer "Meals" as the fallback category — most receipts are food. # Avoid blindly defaulting to whatever Odoo returns first (often "Communication"). _meals = next((p for p in expense_products if p['name'].lower() == 'meals'), None) default_product_id = ( _meals['id'] if _meals else (expense_products[0]['id'] if expense_products else None) ) product_map = {p['id']: p['name'] for p in expense_products} logger.info('expenses_agent: %d receipts received, %d expense products available', len(receipts), len(expense_products)) # Pass 1: byte-exact dedup seen_hashes: set = set() unique_receipts = [] for r in receipts: h = r.get('sha256') if h and h in seen_hashes: logger.info('expenses_agent: skipping byte-identical receipt %s', r.get('filename')) continue if h: seen_hashes.add(h) unique_receipts.append(r) # Log OCR quality for each receipt so we can diagnose extraction failures for r in unique_receipts: raw_text = r.get('text', '') or '' ocr_len = len(raw_text) ocr_preview = raw_text[:120].replace('\n', '↵') logger.info('ocr filename=%r date_hint=%r ocr_len=%d text_preview=%r', r.get('filename'), r.get('date_from_name'), ocr_len, ocr_preview) # Parse all receipts concurrently parse_tasks = [ self._parse_receipt_text( r.get('text', ''), r.get('filename', 'receipt'), expense_products=expense_products, date_hint=r.get('date_from_name'), ) for r in unique_receipts ] raw_parsed = await asyncio.gather(*parse_tasks, return_exceptions=True) paired: list[tuple[dict, dict]] = [] for receipt, parsed in zip(unique_receipts, raw_parsed): if isinstance(parsed, Exception): logger.warning('expenses_agent: parse failed for %s: %s', receipt.get('filename'), parsed) parsed = {'vendor': receipt.get('filename', 'Expense'), 'amount': 0.0, 'date': receipt.get('date_from_name') or _date.today().isoformat(), 'time': None, 'product_name': ''} if parsed.get('skip'): logger.info('expenses_agent: skipping bank/card statement: %s', receipt.get('filename')) self._escalations_list.append( f"Skipped \"{receipt.get('filename')}\": " 'looks like a bank or card statement, not a single receipt.' ) continue logger.info('parsed filename=%r → vendor=%r amount=%s date=%r product=%r', receipt.get('filename'), parsed.get('vendor'), parsed.get('amount'), parsed.get('date'), parsed.get('product_name')) paired.append((receipt, parsed)) # Pass 2: semantic dedup deduped: list[tuple[dict, dict]] = [] dup_indices: set[int] = set() # indices into `paired` that are duplicates for i, (receipt, parsed) in enumerate(paired): dup_idx = self._find_semantic_duplicate(parsed, deduped) if dup_idx is not None: dup_indices.add(i) if len(receipt.get('text', '')) > len(deduped[dup_idx][0].get('text', '')): deduped[dup_idx] = (receipt, parsed) else: deduped.append((receipt, parsed)) # Auto-skip semantic duplicates by default; keep_all only if user explicitly asked. # Receipts are only available in this single /upload request — there is no # persistent receipt store across turns, so a "confirm then create" flow would # always fail on the follow-up turn (no receipts in context). Creating # immediately in draft state is the correct approach: users review and # submit inside Odoo > Expenses. n_skipped = len(paired) - len(deduped) self._gathered_data['n_skipped'] = n_skipped final_list = paired if user_dup_decision == 'keep_all' else deduped sheet_name = f'Expense Report - {_date.today().isoformat()}' sheet_result = await self._et.create_expense_sheet(sheet_name, employee_id) if not sheet_result.success: self._escalations_list.append(f'Failed to create expense sheet: {sheet_result.error}') return [] sheet_id = sheet_result.record_id actions = [f'Created expense sheet "{sheet_name}" (ID {sheet_id})'] for receipt, parsed in final_list: product_id = default_product_id chosen_name = parsed.get('product_name', '') if chosen_name: for p in expense_products: if p['name'].lower() == chosen_name.lower(): product_id = p['id'] break expense_result = await self._et.create_expense( sheet_id=sheet_id, employee_id=employee_id, name=str(parsed.get('vendor', receipt.get('filename', 'Expense')))[:64], total_amount=float(parsed.get('amount', 0.0)), date=str(parsed.get('date') or _date.today().isoformat()), product_id=product_id, ) if expense_result.success: cat = product_map.get(product_id, 'Expense') actions.append( f"Added: {parsed.get('vendor', 'Unknown vendor')} " f"${float(parsed.get('amount', 0)):.2f} " f"({cat}) on {parsed.get('date', 'today')}" ) if receipt.get('b64'): await self._et.attach_receipt( 'hr.expense', expense_result.record_id, receipt.get('filename', 'receipt'), receipt['b64'], receipt.get('mimetype', 'application/octet-stream'), ) else: actions.append( f"Could not create expense for {receipt.get('filename', 'receipt')}: " f"{expense_result.error}" ) self._actions_taken = actions return actions @staticmethod def _find_semantic_duplicate(parsed: dict, candidates: list) -> int | None: """ Return the index in `candidates` of a receipt that appears to be the same physical receipt as `parsed`, or None if no match found. Pass 1 — exact-amount match (all must pass): 1. Same date 2. Amount > 0 and within $0.05 of each other 3. Transaction times within 30 min (if both present) 4. Vendor similarity >= 60 % (or both vendors are raw filenames) Pass 2 — OCR-error match (amount may differ due to misread): 1. Same date 2. Both amounts > 0 3. Vendor similarity >= 80 % (stricter threshold compensates for loose amount) 4. Times within 30 min (if both present) """ amt = float(parsed.get('amount', 0)) date = parsed.get('date', '') time = parsed.get('time') # HH:MM or None vendor = str(parsed.get('vendor', '')).lower().strip() is_filename = vendor.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')) def _times_compatible(t1, t2) -> bool: """Return False only when both times are present and >30 min apart.""" if not (t1 and t2): return True try: h1, m1 = (int(p) for p in t1.split(':')[:2]) h2, m2 = (int(p) for p in t2.split(':')[:2]) return abs((h1 * 60 + m1) - (h2 * 60 + m2)) <= 30 except Exception: return True # Pass 1: amount must match within $0.05 for idx, (_, other) in enumerate(candidates): other_amt = float(other.get('amount', 0)) if amt == 0 or other_amt == 0: continue if abs(amt - other_amt) > 0.05: continue if date != other.get('date', ''): continue if not _times_compatible(time, other.get('time')): continue other_vendor = str(other.get('vendor', '')).lower().strip() other_is_filename = other_vendor.endswith( ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')) if is_filename or other_is_filename: return idx if difflib.SequenceMatcher(None, vendor, other_vendor).ratio() >= 0.6: return idx # Pass 2: same vendor + same date even when amounts differ (OCR misread) if not is_filename: for idx, (_, other) in enumerate(candidates): other_amt = float(other.get('amount', 0)) if amt == 0 or other_amt == 0: continue if date != other.get('date', ''): continue if not _times_compatible(time, other.get('time')): continue other_vendor = str(other.get('vendor', '')).lower().strip() if other_vendor.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')): continue if difflib.SequenceMatcher(None, vendor, other_vendor).ratio() >= 0.80: return idx return None async def _parse_receipt_text(self, text: str, filename: str, expense_products: list = None, date_hint: str = None) -> dict: """Parse a single receipt into structured fields. Strategy (most-reliable first): amount → regex on OCR text (deterministic) date → filename timestamp > OCR regex > today vendor → LLM (short excerpt, first ~600 chars) product_name→ LLM (semantic match against expense product list) The LLM is intentionally NOT asked for amount or date — the local model hallucinates those fields when OCR text is ambiguous. """ today = _date.today().isoformat() stripped = (text or '').strip() ocr_failed = not stripped or stripped.startswith('[') # ── Bank / card statement detection ────────────────────────────────── # A statement screenshot has many amount-bearing lines; running the # max-scan on it returns a random large transaction, not a total. # Skip these files so they don't produce a wildly wrong expense. if not ocr_failed and _is_likely_bank_statement(stripped): n = sum(1 for l in stripped.splitlines() if _ANY_DOLLAR_RE.search(l)) logger.warning( 'receipt %s: looks like a bank/card statement (%d amount lines) — skip', filename, n, ) return {'vendor': filename, 'amount': 0.0, 'date': date_hint or today, 'time': None, 'product_name': '', 'skip': True} # ── Amount: regex (deterministic) ──────────────────────────────────── amount = _extract_amount_from_text(stripped) if not ocr_failed else 0.0 # ── Date: filename > OCR regex > today ─────────────────────────────── if date_hint: date = date_hint elif not ocr_failed: date = _extract_date_from_text(stripped) or today else: date = today # ── Vendor + Category: LLM (two fields only) ───────────────────────── vendor = filename product_name = '' product_list = ', '.join(f'"{p["name"]}"' for p in (expense_products or [])) if not ocr_failed: # Give LLM only the header of the receipt — vendor is in the first lines excerpt = stripped[:600] prompt = ( 'Return ONLY valid JSON with exactly two keys:\n' '"vendor": the merchant or store name from the receipt header. ' 'OCR often garbles text — use your knowledge to correct obvious ' 'errors (e.g. "NeDonald\'s" → "McDonald\'s", "TN-N-QUT" → ' '"IN-N-OUT Burger", "Subwey" → "Subway", "LRYAL" → "LAYAL"). ' 'If this looks like a bank or credit-card statement listing ' 'multiple transactions rather than a single merchant receipt, ' 'use "". Use "" if no clear business name is visible.\n' f'"product_name": pick the single best match from [{product_list}]. ' 'Guide: restaurant / cafe / fast food → food/meal product; ' 'airline / airport / transit / taxi / parking / rental car → travel product; ' 'gas station / petrol / fuel → fuel product; ' 'hotel / motel / lodging → accommodation product; ' 'office / tech / hardware store → supplies product. ' 'Return "" if nothing fits.\n\n' f'Receipt text:\n{excerpt}\n\nJSON only:' ) elif product_list: # OCR failed — guess category from filename only prompt = ( f'A receipt file named "{filename}" could not be read. ' f'Pick the most likely match from [{product_list}] based on the filename, ' f'or "". Return ONLY: {{"vendor": "", "product_name": "..."}}' ) else: return {'vendor': filename, 'amount': amount, 'date': date, 'time': None, 'product_name': ''} try: resp = await self._llm.submit( [{'role': 'user', 'content': prompt}], caller='expenses_agent_receipt_parser', ) raw = (resp.content or '').strip() first, last = raw.find('{'), raw.rfind('}') if first != -1 and last > first: data = json.loads(raw[first:last + 1]) v = str(data.get('vendor', '') or '').strip() if v: vendor = v product_name = str(data.get('product_name', '') or '').strip() except Exception as exc: logger.warning('Receipt vendor/category parse failed for %s: %s', filename, exc) return {'vendor': vendor, 'amount': amount, 'date': date, 'time': None, 'product_name': product_name} async def _report(self) -> AgentReport: data = self._gathered_data directive_id = self._directive.directive_id if self._directive else '' if data.get('mode') == 'create_from_receipts': if self._actions_taken: lines = '\n'.join(f' • {a}' for a in self._actions_taken) n_skipped = data.get('n_skipped', 0) dup_note = f'\n({n_skipped} duplicate receipt(s) were automatically skipped.)' if n_skipped else '' stmt_skips = [e for e in self._escalations_list if 'statement' in e.lower()] stmt_note = ('\n⚠ ' + '\n⚠ '.join(stmt_skips)) if stmt_skips else '' summary = ( f'Expense report created successfully:\n{lines}{dup_note}{stmt_note}\n\n' 'The report is in draft — open Odoo › Expenses, ' 'review the amounts, and click Submit to send for approval.' ) status = 'complete' else: summary = ('Could not create expense report. ' + '; '.join(self._escalations_list or ['Unknown error'])) status = 'failed' return AgentReport( directive_id=directive_id, agent=self.name, status=status, summary=summary, data=data, escalations=self._escalations_list, actions_taken=self._actions_taken) summary_data = data.get('summary', {}) parts = [] if summary_data: parts.append( f'Expenses: {summary_data.get("total_expenses", 0)} records, ' f'total ${summary_data.get("total_amount", 0):.2f}. ' f'{summary_data.get("pending_approval_count", 0)} pending approval.' ) if not parts: parts.append('Expenses review complete.') return AgentReport( directive_id=directive_id, agent=self.name, status='complete', summary='\n'.join(parts), data=data, escalations=self._escalations_list, actions_taken=[]) async def _dispatch_tool(self, name: str, args: dict): dispatch = { 'get_expenses': self._et.get_expenses, 'get_expense_sheets': self._et.get_expense_sheets, 'get_pending_approvals': self._et.get_pending_approvals, 'approve_expense_sheet': self._et.approve_expense_sheet, 'get_expenses_summary': self._et.get_expenses_summary, 'get_expense_by_employee': self._et.get_expense_by_employee, 'flag_for_review': self._et.flag_for_review, 'post_chatter_note': self._et.post_chatter_note, } if name not in dispatch: raise ValueError(f'Unknown tool: {name}') return await dispatch[name](**args) async def handle_peer_request(self, request_type: str, params: dict, directive_id: str) -> dict: try: if request_type == 'expenses_summary': return await self._et.get_expenses_summary() if request_type == 'employee_expenses': return {'expenses': await self._et.get_expense_by_employee( employee_id=params['employee_id'])} return {'error': f'Unknown type: {request_type}'} except Exception as exc: return {'error': str(exc)} async def sweep(self) -> SweepReport: findings = [] try: pending = await self._et.get_pending_approvals() for sheet in pending: emp = sheet.get('employee_id', [0, '']) findings.append({ 'type': 'pending_expense_approval', 'sheet_id': sheet.get('id'), 'employee': emp[1] if isinstance(emp, list) else '', 'amount': sheet.get('total_amount', 0), 'severity': 'low', }) except Exception as exc: return SweepReport(agent=self.name, findings=[], error=str(exc)) return SweepReport(agent=self.name, findings=findings, actions_taken=[], summary=f'Expenses sweep: {len(findings)} pending approvals.')