Files
odoo-ai/agent_service/agents/sysops_agent.py
Carlos Garcia 8d1727b498 feat: sysops_agent — Docker/git self-management with auto-heal
Adds a new specialist agent that gives the AI system control over its
own infrastructure:

- sysops_tools.py: docker SDK (ps/logs/restart) + git CLI (pull/status/log)
  + Odoo channel notifier for autonomous action broadcasts
- sysops_agent.py: BaseAgent subclass handling on-demand chat requests,
  auto_heal() triggered by health failures, and sweep() for audits
- Background auto-heal loop (main.py): runs every 2 minutes, calls
  _get_failing_systems() and triggers auto_heal() when degraded
- health.py: extracted _get_failing_systems() helper reused by both
  the /health/detailed endpoint and the auto-heal loop
- docker-compose.yml: mount docker socket + /root/odoo workspace +
  SSH keys for git authentication
- Dockerfile: add git to apt-get
- requirements.txt: add docker==7.1.0 Python SDK

Auto-heal behavior:
  - Detects failing containers, restarts them, notifies all bot DM channels
  - Ollama (192.168.2.9) is flagged as external and skipped
  - On-demand via chat: "restart agent", "check logs", "pull latest code"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 17:01:57 -04:00

248 lines
9.8 KiB
Python

from __future__ import annotations

import logging
import re

from .base_agent import BaseAgent, AgentReport, SweepReport
from ..tools.sysops_tools import SysopsTools, ALLOWED_CONTAINERS, SYSTEM_CONTAINER_MAP, REPO_PATH

logger = logging.getLogger(__name__)

# Tool schema exposed through SysopsAgent.tools. Each entry names one
# operation and describes its parameters; parameters flagged
# 'optional': True may be omitted by the caller.
SYSOPS_TOOLS = [
    # --- Docker operations ---
    {
        'name': 'docker_ps',
        'description': 'List all Docker containers with their current status',
        'parameters': {},
    },
    {
        'name': 'docker_logs',
        'description': 'Retrieve recent log lines from a container',
        'parameters': {
            'container': {'type': 'string'},
            'lines': {'type': 'integer', 'optional': True},
        },
    },
    {
        'name': 'docker_restart',
        'description': 'Restart a Docker container by name',
        'parameters': {
            'container': {'type': 'string'},
        },
    },
    # --- Git operations ---
    {
        'name': 'git_pull',
        'description': 'Pull latest code from git remote (main branch)',
        'parameters': {
            'repo_path': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'git_status',
        'description': 'Check git working tree status',
        'parameters': {
            'repo_path': {'type': 'string', 'optional': True},
        },
    },
    {
        'name': 'git_log',
        'description': 'Show recent git commits',
        'parameters': {
            'repo_path': {'type': 'string', 'optional': True},
            'n': {'type': 'integer', 'optional': True},
        },
    },
]
# Maps task keywords to container names for on-demand restart requests.
#
# Order matters: a lookup that scans this mapping in insertion order and
# takes the first keyword contained in the task text would otherwise hit
# 'agent' before 'agent-db' and pick the wrong container. Entries are
# therefore listed from most specific to least specific.
_CONTAINER_KEYWORDS = {
    # agent database (must precede the plain agent keywords)
    'activeblue-agent-db': 'activeblue-agent-db',
    'agent-db': 'activeblue-agent-db',
    # agent service
    'activeblue-agent': 'activeblue-agent',
    'agent': 'activeblue-agent',
    # Odoo web container
    'odoo-web-1': 'odoo-web-1',
    'odoo-web': 'odoo-web-1',
    'odoo': 'odoo-web-1',
}
class SysopsAgent(BaseAgent):
    """Agent for Docker/git self-management of the stack.

    Three entry points are provided:

    * the BaseAgent lifecycle hooks (``_plan`` / ``_gather`` / ``_reason`` /
      ``_act`` / ``_report``) for on-demand chat requests,
    * ``auto_heal()`` for the background health-check loop,
    * ``sweep()`` for a scheduled read-only audit.

    All Docker, git and notification work is delegated to ``SysopsTools``.
    """

    name = 'sysops_agent'
    domain = 'infrastructure'
    required_odoo_module = 'base'
    system_prompt_file = 'sysops_system.txt'
    tools = SYSOPS_TOOLS
    auto_rag = False

    # Log lines fetched for on-demand log requests (also quoted in the report).
    _LOG_LINES = 80
    # Maximum number of trailing log characters embedded in a report.
    _LOG_CHAR_LIMIT = 2000
    # Phrases that explicitly ask for new code; only these trigger `git pull`.
    _PULL_KEYWORDS = ('pull', 'update code', 'deploy', 'latest code')

    def __init__(self, odoo, llm, peer_bus=None):
        """Create the agent and its SysopsTools helper bound to *odoo*."""
        super().__init__(odoo, llm, peer_bus)
        self._st = SysopsTools(odoo=odoo)
        self._current_plan: dict = {}
        self._actions_taken: list[str] = []
        self._gathered_data: dict = {}

    # --- Tool bridge methods (available to _loop() if used) ---

    async def _tool_docker_ps(self) -> list:
        """Return the container listing from SysopsTools.docker_ps()."""
        return await self._st.docker_ps()

    async def _tool_docker_logs(self, container: str, lines: int = 50) -> str:
        """Return the last *lines* log lines of *container*."""
        return await self._st.docker_logs(container, lines)

    async def _tool_docker_restart(self, container: str) -> str:
        """Restart *container* and return the tool's result text."""
        return await self._st.docker_restart(container)

    async def _tool_git_pull(self, repo_path: str | None = None) -> str:
        """Run git pull in *repo_path* (defaults to REPO_PATH)."""
        return await self._st.git_pull(repo_path or REPO_PATH)

    async def _tool_git_status(self, repo_path: str | None = None) -> str:
        """Return git status for *repo_path* (defaults to REPO_PATH)."""
        return await self._st.git_status(repo_path or REPO_PATH)

    async def _tool_git_log(self, repo_path: str | None = None, n: int = 5) -> str:
        """Return the last *n* commits of *repo_path* (defaults to REPO_PATH)."""
        return await self._st.git_log(repo_path or REPO_PATH, n)

    # --- BaseAgent lifecycle (on-demand chat requests) ---

    async def _plan(self) -> dict:
        """Derive intent flags from the lower-cased directive task text.

        ``wants_git`` only means "show git information" (status and log are
        gathered). Code is pulled only when ``wants_pull`` is set, i.e. the
        task contains an explicit pull/deploy phrase, so that a question such
        as "git status" never modifies the checkout.
        """
        task = ((self._directive.task if self._directive else '') or '').lower()

        def mentions(*keywords: str) -> bool:
            return any(k in task for k in keywords)

        plan = {
            'task': task,
            'wants_restart': mentions('restart', 'reboot'),
            'wants_logs': mentions('log', 'crash', 'error', 'fail', 'why', 'slow'),
            'wants_git': mentions('pull', 'git', 'update code', 'deploy', 'latest code'),
            'wants_pull': mentions(*self._PULL_KEYWORDS),
            'wants_status': True,
        }
        self._current_plan = plan
        return plan

    async def _gather(self, plan: dict) -> dict:
        """Collect container status plus git info / logs when the plan asks."""
        data: dict = {'containers': await self._st.docker_ps()}
        if plan.get('wants_git'):
            data['git_status'] = await self._st.git_status()
            data['git_log'] = await self._st.git_log()
        if plan.get('wants_logs'):
            # Fall back to the agent container when the task names none.
            target = self._resolve_container(plan['task']) or 'activeblue-agent'
            data['logs_container'] = target
            data['logs'] = await self._st.docker_logs(target, lines=self._LOG_LINES)
        self._gathered_data = data
        return data

    async def _reason(self) -> dict:
        """Return the managed containers whose status is exited or unhealthy."""
        containers = self._gathered_data.get('containers', [])
        unhealthy = [
            c for c in containers
            if c.get('name') in ALLOWED_CONTAINERS and self._status_problem(c)
        ]
        return {'unhealthy': unhealthy}

    async def _act(self, reasoning: dict) -> list:
        """Execute the restart and/or pull requested by the current plan.

        Returns the list of action descriptions, which is also kept on the
        instance for ``_report``. Channel notifications are best-effort: a
        failed notification does not undo or hide an action already taken.
        """
        self._actions_taken = []
        plan = self._current_plan
        task = plan.get('task', '')

        if plan.get('wants_restart'):
            target = self._resolve_container(task)
            if target:
                result = await self._st.docker_restart(target)
                self._actions_taken.append(result)
                await self._notify_safe(f'[SysOps] {target} restarted on user request.')
            else:
                # Tell the user why nothing happened instead of staying silent.
                self._actions_taken.append(
                    'restart requested but no known container was named — nothing restarted')

        if plan.get('wants_pull'):
            result = await self._st.git_pull()
            self._actions_taken.append(f'git pull: {result}')
            if 'Already up to date' not in str(result):
                await self._notify_safe(f'[SysOps] Pulled latest code:\n{result}')

        return self._actions_taken

    async def _report(self) -> AgentReport:
        """Build the textual report from gathered data and actions taken."""
        directive_id = self._directive.directive_id if self._directive else ''
        data = self._gathered_data
        lines: list[str] = []

        managed = [c for c in data.get('containers', []) if c.get('name') in ALLOWED_CONTAINERS]
        if managed:
            lines.append('Container status:')
            lines.extend(f' {c["name"]}: {c.get("status", "unknown")}' for c in managed)

        if self._actions_taken:
            lines.append('')
            lines.append('Actions taken:')
            lines.extend(str(a) for a in self._actions_taken)

        if 'logs' in data:
            lines.append('')
            lines.append(
                f'Recent logs ({data.get("logs_container", "")}, last {self._LOG_LINES} lines):')
            # Keep only the tail so a chatty container cannot flood the chat.
            lines.append(data['logs'][-self._LOG_CHAR_LIMIT:])

        if 'git_log' in data:
            lines.append('')
            lines.append('Recent commits:')
            lines.append(data['git_log'])

        # NOTE(review): this fallback is only reached when no managed container
        # appeared in the listing, so "all containers running" may be
        # inaccurate — confirm the intended wording.
        summary = '\n'.join(lines) if lines else 'System check complete — all containers running.'
        return AgentReport(
            directive_id=directive_id, agent=self.name, status='complete',
            summary=summary, actions_taken=self._actions_taken,
        )

    # --- Auto-heal: called by background health-check loop ---

    async def auto_heal(self, failing_systems: list[str]) -> None:
        """Restart the containers behind *failing_systems* and broadcast results.

        ``'ollama'`` is reported as external and never restarted. Systems
        without an entry in SYSTEM_CONTAINER_MAP are reported as skipped.
        Notifications are best-effort so that an unreachable notification
        channel cannot prevent the restarts. The method never raises for
        ordinary errors; they are logged and, if possible, broadcast.
        """
        if not failing_systems:
            return

        actions: list[str] = []
        try:
            await self._notify_safe(
                f'[SysOps] Health degraded — failing: {", ".join(failing_systems)}\n'
                f'Starting auto-recovery...'
            )
            containers = await self._st.docker_ps()
            container_map = {c.get('name'): c for c in containers}

            for system in failing_systems:
                if system == 'ollama':
                    actions.append('ollama: external host (192.168.2.9) — cannot restart from here')
                    continue
                target = SYSTEM_CONTAINER_MAP.get(system)
                if not target:
                    actions.append(f'{system}: no managed container mapped — skipped')
                    continue
                status = container_map.get(target, {}).get('status', 'unknown')
                try:
                    await self._st.docker_restart(target)
                    actions.append(f'{target}: restarted (was: {status})')
                except Exception as exc:
                    # One failed restart must not stop the remaining ones.
                    actions.append(f'{target}: restart failed — {exc}')

            summary = '\n'.join(f'{a}' for a in actions) or ' No actions taken.'
            await self._notify_safe(
                f'[SysOps] Auto-recovery complete:\n{summary}\n'
                f'Status will update on next health check.'
            )
            logger.info('sysops auto_heal complete: %s', actions)
        except Exception as exc:
            logger.exception('sysops auto_heal error: %s', exc)
            await self._notify_safe(f'[SysOps] Auto-recovery error: {exc}')

    # --- Sweep: scheduled read-only audit ---

    async def sweep(self) -> SweepReport:
        """Audit container health and git cleanliness without changing anything.

        The Docker and git checks are guarded separately, so a failure in one
        is logged and the findings of the other are still returned.
        """
        findings: list[dict] = []
        severity = {'exited': 'high', 'unhealthy': 'medium'}

        try:
            for c in await self._st.docker_ps():
                if c.get('name') not in ALLOWED_CONTAINERS:
                    continue
                problem = self._status_problem(c)
                if problem:
                    findings.append({
                        'type': f'container_{problem}', 'container': c['name'],
                        'status': c.get('status', ''), 'severity': severity[problem],
                    })
        except Exception as exc:
            logger.exception('sysops sweep error: %s', exc)

        try:
            git_st = await self._st.git_status()
            if git_st != 'Clean working tree':
                findings.append({
                    'type': 'git_uncommitted', 'details': git_st, 'severity': 'low',
                })
        except Exception as exc:
            logger.exception('sysops sweep error: %s', exc)

        return SweepReport(agent=self.name, findings=findings, recommendations=[])

    # --- Helpers ---

    @staticmethod
    def _status_problem(container: dict) -> str | None:
        """Return 'exited', 'unhealthy' or None for a docker_ps row."""
        status = str(container.get('status') or '').lower()
        if 'exited' in status:
            return 'exited'
        if 'unhealthy' in status:
            return 'unhealthy'
        return None

    async def _notify_safe(self, message: str) -> bool:
        """Broadcast *message*; log and return False instead of raising."""
        try:
            await self._st.notify_all_bot_channels(message)
        except Exception:
            logger.warning('sysops notification failed', exc_info=True)
            return False
        return True

    def _resolve_container(self, task: str) -> str | None:
        """Return the container named in *task*, or None when none is named.

        Keywords are tried longest-first and must not be glued to other
        name characters (letters, digits, '-'), so 'agent-db' is not read as
        'agent' and 'management' does not match 'agent'. When several
        containers are named, the one mentioned first in the text wins.
        """
        if not task:
            return None
        keywords = sorted(_CONTAINER_KEYWORDS, key=len, reverse=True)
        alternation = '|'.join(re.escape(k) for k in keywords)
        match = re.search(rf'(?<![a-z0-9-])(?:{alternation})(?![a-z0-9-])', task.lower())
        return _CONTAINER_KEYWORDS[match.group(0)] if match else None