"""HTTP dispatch endpoint that forwards a user message to the master agent."""
from __future__ import annotations

import asyncio
import hashlib
import hmac
import logging
import time
import uuid
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import BaseModel, Field

from ..config import get_settings

logger = logging.getLogger(__name__)

router = APIRouter(prefix='/dispatch', tags=['dispatch'])

# Length of the sliding rate-limit window, in seconds. The 429 error message
# below says "requests/minute", so keep the two in sync if this changes.
_RATE_LIMIT_WINDOW_SECONDS = 60.0

# In-memory rate limit store: {user_id: [timestamp, ...]}
# Timestamps are time.monotonic() readings. The dict is module-level state and
# keys are never removed, so an entry remains for every user_id ever seen.
_rate_limit_store: dict[str, list[float]] = {}


class DispatchRequest(BaseModel):
    """Request body accepted by ``POST /dispatch``."""

    user_id: str = Field(..., description='Odoo user ID or session identifier')
    message: str = Field(..., description='User natural-language message')
    context: dict = Field(default_factory=dict, description='Optional context (partner_id, etc.)')
    session_id: Optional[str] = Field(None, description='Conversation session ID')


class DispatchResponse(BaseModel):
    """Response body returned by ``POST /dispatch``."""

    directive_id: str
    reply: str
    agent_reports: list[dict] = []
    escalations: list[str] = []
    actions_taken: list[dict] = []
    session_id: Optional[str] = None


def _verify_webhook_secret(request: Request) -> None:
    """Reject the request when a webhook secret is configured but no signature is sent.

    When ``settings.webhook_secret`` is empty the check is skipped entirely.

    Args:
        request: Incoming request whose ``X-ActiveBlue-Signature`` header is read.

    Raises:
        HTTPException: 401 when a secret is configured and the header is
            missing or empty.
    """
    secret = get_settings().webhook_secret
    if not secret:
        return

    signature = request.headers.get('X-ActiveBlue-Signature', '')
    # NOTE(review): only the *presence* of the header is checked here; its
    # value is never compared against anything derived from `secret`, so any
    # non-empty signature is accepted. `hmac` and `hashlib` are imported by
    # this module but unused, which suggests verification was intended.
    # Confirm the signing scheme with the sender, then validate the value
    # with hmac.compare_digest.
    if not signature:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Missing signature')


def _check_rate_limit(user_id: str) -> None:
    """Enforce a per-user sliding-window limit on dispatch requests.

    Timestamps older than the window are discarded; if the number remaining
    has reached ``settings.dispatch_rate_limit_per_user`` the request is
    rejected, otherwise the current time is recorded for ``user_id``.

    Args:
        user_id: Key under which request timestamps are tracked.

    Raises:
        HTTPException: 429 when the user has reached the limit within the window.
    """
    limit = get_settings().dispatch_rate_limit_per_user
    now = time.monotonic()

    # Keep only the requests that still fall inside the window.
    recent = [
        stamp
        for stamp in _rate_limit_store.get(user_id, [])
        if now - stamp < _RATE_LIMIT_WINDOW_SECONDS
    ]
    if len(recent) >= limit:
        # Rejected requests are not recorded, so they do not extend the block.
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f'Rate limit exceeded: {limit} requests/minute',
        )

    recent.append(now)
    _rate_limit_store[user_id] = recent


@router.post('', response_model=DispatchResponse)
async def dispatch(req: DispatchRequest, request: Request) -> DispatchResponse:
    """Forward a user message to the master agent and return its reply.

    Args:
        req: Validated request body.
        request: Raw request, used only for the webhook signature check.

    Returns:
        The agent's reply, escalations and actions, echoing the caller's
        ``session_id``. ``agent_reports`` is always returned empty.

    Raises:
        HTTPException: 401 (missing signature), 429 (rate limited),
            503 (master agent not available), 504 (agent call exceeded
            ``settings.directive_timeout_minutes``), or 500 (any other
            failure inside the agent call). An ``HTTPException`` raised by
            the agent call itself is propagated unchanged.
    """
    _verify_webhook_secret(request)
    _check_rate_limit(req.user_id)

    # Imported at call time rather than module import time
    # (presumably to avoid a circular import with app_state - TODO confirm).
    from ..app_state import get_master_agent

    master = get_master_agent()
    if master is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail='Agent service not ready',
        )

    settings = get_settings()
    timeout = settings.directive_timeout_minutes * 60  # minutes -> seconds
    # A caller-supplied session id doubles as the directive id; otherwise a
    # fresh random hex id is generated.
    directive_id = req.session_id or uuid.uuid4().hex
    channel_id = req.context.get('channel_id') if isinstance(req.context, dict) else None

    try:
        response = await asyncio.wait_for(
            master.handle_message(
                user_id=req.user_id,
                channel_id=channel_id,
                message=req.message,
                directive_id=directive_id,
            ),
            timeout=timeout,
        )
    except asyncio.TimeoutError as exc:
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail=f'Directive timed out after {settings.directive_timeout_minutes}m',
        ) from exc
    except HTTPException:
        # Deliberate HTTP errors from the agent layer keep their own status
        # code instead of being rewritten as a 500 by the handler below.
        raise
    except Exception as exc:
        logger.exception(
            'dispatch error user=%s directive=%s: %s', req.user_id, directive_id, exc
        )
        # The exception text stays in the server log only; the client gets a
        # generic message plus the directive id for correlation, so internal
        # details are not exposed over HTTP.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f'Internal error while processing directive {directive_id}',
        ) from exc

    return DispatchResponse(
        directive_id=response.directive_id,
        reply=response.response,
        agent_reports=[],
        escalations=response.escalations,
        actions_taken=response.actions_taken,
        session_id=req.session_id,
    )