From 80824a7ab0c77df12de2242b4672f0a94e9149f7 Mon Sep 17 00:00:00 2001 From: tocmo0nlord Date: Sat, 27 Jun 2026 17:46:07 +0000 Subject: [PATCH] Add call recording (stereo WAV) + wire silence re-prompt watchdog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stop debugging silence by guesswork: AudioBufferProcessor records every call to recordings/_.wav (caller=left, agent=right) so calls can be reviewed with actual audio. (We had no audio before — that was the real gap; the earlier "too quiet" explanation was unsupported.) SilenceWatchdog: after the agent finishes, if the caller is silent for SILENCE_REPROMPT_SECS (7s) it re-prompts ("are you still there?"); after MAX_REPROMPTS it closes gracefully. This directly breaks the dead-silence pattern (e.g. the 14s gap after the phone confirmation) instead of waiting. Runtime-tested both. .gitignore already excludes recordings/. Co-Authored-By: Claude Opus 4.8 --- bot.py | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/bot.py b/bot.py index 7cec333..f3edcf8 100644 --- a/bot.py +++ b/bot.py @@ -32,12 +32,14 @@ from pipecat.frames.frames import ( LLMFullResponseEndFrame, LLMTextFrame, TTSSpeakFrame, + UserStartedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.processors.audio.vad_processor import VADProcessor from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.serializers.twilio import TwilioFrameSerializer @@ -110,6 +112,16 @@ VAD_STOP_SECS = float(os.environ.get("VAD_STOP_SECS", "0.5")) # cancels its reply (= caller hears silence). Set HALF_DUPLEX=false to allow barge-in. HALF_DUPLEX = os.environ.get("HALF_DUPLEX", "true").lower() not in ("false", "0", "no") ECHO_TAIL_SECS = float(os.environ.get("ECHO_TAIL_SECS", "0.25")) +# Silence watchdog: if the caller goes quiet after the agent speaks, re-prompt instead of +# dead-waiting (a missed/clipped answer otherwise hangs the call). After MAX re-prompts with +# no response, close gracefully. SILENCE_REPROMPT_SECS must be > HANGUP_DELAY_SECS so a real +# goodbye hangs up before the watchdog fires. +SILENCE_WATCHDOG = os.environ.get("SILENCE_WATCHDOG", "true").lower() not in ("false", "0", "no") +SILENCE_REPROMPT_SECS = float(os.environ.get("SILENCE_REPROMPT_SECS", "7.0")) +MAX_REPROMPTS = int(os.environ.get("MAX_REPROMPTS", "2")) +# Record each call to a stereo WAV (caller = left, agent = right) for review/debugging. +RECORD_CALLS = os.environ.get("RECORD_CALLS", "true").lower() not in ("false", "0", "no") +RECORDINGS_DIR = os.environ.get("RECORDINGS_DIR", os.path.join(HERE, "recordings")) # Agent persona name — purely for warmth; change/remove freely. AGENT_NAME = os.environ.get("AGENT_NAME", "Sofia") @@ -342,6 +354,75 @@ class HalfDuplexGate(FrameProcessor): await self.push_frame(frame, direction) +class SilenceWatchdog(FrameProcessor): + """Re-prompts on caller silence instead of dead-waiting. After the agent stops speaking it + arms a timer; if the caller hasn't started speaking within `silence_secs`, it injects a + re-prompt ("are you still there?"). After `max_prompts` unanswered re-prompts it speaks a + graceful closing and ends the call. Any caller speech (UserStartedSpeakingFrame) resets it; + the agent speaking cancels it. Place AFTER EndCallProcessor so its injected lines go to TTS + and `silence_secs` > HANGUP_DELAY_SECS so a real goodbye hangs up before it fires.""" + + def __init__(self, silence_secs: float, max_prompts: int, reprompt_line: str, closing_line: str): + super().__init__() + self._silence_secs = silence_secs + self._max_prompts = max_prompts + self._reprompt_line = reprompt_line + self._closing_line = closing_line + self._timer = None + self._prompts = 0 + self._bot_speaking = False + self._ending = False + + def _cancel(self): + if self._timer and not self._timer.done(): + self._timer.cancel() + self._timer = None + + def _arm(self): + self._cancel() + self._timer = asyncio.create_task(self._fire()) + + async def _fire(self): + try: + await asyncio.sleep(self._silence_secs) + except asyncio.CancelledError: + return + if self._bot_speaking or self._ending: + return + if self._prompts >= self._max_prompts: + self._ending = True + logger.info("SilenceWatchdog: still silent after re-prompts -- closing the call") + await self.push_frame(TTSSpeakFrame(self._closing_line), FrameDirection.DOWNSTREAM) + else: + self._prompts += 1 + logger.info(f"SilenceWatchdog: caller silent -- re-prompt #{self._prompts}") + await self.push_frame(TTSSpeakFrame(self._reprompt_line), FrameDirection.DOWNSTREAM) + + async def _end_soon(self): + await asyncio.sleep(HANGUP_DELAY_SECS) + logger.info("SilenceWatchdog: ending task after silent close") + try: + await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) + except Exception: + logger.exception("watchdog EndTaskFrame push failed") + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, UserStartedSpeakingFrame): + self._prompts = 0 # caller engaged again — reset + self._cancel() + elif isinstance(frame, BotStartedSpeakingFrame): + self._bot_speaking = True + self._cancel() + elif isinstance(frame, BotStoppedSpeakingFrame): + self._bot_speaking = False + if self._ending: + asyncio.create_task(self._end_soon()) + else: + self._arm() # start counting silence once the agent finishes + await self.push_frame(frame, direction) + + class HintedWhisperSTTService(WhisperSTTService): """WhisperSTTService that biases transcription toward domain vocabulary via faster-whisper `hotwords`. Pipecat's service doesn't expose hotwords, so we wrap @@ -540,6 +621,17 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru phone_confirm_line = phone_marker = None endcall = EndCallProcessor(phone_confirm_line=phone_confirm_line, phone_marker=phone_marker) + watchdog = SilenceWatchdog( + silence_secs=SILENCE_REPROMPT_SECS, + max_prompts=MAX_REPROMPTS, + reprompt_line="I'm sorry, I didn't catch that — are you still there?", + closing_line="I'll let you go for now — please call us back anytime. Goodbye.", + ) if SILENCE_WATCHDOG else None + + # Stereo recorder (caller left / agent right) at the end so it captures what the system + # actually received + sent — for review and to debug silence with evidence, not guesses. + audiobuffer = AudioBufferProcessor(num_channels=2) if RECORD_CALLS else None + pipeline = Pipeline( [ transport.input(), @@ -550,9 +642,11 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru agg.user(), llm, endcall, + *( [watchdog] if watchdog else [] ), # re-prompt on caller silence tts, transport.output(), agg.assistant(), + *( [audiobuffer] if audiobuffer else [] ), # record both directions ] ) @@ -565,9 +659,30 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru ), ) + if audiobuffer: + os.makedirs(RECORDINGS_DIR, exist_ok=True) + + @audiobuffer.event_handler("on_audio_data") + async def _on_audio_data(buf, audio, sample_rate, num_channels): + import wave + from datetime import datetime + if not audio: + return + fname = f"{datetime.now():%Y%m%d-%H%M%S}_{call_sid or 'web'}.wav" + path = os.path.join(RECORDINGS_DIR, fname) + with wave.open(path, "wb") as wf: + wf.setnchannels(num_channels) + wf.setsampwidth(2) + wf.setframerate(sample_rate) + wf.writeframes(audio) + logger.info(f"Saved call recording: {path} " + f"({len(audio)} bytes, {num_channels}ch @ {sample_rate}Hz)") + @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info("Client connected -- greeting") + if audiobuffer: + await audiobuffer.start_recording() await task.queue_frames( [TTSSpeakFrame( f"Thank you for calling Advanced Vision Care, this is {AGENT_NAME}. " @@ -578,6 +693,8 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client): logger.info("Client disconnected -- ending task") + if audiobuffer: + await audiobuffer.stop_recording() await task.queue_frame(EndFrame()) runner = PipelineRunner(handle_sigint=False)