Files
odoo-ai/agent_service/tools/expenses_tools.py
ActiveBlue Build fe47f950e4 feat(agents): add 7 specialist agents with tools and system prompts
Agents (all following 6-step contract: _plan/_gather/_reason/_act/_report):
- AccountingAgent: trial balance, chart of accounts, tax summary (HIPAA-locked)
- CrmAgent: pipeline summary, lead/opportunity management, won/lost analysis
- SalesAgent: sales orders, quotations, revenue by rep, expired quote detection
- ProjectAgent: task tracking, blocked/overdue detection, timesheet logging
- ElearningAgent: course completion, low-engagement flagging, next-course suggestion
- ExpensesAgent: expense sheets, pending approvals, policy violations (HIPAA-locked)
- EmployeesAgent: headcount, contracts, leaves, attendance, expired contract sweep (HIPAA-locked)

Tools (one file per domain):
- accounting_tools.py, crm_tools.py, sales_tools.py, project_tools.py
- elearning_tools.py, expenses_tools.py, employees_tools.py

System prompts: each agent has a domain-specific system.txt with rules and output format

All agents implement handle_peer_request() and sweep() for proactive monitoring
HIPAA-locked agents (accounting, expenses, employees) enforced via LLMRouter

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 18:04:32 -04:00

87 lines
3.6 KiB
Python

from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class ExpensesTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_expenses(self, employee_id: int = None, state: str = None,
date_from: str = None, date_to: str = None, limit: int = 50) -> list:
domain = []
if employee_id:
domain.append(('employee_id', '=', employee_id))
if state:
domain.append(('state', '=', state))
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
fields = ['name', 'employee_id', 'product_id', 'total_amount', 'date',
'state', 'sheet_id', 'description']
return await self._o.search_read('hr.expense', domain, fields, limit=limit)
async def get_expense_sheets(self, state: str = None, employee_id: int = None,
limit: int = 50) -> list:
domain = []
if state:
domain.append(('state', '=', state))
if employee_id:
domain.append(('employee_id', '=', employee_id))
fields = ['name', 'employee_id', 'state', 'total_amount', 'date',
'accounting_date', 'journal_id']
return await self._o.search_read('hr.expense.sheet', domain, fields, limit=limit)
async def get_pending_approvals(self) -> list:
return await self._o.search_read(
'hr.expense.sheet',
[('state', '=', 'submit')],
['name', 'employee_id', 'total_amount', 'date'],
limit=100,
)
async def approve_expense_sheet(self, sheet_id: int) -> bool:
try:
await self._o.call('hr.expense.sheet', 'approve_expense_sheets', [[sheet_id]])
logger.info('Approved expense sheet %s', sheet_id)
return True
except Exception as exc:
logger.warning('approve_expense_sheet failed %s: %s', sheet_id, exc)
return False
async def get_expenses_summary(self, date_from: str = None, date_to: str = None) -> dict:
domain = [('state', 'not in', ['refused'])]
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
expenses = await self._o.search_read('hr.expense', domain, ['total_amount', 'employee_id', 'product_id'], limit=1000)
total = sum(e.get('total_amount', 0) for e in expenses)
pending_sheets = await self.get_pending_approvals()
return {
'total_expenses': len(expenses),
'total_amount': total,
'pending_approval_count': len(pending_sheets),
'pending_amount': sum(s.get('total_amount', 0) for s in pending_sheets),
}
async def get_expense_by_employee(self, employee_id: int, limit: int = 20) -> list:
return await self._o.search_read(
'hr.expense',
[('employee_id', '=', employee_id)],
['name', 'total_amount', 'date', 'state', 'product_id'],
limit=limit,
)
async def flag_for_review(self, model: str, record_id: int, reason: str, severity: str = 'medium') -> bool:
msg = f'[AI FLAG - {severity.upper()}] {reason}'
await self._o.call(model, 'message_post', [[record_id]], {'body': msg, 'message_type': 'comment'})
return True
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True