Fix re-asking: deterministic slot memory + user-turn merge + reason-loop prompt
Historical calls showed the 8B re-asking for name/reason/phone it already had
("I already gave you my full name", the "I want an appointment" -> "what brings
you in?" loop) and VAD splitting one utterance into consecutive user turns.
- callstate.py: CallStateGroomer between agg.user() and the LLM. After each
agent turn (off the critical path) it extracts collected slots via one short
JSON-mode Ollama pass, then before each generation injects an ALREADY
COLLECTED / STILL NEEDED checklist into the system message and merges
VAD-fragmented consecutive user messages. Callback-type calls get an explicit
"no booking questions" line. CALL_STATE_TRACKING env (auto: on for ollama,
off for anthropic).
- bot.py prompt step 1: "I want an appointment" is the booking intent, not the
reason - ask the visit reason once, never twice.
- scripts/ab_replay.py: regression harness replaying the real failed calls.
llama3.1-8b raw = 3 failures; with CALL STATE = 0 failures across all
scenarios (chat latency 0.31s -> 0.55s median, well under the 3s gate).
Qwen3-14B A/B'd and rejected: no better raw, ~3s/turn, 11GB VRAM.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
232
callstate.py
Normal file
232
callstate.py
Normal file
@@ -0,0 +1,232 @@
|
||||
"""In-call slot-state tracking — deterministic memory for a weak LLM.
|
||||
|
||||
The 8B keeps re-asking for things the caller already said (name, reason, phone) because
|
||||
it has to *infer* call state from a long transcript under ~1,400 tokens of rules. This
|
||||
module makes the state explicit instead: after each agent turn (while the caller is
|
||||
talking — off the latency-critical path) it runs one short JSON-mode extraction over the
|
||||
transcript, then injects a live checklist into the system message before the next
|
||||
generation:
|
||||
|
||||
CALL STATE ... ALREADY COLLECTED (never ask again): name=Carlos Garcia, ...
|
||||
STILL NEEDED: insurance, preferred day/time
|
||||
|
||||
Small models follow an explicit checklist at the end of the system prompt far more
|
||||
reliably than they track slots from conversation history. Same philosophy as the
|
||||
deterministic phone-confirm safety net in EndCallProcessor: scaffold around the model.
|
||||
|
||||
CallStateGroomer also merges consecutive user messages in the context (VAD splits one
|
||||
utterance like "Monday" / "3 p.m." into two turns, which derails the 8B) — done
|
||||
synchronously on LLMContextFrame, right before the LLM reads the context.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame, LLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
# Short, in-call variant of the post-call extractor (extract.py): only what's needed to
|
||||
# build the checklist, temperature 0, capped output. Runs on the local Ollama model.
|
||||
_STATE_INSTRUCTIONS = (
|
||||
"You are tracking the state of a LIVE phone call between a caller and the receptionist "
|
||||
"of an optometry practice. From the transcript, extract only what the CALLER has clearly "
|
||||
"provided so far. Respond with ONLY a JSON object with these keys:\n"
|
||||
' "call_type": "booking" (wants to schedule a visit), "callback" (wants something staff '
|
||||
"must check off-phone: order/frames/lens/prescription status, billing, account lookup, "
|
||||
'reach a person), "question" (just asking something), or "unknown"\n'
|
||||
' "reason": string or null — for booking, why they want to be seen (visit type or eye '
|
||||
"problem); for callback, a one-line note of what they need. 'an appointment' alone is NOT "
|
||||
"a reason — use null.\n"
|
||||
' "location": string or null — the office/city the caller wants\n'
|
||||
' "patient_name": string or null — the caller\'s name as given (full or first-only)\n'
|
||||
' "name_is_full": boolean — true only if it clearly has first AND last name\n'
|
||||
' "insurance": string or null — the plan the caller named, exactly as said\n'
|
||||
' "preferred_time": string or null — day/time in the caller\'s own words\n'
|
||||
"Use null unless the caller clearly stated it. Never invent values."
|
||||
)
|
||||
|
||||
# Booking slots in the order the call script gathers them.
|
||||
_BOOKING_ORDER = [
|
||||
("reason", "reason for the visit"),
|
||||
("location", "which office/city"),
|
||||
("patient_name", "full name"),
|
||||
("insurance", "insurance"),
|
||||
("preferred_time", "preferred day and time"),
|
||||
]
|
||||
|
||||
|
||||
async def extract_call_state(messages, ollama_url, model, timeout=15):
|
||||
"""One short JSON-mode pass over the transcript-so-far. Returns the state dict or None."""
|
||||
turns = [
|
||||
f"{m['role']}: {m['content']}"
|
||||
for m in messages
|
||||
if m.get("role") in ("user", "assistant")
|
||||
and isinstance(m.get("content"), str) and m["content"].strip()
|
||||
]
|
||||
if not turns:
|
||||
return None
|
||||
base = ollama_url.rstrip("/")
|
||||
if base.endswith("/v1"):
|
||||
base = base[:-3]
|
||||
body_extra = {}
|
||||
if "qwen3" in model or "deepseek-r1" in model:
|
||||
body_extra["think"] = False # thinking models emit non-JSON otherwise
|
||||
async with httpx.AsyncClient(timeout=timeout) as client:
|
||||
r = await client.post(
|
||||
f"{base}/api/chat",
|
||||
json={
|
||||
"model": model,
|
||||
"format": "json",
|
||||
"stream": False,
|
||||
"options": {"temperature": 0, "num_predict": 200},
|
||||
**body_extra,
|
||||
"messages": [
|
||||
{"role": "system", "content": _STATE_INSTRUCTIONS},
|
||||
{"role": "user", "content": "Transcript:\n" + "\n".join(turns)},
|
||||
],
|
||||
},
|
||||
)
|
||||
r.raise_for_status()
|
||||
return json.loads(r.json()["message"]["content"])
|
||||
|
||||
|
||||
def build_state_block(state) -> str:
|
||||
"""Render the extracted state as an explicit checklist for the system prompt.
|
||||
Returns "" when there's nothing worth injecting yet (first turns)."""
|
||||
if not state:
|
||||
return ""
|
||||
ctype = (state.get("call_type") or "unknown").strip().lower()
|
||||
got, needed = [], []
|
||||
for key, label in _BOOKING_ORDER:
|
||||
val = (state.get(key) or "").strip() if isinstance(state.get(key), str) else ""
|
||||
if key == "patient_name" and val and not state.get("name_is_full"):
|
||||
got.append(f"first name: {val}")
|
||||
needed.append("their LAST name (you have the first)")
|
||||
continue
|
||||
if val:
|
||||
got.append(f"{label}: {val}")
|
||||
else:
|
||||
needed.append(label)
|
||||
|
||||
if ctype == "callback":
|
||||
lines = [
|
||||
"CALL STATE (auto-tracked from this conversation — trust it over your memory):",
|
||||
"- This is a NON-BOOKING call: the caller needs staff to handle something off the "
|
||||
"phone. Do NOT ask about insurance, office, or a preferred day/time.",
|
||||
]
|
||||
if got:
|
||||
lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
|
||||
if state.get("patient_name") is None:
|
||||
lines.append("- Still needed: their name. Then confirm the callback number and close.")
|
||||
else:
|
||||
lines.append("- You have what you need: confirm the callback number and close.")
|
||||
return "\n".join(lines)
|
||||
|
||||
if ctype == "booking" and (got or needed):
|
||||
lines = ["CALL STATE (auto-tracked from this conversation — trust it over your memory):"]
|
||||
if got:
|
||||
lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
|
||||
if needed:
|
||||
lines.append("- STILL NEEDED — ask for the FIRST of these, one per turn: "
|
||||
+ ", ".join(needed))
|
||||
# The observed failure loop: caller says "an appointment", model keeps asking why.
|
||||
if not (state.get("reason") or "").strip():
|
||||
lines.append("- No visit reason yet: if you have ALREADY asked what the visit "
|
||||
"is for and they only said 'an appointment', do NOT ask again — "
|
||||
"note it as a general visit and ask the next needed item instead.")
|
||||
else:
|
||||
lines.append("- All booking details collected: confirm the callback number, recap "
|
||||
"as a REQUEST, ask if there's anything else, then close.")
|
||||
return "\n".join(lines)
|
||||
|
||||
return "" # question/unknown — nothing useful to inject
|
||||
|
||||
|
||||
def merge_consecutive_user_messages(messages):
|
||||
"""Collapse back-to-back user messages (VAD-fragmented utterances) into one turn.
|
||||
Returns a new list; non-string content (tool results) is left untouched."""
|
||||
out = []
|
||||
for m in messages:
|
||||
prev = out[-1] if out else None
|
||||
if (
|
||||
prev is not None
|
||||
and m.get("role") == "user" and prev.get("role") == "user"
|
||||
and isinstance(m.get("content"), str) and isinstance(prev.get("content"), str)
|
||||
):
|
||||
prev = dict(prev)
|
||||
prev["content"] = (prev["content"].rstrip() + " " + m["content"].lstrip()).strip()
|
||||
out[-1] = prev
|
||||
else:
|
||||
out.append(m)
|
||||
return out
|
||||
|
||||
|
||||
class CallStateGroomer(FrameProcessor):
|
||||
"""Sits between the user aggregator and the LLM.
|
||||
|
||||
Downstream LLMContextFrame (= a generation is about to start): synchronously groom the
|
||||
context — merge fragmented user turns, refresh the system message with the latest
|
||||
CALL STATE checklist.
|
||||
|
||||
Upstream BotStoppedSpeakingFrame (= the agent finished a reply; Ollama is idle and the
|
||||
caller is about to talk): kick off the next state extraction in the background. Its
|
||||
result is applied on the *next* LLMContextFrame — one turn of lag, zero added latency.
|
||||
"""
|
||||
|
||||
def __init__(self, context, base_system: str, ollama_url: str, model: str):
|
||||
super().__init__()
|
||||
self._context = context
|
||||
self._base_system = base_system
|
||||
self._ollama_url = ollama_url
|
||||
self._model = model
|
||||
self._state = None
|
||||
self._task = None
|
||||
|
||||
def _extract_done(self, task):
|
||||
self._task = None
|
||||
if task.cancelled():
|
||||
return
|
||||
exc = task.exception()
|
||||
if exc:
|
||||
logger.warning(f"CallState extraction failed: {exc}")
|
||||
return
|
||||
state = task.result()
|
||||
if state:
|
||||
self._state = state
|
||||
logger.info(f"CallState updated: {json.dumps(state, ensure_ascii=False)}")
|
||||
|
||||
def _maybe_extract(self):
|
||||
if self._task is not None: # one in flight at a time
|
||||
return
|
||||
messages = list(self._context.messages)
|
||||
if not any(m.get("role") == "user" for m in messages):
|
||||
return # greeting only — nothing to extract yet
|
||||
self._task = asyncio.create_task(
|
||||
extract_call_state(messages, self._ollama_url, self._model)
|
||||
)
|
||||
self._task.add_done_callback(self._extract_done)
|
||||
|
||||
def _groom_context(self):
|
||||
messages = merge_consecutive_user_messages(list(self._context.messages))
|
||||
block = build_state_block(self._state)
|
||||
for i, m in enumerate(messages):
|
||||
if m.get("role") == "system":
|
||||
content = self._base_system + ("\n\n" + block if block else "")
|
||||
if m.get("content") != content:
|
||||
messages[i] = {**m, "content": content}
|
||||
break
|
||||
self._context.set_messages(messages)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, LLMContextFrame) and direction == FrameDirection.DOWNSTREAM:
|
||||
try:
|
||||
self._groom_context()
|
||||
except Exception:
|
||||
logger.exception("CallState groom failed (continuing with raw context)")
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
self._maybe_extract()
|
||||
await self.push_frame(frame, direction)
|
||||
Reference in New Issue
Block a user