#!/usr/bin/env python3
"""AVC optometry phone agent — the Pipecat pipeline for a single inbound call.

Same VAD -> STT -> LLM -> TTS loop as pipecat-run/bot.py, but the ends are swapped for
telephony: audio arrives/leaves as 8 kHz mu-law over a Twilio Media Stream (WebSocket),
decoded by TwilioFrameSerializer. Real-time STT is Deepgram (Nova-2); the LLM is either the
local `activeblue-avc` fine-tune via Ollama or Claude via the Anthropic API (selected with
LLM_PROVIDER); TTS is local Kokoro.

This module just builds + runs the pipeline for one connected call. server.py owns the
FastAPI/TwiML/WebSocket side and calls run_call() once per call.
"""

import os
import re
import time

from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
    BotStoppedSpeakingFrame,
    EndFrame,
    EndTaskFrame,
    Frame,
    InputAudioRawFrame,
    LLMFullResponseEndFrame,
    LLMTextFrame,
    TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.vad_processor import VADProcessor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.kokoro.tts import KokoroTTSService
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

from practice import practice_summary

# ── Config (env-overridable) ─────────────────────────────────────────────────
HERE = os.path.dirname(os.path.abspath(__file__))

# Reuse the Kokoro model files already downloaded by the pipecat-run project.
MODEL_DIR = os.environ.get("KOKORO_MODEL_DIR", "/home/tocmo0nlord/pipecat-run/models")

OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "activeblue-avc:latest")
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/v1")

# Swappable LLM provider: "ollama" (local) or "anthropic" (Claude API). Same universal
# LLMContext drives both — only the service construction differs (see build_llm_service).
# Whitespace is stripped so a stray space in the env file does not silently select Ollama.
LLM_PROVIDER = os.environ.get("LLM_PROVIDER", "ollama").strip().lower()
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")

# Defaults to the most capable model. For low-latency PHONE voice, set ANTHROPIC_MODEL to
# claude-haiku-4-5 (fastest) or claude-sonnet-4-6 (balance) — see notes in build_llm_service.
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-opus-4-8")

# In-call function-calling: AUTO by provider — ON for Claude (reliable tool calls → real-time
# Odoo booking), OFF for local Ollama (llama3.1:8b over-calls / leaks JSON). An explicit
# ENABLE_TOOLS env overrides the auto choice either way: "1"/"true"/"yes" (any case) turns
# tools on, any other explicit value turns them off.
_enable_tools_env = os.environ.get("ENABLE_TOOLS")
ENABLE_TOOLS = (
    _enable_tools_env.strip().lower() in ("1", "true", "yes")
    if _enable_tools_env is not None
    else (LLM_PROVIDER == "anthropic")
)

LLM_TEMPERATURE = float(os.environ.get("LLM_TEMPERATURE", "0.3"))
LLM_MAX_TOKENS = int(os.environ.get("LLM_MAX_TOKENS", "160"))
KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "af_heart")

# Real-time STT is Deepgram Nova-2: end-of-utterance events in <300ms (vs Whisper's
# 1-3s of chunk buffering, the main cause of non-reply / repeat-yourself). Whisper
# large-v3 is retained for post-call transcription only (Phase 3).
DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY", "")

# Twilio sends 8 kHz mu-law on the wire — we run the PIPELINE at 16 kHz and let
# TwilioFrameSerializer resample to/from the 8 kHz wire. (Silero VAD, Deepgram, and
# Kokoro are all happy at 16 kHz.)
WIRE_SAMPLE_RATE = 8000  # Twilio mu-law on the wire (serializer handles this)
PIPELINE_SAMPLE_RATE = 16000  # internal rate the VAD / Deepgram STT / Kokoro stages run at

# VAD tuning. Defaults (confidence 0.7 / min_volume 0.6) are desktop-mic values that can
# miss short/quiet 8 kHz telephony utterances like "yes" — loosen them for the phone.
VAD_CONFIDENCE = float(os.environ.get("VAD_CONFIDENCE", "0.5"))
VAD_MIN_VOLUME = float(os.environ.get("VAD_MIN_VOLUME", "0.3"))
VAD_START_SECS = float(os.environ.get("VAD_START_SECS", "0.2"))
VAD_STOP_SECS = float(os.environ.get("VAD_STOP_SECS", "0.5"))

# Agent persona name — purely for warmth; change/remove freely.
AGENT_NAME = os.environ.get("AGENT_NAME", "Sofia")

# The prompt is assembled once at import time; practice_summary() supplies the facts block.
SYSTEM_PROMPT = (
    f"You are {AGENT_NAME}, a warm, friendly receptionist for Advanced Vision Care, an "
    "optometry practice with eight offices in South Florida. You are on a real phone call, so "
    "talk like a helpful human being: natural, relaxed, and genuinely conversational — usually "
    "just one short sentence at a time. Speak in English. Say numbers, dates, and times as "
    "words a person would say.\n\n"
    "Your job is to answer callers' questions and to take appointment requests. To book a "
    "visit you need four things: which office or city, the reason for the visit, a preferred "
    "day and time, and their name. Gather these naturally as the conversation flows — don't "
    "interrogate, and never ask for something the caller already told you (people often give "
    "their name or reason in their first sentence). You already have their number from caller "
    "ID, so never ask for a phone number. When you have the details, repeat them back in one "
    "warm sentence to confirm, and let them know a staff member will call to finalize the time.\n\n"
    "Stay truthful and within your limits:\n"
    "- Use ONLY the facts below for addresses, phone numbers, insurance, and services. Never "
    "make any of these up.\n"
    "- To find the right office, ask what CITY or AREA is most convenient for the caller. Do "
    "NOT suggest or name a specific office yourself — you don't know where they are. Only after "
    "they tell you their area, name the matching office; and only list locations if they ask "
    "what offices exist.\n"
    "- You cannot see a calendar, so never say a time is open or available — take the time as "
    "a request that staff will confirm.\n"
    "- Insurance: only confirm a plan that is in the list below. For any plan that is not "
    "listed (UnitedHealthcare, Aetna, Cigna, and so on), don't say yes or no — say our staff "
    "will verify their coverage.\n"
    "- Hours are not published — say they vary by office and staff will confirm; never give "
    "specific hours.\n"
    "- You don't give medical advice and can't transfer calls. If the caller mentions an eye "
    "problem, just note it as the reason and say a staff member or doctor will follow up.\n"
    "- If you're not sure you heard something, simply ask them to repeat it.\n"
    "- When the caller is all set, give a brief, warm closing that ends with the word "
    "'Goodbye' — that ends the call, so only say it when you truly mean to.\n\n"
    "PRACTICE FACTS:\n" + practice_summary()
)


def _build_tools() -> ToolsSchema:
    """Return the tool schema exposed to the LLM: a single appointment-recording function."""
    # Only the booking action is a tool. Practice facts already live in the system prompt,
    # so no get_practice_info tool (avoids needless calls/latency). callback_number is NOT
    # required — we have the caller-ID and inject it in the handler.
    return ToolsSchema(
        standard_tools=[
            FunctionSchema(
                name="record_appointment_request",
                description=(
                    "Record the caller's appointment request once you have their name and at "
                    "least the office/city and reason. Call this when the caller wants to book "
                    "a visit; staff will call back to confirm the exact time."
                ),
                properties={
                    "patient_name": {"type": "string", "description": "Caller's full name"},
                    "location": {
                        "type": "string",
                        "description": "Which office/city the caller wants, e.g. Hialeah, Kendall, Tamarac",
                    },
                    "reason": {
                        "type": "string",
                        "description": "Reason for the visit, e.g. annual exam, broken glasses, eye pain",
                    },
                    "preferred_time": {
                        "type": "string",
                        "description": "Preferred day/time in the caller's words, if given",
                    },
                },
                required=["patient_name"],
            ),
        ]
    )


class EndCallProcessor(FrameProcessor):
    """Lets Sofia hang up.

    MUST sit between the LLM and the TTS: there it sees her reply text (LLMTextFrame,
    flowing downstream) AND the upstream copy of BotStoppedSpeakingFrame the output
    transport emits. It accumulates each reply; if the finished reply contains a closing
    ('goodbye'/'adiós'), it waits until she's done speaking, then pushes EndTaskFrame
    upstream — the task ends and TwilioFrameSerializer (auto_hang_up) drops the call.

    Every frame is forwarded unchanged in its original direction; this processor only
    observes and, at most, injects one EndTaskFrame.
    """

    # Matched as case-insensitive substrings anywhere in the completed reply.
    _CLOSINGS = ("goodbye", "good-bye", "good bye", "adiós", "adios", "hasta luego")

    def __init__(self):
        super().__init__()
        self._buf = ""  # text of the LLM reply currently being streamed
        self._should_end = False  # armed once a finished reply contained a closing

    @classmethod
    def _is_closing(cls, text: str) -> bool:
        """Return True if *text* (None/empty allowed) contains any closing phrase."""
        t = (text or "").lower()
        return any(c in t for c in cls._CLOSINGS)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Track reply text / end-of-speech and pass the frame along."""
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMTextFrame):
            self._buf += frame.text
        elif isinstance(frame, LLMFullResponseEndFrame):
            # Reply complete: decide whether it was a goodbye, then reset for the next one.
            if self._is_closing(self._buf):
                self._should_end = True
                logger.info("Sofia signalled closing -- will hang up after she finishes speaking")
            self._buf = ""
        elif isinstance(frame, BotStoppedSpeakingFrame) and self._should_end:
            # Disarm first so a second BotStoppedSpeakingFrame cannot end the task twice.
            self._should_end = False
            logger.info("Sofia closed the call -- ending task / hanging up")
            await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
        await self.push_frame(frame, direction)


class AudioHeartbeat(FrameProcessor):
    """Diagnostic: logs how many inbound audio frames arrive every ~5s.

    If this keeps ticking but VAD never fires, the issue is VAD/threshold; if it drops to 0
    after a turn, inbound audio stalled at the transport. Cheap, leave it on while
    stabilizing. Note the log line is only emitted when an audio frame arrives, so a total
    stall shows up as the heartbeat going silent.
    """

    def __init__(self):
        super().__init__()
        self._n = 0  # audio frames seen since the last log line
        # Monotonic clock: interval measurement must not be affected by wall-clock
        # adjustments (NTP steps, manual changes).
        self._t = time.monotonic()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Count InputAudioRawFrame arrivals, log every >=5 s, forward every frame."""
        await super().process_frame(frame, direction)
        if isinstance(frame, InputAudioRawFrame):
            self._n += 1
            now = time.monotonic()
            if now - self._t >= 5:
                logger.info(f"[audio-in] {self._n} frames in last {now - self._t:.0f}s")
                self._n = 0
                self._t = now
        await self.push_frame(frame, direction)


def build_llm_service():
    """Build the LLM service for the selected provider.

    The universal LLMContext + aggregators work with either, so only this construction
    differs (true A/B swap).

    Returns:
        An AnthropicLLMService when LLM_PROVIDER is "anthropic", otherwise an
        OLLamaLLMService pointed at OLLAMA_URL.

    Raises:
        RuntimeError: LLM_PROVIDER is "anthropic" but ANTHROPIC_API_KEY is empty.
    """
    if LLM_PROVIDER == "anthropic":
        if not ANTHROPIC_API_KEY:
            raise RuntimeError("LLM_PROVIDER=anthropic but ANTHROPIC_API_KEY is not set")
        logger.info(f"LLM provider: anthropic ({ANTHROPIC_MODEL})")
        # NOTE: Opus 4.8/4.7 reject temperature/top_p/top_k (HTTP 400), so we omit them —
        # this keeps the default Opus model working. For low-latency phone voice, prefer
        # claude-haiku-4-5 (fastest) or claude-sonnet-4-6 over Opus. enable_prompt_caching
        # caches the system prompt + growing conversation prefix (helps multi-turn cost/latency).
        return AnthropicLLMService(
            api_key=ANTHROPIC_API_KEY,
            settings=AnthropicLLMService.Settings(
                model=ANTHROPIC_MODEL,
                enable_prompt_caching=True,
                max_tokens=LLM_MAX_TOKENS,
            ),
        )

    # Any value other than "anthropic" ends up here; make a typo visible instead of
    # silently running the local model.
    if LLM_PROVIDER != "ollama":
        logger.warning(f"Unknown LLM_PROVIDER={LLM_PROVIDER!r} -- falling back to ollama")
    logger.info(f"LLM provider: ollama ({OLLAMA_MODEL})")
    return OLLamaLLMService(
        settings=OLLamaLLMService.Settings(
            model=OLLAMA_MODEL,
            temperature=LLM_TEMPERATURE,
            max_tokens=LLM_MAX_TOKENS,
        ),
        base_url=OLLAMA_URL,
    )


async def run_agent(transport, caller_number=None, call_sid=None, do_capture=True):
    """Build + run the AVC voice agent on a given transport.

    Shared by the phone path (Twilio Media Stream) and the browser path (WebRTC) — same
    prompt, model, voice, and booking/hang-up logic; only the transport differs.

    Args:
        transport: Pipecat transport providing input()/output() and the
            on_client_connected / on_client_disconnected events.
        caller_number: Verified caller-ID, stamped onto any recorded appointment.
        call_sid: Call identifier, stamped onto any recorded appointment.
        do_capture: Write the appointment to Odoo (on for phone; off for browser testing
            so it doesn't make cards).

    Returns once the pipeline task has ended and any post-call capture has been attempted.
    """
    if not DEEPGRAM_API_KEY:
        # Mirror the visibility the Anthropic key gets; logged rather than raised so
        # existing callers see no new exception type from this path.
        logger.error("DEEPGRAM_API_KEY is not set -- Deepgram STT will likely fail to authenticate")

    stt = DeepgramSTTService(
        api_key=DEEPGRAM_API_KEY,
        settings=DeepgramSTTService.Settings(
            model="nova-2",
            language="en-US",
            smart_format=True,
            punctuate=True,
            interim_results=False,  # final transcripts only — avoids double-firing
            utterance_end_ms=1000,  # ms of silence before end-of-utterance fires
        ),
    )

    llm = build_llm_service()

    # Set when an in-call Odoo write was attempted and raised; cleared again if a later
    # attempt succeeds. Used below to fall back to post-call extraction.
    in_call_write_failed = False

    # In-call booking tool — only registered when ENABLE_TOOLS is on (auto: Claude yes,
    # local Ollama no, since llama3.1:8b over-calls/leaks). The handler is a closure so it
    # can stamp the verified caller-ID + call_sid onto the lead (the model never supplies a
    # phone number — we don't ask for one). With tools on, this writes the Odoo lead IN-CALL,
    # so the post-call extraction is skipped below to avoid a duplicate.
    if ENABLE_TOOLS:

        async def _record_appointment(params):
            nonlocal in_call_write_failed
            args = params.arguments or {}
            result = {
                "status": "recorded",
                "message": "Recorded — staff will call to confirm the time.",
            }
            if do_capture:
                try:
                    from practice import persist_appointment

                    # NOTE(review): called synchronously; if persist_appointment does
                    # blocking network I/O it stalls the audio loop for its duration —
                    # confirm, and consider asyncio.to_thread if so.
                    persist_appointment({
                        "call_sid": call_sid,
                        "patient_name": args.get("patient_name"),
                        "callback_number": caller_number,  # verified caller-ID, not model-supplied
                        "location": args.get("location"),
                        "reason": args.get("reason"),
                        "preferred_time": args.get("preferred_time"),
                        "source": "in_call_tool",
                    })
                    in_call_write_failed = False
                except Exception:
                    # The LLM is waiting on result_callback; never leave it hanging
                    # because the write blew up.
                    in_call_write_failed = True
                    logger.exception("In-call appointment write failed")
                    result = {
                        "status": "error",
                        "message": (
                            "The request could not be saved just now. Apologize briefly and "
                            "tell the caller a staff member will call them back."
                        ),
                    }
            else:
                logger.info(f"[capture off] would record appointment: {args.get('patient_name')} / {args.get('location')}")
            await params.result_callback(result)

        llm.register_function("record_appointment_request", _record_appointment)

    tts = KokoroTTSService(
        model_path=os.path.join(MODEL_DIR, "kokoro-v1.0.onnx"),
        voices_path=os.path.join(MODEL_DIR, "voices-v1.0.bin"),
        settings=KokoroTTSService.Settings(voice=KOKORO_VOICE),
    )

    vad = VADProcessor(vad_analyzer=SileroVADAnalyzer(params=VADParams(
        confidence=VAD_CONFIDENCE,
        start_secs=VAD_START_SECS,
        stop_secs=VAD_STOP_SECS,
        min_volume=VAD_MIN_VOLUME,
    )))
    heartbeat = AudioHeartbeat()

    context_kwargs = {"messages": [{"role": "system", "content": SYSTEM_PROMPT}]}
    if ENABLE_TOOLS:
        context_kwargs["tools"] = _build_tools()
    context = LLMContext(**context_kwargs)
    agg = LLMContextAggregatorPair(context)
    endcall = EndCallProcessor()

    # Order matters: endcall must sit between llm and tts (see EndCallProcessor).
    pipeline = Pipeline(
        [
            transport.input(),
            heartbeat,
            vad,
            stt,
            agg.user(),
            llm,
            endcall,
            tts,
            transport.output(),
            agg.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
            audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
            allow_interruptions=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected -- greeting")
        await task.queue_frames(
            [TTSSpeakFrame(
                f"Thank you for calling Advanced Vision Care, this is {AGENT_NAME}. "
                "How can I help you today?"
            )]
        )

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected -- ending task")
        await task.queue_frame(EndFrame())

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)

    # Call is over. Post-call extraction is the capture path when in-call tools are off
    # (local Ollama). With tools on (Claude), the booking is normally written in-call, so
    # extraction is skipped to avoid a duplicate lead — unless that in-call write failed,
    # in which case extraction is the only remaining chance to capture the request.
    if do_capture and (not ENABLE_TOOLS or in_call_write_failed):
        try:
            from extract import extract_and_record

            await extract_and_record(
                context.messages, OLLAMA_URL, OLLAMA_MODEL,
                call_sid=call_sid, caller_number=caller_number,
            )
        except Exception:
            # Best-effort: a failed extraction must not turn a finished call into an error.
            logger.exception("Post-call appointment extraction failed")


async def run_call(websocket, serializer: TwilioFrameSerializer, caller_number=None, call_sid=None):
    """Phone entrypoint: wrap the Twilio Media Stream in a transport, run the shared agent.

    Args:
        websocket: The connected Media Stream WebSocket.
        serializer: TwilioFrameSerializer already configured for this call.
        caller_number: Caller-ID passed through to run_agent.
        call_sid: Call identifier passed through to run_agent.
    """
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=PIPELINE_SAMPLE_RATE,
            audio_out_sample_rate=PIPELINE_SAMPLE_RATE,
            add_wav_header=False,
            serializer=serializer,
        ),
    )
    await run_agent(transport, caller_number=caller_number, call_sid=call_sid, do_capture=True)