"""Project-domain agent: tool schema and the ``ProjectAgent`` implementation."""
from __future__ import annotations

import datetime
import logging

from .base_agent import BaseAgent, AgentReport, AgentDirective, SweepReport
from ..tools.project_tools import ProjectTools

logger = logging.getLogger(__name__)

# Declarative description of every tool this agent exposes. Each ``name`` here
# has a matching entry in ``ProjectAgent._dispatch_tool``.
PROJECT_TOOLS = [
    {
        'name': 'get_projects',
        'description': 'List projects',
        'parameters': {
            'active': {'type': 'boolean', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_tasks',
        'description': 'Get tasks with filters',
        'parameters': {
            'project_id': {'type': 'integer', 'optional': True},
            'stage_id': {'type': 'integer', 'optional': True},
            'user_id': {'type': 'integer', 'optional': True},
            'limit': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'get_project_summary',
        'description': 'Get summary for a specific project',
        'parameters': {'project_id': {'type': 'integer'}},
    },
    {
        'name': 'update_task_stage',
        'description': 'Move task to a new stage',
        'parameters': {
            'task_id': {'type': 'integer'},
            'stage_id': {'type': 'integer'},
        },
    },
    {
        'name': 'assign_task',
        'description': 'Assign task to a user',
        'parameters': {
            'task_id': {'type': 'integer'},
            'user_id': {'type': 'integer'},
        },
    },
    {
        'name': 'create_task',
        'description': 'Create a new task in a project',
        'parameters': {
            'project_id': {'type': 'integer'},
            'name': {'type': 'string'},
            'description': {'type': 'string', 'optional': True},
            'user_id': {'type': 'integer', 'optional': True},
            'date_deadline': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'log_timesheet',
        'description': 'Log timesheet hours on a task',
        'parameters': {
            'task_id': {'type': 'integer'},
            'employee_id': {'type': 'integer'},
            'hours': {'type': 'number'},
            'description': {'type': 'string', 'optional': True},
            'date': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'post_chatter_note',
        'description': 'Post a note on a record',
        'parameters': {
            'model': {'type': 'string'},
            'record_id': {'type': 'integer'},
            'note': {'type': 'string'},
        },
    },
]


class ProjectAgent(BaseAgent):
    """Agent for the ``project`` domain, backed by ``ProjectTools``.

    The ``_plan`` / ``_gather`` / ``_reason`` / ``_act`` / ``_report`` methods
    are the per-directive steps; how they are sequenced is decided by
    ``BaseAgent`` (not visible in this file). ``handle_peer_request`` and
    ``sweep`` are separate entry points.
    """

    name = 'project_agent'
    domain = 'project'
    required_odoo_module = 'project'
    system_prompt_file = 'project_system.txt'
    tools = PROJECT_TOOLS

    def __init__(self, odoo, llm, peer_bus=None):
        """Initialise the base agent and the project tool wrapper.

        Args:
            odoo: Handle forwarded to ``BaseAgent`` and ``ProjectTools``.
            llm: Handle forwarded to ``BaseAgent``.
            peer_bus: Optional handle forwarded to ``BaseAgent``.
        """
        super().__init__(odoo, llm, peer_bus)
        self._pt = ProjectTools(odoo)
        # Per-run state shared between the pipeline steps below.
        self._gathered_data = {}
        self._actions_taken = []
        self._escalations_list = []

    async def _plan(self, directive: AgentDirective) -> dict:
        """Derive fetch flags and filters from the directive.

        Returns:
            A dict with ``fetch_projects``, ``fetch_tasks``, ``project_id``
            and ``user_id`` keys.
        """
        intent = (directive.intent or '').lower()
        # Guard the context the same way as the intent: a missing context
        # should yield empty filters rather than an AttributeError.
        context = directive.context or {}
        return {
            'fetch_projects': any(k in intent for k in ('project', 'overview')),
            'fetch_tasks': 'task' in intent,
            'project_id': context.get('project_id'),
            'user_id': context.get('user_id'),
        }

    async def _gather(self, ctx: dict) -> dict:
        """Fetch projects and/or tasks according to ``ctx['plan']``.

        The result is also stored on ``self._gathered_data`` for the later
        steps.
        """
        plan = ctx.get('plan', {})
        data: dict = {}
        # Projects are the fallback: fetched unless only tasks were requested.
        if plan.get('fetch_projects') or not plan.get('fetch_tasks'):
            data['projects'] = await self._pt.get_projects(limit=20)
        if plan.get('fetch_tasks') or plan.get('project_id'):
            data['tasks'] = await self._pt.get_tasks(
                project_id=plan.get('project_id'),
                user_id=plan.get('user_id'),
                limit=50,
            )
        self._gathered_data = data
        return data

    async def _reason(self, ctx: dict) -> dict:
        """Find blocked tasks in the gathered data and decide on escalation.

        Returns:
            A dict with ``blocked_tasks`` (tasks whose ``kanban_state`` is
            ``'blocked'``) and ``escalations`` (one message when more than
            five tasks are blocked).
        """
        data = self._gathered_data
        analysis: dict = {'escalations': [], 'blocked_tasks': []}
        tasks = data.get('tasks', [])
        blocked = [t for t in tasks if t.get('kanban_state') == 'blocked']
        analysis['blocked_tasks'] = blocked
        if len(blocked) > 5:
            analysis['escalations'].append(f'{len(blocked)} tasks are blocked.')
        self._escalations_list = analysis['escalations']
        return analysis

    async def _act(self, ctx: dict) -> list:
        """Return the actions performed for this run; currently always empty."""
        return []

    async def _report(self, ctx: dict) -> AgentReport:
        """Build the ``AgentReport`` from gathered data and ``ctx['analysis']``."""
        data = self._gathered_data
        analysis = ctx.get('analysis', {})
        parts = []
        projects = data.get('projects', [])
        if projects:
            parts.append(f'Projects: {len(projects)} active.')
        tasks = data.get('tasks', [])
        if tasks:
            blocked = len(analysis.get('blocked_tasks', []))
            parts.append(f'Tasks: {len(tasks)} total, {blocked} blocked.')
        if not parts:
            parts.append('Project review complete.')
        return AgentReport(
            agent=self.name,
            summary='\n'.join(parts),
            data=data,
            escalations=self._escalations_list,
            actions_taken=[],
        )

    async def _dispatch_tool(self, name: str, args: dict):
        """Invoke the ``ProjectTools`` coroutine registered under ``name``.

        Args:
            name: One of the tool names listed in ``PROJECT_TOOLS``.
            args: Keyword arguments for the tool; ``None`` or an empty dict
                means "no arguments".

        Raises:
            ValueError: If ``name`` is not a known tool.
        """
        dispatch = {
            'get_projects': self._pt.get_projects,
            'get_tasks': self._pt.get_tasks,
            'get_project_summary': self._pt.get_project_summary,
            'update_task_stage': self._pt.update_task_stage,
            'assign_task': self._pt.assign_task,
            'create_task': self._pt.create_task,
            'log_timesheet': self._pt.log_timesheet,
            'post_chatter_note': self._pt.post_chatter_note,
        }
        if name not in dispatch:
            raise ValueError(f'Unknown tool: {name}')
        # Several tools take only optional parameters, so tolerate a missing
        # argument mapping instead of failing on ``**None``.
        return await dispatch[name](**(args or {}))

    async def handle_peer_request(self, request: dict) -> dict:
        """Answer a request from another agent.

        Supported ``request['type']`` values are ``'project_list'`` and
        ``'task_count'`` (optionally filtered by ``request['project_id']``).

        Returns:
            The requested payload, or ``{'error': <message>}`` for an unknown
            type or when the underlying tool call raises.
        """
        req_type = request.get('type', '')
        try:
            if req_type == 'project_list':
                return {'projects': await self._pt.get_projects()}
            if req_type == 'task_count':
                tasks = await self._pt.get_tasks(project_id=request.get('project_id'))
                return {'count': len(tasks)}
            return {'error': f'Unknown type: {req_type}'}
        except Exception as exc:
            # Best-effort boundary: the peer still gets an error payload, but
            # the traceback is kept in the log rather than discarded.
            logger.exception('Peer request %r failed', req_type)
            return {'error': str(exc)}

    async def sweep(self) -> SweepReport:
        """Scan up to 200 tasks for blocked or overdue items.

        A task whose ``kanban_state`` is ``'blocked'`` is reported as
        ``blocked_task`` (severity ``medium``); otherwise a task whose
        ``date_deadline`` lies before today is reported as ``overdue_task``
        (severity ``low``).

        Returns:
            A ``SweepReport`` with the findings and a summary, or one with
            ``error`` set and no findings if the scan raised.
        """
        findings = []
        try:
            tasks = await self._pt.get_tasks(limit=200)
            today = datetime.date.today().isoformat()
            for t in tasks:
                deadline = t.get('date_deadline')
                if t.get('kanban_state') == 'blocked':
                    findings.append({
                        'type': 'blocked_task',
                        'task_id': t.get('id'),
                        'name': t.get('name', ''),
                        'severity': 'medium',
                    })
                # Compare only the YYYY-MM-DD prefix so that date/datetime
                # objects and datetime strings work as well as plain date
                # strings. Assumes an ISO-formatted value, as the original
                # string comparison did.
                elif deadline and str(deadline)[:10] < today:
                    findings.append({
                        'type': 'overdue_task',
                        'task_id': t.get('id'),
                        'name': t.get('name', ''),
                        'severity': 'low',
                    })
        except Exception as exc:
            logger.exception('Project sweep failed')
            return SweepReport(agent=self.name, findings=[], actions=[], error=str(exc))
        return SweepReport(
            agent=self.name,
            findings=findings,
            actions=[],
            summary=f'Project sweep: {len(findings)} issues found.',
        )