Agents (all following 6-step contract: _plan/_gather/_reason/_act/_report): - AccountingAgent: trial balance, chart of accounts, tax summary (HIPAA-locked) - CrmAgent: pipeline summary, lead/opportunity management, won/lost analysis - SalesAgent: sales orders, quotations, revenue by rep, expired quote detection - ProjectAgent: task tracking, blocked/overdue detection, timesheet logging - ElearningAgent: course completion, low-engagement flagging, next-course suggestion - ExpensesAgent: expense sheets, pending approvals, policy violations (HIPAA-locked) - EmployeesAgent: headcount, contracts, leaves, attendance, expired contract sweep (HIPAA-locked) Tools (one file per domain): - accounting_tools.py, crm_tools.py, sales_tools.py, project_tools.py - elearning_tools.py, expenses_tools.py, employees_tools.py System prompts: each agent has a domain-specific system.txt with rules and output format All agents implement handle_peer_request() and sweep() for proactive monitoring HIPAA-locked agents (accounting, expenses, employees) enforced via LLMRouter Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
88 lines
3.9 KiB
Python
88 lines
3.9 KiB
Python
from __future__ import annotations
|
|
import logging
|
|
from ..tools.odoo_client import OdooClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ElearningTools:
|
|
def __init__(self, odoo: OdooClient):
|
|
self._o = odoo
|
|
|
|
async def get_courses(self, active: bool = True, limit: int = 50) -> list:
|
|
domain = [('active', '=', active)]
|
|
fields = ['name', 'description_short', 'website_published', 'total_slides',
|
|
'total_time', 'members_count', 'completion_rate', 'tag_ids']
|
|
return await self._o.search_read('slide.channel', domain, fields, limit=limit)
|
|
|
|
async def get_course_stats(self, channel_id: int) -> dict:
|
|
channels = await self._o.search_read(
|
|
'slide.channel', [('id', '=', channel_id)],
|
|
['name', 'total_slides', 'members_count', 'completion_rate', 'total_time'],
|
|
limit=1,
|
|
)
|
|
if not channels:
|
|
return {}
|
|
ch = channels[0]
|
|
slides = await self._o.search_read(
|
|
'slide.slide', [('channel_id', '=', channel_id), ('active', '=', True)],
|
|
['name', 'slide_type', 'completion_rate', 'likes', 'dislikes', 'view_count'],
|
|
limit=200,
|
|
)
|
|
return {
|
|
'channel': ch,
|
|
'slide_count': len(slides),
|
|
'avg_slide_completion': sum(s.get('completion_rate', 0) for s in slides) / max(len(slides), 1),
|
|
'total_views': sum(s.get('view_count', 0) for s in slides),
|
|
}
|
|
|
|
async def get_enrolled_users(self, channel_id: int, limit: int = 100) -> list:
|
|
domain = [('channel_id', '=', channel_id)]
|
|
fields = ['partner_id', 'completion', 'last_activity_date', 'channel_completion']
|
|
return await self._o.search_read('slide.channel.partner', domain, fields, limit=limit)
|
|
|
|
async def get_slide_completion(self, channel_id: int, min_completion: float = 0.0) -> list:
|
|
partners = await self._o.search_read(
|
|
'slide.channel.partner',
|
|
[('channel_id', '=', channel_id), ('channel_completion', '>=', min_completion)],
|
|
['partner_id', 'channel_completion', 'last_activity_date'],
|
|
limit=200,
|
|
)
|
|
return partners
|
|
|
|
async def get_learning_summary(self) -> dict:
|
|
channels = await self._o.search_read(
|
|
'slide.channel', [('active', '=', True), ('website_published', '=', True)],
|
|
['name', 'members_count', 'completion_rate'],
|
|
limit=50,
|
|
)
|
|
low_completion = [c for c in channels if c.get('completion_rate', 100) < 30]
|
|
return {
|
|
'total_courses': len(channels),
|
|
'total_enrollments': sum(c.get('members_count', 0) for c in channels),
|
|
'avg_completion': sum(c.get('completion_rate', 0) for c in channels) / max(len(channels), 1),
|
|
'low_completion_courses': low_completion,
|
|
}
|
|
|
|
async def flag_low_completion(self, channel_id: int, reason: str) -> bool:
|
|
msg = f'[AI FLAG] {reason}'
|
|
await self._o.call('slide.channel', 'message_post', [[channel_id]], {'body': msg, 'message_type': 'comment'})
|
|
return True
|
|
|
|
async def suggest_next_course(self, partner_id: int) -> list:
|
|
completed = await self._o.search_read(
|
|
'slide.channel.partner',
|
|
[('partner_id', '=', partner_id), ('channel_completion', '>=', 90)],
|
|
['channel_id'],
|
|
limit=50,
|
|
)
|
|
completed_ids = [c['channel_id'][0] if isinstance(c['channel_id'], list) else c['channel_id'] for c in completed]
|
|
domain = [('active', '=', True), ('website_published', '=', True)]
|
|
if completed_ids:
|
|
domain.append(('id', 'not in', completed_ids))
|
|
return await self._o.search_read('slide.channel', domain, ['name', 'total_slides', 'completion_rate'], limit=5)
|
|
|
|
async def post_chatter_note(self, model: str, record_id: int, note: str) -> bool:
|
|
await self._o.call(model, 'message_post', [[record_id]], {'body': note, 'message_type': 'comment'})
|
|
return True
|