feat(service): add FastAPI agent service with 5 routers and Docker setup

- config.py: pydantic-settings with all env vars, privacy mode, per-agent overrides
- app_state.py: global singletons (pool, master agent, registry, llm_router, sweep)
- main.py: FastAPI lifespan startup — DB pool, LLM router, Odoo client, agents, master
- routers/dispatch.py: POST /dispatch with rate limiting and webhook secret auth
- routers/approval.py: GET /approval/pending, POST /approval/respond
- routers/registry.py: GET/POST /registry/agents, POST /registry/backend overrides
- routers/sweep.py: POST /sweep trigger, GET /sweep/status
- routers/health.py: GET /health, GET /health/detailed (DB/Odoo/Ollama checks)
- requirements.txt: pinned deps (fastapi, uvicorn, asyncpg, anthropic, alembic)
- Dockerfile: python:3.11-slim, single uvicorn worker
- docker-compose.yml: agent-service + postgres:15, bound to 192.168.2.47:8001

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ActiveBlue Build
2026-04-12 17:54:28 -04:00
parent dab6354d09
commit 430ab966b2
12 changed files with 908 additions and 0 deletions

View File

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
logger = logging.getLogger(__name__)
router = APIRouter(prefix='/approval', tags=['approval'])
class ApprovalRequest(BaseModel):
directive_id: str
approved: bool
approver_id: str
note: Optional[str] = None
class PendingApproval(BaseModel):
directive_id: str
agent: str
action: str
description: str
created_at: str
context: dict = {}
@router.get('/pending', response_model=list[PendingApproval])
async def list_pending():
from ..app_state import get_db_pool
pool = get_db_pool()
if pool is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='DB not ready')
async with pool.acquire(timeout=10) as conn:
rows = await conn.fetch(
'SELECT directive_id, agent_name, action_type, description, created_at, context_data '
'FROM ab_directive_log WHERE status = $1 ORDER BY created_at ASC',
'pending_approval',
)
return [
PendingApproval(
directive_id=str(r['directive_id']),
agent=r['agent_name'] or '',
action=r['action_type'] or '',
description=r['description'] or '',
created_at=str(r['created_at']),
context=r['context_data'] or {},
)
for r in rows
]
@router.post('/respond', status_code=status.HTTP_200_OK)
async def respond_approval(req: ApprovalRequest):
from ..app_state import get_db_pool, get_master_agent
pool = get_db_pool()
if pool is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='DB not ready')
async with pool.acquire(timeout=10) as conn:
row = await conn.fetchrow(
'SELECT directive_id, status FROM ab_directive_log WHERE directive_id = $1',
req.directive_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Directive not found')
if row['status'] != 'pending_approval':
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f'Directive is not pending approval (status={row["status"]})',
)
new_status = 'approved' if req.approved else 'rejected'
await conn.execute(
'UPDATE ab_directive_log SET status=$1, approver_id=$2, approval_note=$3, updated_at=NOW() '
'WHERE directive_id=$4',
new_status, req.approver_id, req.note, req.directive_id,
)
logger.info('Directive %s %s by %s', req.directive_id, new_status, req.approver_id)
if req.approved:
master = get_master_agent()
if master:
try:
await master.resume_directive(req.directive_id)
except Exception as exc:
logger.error('resume_directive failed %s: %s', req.directive_id, exc)
return {'directive_id': req.directive_id, 'status': new_status}

View File

@@ -0,0 +1,102 @@
from __future__ import annotations
import asyncio
import hashlib
import hmac
import logging
import time
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import BaseModel, Field
from ..config import get_settings
logger = logging.getLogger(__name__)
router = APIRouter(prefix='/dispatch', tags=['dispatch'])
# In-memory rate limit store: {user_id: [timestamp, ...]}
_rate_limit_store: dict[str, list[float]] = {}
class DispatchRequest(BaseModel):
user_id: str = Field(..., description='Odoo user ID or session identifier')
message: str = Field(..., description='User natural-language message')
context: dict = Field(default_factory=dict, description='Optional context (partner_id, etc.)')
session_id: Optional[str] = Field(None, description='Conversation session ID')
class DispatchResponse(BaseModel):
directive_id: str
reply: str
agent_reports: list[dict] = []
escalations: list[str] = []
actions_taken: list[dict] = []
session_id: Optional[str] = None
def _verify_webhook_secret(request: Request) -> None:
settings = get_settings()
secret = settings.webhook_secret
if not secret:
return
sig = request.headers.get('X-ActiveBlue-Signature', '')
if not sig:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Missing signature')
def _check_rate_limit(user_id: str) -> None:
settings = get_settings()
limit = settings.dispatch_rate_limit_per_user
now = time.monotonic()
window = 60.0
timestamps = _rate_limit_store.get(user_id, [])
timestamps = [t for t in timestamps if now - t < window]
if len(timestamps) >= limit:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f'Rate limit exceeded: {limit} requests/minute',
)
timestamps.append(now)
_rate_limit_store[user_id] = timestamps
@router.post('', response_model=DispatchResponse)
async def dispatch(req: DispatchRequest, request: Request):
_verify_webhook_secret(request)
_check_rate_limit(req.user_id)
from ..app_state import get_master_agent
master = get_master_agent()
if master is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Agent service not ready')
settings = get_settings()
timeout = settings.directive_timeout_minutes * 60
try:
response = await asyncio.wait_for(
master.handle_message(
user_id=req.user_id,
message=req.message,
context=req.context,
session_id=req.session_id,
),
timeout=timeout,
)
except asyncio.TimeoutError:
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail=f'Directive timed out after {settings.directive_timeout_minutes}m',
)
except Exception as exc:
logger.exception('dispatch error user=%s: %s', req.user_id, exc)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
return DispatchResponse(
directive_id=response.directive_id,
reply=response.reply,
agent_reports=[r.dict() if hasattr(r, 'dict') else r for r in response.agent_reports],
escalations=response.escalations,
actions_taken=response.actions_taken,
session_id=req.session_id,
)

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
import asyncio
import logging
import time
from fastapi import APIRouter
from pydantic import BaseModel
logger = logging.getLogger(__name__)
router = APIRouter(prefix='/health', tags=['health'])
_start_time = time.time()
class HealthResponse(BaseModel):
status: str
uptime_seconds: float
class DetailedHealthResponse(BaseModel):
status: str
uptime_seconds: float
db: str
odoo: str
ollama: str
master_agent: str
privacy_mode: str
@router.get('', response_model=HealthResponse)
async def health():
return HealthResponse(status='ok', uptime_seconds=round(time.time() - _start_time, 1))
@router.get('/detailed', response_model=DetailedHealthResponse)
async def health_detailed():
from ..app_state import get_db_pool, get_master_agent, get_llm_router
from ..config import get_settings
uptime = round(time.time() - _start_time, 1)
settings = get_settings()
# DB check
db_status = 'unavailable'
pool = get_db_pool()
if pool:
try:
async with pool.acquire(timeout=5) as conn:
await conn.fetchval('SELECT 1')
db_status = 'ok'
except Exception as exc:
db_status = f'error: {exc}'
# Odoo check
odoo_status = 'unavailable'
master = get_master_agent()
if master and hasattr(master, '_odoo'):
try:
await asyncio.wait_for(master._odoo.ping(), timeout=5)
odoo_status = 'ok'
except Exception as exc:
odoo_status = f'error: {exc}'
# Ollama check
ollama_status = 'unavailable'
llm_router = get_llm_router()
if llm_router and hasattr(llm_router, '_ollama'):
try:
await asyncio.wait_for(llm_router._ollama.ping(), timeout=5)
ollama_status = 'ok'
except Exception as exc:
ollama_status = f'error: {exc}'
master_status = 'ok' if master is not None else 'unavailable'
overall = 'ok' if all(s == 'ok' for s in [db_status, master_status]) else 'degraded'
return DetailedHealthResponse(
status=overall,
uptime_seconds=uptime,
db=db_status,
odoo=odoo_status,
ollama=ollama_status,
master_agent=master_status,
privacy_mode=settings.llm_privacy_mode,
)

View File

@@ -0,0 +1,98 @@
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
logger = logging.getLogger(__name__)
router = APIRouter(prefix='/registry', tags=['registry'])
class AgentInfo(BaseModel):
name: str
domain: str
active: bool
backend: str = 'ollama'
last_seen: Optional[str] = None
class BackendOverride(BaseModel):
agent_name: str
backend: str # ollama | claude
set_by: str
note: Optional[str] = None
@router.get('/agents', response_model=list[AgentInfo])
async def list_agents():
from ..app_state import get_agent_registry, get_llm_router
registry = get_agent_registry()
llm_router = get_llm_router()
if registry is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Registry not ready')
agents = registry.get_all()
result = []
for agent in agents:
backend = 'ollama'
if llm_router:
try:
backend = await llm_router.get_backend(agent['name'])
except Exception:
pass
result.append(AgentInfo(
name=agent['name'],
domain=agent.get('domain', ''),
active=agent.get('active', True),
backend=backend,
last_seen=agent.get('last_seen'),
))
return result
@router.post('/sync', status_code=status.HTTP_200_OK)
async def sync_registry():
from ..app_state import get_agent_registry
registry = get_agent_registry()
if registry is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Registry not ready')
try:
count = await registry.sync()
return {'synced': count}
except Exception as exc:
logger.error('registry sync failed: %s', exc)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
@router.post('/backend', status_code=status.HTTP_200_OK)
async def set_backend_override(req: BackendOverride):
from ..app_state import get_llm_router
llm_router = get_llm_router()
if llm_router is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='LLM router not ready')
if req.backend not in ('ollama', 'claude'):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='backend must be ollama or claude')
try:
await llm_router.set_backend_override(
caller=req.agent_name,
backend=req.backend,
set_by=req.set_by,
note=req.note,
)
return {'agent': req.agent_name, 'backend': req.backend}
except Exception as exc:
logger.error('set_backend_override failed: %s', exc)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
@router.delete('/backend/{agent_name}', status_code=status.HTTP_200_OK)
async def reset_backend_override(agent_name: str):
from ..app_state import get_llm_router
llm_router = get_llm_router()
if llm_router is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='LLM router not ready')
try:
await llm_router.reset_backend_override(caller=agent_name)
return {'agent': agent_name, 'reset': True}
except Exception as exc:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
import asyncio
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
logger = logging.getLogger(__name__)
router = APIRouter(prefix='/sweep', tags=['sweep'])
# Track running sweep task
_sweep_task: Optional[asyncio.Task] = None
_last_sweep_result: Optional[dict] = None
class SweepRequest(BaseModel):
agents: list[str] = [] # empty = all active agents
class SweepStatusResponse(BaseModel):
running: bool
last_result: Optional[dict] = None
@router.post('', status_code=status.HTTP_202_ACCEPTED)
async def trigger_sweep(req: SweepRequest):
global _sweep_task
if _sweep_task and not _sweep_task.done():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail='Sweep already running',
)
from ..app_state import get_sweep_coordinator
coordinator = get_sweep_coordinator()
if coordinator is None:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Sweep coordinator not ready')
async def _run():
global _last_sweep_result
try:
result = await coordinator.run_sweep(agents=req.agents or None)
_last_sweep_result = result
except Exception as exc:
logger.error('sweep run error: %s', exc)
_last_sweep_result = {'error': str(exc)}
_sweep_task = asyncio.create_task(_run())
return {'status': 'accepted', 'agents': req.agents or 'all'}
@router.get('/status', response_model=SweepStatusResponse)
async def sweep_status():
running = _sweep_task is not None and not _sweep_task.done()
return SweepStatusResponse(running=running, last_result=_last_sweep_result)