Initial commit: avc-phone-ai codebase + CLAUDE.md
This commit is contained in:
218
server.py
Normal file
218
server.py
Normal file
@@ -0,0 +1,218 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Twilio-facing web server for the AVC phone agent.
|
||||
|
||||
Two endpoints, both reached by Twilio over your public Traefik domain:
|
||||
|
||||
POST /voice -> returns TwiML telling Twilio to open a bidirectional Media Stream
|
||||
back to wss://<PUBLIC_HOST>/ws?token=<STREAM_TOKEN>
|
||||
WS /ws -> Twilio's Media Stream. We check the stream token, read the opening
|
||||
'start' event for the SIDs, then hand the socket to the pipeline.
|
||||
|
||||
Security:
|
||||
- POST /voice is authenticated with Twilio's X-Twilio-Signature (HMAC-SHA1 over the
|
||||
public URL + sorted POST params, keyed by the API Key Secret). Enforced whenever
|
||||
TWILIO_API_KEY_SECRET is set; set TWILIO_VALIDATE=false to bypass for local testing.
|
||||
- WS /ws can't carry an X-Twilio-Signature usefully, so we gate it with a shared
|
||||
STREAM_TOKEN embedded in the wss URL we hand Twilio in the TwiML.
|
||||
|
||||
Inbound only. Run behind Traefik (TLS terminated there); this app listens plain HTTP
|
||||
on $PORT. See README for the Twilio number + Traefik wiring.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
|
||||
from fastapi import FastAPI, Request, WebSocket
|
||||
from fastapi.responses import HTMLResponse
|
||||
from loguru import logger
|
||||
|
||||
from bot import run_call
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
|
||||
# Public hostname Twilio dials back into (your Traefik domain), e.g. phone.example.com
|
||||
PUBLIC_HOST = os.environ.get("PUBLIC_HOST", "CHANGE-ME.example.com")
|
||||
PORT = int(os.environ.get("PORT", "8200"))
|
||||
# Bind localhost by default: nginx terminates TLS and proxies in from 127.0.0.1, so the
|
||||
# app needn't be exposed on the LAN. Set BIND_HOST=0.0.0.0 only if a remote proxy needs it.
|
||||
BIND_HOST = os.environ.get("BIND_HOST", "127.0.0.1")
|
||||
|
||||
# Twilio REST creds — let the serializer auto-hang-up the carrier leg on EndFrame,
|
||||
# and validate inbound webhook signatures.
|
||||
TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID")
|
||||
# Standard API Key (scoped to this app, revocable independently) instead of the account
|
||||
# master Auth Token. The Secret is used both for HMAC webhook-signature validation and as
|
||||
# the serializer credential for auto-hang-up.
|
||||
TWILIO_API_KEY_SID = os.environ.get("TWILIO_API_KEY_SID")
|
||||
TWILIO_API_KEY_SECRET = os.environ.get("TWILIO_API_KEY_SECRET")
|
||||
# Signature validation is ON by default when the API key secret exists; explicit opt-out.
|
||||
TWILIO_VALIDATE = os.environ.get("TWILIO_VALIDATE", "true").lower() not in ("false", "0", "no")
|
||||
|
||||
# Shared secret embedded in the Media Stream wss URL to gate /ws. Auto-generated if
|
||||
# unset (fine for a single process), but set it in .env for stability across restarts.
|
||||
STREAM_TOKEN = os.environ.get("STREAM_TOKEN") or secrets.token_urlsafe(24)
|
||||
|
||||
# Max simultaneous live calls. Each call holds an Ollama context on the 16GB GPU and
|
||||
# Ollama serializes generation, so cap this to protect call quality.
|
||||
# Over-cap callers hear BUSY_MESSAGE and are hung up — existing calls are never degraded.
|
||||
MAX_CONCURRENT_CALLS = int(os.environ.get("MAX_CONCURRENT_CALLS", "2"))
|
||||
BUSY_MESSAGE = os.environ.get(
|
||||
"BUSY_MESSAGE",
|
||||
"Thank you for calling Advanced Vision Care. All of our lines are busy right now. "
|
||||
"Please call back in a few minutes. Goodbye.",
|
||||
)
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
# Live count of active /ws pipelines (the real GPU consumers), guarded by a lock.
|
||||
_active_calls = 0
|
||||
_active_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def _reserve_call_slot() -> bool:
|
||||
"""Atomically take a call slot. Returns False if at capacity."""
|
||||
global _active_calls
|
||||
async with _active_lock:
|
||||
if _active_calls >= MAX_CONCURRENT_CALLS:
|
||||
return False
|
||||
_active_calls += 1
|
||||
return True
|
||||
|
||||
|
||||
async def _release_call_slot():
|
||||
global _active_calls
|
||||
async with _active_lock:
|
||||
_active_calls = max(0, _active_calls - 1)
|
||||
|
||||
|
||||
def _twilio_signature_ok(url: str, params: dict, header_sig: str) -> bool:
|
||||
"""Recompute Twilio's request signature and compare in constant time.
|
||||
|
||||
Algorithm (Twilio docs): take the full public URL, append each POST param as
|
||||
key+value sorted by key, HMAC-SHA1 with the API Key Secret, base64-encode.
|
||||
"""
|
||||
if not (TWILIO_API_KEY_SECRET and header_sig):
|
||||
return False
|
||||
payload = url + "".join(f"{k}{params[k]}" for k in sorted(params))
|
||||
digest = hmac.new(TWILIO_API_KEY_SECRET.encode(), payload.encode("utf-8"), hashlib.sha1).digest()
|
||||
expected = base64.b64encode(digest).decode()
|
||||
return hmac.compare_digest(expected, header_sig)
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {
|
||||
"status": "ok",
|
||||
"public_host": PUBLIC_HOST,
|
||||
"validate": TWILIO_VALIDATE and bool(TWILIO_API_KEY_SECRET),
|
||||
"active_calls": _active_calls,
|
||||
"max_calls": MAX_CONCURRENT_CALLS,
|
||||
}
|
||||
|
||||
|
||||
@app.post("/voice")
|
||||
async def voice(request: Request):
|
||||
"""TwiML: connect the call to our Media Stream WebSocket (bidirectional)."""
|
||||
form = dict(await request.form())
|
||||
if TWILIO_VALIDATE and TWILIO_API_KEY_SECRET:
|
||||
# Validate against the PUBLIC url Twilio actually signed, not the internal one.
|
||||
public_url = f"https://{PUBLIC_HOST}/voice"
|
||||
sig = request.headers.get("X-Twilio-Signature", "")
|
||||
if not _twilio_signature_ok(public_url, form, sig):
|
||||
logger.warning("Rejected /voice: bad or missing X-Twilio-Signature")
|
||||
return HTMLResponse(status_code=403, content="forbidden")
|
||||
elif not TWILIO_API_KEY_SECRET:
|
||||
logger.warning("/voice signature validation DISABLED (no TWILIO_API_KEY_SECRET set)")
|
||||
|
||||
caller = form.get("From", "") # caller-ID; passed through for appointment callback
|
||||
|
||||
# Capacity gate: if all slots are busy, speak the busy message and hang up here —
|
||||
# before any GPU work — so in-progress calls are never degraded. (A reservation is
|
||||
# taken at /ws, tied to the socket lifecycle; this is the live read of that count.)
|
||||
if _active_calls >= MAX_CONCURRENT_CALLS:
|
||||
logger.info(f"At capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — returning busy")
|
||||
busy = f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Say>{BUSY_MESSAGE}</Say>
|
||||
<Hangup/>
|
||||
</Response>"""
|
||||
return HTMLResponse(content=busy, media_type="application/xml")
|
||||
|
||||
# NOTE: <Connect><Stream> is bidirectional (agent can speak back). <Start><Stream>
|
||||
# would be one-way (listen only) — do not use that here.
|
||||
# Token passed as a <Parameter> (Twilio does NOT preserve a query string on the
|
||||
# wss URL); it arrives in the /ws 'start' message's customParameters.
|
||||
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://{PUBLIC_HOST}/ws">
|
||||
<Parameter name="token" value="{STREAM_TOKEN}" />
|
||||
<Parameter name="caller" value="{caller}" />
|
||||
</Stream>
|
||||
</Connect>
|
||||
</Response>"""
|
||||
return HTMLResponse(content=twiml, media_type="application/xml")
|
||||
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def media_stream(websocket: WebSocket):
|
||||
# The stream token rides in the TwiML <Parameter>, which Twilio delivers inside the
|
||||
# 'start' message's customParameters — so we must accept the socket to read it, then
|
||||
# validate + capacity-gate before doing any real work.
|
||||
await websocket.accept()
|
||||
|
||||
call_sid = None
|
||||
reserved = False
|
||||
try:
|
||||
# Twilio sends a 'connected' frame, then a 'start' frame with SIDs + params.
|
||||
msgs = websocket.iter_text()
|
||||
await msgs.__anext__() # 'connected'
|
||||
start = json.loads(await msgs.__anext__()) # 'start'
|
||||
start_data = start["start"]
|
||||
|
||||
token = (start_data.get("customParameters") or {}).get("token")
|
||||
if token != STREAM_TOKEN:
|
||||
logger.warning("Rejected /ws: bad or missing stream token")
|
||||
await websocket.close(code=1008) # policy violation
|
||||
return
|
||||
|
||||
# Capacity gate (hard safety net for the /voice→/ws race).
|
||||
if not await _reserve_call_slot():
|
||||
logger.warning(f"/ws over capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — closing")
|
||||
await websocket.close(code=1013) # try again later
|
||||
return
|
||||
reserved = True
|
||||
|
||||
stream_sid = start_data["streamSid"]
|
||||
call_sid = start_data["callSid"]
|
||||
caller_number = (start_data.get("customParameters") or {}).get("caller") or None
|
||||
logger.info(
|
||||
f"Media stream start: call={call_sid} stream={stream_sid} caller={caller_number} "
|
||||
f"({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)"
|
||||
)
|
||||
|
||||
serializer = TwilioFrameSerializer(
|
||||
stream_sid=stream_sid,
|
||||
call_sid=call_sid,
|
||||
account_sid=TWILIO_ACCOUNT_SID,
|
||||
auth_token=TWILIO_API_KEY_SECRET,
|
||||
)
|
||||
await run_call(websocket, serializer, caller_number=caller_number, call_sid=call_sid)
|
||||
except Exception:
|
||||
logger.exception("Call pipeline error")
|
||||
finally:
|
||||
if reserved:
|
||||
await _release_call_slot()
|
||||
logger.info(f"Call ended: {call_sid} ({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
logger.info(f"AVC phone agent on {BIND_HOST}:{PORT} | public={PUBLIC_HOST} | "
|
||||
f"sig_validation={'on' if (TWILIO_VALIDATE and TWILIO_API_KEY_SECRET) else 'OFF'}")
|
||||
uvicorn.run(app, host=BIND_HOST, port=PORT)
|
||||
Reference in New Issue
Block a user