Files
odoo-ai/agent_service/routers/dispatch.py
Carlos Garcia 4cb94b18f1 fix(agent): align /dispatch with MasterAgent.handle_message signature
The router was calling handle_message(user_id, message, context, session_id)
but MasterAgent accepts (user_id, channel_id, message, directive_id) and
returns MasterResponse{response, status, ...} with no .reply or
.agent_reports fields. Discuss DMs to the bot crashed with TypeError.

Now the router:
- Derives directive_id from session_id (or generates one)
- Pulls channel_id out of req.context
- Maps MasterResponse.response -> DispatchResponse.reply
- Returns an empty agent_reports list (the field is reserved for future use;
  per-agent reports aren't part of MasterResponse)
2026-04-24 23:06:24 -04:00

107 lines
3.5 KiB
Python

from __future__ import annotations
import asyncio
import hashlib
import hmac
import logging
import time
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import BaseModel, Field
from ..config import get_settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router for this module; routes registered on it are mounted under the /dispatch prefix.
router = APIRouter(prefix='/dispatch', tags=['dispatch'])
# In-memory rate limit store: {user_id: [timestamp, ...]}
# Timestamps are time.monotonic() readings written by _check_rate_limit.
# This is a plain module-level dict, so it is local to the current process
# and starts empty on every restart.
_rate_limit_store: dict[str, list[float]] = {}
# Request body accepted by the POST /dispatch endpoint.
class DispatchRequest(BaseModel):
    # Required. Also used as the key for per-user rate limiting.
    user_id: str = Field(..., description='Odoo user ID or session identifier')
    # Required. Passed to the master agent as the message to handle.
    message: str = Field(..., description='User natural-language message')
    # Optional; defaults to a fresh empty dict. The endpoint reads the
    # 'channel_id' key from it when present.
    context: dict = Field(default_factory=dict, description='Optional context (partner_id, etc.)')
    # Optional. When set, the endpoint reuses it as the directive id and
    # echoes it back in the response.
    session_id: Optional[str] = Field(None, description='Conversation session ID')
# Response body returned by the POST /dispatch endpoint.
class DispatchResponse(BaseModel):
    # Directive id as reported by the master agent's response.
    directive_id: str
    # Text of the master agent's reply.
    reply: str
    # The endpoint currently always returns an empty list here.
    agent_reports: list[dict] = []
    # Copied from the master agent's response.
    escalations: list[str] = []
    # Copied from the master agent's response.
    actions_taken: list[dict] = []
    # Echo of the session_id sent in the request (None when it was omitted).
    session_id: Optional[str] = None
def _verify_webhook_secret(request: Request) -> None:
    """Reject requests that lack the webhook signature header.

    When ``settings.webhook_secret`` is empty or unset the check is skipped
    entirely. Otherwise the request must carry a non-empty
    ``X-ActiveBlue-Signature`` header.

    NOTE(review): only the *presence* of the header is checked. Its value is
    never compared with anything derived from the secret, so any non-empty
    value is accepted. If signature validation is intended (the module
    imports ``hmac`` and ``hashlib`` but does not use them), it still has to
    be implemented -- confirm the signing scheme with the sender first.

    Args:
        request: Incoming request whose headers are inspected.

    Raises:
        HTTPException: 401 when a secret is configured and the signature
            header is missing or empty.
    """
    configured_secret = get_settings().webhook_secret
    if configured_secret:
        signature = request.headers.get('X-ActiveBlue-Signature', '')
        if not signature:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail='Missing signature',
            )
# Earliest time.monotonic() reading at which the next sweep of idle users may
# run. Starts at -inf so the very first call performs a (trivial) sweep.
_rate_limit_next_sweep: float = float('-inf')


def _check_rate_limit(user_id: str) -> None:
    """Enforce a per-user sliding-window limit of N requests per 60 seconds.

    N is ``settings.dispatch_rate_limit_per_user``. Accepted requests are
    recorded as ``time.monotonic()`` readings in ``_rate_limit_store``.

    To keep the store from growing without bound (one key per distinct
    ``user_id`` ever seen), users whose newest timestamp has left the window
    are evicted, at most once per window. Eviction never changes a rate-limit
    decision: such entries would be filtered down to an empty list on their
    next lookup anyway.

    Args:
        user_id: Identifier the request is accounted against.

    Raises:
        HTTPException: 429 when the user already has ``limit`` or more
            requests inside the current window. A rejected request is not
            recorded.
    """
    global _rate_limit_next_sweep

    limit = get_settings().dispatch_rate_limit_per_user
    now = time.monotonic()
    window = 60.0

    if now >= _rate_limit_next_sweep:
        # Timestamps are appended in increasing order, so checking the last
        # one is enough to know whether the whole list is stale.
        idle_users = [
            uid
            for uid, stamps in _rate_limit_store.items()
            if not stamps or now - stamps[-1] >= window
        ]
        for uid in idle_users:
            del _rate_limit_store[uid]
        _rate_limit_next_sweep = now + window

    recent = [t for t in _rate_limit_store.get(user_id, ()) if now - t < window]
    if len(recent) >= limit:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f'Rate limit exceeded: {limit} requests/minute',
        )
    recent.append(now)
    _rate_limit_store[user_id] = recent
@router.post('', response_model=DispatchResponse)
async def dispatch(req: DispatchRequest, request: Request):
    """Forward one user message to the master agent and return its reply.

    The request is first checked for the webhook signature header and against
    the per-user rate limit. The message is then handed to
    ``MasterAgent.handle_message`` under a timeout of
    ``settings.directive_timeout_minutes``.

    Args:
        req: Parsed request body.
        request: Raw incoming request, used for the signature header check.

    Returns:
        A ``DispatchResponse`` built from the master agent's response.
        ``agent_reports`` is always an empty list.

    Raises:
        HTTPException: 401 or 429 from the pre-checks; 503 when no master
            agent is available; 504 when the agent exceeds the timeout;
            500 for any other error raised while handling the message.
    """
    _verify_webhook_secret(request)
    _check_rate_limit(req.user_id)

    # Imported at call time rather than at module top -- presumably to avoid
    # an import cycle with app_state; confirm before hoisting.
    from ..app_state import get_master_agent

    master = get_master_agent()
    if master is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail='Agent service not ready',
        )

    settings = get_settings()
    timeout = settings.directive_timeout_minutes * 60  # minutes -> seconds
    # Reuse the caller's session id as the directive id; otherwise mint one.
    directive_id = req.session_id or uuid.uuid4().hex
    channel_id = req.context.get('channel_id') if isinstance(req.context, dict) else None

    try:
        response = await asyncio.wait_for(
            master.handle_message(
                user_id=req.user_id,
                channel_id=channel_id,
                message=req.message,
                directive_id=directive_id,
            ),
            timeout=timeout,
        )
    except asyncio.TimeoutError as exc:
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail=f'Directive timed out after {settings.directive_timeout_minutes}m',
        ) from exc
    except Exception as exc:
        logger.exception('dispatch error user=%s: %s', req.user_id, exc)
        # NOTE(review): detail echoes str(exc) to the caller -- confirm this
        # cannot expose internal details that should stay server-side.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        ) from exc

    return DispatchResponse(
        directive_id=response.directive_id,
        reply=response.response,
        agent_reports=[],
        escalations=response.escalations,
        actions_taken=response.actions_taken,
        session_id=req.session_id,
    )