#!/usr/bin/env python3
"""AVC optometry phone agent — the Pipecat pipeline for a single inbound call.

Same VAD -> STT -> LLM -> TTS loop as pipecat-run/bot.py, but the ends are swapped
for telephony: audio arrives/leaves as 8 kHz mu-law over a Twilio Media Stream
(WebSocket), decoded by TwilioFrameSerializer. STT runs on the GPU; the LLM is the
local `activeblue-avc` fine-tune via Ollama (or Claude when LLM_PROVIDER=anthropic);
TTS is local Kokoro.

This module just builds + runs the pipeline for one connected call. server.py owns
the FastAPI/TwiML/WebSocket side and calls run_call() once per call.
"""
import asyncio
import os
import re
import time
from datetime import datetime, timedelta

from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
    BotStoppedSpeakingFrame,
    EndFrame,
    EndTaskFrame,
    Frame,
    InputAudioRawFrame,
    LLMFullResponseEndFrame,
    LLMTextFrame,
    TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.vad_processor import VADProcessor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.kokoro.tts import KokoroTTSService
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.services.whisper.stt import WhisperSTTService
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

from practice import practice_summary

# ── Config (env-overridable) ─────────────────────────────────────────────────
HERE = os.path.dirname(os.path.abspath(__file__))
# Reuse the Kokoro model files already downloaded by the pipecat-run project.
MODEL_DIR = os.environ.get("KOKORO_MODEL_DIR", "/home/tocmo0nlord/pipecat-run/models")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "activeblue-avc:latest")
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/v1")

# Swappable LLM provider: "ollama" (local) or "anthropic" (Claude API). Same universal
# LLMContext drives both — only the service construction differs (see build_llm_service).
# Whitespace is stripped so a stray space in the env value doesn't change the provider.
LLM_PROVIDER = os.environ.get("LLM_PROVIDER", "ollama").strip().lower()
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# Defaults to the most capable model. For low-latency PHONE voice, set ANTHROPIC_MODEL to
# claude-haiku-4-5 (fastest) or claude-sonnet-4-6 (balance) — see notes in build_llm_service.
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-opus-4-8")

# In-call function-calling: AUTO by provider — ON for Claude (reliable tool calls → real-time
# Odoo booking), OFF for local Ollama (llama3.1:8b over-calls / leaks JSON). An explicit
# ENABLE_TOOLS env overrides the auto choice either way.
_enable_tools_env = os.environ.get("ENABLE_TOOLS")
ENABLE_TOOLS = (
    _enable_tools_env.strip().lower() in ("1", "true", "yes")
    if _enable_tools_env is not None
    else (LLM_PROVIDER == "anthropic")
)

LLM_TEMPERATURE = float(os.environ.get("LLM_TEMPERATURE", "0.3"))
LLM_MAX_TOKENS = int(os.environ.get("LLM_MAX_TOKENS", "160"))
KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "af_heart")
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "medium")  # tiny|base|small|medium
WHISPER_DEVICE = os.environ.get("WHISPER_DEVICE", "cuda")  # cuda for the 5080
WHISPER_COMPUTE = os.environ.get("WHISPER_COMPUTE", "float16")
# Bias transcription toward our domain vocabulary (office cities + optometry terms) so
# 8 kHz telephony audio doesn't turn "Hialeah" into "high allele" or "eye exam" into "hire".
WHISPER_HOTWORDS = os.environ.get(
    "WHISPER_HOTWORDS",
    "Advanced Vision Care, eye exam, annual exam, appointment, optometry, contact lens, "
    "Hialeah, Kendall, Tamarac, Pembroke Pines, Lauderdale Lakes, Miami Gardens, Boca Raton",
)

# Twilio sends 8 kHz mu-law on the wire, but faster-whisper assumes any numpy array is
# 16 kHz — so we run the PIPELINE at 16 kHz and let TwilioFrameSerializer resample to/from
# the 8 kHz wire. Running the pipeline at 8 kHz makes Whisper hear 2x-speed audio and
# transcribe nothing. (Silero VAD + Kokoro are happy at 16 kHz too.)
WIRE_SAMPLE_RATE = 8000  # Twilio mu-law on the wire (serializer handles this)
PIPELINE_SAMPLE_RATE = 16000  # internal rate Whisper/VAD actually need

# VAD tuning. Defaults (confidence 0.7 / min_volume 0.6) are desktop-mic values that can
# miss short/quiet 8 kHz telephony utterances like "yes" — loosen them for the phone.
VAD_CONFIDENCE = float(os.environ.get("VAD_CONFIDENCE", "0.5"))
VAD_MIN_VOLUME = float(os.environ.get("VAD_MIN_VOLUME", "0.3"))
VAD_START_SECS = float(os.environ.get("VAD_START_SECS", "0.2"))
VAD_STOP_SECS = float(os.environ.get("VAD_STOP_SECS", "0.5"))

# Agent persona name — purely for warmth; change/remove freely.
AGENT_NAME = os.environ.get("AGENT_NAME", "Sofia")

# Grace period after the agent finishes the goodbye before we drop the carrier leg, so
# the caller isn't cut off mid-word. The hang-up itself (EndTaskFrame -> auto_hang_up)
# is unchanged — this only delays it.
HANGUP_DELAY_SECS = float(os.environ.get("HANGUP_DELAY_SECS", "4.0"))

# Number of days (starting today) listed in the per-call calendar. 62 is the smallest
# window that always reaches the END of next month: the worst case is the 1st of a
# 31-day month followed by another 31-day month (e.g. July 1 -> August 31).
CALENDAR_DAYS = int(os.environ.get("CALENDAR_DAYS", "62"))

SYSTEM_PROMPT = (
    f"You are {AGENT_NAME}, a warm, friendly receptionist for Advanced Vision Care, an "
    "optometry practice with eight offices in South Florida. You are on a real phone call, so "
    "talk like a helpful human being: natural, relaxed, and genuinely conversational — usually "
    "just one short sentence at a time. Speak in English. Say numbers, dates, and times as "
    "words a person would say.\n\n"
    "Your job is to answer callers' questions and to take appointment requests. For a "
    "booking, gather these SIX things naturally as the conversation flows — don't "
    "interrogate, and never ask for something the caller already told you:\n"
    " 1. Their FULL name (first and last). If they give only a first name, warmly ask for "
    "their last name too.\n"
    " 2. The phone number to reach them. Their caller-ID number is given to you below — read "
    "it back and ask if that is the best number. If they say no, ask for the right number and "
    "use that instead.\n"
    " 3. Which office or city is most convenient.\n"
    " 4. The reason for the visit.\n"
    " 5. Their insurance — ask what insurance they have and simply note it (see the insurance "
    "rule below).\n"
    " 6. The day and time they prefer (validate the date — see the date rule below).\n"
    "When you have the details, repeat them back in one warm sentence to confirm, and let them "
    "know a staff member will call to finalize the time.\n\n"
    "Stay truthful and within your limits:\n"
    "- Use ONLY the facts below for addresses, phone numbers, insurance, and services. Never "
    "make any of these up.\n"
    "- To find the right office, ask what CITY or AREA is most convenient for the caller. Do "
    "NOT suggest or name a specific office yourself — you don't know where they are. Only after "
    "they tell you their area, name the matching office; and only list locations if they ask "
    "what offices exist.\n"
    "- INSURANCE — log only, never promise: ask what insurance they have and note it for staff. "
    "Do NOT promise, confirm, or deny coverage or any treatment based on their insurance, even "
    "if the plan is one we list. Always say our staff will verify their coverage when they call "
    "back. Just capture the plan name.\n"
    "- DATES — always validate against the calendar provided below. Work out the real date the "
    "caller means and check it. If the weekday and the date they say do not match, or the date "
    "does not exist, gently correct them and offer the right one, then confirm before booking. "
    "For example, if they say 'Monday the fifth' but the Monday next month is the sixth, say: "
    "'Next month, Monday lands on the sixth — would you like to schedule that date?' Never accept "
    "an impossible or mismatched date silently.\n"
    "- You cannot see a calendar of openings, so never say a time slot is open or available — "
    "take the day/time as a request that staff will confirm.\n"
    "- Hours are not published — say they vary by office and staff will confirm; never give "
    "specific hours.\n"
    "- You don't give medical advice and can't transfer calls. If the caller mentions an eye "
    "problem, just note it as the reason and say a staff member or doctor will follow up.\n"
    "- If you're not sure you heard something, simply ask them to repeat it.\n"
    "- When the caller is all set, give a brief, warm closing that ends with the word "
    "'Goodbye' — that ends the call, so only say it when you truly mean to.\n\n"
    "PRACTICE FACTS:\n" + practice_summary()
)


def _spoken_date(d) -> str:
    """Format a date as e.g. 'Monday, January 5, 2026' (day without a leading zero)."""
    return f"{d:%A, %B} {d.day}, {d:%Y}"


def _date_context(now: datetime | None = None) -> str:
    """Build the calendar grounding text that is appended to the system prompt.

    Lists CALENDAR_DAYS consecutive dates starting today so the model can resolve and
    VALIDATE the dates a caller mentions (e.g. catch 'Monday the 5th' when the Monday is
    the 6th). Recomputed each call because the server is long-running.

    Args:
        now: Reference moment; defaults to ``datetime.now()`` (server-local, naive).

    Returns:
        A multi-line string: a header, a "Today is ..." line, then one line per date with
        today and tomorrow tagged.
    """
    now = now or datetime.now()
    today = now.date()
    lines = []
    for offset in range(CALENDAR_DAYS):
        day = today + timedelta(days=offset)
        if offset == 0:
            tag = " <- TODAY"
        elif offset == 1:
            tag = " <- tomorrow"
        else:
            tag = ""
        lines.append(f" {_spoken_date(day)}{tag}")
    return (
        "CALENDAR — authoritative, use for EVERY date the caller mentions:\n"
        f"Today is {_spoken_date(today)}.\n"
        "Upcoming dates:\n" + "\n".join(lines) + "\n"
    )


def _build_tools() -> ToolsSchema:
    """Return the tool schema exposed to the LLM when ENABLE_TOOLS is on.

    Only the booking action is a tool. Practice facts already live in the system prompt,
    so there is no get_practice_info tool (avoids needless calls/latency). callback_number
    is not a tool argument — the handler in run_agent stamps the caller-ID onto the lead.
    """
    # NOTE(review): SYSTEM_PROMPT asks the agent to collect insurance and, when the caller
    # rejects the caller-ID number, a corrected callback number — but this schema has no
    # field for either, so with tools on those two details are not part of the recorded
    # request. Confirm whether persist_appointment should accept them before adding fields.
    return ToolsSchema(
        standard_tools=[
            FunctionSchema(
                name="record_appointment_request",
                description=(
                    "Record the caller's appointment request once you have their name and at "
                    "least the office/city and reason. Call this when the caller wants to book "
                    "a visit; staff will call back to confirm the exact time."
                ),
                properties={
                    "patient_name": {"type": "string", "description": "Caller's full name"},
                    "location": {"type": "string", "description": "Which office/city the caller wants, e.g. Hialeah, Kendall, Tamarac"},
                    "reason": {"type": "string", "description": "Reason for the visit, e.g. annual exam, broken glasses, eye pain"},
                    "preferred_time": {"type": "string", "description": "Preferred day/time in the caller's words, if given"},
                },
                required=["patient_name"],
            ),
        ]
    )


class EndCallProcessor(FrameProcessor):
    """Lets the agent hang up.

    MUST sit between the LLM and the TTS: there it sees her reply text (LLMTextFrame,
    flowing downstream) AND the upstream copy of BotStoppedSpeakingFrame the output
    transport emits. It accumulates each reply; if the finished reply contains a closing
    ('goodbye'/'adiós'), it waits until she's done speaking, pauses HANGUP_DELAY_SECS so
    the caller isn't clipped, then pushes EndTaskFrame upstream — the task ends and
    TwilioFrameSerializer (auto_hang_up) drops the call.
    """

    # Substrings (matched case-insensitively) that mark a reply as the closing line.
    _CLOSINGS = ("goodbye", "good-bye", "good bye", "adiós", "adios", "hasta luego")

    def __init__(self):
        super().__init__()
        self._buf = ""  # text of the LLM reply currently being streamed
        self._should_end = False  # True once a finished reply contained a closing
        self._end_task = None  # asyncio task running the delayed hang-up, if scheduled

    @classmethod
    def _is_closing(cls, text: str) -> bool:
        """Return True if *text* contains any of the closing phrases."""
        lowered = (text or "").lower()
        return any(closing in lowered for closing in cls._CLOSINGS)

    async def _hang_up_after_delay(self):
        """Wait out the grace period, then ask the pipeline task to end."""
        await asyncio.sleep(HANGUP_DELAY_SECS)
        logger.info(f"{AGENT_NAME} ending task / hanging up")
        try:
            await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
        except Exception:
            # Best-effort: the pipeline may already be shutting down.
            logger.exception("EndTaskFrame push failed (pipeline already ending?)")

    async def cleanup(self):
        """Cancel a still-pending delayed hang-up when the processor is torn down.

        Without this, a caller who hangs up during the grace pause leaves the sleeping
        task behind, and it later pushes EndTaskFrame into a pipeline that has ended.
        """
        await super().cleanup()
        if self._end_task is not None and not self._end_task.done():
            self._end_task.cancel()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Track reply text / speaking state, then forward every frame unchanged."""
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMTextFrame):
            self._buf += frame.text
        elif isinstance(frame, LLMFullResponseEndFrame):
            # Reply complete: decide whether it was the goodbye, then reset the buffer.
            if self._is_closing(self._buf):
                self._should_end = True
                logger.info(f"{AGENT_NAME} signalled closing -- will hang up "
                            f"{HANGUP_DELAY_SECS:.0f}s after she finishes speaking")
            self._buf = ""
        elif isinstance(frame, BotStoppedSpeakingFrame) and self._should_end:
            self._should_end = False
            # Schedule the teardown so we don't block the pipeline during the grace pause.
            if self._end_task is None:
                self._end_task = asyncio.create_task(self._hang_up_after_delay())
        await self.push_frame(frame, direction)


class AudioHeartbeat(FrameProcessor):
    """Diagnostic: logs how many inbound audio frames arrive every ~5s.

    If this keeps ticking but VAD never fires, the issue is VAD/threshold; if it drops to
    0 after a turn, inbound audio stalled at the transport. Cheap, leave it on while
    stabilizing.
    """

    def __init__(self):
        super().__init__()
        self._n = 0  # audio frames seen since the last log line
        # Monotonic clock: the interval must not be skewed by wall-clock adjustments.
        self._t = time.monotonic()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Count inbound audio frames, log roughly every 5 seconds, forward the frame."""
        await super().process_frame(frame, direction)
        if isinstance(frame, InputAudioRawFrame):
            self._n += 1
            now = time.monotonic()
            elapsed = now - self._t
            if elapsed >= 5:
                logger.info(f"[audio-in] {self._n} frames in last {elapsed:.0f}s")
                self._n = 0
                self._t = now
        await self.push_frame(frame, direction)


class HintedWhisperSTTService(WhisperSTTService):
    """WhisperSTTService that biases transcription toward domain vocabulary.

    Uses faster-whisper `hotwords`. Pipecat's service doesn't expose hotwords, so we wrap
    the model's transcribe() for the duration of each run_stt() call. Each phone call gets
    its own Whisper instance, so this per-instance patch is race-free.
    """

    def __init__(self, *args, hotwords: str | None = None, **kwargs):
        """Forward everything to WhisperSTTService; *hotwords* is the bias string (or None)."""
        super().__init__(*args, **kwargs)
        self._hotwords = hotwords

    async def run_stt(self, audio):
        """Yield the parent's STT frames, injecting `hotwords` into transcribe() if set."""
        if self._hotwords and self._model is not None:
            real = self._model.transcribe

            def patched(audio_arg, **kw):
                # setdefault: never override hotwords the caller passed explicitly.
                kw.setdefault("hotwords", self._hotwords)
                return real(audio_arg, **kw)

            self._model.transcribe = patched
            try:
                async for frame in super().run_stt(audio):
                    yield frame
            finally:
                # Always restore the original method, even if transcription raised.
                self._model.transcribe = real
        else:
            async for frame in super().run_stt(audio):
                yield frame


def build_llm_service():
    """Build the LLM service for the selected provider.

    The universal LLMContext + aggregators work with either, so only this construction
    differs (true A/B swap).

    Returns:
        An AnthropicLLMService when LLM_PROVIDER is "anthropic", otherwise an
        OLLamaLLMService.

    Raises:
        RuntimeError: LLM_PROVIDER is "anthropic" but ANTHROPIC_API_KEY is empty.
    """
    if LLM_PROVIDER == "anthropic":
        if not ANTHROPIC_API_KEY:
            raise RuntimeError("LLM_PROVIDER=anthropic but ANTHROPIC_API_KEY is not set")
        logger.info(f"LLM provider: anthropic ({ANTHROPIC_MODEL})")
        # NOTE: Opus 4.8/4.7 reject temperature/top_p/top_k (HTTP 400), so we omit them —
        # this keeps the default Opus model working. For low-latency phone voice, prefer
        # claude-haiku-4-5 (fastest) or claude-sonnet-4-6 over Opus. enable_prompt_caching
        # caches the system prompt + growing conversation prefix (helps multi-turn cost/latency).
        return AnthropicLLMService(
            api_key=ANTHROPIC_API_KEY,
            settings=AnthropicLLMService.Settings(
                model=ANTHROPIC_MODEL,
                enable_prompt_caching=True,
                max_tokens=LLM_MAX_TOKENS,
            ),
        )

    if LLM_PROVIDER != "ollama":
        # A typo in LLM_PROVIDER used to select Ollama silently; keep the fallback but
        # make it visible in the logs.
        logger.warning(f"Unknown LLM_PROVIDER {LLM_PROVIDER!r} -- falling back to ollama")
    logger.info(f"LLM provider: ollama ({OLLAMA_MODEL})")
    return OLLamaLLMService(
        settings=OLLamaLLMService.Settings(
            model=OLLAMA_MODEL,
            temperature=LLM_TEMPERATURE,
            max_tokens=LLM_MAX_TOKENS,
        ),
        base_url=OLLAMA_URL,
    )


async def run_agent(transport, caller_number=None, call_sid=None, do_capture=True):
    """Build + run the AVC voice agent on a given transport.

    Shared by the phone path (Twilio Media Stream) and the browser path (WebRTC) — same
    prompt, model, voice, and booking/hang-up logic; only the transport differs.

    Args:
        transport: Pipecat transport providing ``input()``, ``output()`` and the
            ``on_client_connected`` / ``on_client_disconnected`` events.
        caller_number: Caller-ID number, or None when unavailable.
        call_sid: Call identifier stamped onto the recorded appointment.
        do_capture: Write the appointment to Odoo (on for phone; off for browser testing
            so it doesn't make cards).
    """
    stt = HintedWhisperSTTService(
        settings=WhisperSTTService.Settings(model=WHISPER_MODEL),
        device=WHISPER_DEVICE,
        compute_type=WHISPER_COMPUTE,
        hotwords=WHISPER_HOTWORDS,
    )
    llm = build_llm_service()

    # Outcome of in-call capture attempts, used after the call to decide whether the
    # post-call extraction must run as a fallback.
    capture_state = {"recorded": False, "failed": False}

    # In-call booking tool — only registered when ENABLE_TOOLS is on (auto: Claude yes,
    # local Ollama no, since llama3.1:8b over-calls/leaks). The handler is a closure so it
    # can stamp the caller-ID + call_sid onto the lead; the tool schema has no phone
    # argument, so the model cannot supply one. With tools on, this writes the Odoo lead
    # IN-CALL, so the post-call extraction is normally skipped below to avoid a duplicate.
    if ENABLE_TOOLS:
        async def _record_appointment(params):
            args = params.arguments or {}
            result = {"status": "recorded", "message": "Recorded — staff will call to confirm the time."}
            if do_capture:
                try:
                    from practice import persist_appointment
                    lead = {
                        "call_sid": call_sid,
                        "patient_name": args.get("patient_name"),
                        "callback_number": caller_number,  # verified caller-ID, not model-supplied
                        "location": args.get("location"),
                        "reason": args.get("reason"),
                        "preferred_time": args.get("preferred_time"),
                        "source": "in_call_tool",
                    }
                    # persist_appointment is a plain (non-awaited) call; run it in a
                    # worker thread so the write doesn't stall the audio event loop.
                    await asyncio.to_thread(persist_appointment, lead)
                    capture_state["recorded"] = True
                except Exception:
                    # Never leave the tool call unanswered: log, remember the failure,
                    # and tell the model the truth so it doesn't claim success.
                    capture_state["failed"] = True
                    logger.exception("In-call appointment capture failed")
                    result = {
                        "status": "error",
                        "message": "Saving the request failed — let the caller know there was a problem recording it.",
                    }
            else:
                logger.info(f"[capture off] would record appointment: {args.get('patient_name')} / {args.get('location')}")
            await params.result_callback(result)

        llm.register_function("record_appointment_request", _record_appointment)

    tts = KokoroTTSService(
        model_path=os.path.join(MODEL_DIR, "kokoro-v1.0.onnx"),
        voices_path=os.path.join(MODEL_DIR, "voices-v1.0.bin"),
        settings=KokoroTTSService.Settings(voice=KOKORO_VOICE),
    )

    vad = VADProcessor(vad_analyzer=SileroVADAnalyzer(params=VADParams(
        confidence=VAD_CONFIDENCE,
        start_secs=VAD_START_SECS,
        stop_secs=VAD_STOP_SECS,
        min_volume=VAD_MIN_VOLUME,
    )))
    heartbeat = AudioHeartbeat()

    # Per-call system message = static prompt + today's calendar + the caller-ID number to
    # confirm. Built here (not at import) so the date is current on a long-running server.
    if caller_number:
        caller_line = (
            f"\n\nCALLER ID: the caller's number on file is {caller_number}. Read it back and "
            "ask if it's the best number to reach them; if they say no, use the number they give."
        )
    else:
        caller_line = (
            "\n\nCALLER ID: no number is available — ask the caller for the best phone number "
            "to reach them."
        )
    system_content = SYSTEM_PROMPT + "\n\n" + _date_context() + caller_line

    context_kwargs = {"messages": [{"role": "system", "content": system_content}]}
    if ENABLE_TOOLS:
        context_kwargs["tools"] = _build_tools()
    context = LLMContext(**context_kwargs)
    agg = LLMContextAggregatorPair(context)
    endcall = EndCallProcessor()

    # Order matters: endcall sits between the LLM and the TTS (see EndCallProcessor).
    pipeline = Pipeline(
        [
            transport.input(),
            heartbeat,
            vad,
            stt,
            agg.user(),
            llm,
            endcall,
            tts,
            transport.output(),
            agg.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
            audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
            allow_interruptions=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected -- greeting")
        await task.queue_frames(
            [TTSSpeakFrame(
                f"Thank you for calling Advanced Vision Care, this is {AGENT_NAME}. "
                "How can I help you today?"
            )]
        )

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected -- ending task")
        await task.queue_frame(EndFrame())

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)

    # Call is over. Post-call extraction is the capture path when in-call tools are off
    # (local Ollama). With tools on (Claude) the booking is normally written in-call, so
    # extraction is skipped to avoid a duplicate lead — unless an in-call write failed and
    # none succeeded, in which case extraction is the fallback so the lead isn't lost.
    in_call_write_lost = capture_state["failed"] and not capture_state["recorded"]
    if do_capture and (not ENABLE_TOOLS or in_call_write_lost):
        try:
            from extract import extract_and_record
            await extract_and_record(
                context.messages,
                OLLAMA_URL,
                OLLAMA_MODEL,
                call_sid=call_sid,
                caller_number=caller_number,
            )
        except Exception:
            logger.exception("Post-call appointment extraction failed")


async def run_call(websocket, serializer: TwilioFrameSerializer, caller_number=None, call_sid=None):
    """Phone entrypoint: wrap the Twilio Media Stream in a transport, run the shared agent.

    Args:
        websocket: The connected Media Stream WebSocket handed over by server.py.
        serializer: TwilioFrameSerializer that converts between the 8 kHz wire audio and
            the pipeline's PIPELINE_SAMPLE_RATE.
        caller_number: Caller-ID number, or None when unavailable.
        call_sid: Call identifier passed through to run_agent.
    """
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
            audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
            add_wav_header=False,
            serializer=serializer,
        ),
    )
    await run_agent(transport, caller_number=caller_number, call_sid=call_sid, do_capture=True)