Files
avc-phone-ai/callstate.py
tocmo0nlord 54d707ceac Fix unasked pleasantries + callback re-asks (live call 2026-07-04 #3)
- PLEASANTRIES: the 8B parroted the verbatim example ("I'm doing well, thank
  you for asking") when the caller never asked how she was, then burned two
  more turns "starting fresh". Rule is now strictly conditional with no canned
  example: answer+ask-back only if the caller literally asks; never answer a
  question that wasn't asked.
- callstate: extraction now captures the CALLBACK request note ("are my
  glasses ready" -> "status of an order"), so the checklist stops the "what's
  the reason for your call?" re-ask; callback wrap-up wording now says STATE
  the caller-ID number, never ask for one (she asked "what's the best phone
  number" despite having it); first-name-only callbacks still ask the last name.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-04 03:58:15 +00:00

245 lines
11 KiB
Python

"""In-call slot-state tracking — deterministic memory for a weak LLM.

The 8B keeps re-asking for things the caller already said (name, reason, phone) because
it has to *infer* call state from a long transcript under ~1,400 tokens of rules. This
module makes the state explicit instead: after each agent turn (while the caller is
talking — off the latency-critical path) it runs one short JSON-mode extraction over the
transcript, then injects a live checklist into the system message before the next
generation:

    CALL STATE ... ALREADY COLLECTED (never ask again): name=Carlos Garcia, ...
    STILL NEEDED: insurance, preferred day/time

Small models follow an explicit checklist at the end of the system prompt far more
reliably than they track slots from conversation history. Same philosophy as the
deterministic phone-confirm safety net in EndCallProcessor: scaffold around the model.

CallStateGroomer also merges consecutive user messages in the context (VAD splits one
utterance like "Monday" / "3 p.m." into two turns, which derails the 8B) — done
synchronously on LLMContextFrame, right before the LLM reads the context.
"""
import asyncio
import json
import httpx
from loguru import logger
from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame, LLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
# System prompt for the in-call state extraction (sent by extract_call_state).
# Short, in-call variant of the post-call extractor (extract.py): only what's needed to
# build the checklist, temperature 0, capped output. Runs on the local Ollama model.
# The JSON keys requested here are exactly the ones build_state_block() reads
# (call_type, reason, location, patient_name, name_is_full, insurance, preferred_time),
# so a key renamed in this prompt must be renamed there (and in _BOOKING_ORDER) too.
_STATE_INSTRUCTIONS = (
    "You are tracking the state of a LIVE phone call between a caller and the receptionist "
    "of an optometry practice. From the transcript, extract only what the CALLER has clearly "
    "provided so far. Respond with ONLY a JSON object with these keys:\n"
    ' "call_type": "booking" (wants to schedule a visit), "callback" (wants something staff '
    "must check off-phone: order/frames/lens/prescription status, billing, account lookup, "
    'reach a person), "question" (just asking something), or "unknown"\n'
    ' "reason": string or null — WHAT the caller wants. For booking: the visit type or eye '
    'problem (e.g. "annual exam", "eye pain"). For callback: what they want checked or done, '
    'even if phrased as a question (e.g. "are my glasses ready", "status of an order", '
    '"billing question"). Only \'an appointment\' with no visit reason is NOT a reason — '
    "use null then.\n"
    ' "location": string or null — the office/city the caller wants\n'
    ' "patient_name": string or null — the caller\'s name as given (full or first-only)\n'
    ' "name_is_full": boolean — true only if it clearly has first AND last name\n'
    ' "insurance": string or null — the plan the caller named, exactly as said\n'
    ' "preferred_time": string or null — day/time in the caller\'s own words\n'
    "Use null unless the caller clearly stated it. Never invent values."
)
# Booking slots in the order the call script gathers them.
# Each entry is (key in the extracted state dict, label shown in the checklist).
# build_state_block() walks this list in order, so the order here is also the order in
# which missing items are listed under "STILL NEEDED" (the model is told to ask for the
# first one).
_BOOKING_ORDER = [
    ("reason", "reason for the visit"),
    ("location", "which office/city"),
    ("patient_name", "full name"),
    ("insurance", "insurance"),
    ("preferred_time", "preferred day and time"),
]
async def extract_call_state(messages, ollama_url, model, timeout=15):
    """Run one short JSON-mode extraction pass over the transcript so far.

    Only ``user`` / ``assistant`` messages whose content is a non-blank string are sent;
    the system prompt and any non-string content are skipped.

    Args:
        messages: Chat messages as dicts with ``role`` and ``content`` keys.
        ollama_url: Base URL of the Ollama server. A trailing ``/`` and a trailing
            ``/v1`` are stripped because the request goes to ``/api/chat``.
        model: Model name sent in the request body.
        timeout: Timeout passed to the httpx client.

    Returns:
        The extracted state as a dict, or None when there is no transcript text to
        extract from (no request is made in that case).

    Raises:
        httpx.HTTPError: The request failed or the server answered with an error status.
        KeyError: The response JSON has no ``message.content``.
        ValueError: The model output is not valid JSON (``json.JSONDecodeError``) or is
            valid JSON that is not an object.
    """
    turns = [
        f"{m['role']}: {m['content']}"
        for m in messages
        if m.get("role") in ("user", "assistant")
        and isinstance(m.get("content"), str)
        and m["content"].strip()
    ]
    if not turns:
        return None

    base = ollama_url.rstrip("/")
    if base.endswith("/v1"):
        base = base[:-3]

    body_extra = {}
    if "qwen3" in model or "deepseek-r1" in model:
        body_extra["think"] = False  # thinking models emit non-JSON otherwise

    async with httpx.AsyncClient(timeout=timeout) as client:
        r = await client.post(
            f"{base}/api/chat",
            json={
                "model": model,
                "format": "json",
                "stream": False,
                "options": {"temperature": 0, "num_predict": 200},
                **body_extra,
                "messages": [
                    {"role": "system", "content": _STATE_INSTRUCTIONS},
                    {"role": "user", "content": "Transcript:\n" + "\n".join(turns)},
                ],
            },
        )
        r.raise_for_status()

    state = json.loads(r.json()["message"]["content"])
    # JSON mode guarantees valid JSON, not a JSON *object*: a bare list/string/number
    # would otherwise be handed to callers that treat the result as a dict. Fail the
    # same way malformed JSON does (JSONDecodeError is a ValueError).
    if not isinstance(state, dict):
        raise ValueError(
            f"state extractor returned a JSON {type(state).__name__}, expected an object"
        )
    return state
def build_state_block(state) -> str:
    """Render the extracted state as an explicit checklist for the system prompt.

    Args:
        state: The dict produced by the state extraction (keys ``call_type``,
            ``reason``, ``location``, ``patient_name``, ``name_is_full``,
            ``insurance``, ``preferred_time``), or None. Slot values that are null,
            blank or not strings are treated as "not provided".

    Returns:
        A multi-line "CALL STATE" block for ``callback`` and ``booking`` calls, or ""
        when there is nothing worth injecting (no state yet, a non-dict state, or a
        ``question`` / ``unknown`` call type).
    """
    if not state or not isinstance(state, dict):
        return ""

    def _text(key):
        # One normalisation for every slot: only a non-blank string counts as provided.
        value = state.get(key)
        return value.strip() if isinstance(value, str) else ""

    ctype = _text("call_type").lower() or "unknown"
    reason = _text("reason")
    name = _text("patient_name")
    name_is_full = bool(state.get("name_is_full"))
    header = "CALL STATE (auto-tracked from this conversation — trust it over your memory):"

    # Walk the booking slots in script order, splitting them into collected / missing.
    collected, needed = [], []
    for key, label in _BOOKING_ORDER:
        val = _text(key)
        if key == "patient_name" and val and not name_is_full:
            # A first name alone is progress, but the last name is still outstanding.
            collected.append((key, f"first name: {val}"))
            needed.append("their LAST name (you have the first)")
        elif val:
            collected.append((key, f"{label}: {val}"))
        else:
            needed.append(label)

    if ctype == "callback":
        lines = [
            header,
            "- This is a NON-BOOKING call: the caller needs staff to handle something off the "
            "phone. Do NOT ask about insurance, office, or a preferred day/time.",
        ]
        if reason:
            lines.append(f"- Their request (already known — NEVER ask what they're calling "
                         f"about again): {reason}")
        # The request has its own line above, so keep it out of the collected list.
        got = [text for key, text in collected if key != "reason"]
        if got:
            lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
        wrap = ("STATE the number on file back to them (it's in CALLER ID above) and invite a "
                "correction only — NEVER ask them for a phone number — then say staff will call "
                "them back, and close.")
        # Decide on the normalised name: an empty/blank name means "no name yet", not
        # "first name only".
        if not name:
            lines.append(f"- Still needed: their name. Then {wrap}")
        elif not name_is_full:
            lines.append(f"- Still needed: their LAST name (you have the first). Then {wrap}")
        else:
            lines.append(f"- You have what you need: {wrap}")
        return "\n".join(lines)

    if ctype == "booking" and (collected or needed):
        lines = [header]
        got = [text for _, text in collected]
        if got:
            lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
        if needed:
            lines.append("- STILL NEEDED — ask for the FIRST of these, one per turn: "
                         + ", ".join(needed))
            # The observed failure loop: caller says "an appointment", model keeps asking why.
            if not reason:
                lines.append("- No visit reason yet: if you have ALREADY asked what the visit "
                             "is for and they only said 'an appointment', do NOT ask again — "
                             "note it as a general visit and ask the next needed item instead.")
        else:
            lines.append("- All booking details collected: confirm the callback number, recap "
                         "as a REQUEST, ask if there's anything else, then close.")
        return "\n".join(lines)

    return ""  # question/unknown — nothing useful to inject
def merge_consecutive_user_messages(messages):
    """Fold runs of adjacent user messages into a single user turn.

    VAD can split one utterance into several back-to-back user messages; their text is
    joined with a single space. A message is only folded into its predecessor when both
    are user messages with string content, so non-string content (tool results) is left
    untouched. The input list and its dicts are not modified: a new list is returned,
    and a merged turn is a fresh copy of the first message of the run.
    """
    merged = []
    for msg in messages:
        tail = merged[-1] if merged else None
        can_fold = (
            tail is not None
            and msg.get("role") == "user"
            and tail.get("role") == "user"
            and isinstance(msg.get("content"), str)
            and isinstance(tail.get("content"), str)
        )
        if not can_fold:
            merged.append(msg)
            continue
        joined = f"{tail['content'].rstrip()} {msg['content'].lstrip()}".strip()
        # Replace the tail with a copy so the caller's dict is never mutated.
        merged[-1] = {**tail, "content": joined}
    return merged
class CallStateGroomer(FrameProcessor):
    """Sits between the user aggregator and the LLM.

    Downstream LLMContextFrame (= a generation is about to start): synchronously groom the
    context — merge fragmented user turns, refresh the system message with the latest
    CALL STATE checklist.

    Upstream BotStoppedSpeakingFrame (= the agent finished a reply; Ollama is idle and the
    caller is about to talk): kick off the next state extraction in the background. Its
    result is applied on the *next* LLMContextFrame — one turn of lag, zero added latency.

    Every frame is pushed on unchanged, in the direction it arrived.
    """

    def __init__(self, context, base_system: str, ollama_url: str, model: str):
        """Store the shared context and extraction settings.

        Args:
            context: The LLM context to groom; must expose ``messages`` and
                ``set_messages()``.
            base_system: System prompt text without any checklist; the CALL STATE
                block is appended to this on every groom.
            ollama_url: Base URL handed to ``extract_call_state``.
            model: Model name handed to ``extract_call_state``.
        """
        super().__init__()
        self._context = context
        self._base_system = base_system
        self._ollama_url = ollama_url
        self._model = model
        self._state = None  # last successfully extracted state dict, if any
        self._task = None  # in-flight extraction task, or None when idle

    def _extract_done(self, task):
        """Done-callback for the extraction task: keep the result if it is usable."""
        self._task = None
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.warning(f"CallState extraction failed: {exc}")
            return
        state = task.result()
        if not state:
            return  # None / empty: keep whatever state we already had
        if not isinstance(state, dict):
            # A non-dict would make every later build_state_block() call raise, which
            # also skips the user-turn merge; keep the previous good state instead.
            logger.warning(
                f"CallState extraction returned {type(state).__name__}, expected a JSON "
                "object (keeping previous state)"
            )
            return
        self._state = state
        logger.info(f"CallState updated: {json.dumps(state, ensure_ascii=False)}")

    def _maybe_extract(self):
        """Start a background extraction unless one is running or no user has spoken."""
        if self._task is not None:  # one in flight at a time
            return
        messages = list(self._context.messages)  # snapshot: the context keeps changing
        if not any(m.get("role") == "user" for m in messages):
            return  # greeting only — nothing to extract yet
        self._task = asyncio.create_task(
            extract_call_state(messages, self._ollama_url, self._model)
        )
        self._task.add_done_callback(self._extract_done)

    def _groom_context(self):
        """Merge fragmented user turns and refresh the first system message in place."""
        messages = merge_consecutive_user_messages(list(self._context.messages))
        block = build_state_block(self._state)
        for i, m in enumerate(messages):
            if m.get("role") == "system":
                # Always rebuild from the base prompt so old checklists never pile up.
                content = self._base_system + ("\n\n" + block if block else "")
                if m.get("content") != content:
                    messages[i] = {**m, "content": content}
                break  # only the first system message carries the checklist
        self._context.set_messages(messages)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Groom on downstream LLMContextFrame, extract on BotStoppedSpeakingFrame.

        Grooming is best-effort: any failure is logged and the frame still goes
        through with the context as it was. BotStoppedSpeakingFrame is handled
        whatever direction it travels in.
        """
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMContextFrame) and direction == FrameDirection.DOWNSTREAM:
            try:
                self._groom_context()
            except Exception:
                logger.exception("CallState groom failed (continuing with raw context)")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._maybe_extract()
        await self.push_frame(frame, direction)