diff --git a/Dockerfile b/Dockerfile index a1e8000..983ddba 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ gcc libpq-dev \ tesseract-ocr \ tesseract-ocr-osd \ + git \ && rm -rf /var/lib/apt/lists/* COPY requirements.txt . diff --git a/agent_service/agents/registry.py b/agent_service/agents/registry.py index 7f6c70a..809a822 100644 --- a/agent_service/agents/registry.py +++ b/agent_service/agents/registry.py @@ -16,6 +16,7 @@ _DEFAULT_CAPABILITIES = { 'elearning_agent': 'eLearning — courses, slides, attendees, certifications', 'employees_agent': 'HR — employees, contracts, attendance, leave requests', 'odoo_doc_agent': 'Odoo 18 documentation and workflow guidance (internal use)', + 'sysops_agent': 'System operations — Docker containers, git, logs, auto-healing', } diff --git a/agent_service/agents/sweep_coordinator.py b/agent_service/agents/sweep_coordinator.py index 5773c2f..5ce2e59 100644 --- a/agent_service/agents/sweep_coordinator.py +++ b/agent_service/agents/sweep_coordinator.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) ALL_AGENT_NAMES = [ 'finance_agent', 'accounting_agent', 'crm_agent', 'sales_agent', 'project_agent', 'elearning_agent', 'expenses_agent', 'employees_agent', + 'sysops_agent', ] diff --git a/agent_service/agents/sysops_agent.py b/agent_service/agents/sysops_agent.py new file mode 100644 index 0000000..665edd5 --- /dev/null +++ b/agent_service/agents/sysops_agent.py @@ -0,0 +1,247 @@ +from __future__ import annotations +import logging +from .base_agent import BaseAgent, AgentReport, SweepReport +from ..tools.sysops_tools import SysopsTools, ALLOWED_CONTAINERS, SYSTEM_CONTAINER_MAP, REPO_PATH + +logger = logging.getLogger(__name__) + +SYSOPS_TOOLS = [ + {'name': 'docker_ps', + 'description': 'List all Docker containers with their current status', + 'parameters': {}}, + {'name': 'docker_logs', + 'description': 'Retrieve recent log lines from a container', + 'parameters': {'container': {'type': 'string'}, + 'lines': {'type': 'integer', 'optional': True}}}, + {'name': 'docker_restart', + 'description': 'Restart a Docker container by name', + 'parameters': {'container': {'type': 'string'}}}, + {'name': 'git_pull', + 'description': 'Pull latest code from git remote (main branch)', + 'parameters': {'repo_path': {'type': 'string', 'optional': True}}}, + {'name': 'git_status', + 'description': 'Check git working tree status', + 'parameters': {'repo_path': {'type': 'string', 'optional': True}}}, + {'name': 'git_log', + 'description': 'Show recent git commits', + 'parameters': {'repo_path': {'type': 'string', 'optional': True}, + 'n': {'type': 'integer', 'optional': True}}}, +] + +# Maps task keywords to container names for on-demand restart requests +_CONTAINER_KEYWORDS = { + 'agent': 'activeblue-agent', + 'activeblue-agent': 'activeblue-agent', + 'agent-db': 'activeblue-agent-db', + 'activeblue-agent-db': 'activeblue-agent-db', + 'odoo': 'odoo-web-1', + 'odoo-web': 'odoo-web-1', + 'odoo-web-1': 'odoo-web-1', +} + + +class SysopsAgent(BaseAgent): + name = 'sysops_agent' + domain = 'infrastructure' + required_odoo_module = 'base' + system_prompt_file = 'sysops_system.txt' + tools = SYSOPS_TOOLS + auto_rag = False + + def __init__(self, odoo, llm, peer_bus=None): + super().__init__(odoo, llm, peer_bus) + self._st = SysopsTools(odoo=odoo) + self._current_plan: dict = {} + self._actions_taken: list[str] = [] + self._gathered_data: dict = {} + + # --- Tool bridge methods (available to _loop() if used) --- + + async def _tool_docker_ps(self) -> list: + return await self._st.docker_ps() + + async def _tool_docker_logs(self, container: str, lines: int = 50) -> str: + return await self._st.docker_logs(container, lines) + + async def _tool_docker_restart(self, container: str) -> str: + return await self._st.docker_restart(container) + + async def _tool_git_pull(self, repo_path: str = None) -> str: + return await self._st.git_pull(repo_path or REPO_PATH) + + async def _tool_git_status(self, repo_path: str = None) -> str: + return await self._st.git_status(repo_path or REPO_PATH) + + async def _tool_git_log(self, repo_path: str = None, n: int = 5) -> str: + return await self._st.git_log(repo_path or REPO_PATH, n) + + # --- BaseAgent lifecycle (on-demand chat requests) --- + + async def _plan(self) -> dict: + task = (self._directive.task if self._directive else '').lower() + plan = { + 'task': task, + 'wants_restart': any(k in task for k in ('restart', 'reboot')), + 'wants_logs': any(k in task for k in ('log', 'crash', 'error', 'fail', 'why', 'slow')), + 'wants_git': any(k in task for k in ('pull', 'git', 'update code', 'deploy', 'latest code')), + 'wants_status': True, + } + self._current_plan = plan + return plan + + async def _gather(self, plan: dict) -> dict: + data: dict = {} + data['containers'] = await self._st.docker_ps() + if plan.get('wants_git'): + data['git_status'] = await self._st.git_status() + data['git_log'] = await self._st.git_log() + if plan.get('wants_logs'): + target = self._resolve_container(plan['task']) or 'activeblue-agent' + data['logs_container'] = target + data['logs'] = await self._st.docker_logs(target, lines=80) + self._gathered_data = data + return data + + async def _reason(self) -> dict: + containers = self._gathered_data.get('containers', []) + unhealthy = [ + c for c in containers + if c.get('name') in ALLOWED_CONTAINERS + and ('exited' in c.get('status', '').lower() + or 'unhealthy' in c.get('status', '').lower()) + ] + return {'unhealthy': unhealthy} + + async def _act(self, reasoning: dict) -> list: + self._actions_taken = [] + plan = self._current_plan + task = plan.get('task', '') + + if plan.get('wants_restart'): + target = self._resolve_container(task) + if target: + result = await self._st.docker_restart(target) + self._actions_taken.append(result) + await self._st.notify_all_bot_channels( + f'[SysOps] {target} restarted on user request.') + + if plan.get('wants_git'): + result = await self._st.git_pull() + self._actions_taken.append(f'git pull: {result}') + if 'Already up to date' not in result: + await self._st.notify_all_bot_channels( + f'[SysOps] Pulled latest code:\n{result}') + + return self._actions_taken + + async def _report(self) -> AgentReport: + directive_id = self._directive.directive_id if self._directive else '' + containers = self._gathered_data.get('containers', []) + + lines = [] + managed = [c for c in containers if c.get('name') in ALLOWED_CONTAINERS] + if managed: + lines.append('Container status:') + for c in managed: + lines.append(f' {c["name"]}: {c["status"]}') + + if self._actions_taken: + lines.append('') + lines.append('Actions taken:') + for a in self._actions_taken: + lines.append(f' • {a}') + + if 'logs' in self._gathered_data: + lines.append('') + lines.append(f'Recent logs ({self._gathered_data.get("logs_container", "")}, last 80 lines):') + tail = self._gathered_data['logs'] + lines.append(tail[-2000:] if len(tail) > 2000 else tail) + + if 'git_log' in self._gathered_data: + lines.append('') + lines.append('Recent commits:') + lines.append(self._gathered_data['git_log']) + + summary = '\n'.join(lines) if lines else 'System check complete — all containers running.' + return AgentReport( + directive_id=directive_id, agent=self.name, status='complete', + summary=summary, actions_taken=self._actions_taken, + ) + + # --- Auto-heal: called by background health-check loop --- + + async def auto_heal(self, failing_systems: list[str]) -> None: + actions: list[str] = [] + try: + await self._st.notify_all_bot_channels( + f'[SysOps] Health degraded — failing: {", ".join(failing_systems)}\n' + f'Starting auto-recovery...' + ) + containers = await self._st.docker_ps() + container_map = {c['name']: c for c in containers} + + for system in failing_systems: + if system == 'ollama': + actions.append('ollama: external host (192.168.2.9) — cannot restart from here') + continue + target = SYSTEM_CONTAINER_MAP.get(system) + if not target: + continue + c_info = container_map.get(target, {}) + status = c_info.get('status', 'unknown') + try: + await self._st.docker_restart(target) + actions.append(f'{target}: restarted (was: {status})') + except Exception as exc: + actions.append(f'{target}: restart failed — {exc}') + + summary = '\n'.join(f' • {a}' for a in actions) or ' No actions taken.' + await self._st.notify_all_bot_channels( + f'[SysOps] Auto-recovery complete:\n{summary}\n' + f'Status will update on next health check.' + ) + logger.info('sysops auto_heal complete: %s', actions) + except Exception as exc: + logger.error('sysops auto_heal error: %s', exc) + try: + await self._st.notify_all_bot_channels(f'[SysOps] Auto-recovery error: {exc}') + except Exception: + pass + + # --- Sweep: scheduled read-only audit --- + + async def sweep(self) -> SweepReport: + findings: list[dict] = [] + try: + containers = await self._st.docker_ps() + for c in containers: + if c.get('name') not in ALLOWED_CONTAINERS: + continue + status = c.get('status', '') + if 'exited' in status.lower(): + findings.append({ + 'type': 'container_exited', 'container': c['name'], + 'status': status, 'severity': 'high', + }) + elif 'unhealthy' in status.lower(): + findings.append({ + 'type': 'container_unhealthy', 'container': c['name'], + 'status': status, 'severity': 'medium', + }) + git_st = await self._st.git_status() + if git_st != 'Clean working tree': + findings.append({ + 'type': 'git_uncommitted', 'details': git_st, 'severity': 'low', + }) + except Exception as exc: + logger.error('sysops sweep error: %s', exc) + return SweepReport(agent=self.name, findings=[], recommendations=[]) + return SweepReport(agent=self.name, findings=findings, recommendations=[]) + + # --- Helpers --- + + def _resolve_container(self, task: str) -> str | None: + for kw, name in _CONTAINER_KEYWORDS.items(): + if kw in task: + return name + return None diff --git a/agent_service/main.py b/agent_service/main.py index 3d415a2..f3e32a1 100644 --- a/agent_service/main.py +++ b/agent_service/main.py @@ -145,6 +145,9 @@ async def lifespan(app: FastAPI): except Exception as exc: logger.warning('Sweep coordinator not available: %s', exc) + # 9. Auto-heal background loop + asyncio.create_task(_auto_heal_loop()) + logger.info('ActiveBlue AI agent service started on port %d', settings.agent_service_port) yield @@ -168,6 +171,13 @@ def _register_specialist_agents(agent_registry, peer_bus, odoo, llm_router) -> N except Exception as exc: logger.warning('Could not register finance_agent: %s', exc) + try: + from .agents.sysops_agent import SysopsAgent + agent_registry.register('sysops_agent', SysopsAgent(odoo=odoo, llm=llm_router, peer_bus=peer_bus)) + logger.info('sysops_agent registered') + except Exception as exc: + logger.warning('Could not register sysops_agent: %s', exc) + specialist_map = { 'accounting_agent': 'AccountingAgent', 'crm_agent': 'CrmAgent', @@ -189,6 +199,27 @@ def _register_specialist_agents(agent_registry, peer_bus, odoo, llm_router) -> N logger.warning('Could not register %s: %s', agent_name, exc) +async def _auto_heal_loop(interval: int = 120) -> None: + """Check health every interval seconds; call sysops_agent.auto_heal() if degraded.""" + await asyncio.sleep(90) # let startup settle before first check + while True: + await asyncio.sleep(interval) + try: + from .routers.health import _get_failing_systems + failing = await _get_failing_systems() + if not failing: + continue + logger.warning('auto_heal_loop: failing systems: %s', failing) + registry = app_state.get_agent_registry() + sysops = registry.get_agent_instance('sysops_agent') if registry else None + if sysops: + await sysops.auto_heal(failing) + else: + logger.warning('auto_heal_loop: sysops_agent not registered, skipping') + except Exception as exc: + logger.warning('auto_heal_loop error: %s', exc) + + def _configure_logging(settings) -> None: level = getattr(logging, settings.log_level.upper(), logging.INFO) if settings.log_format == 'json': diff --git a/agent_service/prompts/master_system.txt b/agent_service/prompts/master_system.txt index 51419a2..80b9697 100644 --- a/agent_service/prompts/master_system.txt +++ b/agent_service/prompts/master_system.txt @@ -44,6 +44,9 @@ User: "how are sales this month?" or "show me the pipeline" User: "what projects are overdue?" -> {"needs_clarification": false, "clarification_question": null, "is_continuation": false, "agents": ["project_agent"], "intent_summary": "find overdue projects", "params": {}, "context_hints": []} +User: "restart the agent service" or "check the docker containers" or "pull the latest code" or "show me the agent logs" +-> {"needs_clarification": false, "clarification_question": null, "is_continuation": false, "agents": ["sysops_agent"], "intent_summary": "infrastructure operation", "params": {}, "context_hints": []} + Now classify the user's message in JSON only: { "needs_clarification": false, diff --git a/agent_service/prompts/sysops_system.txt b/agent_service/prompts/sysops_system.txt new file mode 100644 index 0000000..f3480ef --- /dev/null +++ b/agent_service/prompts/sysops_system.txt @@ -0,0 +1,23 @@ +You are the SysOps agent for ActiveBlue AI. You manage the Docker infrastructure +and git repository for the ActiveBlue AI system. + +Managed containers: + activeblue-agent — the AI agent service (FastAPI, port 8001) + activeblue-agent-db — agent Postgres database + odoo-web-1 — the Odoo 18 application + odoo-db-1 — the Odoo Postgres database + +Git repository: /workspace/odoo-ai (main branch) + +Your responsibilities: +- Report container status clearly +- Restart containers when asked or when health checks fail +- Pull latest code from git when requested +- Show relevant log output when diagnosing issues +- Notify users in Odoo chat whenever you take autonomous actions + +Rules: +- Only restart containers in the managed list above +- Never delete or stop containers permanently +- Always explain what you did and why +- If ollama is failing, report it as external (192.168.2.9) and outside your control diff --git a/agent_service/routers/health.py b/agent_service/routers/health.py index 4f20004..214ff9b 100644 --- a/agent_service/routers/health.py +++ b/agent_service/routers/health.py @@ -32,6 +32,41 @@ async def health(): return HealthResponse(status='ok', uptime_seconds=round(time.time() - _start_time, 1)) +async def _get_failing_systems() -> list[str]: + """Return a list of system names that are not reporting 'ok'.""" + from ..app_state import get_db_pool, get_master_agent, get_llm_router + failing = [] + + pool = get_db_pool() + if not pool: + failing.append('db') + else: + try: + async with pool.acquire(timeout=5) as conn: + await conn.fetchval('SELECT 1') + except Exception: + failing.append('db') + + master = get_master_agent() + if master is None: + failing.append('master_agent') + else: + if hasattr(master, '_odoo'): + try: + await asyncio.wait_for(master._odoo.ping(), timeout=5) + except Exception: + failing.append('odoo') + + llm_router = get_llm_router() + if llm_router and hasattr(llm_router, '_ollama'): + try: + await asyncio.wait_for(llm_router._ollama.ping(), timeout=5) + except Exception: + failing.append('ollama') + + return failing + + @router.get('/detailed', response_model=DetailedHealthResponse) async def health_detailed(): from ..app_state import get_db_pool, get_master_agent, get_llm_router diff --git a/agent_service/tools/sysops_tools.py b/agent_service/tools/sysops_tools.py new file mode 100644 index 0000000..3ea7ec4 --- /dev/null +++ b/agent_service/tools/sysops_tools.py @@ -0,0 +1,137 @@ +from __future__ import annotations +import asyncio +import functools +import logging + +logger = logging.getLogger(__name__) + +REPO_PATH = '/workspace/odoo-ai' + +ALLOWED_CONTAINERS = frozenset({ + 'activeblue-agent', + 'activeblue-agent-db', + 'odoo-web-1', + 'odoo-db-1', +}) + +# Maps health-check system names to the container responsible for them. +# 'ollama' is external (192.168.2.9) and cannot be managed from here. +SYSTEM_CONTAINER_MAP = { + 'db': 'activeblue-agent-db', + 'odoo': 'odoo-web-1', + 'master_agent': 'activeblue-agent', +} + + +class SysopsTools: + def __init__(self, odoo=None): + self._odoo = odoo + self._docker_client = None + + def _get_docker(self): + if self._docker_client is None: + import docker + self._docker_client = docker.from_env() + return self._docker_client + + async def _docker(self, fn, *args, **kwargs): + loop = asyncio.get_event_loop() + client = self._get_docker() + return await loop.run_in_executor(None, functools.partial(fn, client, *args, **kwargs)) + + async def docker_ps(self) -> list[dict]: + def _ps(client): + return [ + { + 'name': c.name, + 'status': c.status, + 'id': c.short_id, + 'image': c.image.tags[0] if c.image.tags else str(c.image.id)[:12], + } + for c in client.containers.list(all=True) + ] + return await self._docker(_ps) + + async def docker_logs(self, container: str, lines: int = 50) -> str: + if container not in ALLOWED_CONTAINERS: + raise ValueError(f'Container {container!r} not in allowed list') + def _logs(client): + c = client.containers.get(container) + return c.logs(tail=lines, stream=False).decode(errors='replace') + return await self._docker(_logs) + + async def docker_restart(self, container: str) -> str: + if container not in ALLOWED_CONTAINERS: + raise ValueError(f'Container {container!r} not in allowed list') + def _restart(client): + c = client.containers.get(container) + c.restart(timeout=30) + await self._docker(_restart) + logger.info('sysops: restarted %s', container) + return f'{container} restarted' + + async def _run_git(self, *args: str, cwd: str = REPO_PATH) -> str: + proc = await asyncio.create_subprocess_exec( + 'git', *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + cwd=cwd, + ) + try: + out, _ = await asyncio.wait_for(proc.communicate(), timeout=60) + return out.decode(errors='replace').strip() + except asyncio.TimeoutError: + try: + proc.kill() + except Exception: + pass + raise TimeoutError('git command timed out') + + async def git_pull(self, repo_path: str = REPO_PATH) -> str: + out = await self._run_git('pull', 'origin', 'main', cwd=repo_path) + logger.info('sysops git pull: %s', out) + return out or 'Already up to date.' + + async def git_status(self, repo_path: str = REPO_PATH) -> str: + out = await self._run_git('status', '--short', cwd=repo_path) + return out or 'Clean working tree' + + async def git_log(self, repo_path: str = REPO_PATH, n: int = 5) -> str: + return await self._run_git('log', f'--max-count={n}', '--oneline', '--no-color', cwd=repo_path) + + async def notify_all_bot_channels(self, message: str) -> int: + """Post a message to every DM channel where the AI bot is a member.""" + if not self._odoo: + return 0 + try: + users = await self._odoo.search_read( + 'res.users', + [('login', 'in', ('activeblue_ai_bot', 'activeblue_ai_bot@local'))], + ['id', 'partner_id'], limit=1, + ) + if not users: + return 0 + bot_partner_id = users[0]['partner_id'][0] + channels = await self._odoo.search_read( + 'discuss.channel', + [('channel_member_ids.partner_id', '=', bot_partner_id), + ('channel_type', '=', 'chat')], + ['id'], limit=50, + ) + count = 0 + html = message.replace('\n', '
') + for ch in channels: + try: + await self._odoo.call('discuss.channel', 'message_post', [[ch['id']]], { + 'body': html, + 'author_id': bot_partner_id, + 'message_type': 'comment', + 'subtype_xmlid': 'mail.mt_comment', + }) + count += 1 + except Exception as exc: + logger.warning('sysops notify ch=%s failed: %s', ch['id'], exc) + return count + except Exception as exc: + logger.warning('notify_all_bot_channels failed: %s', exc) + return 0 diff --git a/docker-compose.yml b/docker-compose.yml index 096bd5e..8e728a6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,11 @@ services: depends_on: agent-db: condition: service_healthy + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - /root/odoo:/workspace + - /root/.ssh:/root/.ssh:ro + - /root/.gitconfig:/root/.gitconfig:ro networks: - activeblue-net healthcheck: diff --git a/requirements.txt b/requirements.txt index f2342f8..bdc3d55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ pdfplumber==0.11.4 Pillow==10.4.0 pytesseract==0.3.13 python-multipart==0.0.12 +docker==7.1.0