from __future__ import annotations

import logging

from ..tools.odoo_client import OdooClient

logger = logging.getLogger(__name__)


class ExpensesTools:
    """Async helpers for querying and creating Odoo expense records.

    Every method delegates to the injected ``OdooClient`` through its
    ``search_read``, ``create`` and ``call`` coroutines. Methods that wrap
    the remote call in ``try``/``except`` are best-effort: they log a
    warning and return a fallback value instead of raising. The remaining
    methods let client exceptions propagate to the caller.
    """

    def __init__(self, odoo: OdooClient):
        """Store the client used for all remote calls."""
        self._o = odoo

    async def get_expenses(
        self,
        employee_id: int | None = None,
        state: str | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
        limit: int = 50,
    ) -> list:
        """Search ``hr.expense`` records, optionally filtered.

        Each filter is applied only when its argument is truthy, so
        ``None``, ``0`` and ``''`` all mean "do not filter on this".

        Args:
            employee_id: Match on the ``employee_id`` field.
            state: Match on the ``state`` field.
            date_from: Inclusive lower bound for the ``date`` field.
            date_to: Inclusive upper bound for the ``date`` field.
            limit: Maximum number of records requested from the client.

        Returns:
            Whatever ``search_read`` returns for the query.
        """
        domain = []
        if employee_id:
            domain.append(('employee_id', '=', employee_id))
        if state:
            domain.append(('state', '=', state))
        if date_from:
            domain.append(('date', '>=', date_from))
        if date_to:
            domain.append(('date', '<=', date_to))
        fields = [
            'name',
            'employee_id',
            'product_id',
            'total_amount',
            'date',
            'state',
            'sheet_id',
            'description',
        ]
        return await self._o.search_read(
            'hr.expense', domain, fields, limit=limit)

    async def get_expense_sheets(
        self,
        state: str | None = None,
        employee_id: int | None = None,
        limit: int = 50,
    ) -> list:
        """Search ``hr.expense.sheet`` records, optionally filtered.

        Args:
            state: Match on the ``state`` field when truthy.
            employee_id: Match on the ``employee_id`` field when truthy.
            limit: Maximum number of records requested from the client.

        Returns:
            Whatever ``search_read`` returns for the query.
        """
        domain = []
        if state:
            domain.append(('state', '=', state))
        if employee_id:
            domain.append(('employee_id', '=', employee_id))
        fields = [
            'name',
            'employee_id',
            'state',
            'total_amount',
            'date',
            'accounting_date',
            'journal_id',
        ]
        return await self._o.search_read(
            'hr.expense.sheet', domain, fields, limit=limit)

    async def get_pending_approvals(self) -> list:
        """Return up to 100 expense sheets whose ``state`` is ``'submit'``."""
        return await self._o.search_read(
            'hr.expense.sheet',
            [('state', '=', 'submit')],
            ['name', 'employee_id', 'total_amount', 'date'],
            limit=100,
        )

    async def approve_expense_sheet(self, sheet_id: int) -> bool:
        """Approve one expense sheet; best-effort.

        Calls ``approve_expense_sheets`` on ``hr.expense.sheet`` for the
        given id.

        Returns:
            ``True`` when the remote call completed, ``False`` when it
            raised (the exception is logged as a warning, not re-raised).
        """
        # NOTE(review): the remote method name may differ between Odoo
        # versions - confirm it against the target server.
        try:
            await self._o.call(
                'hr.expense.sheet', 'approve_expense_sheets', [[sheet_id]])
            logger.info('Approved expense sheet %s', sheet_id)
            return True
        except Exception as exc:
            logger.warning(
                'approve_expense_sheet failed %s: %s', sheet_id, exc)
            return False

    async def get_expenses_summary(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
    ) -> dict:
        """Summarise non-refused expenses plus sheets awaiting approval.

        Args:
            date_from: Inclusive lower bound for the expense ``date``.
            date_to: Inclusive upper bound for the expense ``date``.

        Returns:
            A dict with ``total_expenses``, ``total_amount``,
            ``pending_approval_count`` and ``pending_amount``.
        """
        domain = [('state', 'not in', ['refused'])]
        if date_from:
            domain.append(('date', '>=', date_from))
        if date_to:
            domain.append(('date', '<=', date_to))
        # The query is capped at 1000 rows, so the count and total cover at
        # most that many expenses.
        expenses = await self._o.search_read(
            'hr.expense',
            domain,
            ['total_amount', 'employee_id', 'product_id'],
            limit=1000,
        )
        # `or 0` keeps the sum numeric if an amount comes back falsy.
        total = sum(e.get('total_amount') or 0 for e in expenses)
        # The date range is not applied here: pending figures cover every
        # sheet returned by get_pending_approvals().
        pending_sheets = await self.get_pending_approvals()
        return {
            'total_expenses': len(expenses),
            'total_amount': total,
            'pending_approval_count': len(pending_sheets),
            'pending_amount': sum(
                s.get('total_amount') or 0 for s in pending_sheets),
        }

    async def get_expense_by_employee(
        self, employee_id: int, limit: int = 20,
    ) -> list:
        """Return up to ``limit`` ``hr.expense`` records for one employee."""
        return await self._o.search_read(
            'hr.expense',
            [('employee_id', '=', employee_id)],
            ['name', 'total_amount', 'date', 'state', 'product_id'],
            limit=limit,
        )

    async def flag_for_review(
        self,
        model: str,
        record_id: int,
        reason: str,
        severity: str = 'medium',
    ) -> bool:
        """Post an ``[AI FLAG - <SEVERITY>]`` comment on a record.

        Unlike the best-effort helpers in this class, exceptions raised by
        the client are not caught here.

        Args:
            model: Model name of the record to comment on.
            record_id: Id of the record to comment on.
            reason: Text placed after the flag prefix.
            severity: Label upper-cased into the flag prefix.

        Returns:
            Always ``True`` when the call completes.
        """
        msg = f'[AI FLAG - {severity.upper()}] {reason}'
        await self._o.call(
            model,
            'message_post',
            [[record_id]],
            {'body': msg, 'message_type': 'comment'},
        )
        return True

    async def post_chatter_note(
        self, model: str, record_id: int, note: str,
    ) -> bool:
        """Post ``note`` as a comment on a record via ``message_post``.

        Exceptions raised by the client are not caught here.

        Returns:
            Always ``True`` when the call completes.
        """
        await self._o.call(
            model,
            'message_post',
            [[record_id]],
            {'body': note, 'message_type': 'comment'},
        )
        return True

    async def get_employee_id_for_user(self, user_id) -> int | None:
        """Look up the ``hr.employee`` id linked to a user; best-effort.

        Args:
            user_id: User identifier; it is passed through ``int()``
                before querying.

        Returns:
            The id of the first matching employee, or ``None`` when
            ``user_id`` is falsy, nothing matches, or the lookup raises
            (including a failed ``int()`` conversion).
        """
        if not user_id:
            return None
        try:
            records = await self._o.search_read(
                'hr.employee',
                [('user_id', '=', int(user_id))],
                ['id', 'name'],
                limit=1,
            )
            return records[0]['id'] if records else None
        except Exception as exc:
            logger.warning(
                'get_employee_id_for_user failed user_id=%s: %s',
                user_id, exc)
            return None

    async def get_default_expense_product(self) -> int | None:
        """Return the id of the first expensable service product, if any.

        Best-effort: returns ``None`` when nothing matches or the lookup
        raises.
        """
        try:
            records = await self._o.search_read(
                'product.product',
                [('can_be_expensed', '=', True), ('type', '=', 'service')],
                ['id', 'name'],
                limit=1,
            )
            return records[0]['id'] if records else None
        except Exception as exc:
            logger.warning('get_default_expense_product failed: %s', exc)
            return None

    async def get_expense_products(self) -> list:
        """Return up to 100 expensable products for category selection.

        Best-effort: returns an empty list when the lookup raises.
        """
        try:
            return await self._o.search_read(
                'product.product',
                [('can_be_expensed', '=', True)],
                ['id', 'name'],
                limit=100,
            )
        except Exception as exc:
            logger.warning('get_expense_products failed: %s', exc)
            return []

    async def create_expense_sheet(self, name: str, employee_id: int):
        """Create an ``hr.expense.sheet`` and return the client's result.

        Exceptions raised by the client are not caught here.
        """
        return await self._o.create('hr.expense.sheet', {
            'name': name,
            'employee_id': employee_id,
        })

    async def create_expense(
        self,
        sheet_id: int,
        employee_id: int,
        name: str,
        total_amount: float,
        date: str,
        product_id: int | None = None,
        description: str = '',
    ):
        """Create an ``hr.expense`` line attached to an expense sheet.

        ``date``, ``product_id`` and ``description`` are sent only when
        truthy, so the server-side defaults apply otherwise. Exceptions
        raised by the client are not caught here.

        Args:
            sheet_id: Id of the sheet the expense belongs to.
            employee_id: Id of the employee the expense is for.
            name: Expense title.
            total_amount: Amount stored in ``total_amount``.
            date: Expense date; omitted from the payload when falsy.
            product_id: Expense product id; omitted when falsy.
            description: Free-text notes; omitted when empty.

        Returns:
            Whatever the client's ``create`` returns.
        """
        vals: dict = {
            'name': name,
            'employee_id': employee_id,
            'sheet_id': sheet_id,
            'total_amount': total_amount,
            'quantity': 1.0,
        }
        if date:
            vals['date'] = date
        if product_id:
            vals['product_id'] = product_id
        # Previously the description argument was accepted but never sent.
        if description:
            vals['description'] = description
        return await self._o.create('hr.expense', vals)

    async def attach_receipt(
        self,
        model: str,
        record_id: int,
        filename: str,
        file_b64: str,
        mimetype: str,
    ) -> bool:
        """Create an ``ir.attachment`` linked to a record; best-effort.

        Args:
            model: Model name of the record the file is attached to.
            record_id: Id of the record the file is attached to.
            filename: Stored as the attachment ``name``.
            file_b64: File content, stored in ``datas`` (presumably
                base64-encoded, going by the parameter name).
            mimetype: Stored as the attachment ``mimetype``.

        Returns:
            ``True`` when the attachment was created, ``False`` when the
            client raised (logged as a warning, not re-raised).
        """
        try:
            await self._o.create('ir.attachment', {
                'name': filename,
                'datas': file_b64,
                'res_model': model,
                'res_id': record_id,
                'mimetype': mimetype,
            })
            return True
        except Exception as exc:
            logger.warning(
                'attach_receipt failed %s/%s: %s', model, record_id, exc)
            return False