Add call recording (stereo WAV) + wire silence re-prompt watchdog
Stop debugging silence by guesswork: AudioBufferProcessor records every call to
recordings/<ts>_<callsid>.wav (caller=left, agent=right) so calls can be reviewed
with actual audio. (We had no audio before — that was the real gap; the earlier
"too quiet" explanation was unsupported.)
SilenceWatchdog: after the agent finishes, if the caller is silent for
SILENCE_REPROMPT_SECS (7s) it re-prompts ("are you still there?"); after
MAX_REPROMPTS it closes gracefully. This directly breaks the dead-silence
pattern (e.g. the 14s gap after the phone confirmation) instead of waiting.
Runtime-tested both. .gitignore already excludes recordings/.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
117
bot.py
117
bot.py
@@ -32,12 +32,14 @@ from pipecat.frames.frames import (
|
||||
LLMFullResponseEndFrame,
|
||||
LLMTextFrame,
|
||||
TTSSpeakFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.processors.audio.vad_processor import VADProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
@@ -110,6 +112,16 @@ VAD_STOP_SECS = float(os.environ.get("VAD_STOP_SECS", "0.5"))
|
||||
# cancels its reply (= caller hears silence). Set HALF_DUPLEX=false to allow barge-in.
|
||||
HALF_DUPLEX = os.environ.get("HALF_DUPLEX", "true").lower() not in ("false", "0", "no")
|
||||
ECHO_TAIL_SECS = float(os.environ.get("ECHO_TAIL_SECS", "0.25"))
|
||||
# Silence watchdog: if the caller goes quiet after the agent speaks, re-prompt instead of
|
||||
# dead-waiting (a missed/clipped answer otherwise hangs the call). After MAX re-prompts with
|
||||
# no response, close gracefully. SILENCE_REPROMPT_SECS must be > HANGUP_DELAY_SECS so a real
|
||||
# goodbye hangs up before the watchdog fires.
|
||||
SILENCE_WATCHDOG = os.environ.get("SILENCE_WATCHDOG", "true").lower() not in ("false", "0", "no")
|
||||
SILENCE_REPROMPT_SECS = float(os.environ.get("SILENCE_REPROMPT_SECS", "7.0"))
|
||||
MAX_REPROMPTS = int(os.environ.get("MAX_REPROMPTS", "2"))
|
||||
# Record each call to a stereo WAV (caller = left, agent = right) for review/debugging.
|
||||
RECORD_CALLS = os.environ.get("RECORD_CALLS", "true").lower() not in ("false", "0", "no")
|
||||
RECORDINGS_DIR = os.environ.get("RECORDINGS_DIR", os.path.join(HERE, "recordings"))
|
||||
|
||||
# Agent persona name — purely for warmth; change/remove freely.
|
||||
AGENT_NAME = os.environ.get("AGENT_NAME", "Sofia")
|
||||
@@ -342,6 +354,75 @@ class HalfDuplexGate(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class SilenceWatchdog(FrameProcessor):
|
||||
"""Re-prompts on caller silence instead of dead-waiting. After the agent stops speaking it
|
||||
arms a timer; if the caller hasn't started speaking within `silence_secs`, it injects a
|
||||
re-prompt ("are you still there?"). After `max_prompts` unanswered re-prompts it speaks a
|
||||
graceful closing and ends the call. Any caller speech (UserStartedSpeakingFrame) resets it;
|
||||
the agent speaking cancels it. Place AFTER EndCallProcessor so its injected lines go to TTS
|
||||
and `silence_secs` > HANGUP_DELAY_SECS so a real goodbye hangs up before it fires."""
|
||||
|
||||
def __init__(self, silence_secs: float, max_prompts: int, reprompt_line: str, closing_line: str):
|
||||
super().__init__()
|
||||
self._silence_secs = silence_secs
|
||||
self._max_prompts = max_prompts
|
||||
self._reprompt_line = reprompt_line
|
||||
self._closing_line = closing_line
|
||||
self._timer = None
|
||||
self._prompts = 0
|
||||
self._bot_speaking = False
|
||||
self._ending = False
|
||||
|
||||
def _cancel(self):
|
||||
if self._timer and not self._timer.done():
|
||||
self._timer.cancel()
|
||||
self._timer = None
|
||||
|
||||
def _arm(self):
|
||||
self._cancel()
|
||||
self._timer = asyncio.create_task(self._fire())
|
||||
|
||||
async def _fire(self):
|
||||
try:
|
||||
await asyncio.sleep(self._silence_secs)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
if self._bot_speaking or self._ending:
|
||||
return
|
||||
if self._prompts >= self._max_prompts:
|
||||
self._ending = True
|
||||
logger.info("SilenceWatchdog: still silent after re-prompts -- closing the call")
|
||||
await self.push_frame(TTSSpeakFrame(self._closing_line), FrameDirection.DOWNSTREAM)
|
||||
else:
|
||||
self._prompts += 1
|
||||
logger.info(f"SilenceWatchdog: caller silent -- re-prompt #{self._prompts}")
|
||||
await self.push_frame(TTSSpeakFrame(self._reprompt_line), FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def _end_soon(self):
|
||||
await asyncio.sleep(HANGUP_DELAY_SECS)
|
||||
logger.info("SilenceWatchdog: ending task after silent close")
|
||||
try:
|
||||
await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
||||
except Exception:
|
||||
logger.exception("watchdog EndTaskFrame push failed")
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._prompts = 0 # caller engaged again — reset
|
||||
self._cancel()
|
||||
elif isinstance(frame, BotStartedSpeakingFrame):
|
||||
self._bot_speaking = True
|
||||
self._cancel()
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
self._bot_speaking = False
|
||||
if self._ending:
|
||||
asyncio.create_task(self._end_soon())
|
||||
else:
|
||||
self._arm() # start counting silence once the agent finishes
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class HintedWhisperSTTService(WhisperSTTService):
|
||||
"""WhisperSTTService that biases transcription toward domain vocabulary via
|
||||
faster-whisper `hotwords`. Pipecat's service doesn't expose hotwords, so we wrap
|
||||
@@ -540,6 +621,17 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru
|
||||
phone_confirm_line = phone_marker = None
|
||||
endcall = EndCallProcessor(phone_confirm_line=phone_confirm_line, phone_marker=phone_marker)
|
||||
|
||||
watchdog = SilenceWatchdog(
|
||||
silence_secs=SILENCE_REPROMPT_SECS,
|
||||
max_prompts=MAX_REPROMPTS,
|
||||
reprompt_line="I'm sorry, I didn't catch that — are you still there?",
|
||||
closing_line="I'll let you go for now — please call us back anytime. Goodbye.",
|
||||
) if SILENCE_WATCHDOG else None
|
||||
|
||||
# Stereo recorder (caller left / agent right) at the end so it captures what the system
|
||||
# actually received + sent — for review and to debug silence with evidence, not guesses.
|
||||
audiobuffer = AudioBufferProcessor(num_channels=2) if RECORD_CALLS else None
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
@@ -550,9 +642,11 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru
|
||||
agg.user(),
|
||||
llm,
|
||||
endcall,
|
||||
*( [watchdog] if watchdog else [] ), # re-prompt on caller silence
|
||||
tts,
|
||||
transport.output(),
|
||||
agg.assistant(),
|
||||
*( [audiobuffer] if audiobuffer else [] ), # record both directions
|
||||
]
|
||||
)
|
||||
|
||||
@@ -565,9 +659,30 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru
|
||||
),
|
||||
)
|
||||
|
||||
if audiobuffer:
|
||||
os.makedirs(RECORDINGS_DIR, exist_ok=True)
|
||||
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def _on_audio_data(buf, audio, sample_rate, num_channels):
|
||||
import wave
|
||||
from datetime import datetime
|
||||
if not audio:
|
||||
return
|
||||
fname = f"{datetime.now():%Y%m%d-%H%M%S}_{call_sid or 'web'}.wav"
|
||||
path = os.path.join(RECORDINGS_DIR, fname)
|
||||
with wave.open(path, "wb") as wf:
|
||||
wf.setnchannels(num_channels)
|
||||
wf.setsampwidth(2)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(audio)
|
||||
logger.info(f"Saved call recording: {path} "
|
||||
f"({len(audio)} bytes, {num_channels}ch @ {sample_rate}Hz)")
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected -- greeting")
|
||||
if audiobuffer:
|
||||
await audiobuffer.start_recording()
|
||||
await task.queue_frames(
|
||||
[TTSSpeakFrame(
|
||||
f"Thank you for calling Advanced Vision Care, this is {AGENT_NAME}. "
|
||||
@@ -578,6 +693,8 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected -- ending task")
|
||||
if audiobuffer:
|
||||
await audiobuffer.stop_recording()
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
Reference in New Issue
Block a user