Calls were dropping right after answer with "CUDA failed with error out of memory". Cause: each call constructed a new HintedWhisperSTTService -> new ctranslate2 WhisperModel on the GPU, and that VRAM was never released when the call ended. Over ~13 calls the python process grew to 9.7GB; with the pinned LLM (6GB) the 16GB GPU filled (14 MiB free) and Whisper load failed on every call. Fix: cache one WhisperModel per (model,device,compute) in _WHISPER_MODEL_CACHE and reuse it across all calls; bake the fixed hotwords into the shared model's transcribe() once (drops the racy per-call monkey-patch). VRAM now constant (~6GB LLM + ~1.5GB Whisper). Verified: two instances share one model object; GPU back to 6.0/16GB used after restart. Documented the VRAM budget. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
743 lines
38 KiB
Python
743 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
"""AVC optometry phone agent — the Pipecat pipeline for a single inbound call.
|
|
|
|
Same VAD -> STT -> LLM -> TTS loop as pipecat-run/bot.py, but the ends are swapped
|
|
for telephony: audio arrives/leaves as 8 kHz mu-law over a Twilio Media Stream
|
|
(WebSocket), decoded by TwilioFrameSerializer. STT runs on the GPU; the LLM is the
|
|
local `activeblue-avc` fine-tune via Ollama; TTS is local Kokoro.
|
|
|
|
This module just builds + runs the pipeline for one connected call. server.py owns
|
|
the FastAPI/TwiML/WebSocket side and calls run_call() once per call.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
|
|
import re
|
|
import time
|
|
|
|
from loguru import logger
|
|
|
|
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
|
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
|
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
|
from pipecat.audio.vad.vad_analyzer import VADParams
|
|
from pipecat.frames.frames import (
|
|
BotStartedSpeakingFrame,
|
|
BotStoppedSpeakingFrame,
|
|
EndFrame,
|
|
EndTaskFrame,
|
|
Frame,
|
|
InputAudioRawFrame,
|
|
LLMFullResponseEndFrame,
|
|
LLMTextFrame,
|
|
TTSSpeakFrame,
|
|
UserStartedSpeakingFrame,
|
|
)
|
|
from pipecat.pipeline.pipeline import Pipeline
|
|
from pipecat.pipeline.runner import PipelineRunner
|
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
|
from pipecat.processors.aggregators.llm_context import LLMContext
|
|
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
|
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
|
from pipecat.processors.audio.vad_processor import VADProcessor
|
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
|
from pipecat.serializers.twilio import TwilioFrameSerializer
|
|
from pipecat.services.anthropic.llm import AnthropicLLMService
|
|
from pipecat.services.kokoro.tts import KokoroTTSService
|
|
from pipecat.services.ollama.llm import OLLamaLLMService
|
|
from pipecat.services.whisper.stt import WhisperSTTService
|
|
from pipecat.transports.websocket.fastapi import (
|
|
FastAPIWebsocketParams,
|
|
FastAPIWebsocketTransport,
|
|
)
|
|
|
|
from practice import practice_summary
|
|
|
|
# ── Config (env-overridable) ─────────────────────────────────────────────────
|
|
HERE = os.path.dirname(os.path.abspath(__file__))
|
|
# Reuse the Kokoro model files already downloaded by the pipecat-run project.
|
|
MODEL_DIR = os.environ.get("KOKORO_MODEL_DIR", "/home/tocmo0nlord/pipecat-run/models")
|
|
|
|
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "activeblue-avc:latest")
|
|
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/v1")
|
|
# Swappable LLM provider: "ollama" (local) or "anthropic" (Claude API). Same universal
|
|
# LLMContext drives both — only the service construction differs (see build_llm_service).
|
|
LLM_PROVIDER = os.environ.get("LLM_PROVIDER", "ollama").lower()
|
|
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
|
|
# Defaults to the most capable model. For low-latency PHONE voice, set ANTHROPIC_MODEL to
|
|
# claude-haiku-4-5 (fastest) or claude-sonnet-4-6 (balance) — see notes in build_llm_service.
|
|
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-opus-4-8")
|
|
# In-call function-calling: AUTO by provider — ON for Claude (reliable tool calls → real-time
|
|
# Odoo booking), OFF for local Ollama (llama3.1:8b over-calls / leaks JSON). An explicit
|
|
# ENABLE_TOOLS env overrides the auto choice either way.
|
|
_enable_tools_env = os.environ.get("ENABLE_TOOLS")
|
|
ENABLE_TOOLS = (
|
|
_enable_tools_env.lower() in ("1", "true", "yes")
|
|
if _enable_tools_env is not None
|
|
else (LLM_PROVIDER == "anthropic")
|
|
)
|
|
LLM_TEMPERATURE = float(os.environ.get("LLM_TEMPERATURE", "0.3"))
|
|
LLM_MAX_TOKENS = int(os.environ.get("LLM_MAX_TOKENS", "160"))
|
|
KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "af_heart")
|
|
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "medium") # tiny|base|small|medium
|
|
WHISPER_DEVICE = os.environ.get("WHISPER_DEVICE", "cuda") # cuda for the 5080
|
|
WHISPER_COMPUTE = os.environ.get("WHISPER_COMPUTE", "float16")
|
|
# Bias transcription toward our domain vocabulary (office cities + optometry terms) so
|
|
# 8 kHz telephony audio doesn't turn "Hialeah" into "high allele" or "eye exam" into "hire".
|
|
WHISPER_HOTWORDS = os.environ.get(
|
|
"WHISPER_HOTWORDS",
|
|
"Advanced Vision Care, eye exam, annual exam, appointment, optometry, contact lens, "
|
|
"Hialeah, Kendall, Tamarac, Pembroke Pines, Lauderdale Lakes, Miami Gardens, Boca Raton",
|
|
)
|
|
|
|
# Twilio sends 8 kHz mu-law on the wire, but faster-whisper assumes any numpy array is
|
|
# 16 kHz — so we run the PIPELINE at 16 kHz and let TwilioFrameSerializer resample to/from
|
|
# the 8 kHz wire. Running the pipeline at 8 kHz makes Whisper hear 2x-speed audio and
|
|
# transcribe nothing. (Silero VAD + Kokoro are happy at 16 kHz too.)
|
|
WIRE_SAMPLE_RATE = 8000 # Twilio mu-law on the wire (serializer handles this)
|
|
PIPELINE_SAMPLE_RATE = 16000 # internal rate Whisper/VAD actually need
|
|
|
|
# VAD tuning. Defaults (confidence 0.7 / min_volume 0.6) are desktop-mic values that can
|
|
# miss short/quiet 8 kHz telephony utterances like "yes" — loosen them for the phone.
|
|
# VAD is kept sensitive so a quick/quiet "yes" isn't missed (a caller had to repeat it). This
|
|
# is safe because HalfDuplexGate gates out the agent's echo while it speaks, so sensitive VAD
|
|
# doesn't cause echo false-triggers — it only listens hard during the caller's own turn.
|
|
VAD_CONFIDENCE = float(os.environ.get("VAD_CONFIDENCE", "0.5"))
|
|
VAD_MIN_VOLUME = float(os.environ.get("VAD_MIN_VOLUME", "0.15"))
|
|
VAD_START_SECS = float(os.environ.get("VAD_START_SECS", "0.1"))
|
|
VAD_STOP_SECS = float(os.environ.get("VAD_STOP_SECS", "0.5"))
|
|
# Half-duplex: ignore inbound audio while the agent is speaking (+ this tail in seconds)
|
|
# so the agent's own voice echoing back the phone line can't trigger a false barge-in that
|
|
# cancels its reply (= caller hears silence). Set HALF_DUPLEX=false to allow barge-in.
|
|
HALF_DUPLEX = os.environ.get("HALF_DUPLEX", "true").lower() not in ("false", "0", "no")
|
|
ECHO_TAIL_SECS = float(os.environ.get("ECHO_TAIL_SECS", "0.25"))
|
|
# Silence watchdog: if the caller goes quiet after the agent speaks, re-prompt instead of
|
|
# dead-waiting (a missed/clipped answer otherwise hangs the call). After MAX re-prompts with
|
|
# no response, close gracefully. SILENCE_REPROMPT_SECS must be > HANGUP_DELAY_SECS so a real
|
|
# goodbye hangs up before the watchdog fires.
|
|
SILENCE_WATCHDOG = os.environ.get("SILENCE_WATCHDOG", "true").lower() not in ("false", "0", "no")
|
|
SILENCE_REPROMPT_SECS = float(os.environ.get("SILENCE_REPROMPT_SECS", "7.0"))
|
|
MAX_REPROMPTS = int(os.environ.get("MAX_REPROMPTS", "2"))
|
|
# Record each call to a stereo WAV (caller = left, agent = right) for review/debugging.
|
|
RECORD_CALLS = os.environ.get("RECORD_CALLS", "true").lower() not in ("false", "0", "no")
|
|
RECORDINGS_DIR = os.environ.get("RECORDINGS_DIR", os.path.join(HERE, "recordings"))
|
|
|
|
# Agent persona name — purely for warmth; change/remove freely.
|
|
AGENT_NAME = os.environ.get("AGENT_NAME", "Sofia")
|
|
# How the name should be SPOKEN. Kokoro reads all-caps "AVA" as letters ("A-V-A"), so we
|
|
# respell it as a word for TTS only (logs/Odoo keep AGENT_NAME). Override to taste, e.g.
|
|
# AGENT_NAME_SPOKEN=Eva for an "EE-vuh" sound.
|
|
AGENT_NAME_SPOKEN = os.environ.get("AGENT_NAME_SPOKEN", "Ava")
|
|
# Grace period after the agent finishes the goodbye before we drop the carrier leg, so
|
|
# the caller isn't cut off mid-word. The hang-up itself (EndTaskFrame -> auto_hang_up)
|
|
# is unchanged — this only delays it.
|
|
HANGUP_DELAY_SECS = float(os.environ.get("HANGUP_DELAY_SECS", "4.0"))
|
|
|
|
SYSTEM_PROMPT = (
|
|
f"You are {AGENT_NAME}, a warm, friendly receptionist for Advanced Vision Care, an "
|
|
"optometry practice with eight offices in South Florida. You are on a real phone call, so "
|
|
"talk like a helpful human being: natural, relaxed, and genuinely conversational. Keep every "
|
|
"reply to ONE short sentence — two at the very most, never a paragraph. Speak in English. Say "
|
|
"numbers, dates, and times as words a person would say.\n\n"
|
|
"Your job is to answer questions and take appointment requests. Be warm but DIRECT and "
|
|
"efficient: when the caller greets you, get to the point and lead the call by asking "
|
|
"questions. Never re-ask for something they already told you, and keep each turn to one "
|
|
"short question or statement. Work through the call in THIS order:\n"
|
|
" 1. REASON FIRST — find out what they are calling about (the reason for the visit, or "
|
|
"their question). If it is only a question, answer it.\n"
|
|
" 2. LOCATION — ask which city or area is most convenient, then confirm the matching "
|
|
"office (see the office rule below).\n"
|
|
" 3. CALLER INFO — get their FULL name (first and last; if they give only a first name, "
|
|
"ask their last name). From this point on, address the caller by their name. Then ask their "
|
|
"insurance (log only — see below) and their preferred day and time (in their own words — "
|
|
"see the date rule below).\n"
|
|
" 4. CONFIRM PHONE (no yes needed) — near the end, STATE the callback number back in one "
|
|
"line and invite a CORRECTION ONLY, exactly like: 'I have your number as <the number spelled "
|
|
"out below>; if that's not the best number, just let me know.' Do NOT ask a yes/no question, "
|
|
"do NOT ask permission, and do NOT wait for them to say 'yes' — flow straight into the wrap-up. "
|
|
"Only act on the phone number if they give you a different one. Don't bring it up before this.\n"
|
|
" 5. WRAP UP — recap the booking as a REQUEST in one warm sentence (for example, 'I've "
|
|
"noted your request to come in tomorrow afternoon at our Kendall office'), make clear a "
|
|
"staff member will call back to CONFIRM it, then ASK IF THERE IS ANYTHING ELSE you can help "
|
|
"them with. Only once they are all set, give the closing below.\n"
|
|
"KEEP MOMENTUM: until the booking is complete, ALWAYS end your turn with the next question "
|
|
"you still need answered. Never reply with only an acknowledgment and then stop — for "
|
|
"example, after noting insurance, in the SAME turn go straight on to ask the preferred day "
|
|
"and time. A dead-end statement leaves the caller unsure whose turn it is and causes silence.\n\n"
|
|
"Stay truthful and within your limits:\n"
|
|
"- Use ONLY the facts below for addresses, phone numbers, insurance, and services. Never "
|
|
"make any of these up.\n"
|
|
"- OFFICE SELECTION: ask what city or area is most convenient. When the caller names a place "
|
|
"that matches one of our offices (for example they say 'Kendall' and we have a Kendall "
|
|
"office), simply confirm THAT office and move on — do NOT offer, compare, or mention any "
|
|
"other office or city, and do NOT ask them to choose between offices. Only if their area "
|
|
"matches no office should you name the single nearest one. List offices only if asked.\n"
|
|
"- INSURANCE — log only, never promise, never guess: ask open-endedly what insurance they "
|
|
"have (for example, 'What insurance do you have?'). Do NOT read out or suggest plan names "
|
|
"from the list — let the caller tell you. Capture ONLY what the caller actually says; NEVER "
|
|
"fill in, complete, or guess their plan, and never put words in their mouth. If you don't "
|
|
"clearly hear the plan, ask them to repeat it. Do not promise, confirm, or deny coverage or "
|
|
"any treatment based on their insurance, even if the plan is one we list, and NEVER say 'we "
|
|
"accept' or 'we take' a plan — just note it and say our staff will verify their coverage when "
|
|
"they call back.\n"
|
|
"- DATES — just take down the day and time the caller asks for in their OWN words (e.g. "
|
|
"'next Monday', 'the fifth'). Do NOT work out, state, or correct the calendar date, and NEVER "
|
|
"argue about what today's date is. Tell them staff will confirm the exact date and time on the "
|
|
"callback.\n"
|
|
"- You CANNOT see availability or book/confirm anything. Never say a slot is open or "
|
|
"available, never offer to 'check availability', and NEVER tell the caller their appointment "
|
|
"is 'booked', 'scheduled', 'set', or 'confirmed' — not even in the recap. It is always a "
|
|
"REQUEST: say you've NOTED it and a staff member will call back to confirm the date and time.\n"
|
|
"- Hours are not published — say they vary by office and staff will confirm; never give "
|
|
"specific hours.\n"
|
|
"- You don't give medical advice and can't transfer calls. If the caller mentions an eye "
|
|
"problem, just note it as the reason and say a staff member or doctor will follow up.\n"
|
|
"- If you're not sure you heard something, simply ask them to repeat it.\n"
|
|
"- CLOSING — the word 'Goodbye' ENDS the call, so guard it carefully. You MUST first ask "
|
|
"'Is there anything else I can help you with?' and hear the caller say they need nothing "
|
|
"more. NEVER say 'Goodbye' in the same turn as confirming details or the phone number, and "
|
|
"never before that anything-else question. Once they confirm they're all set, give a brief "
|
|
"warm closing ending with 'Goodbye'.\n\n"
|
|
"PRACTICE FACTS:\n" + practice_summary()
|
|
)
|
|
|
|
|
|
def _build_tools() -> ToolsSchema:
|
|
# Only the booking action is a tool. Practice facts already live in the system prompt,
|
|
# so no get_practice_info tool (avoids needless calls/latency). callback_number is NOT
|
|
# required — we have the caller-ID and inject it in the handler.
|
|
return ToolsSchema(
|
|
standard_tools=[
|
|
FunctionSchema(
|
|
name="record_appointment_request",
|
|
description=(
|
|
"Record the caller's appointment request once you have their name and at "
|
|
"least the office/city and reason. Call this when the caller wants to book "
|
|
"a visit; staff will call back to confirm the exact time."
|
|
),
|
|
properties={
|
|
"patient_name": {"type": "string", "description": "Caller's full name"},
|
|
"location": {"type": "string", "description": "Which office/city the caller wants, e.g. Hialeah, Kendall, Tamarac"},
|
|
"reason": {"type": "string", "description": "Reason for the visit, e.g. annual exam, broken glasses, eye pain"},
|
|
"preferred_time": {"type": "string", "description": "Preferred day/time in the caller's words, if given"},
|
|
},
|
|
required=["patient_name"],
|
|
),
|
|
]
|
|
)
|
|
|
|
|
|
class EndCallProcessor(FrameProcessor):
|
|
"""Lets the agent hang up AND guarantees the callback number is confirmed once.
|
|
|
|
Sits between the LLM and the TTS: it sees reply text (LLMTextFrame, downstream) and the
|
|
upstream BotStoppedSpeakingFrame. On a closing ('goodbye'/'adiós') it waits for TTS to
|
|
finish, pauses HANGUP_DELAY_SECS so the caller isn't clipped, then pushes EndTaskFrame
|
|
(TwilioFrameSerializer auto_hang_up drops the call).
|
|
|
|
Deterministic phone confirmation: the prompt asks the agent to read the callback number
|
|
back, but the 8B skips it ~half the time. So if a closing is reached and the agent never
|
|
spoke the number this call (`phone_marker` not seen in its replies), we suppress the
|
|
hang-up and inject a scripted confirmation turn first — guaranteeing it happens exactly
|
|
once (the agent's own readback satisfies the gate, so no double-ask in the common case)."""
|
|
|
|
_CLOSINGS = ("goodbye", "good-bye", "good bye", "adiós", "adios", "hasta luego")
|
|
# Only force phone confirmation when a booking was actually underway (not info-only calls).
|
|
_BOOKING_KWS = ("appointment", "schedule", "book", "insurance", "what day", "what time",
|
|
"come in", "preferred")
|
|
|
|
def __init__(self, phone_confirm_line: str | None = None, phone_marker: str | None = None):
|
|
super().__init__()
|
|
self._buf = ""
|
|
self._should_end = False
|
|
self._end_task = None
|
|
self._phone_confirm_line = phone_confirm_line
|
|
self._phone_marker = (phone_marker or "").lower()
|
|
# Nothing to confirm (no caller ID) → treat as already handled.
|
|
self._phone_confirmed = not phone_confirm_line
|
|
self._assistant_seen = ""
|
|
self._pending_phone_inject = False
|
|
|
|
@classmethod
|
|
def _is_closing(cls, text: str) -> bool:
|
|
t = (text or "").lower()
|
|
return any(c in t for c in cls._CLOSINGS)
|
|
|
|
async def _hang_up_after_delay(self):
|
|
await asyncio.sleep(HANGUP_DELAY_SECS)
|
|
logger.info(f"{AGENT_NAME} ending task / hanging up")
|
|
try:
|
|
await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
|
except Exception:
|
|
logger.exception("EndTaskFrame push failed (pipeline already ending?)")
|
|
|
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
|
await super().process_frame(frame, direction)
|
|
if isinstance(frame, LLMTextFrame):
|
|
self._buf += frame.text
|
|
self._assistant_seen += frame.text.lower()
|
|
if self._phone_marker and self._phone_marker in self._assistant_seen:
|
|
self._phone_confirmed = True # the agent read the number back itself
|
|
elif isinstance(frame, LLMFullResponseEndFrame):
|
|
if self._is_closing(self._buf):
|
|
booking = any(k in self._assistant_seen for k in self._BOOKING_KWS)
|
|
if self._phone_confirmed or not booking:
|
|
self._should_end = True
|
|
logger.info(f"{AGENT_NAME} signalled closing -- will hang up "
|
|
f"{HANGUP_DELAY_SECS:.0f}s after she finishes speaking")
|
|
else:
|
|
# Booking call closing without the number confirmed — do it deterministically.
|
|
self._pending_phone_inject = True
|
|
logger.info(f"{AGENT_NAME} reached closing w/o phone confirmation -- injecting it")
|
|
self._buf = ""
|
|
elif isinstance(frame, BotStoppedSpeakingFrame):
|
|
if self._pending_phone_inject:
|
|
self._pending_phone_inject = False
|
|
self._phone_confirmed = True
|
|
await self.push_frame(TTSSpeakFrame(self._phone_confirm_line), FrameDirection.DOWNSTREAM)
|
|
elif self._should_end:
|
|
self._should_end = False
|
|
# Schedule the teardown so we don't block the pipeline during the grace pause.
|
|
if self._end_task is None:
|
|
self._end_task = asyncio.create_task(self._hang_up_after_delay())
|
|
await self.push_frame(frame, direction)
|
|
|
|
|
|
class AudioHeartbeat(FrameProcessor):
|
|
"""Diagnostic: logs how many inbound audio frames arrive every ~5s. If this keeps
|
|
ticking but VAD never fires, the issue is VAD/threshold; if it drops to 0 after a
|
|
turn, inbound audio stalled at the transport. Cheap, leave it on while stabilizing."""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._n = 0
|
|
self._t = time.time()
|
|
|
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
|
await super().process_frame(frame, direction)
|
|
if isinstance(frame, InputAudioRawFrame):
|
|
self._n += 1
|
|
now = time.time()
|
|
if now - self._t >= 5:
|
|
logger.info(f"[audio-in] {self._n} frames in last {now - self._t:.0f}s")
|
|
self._n = 0
|
|
self._t = now
|
|
await self.push_frame(frame, direction)
|
|
|
|
|
|
class HalfDuplexGate(FrameProcessor):
|
|
"""Drops inbound audio while the agent is speaking (plus ECHO_TAIL_SECS after it stops).
|
|
|
|
In this pipecat build interruptions are VAD-driven and always on (PipelineParams has no
|
|
allow_interruptions). On a phone line the agent's own TTS echoes back and the VAD reads it
|
|
as the caller speaking → it broadcasts an interruption that cancels the agent mid-reply, so
|
|
the caller hears silence. Sitting BEFORE the VAD, this gate withholds inbound audio frames
|
|
while the bot is speaking, so its echo never reaches the VAD. Trade-off: the caller can't
|
|
barge in mid-utterance (fine for short receptionist replies). Bypass with HALF_DUPLEX=false."""
|
|
|
|
def __init__(self, tail_secs: float = 0.5):
|
|
super().__init__()
|
|
self._bot_speaking = False
|
|
self._reopen_at = 0.0
|
|
self._tail = tail_secs
|
|
|
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
|
await super().process_frame(frame, direction)
|
|
if isinstance(frame, BotStartedSpeakingFrame):
|
|
self._bot_speaking = True
|
|
elif isinstance(frame, BotStoppedSpeakingFrame):
|
|
self._bot_speaking = False
|
|
self._reopen_at = time.time() + self._tail
|
|
# Withhold caller audio while the bot speaks (+ echo tail) so echo can't barge in.
|
|
if isinstance(frame, InputAudioRawFrame) and (self._bot_speaking or time.time() < self._reopen_at):
|
|
return
|
|
await self.push_frame(frame, direction)
|
|
|
|
|
|
class SilenceWatchdog(FrameProcessor):
|
|
"""Re-prompts on caller silence instead of dead-waiting. After the agent stops speaking it
|
|
arms a timer; if the caller hasn't started speaking within `silence_secs`, it injects a
|
|
re-prompt ("are you still there?"). After `max_prompts` unanswered re-prompts it speaks a
|
|
graceful closing and ends the call. Any caller speech (UserStartedSpeakingFrame) resets it;
|
|
the agent speaking cancels it. Place AFTER EndCallProcessor so its injected lines go to TTS
|
|
and `silence_secs` > HANGUP_DELAY_SECS so a real goodbye hangs up before it fires."""
|
|
|
|
def __init__(self, silence_secs: float, max_prompts: int, reprompt_line: str, closing_line: str):
|
|
super().__init__()
|
|
self._silence_secs = silence_secs
|
|
self._max_prompts = max_prompts
|
|
self._reprompt_line = reprompt_line
|
|
self._closing_line = closing_line
|
|
self._timer = None
|
|
self._prompts = 0
|
|
self._bot_speaking = False
|
|
self._ending = False
|
|
|
|
def _cancel(self):
|
|
if self._timer and not self._timer.done():
|
|
self._timer.cancel()
|
|
self._timer = None
|
|
|
|
def _arm(self):
|
|
self._cancel()
|
|
self._timer = asyncio.create_task(self._fire())
|
|
|
|
async def _fire(self):
|
|
try:
|
|
await asyncio.sleep(self._silence_secs)
|
|
except asyncio.CancelledError:
|
|
return
|
|
if self._bot_speaking or self._ending:
|
|
return
|
|
if self._prompts >= self._max_prompts:
|
|
self._ending = True
|
|
logger.info("SilenceWatchdog: still silent after re-prompts -- closing the call")
|
|
await self.push_frame(TTSSpeakFrame(self._closing_line), FrameDirection.DOWNSTREAM)
|
|
else:
|
|
self._prompts += 1
|
|
logger.info(f"SilenceWatchdog: caller silent -- re-prompt #{self._prompts}")
|
|
await self.push_frame(TTSSpeakFrame(self._reprompt_line), FrameDirection.DOWNSTREAM)
|
|
|
|
async def _end_soon(self):
|
|
await asyncio.sleep(HANGUP_DELAY_SECS)
|
|
logger.info("SilenceWatchdog: ending task after silent close")
|
|
try:
|
|
await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
|
except Exception:
|
|
logger.exception("watchdog EndTaskFrame push failed")
|
|
|
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
|
await super().process_frame(frame, direction)
|
|
if isinstance(frame, UserStartedSpeakingFrame):
|
|
self._prompts = 0 # caller engaged again — reset
|
|
self._cancel()
|
|
elif isinstance(frame, BotStartedSpeakingFrame):
|
|
self._bot_speaking = True
|
|
self._cancel()
|
|
elif isinstance(frame, BotStoppedSpeakingFrame):
|
|
self._bot_speaking = False
|
|
if self._ending:
|
|
asyncio.create_task(self._end_soon())
|
|
else:
|
|
self._arm() # start counting silence once the agent finishes
|
|
await self.push_frame(frame, direction)
|
|
|
|
|
|
# One shared WhisperModel per (model, device, compute) for the whole process. Loading a new
|
|
# model per call leaks GPU memory — ctranslate2 doesn't release VRAM when the call's service is
|
|
# dropped, so models accumulate and the GPU OOMs after a handful of calls. Sharing one keeps
|
|
# VRAM constant.
|
|
_WHISPER_MODEL_CACHE = {}
|
|
|
|
|
|
class HintedWhisperSTTService(WhisperSTTService):
|
|
"""WhisperSTTService that shares ONE WhisperModel across all calls (avoids the per-call
|
|
GPU-memory leak/OOM) and biases transcription toward domain vocabulary via faster-whisper
|
|
`hotwords`. Hotwords are a fixed domain list, so they're baked into the shared model's
|
|
transcribe() once at load — concurrency-safe (no per-call monkey-patch)."""
|
|
|
|
def __init__(self, *args, hotwords: str | None = None, **kwargs):
|
|
self._hotwords = hotwords # set BEFORE super().__init__ (it calls _load)
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def _load(self):
|
|
key = (self._settings.model, self._device, self._compute_type)
|
|
model = _WHISPER_MODEL_CACHE.get(key)
|
|
if model is None:
|
|
super()._load() # base sets self._model
|
|
model = self._model
|
|
if self._hotwords: # bake hotwords in once (value, not self)
|
|
_real = model.transcribe
|
|
_hw = self._hotwords
|
|
|
|
def _patched(audio_arg, **kw):
|
|
kw.setdefault("hotwords", _hw)
|
|
return _real(audio_arg, **kw)
|
|
|
|
model.transcribe = _patched
|
|
_WHISPER_MODEL_CACHE[key] = model
|
|
logger.info(f"Loaded + cached shared Whisper model {key}")
|
|
else:
|
|
logger.info(f"Reusing shared Whisper model {key}")
|
|
self._model = model
|
|
|
|
|
|
# ── TTS number normalization ──────────────────────────────────────────────────
|
|
# Kokoro reads digit strings as cardinals with symbols spoken aloud, e.g. "983-4969"
|
|
# becomes "nine hundred eighty-three dash forty-nine sixty-nine". For a phone agent that
|
|
# reads back phone numbers, street numbers, and zips, that's unusable. We normalize the
|
|
# text right before synthesis (run_tts receives the full sentence) so phone numbers and
|
|
# long digit runs are spoken one digit at a time, regardless of what the model emitted.
|
|
_DIGIT_WORDS = {
|
|
"0": "zero", "1": "one", "2": "two", "3": "three", "4": "four",
|
|
"5": "five", "6": "six", "7": "seven", "8": "eight", "9": "nine",
|
|
}
|
|
_PHONE_RE = re.compile(r"(?:\+?1[\s.\-]?)?\(?\d{3}\)?[\s.\-]?\d{3}[\s.\-]?\d{4}")
|
|
_LONGNUM_RE = re.compile(r"\b\d{4,5}\b") # street numbers, zip codes
|
|
|
|
|
|
def _say_digits(s: str) -> str:
|
|
return " ".join(_DIGIT_WORDS[c] for c in s if c in _DIGIT_WORDS)
|
|
|
|
|
|
def _spoken_phone(number: str) -> str:
|
|
"""Phone number as grouped, digit-by-digit words: '+19735731671' ->
|
|
'nine seven three, five seven three, one six seven one'."""
|
|
d = re.sub(r"\D", "", number or "")
|
|
if len(d) == 11 and d[0] == "1": # drop US country code
|
|
d = d[1:]
|
|
if len(d) == 10: # group as area / prefix / line for natural cadence
|
|
return f"{_say_digits(d[:3])}, {_say_digits(d[3:6])}, {_say_digits(d[6:])}"
|
|
return _say_digits(d)
|
|
|
|
|
|
def _phone_to_words(m: re.Match) -> str:
|
|
return _spoken_phone(m.group(0))
|
|
|
|
|
|
def tts_normalize(text: str) -> str:
|
|
"""Make phone numbers, street numbers, and zips speak naturally (digit by digit), and
|
|
respell the all-caps agent name so it's said as a word, not letter-by-letter."""
|
|
if AGENT_NAME != AGENT_NAME_SPOKEN:
|
|
text = re.sub(rf"\b{re.escape(AGENT_NAME)}\b", AGENT_NAME_SPOKEN, text)
|
|
text = _PHONE_RE.sub(_phone_to_words, text)
|
|
text = _LONGNUM_RE.sub(lambda m: _say_digits(m.group(0)), text)
|
|
return text
|
|
|
|
|
|
class SpokenKokoroTTSService(KokoroTTSService):
|
|
"""KokoroTTSService that normalizes numbers to digit-by-digit speech before synthesis,
|
|
so phone numbers/addresses/zips are read naturally instead of as cardinals + 'dash'."""
|
|
|
|
async def run_tts(self, text: str, context_id: str):
|
|
async for frame in super().run_tts(tts_normalize(text), context_id):
|
|
yield frame
|
|
|
|
|
|
def build_llm_service():
|
|
"""Build the LLM service for the selected provider. The universal LLMContext +
|
|
aggregators work with either, so only this construction differs (true A/B swap)."""
|
|
if LLM_PROVIDER == "anthropic":
|
|
if not ANTHROPIC_API_KEY:
|
|
raise RuntimeError("LLM_PROVIDER=anthropic but ANTHROPIC_API_KEY is not set")
|
|
logger.info(f"LLM provider: anthropic ({ANTHROPIC_MODEL})")
|
|
# NOTE: Opus 4.8/4.7 reject temperature/top_p/top_k (HTTP 400), so we omit them —
|
|
# this keeps the default Opus model working. For low-latency phone voice, prefer
|
|
# claude-haiku-4-5 (fastest) or claude-sonnet-4-6 over Opus. enable_prompt_caching
|
|
# caches the system prompt + growing conversation prefix (helps multi-turn cost/latency).
|
|
return AnthropicLLMService(
|
|
api_key=ANTHROPIC_API_KEY,
|
|
settings=AnthropicLLMService.Settings(
|
|
model=ANTHROPIC_MODEL,
|
|
enable_prompt_caching=True,
|
|
max_tokens=LLM_MAX_TOKENS,
|
|
),
|
|
)
|
|
logger.info(f"LLM provider: ollama ({OLLAMA_MODEL})")
|
|
return OLLamaLLMService(
|
|
settings=OLLamaLLMService.Settings(
|
|
model=OLLAMA_MODEL,
|
|
temperature=LLM_TEMPERATURE,
|
|
max_tokens=LLM_MAX_TOKENS,
|
|
),
|
|
base_url=OLLAMA_URL,
|
|
)
|
|
|
|
|
|
async def run_agent(transport, caller_number=None, call_sid=None, do_capture=True):
|
|
"""Build + run the AVC voice agent on a given transport. Shared by the phone path
|
|
(Twilio Media Stream) and the browser path (WebRTC) — same prompt, model, voice, and
|
|
booking/hang-up logic; only the transport differs. do_capture writes the post-call
|
|
appointment to Odoo (on for phone; off for browser testing so it doesn't make cards)."""
|
|
stt = HintedWhisperSTTService(
|
|
settings=WhisperSTTService.Settings(model=WHISPER_MODEL),
|
|
device=WHISPER_DEVICE,
|
|
compute_type=WHISPER_COMPUTE,
|
|
hotwords=WHISPER_HOTWORDS,
|
|
)
|
|
llm = build_llm_service()
|
|
# In-call booking tool — only registered when ENABLE_TOOLS is on (auto: Claude yes,
|
|
# local Ollama no, since llama3.1:8b over-calls/leaks). The handler is a closure so it
|
|
# can stamp the verified caller-ID + call_sid onto the lead (the model never supplies a
|
|
# phone number — we don't ask for one). With tools on, this writes the Odoo lead IN-CALL,
|
|
# so the post-call extraction is skipped below to avoid a duplicate.
|
|
if ENABLE_TOOLS:
|
|
async def _record_appointment(params):
|
|
args = params.arguments or {}
|
|
if do_capture:
|
|
from practice import persist_appointment
|
|
persist_appointment({
|
|
"call_sid": call_sid,
|
|
"patient_name": args.get("patient_name"),
|
|
"callback_number": caller_number, # verified caller-ID, not model-supplied
|
|
"location": args.get("location"),
|
|
"reason": args.get("reason"),
|
|
"preferred_time": args.get("preferred_time"),
|
|
"source": "in_call_tool",
|
|
})
|
|
else:
|
|
logger.info(f"[capture off] would record appointment: {args.get('patient_name')} / {args.get('location')}")
|
|
await params.result_callback(
|
|
{"status": "recorded", "message": "Recorded — staff will call to confirm the time."}
|
|
)
|
|
|
|
llm.register_function("record_appointment_request", _record_appointment)
|
|
|
|
tts = SpokenKokoroTTSService(
|
|
model_path=os.path.join(MODEL_DIR, "kokoro-v1.0.onnx"),
|
|
voices_path=os.path.join(MODEL_DIR, "voices-v1.0.bin"),
|
|
settings=KokoroTTSService.Settings(voice=KOKORO_VOICE),
|
|
)
|
|
vad = VADProcessor(vad_analyzer=SileroVADAnalyzer(params=VADParams(
|
|
confidence=VAD_CONFIDENCE,
|
|
start_secs=VAD_START_SECS,
|
|
stop_secs=VAD_STOP_SECS,
|
|
min_volume=VAD_MIN_VOLUME,
|
|
)))
|
|
heartbeat = AudioHeartbeat()
|
|
gate = HalfDuplexGate(tail_secs=ECHO_TAIL_SECS) if HALF_DUPLEX else None
|
|
|
|
# Per-call system message = static prompt + the caller-ID number to confirm. Inject it
|
|
# ALREADY spelled out digit-by-digit so the model repeats clean words instead of mangling
|
|
# the raw digits (e.g. reading 197 as "one hundred ninety-seven").
|
|
if caller_number:
|
|
caller_line = (
|
|
f"\n\nCALLER ID: the caller's number on file, written so you read it digit by digit, "
|
|
f"is: {_spoken_phone(caller_number)}. Near the end, state it back and invite a "
|
|
"correction only ('...; if that's not the best number, just let me know.') — do NOT "
|
|
"ask a yes/no question or wait for a 'yes'. Only change it if they give a different "
|
|
"number. Do not say it any earlier in the call."
|
|
)
|
|
else:
|
|
caller_line = (
|
|
"\n\nCALLER ID: no number is available — near the end, ask the caller for the best "
|
|
"phone number to reach them."
|
|
)
|
|
system_content = SYSTEM_PROMPT + caller_line
|
|
context_kwargs = {"messages": [{"role": "system", "content": system_content}]}
|
|
if ENABLE_TOOLS:
|
|
context_kwargs["tools"] = _build_tools()
|
|
context = LLMContext(**context_kwargs)
|
|
agg = LLMContextAggregatorPair(context)
|
|
# Deterministic phone-confirmation safety net: if the agent reaches a closing without
|
|
# having read the caller-ID back, EndCallProcessor speaks this scripted line first.
|
|
if caller_number:
|
|
_spoken = _spoken_phone(caller_number)
|
|
phone_confirm_line = (
|
|
f"Also, I have your number as {_spoken}; if that's not the best number, just let me know."
|
|
)
|
|
phone_marker = _spoken.split(",")[0].strip() # e.g. "nine seven three"
|
|
else:
|
|
phone_confirm_line = phone_marker = None
|
|
endcall = EndCallProcessor(phone_confirm_line=phone_confirm_line, phone_marker=phone_marker)
|
|
|
|
watchdog = SilenceWatchdog(
|
|
silence_secs=SILENCE_REPROMPT_SECS,
|
|
max_prompts=MAX_REPROMPTS,
|
|
reprompt_line="I'm sorry, I didn't catch that — are you still there?",
|
|
closing_line="I'll let you go for now — please call us back anytime. Goodbye.",
|
|
) if SILENCE_WATCHDOG else None
|
|
|
|
# Stereo recorder (caller left / agent right) at the end so it captures what the system
|
|
# actually received + sent — for review and to debug silence with evidence, not guesses.
|
|
audiobuffer = AudioBufferProcessor(num_channels=2) if RECORD_CALLS else None
|
|
|
|
pipeline = Pipeline(
|
|
[
|
|
transport.input(),
|
|
heartbeat,
|
|
*( [gate] if gate else [] ), # half-duplex echo gate, before the VAD
|
|
vad,
|
|
stt,
|
|
agg.user(),
|
|
llm,
|
|
endcall,
|
|
*( [watchdog] if watchdog else [] ), # re-prompt on caller silence
|
|
tts,
|
|
transport.output(),
|
|
agg.assistant(),
|
|
*( [audiobuffer] if audiobuffer else [] ), # record both directions
|
|
]
|
|
)
|
|
|
|
task = PipelineTask(
|
|
pipeline,
|
|
params=PipelineParams(
|
|
audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
|
|
audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
|
|
allow_interruptions=True,
|
|
),
|
|
)
|
|
|
|
if audiobuffer:
|
|
os.makedirs(RECORDINGS_DIR, exist_ok=True)
|
|
|
|
@audiobuffer.event_handler("on_audio_data")
|
|
async def _on_audio_data(buf, audio, sample_rate, num_channels):
|
|
import wave
|
|
from datetime import datetime
|
|
if not audio:
|
|
return
|
|
fname = f"{datetime.now():%Y%m%d-%H%M%S}_{call_sid or 'web'}.wav"
|
|
path = os.path.join(RECORDINGS_DIR, fname)
|
|
with wave.open(path, "wb") as wf:
|
|
wf.setnchannels(num_channels)
|
|
wf.setsampwidth(2)
|
|
wf.setframerate(sample_rate)
|
|
wf.writeframes(audio)
|
|
logger.info(f"Saved call recording: {path} "
|
|
f"({len(audio)} bytes, {num_channels}ch @ {sample_rate}Hz)")
|
|
|
|
@transport.event_handler("on_client_connected")
|
|
async def on_client_connected(transport, client):
|
|
logger.info("Client connected -- greeting")
|
|
if audiobuffer:
|
|
await audiobuffer.start_recording()
|
|
await task.queue_frames(
|
|
[TTSSpeakFrame(
|
|
f"Thank you for calling Advanced Vision Care, this is {AGENT_NAME}. "
|
|
"How can I help you today?"
|
|
)]
|
|
)
|
|
|
|
@transport.event_handler("on_client_disconnected")
|
|
async def on_client_disconnected(transport, client):
|
|
logger.info("Client disconnected -- ending task")
|
|
if audiobuffer:
|
|
await audiobuffer.stop_recording()
|
|
await task.queue_frame(EndFrame())
|
|
|
|
runner = PipelineRunner(handle_sigint=False)
|
|
await runner.run(task)
|
|
|
|
# Call is over. Post-call extraction is the capture path ONLY when in-call tools are
|
|
# off (local Ollama). With tools on (Claude), the booking was already written in-call,
|
|
# so skip extraction to avoid a duplicate lead.
|
|
if do_capture and not ENABLE_TOOLS:
|
|
try:
|
|
from extract import extract_and_record
|
|
|
|
await extract_and_record(
|
|
context.messages, OLLAMA_URL, OLLAMA_MODEL,
|
|
call_sid=call_sid, caller_number=caller_number,
|
|
)
|
|
except Exception:
|
|
logger.exception("Post-call appointment extraction failed")
|
|
|
|
|
|
async def run_call(websocket, serializer: TwilioFrameSerializer, caller_number=None, call_sid=None):
|
|
"""Phone entrypoint: wrap the Twilio Media Stream in a transport, run the shared agent."""
|
|
transport = FastAPIWebsocketTransport(
|
|
websocket=websocket,
|
|
params=FastAPIWebsocketParams(
|
|
audio_in_enabled=True,
|
|
audio_out_enabled=True,
|
|
audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
|
|
audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
|
|
add_wav_header=False,
|
|
serializer=serializer,
|
|
),
|
|
)
|
|
await run_agent(transport, caller_number=caller_number, call_sid=call_sid, do_capture=True)
|