"""In-call slot-state tracking — deterministic memory for a weak LLM.

The 8B keeps re-asking for things the caller already said (name, reason, phone) because
it has to *infer* call state from a long transcript under ~1,400 tokens of rules. This
module makes the state explicit instead: after each agent turn (while the caller is
talking — off the latency-critical path) it runs one short JSON-mode extraction over the
transcript, then injects a live checklist into the system message before the next
generation:

    CALL STATE ...
    ALREADY COLLECTED (never ask again): name=Carlos Garcia, ...
    STILL NEEDED: insurance, preferred day/time

Small models follow an explicit checklist at the end of the system prompt far more
reliably than they track slots from conversation history. Same philosophy as the
deterministic phone-confirm safety net in EndCallProcessor: scaffold around the model.

CallStateGroomer also merges consecutive user messages in the context (VAD splits one
utterance like "Monday" / "3 p.m." into two turns, which derails the 8B) — done
synchronously on LLMContextFrame, right before the LLM reads the context.
"""

import asyncio
import json

import httpx
from loguru import logger
from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame, LLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

# Short, in-call variant of the post-call extractor (extract.py): only what's needed to
# build the checklist, temperature 0, capped output. Runs on the local Ollama model.
_STATE_INSTRUCTIONS = (
    "You are tracking the state of a LIVE phone call between a caller and the receptionist "
    "of an optometry practice. From the transcript, extract only what the CALLER has clearly "
    "provided so far. Respond with ONLY a JSON object with these keys:\n"
    ' "call_type": "booking" (wants to schedule a visit), "callback" (wants something staff '
    "must check off-phone: order/frames/lens/prescription status, billing, account lookup, "
    'reach a person), "question" (just asking something), or "unknown"\n'
    ' "reason": string or null — for booking, why they want to be seen (visit type or eye '
    "problem); for callback, a one-line note of what they need. 'an appointment' alone is NOT "
    "a reason — use null.\n"
    ' "location": string or null — the office/city the caller wants\n'
    ' "patient_name": string or null — the caller\'s name as given (full or first-only)\n'
    ' "name_is_full": boolean — true only if it clearly has first AND last name\n'
    ' "insurance": string or null — the plan the caller named, exactly as said\n'
    ' "preferred_time": string or null — day/time in the caller\'s own words\n'
    "Use null unless the caller clearly stated it. Never invent values."
)

# Booking slots in the order the call script gathers them.
_BOOKING_ORDER = [
    ("reason", "reason for the visit"),
    ("location", "which office/city"),
    ("patient_name", "full name"),
    ("insurance", "insurance"),
    ("preferred_time", "preferred day and time"),
]


async def extract_call_state(messages, ollama_url, model, timeout=15):
    """One short JSON-mode pass over the transcript-so-far.

    Args:
        messages: chat messages (dicts with ``role`` / ``content``). Only user and
            assistant messages with non-blank string content are sent.
        ollama_url: Ollama base URL; a trailing ``/v1`` is dropped so the request goes
            to ``/api/chat``.
        model: model name. Names containing ``qwen3`` or ``deepseek-r1`` get
            ``think: False`` added to the request body.
        timeout: httpx client timeout in seconds.

    Returns:
        The extracted state dict, or None when there is no transcript to send or the
        model's reply is valid JSON but not an object.

    HTTP errors (``raise_for_status``) and JSON decoding errors propagate to the caller.
    """
    turns = [
        f"{m['role']}: {m['content']}"
        for m in messages
        if m.get("role") in ("user", "assistant")
        and isinstance(m.get("content"), str)
        and m["content"].strip()
    ]
    if not turns:
        return None

    base = ollama_url.rstrip("/")
    if base.endswith("/v1"):
        base = base[:-3]

    body_extra = {}
    if "qwen3" in model or "deepseek-r1" in model:
        body_extra["think"] = False  # thinking models emit non-JSON otherwise

    async with httpx.AsyncClient(timeout=timeout) as client:
        r = await client.post(
            f"{base}/api/chat",
            json={
                "model": model,
                "format": "json",
                "stream": False,
                "options": {"temperature": 0, "num_predict": 200},
                **body_extra,
                "messages": [
                    {"role": "system", "content": _STATE_INSTRUCTIONS},
                    {"role": "user", "content": "Transcript:\n" + "\n".join(turns)},
                ],
            },
        )
        r.raise_for_status()

    parsed = json.loads(r.json()["message"]["content"])
    if not isinstance(parsed, dict):
        # Valid JSON that is not an object (list, string, number) cannot be used as
        # state; storing it would make every later checklist render fail.
        logger.warning(f"CallState extraction returned non-object JSON: {type(parsed).__name__}")
        return None
    return parsed


def _slot_text(state, key) -> str:
    """Return ``state[key]`` stripped when it is a string; "" for anything else."""
    value = state.get(key)
    return value.strip() if isinstance(value, str) else ""


def build_state_block(state) -> str:
    """Render the extracted state as an explicit checklist for the system prompt.

    Returns "" when there's nothing worth injecting yet (first turns), when ``state`` is
    not a dict, or when the call type is neither "booking" nor "callback".

    The state comes from model output, so every field is treated as untrusted: a value
    that is not a string counts as "not provided" instead of raising.
    """
    if not isinstance(state, dict) or not state:
        return ""

    raw_type = state.get("call_type")
    ctype = raw_type.strip().lower() if isinstance(raw_type, str) else "unknown"

    got, needed = [], []
    for key, label in _BOOKING_ORDER:
        val = _slot_text(state, key)
        if key == "patient_name" and val and not state.get("name_is_full"):
            # First name only: record what we have and ask specifically for the rest.
            got.append(f"first name: {val}")
            needed.append("their LAST name (you have the first)")
            continue
        if val:
            got.append(f"{label}: {val}")
        else:
            needed.append(label)

    if ctype == "callback":
        lines = [
            "CALL STATE (auto-tracked from this conversation — trust it over your memory):",
            "- This is a NON-BOOKING call: the caller needs staff to handle something off the "
            "phone. Do NOT ask about insurance, office, or a preferred day/time.",
        ]
        if got:
            lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
        # Use the normalized value so "" / whitespace / non-string names count as
        # missing, consistent with the slot loop above.
        if not _slot_text(state, "patient_name"):
            lines.append("- Still needed: their name. Then confirm the callback number and close.")
        else:
            lines.append("- You have what you need: confirm the callback number and close.")
        return "\n".join(lines)

    if ctype == "booking" and (got or needed):
        lines = ["CALL STATE (auto-tracked from this conversation — trust it over your memory):"]
        if got:
            lines.append("- ALREADY COLLECTED — NEVER ask for these again: " + "; ".join(got))
        if needed:
            lines.append(
                "- STILL NEEDED — ask for the FIRST of these, one per turn: " + ", ".join(needed)
            )
            # The observed failure loop: caller says "an appointment", model keeps asking why.
            if not _slot_text(state, "reason"):
                lines.append(
                    "- No visit reason yet: if you have ALREADY asked what the visit "
                    "is for and they only said 'an appointment', do NOT ask again — "
                    "note it as a general visit and ask the next needed item instead."
                )
        else:
            lines.append(
                "- All booking details collected: confirm the callback number, recap "
                "as a REQUEST, ask if there's anything else, then close."
            )
        return "\n".join(lines)

    return ""  # question/unknown — nothing useful to inject


def merge_consecutive_user_messages(messages):
    """Collapse back-to-back user messages (VAD-fragmented utterances) into one turn.

    Returns a new list; the input list and its message dicts are not mutated (a merged
    message is a copy). Non-string content (tool results) is left untouched.
    """
    out = []
    for m in messages:
        prev = out[-1] if out else None
        if (
            prev is not None
            and m.get("role") == "user"
            and prev.get("role") == "user"
            and isinstance(m.get("content"), str)
            and isinstance(prev.get("content"), str)
        ):
            # Copy before editing so the caller's original message dict stays intact.
            prev = dict(prev)
            prev["content"] = (prev["content"].rstrip() + " " + m["content"].lstrip()).strip()
            out[-1] = prev
        else:
            out.append(m)
    return out


class CallStateGroomer(FrameProcessor):
    """Sits between the user aggregator and the LLM.

    Downstream LLMContextFrame (= a generation is about to start): synchronously groom
    the context — merge fragmented user turns, refresh the system message with the
    latest CALL STATE checklist.

    Upstream BotStoppedSpeakingFrame (= the agent finished a reply; Ollama is idle and
    the caller is about to talk): kick off the next state extraction in the background.
    Its result is applied on the *next* LLMContextFrame — one turn of lag, zero added
    latency.
    """

    def __init__(self, context, base_system: str, ollama_url: str, model: str):
        super().__init__()
        self._context = context
        self._base_system = base_system
        self._ollama_url = ollama_url
        self._model = model
        self._state = None  # last successfully extracted state dict, if any
        self._task = None  # in-flight extraction task, if any

    def _extract_done(self, task):
        """Done-callback for the extraction task: keep the new state or log the failure."""
        self._task = None
        if task.cancelled():
            return
        exc = task.exception()
        if exc:
            logger.warning(f"CallState extraction failed: {exc}")
            return
        state = task.result()
        if state:
            # An empty or None result keeps the previous state rather than wiping it.
            self._state = state
            logger.info(f"CallState updated: {json.dumps(state, ensure_ascii=False)}")

    def _maybe_extract(self):
        """Start a background extraction unless one is running or no user has spoken."""
        if self._task is not None:  # one in flight at a time
            return
        messages = list(self._context.messages)
        if not any(m.get("role") == "user" for m in messages):
            return  # greeting only — nothing to extract yet
        self._task = asyncio.create_task(
            extract_call_state(messages, self._ollama_url, self._model)
        )
        self._task.add_done_callback(self._extract_done)

    def _groom_context(self):
        """Merge fragmented user turns and refresh the first system message in place."""
        messages = merge_consecutive_user_messages(list(self._context.messages))
        block = build_state_block(self._state)
        for i, m in enumerate(messages):
            if m.get("role") == "system":
                # Always rebuild from the base prompt so old checklists never pile up.
                content = self._base_system + ("\n\n" + block if block else "")
                if m.get("content") != content:
                    messages[i] = {**m, "content": content}
                break
        self._context.set_messages(messages)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Groom on downstream context frames, extract after bot speech; forward every frame."""
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMContextFrame) and direction == FrameDirection.DOWNSTREAM:
            try:
                self._groom_context()
            except Exception:
                # Best-effort: a grooming failure must never block the generation.
                logger.exception("CallState groom failed (continuing with raw context)")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._maybe_extract()
        await self.push_frame(frame, direction)