diff --git a/.env.example b/.env.example index f21bb9f..3199d4c 100644 --- a/.env.example +++ b/.env.example @@ -68,3 +68,7 @@ VAD_CONFIDENCE=0.5 VAD_MIN_VOLUME=0.15 VAD_START_SECS=0.1 VAD_STOP_SECS=0.5 +# Deterministic slot memory (callstate.py): injects an ALREADY-COLLECTED / STILL-NEEDED +# checklist into the system prompt each turn + merges VAD-fragmented user turns, so the +# local 8B stops re-asking for name/reason/phone. Default: on for ollama, off for anthropic. +#CALL_STATE_TRACKING=true diff --git a/CLAUDE.md b/CLAUDE.md index d968782..67e9f8c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -83,7 +83,23 @@ audio while the bot is speaking (+`ECHO_TAIL_SECS`, default 0.5s) so echo never Trade-off: half-duplex — the caller can't barge in mid-utterance (fine for short replies). `HALF_DUPLEX=false` restores barge-in. Keep it on for telephony. -**Post-call extraction (`extract.py`)** — single JSON-mode completion after call ends. +**`CallStateGroomer` (`callstate.py`) — deterministic slot memory (2026-07-03).** Fixes the +8B re-asking for things the caller already gave (name, reason, phone — seen repeatedly in the +historical call logs: "Didn't you say you had my phone number?", "I already gave you my full +name", the "I want an appointment"→"what brings you in?" loop). Sits between `agg.user()` and +the LLM. Two jobs: (1) on upstream `BotStoppedSpeakingFrame` (agent finished; Ollama idle, +caller talking) it runs a ~1.2s JSON-mode extraction over the transcript-so-far — OFF the +latency-critical path, result applied next turn; (2) on downstream `LLMContextFrame` (right +before generation) it synchronously merges VAD-fragmented consecutive user messages +("Monday" / "3 p.m." → one turn) and injects an explicit checklist into the system message: +`CALL STATE ... ALREADY COLLECTED (NEVER ask again): name=Carlos Garcia ... STILL NEEDED: +insurance, preferred day/time`. It also carries call type (`callback` → "do NOT ask booking +questions"). 
Verified via `scripts/ab_replay.py` (replays the real failed calls): llama3.1-8B +raw = 3 failures, +CALL STATE = **0 failures**, chat latency 0.31s→0.55s med (system-message +churn re-evals the prompt; acceptable, still ≪ the 3s gate). Env: `CALL_STATE_TRACKING` +(default: on for ollama, off for anthropic — Claude tracks state fine on its own; extraction +always runs on the local Ollama model). Qwen3-14B was A/B'd as an alternative and rejected +for now: no better raw, ~3s/turn with state, needs `think:false` handling, ~11GB VRAM. Correctly uses `format: json`, uses verified Twilio caller-ID instead of trusting model output, falls back to JSONL if Odoo is unreachable. Keep it. **Classifies `request_type`:** `appointment` (booking), `callback` (a non-booking request staff @@ -495,6 +511,10 @@ Beyond the three reverted changes, the following hardening is live (see git hist - **Reason capture** — post-call extractor broadened to capture the eye problem/symptom as the reason (not just visit types); reason now shown in the log line and the Odoo lead title. - **Hang-up** — `HANGUP_DELAY_SECS=4` grace pause before dropping the carrier leg. - **Office selection** — confirm the matching office; never offer/compare others. +- **Re-ask fix (2026-07-03)** — `CallStateGroomer` slot-state checklist + user-turn merge (see + component note above); prompt step 1 now says "I want an appointment" is intent not reason — + ask the visit reason ONCE, then move on. Regression harness: `scripts/ab_replay.py [--state] + ` replays the historical failure scenarios and flags re-asks. 
### Phase 2 — Accuracy (RAG + validation) diff --git a/bot.py b/bot.py index b2377d2..c2d40a6 100644 --- a/bot.py +++ b/bot.py @@ -53,6 +53,7 @@ from pipecat.transports.websocket.fastapi import ( FastAPIWebsocketTransport, ) +from callstate import CallStateGroomer from practice import practice_summary # ── Config (env-overridable) ───────────────────────────────────────────────── @@ -120,6 +121,17 @@ ECHO_TAIL_SECS = float(os.environ.get("ECHO_TAIL_SECS", "0.25")) SILENCE_WATCHDOG = os.environ.get("SILENCE_WATCHDOG", "true").lower() not in ("false", "0", "no") SILENCE_REPROMPT_SECS = float(os.environ.get("SILENCE_REPROMPT_SECS", "7.0")) MAX_REPROMPTS = int(os.environ.get("MAX_REPROMPTS", "2")) +# Deterministic slot-state tracking (callstate.py): after each agent turn, extract what the +# caller already provided and inject an explicit ALREADY-COLLECTED / STILL-NEEDED checklist +# into the system message, plus merge VAD-fragmented user turns. Fixes the 8B re-asking for +# name/reason/phone it was already given. Extraction runs on the local Ollama model, so it +# auto-disables for the anthropic provider (Claude tracks state fine on its own). +_call_state_env = os.environ.get("CALL_STATE_TRACKING") +CALL_STATE_TRACKING = ( + _call_state_env.lower() in ("1", "true", "yes") + if _call_state_env is not None + else (LLM_PROVIDER == "ollama") +) # Record each call to a stereo WAV (caller = left, agent = right) for review/debugging. RECORD_CALLS = os.environ.get("RECORD_CALLS", "true").lower() not in ("false", "0", "no") RECORDINGS_DIR = os.environ.get("RECORDINGS_DIR", os.path.join(HERE, "recordings")) @@ -165,7 +177,11 @@ SYSTEM_PROMPT = ( "THIS case — switch to taking a message; never force booking questions on a non-booking caller.\n" " • A BOOKING (they want to schedule a visit) — work through these steps in order:\n" " 1. REASON FIRST — find out what they are calling about (the reason for the visit, or " - "their question). 
If it is only a question, answer it.\n" + "their question). If it is only a question, answer it. NOTE: 'I want an appointment' / 'I " + "need to make an appointment' is the booking INTENT, not the reason — never treat it as a " + "non-answer. Acknowledge it and ask ONCE what the visit is for, e.g. 'Happy to help — what " + "would you like to be seen for?'. If they just say 'an appointment' again or give no medical " + "reason, note it as a general visit and MOVE ON to location — NEVER ask the reason twice.\n" " 2. LOCATION — ask which city or area is most convenient, then confirm the matching " "office (see the office rule below).\n" " 3. CALLER INFO — get their FULL name (first and last; if they give only a first name, " @@ -656,6 +672,11 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru context_kwargs["tools"] = _build_tools() context = LLMContext(**context_kwargs) agg = LLMContextAggregatorPair(context) + # Deterministic slot memory: merges fragmented user turns + injects the live + # collected/needed checklist into the system message before each generation. + groomer = CallStateGroomer( + context, base_system=system_content, ollama_url=OLLAMA_URL, model=OLLAMA_MODEL, + ) if CALL_STATE_TRACKING else None # Deterministic phone-confirmation safety net: if the agent reaches a closing without # having read the caller-ID back, EndCallProcessor speaks this scripted line first. if caller_number: @@ -687,6 +708,7 @@ async def run_agent(transport, caller_number=None, call_sid=None, do_capture=Tru vad, stt, agg.user(), + *( [groomer] if groomer else [] ), # slot-state checklist + user-turn merge llm, endcall, *( [watchdog] if watchdog else [] ), # re-prompt on caller silence diff --git a/callstate.py b/callstate.py new file mode 100644 index 0000000..cf8d837 --- /dev/null +++ b/callstate.py @@ -0,0 +1,232 @@ +"""In-call slot-state tracking — deterministic memory for a weak LLM. 
+ +The 8B keeps re-asking for things the caller already said (name, reason, phone) because +it has to *infer* call state from a long transcript under ~1,400 tokens of rules. This +module makes the state explicit instead: after each agent turn (while the caller is +talking — off the latency-critical path) it runs one short JSON-mode extraction over the +transcript, then injects a live checklist into the system message before the next +generation: + + CALL STATE ... ALREADY COLLECTED (never ask again): name=Carlos Garcia, ... + STILL NEEDED: insurance, preferred day/time + +Small models follow an explicit checklist at the end of the system prompt far more +reliably than they track slots from conversation history. Same philosophy as the +deterministic phone-confirm safety net in EndCallProcessor: scaffold around the model. + +CallStateGroomer also merges consecutive user messages in the context (VAD splits one +utterance like "Monday" / "3 p.m." into two turns, which derails the 8B) — done +synchronously on LLMContextFrame, right before the LLM reads the context. +""" + +import asyncio +import json + +import httpx +from loguru import logger + +from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame, LLMContextFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +# Short, in-call variant of the post-call extractor (extract.py): only what's needed to +# build the checklist, temperature 0, capped output. Runs on the local Ollama model. +_STATE_INSTRUCTIONS = ( + "You are tracking the state of a LIVE phone call between a caller and the receptionist " + "of an optometry practice. From the transcript, extract only what the CALLER has clearly " + "provided so far. 
# Booking slots in the order the call script gathers them: (state key, checklist label).
_BOOKING_ORDER = [
    ("reason", "reason for the visit"),
    ("location", "which office/city"),
    ("patient_name", "full name"),
    ("insurance", "insurance"),
    ("preferred_time", "preferred day and time"),
]


async def extract_call_state(messages, ollama_url, model, timeout=15):
    """Run one short JSON-mode extraction over the transcript so far.

    Args:
        messages: Context messages. Only dict entries whose role is ``user`` or
            ``assistant`` and whose content is a non-blank string are sent;
            everything else (system prompt, non-string content, non-dict
            entries) is skipped.
        ollama_url: Ollama base URL. A trailing ``/v1`` is stripped because the
            request goes to the ``/api/chat`` endpoint.
        model: Ollama model name.
        timeout: httpx client timeout in seconds.

    Returns:
        The extracted state dict, or None when there is no transcript yet or
        the model replied with JSON that is not an object.

    Raises:
        httpx.HTTPError: transport failure or non-2xx response.
        ValueError, KeyError: malformed response body.
    """
    turns = [
        f"{m['role']}: {m['content']}"
        for m in messages
        if isinstance(m, dict)
        and m.get("role") in ("user", "assistant")
        and isinstance(m.get("content"), str) and m["content"].strip()
    ]
    if not turns:
        return None
    base = ollama_url.rstrip("/")
    if base.endswith("/v1"):
        base = base[:-3]
    body_extra = {}
    if "qwen3" in model or "deepseek-r1" in model:
        body_extra["think"] = False  # thinking models emit non-JSON otherwise
    async with httpx.AsyncClient(timeout=timeout) as client:
        r = await client.post(
            f"{base}/api/chat",
            json={
                "model": model,
                "format": "json",
                "stream": False,
                "options": {"temperature": 0, "num_predict": 200},
                **body_extra,
                "messages": [
                    {"role": "system", "content": _STATE_INSTRUCTIONS},
                    {"role": "user", "content": "Transcript:\n" + "\n".join(turns)},
                ],
            },
        )
        r.raise_for_status()
        state = json.loads(r.json()["message"]["content"])
    # JSON mode yields valid JSON, but not necessarily an object; callers
    # (build_state_block) index the result by key, so reject anything else.
    return state if isinstance(state, dict) else None


def _slot_text(value) -> str:
    """Return *value* stripped if it is a string, otherwise ""."""
    return value.strip() if isinstance(value, str) else ""


def _flag_is_true(flag) -> bool:
    """Interpret a model-produced boolean; the string "false" must not count as true."""
    if isinstance(flag, str):
        return flag.strip().lower() == "true"
    return bool(flag)


def build_state_block(state) -> str:
    """Render the extracted state as an explicit checklist for the system prompt.

    The state comes from an LLM, so it is treated as untrusted: a non-dict
    state or non-string slot values are handled as "nothing collected" rather
    than raising.

    Returns:
        The checklist text for ``booking`` and ``callback`` calls, or "" when
        there is nothing worth injecting (no state yet, or a question/unknown
        call type).
    """
    if not state or not isinstance(state, dict):
        return ""
    ctype = _slot_text(state.get("call_type")).lower() or "unknown"
    got, needed = [], []
    for key, label in _BOOKING_ORDER:
        val = _slot_text(state.get(key))
        if key == "patient_name" and val and not _flag_is_true(state.get("name_is_full")):
            # First name only: record what we have, still ask for the rest.
            got.append(f"first name: {val}")
            needed.append("their LAST name (you have the first)")
            continue
        if val:
            got.append(f"{label}: {val}")
        else:
            needed.append(label)

    header = "CALL STATE (auto-tracked from this conversation — trust it over your memory):"

    if ctype == "callback":
        lines = [
            header,
            "- This is a NON-BOOKING call: the caller needs staff to handle something off the "
            "phone. Do NOT ask about insurance, office, or a preferred day/time.",
        ]
        if got:
            lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
        # Use the normalized value: a blank or non-string name is not a name.
        if not _slot_text(state.get("patient_name")):
            lines.append("- Still needed: their name. Then confirm the callback number and close.")
        else:
            lines.append("- You have what you need: confirm the callback number and close.")
        return "\n".join(lines)

    if ctype == "booking":
        lines = [header]
        if got:
            lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
        if needed:
            lines.append("- STILL NEEDED — ask for the FIRST of these, one per turn: "
                         + ", ".join(needed))
            # The observed failure loop: caller says "an appointment", model keeps asking why.
            if not _slot_text(state.get("reason")):
                lines.append("- No visit reason yet: if you have ALREADY asked what the visit "
                             "is for and they only said 'an appointment', do NOT ask again — "
                             "note it as a general visit and ask the next needed item instead.")
        else:
            lines.append("- All booking details collected: confirm the callback number, recap "
                         "as a REQUEST, ask if there's anything else, then close.")
        return "\n".join(lines)

    return ""  # question/unknown — nothing useful to inject


def _is_text_user_message(m) -> bool:
    """True for a dict message with role "user" and plain-string content."""
    return isinstance(m, dict) and m.get("role") == "user" and isinstance(m.get("content"), str)


def merge_consecutive_user_messages(messages):
    """Collapse back-to-back user messages (VAD-fragmented utterances) into one turn.

    Returns a new list and never mutates the input messages. Entries that are
    not plain-text user dicts (non-string content such as tool results,
    non-dict message objects) are passed through untouched and break a run.
    """
    out = []
    for m in messages:
        prev = out[-1] if out else None
        if _is_text_user_message(m) and _is_text_user_message(prev):
            merged = dict(prev)  # copy so the caller's message is not modified
            merged["content"] = (prev["content"].rstrip() + " " + m["content"].lstrip()).strip()
            out[-1] = merged
        else:
            out.append(m)
    return out
class CallStateGroomer(FrameProcessor):
    """Sits between the user aggregator and the LLM.

    Downstream LLMContextFrame (= a generation is about to start): synchronously groom the
    context — merge fragmented user turns, refresh the system message with the latest
    CALL STATE checklist.

    BotStoppedSpeakingFrame (= the agent finished a reply): kick off the next state
    extraction in the background. Its result is applied on the *next* LLMContextFrame —
    one turn of lag, zero added latency.

    Both jobs are best-effort: a failure is logged and the frame is always forwarded.
    """

    def __init__(self, context, base_system: str, ollama_url: str, model: str):
        """Store the shared context and the extraction settings.

        Args:
            context: The LLM context whose messages are read and rewritten in place.
            base_system: The system prompt without any CALL STATE block; the
                checklist is appended to this text on every groom.
            ollama_url: Base URL passed to extract_call_state.
            model: Model name passed to extract_call_state.
        """
        super().__init__()
        self._context = context
        self._base_system = base_system
        self._ollama_url = ollama_url
        self._model = model
        self._state = None  # last successfully extracted state dict
        self._task = None   # in-flight extraction task, if any

    def _extract_done(self, task):
        """Done-callback for the extraction task: keep the new state or log the failure."""
        self._task = None
        if task.cancelled():
            return
        exc = task.exception()
        if exc:
            logger.warning(f"CallState extraction failed: {exc}")
            return
        state = task.result()
        if state:
            self._state = state
            logger.info(f"CallState updated: {json.dumps(state, ensure_ascii=False)}")

    def _maybe_extract(self):
        """Start a background extraction unless one is running or no user turn exists yet."""
        if self._task is not None:  # one in flight at a time
            return
        messages = list(self._context.messages)  # snapshot; the context keeps changing
        if not any(isinstance(m, dict) and m.get("role") == "user" for m in messages):
            return  # greeting only — nothing to extract yet
        self._task = asyncio.create_task(
            extract_call_state(messages, self._ollama_url, self._model)
        )
        self._task.add_done_callback(self._extract_done)

    def _groom_context(self):
        """Merge fragmented user turns and rewrite the first system message."""
        messages = merge_consecutive_user_messages(list(self._context.messages))
        block = build_state_block(self._state)
        for i, m in enumerate(messages):
            if isinstance(m, dict) and m.get("role") == "system":
                # Always rebuild from the base prompt so stale checklists never stack up.
                content = self._base_system + ("\n\n" + block if block else "")
                if m.get("content") != content:
                    messages[i] = {**m, "content": content}
                break
        self._context.set_messages(messages)

    async def cleanup(self):
        """Cancel a still-running extraction when the processor is torn down.

        Without this, a request started just before hang-up keeps running
        against the Ollama server until it finishes or times out.
        """
        await super().cleanup()
        if self._task is not None:
            self._task.cancel()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Groom on downstream LLMContextFrame, extract on BotStoppedSpeakingFrame.

        The frame is forwarded unchanged in every case, including when the
        grooming or the extraction kick-off raises.
        """
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMContextFrame) and direction == FrameDirection.DOWNSTREAM:
            try:
                self._groom_context()
            except Exception:
                logger.exception("CallState groom failed (continuing with raw context)")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            try:
                self._maybe_extract()
            except Exception:
                # Must not escape: the frame below still has to be forwarded.
                logger.exception("CallState extraction could not be started")
        await self.push_frame(frame, direction)
+#!/usr/bin/env python3 +"""A/B replay: re-run the historical problem scenarios against candidate LLMs. + +Replays scripted caller turns (taken from real failed calls in the run logs) through the +production system prompt and checks each model for the observed failure modes: re-asking +the reason ("I want an appointment" loop), re-asking name/phone, and forcing booking +questions (insurance/day-time) on non-booking callers. Also reports per-turn latency. + +Usage (inside the pipecat venv): + python scripts/ab_replay.py activeblue-avc:latest qwen3:14b + python scripts/ab_replay.py --state activeblue-avc:latest # with CALL STATE injection + +--state simulates the CallStateGroomer: between turns it runs the callstate extraction +and injects the ALREADY COLLECTED / STILL NEEDED checklist, exactly as in-call. +""" +import argparse +import asyncio +import re +import sys +import time +from pathlib import Path + +import httpx + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from bot import SYSTEM_PROMPT # noqa: E402 (import parses env + practice facts only) +from callstate import build_state_block, extract_call_state # noqa: E402 + +OLLAMA = "http://127.0.0.1:11434" +CALLER_LINE = ( + "\n\nCALLER ID: the caller's number on file, written so you read it digit by digit, " + "is: nine seven three, five seven three, one six seven one. Near the end, state it back " + "and invite a correction only ('...; if that's not the best number, just let me know.') — " + "do NOT ask a yes/no question or wait for a 'yes'. Only change it if they give a different " + "number. Do not say it any earlier in the call." +) +GREETING = "Thank you for calling Advanced Vision Care, this is AVA. How can I help you today?" 
# Failure-mode detectors: (label, regex counted across assistant turns, max allowed count)
ASK_REASON = re.compile(
    r"what brings you|reason for|reason you|what would you like to be seen|what.s the visit for|what seems to be",
    re.I,
)
ASK_NAME = re.compile(r"(full |your |the )name", re.I)
ASK_INSURANCE = re.compile(r"insurance", re.I)
# Asking FOR a number is the failure; the statement-form readback ("I have your number
# as ...; if that's not the best number, just let me know") is correct behavior.
ASK_PHONE_Q = re.compile(
    r"(what('| i)s|can I (get|have)|may I (get|have)|could I (get|have)|give me).{0,40}(phone|number)",
    re.I,
)
ASK_LOCATION = re.compile(r"(which|what).{0,30}(city|area|office|location)", re.I)

# Scenarios distilled from real failed calls (log refs in comments). A tuple within
# `turns` = VAD-fragmented utterance (two user messages, one reply) — from log.10 call#1.
SCENARIOS = [
    dict(
        name="reason-loop (avc_run.log call#1 / log.21 call#5)",
        turns=["I want an appointment.", "appointment", "Kendall",
               "Carlos Garcia", "Humana", ("Monday", "3 p.m."), "No, that's all, thank you."],
        checks=[("re-asked reason", ASK_REASON, 1),
                ("re-asked name", ASK_NAME, 1),
                ("re-asked location", ASK_LOCATION, 1),
                ("asked for phone (has caller-ID)", ASK_PHONE_Q, 0)],
    ),
    dict(
        name="glasses callback (log.23/24/25)",
        turns=["Hey, I'm a patient in Kendall and I need to know when my glasses are ready.",
               "Carlos Garcia", "That's what I'm asking — the status of my order.",
               "Yes, that's a good number.", "No, that's all."],
        checks=[("asked insurance on non-booking call", ASK_INSURANCE, 0),
                ("asked day/time on non-booking call",
                 re.compile(r"(what|which) day|day and time|preferred (day|time)", re.I), 0),
                ("re-asked name", ASK_NAME, 1)],
    ),
    dict(
        name="early-info booking (log.4 call#1: reason+city up front)",
        turns=["I'm having eye pain and I'm in Kendall, Florida.", "Yes please.",
               "Carlos Garcia", "Florida Blue Medicare", ("Monday", "5 p.m."),
               "No, that's everything."],
        checks=[("re-asked reason", ASK_REASON, 1),
                ("re-asked location", ASK_LOCATION, 0),  # was given in turn 1
                ("re-asked name", ASK_NAME, 1),
                ("asked for phone (has caller-ID)", ASK_PHONE_Q, 0)],
    ),
]


async def chat(client, model, messages, think_capable):
    """Send one non-streaming chat request and time it.

    Returns:
        ``(reply_text, seconds)``. The time is taken from a monotonic clock so
        a system clock adjustment cannot distort the latency figures.
    """
    body = {
        "model": model, "stream": False, "messages": messages,
        "options": {"temperature": 0.3, "num_predict": 160, "num_ctx": 8192},
    }
    if think_capable:
        body["think"] = False
    t0 = time.perf_counter()
    r = await client.post(f"{OLLAMA}/api/chat", json=body)
    r.raise_for_status()
    return r.json()["message"]["content"].strip(), time.perf_counter() - t0


async def run_scenario(client, model, sc, with_state):
    """Replay one scripted scenario against *model*.

    Each entry of ``sc["turns"]`` is one caller turn; a tuple is a
    VAD-fragmented utterance appended as several user messages before a single
    reply. With *with_state* the CallStateGroomer is simulated: the fragments
    are merged into one user turn, then the state is extracted and the
    checklist is appended to the system message.

    Returns:
        ``(transcript, latencies)`` where transcript is a list of
        ``("A" | "C", text)`` pairs and latencies holds one value per reply.
    """
    # Local import: only the --state path needs it.
    from callstate import merge_consecutive_user_messages

    think_capable = "qwen3" in model or "deepseek-r1" in model
    base_system = SYSTEM_PROMPT + CALLER_LINE
    msgs = [{"role": "system", "content": base_system},
            {"role": "assistant", "content": GREETING}]
    lats, transcript = [], [("A", GREETING)]
    for turn in sc["turns"]:
        frags = turn if isinstance(turn, tuple) else (turn,)
        for f in frags:
            msgs.append({"role": "user", "content": f})
            transcript.append(("C", f))
        if with_state:
            # The groomer merges fragments before generation; without this the
            # --state run would never exercise that half of the fix.
            msgs = merge_consecutive_user_messages(msgs)
            try:
                state = await extract_call_state(msgs, OLLAMA, model)
                block = build_state_block(state)
                msgs[0]["content"] = base_system + ("\n\n" + block if block else "")
            except Exception as e:
                print(f" (state extraction failed: {e})")
        reply, dt = await chat(client, model, msgs, think_capable)
        lats.append(dt)
        msgs.append({"role": "assistant", "content": reply})
        transcript.append(("A", reply))
        if "goodbye" in reply.lower():
            break
    return transcript, lats


def score(sc, transcript):
    """Return the failed checks of scenario *sc* for *transcript*.

    A check fails when its regex matches more assistant turns than allowed.
    Only entries tagged "A" (assistant) are counted.
    """
    replies = [t for r, t in transcript if r == "A"]
    fails = []
    for label, rx, max_ok in sc["checks"]:
        n = sum(1 for t in replies if rx.search(t))
        if n > max_ok:
            fails.append(f"{label} ({n}x, max {max_ok})")
    return fails


async def main():
    """Run every scenario against every model given on the command line and print a summary."""
    ap = argparse.ArgumentParser()
    ap.add_argument("models", nargs="+")
    ap.add_argument("--state", action="store_true", help="inject CALL STATE checklist per turn")
    ap.add_argument("-v", "--verbose", action="store_true", help="print transcripts")
    args = ap.parse_args()

    async with httpx.AsyncClient(timeout=120) as client:
        results = {}
        for model in args.models:
            print(f"\n{'='*70}\nMODEL: {model}{' + CALL STATE' if args.state else ''}\n{'='*70}")
            total_fails, all_lats = 0, []
            for sc in SCENARIOS:
                transcript, lats = await run_scenario(client, model, sc, args.state)
                fails = score(sc, transcript)
                total_fails += len(fails)
                all_lats += lats
                mark = "PASS" if not fails else "FAIL: " + "; ".join(fails)
                print(f"\n--- {sc['name']} -> {mark}")
                if args.verbose or fails:
                    for r, t in transcript:
                        print(f" {r}: {t}")
            lat = sorted(all_lats)
            # Upper median; guard the (unexpected) case of no timed turn at all.
            med = lat[len(lat) // 2] if lat else 0.0
            mx = lat[-1] if lat else 0.0
            results[model] = (total_fails, med, mx)
            print(f"\n{model}: {total_fails} failure(s) | latency med={med:.2f}s max={mx:.2f}s")

    print(f"\n{'='*70}\nSUMMARY{' (+state)' if args.state else ''}")
    for m, (f, med, mx) in results.items():
        print(f" {m:35s} failures={f} lat med={med:.2f}s max={mx:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())