Files
avc-phone-ai/server.py
tocmo0nlord ba36ae6891 Log/surface the reason, pin LLM warm for latency, doc insurance rule
- Reason visibility: the reason WAS extracted ("disintegrated eyes") but only
  lived in the Odoo description note. Add it to the post-call log line and to
  the Odoo lead title so it's visible at a glance.
- Latency: split the timing — Whisper is ~0.1s, latency is LLM-side. The ~3s
  tail was cold model reloads after Ollama's keep-alive expired. server.py now
  warms + pins the model on startup (keep_alive=-1, ollama ps UNTIL=Forever),
  removing cold first-turn stalls. Whisper size left alone (not the bottleneck).
- CLAUDE.md: insurance rule (never suggest/guess the plan), latency note.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 04:24:10 +00:00

236 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""Twilio-facing web server for the AVC phone agent.
Two endpoints, both reached by Twilio over your public Traefik domain:
POST /voice -> returns TwiML telling Twilio to open a bidirectional Media Stream
back to wss://<PUBLIC_HOST>/ws, with STREAM_TOKEN (and the caller ID) passed as TwiML <Parameter>s
WS /ws -> Twilio's Media Stream. We check the stream token, read the opening
'start' event for the SIDs, then hand the socket to the pipeline.
Security:
- POST /voice is authenticated with Twilio's X-Twilio-Signature (HMAC-SHA1 over the
public URL + sorted POST params, keyed by the auth token). Enforced whenever
TWILIO_AUTH_TOKEN is set; set TWILIO_VALIDATE=false to bypass for local testing.
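- WS /ws can't carry an X-Twilio-Signature usefully, so we gate it with a shared
STREAM_TOKEN that the TwiML passes as a <Parameter>; Twilio echoes it back in the 'start'
message's customParameters (a query string on the wss URL is not preserved by Twilio).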
Inbound only. Run behind Traefik (TLS terminated there); this app listens plain HTTP
on $PORT. See README for the Twilio number + Traefik wiring.
"""
import asyncio
import base64
import hashlib
import hmac
import json
import os
import secrets
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse
from loguru import logger
from bot import run_call
from pipecat.serializers.twilio import TwilioFrameSerializer
# ---- Deployment / network -------------------------------------------------------------
# Public hostname that Twilio calls back on (the reverse-proxy domain), e.g. phone.example.com.
PUBLIC_HOST = os.getenv("PUBLIC_HOST", "CHANGE-ME.example.com")
PORT = int(os.getenv("PORT", "8200"))
# Loopback by default: the TLS-terminating reverse proxy connects from 127.0.0.1, so the
# app need not be reachable on the LAN. Use BIND_HOST=0.0.0.0 only for a remote proxy.
BIND_HOST = os.getenv("BIND_HOST", "127.0.0.1")

# ---- Twilio credentials -----------------------------------------------------------------
# Used by the serializer to hang up the carrier leg on EndFrame, and to verify the
# X-Twilio-Signature on inbound webhooks.
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
# Signature checks stay on unless explicitly disabled with false/0/no (case-insensitive).
TWILIO_VALIDATE = os.getenv("TWILIO_VALIDATE", "true").lower() not in {"false", "0", "no"}

# ---- Media Stream gate ------------------------------------------------------------------
# Shared secret that gates /ws: the TwiML hands it to Twilio as a <Parameter> and it must
# come back in the stream's 'start' message. A random value is generated when unset (fine
# for a single process); set it in .env so it stays stable across restarts.
STREAM_TOKEN = os.getenv("STREAM_TOKEN")
if not STREAM_TOKEN:
    STREAM_TOKEN = secrets.token_urlsafe(24)

# ---- Capacity ---------------------------------------------------------------------------
# Upper bound on simultaneous live calls. Every call holds a Whisper model plus an Ollama
# context on the 16GB GPU, and Ollama serializes generation, so the cap protects quality.
# Callers beyond the cap hear BUSY_MESSAGE and are hung up; running calls are untouched.
MAX_CONCURRENT_CALLS = int(os.getenv("MAX_CONCURRENT_CALLS", "2"))
BUSY_MESSAGE = os.getenv(
    "BUSY_MESSAGE",
    (
        "Thank you for calling Advanced Vision Care. All of our lines are busy right now. "
        "Please call back in a few minutes. Goodbye."
    ),
)
app = FastAPI()


@app.on_event("startup")
async def _warm_llm():
    """Load and pin the Ollama model in VRAM before the first call arrives.

    Sends a trivial non-streaming generate request with ``keep_alive=-1`` so the model
    stays resident, avoiding a cold model reload on the first reply of a call (cold
    reloads were adding ~3s of dead air; Whisper STT is ~0.1s, so latency is LLM-side).

    Best-effort: any failure (Ollama unreachable, unknown model, non-2xx response) is
    logged as a warning and never aborts startup. The request is awaited, so startup
    does wait for the model load, bounded by the 120s client timeout.
    """
    import httpx

    base = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/v1").rstrip("/")
    # OLLAMA_URL may carry the OpenAI-compatible "/v1" prefix; the native
    # /api/generate endpoint lives at the server root, so strip it.
    if base.endswith("/v1"):
        base = base[:-3]
    model = os.environ.get("OLLAMA_MODEL", "activeblue-avc:latest")
    try:
        async with httpx.AsyncClient(timeout=120) as c:
            resp = await c.post(
                f"{base}/api/generate",
                json={"model": model, "prompt": "ok", "stream": False, "keep_alive": -1},
            )
            # Without this, an HTTP error (e.g. 404 for a model that isn't pulled)
            # would fall through and be logged as a successful warmup.
            resp.raise_for_status()
        logger.info(f"Warmed + pinned Ollama model {model} (keep_alive=-1)")
    except Exception as e:
        logger.warning(f"LLM warmup failed (first call may be slow): {e!r}")
# Number of /ws pipelines currently running (these are what consume the GPU).
# Only modified while _active_lock is held.
_active_calls = 0
_active_lock = asyncio.Lock()


async def _reserve_call_slot() -> bool:
    """Claim one call slot under the lock; True if granted, False when already full."""
    global _active_calls
    async with _active_lock:
        granted = _active_calls < MAX_CONCURRENT_CALLS
        if granted:
            _active_calls += 1
    return granted


async def _release_call_slot():
    """Return a previously claimed slot; the counter is floored at zero."""
    global _active_calls
    async with _active_lock:
        _active_calls = _active_calls - 1 if _active_calls > 0 else 0
def _twilio_signature_ok(url: str, params: dict, header_sig: str) -> bool:
    """Return True iff ``header_sig`` is Twilio's signature for this request.

    Algorithm (Twilio docs): take the full public URL, append each POST param as
    key+value sorted by key, HMAC-SHA1 the result keyed by the auth token, then
    base64-encode the digest. The comparison is constant time.

    Args:
        url: the public URL Twilio signed (not the internal proxied one).
        params: the POST form parameters.
        header_sig: the X-Twilio-Signature header value (attacker-controlled).

    Returns:
        False when no auth token is configured, the header is empty, or it does not match.
    """
    if not (TWILIO_AUTH_TOKEN and header_sig):
        return False
    payload = url + "".join(f"{k}{params[k]}" for k in sorted(params))
    digest = hmac.new(TWILIO_AUTH_TOKEN.encode(), payload.encode("utf-8"), hashlib.sha1).digest()
    expected = base64.b64encode(digest)
    # Compare bytes, not str: hmac.compare_digest raises TypeError on non-ASCII str,
    # which would turn a forged header into a 500 instead of a clean rejection.
    return hmac.compare_digest(expected, header_sig.encode("utf-8", "replace"))
@app.get("/health")
async def health():
    """Liveness probe that also reports signature-check mode and slot usage."""
    sig_checks_active = TWILIO_VALIDATE and bool(TWILIO_AUTH_TOKEN)
    return dict(
        status="ok",
        public_host=PUBLIC_HOST,
        validate=sig_checks_active,
        active_calls=_active_calls,
        max_calls=MAX_CONCURRENT_CALLS,
    )
@app.post("/voice")
async def voice(request: Request):
    """Twilio voice webhook: reply with TwiML for the incoming call.

    When validation is enabled and an auth token is set, the X-Twilio-Signature is
    verified first and a bad or missing signature gets a 403. If every call slot is
    busy, the reply speaks BUSY_MESSAGE and hangs up before any GPU work. Otherwise the
    call is connected to our bidirectional Media Stream at wss://<PUBLIC_HOST>/ws.

    Every value interpolated into the TwiML is XML-escaped. The caller ID comes from the
    request, and env-configured strings (e.g. a BUSY_MESSAGE containing "&") would
    otherwise yield malformed TwiML or allow markup injection.
    """
    from xml.sax.saxutils import escape

    # escape() covers &, < and >; attribute values quoted with " also need &quot;.
    attr_entities = {'"': "&quot;"}

    form = dict(await request.form())
    if TWILIO_VALIDATE and TWILIO_AUTH_TOKEN:
        # Validate against the PUBLIC url Twilio actually signed, not the internal one.
        public_url = f"https://{PUBLIC_HOST}/voice"
        sig = request.headers.get("X-Twilio-Signature", "")
        if not _twilio_signature_ok(public_url, form, sig):
            logger.warning("Rejected /voice: bad or missing X-Twilio-Signature")
            return HTMLResponse(status_code=403, content="forbidden")
    elif not TWILIO_AUTH_TOKEN:
        logger.warning("/voice signature validation DISABLED (no TWILIO_AUTH_TOKEN set)")

    # Caller-ID, passed through to the pipeline for appointment callback.
    caller = str(form.get("From", ""))

    # Capacity gate: if all slots are busy, speak the busy message and hang up here,
    # before any GPU work, so in-progress calls are never degraded. The reservation is
    # taken at /ws and tied to the socket lifecycle; this is a live read of that count.
    if _active_calls >= MAX_CONCURRENT_CALLS:
        logger.info(f"At capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — returning busy")
        busy = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
    <Say>{escape(BUSY_MESSAGE)}</Say>
    <Hangup/>
</Response>"""
        return HTMLResponse(content=busy, media_type="application/xml")

    # NOTE: <Connect><Stream> is bidirectional (agent can speak back). <Start><Stream>
    # would be one-way (listen only), so do not use that here.
    # The token is passed as a <Parameter> because Twilio does NOT preserve a query string
    # on the wss URL; it arrives in the /ws 'start' message's customParameters.
    stream_url = escape(f"wss://{PUBLIC_HOST}/ws", attr_entities)
    token_attr = escape(STREAM_TOKEN, attr_entities)
    caller_attr = escape(caller, attr_entities)
    twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
    <Connect>
        <Stream url="{stream_url}">
            <Parameter name="token" value="{token_attr}" />
            <Parameter name="caller" value="{caller_attr}" />
        </Stream>
    </Connect>
</Response>"""
    return HTMLResponse(content=twiml, media_type="application/xml")
@app.websocket("/ws")
async def media_stream(websocket: WebSocket):
    """Handle one Twilio Media Stream: authenticate it, reserve a call slot, run the call.

    The stream token rides in the TwiML <Parameter>, which Twilio delivers inside the
    'start' message's customParameters, so the socket must be accepted before the token
    can be read. Token validation and the capacity gate both run before any pipeline
    work. A bad token closes the socket with 1008 and an over-capacity call with 1013.
    Errors are logged, never raised, and a reserved slot is always released.
    """
    await websocket.accept()
    call_sid = None
    reserved = False
    try:
        # Twilio sends a 'connected' frame, then a 'start' frame with SIDs + params.
        # Plain receive_text() calls leave no half-consumed iter_text() generator
        # behind once the socket is handed to the pipeline.
        await websocket.receive_text()  # 'connected'
        start = json.loads(await websocket.receive_text())  # 'start'
        start_data = start["start"]
        custom_params = start_data.get("customParameters") or {}
        token = custom_params.get("token")
        # Constant-time comparison so the shared secret can't be probed through response
        # timing. Compare bytes, because compare_digest rejects non-ASCII str.
        token_ok = isinstance(token, str) and hmac.compare_digest(
            token.encode("utf-8", "replace"), STREAM_TOKEN.encode("utf-8", "replace")
        )
        if not token_ok:
            logger.warning("Rejected /ws: bad or missing stream token")
            await websocket.close(code=1008)  # policy violation
            return
        # Capacity gate (hard safety net for the /voice→/ws race).
        if not await _reserve_call_slot():
            logger.warning(f"/ws over capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — closing")
            await websocket.close(code=1013)  # try again later
            return
        reserved = True
        stream_sid = start_data["streamSid"]
        call_sid = start_data["callSid"]
        caller_number = custom_params.get("caller") or None
        logger.info(
            f"Media stream start: call={call_sid} stream={stream_sid} caller={caller_number} "
            f"({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)"
        )
        serializer = TwilioFrameSerializer(
            stream_sid=stream_sid,
            call_sid=call_sid,
            account_sid=TWILIO_ACCOUNT_SID,
            auth_token=TWILIO_AUTH_TOKEN,
        )
        await run_call(websocket, serializer, caller_number=caller_number, call_sid=call_sid)
    except Exception:
        logger.exception("Call pipeline error")
    finally:
        if reserved:
            await _release_call_slot()
            # Only calls that actually took a slot get an "ended" line; rejected sockets
            # already logged their reason above.
            logger.info(f"Call ended: {call_sid} ({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)")
if __name__ == "__main__":
    import uvicorn

    # Summarize the effective config once at boot, then serve plain HTTP behind the proxy.
    sig_mode = "on" if (TWILIO_VALIDATE and TWILIO_AUTH_TOKEN) else "OFF"
    logger.info(
        f"AVC phone agent on {BIND_HOST}:{PORT} | public={PUBLIC_HOST} | "
        f"sig_validation={sig_mode}"
    )
    uvicorn.run(app, host=BIND_HOST, port=PORT)