- PLEASANTRIES: the 8B parroted the verbatim example ("I'm doing well, thank
you for asking") when the caller never asked how she was, then burned two
more turns "starting fresh". Rule is now strictly conditional with no canned
example: answer+ask-back only if the caller literally asks; never answer a
question that wasn't asked.
- callstate: extraction now captures the CALLBACK request note ("are my
glasses ready" -> "status of an order"), so the checklist stops the "what's
the reason for your call?" re-ask; callback wrap-up wording now says STATE
the caller-ID number, never ask for one (she asked "what's the best phone
number" despite having it); first-name-only callbacks still ask the last name.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
245 lines
11 KiB
Python
245 lines
11 KiB
Python
"""In-call slot-state tracking — deterministic memory for a weak LLM.
|
|
|
|
The 8B keeps re-asking for things the caller already said (name, reason, phone) because
|
|
it has to *infer* call state from a long transcript under ~1,400 tokens of rules. This
|
|
module makes the state explicit instead: after each agent turn (while the caller is
|
|
talking — off the latency-critical path) it runs one short JSON-mode extraction over the
|
|
transcript, then injects a live checklist into the system message before the next
|
|
generation:
|
|
|
|
CALL STATE ... ALREADY COLLECTED (never ask again): name=Carlos Garcia, ...
|
|
STILL NEEDED: insurance, preferred day/time
|
|
|
|
Small models follow an explicit checklist at the end of the system prompt far more
|
|
reliably than they track slots from conversation history. Same philosophy as the
|
|
deterministic phone-confirm safety net in EndCallProcessor: scaffold around the model.
|
|
|
|
CallStateGroomer also merges consecutive user messages in the context (VAD splits one
|
|
utterance like "Monday" / "3 p.m." into two turns, which derails the 8B) — done
|
|
synchronously on LLMContextFrame, right before the LLM reads the context.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
import httpx
|
|
from loguru import logger
|
|
|
|
from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame, LLMContextFrame
|
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
|
|
|
# Short, in-call variant of the post-call extractor (extract.py): only what's needed to
|
|
# build the checklist, temperature 0, capped output. Runs on the local Ollama model.
|
|
_STATE_INSTRUCTIONS = (
|
|
"You are tracking the state of a LIVE phone call between a caller and the receptionist "
|
|
"of an optometry practice. From the transcript, extract only what the CALLER has clearly "
|
|
"provided so far. Respond with ONLY a JSON object with these keys:\n"
|
|
' "call_type": "booking" (wants to schedule a visit), "callback" (wants something staff '
|
|
"must check off-phone: order/frames/lens/prescription status, billing, account lookup, "
|
|
'reach a person), "question" (just asking something), or "unknown"\n'
|
|
' "reason": string or null — WHAT the caller wants. For booking: the visit type or eye '
|
|
'problem (e.g. "annual exam", "eye pain"). For callback: what they want checked or done, '
|
|
'even if phrased as a question (e.g. "are my glasses ready", "status of an order", '
|
|
'"billing question"). Only \'an appointment\' with no visit reason is NOT a reason — '
|
|
"use null then.\n"
|
|
' "location": string or null — the office/city the caller wants\n'
|
|
' "patient_name": string or null — the caller\'s name as given (full or first-only)\n'
|
|
' "name_is_full": boolean — true only if it clearly has first AND last name\n'
|
|
' "insurance": string or null — the plan the caller named, exactly as said\n'
|
|
' "preferred_time": string or null — day/time in the caller\'s own words\n'
|
|
"Use null unless the caller clearly stated it. Never invent values."
|
|
)
|
|
|
|
# Booking slots in the order the call script gathers them.
|
|
_BOOKING_ORDER = [
|
|
("reason", "reason for the visit"),
|
|
("location", "which office/city"),
|
|
("patient_name", "full name"),
|
|
("insurance", "insurance"),
|
|
("preferred_time", "preferred day and time"),
|
|
]
|
|
|
|
|
|
async def extract_call_state(messages, ollama_url, model, timeout=15):
|
|
"""One short JSON-mode pass over the transcript-so-far. Returns the state dict or None."""
|
|
turns = [
|
|
f"{m['role']}: {m['content']}"
|
|
for m in messages
|
|
if m.get("role") in ("user", "assistant")
|
|
and isinstance(m.get("content"), str) and m["content"].strip()
|
|
]
|
|
if not turns:
|
|
return None
|
|
base = ollama_url.rstrip("/")
|
|
if base.endswith("/v1"):
|
|
base = base[:-3]
|
|
body_extra = {}
|
|
if "qwen3" in model or "deepseek-r1" in model:
|
|
body_extra["think"] = False # thinking models emit non-JSON otherwise
|
|
async with httpx.AsyncClient(timeout=timeout) as client:
|
|
r = await client.post(
|
|
f"{base}/api/chat",
|
|
json={
|
|
"model": model,
|
|
"format": "json",
|
|
"stream": False,
|
|
"options": {"temperature": 0, "num_predict": 200},
|
|
**body_extra,
|
|
"messages": [
|
|
{"role": "system", "content": _STATE_INSTRUCTIONS},
|
|
{"role": "user", "content": "Transcript:\n" + "\n".join(turns)},
|
|
],
|
|
},
|
|
)
|
|
r.raise_for_status()
|
|
return json.loads(r.json()["message"]["content"])
|
|
|
|
|
|
def build_state_block(state) -> str:
|
|
"""Render the extracted state as an explicit checklist for the system prompt.
|
|
Returns "" when there's nothing worth injecting yet (first turns)."""
|
|
if not state:
|
|
return ""
|
|
ctype = (state.get("call_type") or "unknown").strip().lower()
|
|
got, needed = [], []
|
|
for key, label in _BOOKING_ORDER:
|
|
val = (state.get(key) or "").strip() if isinstance(state.get(key), str) else ""
|
|
if key == "patient_name" and val and not state.get("name_is_full"):
|
|
got.append(f"first name: {val}")
|
|
needed.append("their LAST name (you have the first)")
|
|
continue
|
|
if val:
|
|
got.append(f"{label}: {val}")
|
|
else:
|
|
needed.append(label)
|
|
|
|
if ctype == "callback":
|
|
reason = (state.get("reason") or "").strip() if isinstance(state.get("reason"), str) else ""
|
|
lines = [
|
|
"CALL STATE (auto-tracked from this conversation — trust it over your memory):",
|
|
"- This is a NON-BOOKING call: the caller needs staff to handle something off the "
|
|
"phone. Do NOT ask about insurance, office, or a preferred day/time.",
|
|
]
|
|
if reason:
|
|
lines.append(f"- Their request (already known — NEVER ask what they're calling "
|
|
f"about again): {reason}")
|
|
got = [g for g in got if not g.startswith("reason for the visit")] # shown above
|
|
if got:
|
|
lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
|
|
wrap = ("STATE the number on file back to them (it's in CALLER ID above) and invite a "
|
|
"correction only — NEVER ask them for a phone number — then say staff will call "
|
|
"them back, and close.")
|
|
if state.get("patient_name") is None:
|
|
lines.append(f"- Still needed: their name. Then {wrap}")
|
|
elif not state.get("name_is_full"):
|
|
lines.append(f"- Still needed: their LAST name (you have the first). Then {wrap}")
|
|
else:
|
|
lines.append(f"- You have what you need: {wrap}")
|
|
return "\n".join(lines)
|
|
|
|
if ctype == "booking" and (got or needed):
|
|
lines = ["CALL STATE (auto-tracked from this conversation — trust it over your memory):"]
|
|
if got:
|
|
lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
|
|
if needed:
|
|
lines.append("- STILL NEEDED — ask for the FIRST of these, one per turn: "
|
|
+ ", ".join(needed))
|
|
# The observed failure loop: caller says "an appointment", model keeps asking why.
|
|
if not (state.get("reason") or "").strip():
|
|
lines.append("- No visit reason yet: if you have ALREADY asked what the visit "
|
|
"is for and they only said 'an appointment', do NOT ask again — "
|
|
"note it as a general visit and ask the next needed item instead.")
|
|
else:
|
|
lines.append("- All booking details collected: confirm the callback number, recap "
|
|
"as a REQUEST, ask if there's anything else, then close.")
|
|
return "\n".join(lines)
|
|
|
|
return "" # question/unknown — nothing useful to inject
|
|
|
|
|
|
def merge_consecutive_user_messages(messages):
|
|
"""Collapse back-to-back user messages (VAD-fragmented utterances) into one turn.
|
|
Returns a new list; non-string content (tool results) is left untouched."""
|
|
out = []
|
|
for m in messages:
|
|
prev = out[-1] if out else None
|
|
if (
|
|
prev is not None
|
|
and m.get("role") == "user" and prev.get("role") == "user"
|
|
and isinstance(m.get("content"), str) and isinstance(prev.get("content"), str)
|
|
):
|
|
prev = dict(prev)
|
|
prev["content"] = (prev["content"].rstrip() + " " + m["content"].lstrip()).strip()
|
|
out[-1] = prev
|
|
else:
|
|
out.append(m)
|
|
return out
|
|
|
|
|
|
class CallStateGroomer(FrameProcessor):
|
|
"""Sits between the user aggregator and the LLM.
|
|
|
|
Downstream LLMContextFrame (= a generation is about to start): synchronously groom the
|
|
context — merge fragmented user turns, refresh the system message with the latest
|
|
CALL STATE checklist.
|
|
|
|
Upstream BotStoppedSpeakingFrame (= the agent finished a reply; Ollama is idle and the
|
|
caller is about to talk): kick off the next state extraction in the background. Its
|
|
result is applied on the *next* LLMContextFrame — one turn of lag, zero added latency.
|
|
"""
|
|
|
|
def __init__(self, context, base_system: str, ollama_url: str, model: str):
|
|
super().__init__()
|
|
self._context = context
|
|
self._base_system = base_system
|
|
self._ollama_url = ollama_url
|
|
self._model = model
|
|
self._state = None
|
|
self._task = None
|
|
|
|
def _extract_done(self, task):
|
|
self._task = None
|
|
if task.cancelled():
|
|
return
|
|
exc = task.exception()
|
|
if exc:
|
|
logger.warning(f"CallState extraction failed: {exc}")
|
|
return
|
|
state = task.result()
|
|
if state:
|
|
self._state = state
|
|
logger.info(f"CallState updated: {json.dumps(state, ensure_ascii=False)}")
|
|
|
|
def _maybe_extract(self):
|
|
if self._task is not None: # one in flight at a time
|
|
return
|
|
messages = list(self._context.messages)
|
|
if not any(m.get("role") == "user" for m in messages):
|
|
return # greeting only — nothing to extract yet
|
|
self._task = asyncio.create_task(
|
|
extract_call_state(messages, self._ollama_url, self._model)
|
|
)
|
|
self._task.add_done_callback(self._extract_done)
|
|
|
|
def _groom_context(self):
|
|
messages = merge_consecutive_user_messages(list(self._context.messages))
|
|
block = build_state_block(self._state)
|
|
for i, m in enumerate(messages):
|
|
if m.get("role") == "system":
|
|
content = self._base_system + ("\n\n" + block if block else "")
|
|
if m.get("content") != content:
|
|
messages[i] = {**m, "content": content}
|
|
break
|
|
self._context.set_messages(messages)
|
|
|
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
|
await super().process_frame(frame, direction)
|
|
if isinstance(frame, LLMContextFrame) and direction == FrameDirection.DOWNSTREAM:
|
|
try:
|
|
self._groom_context()
|
|
except Exception:
|
|
logger.exception("CallState groom failed (continuing with raw context)")
|
|
elif isinstance(frame, BotStoppedSpeakingFrame):
|
|
self._maybe_extract()
|
|
await self.push_frame(frame, direction)
|