#!/usr/bin/env python3 """AVC optometry phone agent — the Pipecat pipeline for a single inbound call. Same VAD -> STT -> LLM -> TTS loop as pipecat-run/bot.py, but the ends are swapped for telephony: audio arrives/leaves as 8 kHz mu-law over a Twilio Media Stream (WebSocket), decoded by TwilioFrameSerializer. STT runs on the GPU; the LLM is the local `activeblue-avc` fine-tune via Ollama; TTS is local Kokoro. This module just builds + runs the pipeline for one connected call. server.py owns the FastAPI/TwiML/WebSocket side and calls run_call() once per call. """ import asyncio import os import re import time from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.frames.frames import ( BotStartedSpeakingFrame, BotStoppedSpeakingFrame, EndFrame, EndTaskFrame, Frame, InputAudioRawFrame, LLMFullResponseEndFrame, LLMTextFrame, TTSSpeakFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.processors.audio.vad_processor import VADProcessor from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.serializers.twilio import TwilioFrameSerializer from pipecat.services.anthropic.llm import AnthropicLLMService from pipecat.services.kokoro.tts import KokoroTTSService from pipecat.services.ollama.llm import OLLamaLLMService from pipecat.services.whisper.stt import WhisperSTTService from pipecat.transports.websocket.fastapi import ( FastAPIWebsocketParams, FastAPIWebsocketTransport, ) from practice import practice_summary # ── Config 
# ── Config (env-overridable) ─────────────────────────────────────────────────
HERE = os.path.dirname(os.path.abspath(__file__))

# Kokoro model files are shared with the pipecat-run project, which already downloaded them.
MODEL_DIR = os.environ.get("KOKORO_MODEL_DIR", "/home/tocmo0nlord/pipecat-run/models")

# Local LLM (Ollama).
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "activeblue-avc:latest")
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/v1")

# LLM provider switch: "ollama" (local) or "anthropic" (Claude API). Both drive the same
# universal LLMContext; only the service construction differs (see build_llm_service).
LLM_PROVIDER = os.environ.get("LLM_PROVIDER", "ollama").lower()
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# The default is the most capable model. For low-latency PHONE voice, point ANTHROPIC_MODEL
# at claude-haiku-4-5 (fastest) or claude-sonnet-4-6 (balance) — see build_llm_service.
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-opus-4-8")

# In-call function calling. When ENABLE_TOOLS is unset, the provider decides: ON for Claude
# (reliable tool calls → real-time Odoo booking), OFF for local Ollama (llama3.1:8b
# over-calls / leaks JSON). An explicit ENABLE_TOOLS always wins; "1"/"true"/"yes" enable.
_enable_tools_env = os.environ.get("ENABLE_TOOLS")
if _enable_tools_env is None:
    ENABLE_TOOLS = LLM_PROVIDER == "anthropic"
else:
    ENABLE_TOOLS = _enable_tools_env.lower() in ("1", "true", "yes")

# Sampling temperature and reply-length cap for the LLM.
LLM_TEMPERATURE = float(os.environ.get("LLM_TEMPERATURE", "0.3"))
LLM_MAX_TOKENS = int(os.environ.get("LLM_MAX_TOKENS", "160"))

KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "af_heart")

# Whisper speech-to-text settings.
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "medium")  # tiny|base|small|medium
WHISPER_DEVICE = os.environ.get("WHISPER_DEVICE", "cuda")  # cuda for the 5080
WHISPER_COMPUTE = os.environ.get("WHISPER_COMPUTE", "float16")

# Bias transcription toward our domain vocabulary (office cities + optometry terms) so
# 8 kHz telephony audio doesn't turn "Hialeah" into "high allele" or "eye exam" into "hire".
WHISPER_HOTWORDS = os.environ.get(
    "WHISPER_HOTWORDS",
    "Advanced Vision Care, eye exam, annual exam, appointment, optometry, contact lens, "
    "Hialeah, Kendall, Tamarac, Pembroke Pines, Lauderdale Lakes, Miami Gardens, Boca Raton",
)

# Sample rates. Twilio's wire format is 8 kHz mu-law, but faster-whisper treats any numpy
# array as 16 kHz audio. Feeding it 8 kHz samples makes it hear 2x-speed speech and
# transcribe nothing. So the PIPELINE runs at 16 kHz and TwilioFrameSerializer resamples
# to/from the 8 kHz wire. (Silero VAD and Kokoro both work at 16 kHz too.)
WIRE_SAMPLE_RATE = 8000  # Twilio mu-law on the wire (serializer handles this)
PIPELINE_SAMPLE_RATE = 16000  # internal rate Whisper/VAD actually need

# VAD tuning, loosened for the phone. The desktop-mic defaults (confidence 0.7 /
# min_volume 0.6) can miss short, quiet 8 kHz telephony utterances such as "yes".
VAD_CONFIDENCE = float(os.environ.get("VAD_CONFIDENCE", "0.5"))
VAD_MIN_VOLUME = float(os.environ.get("VAD_MIN_VOLUME", "0.3"))
VAD_START_SECS = float(os.environ.get("VAD_START_SECS", "0.2"))
VAD_STOP_SECS = float(os.environ.get("VAD_STOP_SECS", "0.5"))

# Half-duplex mode: inbound audio is ignored while the agent speaks, and for ECHO_TAIL_SECS
# afterwards. This stops the agent's own voice, echoing back down the phone line, from
# registering as a barge-in that cancels its reply (the caller would hear silence).
# HALF_DUPLEX=false (or 0 / no) allows barge-in again.
HALF_DUPLEX = os.environ.get("HALF_DUPLEX", "true").lower() not in {"false", "0", "no"}
ECHO_TAIL_SECS = float(os.environ.get("ECHO_TAIL_SECS", "0.5"))

# Persona name the agent uses — purely for warmth; change/remove freely.
AGENT_NAME = os.environ.get("AGENT_NAME", "Sofia")
# How the name should be SPOKEN. Kokoro reads all-caps "AVA" as letters ("A-V-A"), so we
# respell it as a word for TTS only (logs/Odoo keep AGENT_NAME). Override to taste, e.g.
# AGENT_NAME_SPOKEN=Eva for an "EE-vuh" sound.
# Spoken form of the agent's name, used by TTS only (logs/Odoo keep AGENT_NAME). It defaults
# to AGENT_NAME itself, so the voice says the same name that the prompt and greeting use. An
# all-caps name (e.g. "AVA", which Kokoro spells out letter by letter) instead defaults to
# its title-case respelling ("Ava"). Set AGENT_NAME_SPOKEN explicitly to override either way.
AGENT_NAME_SPOKEN = os.environ.get(
    "AGENT_NAME_SPOKEN", AGENT_NAME.title() if AGENT_NAME.isupper() else AGENT_NAME
)

# Grace period after the agent finishes the goodbye before we drop the carrier leg, so
# the caller isn't cut off mid-word. The hang-up itself (EndTaskFrame -> auto_hang_up)
# is unchanged — this only delays it.
HANGUP_DELAY_SECS = float(os.environ.get("HANGUP_DELAY_SECS", "4.0"))

SYSTEM_PROMPT = (
    f"You are {AGENT_NAME}, a warm, friendly receptionist for Advanced Vision Care, an "
    "optometry practice with eight offices in South Florida. You are on a real phone call, so "
    "talk like a helpful human being: natural, relaxed, and genuinely conversational. Keep every "
    "reply to ONE short sentence — two at the very most, never a paragraph. Speak in English. Say "
    "numbers, dates, and times as words a person would say.\n\n"
    "Your job is to answer questions and take appointment requests. Be warm but DIRECT and "
    "efficient: when the caller greets you, get to the point and lead the call by asking "
    "questions. Never re-ask for something they already told you, and keep each turn to one "
    "short question or statement. Work through the call in THIS order:\n"
    " 1. REASON FIRST — find out what they are calling about (the reason for the visit, or "
    "their question). If it is only a question, answer it.\n"
    " 2. LOCATION — ask which city or area is most convenient, then confirm the matching "
    "office (see the office rule below).\n"
    " 3. CALLER INFO — get their FULL name (first and last; if they give only a first name, "
    "ask their last name). From this point on, address the caller by their name. Then ask their "
    "insurance (log only — see below) and their preferred day and time (in their own words — "
    "see the date rule below).\n"
    " 4. VERIFY PHONE — near the end, state the callback number back in ONE line, exactly like: "
    "'I have your number as — is that the best number to reach "
    "you?'. Do NOT ask permission first ('may I read your number back?') and do NOT skip this "
    "step. If it's not right, use the number they give. Don't bring up the phone number before this.\n"
    " 5. WRAP UP — recap the booking as a REQUEST in one warm sentence (for example, 'I've "
    "noted your request to come in tomorrow afternoon at our Kendall office'), make clear a "
    "staff member will call back to CONFIRM it, then ASK IF THERE IS ANYTHING ELSE you can help "
    "them with. Only once they are all set, give the closing below.\n"
    "KEEP MOMENTUM: until the booking is complete, ALWAYS end your turn with the next question "
    "you still need answered. Never reply with only an acknowledgment and then stop — for "
    "example, after noting insurance, in the SAME turn go straight on to ask the preferred day "
    "and time. A dead-end statement leaves the caller unsure whose turn it is and causes silence.\n\n"
    "Stay truthful and within your limits:\n"
    "- Use ONLY the facts below for addresses, phone numbers, insurance, and services. Never "
    "make any of these up.\n"
    "- OFFICE SELECTION: ask what city or area is most convenient. When the caller names a place "
    "that matches one of our offices (for example they say 'Kendall' and we have a Kendall "
    "office), simply confirm THAT office and move on — do NOT offer, compare, or mention any "
    "other office or city, and do NOT ask them to choose between offices. Only if their area "
    "matches no office should you name the single nearest one. List offices only if asked.\n"
    "- INSURANCE — log only, never promise, never guess: ask open-endedly what insurance they "
    "have (for example, 'What insurance do you have?'). Do NOT read out or suggest plan names "
    "from the list — let the caller tell you. Capture ONLY what the caller actually says; NEVER "
    "fill in, complete, or guess their plan, and never put words in their mouth. If you don't "
    "clearly hear the plan, ask them to repeat it. Do not promise, confirm, or deny coverage or "
    "any treatment based on their insurance, even if the plan is one we list, and NEVER say 'we "
    "accept' or 'we take' a plan — just note it and say our staff will verify their coverage when "
    "they call back.\n"
    "- DATES — just take down the day and time the caller asks for in their OWN words (e.g. "
    "'next Monday', 'the fifth'). Do NOT work out, state, or correct the calendar date, and NEVER "
    "argue about what today's date is. Tell them staff will confirm the exact date and time on the "
    "callback.\n"
    "- You CANNOT see availability or book/confirm anything. Never say a slot is open or "
    "available, never offer to 'check availability', and NEVER tell the caller their appointment "
    "is 'booked', 'scheduled', 'set', or 'confirmed' — not even in the recap. It is always a "
    "REQUEST: say you've NOTED it and a staff member will call back to confirm the date and time.\n"
    "- Hours are not published — say they vary by office and staff will confirm; never give "
    "specific hours.\n"
    "- You don't give medical advice and can't transfer calls. If the caller mentions an eye "
    "problem, just note it as the reason and say a staff member or doctor will follow up.\n"
    "- If you're not sure you heard something, simply ask them to repeat it.\n"
    "- CLOSING — the word 'Goodbye' ENDS the call, so guard it carefully. You MUST first ask "
    "'Is there anything else I can help you with?' and hear the caller say they need nothing "
    "more. NEVER say 'Goodbye' in the same turn as confirming details or the phone number, and "
    "never before that anything-else question. Once they confirm they're all set, give a brief "
    "warm closing ending with 'Goodbye'.\n\n"
    "PRACTICE FACTS:\n" + practice_summary()
)


def _build_tools() -> ToolsSchema:
    """Return the tool schema offered to the LLM (only used when ENABLE_TOOLS is on).

    Only the booking action is a tool. Practice facts already live in the system prompt, so
    there is no get_practice_info tool (avoids needless calls/latency). callback_number is
    NOT a parameter: we already have the caller-ID and the handler injects it.
    """
    return ToolsSchema(
        standard_tools=[
            FunctionSchema(
                name="record_appointment_request",
                description=(
                    "Record the caller's appointment request once you have their name and at "
                    "least the office/city and reason. Call this when the caller wants to book "
                    "a visit; staff will call back to confirm the exact time."
                ),
                properties={
                    "patient_name": {"type": "string", "description": "Caller's full name"},
                    "location": {
                        "type": "string",
                        "description": "Which office/city the caller wants, e.g. Hialeah, Kendall, Tamarac",
                    },
                    "reason": {
                        "type": "string",
                        "description": "Reason for the visit, e.g. annual exam, broken glasses, eye pain",
                    },
                    "preferred_time": {
                        "type": "string",
                        "description": "Preferred day/time in the caller's words, if given",
                    },
                },
                required=["patient_name"],
            ),
        ]
    )


class EndCallProcessor(FrameProcessor):
    """Lets the agent hang up AND guarantees the callback number is confirmed once.

    Sits between the LLM and the TTS. It sees reply text (LLMTextFrame, downstream) and the
    upstream BotStoppedSpeakingFrame. On a closing ('goodbye'/'adiós') it waits for the bot
    to stop speaking, pauses HANGUP_DELAY_SECS so the caller isn't clipped, then pushes
    EndTaskFrame upstream (TwilioFrameSerializer auto_hang_up drops the call).

    Deterministic phone confirmation: the prompt asks the agent to read the callback number
    back, but the 8B skips it ~half the time. So if a closing is reached on a booking call
    and the agent never spoke the number (`phone_marker` not seen in its replies), the
    hang-up is suppressed and a scripted confirmation line is spoken instead. This happens
    at most once; the agent's own readback satisfies the gate, so there is no double-ask in
    the common case.

    Args:
        phone_confirm_line: scripted confirmation sentence to speak, or None when there is
            no caller ID (then the confirmation gate is considered already satisfied).
        phone_marker: text whose presence in the agent's replies counts as "number read
            back"; matched case-insensitively.
    """

    _CLOSINGS = ("goodbye", "good-bye", "good bye", "adiós", "adios", "hasta luego")
    # Only force phone confirmation when a booking was actually underway (not info-only calls).
    _BOOKING_KWS = ("appointment", "schedule", "book", "insurance", "what day", "what time", "come in", "preferred")

    def __init__(self, phone_confirm_line: str | None = None, phone_marker: str | None = None):
        super().__init__()
        self._buf = ""  # text of the reply currently being streamed
        self._should_end = False  # hang up on the next BotStoppedSpeakingFrame
        self._end_task = None  # the delayed hang-up task, once scheduled
        self._phone_confirm_line = phone_confirm_line
        self._phone_marker = (phone_marker or "").lower()
        # Nothing to confirm (no caller ID) → treat as already handled.
        self._phone_confirmed = not phone_confirm_line
        self._assistant_seen = ""  # everything the agent has said this call, lowercased
        self._pending_phone_inject = False  # speak the confirmation on next BotStoppedSpeaking

    @classmethod
    def _is_closing(cls, text: str) -> bool:
        """True when *text* contains any closing phrase (case-insensitive substring match)."""
        t = (text or "").lower()
        return any(c in t for c in cls._CLOSINGS)

    async def _hang_up_after_delay(self):
        """Wait HANGUP_DELAY_SECS, then ask the pipeline to end (which hangs up the call)."""
        await asyncio.sleep(HANGUP_DELAY_SECS)
        logger.info(f"{AGENT_NAME} ending task / hanging up")
        try:
            await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
        except Exception:
            logger.exception("EndTaskFrame push failed (pipeline already ending?)")

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMTextFrame):
            self._buf += frame.text
            self._assistant_seen += frame.text.lower()
            if self._phone_marker and self._phone_marker in self._assistant_seen:
                self._phone_confirmed = True  # the agent read the number back itself
        elif isinstance(frame, LLMFullResponseEndFrame):
            if self._is_closing(self._buf):
                booking = any(k in self._assistant_seen for k in self._BOOKING_KWS)
                if self._phone_confirmed or not booking:
                    self._should_end = True
                    logger.info(
                        f"{AGENT_NAME} signalled closing -- will hang up "
                        f"{HANGUP_DELAY_SECS:.0f}s after she finishes speaking"
                    )
                else:
                    # Booking call closing without the number confirmed — do it deterministically.
                    self._pending_phone_inject = True
                    logger.info(f"{AGENT_NAME} reached closing w/o phone confirmation -- injecting it")
            # Closing detection is per reply: start the next reply with an empty buffer.
            self._buf = ""
        elif isinstance(frame, BotStoppedSpeakingFrame):
            if self._pending_phone_inject:
                self._pending_phone_inject = False
                self._phone_confirmed = True
                await self.push_frame(TTSSpeakFrame(self._phone_confirm_line), FrameDirection.DOWNSTREAM)
            elif self._should_end:
                self._should_end = False
                # Schedule the teardown so we don't block the pipeline during the grace pause.
                if self._end_task is None:
                    self._end_task = asyncio.create_task(self._hang_up_after_delay())

        await self.push_frame(frame, direction)


class AudioHeartbeat(FrameProcessor):
    """Diagnostic: logs how many inbound audio frames arrived, roughly every 5 seconds.

    A line is written only when an audio frame arrives and at least 5s have passed since the
    last line. So if inbound audio stalls at the transport, the log lines stop altogether
    rather than reporting 0. If lines keep appearing but VAD never fires, the issue is
    VAD/threshold. Cheap, leave it on while stabilizing.
    """

    def __init__(self):
        super().__init__()
        self._n = 0
        # Monotonic clock: interval measurement must not jump with wall-clock adjustments.
        self._t = time.monotonic()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, InputAudioRawFrame):
            self._n += 1
            now = time.monotonic()
            if now - self._t >= 5:
                logger.info(f"[audio-in] {self._n} frames in last {now - self._t:.0f}s")
                self._n = 0
                self._t = now
        await self.push_frame(frame, direction)


class HalfDuplexGate(FrameProcessor):
    """Drops inbound audio while the agent is speaking (plus ECHO_TAIL_SECS after it stops).

    In this pipecat build interruptions are VAD-driven and always on (PipelineParams has no
    allow_interruptions). On a phone line the agent's own TTS echoes back and the VAD reads
    it as the caller speaking. It then broadcasts an interruption that cancels the agent
    mid-reply, so the caller hears silence. Sitting BEFORE the VAD, this gate withholds
    inbound audio frames while the bot is speaking, so its echo never reaches the VAD.

    Trade-off: the caller can't barge in mid-utterance (fine for short receptionist replies).
    Bypass with HALF_DUPLEX=false.

    Args:
        tail_secs: how long to keep the gate closed after the bot stops speaking.
    """

    def __init__(self, tail_secs: float = 0.5):
        super().__init__()
        self._bot_speaking = False
        self._reopen_at = 0.0  # time.monotonic() value after which audio flows again
        self._tail = tail_secs

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, BotStartedSpeakingFrame):
            self._bot_speaking = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._bot_speaking = False
            self._reopen_at = time.monotonic() + self._tail
        # Withhold caller audio while the bot speaks (+ echo tail) so echo can't barge in.
        if isinstance(frame, InputAudioRawFrame) and (
            self._bot_speaking or time.monotonic() < self._reopen_at
        ):
            return
        await self.push_frame(frame, direction)


class HintedWhisperSTTService(WhisperSTTService):
    """WhisperSTTService that biases transcription toward domain vocabulary.

    Uses faster-whisper's `hotwords`. Pipecat's service doesn't expose hotwords, so for the
    duration of each run_stt() we wrap the model's transcribe() to add them, then restore
    the original. Each call gets its own Whisper instance, so this per-instance patch is
    race-free.

    Args:
        hotwords: comma-separated vocabulary to bias toward; None/empty disables the patch.
    """

    def __init__(self, *args, hotwords: str | None = None, **kwargs):
        super().__init__(*args, **kwargs)
        self._hotwords = hotwords

    async def run_stt(self, audio):
        if not self._hotwords or self._model is None:
            async for frame in super().run_stt(audio):
                yield frame
            return

        real = self._model.transcribe

        def patched(audio_arg, *args, **kw):
            # setdefault: an explicit hotwords argument from the caller still wins over ours.
            kw.setdefault("hotwords", self._hotwords)
            return real(audio_arg, *args, **kw)

        self._model.transcribe = patched
        try:
            async for frame in super().run_stt(audio):
                yield frame
        finally:
            self._model.transcribe = real


# ── TTS number normalization ──────────────────────────────────────────────────
# Kokoro reads digit strings as cardinals with symbols spoken aloud, e.g. "983-4969"
# becomes "nine hundred eighty-three dash forty-nine sixty-nine". For a phone agent that
# reads back phone numbers, street numbers, and zips, that's unusable.
# We normalize the text right before synthesis (run_tts receives the full sentence) so phone
# numbers and long digit runs are spoken one digit at a time, regardless of what the model
# emitted.
_DIGIT_WORDS = {
    "0": "zero", "1": "one", "2": "two", "3": "three", "4": "four",
    "5": "five", "6": "six", "7": "seven", "8": "eight", "9": "nine",
}
# Optional US country code, optional area code, then a 3-digit prefix and a 4-digit line.
# The area code is optional so 7-digit local numbers ("983-4969") are caught too. The digit
# lookarounds stop us from grabbing a phone-shaped slice out of a longer digit run
# (e.g. the "012-1234" inside the zip+4 "33012-1234").
_PHONE_RE = re.compile(
    r"(?<!\d)(?:\+?1[\s.\-]?)?(?:\(?\d{3}\)?[\s.\-]?)?\d{3}[\s.\-]?\d{4}(?!\d)"
)
_LONGNUM_RE = re.compile(r"\b\d{4,}\b")  # street numbers, zip codes, other long digit runs


def _say_digits(s: str) -> str:
    """Spell every digit in *s* as a word, space-separated; non-digits are dropped."""
    return " ".join(_DIGIT_WORDS[c] for c in s if c in _DIGIT_WORDS)


def _spoken_phone(number: str | None) -> str:
    """Phone number as grouped, digit-by-digit words.

    '+19735731671' -> 'nine seven three, five seven three, one six seven one'
    '983-4969'     -> 'nine eight three, four nine six nine'

    Returns "" when *number* is None/empty or contains no digits.
    """
    d = re.sub(r"\D", "", number or "")
    if len(d) == 11 and d[0] == "1":  # drop US country code
        d = d[1:]
    if len(d) == 10:  # group as area / prefix / line for natural cadence
        return f"{_say_digits(d[:3])}, {_say_digits(d[3:6])}, {_say_digits(d[6:])}"
    if len(d) == 7:  # local number: prefix / line
        return f"{_say_digits(d[:3])}, {_say_digits(d[3:])}"
    return _say_digits(d)


def _phone_to_words(m: re.Match) -> str:
    """re.sub callback: replace a matched phone number with its spoken form."""
    return _spoken_phone(m.group(0))


def tts_normalize(text: str) -> str:
    """Make phone numbers, street numbers, and zips speak naturally (digit by digit).

    Also respells the agent name to AGENT_NAME_SPOKEN when the two differ, so e.g. an
    all-caps name is said as a word, not letter by letter.
    """
    if AGENT_NAME != AGENT_NAME_SPOKEN:
        text = re.sub(rf"\b{re.escape(AGENT_NAME)}\b", AGENT_NAME_SPOKEN, text)
    text = _PHONE_RE.sub(_phone_to_words, text)
    text = _LONGNUM_RE.sub(lambda m: _say_digits(m.group(0)), text)
    return text


class SpokenKokoroTTSService(KokoroTTSService):
    """KokoroTTSService that normalizes numbers to digit-by-digit speech before synthesis,
    so phone numbers/addresses/zips are read naturally instead of as cardinals + 'dash'."""

    async def run_tts(self, text: str, context_id: str):
        async for frame in super().run_tts(tts_normalize(text), context_id):
            yield frame


def build_llm_service():
    """Build the LLM service for the selected provider (LLM_PROVIDER).

    The universal LLMContext + aggregators work with either provider, so only this
    construction differs (true A/B swap).

    Raises:
        RuntimeError: LLM_PROVIDER is "anthropic" but ANTHROPIC_API_KEY is empty.
    """
    if LLM_PROVIDER == "anthropic":
        if not ANTHROPIC_API_KEY:
            raise RuntimeError("LLM_PROVIDER=anthropic but ANTHROPIC_API_KEY is not set")
        logger.info(f"LLM provider: anthropic ({ANTHROPIC_MODEL})")
        # NOTE: Opus 4.8/4.7 reject temperature/top_p/top_k (HTTP 400), so we omit them —
        # this keeps the default Opus model working. For low-latency phone voice, prefer
        # claude-haiku-4-5 (fastest) or claude-sonnet-4-6 over Opus. enable_prompt_caching
        # caches the system prompt + growing conversation prefix (helps multi-turn cost/latency).
        return AnthropicLLMService(
            api_key=ANTHROPIC_API_KEY,
            settings=AnthropicLLMService.Settings(
                model=ANTHROPIC_MODEL,
                enable_prompt_caching=True,
                max_tokens=LLM_MAX_TOKENS,
            ),
        )
    logger.info(f"LLM provider: ollama ({OLLAMA_MODEL})")
    return OLLamaLLMService(
        settings=OLLamaLLMService.Settings(
            model=OLLAMA_MODEL,
            temperature=LLM_TEMPERATURE,
            max_tokens=LLM_MAX_TOKENS,
        ),
        base_url=OLLAMA_URL,
    )


async def run_agent(transport, caller_number=None, call_sid=None, do_capture=True):
    """Build + run the AVC voice agent on a given transport.

    Shared by the phone path (Twilio Media Stream) and the browser path (WebRTC) — same
    prompt, model, voice, and booking/hang-up logic; only the transport differs.

    Args:
        transport: pipecat transport providing input()/output() processors and the
            on_client_connected / on_client_disconnected events.
        caller_number: caller-ID string, if known. A value containing no digits is treated
            the same as no caller ID (the agent asks for a number instead of confirming one).
        call_sid: call identifier stamped onto recorded appointment requests.
        do_capture: write the appointment to Odoo (on for phone; off for browser testing so
            it doesn't make cards).
    """
    stt = HintedWhisperSTTService(
        settings=WhisperSTTService.Settings(model=WHISPER_MODEL),
        device=WHISPER_DEVICE,
        compute_type=WHISPER_COMPUTE,
        hotwords=WHISPER_HOTWORDS,
    )
    llm = build_llm_service()

    # In-call booking tool — only registered when ENABLE_TOOLS is on (auto: Claude yes,
    # local Ollama no, since llama3.1:8b over-calls/leaks). The handler is a closure so it
    # can stamp the verified caller-ID + call_sid onto the lead (the model never supplies a
    # phone number — we don't ask for one). With tools on, this writes the Odoo lead IN-CALL,
    # so the post-call extraction is skipped below to avoid a duplicate.
    if ENABLE_TOOLS:

        async def _record_appointment(params):
            args = params.arguments or {}
            result = {"status": "recorded", "message": "Recorded — staff will call to confirm the time."}
            if do_capture:
                try:
                    from practice import persist_appointment

                    persist_appointment({
                        "call_sid": call_sid,
                        "patient_name": args.get("patient_name"),
                        "callback_number": caller_number,  # verified caller-ID, not model-supplied
                        "location": args.get("location"),
                        "reason": args.get("reason"),
                        "preferred_time": args.get("preferred_time"),
                        "source": "in_call_tool",
                    })
                except Exception:
                    logger.exception("In-call appointment capture failed")
                    result = {
                        "status": "error",
                        "message": "The request could not be saved. Apologize and ask the caller to call back later.",
                    }
            else:
                logger.info(f"[capture off] would record appointment: {args.get('patient_name')} / {args.get('location')}")
            # Always answer the tool call, even when the write failed, so the model gets a result.
            await params.result_callback(result)

        llm.register_function("record_appointment_request", _record_appointment)

    tts = SpokenKokoroTTSService(
        model_path=os.path.join(MODEL_DIR, "kokoro-v1.0.onnx"),
        voices_path=os.path.join(MODEL_DIR, "voices-v1.0.bin"),
        settings=KokoroTTSService.Settings(voice=KOKORO_VOICE),
    )
    vad = VADProcessor(vad_analyzer=SileroVADAnalyzer(params=VADParams(
        confidence=VAD_CONFIDENCE,
        start_secs=VAD_START_SECS,
        stop_secs=VAD_STOP_SECS,
        min_volume=VAD_MIN_VOLUME,
    )))
    heartbeat = AudioHeartbeat()
    gate = HalfDuplexGate(tail_secs=ECHO_TAIL_SECS) if HALF_DUPLEX else None

    # Spell the caller ID once. It is "" when there is no usable number (None, or a value
    # with no digits), so both the prompt and the safety net fall back to "no caller ID".
    spoken_caller = _spoken_phone(caller_number)

    # Per-call system message = static prompt + the caller-ID number to confirm. Inject it
    # ALREADY spelled out digit-by-digit so the model repeats clean words instead of mangling
    # the raw digits (e.g. reading 197 as "one hundred ninety-seven").
    if spoken_caller:
        caller_line = (
            f"\n\nCALLER ID: the caller's number on file, written so you read it digit by digit, "
            f"is: {spoken_caller}. When it's time to confirm it (near the end), say "
            "it back exactly like that and ask if it's the best number; if they say no, use the "
            "number they give. Do not say it any earlier in the call."
        )
    else:
        caller_line = (
            "\n\nCALLER ID: no number is available — near the end, ask the caller for the best "
            "phone number to reach them."
        )
    system_content = SYSTEM_PROMPT + caller_line
    context_kwargs = {"messages": [{"role": "system", "content": system_content}]}
    if ENABLE_TOOLS:
        context_kwargs["tools"] = _build_tools()
    context = LLMContext(**context_kwargs)
    agg = LLMContextAggregatorPair(context)

    # Deterministic phone-confirmation safety net: if the agent reaches a closing without
    # having read the caller-ID back, EndCallProcessor speaks this scripted line first.
    if spoken_caller:
        phone_confirm_line = (
            f"Before you go, let me make sure I have the best number to reach you: "
            f"{spoken_caller}. Is that correct?"
        )
        phone_marker = spoken_caller.split(",")[0].strip()  # e.g. "nine seven three"
    else:
        phone_confirm_line = phone_marker = None
    endcall = EndCallProcessor(phone_confirm_line=phone_confirm_line, phone_marker=phone_marker)

    pipeline = Pipeline(
        [
            transport.input(),
            heartbeat,
            *([gate] if gate else []),  # half-duplex echo gate, before the VAD
            vad,
            stt,
            agg.user(),
            llm,
            endcall,
            tts,
            transport.output(),
            agg.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
            audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
            # NOTE(review): confirm the installed pipecat's PipelineParams still honors
            # allow_interruptions; if the field no longer exists it may be silently ignored.
            allow_interruptions=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected -- greeting")
        await task.queue_frames(
            [TTSSpeakFrame(
                f"Thank you for calling Advanced Vision Care, this is {AGENT_NAME}. "
                "How can I help you today?"
            )]
        )

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected -- ending task")
        await task.queue_frame(EndFrame())

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)

    # Call is over. Post-call extraction is the capture path ONLY when in-call tools are
    # off (local Ollama). With tools on (Claude), the booking was already written in-call,
    # so skip extraction to avoid a duplicate lead.
    if do_capture and not ENABLE_TOOLS:
        try:
            from extract import extract_and_record

            await extract_and_record(
                context.messages, OLLAMA_URL, OLLAMA_MODEL,
                call_sid=call_sid, caller_number=caller_number,
            )
        except Exception:
            logger.exception("Post-call appointment extraction failed")


async def run_call(websocket, serializer: TwilioFrameSerializer, caller_number=None, call_sid=None):
    """Phone entrypoint: wrap the Twilio Media Stream in a transport, run the shared agent.

    Args:
        websocket: the accepted FastAPI WebSocket carrying the Twilio Media Stream.
        serializer: TwilioFrameSerializer that converts between the 8 kHz mu-law wire format
            and PIPELINE_SAMPLE_RATE pipeline audio.
        caller_number: caller-ID, forwarded to run_agent.
        call_sid: call identifier, forwarded to run_agent.
    """
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
            audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
            add_wav_header=False,
            serializer=serializer,
        ),
    )
    await run_agent(transport, caller_number=caller_number, call_sid=call_sid, do_capture=True)