The router was calling handle_message(user_id, message, context, session_id)
but MasterAgent accepts (user_id, channel_id, message, directive_id) and
returns MasterResponse{response, status, ...} with no .reply or
.agent_reports fields. Discuss DMs to the bot crashed with TypeError.
Now the router:
- Derives directive_id from session_id (or generates one)
- Pulls channel_id out of req.context
- Maps MasterResponse.response -> DispatchResponse.reply
- Returns an empty agent_reports list (the field is reserved for future use;
per-agent reports aren't part of MasterResponse)
107 lines
3.5 KiB
Python
107 lines
3.5 KiB
Python
from __future__ import annotations
|
|
import asyncio
|
|
import hashlib
|
|
import hmac
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from pydantic import BaseModel, Field
|
|
|
|
from ..config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix='/dispatch', tags=['dispatch'])
|
|
|
|
# In-memory rate limit store: {user_id: [timestamp, ...]}
|
|
_rate_limit_store: dict[str, list[float]] = {}
|
|
|
|
|
|
class DispatchRequest(BaseModel):
|
|
user_id: str = Field(..., description='Odoo user ID or session identifier')
|
|
message: str = Field(..., description='User natural-language message')
|
|
context: dict = Field(default_factory=dict, description='Optional context (partner_id, etc.)')
|
|
session_id: Optional[str] = Field(None, description='Conversation session ID')
|
|
|
|
|
|
class DispatchResponse(BaseModel):
|
|
directive_id: str
|
|
reply: str
|
|
agent_reports: list[dict] = []
|
|
escalations: list[str] = []
|
|
actions_taken: list[dict] = []
|
|
session_id: Optional[str] = None
|
|
|
|
|
|
def _verify_webhook_secret(request: Request) -> None:
|
|
settings = get_settings()
|
|
secret = settings.webhook_secret
|
|
if not secret:
|
|
return
|
|
sig = request.headers.get('X-ActiveBlue-Signature', '')
|
|
if not sig:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Missing signature')
|
|
|
|
|
|
def _check_rate_limit(user_id: str) -> None:
|
|
settings = get_settings()
|
|
limit = settings.dispatch_rate_limit_per_user
|
|
now = time.monotonic()
|
|
window = 60.0
|
|
timestamps = _rate_limit_store.get(user_id, [])
|
|
timestamps = [t for t in timestamps if now - t < window]
|
|
if len(timestamps) >= limit:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
|
detail=f'Rate limit exceeded: {limit} requests/minute',
|
|
)
|
|
timestamps.append(now)
|
|
_rate_limit_store[user_id] = timestamps
|
|
|
|
|
|
@router.post('', response_model=DispatchResponse)
|
|
async def dispatch(req: DispatchRequest, request: Request):
|
|
_verify_webhook_secret(request)
|
|
_check_rate_limit(req.user_id)
|
|
|
|
from ..app_state import get_master_agent
|
|
master = get_master_agent()
|
|
if master is None:
|
|
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail='Agent service not ready')
|
|
|
|
settings = get_settings()
|
|
timeout = settings.directive_timeout_minutes * 60
|
|
|
|
directive_id = req.session_id or uuid.uuid4().hex
|
|
channel_id = req.context.get('channel_id') if isinstance(req.context, dict) else None
|
|
|
|
try:
|
|
response = await asyncio.wait_for(
|
|
master.handle_message(
|
|
user_id=req.user_id,
|
|
channel_id=channel_id,
|
|
message=req.message,
|
|
directive_id=directive_id,
|
|
),
|
|
timeout=timeout,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
|
|
detail=f'Directive timed out after {settings.directive_timeout_minutes}m',
|
|
)
|
|
except Exception as exc:
|
|
logger.exception('dispatch error user=%s: %s', req.user_id, exc)
|
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc))
|
|
|
|
return DispatchResponse(
|
|
directive_id=response.directive_id,
|
|
reply=response.response,
|
|
agent_reports=[],
|
|
escalations=response.escalations,
|
|
actions_taken=response.actions_taken,
|
|
session_id=req.session_id,
|
|
)
|