Files
odoo-ai/agent_service/tools/crm_tools.py
ActiveBlue Build fe47f950e4 feat(agents): add 7 specialist agents with tools and system prompts
Agents (all following 6-step contract: _plan/_gather/_reason/_act/_report):
- AccountingAgent: trial balance, chart of accounts, tax summary (HIPAA-locked)
- CrmAgent: pipeline summary, lead/opportunity management, won/lost analysis
- SalesAgent: sales orders, quotations, revenue by rep, expired quote detection
- ProjectAgent: task tracking, blocked/overdue detection, timesheet logging
- ElearningAgent: course completion, low-engagement flagging, next-course suggestion
- ExpensesAgent: expense sheets, pending approvals, policy violations (HIPAA-locked)
- EmployeesAgent: headcount, contracts, leaves, attendance, expired contract sweep (HIPAA-locked)

Tools (one file per domain):
- accounting_tools.py, crm_tools.py, sales_tools.py, project_tools.py
- elearning_tools.py, expenses_tools.py, employees_tools.py

System prompts: each agent has a domain-specific system.txt with rules and output format

All agents implement handle_peer_request() and sweep() for proactive monitoring
HIPAA-locked agents (accounting, expenses, employees) enforced via LLMRouter

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 18:04:32 -04:00

105 lines
4.9 KiB
Python

from __future__ import annotations
import logging
from ..tools.odoo_client import OdooClient
logger = logging.getLogger(__name__)
class CrmTools:
def __init__(self, odoo: OdooClient):
self._o = odoo
async def get_leads(self, stage_id: int = None, user_id: int = None,
limit: int = 50, active: bool = True) -> list:
domain = [('type', '=', 'lead'), ('active', '=', active)]
if stage_id:
domain.append(('stage_id', '=', stage_id))
if user_id:
domain.append(('user_id', '=', user_id))
fields = ['name', 'partner_id', 'stage_id', 'user_id', 'expected_revenue',
'probability', 'date_deadline', 'priority']
return await self._o.search_read('crm.lead', domain, fields, limit=limit)
async def get_opportunities(self, stage_id: int = None, user_id: int = None,
limit: int = 50, active: bool = True) -> list:
domain = [('type', '=', 'opportunity'), ('active', '=', active)]
if stage_id:
domain.append(('stage_id', '=', stage_id))
if user_id:
domain.append(('user_id', '=', user_id))
fields = ['name', 'partner_id', 'stage_id', 'user_id', 'expected_revenue',
'probability', 'date_deadline', 'priority', 'date_closed']
return await self._o.search_read('crm.lead', domain, fields, limit=limit)
async def get_pipeline_summary(self) -> dict:
stages = await self._o.search_read('crm.stage', [], ['name', 'sequence'], limit=20)
opportunities = await self._o.search_read(
'crm.lead',
[('type', '=', 'opportunity'), ('active', '=', True)],
['stage_id', 'expected_revenue', 'probability'],
limit=500,
)
by_stage: dict = {}
for opp in opportunities:
sid = opp['stage_id'][0] if isinstance(opp['stage_id'], list) else opp['stage_id']
sname = opp['stage_id'][1] if isinstance(opp['stage_id'], list) else str(sid)
if sid not in by_stage:
by_stage[sid] = {'stage': sname, 'count': 0, 'total_revenue': 0.0, 'weighted': 0.0}
by_stage[sid]['count'] += 1
rev = opp.get('expected_revenue', 0)
prob = opp.get('probability', 0) / 100
by_stage[sid]['total_revenue'] += rev
by_stage[sid]['weighted'] += rev * prob
return {
'stages': list(by_stage.values()),
'total_opportunities': len(opportunities),
'total_pipeline': sum(o.get('expected_revenue', 0) for o in opportunities),
'weighted_pipeline': sum(
o.get('expected_revenue', 0) * o.get('probability', 0) / 100 for o in opportunities
),
}
async def update_lead_stage(self, lead_id: int, stage_id: int) -> bool:
result = await self._o.write('crm.lead', [lead_id], {'stage_id': stage_id})
return result.success
async def assign_lead(self, lead_id: int, user_id: int) -> bool:
result = await self._o.write('crm.lead', [lead_id], {'user_id': user_id})
return result.success
async def log_activity(self, lead_id: int, activity_type: str, note: str,
date_deadline: str = None) -> bool:
type_records = await self._o.search_read(
'mail.activity.type', [('name', 'ilike', activity_type)], ['id'], limit=1,
)
type_id = type_records[0]['id'] if type_records else False
vals = {'note': note, 'res_model': 'crm.lead', 'res_id': lead_id}
if type_id:
vals['activity_type_id'] = type_id
if date_deadline:
vals['date_deadline'] = date_deadline
await self._o.call('mail.activity', 'create', [vals])
return True
async def get_won_lost_analysis(self, date_from: str = None, date_to: str = None) -> dict:
domain_won = [('type', '=', 'opportunity'), ('probability', '=', 100)]
domain_lost = [('type', '=', 'opportunity'), ('active', '=', False)]
if date_from:
domain_won.append(('date_closed', '>=', date_from))
domain_lost.append(('date_closed', '>=', date_from))
if date_to:
domain_won.append(('date_closed', '<=', date_to))
domain_lost.append(('date_closed', '<=', date_to))
won = await self._o.search_read('crm.lead', domain_won, ['expected_revenue'], limit=500)
lost = await self._o.search_read('crm.lead', domain_lost, ['expected_revenue'], limit=500)
return {
'won_count': len(won),
'won_revenue': sum(o.get('expected_revenue', 0) for o in won),
'lost_count': len(lost),
'lost_revenue': sum(o.get('expected_revenue', 0) for o in lost),
}
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
return True