diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..40c20de --- /dev/null +++ b/.env.example @@ -0,0 +1,62 @@ +# Copy to .env and fill in. run.sh auto-loads it. + +# ── Public ingress (Twilio dials this back) ────────────────────────────────── +# Public hostname; nginx terminates TLS here and proxies to the app. Must match the +# Twilio webhook host (Twilio signs https://PUBLIC_HOST/voice). +PUBLIC_HOST=voip.activeblue.net +PORT=8200 +# App bind address. Default 127.0.0.1 (nginx proxies in locally) — not exposed on LAN. +BIND_HOST=127.0.0.1 + +# ── Twilio ─────────────────────────────────────────────────────────────────── +# From console.twilio.com. Account SID + a Standard API Key (scoped to this app, +# revocable independently). The Auth Token stays in the Twilio console only — never on +# this server. Create the key under Account → API Keys → Create Standard key, name it +# avc-phone-agent-prod; the Secret is shown once. Used to auto-hang-up the carrier leg +# and validate inbound webhook signatures. +TWILIO_ACCOUNT_SID=ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +TWILIO_API_KEY_SID=SKxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +TWILIO_API_KEY_SECRET=your_api_key_secret_here +# Inbound webhook signature validation is ON whenever TWILIO_API_KEY_SECRET is set. +# Set to false only for local testing without real Twilio requests. +TWILIO_VALIDATE=true +# Shared secret embedded in the Media Stream wss URL to gate /ws. Set a stable random +# value (e.g. `openssl rand -base64 24`); if blank, one is generated per process start. +STREAM_TOKEN= + +# ── Odoo appointment integration ───────────────────────────────────────────── +# Leave ODOO_USER/ODOO_API_KEY blank to disable Odoo and log requests to JSONL only. +# Same creds the activeblue-agent container uses (docker inspect activeblue-agent). +# Verified working against db1 with ODOO_TARGET=crm. +ODOO_URL=http://localhost:8069 +ODOO_DB=db1 +ODOO_USER=mr.garcia09@gmail.com +ODOO_API_KEY= +ODOO_TARGET=crm # crm = callback lead (recommended) | calendar = tentative event + +# ── Capacity ───────────────────────────────────────────────────────────────── +# Max simultaneous calls (each uses GPU; Ollama serializes generation). Over-cap +# callers hear BUSY_MESSAGE and are hung up. Tune to your GPU headroom (2-3 typical). +MAX_CONCURRENT_CALLS=2 +# BUSY_MESSAGE=Thank you for calling Advanced Vision Care. All of our lines are busy right now. Please call back in a few minutes. Goodbye. + +# ── Models (defaults are fine) ─────────────────────────────────────────────── +OLLAMA_MODEL=llama3.1:8b +OLLAMA_URL=http://127.0.0.1:11434/v1 +# LLM provider: ollama (local, default) | anthropic (Claude API). Flip to A/B test Claude. +LLM_PROVIDER=ollama +ANTHROPIC_API_KEY= +# Default is the most capable model; for low-latency phone voice prefer claude-haiku-4-5 +# (fastest) or claude-sonnet-4-6 (balance). +ANTHROPIC_MODEL=claude-opus-4-8 +# ── STT: Deepgram (real-time, in-call only) ────────────────────────────────── +# Nova-2 delivers end-of-utterance in <300ms (vs Whisper's 1-3s buffering). Key from +# console.deepgram.com. Model is fixed to nova-2 in code; DEEPGRAM_MODEL is informational. +DEEPGRAM_API_KEY= +DEEPGRAM_MODEL=nova-2 +# Whisper is retained for POST-CALL transcription only (Phase 3), not the live pipeline. +WHISPER_MODEL=base +WHISPER_DEVICE=cuda +WHISPER_COMPUTE=float16 +KOKORO_VOICE=af_heart +KOKORO_MODEL_DIR=/home/tocmo0nlord/pipecat-run/models diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ff04b07 --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +# Secrets — never commit +.env + +# Recordings (local only, may contain PHI) +recordings/ + +# Model weights +*.gguf + +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ +venv/ + +# OS / editor +.DS_Store +*.swp diff --git a/README.md b/README.md new file mode 100644 index 0000000..1a4de53 --- /dev/null +++ b/README.md @@ -0,0 +1,132 @@ +# AVC Phone Agent — inbound optometry line (Pipecat + Twilio, fully local) + +A real phone number that callers dial; the agent answers in voice, handles hours / +location / insurance / services questions, and **captures appointment requests** for +staff callback. All AI runs **locally on this box**: + +``` +caller ─▶ Twilio ─▶ wss (Traefik TLS) ─▶ server.py ─▶ Pipecat pipeline: + Twilio Media Stream (8kHz µ-law) + │ + ▼ + Silero VAD ─▶ Whisper STT (GPU) ─▶ activeblue-avc (Ollama) ─▶ Kokoro TTS ─▶ back to caller +``` + +Inbound only. No cloud STT/TTS — audio stays on the machine except the Twilio carrier leg. + +## Files +| File | Role | +|---|---| +| `server.py` | FastAPI: `POST /voice` (TwiML) + `WS /ws` (Twilio Media Stream) | +| `bot.py` | The per-call Pipecat pipeline (VAD→STT→LLM→TTS) + tool wiring | +| `practice.py` | **AVC business facts (PLACEHOLDERS — edit before go-live)** + appointment-capture tool | +| `odoo_client.py` | Writes captured requests into Odoo (CRM lead by default) via XML-RPC | +| `run.sh` | Launcher (reuses pipecat-run venv + sets CUDA lib path) | +| `avc-phone.service` | systemd unit (install on this box) | +| `deploy/setup-tls.sh` | One-shot: Let's Encrypt cert + nginx vhost install (run as root) | +| `deploy/nginx-*.conf` | nginx TLS reverse-proxy vhost + WebSocket-upgrade map | +| `traefik-avc-phone.yml` | Unused alternative (kept for a future multi-host/Traefik setup) | +| `.env.example` | Copy to `.env`, fill Twilio creds + public host + Odoo creds | +| `appointment_requests.jsonl` | Local fallback — only used if Odoo is unreachable/disabled | + +## What's done vs. what YOU must supply + +**Working / verified locally:** +- Pipeline assembles; all services construct (smoke-tested). +- GPU Whisper fixed — installed CUDA12 `cublas`+`cudnn` wheels into the venv; `run.sh` + sets `LD_LIBRARY_PATH` so faster-whisper finds them. Verified transcribe on GPU. +- Local model `activeblue-avc:latest` is the brain; Kokoro voice; appointment tool. +- **Odoo appointment integration wired + verified** against prod `db1`: a captured + request creates a `crm.lead` (callback to-do) via XML-RPC using the same API key the + `activeblue-agent` service uses. Verified create→read→delete (no residue left in db1). + If Odoo is unreachable or creds are blank, it falls back to `appointment_requests.jsonl` + and still confirms to the caller — a request is never lost. + +**You must supply (can't be done from this box):** +1. **Twilio account + a Voice phone number.** +2. **Port-forward 443** (and 80) from your router to this box, and run `deploy/setup-tls.sh` + for the nginx TLS reverse proxy (Twilio needs real TLS on 443 for the `wss` stream). +3. **Real AVC facts** in `practice.py` (hours, address, insurance, services, phone). +4. **Odoo creds in `.env`** (`ODOO_USER` + `ODOO_API_KEY`) to enable lead creation. + Set `ODOO_DB` (`db1` for prod) and `ODOO_TARGET` (`crm` lead, or `calendar` event). + Leave creds blank to disable Odoo and log to JSONL only. + +## Setup + +1. **Config** + ```bash + cd /home/tocmo0nlord/avc-phone + cp .env.example .env # fill PUBLIC_HOST, TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN + $EDITOR practice.py # replace PLACEHOLDER hours/address/insurance/services + ``` + +2. **Run it** + ```bash + ./run.sh # listens plain HTTP on :8200 (Traefik terminates TLS) + curl localhost:8200/health # {"status":"ok",...} + ``` + +3. **TLS reverse proxy (nginx, on this box).** No Traefik — `voip.activeblue.net` points + at your WAN IP (`66.23.239.222`) which NATs to this box (`10.10.1.221`). nginx is + already installed and only serving the default site, so we add a vhost for the domain. + **Twilio's `wss` media stream needs real TLS on 443**, so: + - **Forward `443` (and `80`) on your router → `10.10.1.221`.** (80 is for the + Let's Encrypt challenge + the http→https redirect; 443 is the actual traffic.) + - Run the one-shot setup (gets a Let's Encrypt cert, installs the vhost + ws map, + reloads nginx): + ```bash + sudo bash deploy/setup-tls.sh + ``` + It uses `deploy/nginx-voip.activeblue.net.conf` (proxies 443 → `127.0.0.1:8200`, + forwards the `/ws` upgrade, 1-hour stream timeout) and `deploy/nginx-ws-upgrade.conf`. + - Verify publicly: `curl https://voip.activeblue.net/health`. + +4. **Twilio number config** (console.twilio.com → your number → Voice): + - **A call comes in** → Webhook → `https://voip.activeblue.net/voice` → HTTP **POST**. + - Save. That's it — the TwiML we return tells Twilio to open the Media Stream to + `wss://voip.activeblue.net/ws`. + +5. **Call the number.** You should hear the greeting and be able to talk to it. + +## Security (built in) +- **Webhook signature validation:** `POST /voice` verifies Twilio's `X-Twilio-Signature` + (HMAC-SHA1 over the public URL + sorted POST params, keyed by `TWILIO_AUTH_TOKEN`). + Enforced automatically whenever `TWILIO_AUTH_TOKEN` is set. Verified against Twilio's + published reference vector. Unsigned/forged requests get `403`. Set `TWILIO_VALIDATE=false` + only for local testing. + - The signed URL must match exactly, so **`PUBLIC_HOST` must equal the host on the number's + webhook** (`https://$PUBLIC_HOST/voice`). If Traefik rewrites host/path, signatures fail. +- **Media-stream gate:** `/ws` can't carry a usable Twilio signature, so it's gated by a + shared `STREAM_TOKEN` embedded in the wss URL we hand Twilio. Bad/missing token → socket + closed. Set a stable `STREAM_TOKEN` in `.env` (`openssl rand -base64 24`). + +## Run it as a service (systemd) +A unit is provided: `avc-phone.service` (runs as your user, `Restart=always`, ordered +after `ollama.service`). Install (needs sudo — paste these in a `!` shell or a terminal): +```bash +sudo cp /home/tocmo0nlord/avc-phone/avc-phone.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable --now avc-phone.service +systemctl status avc-phone.service # check it's running +journalctl -u avc-phone.service -f # follow logs +``` +Restart after editing `.env` or `practice.py`: `sudo systemctl restart avc-phone.service`. +(No-sudo alternative: a `systemctl --user` unit + `loginctl enable-linger tocmo0nlord` — +ask and I'll convert it.) + +## Concurrency cap (built in) +`MAX_CONCURRENT_CALLS` (default **2**) bounds simultaneous live calls. The count tracks +active `/ws` pipelines (the real GPU consumers); when full, `/voice` speaks `BUSY_MESSAGE` +and hangs up **before any GPU work**, so in-progress calls are never degraded. A hard +reservation at `/ws` covers the rare race. `/health` reports `active_calls`/`max_calls` +for monitoring. Tune the cap to your GPU headroom. + +## Known limits / next steps +- **Per-call Whisper load:** each call currently constructs its own Whisper model on the + GPU. Fine within the cap; a future optimization is sharing one warm Whisper instance + across calls to cut memory + first-utterance latency. +- **Latency:** first call after start pays one-time model loads (Whisper/Kokoro/Ollama). + Keep the process warm. Tune `WHISPER_MODEL=tiny` if you need faster STT. +- **Function-calling reliability:** `activeblue-avc` is an 8B fine-tune; tool-calling + may need prompt tuning. If it's flaky, we can fall back to a deterministic slot-filling + flow for appointment capture. diff --git a/avc-phone.service b/avc-phone.service new file mode 100644 index 0000000..ada446d --- /dev/null +++ b/avc-phone.service @@ -0,0 +1,34 @@ +[Unit] +Description=AVC optometry phone agent (Pipecat + Twilio, local GPU) +Documentation=file:///home/tocmo0nlord/avc-phone/README.md +# Needs the network up and Ollama serving the activeblue-avc model. Docker (Odoo) is +# only a soft dependency — the agent falls back to JSONL if Odoo is down — so it is +# ordered after but not required. +Wants=network-online.target +After=network-online.target ollama.service docker.service +# Restart rate limit (these are [Unit]-level directives). +StartLimitIntervalSec=300 +StartLimitBurst=5 + +[Service] +Type=simple +User=tocmo0nlord +Group=tocmo0nlord +WorkingDirectory=/home/tocmo0nlord/avc-phone +# run.sh loads .env, sets LD_LIBRARY_PATH for the CUDA libs, and execs the server. +ExecStart=/home/tocmo0nlord/avc-phone/run.sh +# A phone line should always come back up. +Restart=always +RestartSec=5 +# Give model loads time before a failed start counts against the restart limit. +TimeoutStartSec=120 + +# Light hardening. Home is left writable on purpose: the app reads the shared venv + +# Kokoro models + HF Whisper cache under /home and writes the JSONL fallback, so a +# read-only home would risk breaking model-cache locks. /usr,/etc stay read-only. +NoNewPrivileges=true +ProtectSystem=full +PrivateTmp=true + +[Install] +WantedBy=multi-user.target diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..cc2a717 --- /dev/null +++ b/bot.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python3 +"""AVC optometry phone agent — the Pipecat pipeline for a single inbound call. + +Same VAD -> STT -> LLM -> TTS loop as pipecat-run/bot.py, but the ends are swapped +for telephony: audio arrives/leaves as 8 kHz mu-law over a Twilio Media Stream +(WebSocket), decoded by TwilioFrameSerializer. STT runs on the GPU; the LLM is the +local `activeblue-avc` fine-tune via Ollama; TTS is local Kokoro. + +This module just builds + runs the pipeline for one connected call. server.py owns +the FastAPI/TwiML/WebSocket side and calls run_call() once per call. +""" + +import os + +import re +import time + +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import ( + BotStoppedSpeakingFrame, + EndFrame, + EndTaskFrame, + Frame, + InputAudioRawFrame, + LLMFullResponseEndFrame, + LLMTextFrame, + TTSSpeakFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.audio.vad_processor import VADProcessor +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.serializers.twilio import TwilioFrameSerializer +from pipecat.services.anthropic.llm import AnthropicLLMService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.kokoro.tts import KokoroTTSService +from pipecat.services.ollama.llm import OLLamaLLMService +from pipecat.transports.websocket.fastapi import ( + FastAPIWebsocketParams, + FastAPIWebsocketTransport, +) + +from practice import practice_summary + +# ── Config (env-overridable) ───────────────────────────────────────────────── +HERE = os.path.dirname(os.path.abspath(__file__)) +# Reuse the Kokoro model files already downloaded by the pipecat-run project. +MODEL_DIR = os.environ.get("KOKORO_MODEL_DIR", "/home/tocmo0nlord/pipecat-run/models") + +OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "activeblue-avc:latest") +OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/v1") +# Swappable LLM provider: "ollama" (local) or "anthropic" (Claude API). Same universal +# LLMContext drives both — only the service construction differs (see build_llm_service). +LLM_PROVIDER = os.environ.get("LLM_PROVIDER", "ollama").lower() +ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "") +# Defaults to the most capable model. For low-latency PHONE voice, set ANTHROPIC_MODEL to +# claude-haiku-4-5 (fastest) or claude-sonnet-4-6 (balance) — see notes in build_llm_service. +ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-opus-4-8") +# In-call function-calling: AUTO by provider — ON for Claude (reliable tool calls → real-time +# Odoo booking), OFF for local Ollama (llama3.1:8b over-calls / leaks JSON). An explicit +# ENABLE_TOOLS env overrides the auto choice either way. +_enable_tools_env = os.environ.get("ENABLE_TOOLS") +ENABLE_TOOLS = ( + _enable_tools_env.lower() in ("1", "true", "yes") + if _enable_tools_env is not None + else (LLM_PROVIDER == "anthropic") +) +LLM_TEMPERATURE = float(os.environ.get("LLM_TEMPERATURE", "0.3")) +LLM_MAX_TOKENS = int(os.environ.get("LLM_MAX_TOKENS", "160")) +KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "af_heart") +# Real-time STT is Deepgram Nova-2: end-of-utterance events in <300ms (vs Whisper's +# 1-3s of chunk buffering, the main cause of non-reply / repeat-yourself). Whisper +# large-v3 is retained for post-call transcription only (Phase 3). +DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY", "") + +# Twilio sends 8 kHz mu-law on the wire — we run the PIPELINE at 16 kHz and let +# TwilioFrameSerializer resample to/from the 8 kHz wire. (Silero VAD, Deepgram, and +# Kokoro are all happy at 16 kHz.) +WIRE_SAMPLE_RATE = 8000 # Twilio mu-law on the wire (serializer handles this) +PIPELINE_SAMPLE_RATE = 16000 # internal rate Whisper/VAD actually need + +# VAD tuning. Defaults (confidence 0.7 / min_volume 0.6) are desktop-mic values that can +# miss short/quiet 8 kHz telephony utterances like "yes" — loosen them for the phone. +VAD_CONFIDENCE = float(os.environ.get("VAD_CONFIDENCE", "0.5")) +VAD_MIN_VOLUME = float(os.environ.get("VAD_MIN_VOLUME", "0.3")) +VAD_START_SECS = float(os.environ.get("VAD_START_SECS", "0.2")) +VAD_STOP_SECS = float(os.environ.get("VAD_STOP_SECS", "0.5")) + +# Agent persona name — purely for warmth; change/remove freely. +AGENT_NAME = os.environ.get("AGENT_NAME", "Sofia") + +SYSTEM_PROMPT = ( + f"You are {AGENT_NAME}, a warm, friendly receptionist for Advanced Vision Care, an " + "optometry practice with eight offices in South Florida. You are on a real phone call, so " + "talk like a helpful human being: natural, relaxed, and genuinely conversational — usually " + "just one short sentence at a time. Speak in English. Say numbers, dates, and times as " + "words a person would say.\n\n" + "Your job is to answer callers' questions and to take appointment requests. To book a " + "visit you need four things: which office or city, the reason for the visit, a preferred " + "day and time, and their name. Gather these naturally as the conversation flows — don't " + "interrogate, and never ask for something the caller already told you (people often give " + "their name or reason in their first sentence). You already have their number from caller " + "ID, so never ask for a phone number. When you have the details, repeat them back in one " + "warm sentence to confirm, and let them know a staff member will call to finalize the time.\n\n" + "Stay truthful and within your limits:\n" + "- Use ONLY the facts below for addresses, phone numbers, insurance, and services. Never " + "make any of these up.\n" + "- To find the right office, ask what CITY or AREA is most convenient for the caller. Do " + "NOT suggest or name a specific office yourself — you don't know where they are. Only after " + "they tell you their area, name the matching office; and only list locations if they ask " + "what offices exist.\n" + "- You cannot see a calendar, so never say a time is open or available — take the time as " + "a request that staff will confirm.\n" + "- Insurance: only confirm a plan that is in the list below. For any plan that is not " + "listed (UnitedHealthcare, Aetna, Cigna, and so on), don't say yes or no — say our staff " + "will verify their coverage.\n" + "- Hours are not published — say they vary by office and staff will confirm; never give " + "specific hours.\n" + "- You don't give medical advice and can't transfer calls. If the caller mentions an eye " + "problem, just note it as the reason and say a staff member or doctor will follow up.\n" + "- If you're not sure you heard something, simply ask them to repeat it.\n" + "- When the caller is all set, give a brief, warm closing that ends with the word " + "'Goodbye' — that ends the call, so only say it when you truly mean to.\n\n" + "PRACTICE FACTS:\n" + practice_summary() +) + + +def _build_tools() -> ToolsSchema: + # Only the booking action is a tool. Practice facts already live in the system prompt, + # so no get_practice_info tool (avoids needless calls/latency). callback_number is NOT + # required — we have the caller-ID and inject it in the handler. + return ToolsSchema( + standard_tools=[ + FunctionSchema( + name="record_appointment_request", + description=( + "Record the caller's appointment request once you have their name and at " + "least the office/city and reason. Call this when the caller wants to book " + "a visit; staff will call back to confirm the exact time." + ), + properties={ + "patient_name": {"type": "string", "description": "Caller's full name"}, + "location": {"type": "string", "description": "Which office/city the caller wants, e.g. Hialeah, Kendall, Tamarac"}, + "reason": {"type": "string", "description": "Reason for the visit, e.g. annual exam, broken glasses, eye pain"}, + "preferred_time": {"type": "string", "description": "Preferred day/time in the caller's words, if given"}, + }, + required=["patient_name"], + ), + ] + ) + + +class EndCallProcessor(FrameProcessor): + """Lets Sofia hang up. MUST sit between the LLM and the TTS: there it sees her reply + text (LLMTextFrame, flowing downstream) AND the upstream copy of BotStoppedSpeakingFrame + the output transport emits. It accumulates each reply; if the finished reply contains a + closing ('goodbye'/'adiós'), it waits until she's done speaking, then pushes EndTaskFrame + upstream — the task ends and TwilioFrameSerializer (auto_hang_up) drops the call.""" + + _CLOSINGS = ("goodbye", "good-bye", "good bye", "adiós", "adios", "hasta luego") + + def __init__(self): + super().__init__() + self._buf = "" + self._should_end = False + + @classmethod + def _is_closing(cls, text: str) -> bool: + t = (text or "").lower() + return any(c in t for c in cls._CLOSINGS) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, LLMTextFrame): + self._buf += frame.text + elif isinstance(frame, LLMFullResponseEndFrame): + if self._is_closing(self._buf): + self._should_end = True + logger.info("Sofia signalled closing -- will hang up after she finishes speaking") + self._buf = "" + elif isinstance(frame, BotStoppedSpeakingFrame) and self._should_end: + self._should_end = False + logger.info("Sofia closed the call -- ending task / hanging up") + await self.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) + await self.push_frame(frame, direction) + + +class AudioHeartbeat(FrameProcessor): + """Diagnostic: logs how many inbound audio frames arrive every ~5s. If this keeps + ticking but VAD never fires, the issue is VAD/threshold; if it drops to 0 after a + turn, inbound audio stalled at the transport. Cheap, leave it on while stabilizing.""" + + def __init__(self): + super().__init__() + self._n = 0 + self._t = time.time() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, InputAudioRawFrame): + self._n += 1 + now = time.time() + if now - self._t >= 5: + logger.info(f"[audio-in] {self._n} frames in last {now - self._t:.0f}s") + self._n = 0 + self._t = now + await self.push_frame(frame, direction) + + +def build_llm_service(): + """Build the LLM service for the selected provider. The universal LLMContext + + aggregators work with either, so only this construction differs (true A/B swap).""" + if LLM_PROVIDER == "anthropic": + if not ANTHROPIC_API_KEY: + raise RuntimeError("LLM_PROVIDER=anthropic but ANTHROPIC_API_KEY is not set") + logger.info(f"LLM provider: anthropic ({ANTHROPIC_MODEL})") + # NOTE: Opus 4.8/4.7 reject temperature/top_p/top_k (HTTP 400), so we omit them — + # this keeps the default Opus model working. For low-latency phone voice, prefer + # claude-haiku-4-5 (fastest) or claude-sonnet-4-6 over Opus. enable_prompt_caching + # caches the system prompt + growing conversation prefix (helps multi-turn cost/latency). + return AnthropicLLMService( + api_key=ANTHROPIC_API_KEY, + settings=AnthropicLLMService.Settings( + model=ANTHROPIC_MODEL, + enable_prompt_caching=True, + max_tokens=LLM_MAX_TOKENS, + ), + ) + logger.info(f"LLM provider: ollama ({OLLAMA_MODEL})") + return OLLamaLLMService( + settings=OLLamaLLMService.Settings( + model=OLLAMA_MODEL, + temperature=LLM_TEMPERATURE, + max_tokens=LLM_MAX_TOKENS, + ), + base_url=OLLAMA_URL, + ) + + +async def run_agent(transport, caller_number=None, call_sid=None, do_capture=True): + """Build + run the AVC voice agent on a given transport. Shared by the phone path + (Twilio Media Stream) and the browser path (WebRTC) — same prompt, model, voice, and + booking/hang-up logic; only the transport differs. do_capture writes the post-call + appointment to Odoo (on for phone; off for browser testing so it doesn't make cards).""" + stt = DeepgramSTTService( + api_key=DEEPGRAM_API_KEY, + settings=DeepgramSTTService.Settings( + model="nova-2", + language="en-US", + smart_format=True, + punctuate=True, + interim_results=False, # final transcripts only — avoids double-firing + utterance_end_ms=1000, # ms of silence before end-of-utterance fires + ), + ) + llm = build_llm_service() + # In-call booking tool — only registered when ENABLE_TOOLS is on (auto: Claude yes, + # local Ollama no, since llama3.1:8b over-calls/leaks). The handler is a closure so it + # can stamp the verified caller-ID + call_sid onto the lead (the model never supplies a + # phone number — we don't ask for one). With tools on, this writes the Odoo lead IN-CALL, + # so the post-call extraction is skipped below to avoid a duplicate. + if ENABLE_TOOLS: + async def _record_appointment(params): + args = params.arguments or {} + if do_capture: + from practice import persist_appointment + persist_appointment({ + "call_sid": call_sid, + "patient_name": args.get("patient_name"), + "callback_number": caller_number, # verified caller-ID, not model-supplied + "location": args.get("location"), + "reason": args.get("reason"), + "preferred_time": args.get("preferred_time"), + "source": "in_call_tool", + }) + else: + logger.info(f"[capture off] would record appointment: {args.get('patient_name')} / {args.get('location')}") + await params.result_callback( + {"status": "recorded", "message": "Recorded — staff will call to confirm the time."} + ) + + llm.register_function("record_appointment_request", _record_appointment) + + tts = KokoroTTSService( + model_path=os.path.join(MODEL_DIR, "kokoro-v1.0.onnx"), + voices_path=os.path.join(MODEL_DIR, "voices-v1.0.bin"), + settings=KokoroTTSService.Settings(voice=KOKORO_VOICE), + ) + vad = VADProcessor(vad_analyzer=SileroVADAnalyzer(params=VADParams( + confidence=VAD_CONFIDENCE, + start_secs=VAD_START_SECS, + stop_secs=VAD_STOP_SECS, + min_volume=VAD_MIN_VOLUME, + ))) + heartbeat = AudioHeartbeat() + + context_kwargs = {"messages": [{"role": "system", "content": SYSTEM_PROMPT}]} + if ENABLE_TOOLS: + context_kwargs["tools"] = _build_tools() + context = LLMContext(**context_kwargs) + agg = LLMContextAggregatorPair(context) + endcall = EndCallProcessor() + + pipeline = Pipeline( + [ + transport.input(), + heartbeat, + vad, + stt, + agg.user(), + llm, + endcall, + tts, + transport.output(), + agg.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + audio_in_sample_rate=PIPELINE_SAMPLE_RATE, + audio_out_sample_rate=PIPELINE_SAMPLE_RATE, + allow_interruptions=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected -- greeting") + await task.queue_frames( + [TTSSpeakFrame( + f"Thank you for calling Advanced Vision Care, this is {AGENT_NAME}. " + "How can I help you today?" + )] + ) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected -- ending task") + await task.queue_frame(EndFrame()) + + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) + + # Call is over. Post-call extraction is the capture path ONLY when in-call tools are + # off (local Ollama). With tools on (Claude), the booking was already written in-call, + # so skip extraction to avoid a duplicate lead. + if do_capture and not ENABLE_TOOLS: + try: + from extract import extract_and_record + + await extract_and_record( + context.messages, OLLAMA_URL, OLLAMA_MODEL, + call_sid=call_sid, caller_number=caller_number, + ) + except Exception: + logger.exception("Post-call appointment extraction failed") + + +async def run_call(websocket, serializer: TwilioFrameSerializer, caller_number=None, call_sid=None): + """Phone entrypoint: wrap the Twilio Media Stream in a transport, run the shared agent.""" + transport = FastAPIWebsocketTransport( + websocket=websocket, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=PIPELINE_SAMPLE_RATE, + audio_out_sample_rate=PIPELINE_SAMPLE_RATE, + add_wav_header=False, + serializer=serializer, + ), + ) + await run_agent(transport, caller_number=caller_number, call_sid=call_sid, do_capture=True) diff --git a/bot_web.py b/bot_web.py new file mode 100644 index 0000000..e191da0 --- /dev/null +++ b/bot_web.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +"""Browser entrypoint for the SAME Sofia agent — for fast iteration without phoning in. + +Reuses bot.run_agent (identical prompt, model, voice, booking + hang-up logic) but over a +browser WebRTC transport via Pipecat's dev runner. Serves a mic UI at http://localhost:7860. + +Caveat: this path is 16 kHz WebRTC, NOT 8 kHz telephony — great for testing conversation / +prompt / voice / hang-up, but it does NOT reproduce phone-specific audio (µ-law, clipping, +VAD-on-8kHz). Appointment capture to Odoo is OFF here (do_capture=False) so browser tests +don't create CRM cards. + +Run: + ./run_web.sh # then open http://localhost:7860, click Connect, allow mic, talk +""" + +from pipecat.runner.utils import create_transport +from pipecat.transports.base_transport import TransportParams + +from bot import run_agent + + +import os + +# Browser tests don't write to Odoo by default (keeps test bookings out of the CRM). Set +# WEB_ALLOW_CAPTURE=true for a one-off test that actually creates the lead. +WEB_ALLOW_CAPTURE = os.environ.get("WEB_ALLOW_CAPTURE", "false").lower() in ("1", "true", "yes") + + +async def bot(runner_args): + """Called by pipecat.runner.run.main for each browser connection.""" + transport = await create_transport( + runner_args, + {"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True)}, + ) + await run_agent(transport, do_capture=WEB_ALLOW_CAPTURE) + + +if __name__ == "__main__": + import os + import sys + + import uvicorn + + # Serve on the LAN over HTTPS so the mic works from another machine (browsers require + # a secure context off-localhost). Reuses the self-signed certs (valid for 10.10.1.221). + CERT = os.environ.get("WEB_CERT", "/home/tocmo0nlord/pipecat-run/certs/cert.pem") + KEY = os.environ.get("WEB_KEY", "/home/tocmo0nlord/pipecat-run/certs/key.pem") + if os.path.exists(CERT) and os.path.exists(KEY): + _orig_run = uvicorn.run + + def _run_with_tls(app, *args, **kwargs): + kwargs.setdefault("ssl_certfile", CERT) + kwargs.setdefault("ssl_keyfile", KEY) + return _orig_run(app, *args, **kwargs) + + uvicorn.run = _run_with_tls + if "--host" not in sys.argv: + sys.argv += ["--host", "0.0.0.0"] + print("Browser UI: https://10.10.1.221:7860 (accept the self-signed cert once)") + + from pipecat.runner.run import main + + main() diff --git a/deploy/nginx-voip.activeblue.net.conf b/deploy/nginx-voip.activeblue.net.conf new file mode 100644 index 0000000..e1d0376 --- /dev/null +++ b/deploy/nginx-voip.activeblue.net.conf @@ -0,0 +1,58 @@ +# nginx reverse proxy for the AVC phone agent. +# Terminates TLS for voip.activeblue.net and proxies to the app on 127.0.0.1:8200, +# forwarding the /ws WebSocket (Twilio Media Stream) with long timeouts. +# +# Install: +# sudo cp deploy/nginx-voip.activeblue.net.conf /etc/nginx/sites-available/voip.activeblue.net +# sudo cp deploy/nginx-ws-upgrade.conf /etc/nginx/conf.d/ws-upgrade.conf +# sudo ln -s /etc/nginx/sites-available/voip.activeblue.net /etc/nginx/sites-enabled/ +# sudo nginx -t && sudo systemctl reload nginx +# (Get the cert FIRST — see README — or the 443 block fails to load.) + +# ── HTTP :80 — ACME challenge + redirect everything else to HTTPS ───────────── +server { + listen 80; + listen [::]:80; + server_name voip.activeblue.net; + + # Let's Encrypt HTTP-01 webroot challenge (served, not redirected). + location /.well-known/acme-challenge/ { + root /var/www/html; + } + + location / { + return 301 https://$host$request_uri; + } +} + +# ── HTTPS :443 — TLS termination + proxy to the app ────────────────────────── +server { + listen 443 ssl; + listen [::]:443 ssl; + http2 on; + server_name voip.activeblue.net; + + ssl_certificate /etc/letsencrypt/live/voip.activeblue.net/fullchain.pem; + ssl_certificate_key /etc/letsencrypt/live/voip.activeblue.net/privkey.pem; + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers HIGH:!aNULL:!MD5; + ssl_prefer_server_ciphers on; + + # Twilio Media Streams hold the WebSocket open for the whole call — allow it. + proxy_read_timeout 3600s; + proxy_send_timeout 3600s; + + location / { + proxy_pass http://127.0.0.1:8200; + proxy_http_version 1.1; + + # WebSocket upgrade (for /ws). $connection_upgrade comes from ws-upgrade.conf. + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } +} diff --git a/deploy/nginx-ws-upgrade.conf b/deploy/nginx-ws-upgrade.conf new file mode 100644 index 0000000..eddf262 --- /dev/null +++ b/deploy/nginx-ws-upgrade.conf @@ -0,0 +1,8 @@ +# Maps the Connection header for WebSocket proxying. Goes in /etc/nginx/conf.d/ so it +# lives in the http{} context (a `map` can't go inside a server/location block). +# When a request carries `Upgrade: websocket`, send `Connection: upgrade`; otherwise +# `Connection: close`. Used by the voip.activeblue.net vhost for the /ws media stream. +map $http_upgrade $connection_upgrade { + default upgrade; + '' close; +} diff --git a/deploy/setup-tls.sh b/deploy/setup-tls.sh new file mode 100755 index 0000000..0c10260 --- /dev/null +++ b/deploy/setup-tls.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +# One-shot TLS + nginx reverse-proxy setup for the AVC phone agent. +# RUN AS ROOT: sudo bash deploy/setup-tls.sh +# +# Prerequisites (must be true BEFORE running): +# - DNS: voip.activeblue.net -> your WAN IP (done: 66.23.239.222) +# - Router forwards external 80 AND 443 -> this box (10.10.1.221) +# - nginx running with its default :80 site (used to answer the ACME challenge) +# +# What it does: installs certbot, gets a Let's Encrypt cert via the webroot challenge +# (served by the existing default :80 site), installs the vhost + ws-upgrade map, then +# tests and reloads nginx. Idempotent-ish; safe to re-run. +set -euo pipefail + +DOMAIN="voip.activeblue.net" +EMAIL="mr.garcia09@gmail.com" +APP_DIR="/home/tocmo0nlord/avc-phone" +WEBROOT="/var/www/html" + +if [ "$(id -u)" -ne 0 ]; then echo "Run as root (sudo)."; exit 1; fi + +echo "==> 1/4 install certbot" +if ! command -v certbot >/dev/null 2>&1; then + apt-get update && apt-get install -y certbot +fi + +echo "==> 2/4 obtain certificate for $DOMAIN (webroot challenge)" +mkdir -p "$WEBROOT/.well-known/acme-challenge" +certbot certonly --webroot -w "$WEBROOT" -d "$DOMAIN" \ + --non-interactive --agree-tos -m "$EMAIL" --keep-until-expiring + +echo "==> 3/4 install nginx vhost + ws-upgrade map" +cp "$APP_DIR/deploy/nginx-ws-upgrade.conf" /etc/nginx/conf.d/ws-upgrade.conf +cp "$APP_DIR/deploy/nginx-voip.activeblue.net.conf" /etc/nginx/sites-available/voip.activeblue.net +ln -sf /etc/nginx/sites-available/voip.activeblue.net /etc/nginx/sites-enabled/voip.activeblue.net + +echo "==> 4/4 test + reload nginx" +nginx -t +systemctl reload nginx + +echo +echo "Done. Verify: curl https://$DOMAIN/health" +echo "Cert auto-renews via the certbot systemd timer; nginx reload on renew is handled by certbot's deploy hook." diff --git a/extract.py b/extract.py new file mode 100644 index 0000000..f33501d --- /dev/null +++ b/extract.py @@ -0,0 +1,101 @@ +"""Post-call appointment extraction. + +Instead of unreliable in-call tool-calling (which made llama3.1:8b speak raw JSON), +we let the agent gather appointment details conversationally, then run ONE structured +extraction over the finished transcript and write it to Odoo. Reliable because it's a +single JSON-mode completion, not mid-conversation tool emission. +""" + +import json +import re + +import httpx +from loguru import logger + +from practice import persist_appointment + +_EXTRACT_INSTRUCTIONS = ( + "You are reviewing a phone-call transcript between a caller and the receptionist " + "for an optometry practice. Extract any APPOINTMENT REQUEST the caller made.\n" + "Respond with ONLY a JSON object with these keys:\n" + ' "wants_appointment": boolean — true only if the caller asked to book/schedule a visit\n' + ' "patient_name": string or null\n' + ' "callback_number": string or null (digits the caller gave to be called back)\n' + ' "location": string or null (which office/city)\n' + ' "reason": string or null (e.g. eye exam, broken glasses)\n' + ' "preferred_time": string or null (day/time in the caller\'s words)\n' + "Use null for anything not clearly stated. Do not invent values." +) + + +async def extract_and_record(messages, ollama_url, model, call_sid=None, caller_number=None): + """Extract an appointment from the transcript and persist it. Returns the record + dict if one was saved, else None.""" + # Build a plain transcript from the conversation (skip the system prompt). + turns = [ + f"{m['role']}: {m['content']}" + for m in messages + if m.get("role") in ("user", "assistant") and isinstance(m.get("content"), str) and m["content"].strip() + ] + if not any(m.get("role") == "user" for m in messages): + return None # nobody said anything + transcript = "\n".join(turns) + + base = ollama_url.rstrip("/") + if base.endswith("/v1"): + base = base[:-3] + + try: + async with httpx.AsyncClient(timeout=30) as client: + r = await client.post( + f"{base}/api/chat", + json={ + "model": model, + "format": "json", + "stream": False, + "options": {"temperature": 0}, + "messages": [ + {"role": "system", "content": _EXTRACT_INSTRUCTIONS}, + {"role": "user", "content": f"Transcript:\n{transcript}"}, + ], + }, + ) + r.raise_for_status() + data = json.loads(r.json()["message"]["content"]) + except Exception: + logger.exception("Appointment extraction call failed") + return None + + if not data.get("wants_appointment"): + logger.info("Post-call extraction: no appointment requested") + return None + + # Don't create near-empty cards from quick hang-ups: require at least a name or a + # reason. A bare location + caller-ID isn't enough to be worth a worklist card. + name = (data.get("patient_name") or "").strip() + reason_raw = (data.get("reason") or "").strip() + if not name and not reason_raw: + logger.info("Post-call extraction: appointment intent but no name/reason captured — skipping card") + return None + + # Prefer the verified Twilio caller-ID over a number pulled from the transcript — + # the model sometimes invents/echoes a phone number. Keep a genuinely different + # spoken number as a note for staff. + spoken = (data.get("callback_number") or "").strip() + callback = caller_number or spoken or None + reason = data.get("reason") + if spoken and caller_number and re.sub(r"\D", "", spoken) != re.sub(r"\D", "", caller_number): + reason = f"{reason or ''} (caller mentioned alternate number: {spoken})".strip() + + record = { + "call_sid": call_sid, + "patient_name": data.get("patient_name"), + "callback_number": callback, + "location": data.get("location"), + "reason": reason, + "preferred_time": data.get("preferred_time"), + "source": "post_call_extraction", + } + where = persist_appointment(record) + logger.info(f"Post-call appointment saved ({where}): {record['patient_name']} / {record['location']}") + return record diff --git a/odoo_client.py b/odoo_client.py new file mode 100644 index 0000000..768d288 --- /dev/null +++ b/odoo_client.py @@ -0,0 +1,106 @@ +"""Minimal Odoo XML-RPC client for the phone agent. + +Creates an appointment *request* in Odoo from a captured call. A request is NOT a +confirmed booking — staff call the patient back to finalize — so by default we write +a CRM lead (a clean "to-do" that doesn't occupy a real calendar slot). Set +ODOO_TARGET=calendar to instead drop a tentative event on the calendar. + +Auth + target are all env-driven (see .env.example). Connection is lazy and every +failure is swallowed by the caller's fallback, so a flaky Odoo never drops a request. +""" + +import os +import xmlrpc.client +from datetime import datetime, timedelta +from html import escape + +ODOO_URL = os.environ.get("ODOO_URL", "http://localhost:8069") +ODOO_DB = os.environ.get("ODOO_DB", "db1") +ODOO_USER = os.environ.get("ODOO_USER", "") +ODOO_API_KEY = os.environ.get("ODOO_API_KEY", "") +ODOO_TARGET = os.environ.get("ODOO_TARGET", "crm").lower() # "crm" | "calendar" +# Pipeline placement for crm target. If ODOO_STAGE_ID is set, the request is created as a +# staged opportunity (shows up in the CRM pipeline as a worklist) instead of a bare lead. +ODOO_STAGE_ID = int(os.environ["ODOO_STAGE_ID"]) if os.environ.get("ODOO_STAGE_ID") else None +ODOO_TEAM_ID = int(os.environ["ODOO_TEAM_ID"]) if os.environ.get("ODOO_TEAM_ID") else None +ODOO_USER_ID = int(os.environ["ODOO_USER_ID"]) if os.environ.get("ODOO_USER_ID") else None + + +class OdooError(RuntimeError): + pass + + +def _connect(): + if not (ODOO_USER and ODOO_API_KEY): + raise OdooError("ODOO_USER / ODOO_API_KEY not set") + common = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/common") + uid = common.authenticate(ODOO_DB, ODOO_USER, ODOO_API_KEY, {}) + if not uid: + raise OdooError("Odoo authentication failed (check db/user/key)") + models = xmlrpc.client.ServerProxy(f"{ODOO_URL}/xmlrpc/2/object") + return uid, models + + +def _exec(uid, models, model, method, *args, **kw): + return models.execute_kw(ODOO_DB, uid, ODOO_API_KEY, model, method, list(args), kw) + + +def _find_or_create_partner(uid, models, name, phone): + """Return a res.partner id, matching on phone first, else creating one.""" + domain = [] + if phone: + domain = ["|", ["phone", "=", phone], ["mobile", "=", phone]] + if domain: + hit = _exec(uid, models, "res.partner", "search", domain, limit=1) + if hit: + return hit[0] + vals = {"name": name or "Phone caller", "phone": phone or False, "company_type": "person"} + return _exec(uid, models, "res.partner", "create", vals) + + +def create_appointment_request(patient_name, callback_number, reason, preferred_time, call_sid=None): + """Create the request in Odoo. Returns (model, record_id) or raises OdooError.""" + uid, models = _connect() + summary = f"📞 Phone appt request — {patient_name or 'caller'}" + # description is an Odoo HTML field — build with
so it renders in the UI. + rows = [ + ("Name", patient_name), + ("Callback", callback_number), + ("Reason", reason), + ("Preferred time (patient's words)", preferred_time), + ("Twilio call SID", call_sid), + ] + note = "

Captured by the AVC phone agent (UNCONFIRMED — call patient to finalize).

" + \ + "
".join(f"{escape(k)}: {escape(str(v)) if v else '—'}" for k, v in rows) + "

" + + if ODOO_TARGET == "calendar": + partner_id = _find_or_create_partner(uid, models, patient_name, callback_number) + # Tentative 30-min slot tomorrow 9:00 as a visible placeholder; real time set on callback. + start = (datetime.utcnow() + timedelta(days=1)).replace(hour=9, minute=0, second=0, microsecond=0) + vals = { + "name": summary, + "start": start.strftime("%Y-%m-%d %H:%M:%S"), + "stop": (start + timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S"), + "description": note, + "partner_ids": [(4, partner_id)], + } + rec = _exec(uid, models, "calendar.event", "create", vals) + return ("calendar.event", rec) + + # CRM target. With a stage configured, create a staged opportunity (lands in the + # pipeline as a worklist staff act on); otherwise a plain lead. + vals = { + "name": summary, + "contact_name": patient_name or False, + "phone": callback_number or False, + "description": note, + "type": "opportunity" if ODOO_STAGE_ID else "lead", + } + if ODOO_STAGE_ID: + vals["stage_id"] = ODOO_STAGE_ID + if ODOO_TEAM_ID: + vals["team_id"] = ODOO_TEAM_ID + if ODOO_USER_ID: + vals["user_id"] = ODOO_USER_ID + rec = _exec(uid, models, "crm.lead", "create", vals) + return ("crm.lead", rec) diff --git a/practice.py b/practice.py new file mode 100644 index 0000000..3c6ec5d --- /dev/null +++ b/practice.py @@ -0,0 +1,155 @@ +"""Advanced Vision Care practice facts + the phone agent's tools. + +Facts sourced from advancedvisioncareflorida.com (8 locations across Broward, +Miami-Dade, Palm Beach). NOTE: the website does NOT publish office hours, so we do +NOT assert hours — the agent must offer to have staff confirm them instead of +inventing them. Fill HOURS in if/when you have them. +""" + +import json +import os +import re +from datetime import datetime, timezone + +from loguru import logger + +# ───────────────────────────────────────────────────────────────────────────── +# Real facts from advancedvisioncareflorida.com +# ───────────────────────────────────────────────────────────────────────────── +LOCATIONS = [ + # Broward County + {"city": "Hollywood / Fort Lauderdale", "address": "2873 Stirling Rd, Fort Lauderdale, FL 33312", "phone": "(954) 983-4969"}, + {"city": "Tamarac", "address": "5865 N University Dr, Tamarac, FL 33321", "phone": "(954) 720-2720"}, + {"city": "Pembroke Pines", "address": "246 S Flamingo Rd, Pembroke Pines, FL 33027", "phone": "(954) 443-1230"}, + {"city": "Lauderdale Lakes", "address": "3682 W Oakland Park Blvd, Lauderdale Lakes, FL 33311", "phone": "(954) 730-8087"}, + # Miami-Dade County + {"city": "Hialeah", "address": "1770 W 32nd Pl, Hialeah, FL 33012", "phone": "(305) 885-4477"}, + {"city": "Kendall", "address": "11605 N Kendall Dr, Miami, FL 33176", "phone": "(305) 982-8927"}, + {"city": "Miami Gardens", "address": "4771 NW 183rd St, Miami Gardens, FL 33055", "phone": "(305) 390-2467"}, + # Palm Beach County + {"city": "Boca Raton", "address": "21673 State Road 7, Boca Raton, FL 33428", "phone": "(561) 470-2310"}, +] + +PRACTICE_FACTS = { + "name": "Advanced Vision Care", + "locations": LOCATIONS, + "insurance": [ + "CarePlus", "Doctors Health", "Florida Blue Medicare", "Optum", "Spectera", + "Sunshine Health", "VSP", "WellCare", + ], + "services": ( + "routine and medical eye exams, contact lens exams, pediatric eye exams, " + "and LASIK consultations" + ), + # Website does not publish hours — leave None so the agent won't invent them. + "hours": None, +} + +REQUESTS_LOG = os.path.join(os.path.dirname(os.path.abspath(__file__)), "appointment_requests.jsonl") + + +# Expand street abbreviations so the TTS speaks "North Kendall Drive", not "N … D-R". +_ABBREV = { + "NW": "Northwest", "NE": "Northeast", "SW": "Southwest", "SE": "Southeast", + "N": "North", "S": "South", "E": "East", "W": "West", + "Dr": "Drive", "Rd": "Road", "Blvd": "Boulevard", "St": "Street", + "Ave": "Avenue", "Pl": "Place", "Ln": "Lane", "Ct": "Court", "Hwy": "Highway", + "FL": "Florida", +} + + +def _spoken_address(addr: str) -> str: + """Expand directional + street-type abbreviations for natural speech.""" + return re.sub( + r"\b(" + "|".join(re.escape(k) for k in _ABBREV) + r")\b", + lambda m: _ABBREV[m.group(1)], + addr, + ) + + +def practice_summary() -> str: + """Compact facts block for the system prompt.""" + f = PRACTICE_FACTS + loc_lines = "\n".join(f" - {l['city']}: {_spoken_address(l['address'])} — {l['phone']}" for l in f["locations"]) + hours = f["hours"] or ( + "NOT published — do not state specific hours; offer to have the office confirm." + ) + return ( + f"Practice name: {f['name']}\n" + f"Locations ({len(f['locations'])} offices across South Florida):\n{loc_lines}\n" + f"Insurance accepted (these EXACT plans only): {', '.join(f['insurance'])}.\n" + f"Services: {f['services']}\n" + f"Hours: {hours}\n" + ) + + +def _find_location(name: str): + """Loose match a caller's city/location text to a known office.""" + if not name: + return None + n = name.lower() + for l in LOCATIONS: + if n in l["city"].lower() or l["city"].lower() in n: + return l + return None + + +# ─── Tools (used when ENABLE_TOOLS=true and the model supports tool-calling) ── + +def persist_appointment(record: dict) -> str: + """Write an appointment request to Odoo (a crm.lead) if configured, else to the + JSONL fallback so a request is never lost. Returns where it landed. Used by both + the post-call extraction and the (optional) in-call tool.""" + record.setdefault("ts", datetime.now(timezone.utc).isoformat()) + if os.environ.get("ODOO_USER") and os.environ.get("ODOO_API_KEY"): + try: + from odoo_client import create_appointment_request + + model, rec_id = create_appointment_request( + patient_name=record.get("patient_name"), + callback_number=record.get("callback_number"), + reason=f"[{record.get('location') or 'location TBD'}] {record.get('reason') or ''}".strip(), + preferred_time=record.get("preferred_time"), + call_sid=record.get("call_sid"), + ) + logger.info(f"Appointment -> Odoo {model} id={rec_id}: {record.get('patient_name')}") + return f"odoo:{model}:{rec_id}" + except Exception as e: + logger.warning(f"Odoo write failed ({e!r}); falling back to local log") + record["odoo_error"] = repr(e) + + with open(REQUESTS_LOG, "a") as fh: + fh.write(json.dumps(record) + "\n") + logger.info(f"Appointment -> JSONL: {record.get('patient_name')}") + return "jsonl" + + +async def record_appointment_request(params): + """In-call tool path (only used when ENABLE_TOOLS=true). Wraps persist_appointment.""" + args = params.arguments or {} + persist_appointment({ + "call_sid": getattr(params, "call_sid", None), + "patient_name": args.get("patient_name"), + "callback_number": args.get("callback_number"), + "location": args.get("location"), + "reason": args.get("reason"), + "preferred_time": args.get("preferred_time"), + "source": "in_call_tool", + }) + await params.result_callback( + {"status": "captured", "message": "Got it — our staff will call you back to confirm the time."} + ) + + +async def get_practice_info(params): + """Return practice facts (optionally narrowed to one location) for accurate answers.""" + args = params.arguments or {} + loc = _find_location(args.get("location", "")) + result = { + "name": PRACTICE_FACTS["name"], + "insurance": PRACTICE_FACTS["insurance"], + "services": PRACTICE_FACTS["services"], + "hours": "not published — offer to have the office confirm", + } + result["location"] = loc if loc else PRACTICE_FACTS["locations"] + await params.result_callback(result) diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..9028f4c --- /dev/null +++ b/run.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# Launch the AVC phone agent. Reuses the pipecat-run venv (has pipecat-ai 1.3.0, +# faster-whisper, kokoro-onnx, fastapi) and the CUDA12 libs we installed into it. +set -euo pipefail + +HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +VENV="/home/tocmo0nlord/pipecat-run/.venv" + +# faster-whisper (ctranslate2) needs the CUDA12 cublas + cudnn shared libs at runtime. +NV="$VENV/lib/python3.13/site-packages/nvidia" +export LD_LIBRARY_PATH="$NV/cublas/lib:$NV/cudnn/lib:${LD_LIBRARY_PATH:-}" + +# Load .env if present (Twilio creds, PUBLIC_HOST, model overrides). +if [ -f "$HERE/.env" ]; then + set -a; . "$HERE/.env"; set +a +fi + +cd "$HERE" +exec "$VENV/bin/python" server.py diff --git a/run_web.sh b/run_web.sh new file mode 100755 index 0000000..14ab8c9 --- /dev/null +++ b/run_web.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# Launch the browser (WebRTC) version of Sofia for fast iteration — same brain as the +# phone agent, served at http://localhost:7860. Mirrors run.sh (venv + CUDA libs + .env). +set -euo pipefail + +HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +VENV="/home/tocmo0nlord/pipecat-run/.venv" + +# faster-whisper (ctranslate2) needs the CUDA12 cublas + cudnn shared libs at runtime. +NV="$VENV/lib/python3.13/site-packages/nvidia" +export LD_LIBRARY_PATH="$NV/cublas/lib:$NV/cudnn/lib:${LD_LIBRARY_PATH:-}" + +# Load .env (model, voice, VAD, etc.) — same config the phone agent uses. +if [ -f "$HERE/.env" ]; then + set -a; . "$HERE/.env"; set +a +fi + +cd "$HERE" +echo "Sofia (browser) starting — open http://localhost:7860 once it's up." +exec "$VENV/bin/python" bot_web.py diff --git a/server.py b/server.py new file mode 100644 index 0000000..db3c5a8 --- /dev/null +++ b/server.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +"""Twilio-facing web server for the AVC phone agent. + +Two endpoints, both reached by Twilio over your public Traefik domain: + + POST /voice -> returns TwiML telling Twilio to open a bidirectional Media Stream + back to wss:///ws?token= + WS /ws -> Twilio's Media Stream. We check the stream token, read the opening + 'start' event for the SIDs, then hand the socket to the pipeline. + +Security: + - POST /voice is authenticated with Twilio's X-Twilio-Signature (HMAC-SHA1 over the + public URL + sorted POST params, keyed by the API Key Secret). Enforced whenever + TWILIO_API_KEY_SECRET is set; set TWILIO_VALIDATE=false to bypass for local testing. + - WS /ws can't carry an X-Twilio-Signature usefully, so we gate it with a shared + STREAM_TOKEN embedded in the wss URL we hand Twilio in the TwiML. + +Inbound only. Run behind Traefik (TLS terminated there); this app listens plain HTTP +on $PORT. See README for the Twilio number + Traefik wiring. +""" + +import asyncio +import base64 +import hashlib +import hmac +import json +import os +import secrets + +from fastapi import FastAPI, Request, WebSocket +from fastapi.responses import HTMLResponse +from loguru import logger + +from bot import run_call +from pipecat.serializers.twilio import TwilioFrameSerializer + +# Public hostname Twilio dials back into (your Traefik domain), e.g. phone.example.com +PUBLIC_HOST = os.environ.get("PUBLIC_HOST", "CHANGE-ME.example.com") +PORT = int(os.environ.get("PORT", "8200")) +# Bind localhost by default: nginx terminates TLS and proxies in from 127.0.0.1, so the +# app needn't be exposed on the LAN. Set BIND_HOST=0.0.0.0 only if a remote proxy needs it. +BIND_HOST = os.environ.get("BIND_HOST", "127.0.0.1") + +# Twilio REST creds — let the serializer auto-hang-up the carrier leg on EndFrame, +# and validate inbound webhook signatures. +TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID") +# Standard API Key (scoped to this app, revocable independently) instead of the account +# master Auth Token. The Secret is used both for HMAC webhook-signature validation and as +# the serializer credential for auto-hang-up. +TWILIO_API_KEY_SID = os.environ.get("TWILIO_API_KEY_SID") +TWILIO_API_KEY_SECRET = os.environ.get("TWILIO_API_KEY_SECRET") +# Signature validation is ON by default when the API key secret exists; explicit opt-out. +TWILIO_VALIDATE = os.environ.get("TWILIO_VALIDATE", "true").lower() not in ("false", "0", "no") + +# Shared secret embedded in the Media Stream wss URL to gate /ws. Auto-generated if +# unset (fine for a single process), but set it in .env for stability across restarts. +STREAM_TOKEN = os.environ.get("STREAM_TOKEN") or secrets.token_urlsafe(24) + +# Max simultaneous live calls. Each call holds an Ollama context on the 16GB GPU and +# Ollama serializes generation, so cap this to protect call quality. +# Over-cap callers hear BUSY_MESSAGE and are hung up — existing calls are never degraded. +MAX_CONCURRENT_CALLS = int(os.environ.get("MAX_CONCURRENT_CALLS", "2")) +BUSY_MESSAGE = os.environ.get( + "BUSY_MESSAGE", + "Thank you for calling Advanced Vision Care. All of our lines are busy right now. " + "Please call back in a few minutes. Goodbye.", +) + +app = FastAPI() + +# Live count of active /ws pipelines (the real GPU consumers), guarded by a lock. +_active_calls = 0 +_active_lock = asyncio.Lock() + + +async def _reserve_call_slot() -> bool: + """Atomically take a call slot. Returns False if at capacity.""" + global _active_calls + async with _active_lock: + if _active_calls >= MAX_CONCURRENT_CALLS: + return False + _active_calls += 1 + return True + + +async def _release_call_slot(): + global _active_calls + async with _active_lock: + _active_calls = max(0, _active_calls - 1) + + +def _twilio_signature_ok(url: str, params: dict, header_sig: str) -> bool: + """Recompute Twilio's request signature and compare in constant time. + + Algorithm (Twilio docs): take the full public URL, append each POST param as + key+value sorted by key, HMAC-SHA1 with the API Key Secret, base64-encode. + """ + if not (TWILIO_API_KEY_SECRET and header_sig): + return False + payload = url + "".join(f"{k}{params[k]}" for k in sorted(params)) + digest = hmac.new(TWILIO_API_KEY_SECRET.encode(), payload.encode("utf-8"), hashlib.sha1).digest() + expected = base64.b64encode(digest).decode() + return hmac.compare_digest(expected, header_sig) + + +@app.get("/health") +async def health(): + return { + "status": "ok", + "public_host": PUBLIC_HOST, + "validate": TWILIO_VALIDATE and bool(TWILIO_API_KEY_SECRET), + "active_calls": _active_calls, + "max_calls": MAX_CONCURRENT_CALLS, + } + + +@app.post("/voice") +async def voice(request: Request): + """TwiML: connect the call to our Media Stream WebSocket (bidirectional).""" + form = dict(await request.form()) + if TWILIO_VALIDATE and TWILIO_API_KEY_SECRET: + # Validate against the PUBLIC url Twilio actually signed, not the internal one. + public_url = f"https://{PUBLIC_HOST}/voice" + sig = request.headers.get("X-Twilio-Signature", "") + if not _twilio_signature_ok(public_url, form, sig): + logger.warning("Rejected /voice: bad or missing X-Twilio-Signature") + return HTMLResponse(status_code=403, content="forbidden") + elif not TWILIO_API_KEY_SECRET: + logger.warning("/voice signature validation DISABLED (no TWILIO_API_KEY_SECRET set)") + + caller = form.get("From", "") # caller-ID; passed through for appointment callback + + # Capacity gate: if all slots are busy, speak the busy message and hang up here — + # before any GPU work — so in-progress calls are never degraded. (A reservation is + # taken at /ws, tied to the socket lifecycle; this is the live read of that count.) + if _active_calls >= MAX_CONCURRENT_CALLS: + logger.info(f"At capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — returning busy") + busy = f""" + + {BUSY_MESSAGE} + +""" + return HTMLResponse(content=busy, media_type="application/xml") + + # NOTE: is bidirectional (agent can speak back). + # would be one-way (listen only) — do not use that here. + # Token passed as a (Twilio does NOT preserve a query string on the + # wss URL); it arrives in the /ws 'start' message's customParameters. + twiml = f""" + + + + + + + +""" + return HTMLResponse(content=twiml, media_type="application/xml") + + +@app.websocket("/ws") +async def media_stream(websocket: WebSocket): + # The stream token rides in the TwiML , which Twilio delivers inside the + # 'start' message's customParameters — so we must accept the socket to read it, then + # validate + capacity-gate before doing any real work. + await websocket.accept() + + call_sid = None + reserved = False + try: + # Twilio sends a 'connected' frame, then a 'start' frame with SIDs + params. + msgs = websocket.iter_text() + await msgs.__anext__() # 'connected' + start = json.loads(await msgs.__anext__()) # 'start' + start_data = start["start"] + + token = (start_data.get("customParameters") or {}).get("token") + if token != STREAM_TOKEN: + logger.warning("Rejected /ws: bad or missing stream token") + await websocket.close(code=1008) # policy violation + return + + # Capacity gate (hard safety net for the /voice→/ws race). + if not await _reserve_call_slot(): + logger.warning(f"/ws over capacity ({_active_calls}/{MAX_CONCURRENT_CALLS}) — closing") + await websocket.close(code=1013) # try again later + return + reserved = True + + stream_sid = start_data["streamSid"] + call_sid = start_data["callSid"] + caller_number = (start_data.get("customParameters") or {}).get("caller") or None + logger.info( + f"Media stream start: call={call_sid} stream={stream_sid} caller={caller_number} " + f"({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)" + ) + + serializer = TwilioFrameSerializer( + stream_sid=stream_sid, + call_sid=call_sid, + account_sid=TWILIO_ACCOUNT_SID, + auth_token=TWILIO_API_KEY_SECRET, + ) + await run_call(websocket, serializer, caller_number=caller_number, call_sid=call_sid) + except Exception: + logger.exception("Call pipeline error") + finally: + if reserved: + await _release_call_slot() + logger.info(f"Call ended: {call_sid} ({_active_calls}/{MAX_CONCURRENT_CALLS} slots in use)") + + +if __name__ == "__main__": + import uvicorn + + logger.info(f"AVC phone agent on {BIND_HOST}:{PORT} | public={PUBLIC_HOST} | " + f"sig_validation={'on' if (TWILIO_VALIDATE and TWILIO_API_KEY_SECRET) else 'OFF'}") + uvicorn.run(app, host=BIND_HOST, port=PORT) diff --git a/traefik-avc-phone.yml b/traefik-avc-phone.yml new file mode 100644 index 0000000..ca18c26 --- /dev/null +++ b/traefik-avc-phone.yml @@ -0,0 +1,71 @@ +# Traefik DYNAMIC configuration for the AVC phone agent. +# File-provider snippet — Traefik does NOT run on the GPU box (miaai), so this routes +# your existing Traefik host to the phone service over the network. +# +# ── Install ────────────────────────────────────────────────────────────────── +# Drop this into the directory your Traefik watches as a file provider, e.g. +# traefik.yml (static): +# providers: +# file: +# directory: /etc/traefik/dynamic +# watch: true +# then: cp traefik-avc-phone.yml /etc/traefik/dynamic/ (Traefik hot-reloads it) +# +# ── BEFORE IT WORKS, set these 4 things to match YOUR Traefik ──────────────── +# 1) HOST -> the real domain (must equal PUBLIC_HOST in the app's .env, +# because Twilio's signature is computed over https:///voice) +# 2) entryPoints -> your HTTPS entrypoint name (commonly `websecure`; could be `https`) +# 3) certResolver -> your ACME resolver name (commonly `le`/`letsencrypt`/`myresolver`) +# 4) the service url -> reachable address of the GPU box from the Traefik host +# (LAN 10.10.1.221:8200; swap for the NetBird overlay IP if that's +# the path Traefik uses to reach miaai) +# +# WebSockets (the /ws Twilio Media Stream): Traefik forwards the Upgrade handshake +# automatically — no special middleware needed. One router/service covers both the +# /voice HTTPS POST and the /ws WSS upgrade because they share host + backend. + +http: + routers: + avc-phone: + rule: "Host(`phone.example.com`)" # 1) <-- your domain + entryPoints: + - websecure # 2) <-- your HTTPS entrypoint + service: avc-phone + tls: + certResolver: le # 3) <-- your ACME resolver + + # Optional: redirect plain HTTP -> HTTPS for this host. Omit if your Traefik does + # this globally already. + avc-phone-http: + rule: "Host(`phone.example.com`)" # 1) <-- your domain + entryPoints: + - web # your HTTP (:80) entrypoint name + middlewares: + - avc-phone-https-redirect + service: avc-phone # never reached (redirected first) + + services: + avc-phone: + loadBalancer: + passHostHeader: true + servers: + - url: "http://10.10.1.221:8200" # 4) <-- GPU box (miaai) : app port + # Twilio media streams are long-lived; keep generous timeouts on the + # entrypoint (static config) — see note at bottom. + + middlewares: + avc-phone-https-redirect: + redirectScheme: + scheme: https + permanent: true + +# ── One static-config reminder (NOT part of this file) ─────────────────────── +# Twilio Media Streams hold the WebSocket open for the whole call. If your HTTPS +# entrypoint has a short respondingTimeout/idleTimeout, long calls get cut. In the +# STATIC traefik config, ensure the entrypoint allows long-lived streams, e.g.: +# entryPoints: +# websecure: +# address: ":443" +# transport: +# respondingTimeouts: +# idleTimeout: 3600s