feat(agents): add 7 specialist agents with tools and system prompts

Agents (all following 6-step contract: _plan/_gather/_reason/_act/_report):
- AccountingAgent: trial balance, chart of accounts, tax summary (HIPAA-locked)
- CrmAgent: pipeline summary, lead/opportunity management, won/lost analysis
- SalesAgent: sales orders, quotations, revenue by rep, expired quote detection
- ProjectAgent: task tracking, blocked/overdue detection, timesheet logging
- ElearningAgent: course completion, low-engagement flagging, next-course suggestion
- ExpensesAgent: expense sheets, pending approvals, policy violations (HIPAA-locked)
- EmployeesAgent: headcount, contracts, leaves, attendance, expired contract sweep (HIPAA-locked)

Tools (one file per domain):
- accounting_tools.py, crm_tools.py, sales_tools.py, project_tools.py
- elearning_tools.py, expenses_tools.py, employees_tools.py

System prompts: each agent has a domain-specific system.txt with rules and output format

All agents implement handle_peer_request() and sweep() for proactive monitoring
HIPAA-locked agents (accounting, expenses, employees) enforced via LLMRouter

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ActiveBlue Build
2026-04-12 18:04:32 -04:00
parent 29409ed71d
commit fe47f950e4
21 changed files with 1761 additions and 0 deletions

View File

@@ -0,0 +1,86 @@
from __future__ import annotations
import logging
from datetime import date, timedelta
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class AccountingTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_journal_entries(self, journal_id: int = None, date_from: str = None,
date_to: str = None, state: str = 'posted', limit: int = 50) -> list:
domain = [('move_type', '=', 'entry'), ('state', '=', state)]
if journal_id:
domain.append(('journal_id', '=', journal_id))
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
fields = ['name', 'date', 'journal_id', 'ref', 'state', 'amount_total', 'line_ids']
return await self._o.search_read('account.move', domain, fields, limit=limit)
async def get_chart_of_accounts(self, account_type: str = None, limit: int = 100) -> list:
domain = [('deprecated', '=', False)]
if account_type:
domain.append(('account_type', '=', account_type))
fields = ['code', 'name', 'account_type', 'balance', 'currency_id']
return await self._o.search_read('account.account', domain, fields, limit=limit)
async def get_account_balance(self, account_id: int) -> dict:
records = await self._o.search_read(
'account.account', [('id', '=', account_id)],
['code', 'name', 'balance', 'account_type'], limit=1,
)
return records[0] if records else {}
async def get_trial_balance(self, date_from: str = None, date_to: str = None) -> list:
today = date.today()
df = date_from or today.replace(day=1).isoformat()
dt = date_to or today.isoformat()
domain = [
('move_id.state', '=', 'posted'),
('date', '>=', df),
('date', '<=', dt),
]
fields = ['account_id', 'debit', 'credit', 'balance']
lines = await self._o.search_read('account.move.line', domain, fields, limit=500)
by_account: dict = {}
for line in lines:
aid = line['account_id'][0] if isinstance(line['account_id'], list) else line['account_id']
aname = line['account_id'][1] if isinstance(line['account_id'], list) else str(aid)
if aid not in by_account:
by_account[aid] = {'account_id': aid, 'account_name': aname, 'debit': 0.0, 'credit': 0.0}
by_account[aid]['debit'] += line.get('debit', 0.0)
by_account[aid]['credit'] += line.get('credit', 0.0)
result = list(by_account.values())
for r in result:
r['balance'] = r['debit'] - r['credit']
return result
async def get_tax_summary(self, date_from: str = None, date_to: str = None) -> dict:
today = date.today()
df = date_from or today.replace(day=1).isoformat()
dt = date_to or today.isoformat()
domain = [
('move_id.state', '=', 'posted'),
('date', '>=', df),
('date', '<=', dt),
('tax_ids', '!=', False),
]
fields = ['tax_ids', 'debit', 'credit', 'balance']
lines = await self._o.search_read('account.move.line', domain, fields, limit=500)
total_tax = sum(abs(line.get('balance', 0)) for line in lines)
return {'period_from': df, 'period_to': dt, 'total_tax_lines': len(lines), 'total_tax_amount': total_tax}
async def flag_for_review(self, model: str, record_id: int, reason: str, severity: str = 'medium') -> bool:
note = f'[AI FLAG - {severity.upper()}] {reason}'
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
logger.info('Flagged %s:%s (%s) for review', model, record_id, severity)
return True
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True

View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class CrmTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_leads(self, stage_id: int = None, user_id: int = None,
limit: int = 50, active: bool = True) -> list:
domain = [('type', '=', 'lead'), ('active', '=', active)]
if stage_id:
domain.append(('stage_id', '=', stage_id))
if user_id:
domain.append(('user_id', '=', user_id))
fields = ['name', 'partner_id', 'stage_id', 'user_id', 'expected_revenue',
'probability', 'date_deadline', 'priority']
return await self._o.search_read('crm.lead', domain, fields, limit=limit)
async def get_opportunities(self, stage_id: int = None, user_id: int = None,
limit: int = 50, active: bool = True) -> list:
domain = [('type', '=', 'opportunity'), ('active', '=', active)]
if stage_id:
domain.append(('stage_id', '=', stage_id))
if user_id:
domain.append(('user_id', '=', user_id))
fields = ['name', 'partner_id', 'stage_id', 'user_id', 'expected_revenue',
'probability', 'date_deadline', 'priority', 'date_closed']
return await self._o.search_read('crm.lead', domain, fields, limit=limit)
async def get_pipeline_summary(self) -> dict:
stages = await self._o.search_read('crm.stage', [], ['name', 'sequence'], limit=20)
opportunities = await self._o.search_read(
'crm.lead',
[('type', '=', 'opportunity'), ('active', '=', True)],
['stage_id', 'expected_revenue', 'probability'],
limit=500,
)
by_stage: dict = {}
for opp in opportunities:
sid = opp['stage_id'][0] if isinstance(opp['stage_id'], list) else opp['stage_id']
sname = opp['stage_id'][1] if isinstance(opp['stage_id'], list) else str(sid)
if sid not in by_stage:
by_stage[sid] = {'stage': sname, 'count': 0, 'total_revenue': 0.0, 'weighted': 0.0}
by_stage[sid]['count'] += 1
rev = opp.get('expected_revenue', 0)
prob = opp.get('probability', 0) / 100
by_stage[sid]['total_revenue'] += rev
by_stage[sid]['weighted'] += rev * prob
return {
'stages': list(by_stage.values()),
'total_opportunities': len(opportunities),
'total_pipeline': sum(o.get('expected_revenue', 0) for o in opportunities),
'weighted_pipeline': sum(
o.get('expected_revenue', 0) * o.get('probability', 0) / 100 for o in opportunities
),
}
async def update_lead_stage(self, lead_id: int, stage_id: int) -> bool:
result = await self._o.write('crm.lead', [lead_id], {'stage_id': stage_id})
return result.success
async def assign_lead(self, lead_id: int, user_id: int) -> bool:
result = await self._o.write('crm.lead', [lead_id], {'user_id': user_id})
return result.success
async def log_activity(self, lead_id: int, activity_type: str, note: str,
date_deadline: str = None) -> bool:
type_records = await self._o.search_read(
'mail.activity.type', [('name', 'ilike', activity_type)], ['id'], limit=1,
)
type_id = type_records[0]['id'] if type_records else False
vals = {'note': note, 'res_model': 'crm.lead', 'res_id': lead_id}
if type_id:
vals['activity_type_id'] = type_id
if date_deadline:
vals['date_deadline'] = date_deadline
await self._o.call('mail.activity', 'create', [vals])
return True
async def get_won_lost_analysis(self, date_from: str = None, date_to: str = None) -> dict:
domain_won = [('type', '=', 'opportunity'), ('probability', '=', 100)]
domain_lost = [('type', '=', 'opportunity'), ('active', '=', False)]
if date_from:
domain_won.append(('date_closed', '>=', date_from))
domain_lost.append(('date_closed', '>=', date_from))
if date_to:
domain_won.append(('date_closed', '<=', date_to))
domain_lost.append(('date_closed', '<=', date_to))
won = await self._o.search_read('crm.lead', domain_won, ['expected_revenue'], limit=500)
lost = await self._o.search_read('crm.lead', domain_lost, ['expected_revenue'], limit=500)
return {
'won_count': len(won),
'won_revenue': sum(o.get('expected_revenue', 0) for o in won),
'lost_count': len(lost),
'lost_revenue': sum(o.get('expected_revenue', 0) for o in lost),
}
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class ElearningTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_courses(self, active: bool = True, limit: int = 50) -> list:
domain = [('active', '=', active)]
fields = ['name', 'description_short', 'website_published', 'total_slides',
'total_time', 'members_count', 'completion_rate', 'tag_ids']
return await self._o.search_read('slide.channel', domain, fields, limit=limit)
async def get_course_stats(self, channel_id: int) -> dict:
channels = await self._o.search_read(
'slide.channel', [('id', '=', channel_id)],
['name', 'total_slides', 'members_count', 'completion_rate', 'total_time'],
limit=1,
)
if not channels:
return {}
ch = channels[0]
slides = await self._o.search_read(
'slide.slide', [('channel_id', '=', channel_id), ('active', '=', True)],
['name', 'slide_type', 'completion_rate', 'likes', 'dislikes', 'view_count'],
limit=200,
)
return {
'channel': ch,
'slide_count': len(slides),
'avg_slide_completion': sum(s.get('completion_rate', 0) for s in slides) / max(len(slides), 1),
'total_views': sum(s.get('view_count', 0) for s in slides),
}
async def get_enrolled_users(self, channel_id: int, limit: int = 100) -> list:
domain = [('channel_id', '=', channel_id)]
fields = ['partner_id', 'completion', 'last_activity_date', 'channel_completion']
return await self._o.search_read('slide.channel.partner', domain, fields, limit=limit)
async def get_slide_completion(self, channel_id: int, min_completion: float = 0.0) -> list:
partners = await self._o.search_read(
'slide.channel.partner',
[('channel_id', '=', channel_id), ('channel_completion', '>=', min_completion)],
['partner_id', 'channel_completion', 'last_activity_date'],
limit=200,
)
return partners
async def get_learning_summary(self) -> dict:
channels = await self._o.search_read(
'slide.channel', [('active', '=', True), ('website_published', '=', True)],
['name', 'members_count', 'completion_rate'],
limit=50,
)
low_completion = [c for c in channels if c.get('completion_rate', 100) < 30]
return {
'total_courses': len(channels),
'total_enrollments': sum(c.get('members_count', 0) for c in channels),
'avg_completion': sum(c.get('completion_rate', 0) for c in channels) / max(len(channels), 1),
'low_completion_courses': low_completion,
}
async def flag_low_completion(self, channel_id: int, reason: str) -> bool:
msg = f'[AI FLAG] {reason}'
await self._o.call('slide.channel', 'message_post', [[channel_id]], {'body': msg, 'message_type': 'comment'})
return True
async def suggest_next_course(self, partner_id: int) -> list:
completed = await self._o.search_read(
'slide.channel.partner',
[('partner_id', '=', partner_id), ('channel_completion', '>=', 90)],
['channel_id'],
limit=50,
)
completed_ids = [c['channel_id'][0] if isinstance(c['channel_id'], list) else c['channel_id'] for c in completed]
domain = [('active', '=', True), ('website_published', '=', True)]
if completed_ids:
domain.append(('id', 'not in', completed_ids))
return await self._o.search_read('slide.channel', domain, ['name', 'total_slides', 'completion_rate'], limit=5)
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True

View File

@@ -0,0 +1,91 @@
from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class EmployeesTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_employees(self, department_id: int = None, active: bool = True,
limit: int = 100) -> list:
domain = [('active', '=', active)]
if department_id:
domain.append(('department_id', '=', department_id))
fields = ['name', 'department_id', 'job_id', 'job_title', 'work_email',
'coach_id', 'parent_id', 'employee_type']
return await self._o.search_read('hr.employee', domain, fields, limit=limit)
async def get_employee_profile(self, employee_id: int) -> dict:
employees = await self._o.search_read(
'hr.employee', [('id', '=', employee_id)],
['name', 'department_id', 'job_id', 'job_title', 'work_email',
'coach_id', 'parent_id', 'employee_type', 'study_field', 'study_school'],
limit=1,
)
return employees[0] if employees else {}
async def get_leaves(self, employee_id: int = None, state: str = None,
date_from: str = None, limit: int = 50) -> list:
domain = []
if employee_id:
domain.append(('employee_id', '=', employee_id))
if state:
domain.append(('state', '=', state))
if date_from:
domain.append(('date_from', '>=', date_from))
fields = ['name', 'employee_id', 'holiday_status_id', 'date_from',
'date_to', 'number_of_days', 'state']
return await self._o.search_read('hr.leave', domain, fields, limit=limit)
async def get_contracts(self, employee_id: int = None, state: str = 'open',
limit: int = 50) -> list:
domain = [('state', '=', state)]
if employee_id:
domain.append(('employee_id', '=', employee_id))
fields = ['name', 'employee_id', 'wage', 'date_start', 'date_end',
'state', 'structure_type_id']
return await self._o.search_read('hr.contract', domain, fields, limit=limit)
async def get_attendance_summary(self, employee_id: int, date_from: str,
date_to: str) -> dict:
domain = [
('employee_id', '=', employee_id),
('check_in', '>=', date_from),
('check_in', '<=', date_to),
]
records = await self._o.search_read('hr.attendance', domain, ['worked_hours', 'check_in'], limit=200)
total_hours = sum(r.get('worked_hours', 0) for r in records)
days_present = len(set(r['check_in'][:10] for r in records if r.get('check_in')))
return {
'employee_id': employee_id,
'period_from': date_from,
'period_to': date_to,
'total_hours': round(total_hours, 2),
'days_present': days_present,
'attendance_records': len(records),
}
async def get_department_summary(self, department_id: int) -> dict:
employees = await self.get_employees(department_id=department_id)
active_contracts = await self.get_contracts(state='open')
dept_contracts = [c for c in active_contracts
if any(e['id'] == (c['employee_id'][0] if isinstance(c['employee_id'], list) else c['employee_id'])
for e in employees)]
return {
'department_id': department_id,
'headcount': len(employees),
'active_contracts': len(dept_contracts),
'avg_wage': sum(c.get('wage', 0) for c in dept_contracts) / max(len(dept_contracts), 1),
}
async def flag_for_review(self, model: str, record_id: int, reason: str, severity: str = 'medium') -> bool:
msg = f'[AI FLAG - {severity.upper()}] {reason}'
await self._o.call(model, 'message_post', [[record_id]], {'body': msg, 'message_type': 'comment'})
return True
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True

View File

@@ -0,0 +1,86 @@
from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class ExpensesTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_expenses(self, employee_id: int = None, state: str = None,
date_from: str = None, date_to: str = None, limit: int = 50) -> list:
domain = []
if employee_id:
domain.append(('employee_id', '=', employee_id))
if state:
domain.append(('state', '=', state))
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
fields = ['name', 'employee_id', 'product_id', 'total_amount', 'date',
'state', 'sheet_id', 'description']
return await self._o.search_read('hr.expense', domain, fields, limit=limit)
async def get_expense_sheets(self, state: str = None, employee_id: int = None,
limit: int = 50) -> list:
domain = []
if state:
domain.append(('state', '=', state))
if employee_id:
domain.append(('employee_id', '=', employee_id))
fields = ['name', 'employee_id', 'state', 'total_amount', 'date',
'accounting_date', 'journal_id']
return await self._o.search_read('hr.expense.sheet', domain, fields, limit=limit)
async def get_pending_approvals(self) -> list:
return await self._o.search_read(
'hr.expense.sheet',
[('state', '=', 'submit')],
['name', 'employee_id', 'total_amount', 'date'],
limit=100,
)
async def approve_expense_sheet(self, sheet_id: int) -> bool:
try:
await self._o.call('hr.expense.sheet', 'approve_expense_sheets', [[sheet_id]])
logger.info('Approved expense sheet %s', sheet_id)
return True
except Exception as exc:
logger.warning('approve_expense_sheet failed %s: %s', sheet_id, exc)
return False
async def get_expenses_summary(self, date_from: str = None, date_to: str = None) -> dict:
domain = [('state', 'not in', ['refused'])]
if date_from:
domain.append(('date', '>=', date_from))
if date_to:
domain.append(('date', '<=', date_to))
expenses = await self._o.search_read('hr.expense', domain, ['total_amount', 'employee_id', 'product_id'], limit=1000)
total = sum(e.get('total_amount', 0) for e in expenses)
pending_sheets = await self.get_pending_approvals()
return {
'total_expenses': len(expenses),
'total_amount': total,
'pending_approval_count': len(pending_sheets),
'pending_amount': sum(s.get('total_amount', 0) for s in pending_sheets),
}
async def get_expense_by_employee(self, employee_id: int, limit: int = 20) -> list:
return await self._o.search_read(
'hr.expense',
[('employee_id', '=', employee_id)],
['name', 'total_amount', 'date', 'state', 'product_id'],
limit=limit,
)
async def flag_for_review(self, model: str, record_id: int, reason: str, severity: str = 'medium') -> bool:
msg = f'[AI FLAG - {severity.upper()}] {reason}'
await self._o.call(model, 'message_post', [[record_id]], {'body': msg, 'message_type': 'comment'})
return True
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class ProjectTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_projects(self, active: bool = True, limit: int = 50) -> list:
domain = [('active', '=', active)]
fields = ['name', 'partner_id', 'user_id', 'date_start', 'date',
'task_count', 'description', 'last_update_status']
return await self._o.search_read('project.project', domain, fields, limit=limit)
async def get_tasks(self, project_id: int = None, stage_id: int = None,
user_id: int = None, limit: int = 100) -> list:
domain = [('active', '=', True)]
if project_id:
domain.append(('project_id', '=', project_id))
if stage_id:
domain.append(('stage_id', '=', stage_id))
if user_id:
domain.append(('user_ids', 'in', [user_id]))
fields = ['name', 'project_id', 'stage_id', 'user_ids', 'date_deadline',
'priority', 'kanban_state', 'description', 'tag_ids']
return await self._o.search_read('project.task', domain, fields, limit=limit)
async def get_project_summary(self, project_id: int) -> dict:
tasks = await self._o.search_read(
'project.task', [('project_id', '=', project_id), ('active', '=', True)],
['stage_id', 'kanban_state', 'date_deadline', 'user_ids'],
limit=500,
)
total = len(tasks)
blocked = [t for t in tasks if t.get('kanban_state') == 'blocked']
overdue = [t for t in tasks if t.get('date_deadline') and t['date_deadline'] < str(__import__('datetime').date.today())]
return {
'project_id': project_id,
'total_tasks': total,
'blocked_tasks': len(blocked),
'overdue_tasks': len(overdue),
}
async def update_task_stage(self, task_id: int, stage_id: int) -> bool:
result = await self._o.write('project.task', [task_id], {'stage_id': stage_id})
return result.success
async def assign_task(self, task_id: int, user_id: int) -> bool:
result = await self._o.write('project.task', [task_id], {'user_ids': [(4, user_id)]})
return result.success
async def create_task(self, project_id: int, name: str, description: str = '',
user_id: int = None, date_deadline: str = None) -> int:
vals = {'project_id': project_id, 'name': name}
if description:
vals['description'] = description
if user_id:
vals['user_ids'] = [(4, user_id)]
if date_deadline:
vals['date_deadline'] = date_deadline
record_id = await self._o.call('project.task', 'create', [vals])
logger.info('Created task %s in project %s', record_id, project_id)
return record_id
async def log_timesheet(self, task_id: int, employee_id: int, hours: float,
description: str = '', date: str = None) -> int:
import datetime
vals = {
'task_id': task_id,
'employee_id': employee_id,
'unit_amount': hours,
'name': description or 'AI-logged timesheet',
'date': date or str(datetime.date.today()),
}
record_id = await self._o.call('account.analytic.line', 'create', [vals])
return record_id
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True

View File

@@ -0,0 +1,81 @@
from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class SalesTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_sales_orders(self, state: str = 'sale', partner_id: int = None,
date_from: str = None, date_to: str = None, limit: int = 50) -> list:
domain = [('state', '=', state)]
if partner_id:
domain.append(('partner_id', '=', partner_id))
if date_from:
domain.append(('date_order', '>=', date_from))
if date_to:
domain.append(('date_order', '<=', date_to))
fields = ['name', 'partner_id', 'state', 'date_order', 'amount_total',
'amount_untaxed', 'user_id', 'invoice_status']
return await self._o.search_read('sale.order', domain, fields, limit=limit)
async def get_quotations(self, partner_id: int = None, limit: int = 50) -> list:
domain = [('state', 'in', ['draft', 'sent'])]
if partner_id:
domain.append(('partner_id', '=', partner_id))
fields = ['name', 'partner_id', 'state', 'date_order', 'validity_date',
'amount_total', 'user_id']
return await self._o.search_read('sale.order', domain, fields, limit=limit)
async def get_sales_summary(self, date_from: str = None, date_to: str = None) -> dict:
domain = [('state', '=', 'sale')]
if date_from:
domain.append(('date_order', '>=', date_from))
if date_to:
domain.append(('date_order', '<=', date_to))
orders = await self._o.search_read('sale.order', domain, ['amount_total', 'user_id'], limit=1000)
total = sum(o.get('amount_total', 0) for o in orders)
by_rep: dict = {}
for o in orders:
uid = o['user_id'][0] if isinstance(o['user_id'], list) else 0
uname = o['user_id'][1] if isinstance(o['user_id'], list) else 'Unknown'
by_rep.setdefault(uid, {'name': uname, 'count': 0, 'total': 0.0})
by_rep[uid]['count'] += 1
by_rep[uid]['total'] += o.get('amount_total', 0)
return {
'order_count': len(orders),
'total_revenue': total,
'by_sales_rep': sorted(by_rep.values(), key=lambda x: x['total'], reverse=True)[:10],
}
async def get_customer_orders(self, partner_id: int, limit: int = 20) -> list:
return await self._o.search_read(
'sale.order', [('partner_id', '=', partner_id)],
['name', 'state', 'date_order', 'amount_total', 'invoice_status'],
limit=limit,
)
async def confirm_quotation(self, order_id: int) -> bool:
try:
await self._o.call('sale.order', 'action_confirm', [[order_id]])
logger.info('Confirmed quotation %s', order_id)
return True
except Exception as exc:
logger.warning('confirm_quotation failed %s: %s', order_id, exc)
return False
async def update_order_note(self, order_id: int, note: str) -> bool:
result = await self._o.write('sale.order', [order_id], {'note': note})
return result.success
async def flag_for_review(self, model: str, record_id: int, reason: str, severity: str = 'medium') -> bool:
msg = f'[AI FLAG - {severity.upper()}] {reason}'
await self._o.call(model, 'message_post', [[record_id]], {'body': msg, 'message_type': 'comment'})
return True
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True