Files
avc-phone-ai/server.py
tocmo0nlord 550550975f Capture symptom reasons; switch startup warmup to lifespan
- Reason extraction missed symptom-style reasons: a caller said "I'm actually
  blind" and the lead logged reason=None (it caught "disintegrated eyes" before
  but not this). Broadened the extractor's reason rule to capture the eye
  problem/symptom as the reason, not just visit types. Verified 3/3 -> "vision
  loss / blindness".
- server.py: move the LLM warmup/pin (keep_alive=-1) from the deprecated
  on_event("startup") to a lifespan handler — silences the FastAPI deprecation
  warning; model still shows ollama ps UNTIL=Forever.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 04:37:45 +00:00

236 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""Twilio-facing web server for the AVC phone agent.
Two endpoints, both reached by Twilio over your public Traefik domain:
POST /voice -> returns TwiML telling Twilio to open a bidirectional Media Stream
back to wss://<PUBLIC_HOST>/ws, with the stream token attached as a <Parameter>
WS /ws -> Twilio's Media Stream. We read the opening 'start' event, check the stream
token in its customParameters, take the SIDs, then hand the socket to the pipeline.
Security:
- POST /voice is authenticated with Twilio's X-Twilio-Signature (HMAC-SHA1 over the
public URL + sorted POST params, keyed by the auth token). Enforced whenever
TWILIO_AUTH_TOKEN is set; set TWILIO_VALIDATE=false to bypass for local testing.
- WS /ws can't carry an X-Twilio-Signature usefully, so we gate it with a shared
STREAM_TOKEN passed as a <Stream> <Parameter> in the TwiML we hand Twilio.
Inbound only. Run behind Traefik (TLS terminated there); this app listens plain HTTP
on $PORT. See README for the Twilio number + Traefik wiring.
"""
import asyncio
import base64
import hashlib
import hmac
import json
import os
import secrets
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse
from loguru import logger
from bot import run_call
from pipecat.serializers.twilio import TwilioFrameSerializer
# Hostname Twilio connects back to (the public Traefik domain), e.g. phone.example.com
PUBLIC_HOST = os.getenv("PUBLIC_HOST", "CHANGE-ME.example.com")
PORT = int(os.getenv("PORT", "8200"))
# Listen on loopback unless told otherwise: the TLS-terminating reverse proxy forwards
# from 127.0.0.1, so the app does not need LAN exposure. Use BIND_HOST=0.0.0.0 only
# when the proxy runs on another machine.
BIND_HOST = os.getenv("BIND_HOST", "127.0.0.1")
# Twilio REST credentials. They are handed to the frame serializer (so it can hang up
# the carrier leg on EndFrame) and the auth token also keys webhook signature checks.
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
# Signature validation defaults to ON; only an explicit false/0/no switches it off.
TWILIO_VALIDATE = os.getenv("TWILIO_VALIDATE", "true").lower() not in {"false", "0", "no"}
# Shared secret that gates /ws: /voice hands it to Twilio in the TwiML and /ws checks
# it. A random one is generated when unset (fine for a single process); set it in .env
# to keep it stable across restarts.
STREAM_TOKEN = os.getenv("STREAM_TOKEN") or secrets.token_urlsafe(24)
# Ceiling on simultaneous live calls. Every call loads a Whisper model plus an Ollama
# context on the 16GB GPU and Ollama serializes generation, so the cap protects call
# quality. Callers over the cap hear BUSY_MESSAGE and are hung up; calls already in
# progress are never degraded.
MAX_CONCURRENT_CALLS = int(os.getenv("MAX_CONCURRENT_CALLS", "2"))
BUSY_MESSAGE = os.getenv(
    "BUSY_MESSAGE",
    "Thank you for calling Advanced Vision Care. All of our lines are busy right now. "
    "Please call back in a few minutes. Goodbye.",
)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Pin the LLM in VRAM at startup so the first turn of a call isn't a cold reload.

    Sends one throwaway generation to Ollama's native ``/api/generate`` endpoint with
    ``keep_alive=-1``. Cold reloads were adding ~3s of dead air to the first reply;
    latency is otherwise LLM-side (Whisper STT is ~0.1s).

    The Ollama base URL comes from ``OLLAMA_URL`` (a trailing ``/v1`` is stripped so
    the native API path can be used) and the model name from ``OLLAMA_MODEL``.

    Best-effort: any failure, including a non-2xx reply from Ollama, is logged as a
    warning and startup continues.
    """
    base = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/v1").rstrip("/")
    if base.endswith("/v1"):
        base = base[:-3]
    model = os.environ.get("OLLAMA_MODEL", "activeblue-avc:latest")
    try:
        async with httpx.AsyncClient(timeout=120) as c:
            resp = await c.post(
                f"{base}/api/generate",
                json={"model": model, "prompt": "ok", "stream": False, "keep_alive": -1},
            )
            # Without this an HTTP error reply (e.g. unknown model) would still be
            # reported as a successful warmup below.
            resp.raise_for_status()
        logger.info(f"Warmed + pinned Ollama model {model} (keep_alive=-1)")
    except Exception as e:
        logger.warning(f"LLM warmup failed (first call may be slow): {e!r}")
    yield
# The ASGI application; `lifespan` is registered as its startup/shutdown handler.
app = FastAPI(lifespan=lifespan)
# Live count of active /ws pipelines (the real GPU consumers), guarded by a lock.
# Writes go through _reserve_call_slot() / _release_call_slot() under _active_lock;
# /voice and /health read the counter directly without taking the lock.
_active_calls = 0
_active_lock = asyncio.Lock()
async def _reserve_call_slot() -> bool:
    """Claim one call slot while holding the lock.

    Returns:
        True if a slot was free and has now been counted against
        MAX_CONCURRENT_CALLS; False if the server is already at capacity.
    """
    global _active_calls
    async with _active_lock:
        has_room = _active_calls < MAX_CONCURRENT_CALLS
        if has_room:
            _active_calls += 1
        return has_room
async def _release_call_slot():
    """Return a call slot under the lock; the counter is clamped at zero."""
    global _active_calls
    async with _active_lock:
        remaining = _active_calls - 1
        _active_calls = remaining if remaining > 0 else 0
def _twilio_signature_ok(url: str, params: dict, header_sig: str) -> bool:
    """Recompute Twilio's request signature and compare in constant time.

    Algorithm (Twilio docs): take the full public URL, append each POST param as
    key+value sorted by key, HMAC-SHA1 with the auth token, base64-encode.

    Args:
        url: The public URL Twilio signed (not the internal proxied one).
        params: The POST form fields of the request.
        header_sig: Value of the ``X-Twilio-Signature`` request header.

    Returns:
        True only when an auth token is configured, a signature was supplied, and
        the supplied signature matches the recomputed one. Never raises on a
        malformed header value.
    """
    if not (TWILIO_AUTH_TOKEN and header_sig):
        return False
    payload = url + "".join(f"{k}{params[k]}" for k in sorted(params))
    digest = hmac.new(TWILIO_AUTH_TOKEN.encode(), payload.encode("utf-8"), hashlib.sha1).digest()
    expected = base64.b64encode(digest)
    # Compare as bytes: compare_digest() raises TypeError for str arguments that
    # contain non-ASCII characters, and the header value is attacker-controlled.
    # A garbage header must yield "False" (403), not an unhandled exception (500).
    return hmac.compare_digest(expected, header_sig.encode("utf-8"))
@app.get("/health")
async def health():
    """Return a small status dict: public host, whether webhook signature validation
    is effectively on, and current vs. maximum call-slot usage."""
    signature_checks_on = TWILIO_VALIDATE and bool(TWILIO_AUTH_TOKEN)
    return dict(
        status="ok",
        public_host=PUBLIC_HOST,
        validate=signature_checks_on,
        active_calls=_active_calls,
        max_calls=MAX_CONCURRENT_CALLS,
    )
@app.post("/voice")
async def voice(request: Request):
    """TwiML: connect the call to our Media Stream WebSocket (bidirectional).

    Flow:
      1. When TWILIO_VALIDATE and TWILIO_AUTH_TOKEN are both set, verify the
         X-Twilio-Signature header against the public URL; answer 403 on mismatch.
      2. If every call slot is in use, answer with <Say>BUSY_MESSAGE</Say><Hangup/>.
      3. Otherwise answer with <Connect><Stream> pointing at wss://PUBLIC_HOST/ws,
         carrying the stream token and the caller ID as <Parameter> elements.

    Every value interpolated into the TwiML is XML-escaped, so a caller ID or a
    configured message containing ``&``, ``<`` or quotes can neither break the
    document nor inject markup.
    """
    # Function-scope import: stdlib only, keeps this handler self-contained.
    from xml.sax.saxutils import escape, quoteattr

    form = dict(await request.form())
    if TWILIO_VALIDATE and TWILIO_AUTH_TOKEN:
        # Validate against the PUBLIC url Twilio actually signed, not the internal one.
        public_url = f"https://{PUBLIC_HOST}/voice"
        sig = request.headers.get("X-Twilio-Signature", "")
        if not _twilio_signature_ok(public_url, form, sig):
            logger.warning("Rejected /voice: bad or missing X-Twilio-Signature")
            return HTMLResponse(status_code=403, content="forbidden")
    elif not TWILIO_AUTH_TOKEN:
        logger.warning("/voice signature validation DISABLED (no TWILIO_AUTH_TOKEN set)")
    # Caller-ID; passed through for appointment callback. It is request data, so it is
    # treated as untrusted text and escaped before being placed in the TwiML.
    caller = str(form.get("From", ""))
    # Capacity gate: if all slots are busy, speak the busy message and hang up here —
    # before any GPU work — so in-progress calls are never degraded. (A reservation is
    # taken at /ws, tied to the socket lifecycle; this is the live read of that count.)
    if _active_calls >= MAX_CONCURRENT_CALLS:
        logger.info(f"At capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — returning busy")
        busy = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>{escape(BUSY_MESSAGE)}</Say>
<Hangup/>
</Response>"""
        return HTMLResponse(content=busy, media_type="application/xml")
    # NOTE: <Connect><Stream> is bidirectional (agent can speak back). <Start><Stream>
    # would be one-way (listen only) — do not use that here.
    # Token passed as a <Parameter> (Twilio does NOT preserve a query string on the
    # wss URL); it arrives in the /ws 'start' message's customParameters.
    # quoteattr() returns the value already wrapped in quotes, hence no literal quotes
    # around the placeholders below.
    stream_url = quoteattr(f"wss://{PUBLIC_HOST}/ws")
    twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url={stream_url}>
<Parameter name="token" value={quoteattr(STREAM_TOKEN)} />
<Parameter name="caller" value={quoteattr(caller)} />
</Stream>
</Connect>
</Response>"""
    return HTMLResponse(content=twiml, media_type="application/xml")
# Seconds an as-yet-unauthenticated socket may take to deliver each of Twilio's two
# opening frames ('connected', then 'start') before it is dropped.
_WS_HANDSHAKE_TIMEOUT_S = 10.0


@app.websocket("/ws")
async def media_stream(websocket: WebSocket):
    """Handle one Twilio Media Stream socket from handshake to hang-up.

    The stream token rides in the TwiML <Parameter>, which Twilio delivers inside the
    'start' message's customParameters — so the socket has to be accepted before it
    can be authenticated. Steps:

      1. Accept, then read the 'connected' and 'start' frames, each bounded by
         _WS_HANDSHAKE_TIMEOUT_S so an idle unauthenticated client cannot hold the
         socket open indefinitely (closed with 1008 on timeout).
      2. Check the token in constant time; close with 1008 on mismatch.
      3. Reserve a call slot; close with 1013 when at capacity.
      4. Build the Twilio frame serializer and hand the socket to run_call().

    Any exception is logged, and a reserved slot is always released in ``finally``.
    """
    await websocket.accept()
    call_sid = None
    reserved = False
    try:
        # Twilio sends a 'connected' frame, then a 'start' frame with SIDs + params.
        try:
            await asyncio.wait_for(websocket.receive_text(), _WS_HANDSHAKE_TIMEOUT_S)  # 'connected'
            raw_start = await asyncio.wait_for(websocket.receive_text(), _WS_HANDSHAKE_TIMEOUT_S)
        except asyncio.TimeoutError:
            logger.warning("Rejected /ws: handshake frames not received in time")
            await websocket.close(code=1008)  # policy violation
            return
        start_data = json.loads(raw_start)["start"]
        custom_params = start_data.get("customParameters") or {}
        token = custom_params.get("token")
        # Constant-time comparison, matching how the /voice signature is checked.
        # The isinstance guard covers a missing or non-string token; comparing bytes
        # avoids compare_digest()'s TypeError on non-ASCII str input.
        token_ok = isinstance(token, str) and hmac.compare_digest(
            token.encode("utf-8"), STREAM_TOKEN.encode("utf-8")
        )
        if not token_ok:
            logger.warning("Rejected /ws: bad or missing stream token")
            await websocket.close(code=1008)  # policy violation
            return
        # Capacity gate (hard safety net for the /voice→/ws race).
        if not await _reserve_call_slot():
            logger.warning(f"/ws over capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — closing")
            await websocket.close(code=1013)  # try again later
            return
        reserved = True
        stream_sid = start_data["streamSid"]
        call_sid = start_data["callSid"]
        # Empty caller string is normalized to None.
        caller_number = custom_params.get("caller") or None
        logger.info(
            f"Media stream start: call={call_sid} stream={stream_sid} caller={caller_number} "
            f"({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)"
        )
        serializer = TwilioFrameSerializer(
            stream_sid=stream_sid,
            call_sid=call_sid,
            account_sid=TWILIO_ACCOUNT_SID,
            auth_token=TWILIO_AUTH_TOKEN,
        )
        await run_call(websocket, serializer, caller_number=caller_number, call_sid=call_sid)
    except Exception:
        logger.exception("Call pipeline error")
    finally:
        # Only give a slot back if this socket actually took one.
        if reserved:
            await _release_call_slot()
        logger.info(f"Call ended: {call_sid} ({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)")
if __name__ == "__main__":
    # Script entry point: announce the bind address, public host and whether webhook
    # signature validation is effectively active, then serve the app with uvicorn.
    import uvicorn

    banner = (
        f"AVC phone agent on {BIND_HOST}:{PORT} | public={PUBLIC_HOST} | "
        f"sig_validation={'on' if (TWILIO_VALIDATE and TWILIO_AUTH_TOKEN) else 'OFF'}"
    )
    logger.info(banner)
    uvicorn.run(app, host=BIND_HOST, port=PORT)