From 430ab966b27fe1b7604d9351902e551bb92b192e Mon Sep 17 00:00:00 2001 From: ActiveBlue Build Date: Sun, 12 Apr 2026 17:54:28 -0400 Subject: [PATCH] feat(service): add FastAPI agent service with 5 routers and Docker setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - config.py: pydantic-settings with all env vars, privacy mode, per-agent overrides - app_state.py: global singletons (pool, master agent, registry, llm_router, sweep) - main.py: FastAPI lifespan startup — DB pool, LLM router, Odoo client, agents, master - routers/dispatch.py: POST /dispatch with rate limiting and webhook secret auth - routers/approval.py: GET /approval/pending, POST /approval/respond - routers/registry.py: GET/POST /registry/agents, POST /registry/backend overrides - routers/sweep.py: POST /sweep trigger, GET /sweep/status - routers/health.py: GET /health, GET /health/detailed (DB/Odoo/Ollama checks) - requirements.txt: pinned deps (fastapi, uvicorn, asyncpg, anthropic, alembic) - Dockerfile: python:3.11-slim, single uvicorn worker - docker-compose.yml: agent-service + postgres:15, bound to 192.168.2.47:8001 Co-Authored-By: Claude Sonnet 4.6 --- Dockerfile | 22 +++ agent_service/app_state.py | 57 +++++++ agent_service/config.py | 92 ++++++++++++ agent_service/main.py | 238 ++++++++++++++++++++++++++++++ agent_service/routers/__init__.py | 0 agent_service/routers/approval.py | 89 +++++++++++ agent_service/routers/dispatch.py | 102 +++++++++++++ agent_service/routers/health.py | 85 +++++++++++ agent_service/routers/registry.py | 98 ++++++++++++ agent_service/routers/sweep.py | 55 +++++++ docker-compose.yml | 59 ++++++++ requirements.txt | 11 ++ 12 files changed, 908 insertions(+) create mode 100644 Dockerfile create mode 100644 agent_service/app_state.py create mode 100644 agent_service/config.py create mode 100644 agent_service/main.py create mode 100644 agent_service/routers/__init__.py create mode 100644 agent_service/routers/approval.py 
create mode 100644 agent_service/routers/dispatch.py
create mode 100644 agent_service/routers/health.py
create mode 100644 agent_service/routers/registry.py
create mode 100644 agent_service/routers/sweep.py
create mode 100644 docker-compose.yml
create mode 100644 requirements.txt

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..6a4d783
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,22 @@
+FROM python:3.11-slim
+
+LABEL org.opencontainers.image.title="ActiveBlue AI Agent Service"
+LABEL org.opencontainers.image.version="0.1.0"
+
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    gcc libpq-dev \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY agent_service/ ./agent_service/
+
+ENV PYTHONUNBUFFERED=1
+ENV PYTHONDONTWRITEBYTECODE=1
+
+EXPOSE 8001
+
+CMD ["uvicorn", "agent_service.main:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "1"]
diff --git a/agent_service/app_state.py b/agent_service/app_state.py
new file mode 100644
index 0000000..736a9cb
--- /dev/null
+++ b/agent_service/app_state.py
@@ -0,0 +1,57 @@
+"""Global application state — singletons accessed by routers."""
+from __future__ import annotations
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import asyncpg
+
+_db_pool: Optional['asyncpg.Pool'] = None
+_master_agent = None
+_agent_registry = None
+_llm_router = None
+_sweep_coordinator = None
+
+
+def set_db_pool(pool) -> None:
+    global _db_pool
+    _db_pool = pool
+
+
+def get_db_pool():
+    return _db_pool
+
+
+def set_master_agent(agent) -> None:
+    global _master_agent
+    _master_agent = agent
+
+
+def get_master_agent():
+    return _master_agent
+
+
+def set_agent_registry(registry) -> None:
+    global _agent_registry
+    _agent_registry = registry
+
+
+def get_agent_registry():
+    return _agent_registry
+
+
+def set_llm_router(router) -> None:
+    global _llm_router
+    _llm_router = router
+
+
+def get_llm_router():
+    return _llm_router
+
+
+def set_sweep_coordinator(coordinator) -> None:
+    global _sweep_coordinator
+    _sweep_coordinator = coordinator
+
+
+def get_sweep_coordinator():
+    return _sweep_coordinator
diff --git a/agent_service/config.py b/agent_service/config.py
new file mode 100644
index 0000000..a31f349
--- /dev/null
+++ b/agent_service/config.py
@@ -0,0 +1,110 @@
+from __future__ import annotations
+import os
+from functools import lru_cache
+from urllib.parse import quote
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+    # Odoo
+    odoo_url: str = 'http://localhost:8069'
+    odoo_db: str = 'odoo'
+    odoo_api_key: str = ''
+
+    # Ollama
+    ollama_url: str = 'http://localhost:11434'
+    ollama_model: str = 'llama3'
+    ollama_timeout: int = 120
+
+    # Anthropic / Claude
+    anthropic_api_key: str = ''
+    claude_model: str = 'claude-sonnet-4-6'
+
+    # Privacy
+    llm_privacy_mode: str = 'local'  # local | hybrid | cloud
+
+    # Per-agent backend overrides (env: AGENT_BACKEND_FINANCE=claude)
+    agent_backend_finance: str = ''
+    agent_backend_accounting: str = ''
+    agent_backend_crm: str = ''
+    agent_backend_sales: str = ''
+    agent_backend_project: str = ''
+    agent_backend_elearning: str = ''
+    agent_backend_expenses: str = ''
+    agent_backend_employees: str = ''
+
+    # Service
+    agent_service_port: int = 8001
+    webhook_secret: str = ''
+    allowed_callback_ip: str = ''
+
+    # Postgres
+    postgres_host: str = 'localhost'
+    postgres_port: int = 5432
+    postgres_db: str = 'activeblue_ai'
+    postgres_user: str = 'activeblue'
+    postgres_password: str = ''
+    postgres_min_connections: int = 2
+    postgres_max_connections: int = 10
+
+    # Rate limiting
+    dispatch_rate_limit_per_user: int = 30  # requests per minute
+    directive_timeout_minutes: int = 10
+
+    # Logging
+    log_level: str = 'INFO'
+    log_format: str = 'json'
+    loki_url: str = ''
+
+    class Config:
+        env_file = '.env'
+        env_file_encoding = 'utf-8'
+
+    def _postgres_credentials(self) -> str:
+        """Return ``user:password`` percent-encoded for use inside a URL.
+
+        Reserved characters such as ``@``, ``:`` or ``/`` in the password
+        would otherwise corrupt the authority part of the DSN.
+        """
+        user = quote(self.postgres_user, safe='')
+        password = quote(self.postgres_password, safe='')
+        return f'{user}:{password}'
+
+    @property
+    def postgres_dsn(self) -> str:
+        """SQLAlchemy-style URL that selects the asyncpg driver."""
+        return (
+            f'postgresql+asyncpg://{self._postgres_credentials()}'
+            f'@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}'
+        )
+
+    @property
+    def postgres_asyncpg_dsn(self) -> str:
+        """DSN for ``asyncpg.connect`` / ``asyncpg.create_pool``.
+
+        asyncpg only accepts the ``postgresql://`` and ``postgres://``
+        schemes, so an ``asyncpg://`` prefix must not be used here.
+        """
+        return (
+            f'postgresql://{self._postgres_credentials()}'
+            f'@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}'
+        )
+
+    def agent_backend_override(self, agent_name: str) -> str:
+        """Return the env-configured backend for *agent_name*, or ''."""
+        mapping = {
+            'finance_agent': self.agent_backend_finance,
+            'accounting_agent': self.agent_backend_accounting,
+            'crm_agent': self.agent_backend_crm,
+            'sales_agent': self.agent_backend_sales,
+            'project_agent': self.agent_backend_project,
+            'elearning_agent': self.agent_backend_elearning,
+            'expenses_agent': self.agent_backend_expenses,
+            'employees_agent': self.agent_backend_employees,
+        }
+        return mapping.get(agent_name, '')
+
+
+@lru_cache(maxsize=1)
+def get_settings() -> Settings:
+    return Settings()
diff --git a/agent_service/main.py b/agent_service/main.py
new file mode 100644
index 0000000..b2bdb04
--- /dev/null
+++ b/agent_service/main.py
@@ -0,0 +1,257 @@
+from __future__ import annotations
+import asyncio
+import logging
+import sys
+from contextlib import asynccontextmanager
+
+import asyncpg
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+
+from .config import get_settings
+from . import app_state
+from .routers import dispatch, approval, registry, sweep, health
+
+logger = logging.getLogger(__name__)
+
+
+async def _init_db(settings) -> asyncpg.Pool:
+    pool = await asyncpg.create_pool(
+        host=settings.postgres_host,
+        port=settings.postgres_port,
+        database=settings.postgres_db,
+        user=settings.postgres_user,
+        password=settings.postgres_password,
+        min_size=settings.postgres_min_connections,
+        max_size=settings.postgres_max_connections,
+        max_inactive_connection_lifetime=300,
+    )
+    logger.info('DB pool created (min=%d max=%d)', settings.postgres_min_connections, settings.postgres_max_connections)
+    return pool
+
+
+async def _db_health_loop(pool: asyncpg.Pool) -> None:
+    while True:
+        await asyncio.sleep(60)
+        try:
+            async with pool.acquire(timeout=5) as conn:
+                await conn.fetchval('SELECT 1')
+        except Exception as exc:
+            logger.warning('DB health check failed: %s', exc)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    """Build the service singletons on startup and release them on shutdown.
+
+    Every stage is best-effort: a failure is logged and that component is
+    left unset so the rest of the service can still start.
+    """
+    settings = get_settings()
+    _configure_logging(settings)
+
+    # 1. Database
+    # Keep a strong reference to the health task: the event loop only holds a
+    # weak one, so an unreferenced task can be garbage-collected mid-flight.
+    health_task = None
+    try:
+        pool = await _init_db(settings)
+        app_state.set_db_pool(pool)
+        health_task = asyncio.create_task(_db_health_loop(pool))
+    except Exception as exc:
+        logger.error('Failed to connect to database: %s', exc)
+        pool = None
+
+    # 2. LLM Router
+    try:
+        from .llm.ollama_backend import OllamaBackend
+        from .llm.llm_config_store import LLMConfigStore
+        from .llm.llm_router import LLMRouter
+
+        ollama = OllamaBackend(
+            base_url=settings.ollama_url,
+            model=settings.ollama_model,
+            timeout=settings.ollama_timeout,
+        )
+        config_store = LLMConfigStore(pool) if pool else None
+        claude = None
+        if settings.llm_privacy_mode != 'local' and settings.anthropic_api_key:
+            from .llm.claude_backend import ClaudeBackend
+            claude = ClaudeBackend(api_key=settings.anthropic_api_key, model=settings.claude_model)
+
+        llm_router = LLMRouter(
+            ollama=ollama,
+            claude=claude,
+            config_store=config_store,
+            privacy_mode=settings.llm_privacy_mode,
+            env_overrides={
+                name: settings.agent_backend_override(name)
+                for name in [
+                    'finance_agent', 'accounting_agent', 'crm_agent', 'sales_agent',
+                    'project_agent', 'elearning_agent', 'expenses_agent', 'employees_agent',
+                ]
+                if settings.agent_backend_override(name)
+            },
+        )
+        app_state.set_llm_router(llm_router)
+        logger.info('LLM router ready (mode=%s)', settings.llm_privacy_mode)
+    except Exception as exc:
+        logger.error('Failed to init LLM router: %s', exc)
+        llm_router = None
+
+    # 3. Odoo client
+    try:
+        from .tools.odoo_client import OdooClient
+        odoo = OdooClient(
+            url=settings.odoo_url,
+            db=settings.odoo_db,
+            api_key=settings.odoo_api_key,
+        )
+        logger.info('Odoo client initialised (%s)', settings.odoo_url)
+    except Exception as exc:
+        logger.error('Failed to init Odoo client: %s', exc)
+        odoo = None
+
+    # 4. Agent registry
+    try:
+        from .agents.registry import AgentRegistry
+        agent_registry = AgentRegistry(odoo=odoo, pool=pool)
+        app_state.set_agent_registry(agent_registry)
+    except Exception as exc:
+        logger.error('Failed to init agent registry: %s', exc)
+        agent_registry = None
+
+    # 5. Memory manager
+    try:
+        from .memory.memory_manager import MemoryManager
+        memory_mgr = MemoryManager(pool=pool, llm=llm_router) if pool else None
+    except Exception as exc:
+        logger.error('Failed to init memory manager: %s', exc)
+        memory_mgr = None
+
+    # 6. Peer bus + specialist agents
+    try:
+        from .agents.peer_bus import PeerBus
+        peer_bus = PeerBus()
+        _register_specialist_agents(peer_bus, odoo, llm_router)
+    except Exception as exc:
+        logger.error('Failed to init peer bus / specialist agents: %s', exc)
+        peer_bus = None
+
+    # 7. Master agent
+    try:
+        from .agents.master_agent import MasterAgent
+        master = MasterAgent(
+            odoo=odoo,
+            llm=llm_router,
+            memory=memory_mgr,
+            peer_bus=peer_bus,
+            registry=agent_registry,
+        )
+        app_state.set_master_agent(master)
+        logger.info('MasterAgent ready')
+    except Exception as exc:
+        logger.error('Failed to init MasterAgent: %s', exc)
+
+    # 8. Sweep coordinator (lazy import — defined in Step 16)
+    try:
+        from .agents.sweep_coordinator import SweepCoordinator
+        sweep_coord = SweepCoordinator(peer_bus=peer_bus)
+        app_state.set_sweep_coordinator(sweep_coord)
+    except ImportError:
+        pass  # not yet implemented
+    except Exception as exc:
+        logger.warning('Sweep coordinator not available: %s', exc)
+
+    logger.info('ActiveBlue AI agent service started on port %d', settings.agent_service_port)
+    yield
+
+    # Shutdown
+    if health_task is not None:
+        health_task.cancel()
+        try:
+            await health_task
+        except asyncio.CancelledError:
+            pass
+    if pool:
+        await pool.close()
+    logger.info('Agent service shut down')
+
+
+def _register_specialist_agents(peer_bus, odoo, llm_router) -> None:
+    """Register every available specialist agent on *peer_bus*.
+
+    An agent whose module is missing or whose constructor fails is skipped
+    with a log entry instead of aborting startup.
+    """
+    import importlib
+
+    try:
+        from .agents.finance_agent import FinanceAgent
+        peer_bus.register('finance_agent', FinanceAgent(odoo=odoo, llm=llm_router, peer_bus=peer_bus))
+    except Exception as exc:
+        logger.warning('Could not register finance_agent: %s', exc)
+
+    specialist_map = {
+        'accounting_agent': 'AccountingAgent',
+        'crm_agent': 'CrmAgent',
+        'sales_agent': 'SalesAgent',
+        'project_agent': 'ProjectAgent',
+        'elearning_agent': 'ElearningAgent',
+        'expenses_agent': 'ExpensesAgent',
+        'employees_agent': 'EmployeesAgent',
+    }
+    for agent_name, class_name in specialist_map.items():
+        try:
+            mod = importlib.import_module(f'.agents.{agent_name}', package='agent_service')
+            cls = getattr(mod, class_name)
+            peer_bus.register(agent_name, cls(odoo=odoo, llm=llm_router, peer_bus=peer_bus))
+        except ImportError:
+            logger.debug('%s module not yet implemented, skipping', agent_name)
+        except Exception as exc:
+            logger.warning('Could not register %s: %s', agent_name, exc)
+
+
+def _configure_logging(settings) -> None:
+    level = getattr(logging, settings.log_level.upper(), logging.INFO)
+    if settings.log_format == 'json':
+        try:
+            import json_log_formatter
+            formatter = json_log_formatter.JSONFormatter()
+            handler = logging.StreamHandler(sys.stdout)
+            handler.setFormatter(formatter)
+            logging.root.handlers = [handler]
+        except ImportError:
+            logging.basicConfig(level=level, stream=sys.stdout)
+    else:
+        logging.basicConfig(
+            level=level,
+            stream=sys.stdout,
+            format='%(asctime)s %(name)s %(levelname)s %(message)s',
+        )
+    logging.root.setLevel(level)
+
+
+def create_app() -> FastAPI:
+    app = FastAPI(
+        title='ActiveBlue AI Agent Service',
+        version='0.1.0',
+        docs_url='/docs',
+        redoc_url=None,
+        lifespan=lifespan,
+    )
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=['*'],
+        allow_methods=['GET', 'POST', 'DELETE'],
+        allow_headers=['*'],
+    )
+    app.include_router(dispatch.router)
+    app.include_router(approval.router)
+    app.include_router(registry.router)
+    app.include_router(sweep.router)
+    app.include_router(health.router)
+    return app
+
+
+app = create_app()
diff --git a/agent_service/routers/__init__.py b/agent_service/routers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/agent_service/routers/approval.py b/agent_service/routers/approval.py
new file mode 100644
index 0000000..5d7f587
--- /dev/null
+++ b/agent_service/routers/approval.py
@@ -0,0 +1,89 @@
+from __future__ import annotations
+import logging
+from typing import Optional
+
+from fastapi import APIRouter, HTTPException, status
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix='/approval', tags=['approval'])
+
+
+class ApprovalRequest(BaseModel):
+    directive_id: str
+    approved: bool
+    approver_id: str
+    note: Optional[str] = None
+
+
+class PendingApproval(BaseModel):
+    directive_id: str
+    agent: str
+    action: str
+    description: str
+    created_at: str
+    context: dict = {}
+
+
+@router.get('/pending', response_model=list[PendingApproval])
+async def list_pending():
+    from ..app_state import get_db_pool
+    pool = get_db_pool()
+    if pool is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='DB not ready')
+    async with pool.acquire(timeout=10) as conn:
+        rows = await conn.fetch(
+            'SELECT directive_id, agent_name, action_type, description, created_at, context_data '
+            'FROM ab_directive_log WHERE status = $1 ORDER BY created_at ASC',
+            'pending_approval',
+        )
+    return [
+        PendingApproval(
+            directive_id=str(r['directive_id']),
+            agent=r['agent_name'] or '',
+            action=r['action_type'] or '',
+            description=r['description'] or '',
+            created_at=str(r['created_at']),
+            context=r['context_data'] or {},
+        )
+        for r in rows
+    ]
+
+
+@router.post('/respond', status_code=status.HTTP_200_OK)
+async def respond_approval(req: ApprovalRequest):
+    from ..app_state import get_db_pool, get_master_agent
+    pool = get_db_pool()
+    if pool is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='DB not ready')
+
+    async with pool.acquire(timeout=10) as conn:
+        row = await conn.fetchrow(
+            'SELECT directive_id, status FROM ab_directive_log WHERE directive_id = $1',
+            req.directive_id,
+        )
+        if not row:
+            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Directive not found')
+        if row['status'] != 'pending_approval':
+            raise HTTPException(
+                status_code=status.HTTP_409_CONFLICT,
+                detail=f'Directive is not pending approval (status={row["status"]})',
+            )
+        new_status = 'approved' if req.approved else 'rejected'
+        await conn.execute(
+            'UPDATE ab_directive_log SET status=$1, approver_id=$2, approval_note=$3, updated_at=NOW() '
+            'WHERE directive_id=$4',
+            new_status, req.approver_id, req.note, req.directive_id,
+        )
+
+    logger.info('Directive %s %s by %s', req.directive_id, new_status, req.approver_id)
+
+    if req.approved:
+        master = get_master_agent()
+        if master:
+            try:
+                await master.resume_directive(req.directive_id)
+            except Exception as exc:
+                logger.error('resume_directive failed %s: %s', req.directive_id, exc)
+
+    return {'directive_id': req.directive_id, 'status': new_status}
diff --git a/agent_service/routers/dispatch.py b/agent_service/routers/dispatch.py
new file mode 100644
index 0000000..5eff038
--- /dev/null
+++ b/agent_service/routers/dispatch.py
@@ -0,0 +1,120 @@
+from __future__ import annotations
+import asyncio
+import hashlib
+import hmac
+import logging
+import time
+from typing import Optional
+
+from fastapi import APIRouter, Depends, HTTPException, Request, status
+from pydantic import BaseModel, Field
+
+from ..config import get_settings
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix='/dispatch', tags=['dispatch'])
+
+# In-memory rate limit store: {user_id: [timestamp, ...]}
+_rate_limit_store: dict[str, list[float]] = {}
+
+
+class DispatchRequest(BaseModel):
+    user_id: str = Field(..., description='Odoo user ID or session identifier')
+    message: str = Field(..., description='User natural-language message')
+    context: dict = Field(default_factory=dict, description='Optional context (partner_id, etc.)')
+    session_id: Optional[str] = Field(None, description='Conversation session ID')
+
+
+class DispatchResponse(BaseModel):
+    directive_id: str
+    reply: str
+    agent_reports: list[dict] = []
+    escalations: list[str] = []
+    actions_taken: list[dict] = []
+    session_id: Optional[str] = None
+
+
+async def _verify_webhook_secret(request: Request) -> None:
+    """Reject the request unless it carries a valid HMAC signature.
+
+    When ``webhook_secret`` is configured, ``X-ActiveBlue-Signature`` must be
+    the hex HMAC-SHA256 of the raw request body keyed with that secret (an
+    optional ``sha256=`` prefix is accepted). With no secret configured the
+    check is skipped.
+
+    Raises:
+        HTTPException: 401 if the header is missing or does not match.
+    """
+    settings = get_settings()
+    secret = settings.webhook_secret
+    if not secret:
+        return
+    sig = request.headers.get('X-ActiveBlue-Signature', '')
+    if not sig:
+        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Missing signature')
+    # NOTE(review): the sender must sign the exact bytes it posts; confirm the
+    # Odoo-side webhook uses this scheme before enabling a secret.
+    body = await request.body()
+    expected = hmac.new(secret.encode('utf-8'), body, hashlib.sha256).hexdigest()
+    provided = sig.strip().removeprefix('sha256=').lower()
+    # compare_digest takes the same time whether or not the values match.
+    if not hmac.compare_digest(expected.encode('ascii'), provided.encode('utf-8')):
+        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid signature')
+
+
+def _check_rate_limit(user_id: str) -> None:
+    settings = get_settings()
+    limit = settings.dispatch_rate_limit_per_user
+    now = time.monotonic()
+    window = 60.0
+    timestamps = _rate_limit_store.get(user_id, [])
+    timestamps = [t for t in timestamps if now - t < window]
+    if len(timestamps) >= limit:
+        raise HTTPException(
+            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
+            detail=f'Rate limit exceeded: {limit} requests/minute',
+        )
+    timestamps.append(now)
+    _rate_limit_store[user_id] = timestamps
+
+
+@router.post('', response_model=DispatchResponse)
+async def dispatch(req: DispatchRequest, request: Request):
+    await _verify_webhook_secret(request)
+    _check_rate_limit(req.user_id)
+
+    from ..app_state import get_master_agent
+    master = get_master_agent()
+    if master is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Agent service not ready')
+
+    settings = get_settings()
+    timeout = settings.directive_timeout_minutes * 60
+
+    try:
+        response = await asyncio.wait_for(
+            master.handle_message(
+                user_id=req.user_id,
+                message=req.message,
+                context=req.context,
+                session_id=req.session_id,
+            ),
+            timeout=timeout,
+        )
+    except asyncio.TimeoutError:
+        raise HTTPException(
+            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
+            detail=f'Directive timed out after {settings.directive_timeout_minutes}m',
+        )
+    except Exception as exc:
+        logger.exception('dispatch error user=%s: %s', req.user_id, exc)
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
+
+    return DispatchResponse(
+        directive_id=response.directive_id,
+        reply=response.reply,
+        agent_reports=[r.dict() if hasattr(r, 'dict') else r for r in response.agent_reports],
+        escalations=response.escalations,
+        actions_taken=response.actions_taken,
+        session_id=req.session_id,
+    )
diff --git a/agent_service/routers/health.py b/agent_service/routers/health.py
new file mode 100644
index 0000000..4f20004
--- /dev/null
+++ b/agent_service/routers/health.py
@@ -0,0 +1,85 @@
+from __future__ import annotations
+import asyncio
+import logging
+import time
+
+from fastapi import APIRouter
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix='/health', tags=['health'])
+
+_start_time = time.time()
+
+
+class HealthResponse(BaseModel):
+    status: str
+    uptime_seconds: float
+
+
+class DetailedHealthResponse(BaseModel):
+    status: str
+    uptime_seconds: float
+    db: str
+    odoo: str
+    ollama: str
+    master_agent: str
+    privacy_mode: str
+
+
+@router.get('', response_model=HealthResponse)
+async def health():
+    return HealthResponse(status='ok', uptime_seconds=round(time.time() - _start_time, 1))
+
+
+@router.get('/detailed', response_model=DetailedHealthResponse)
+async def health_detailed():
+    from ..app_state import get_db_pool, get_master_agent, get_llm_router
+    from ..config import get_settings
+
+    uptime = round(time.time() - _start_time, 1)
+    settings = get_settings()
+
+    # DB check
+    db_status = 'unavailable'
+    pool = get_db_pool()
+    if pool:
+        try:
+            async with pool.acquire(timeout=5) as conn:
+                await conn.fetchval('SELECT 1')
+            db_status = 'ok'
+        except Exception as exc:
+            db_status = f'error: {exc}'
+
+    # Odoo check
+    odoo_status = 'unavailable'
+    master = get_master_agent()
+    if master and hasattr(master, '_odoo'):
+        try:
+            await asyncio.wait_for(master._odoo.ping(), timeout=5)
+            odoo_status = 'ok'
+        except Exception as exc:
+            odoo_status = f'error: {exc}'
+
+    # Ollama check
+    ollama_status = 'unavailable'
+    llm_router = get_llm_router()
+    if llm_router and hasattr(llm_router, '_ollama'):
+        try:
+            await asyncio.wait_for(llm_router._ollama.ping(), timeout=5)
+            ollama_status = 'ok'
+        except Exception as exc:
+            ollama_status = f'error: {exc}'
+
+    master_status = 'ok' if master is not None else 'unavailable'
+    overall = 'ok' if all(s == 'ok' for s in [db_status, master_status]) else 'degraded'
+
+    return DetailedHealthResponse(
+        status=overall,
+        uptime_seconds=uptime,
+        db=db_status,
+        odoo=odoo_status,
+        ollama=ollama_status,
+        master_agent=master_status,
+        privacy_mode=settings.llm_privacy_mode,
+    )
diff --git a/agent_service/routers/registry.py b/agent_service/routers/registry.py
new file mode 100644
index 0000000..ba98385
--- /dev/null
+++ b/agent_service/routers/registry.py
@@ -0,0 +1,99 @@
+from __future__ import annotations
+import logging
+from typing import Optional
+
+from fastapi import APIRouter, HTTPException, status
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix='/registry', tags=['registry'])
+
+
+class AgentInfo(BaseModel):
+    name: str
+    domain: str
+    active: bool
+    backend: str = 'ollama'
+    last_seen: Optional[str] = None
+
+
+class BackendOverride(BaseModel):
+    agent_name: str
+    backend: str  # ollama | claude
+    set_by: str
+    note: Optional[str] = None
+
+
+@router.get('/agents', response_model=list[AgentInfo])
+async def list_agents():
+    from ..app_state import get_agent_registry, get_llm_router
+    registry = get_agent_registry()
+    llm_router = get_llm_router()
+    if registry is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Registry not ready')
+    agents = registry.get_all()
+    result = []
+    for agent in agents:
+        backend = 'ollama'
+        if llm_router:
+            try:
+                backend = await llm_router.get_backend(agent['name'])
+            except Exception as exc:
+                logger.warning('get_backend failed for %s: %s', agent['name'], exc)
+        result.append(AgentInfo(
+            name=agent['name'],
+            domain=agent.get('domain', ''),
+            active=agent.get('active', True),
+            backend=backend,
+            last_seen=agent.get('last_seen'),
+        ))
+    return result
+
+
+@router.post('/sync', status_code=status.HTTP_200_OK)
+async def sync_registry():
+    from ..app_state import get_agent_registry
+    registry = get_agent_registry()
+    if registry is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Registry not ready')
+    try:
+        count = await registry.sync()
+        return {'synced': count}
+    except Exception as exc:
+        logger.error('registry sync failed: %s', exc)
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
+
+
+@router.post('/backend', status_code=status.HTTP_200_OK)
+async def set_backend_override(req: BackendOverride):
+    from ..app_state import get_llm_router
+    llm_router = get_llm_router()
+    if llm_router is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='LLM router not ready')
+    if req.backend not in ('ollama', 'claude'):
+        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='backend must be ollama or claude')
+    try:
+        await llm_router.set_backend_override(
+            caller=req.agent_name,
+            backend=req.backend,
+            set_by=req.set_by,
+            note=req.note,
+        )
+        return {'agent': req.agent_name, 'backend': req.backend}
+    except Exception as exc:
+        logger.error('set_backend_override failed: %s', exc)
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
+
+
+@router.delete('/backend/{agent_name}', status_code=status.HTTP_200_OK)
+async def reset_backend_override(agent_name: str):
+    from ..app_state import get_llm_router
+    llm_router = get_llm_router()
+    if llm_router is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='LLM router not ready')
+    try:
+        await llm_router.reset_backend_override(caller=agent_name)
+        return {'agent': agent_name, 'reset': True}
+    except Exception as exc:
+        logger.error('reset_backend_override failed: %s', exc)
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
diff --git a/agent_service/routers/sweep.py b/agent_service/routers/sweep.py
new file mode 100644
index 0000000..f2e634a
--- /dev/null
+++ b/agent_service/routers/sweep.py
@@ -0,0 +1,55 @@
+from __future__ import annotations
+import asyncio
+import logging
+from typing import Optional
+
+from fastapi import APIRouter, HTTPException, status
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix='/sweep', tags=['sweep'])
+
+# Track running sweep task
+_sweep_task: Optional[asyncio.Task] = None
+_last_sweep_result: Optional[dict] = None
+
+
+class SweepRequest(BaseModel):
+    agents: list[str] = []  # empty = all active agents
+
+
+class SweepStatusResponse(BaseModel):
+    running: bool
+    last_result: Optional[dict] = None
+
+
+@router.post('', status_code=status.HTTP_202_ACCEPTED)
+async def trigger_sweep(req: SweepRequest):
+    global _sweep_task
+    if _sweep_task and not _sweep_task.done():
+        raise HTTPException(
+            status_code=status.HTTP_409_CONFLICT,
+            detail='Sweep already running',
+        )
+    from ..app_state import get_sweep_coordinator
+    coordinator = get_sweep_coordinator()
+    if coordinator is None:
+        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Sweep coordinator not ready')
+
+    async def _run():
+        global _last_sweep_result
+        try:
+            result = await coordinator.run_sweep(agents=req.agents or None)
+            _last_sweep_result = result
+        except Exception as exc:
+            logger.error('sweep run error: %s', exc)
+            _last_sweep_result = {'error': str(exc)}
+
+    _sweep_task = asyncio.create_task(_run())
+    return {'status': 'accepted', 'agents': req.agents or 'all'}
+
+
+@router.get('/status', response_model=SweepStatusResponse)
+async def sweep_status():
+    running = _sweep_task is not None and not _sweep_task.done()
+    return SweepStatusResponse(running=running, last_result=_last_sweep_result)
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..e3a5409
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,60 @@
+version: '3.9'
+
+services:
+  agent-service:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    container_name: activeblue-agent
+    restart: unless-stopped
+    env_file: .env
+    ports:
+      - '192.168.2.47:8001:8001'
+    depends_on:
+      agent-db:
+        condition: service_healthy
+    networks:
+      - activeblue-net
+    healthcheck:
+      # The image is python:3.11-slim with no curl installed, so probe with Python.
+      test: ['CMD', 'python', '-c', "import urllib.request; urllib.request.urlopen('http://localhost:8001/health', timeout=5)"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 20s
+    logging:
+      driver: json-file
+      options:
+        max-size: '50m'
+        max-file: '5'
+
+  agent-db:
+    image: postgres:15-alpine
+    container_name: activeblue-agent-db
+    restart: unless-stopped
+    environment:
+      POSTGRES_DB: ${POSTGRES_DB:-activeblue_ai}
+      POSTGRES_USER: ${POSTGRES_USER:-activeblue}
+      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
+    volumes:
+      - agent-db-data:/var/lib/postgresql/data
+    networks:
+      - activeblue-net
+    healthcheck:
+      test: ['CMD-SHELL', 'pg_isready -U ${POSTGRES_USER:-activeblue} -d ${POSTGRES_DB:-activeblue_ai}']
+      interval: 10s
+      timeout: 5s
+      retries: 5
+    logging:
+      driver: json-file
+      options:
+        max-size: '20m'
+        max-file: '3'
+
+volumes:
+  agent-db-data:
+
+networks:
+  activeblue-net:
+    name: activeblue-net
+    external: false
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..f003d9e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,11 @@
+fastapi==0.115.0
+uvicorn[standard]==0.30.6
+pydantic==2.9.2
+pydantic-settings==2.5.2
+asyncpg==0.29.0
+anthropic==0.34.2
+httpx==0.27.2
+alembic==1.13.3
+sqlalchemy[asyncio]==2.0.35
+json-log-formatter==0.5.2
+python-dotenv==1.0.1