Files
odootrain/api/main.py
Carlos Garcia 7fb1573bac Initial commit: Odoo 18 RAG stack
Scraper, indexer, and FastAPI query service for Retrieval-Augmented
Generation over Odoo 18 documentation. Uses Qdrant + Ollama (nomic-embed-text
+ llama3.1). Integrates with ActiveBlue PeerBus agent interface.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 11:25:55 -04:00

317 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Odoo 18 RAG Query API
======================
FastAPI service — embeds the question, retrieves top-K chunks from Qdrant,
builds a prompt, and streams or returns the answer from Ollama.
Endpoints:
    POST /ask          blocking answer + sources
    POST /ask/stream   Server-Sent Events token stream
    POST /agent/ask    ActiveBlue AI agent integration
    GET  /health       connectivity check
    GET  /modules      list indexed modules
    GET  /stats        collection stats

Run:
    uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload
"""
import json
import logging
import os
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
from typing import AsyncIterator
# Logging: a single named logger for the whole service.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("odoo18_rag")

# Backend locations and model names. Each value can be overridden through
# the environment variable of the same name.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://miaai:11434")
QDRANT_URL = os.environ.get("QDRANT_URL", "http://qdrant:6333")
EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text")
GEN_MODEL = os.environ.get("GEN_MODEL", "llama3.1")
COLLECTION_NAME = os.environ.get("COLLECTION_NAME", "odoo18_docs")

# Retrieval defaults (not environment-configurable).
TOP_K = 6  # default number of chunks retrieved per question
MAX_CONTEXT = 4000  # character budget for the documentation context in a prompt
# Instruction preamble that build_prompt() places at the start of every prompt.
# The trailing backslashes join the physical lines, so the resulting string has
# exactly two paragraphs separated by a single newline.
SYSTEM_PROMPT = """\
You are an expert Odoo 18 consultant for ActiveBlue LLC, an MSP serving \
medical and dental practices. You have deep knowledge of all Odoo 18 modules: \
Finance, Accounting, Inventory, Manufacturing, Purchase, Sales, CRM, HR, \
Payroll, eCommerce, Helpdesk, Project, and more.
Answer questions clearly and concisely using the provided documentation context. \
Use numbered steps when explaining procedures. Always mention the Odoo menu path \
when explaining navigation. If the context doesn't cover the question fully, say \
so and answer from general knowledge.\
"""
# ── Models ────────────────────────────────────────────────────────────────────
# Request body shared by /ask, /ask/stream and /agent/ask.
class AskRequest(BaseModel):
    # The question to answer; required, 5 to 2000 characters.
    question: str = Field(..., min_length=5, max_length=2000)
    # When set, retrieval is restricted to chunks whose "module" payload matches.
    module: str | None = Field(default=None, description="Filter to one Odoo module")
    # When set, used instead of GEN_MODEL for generation.
    model: str | None = Field(default=None, description="Override the LLM model")
    # Number of chunks to retrieve (1 to 20); defaults to TOP_K.
    top_k: int = Field(default=TOP_K, ge=1, le=20)
    # Sampling temperature forwarded to the generation model (0.0 to 1.0).
    temperature: float = Field(default=0.3, ge=0.0, le=1.0)
# One documentation reference returned alongside an answer.
# Fields are filled from the chunk payload keys of the same names.
class Source(BaseModel):
    url: str
    title: str
    module: str
    section: str
# Response body of POST /ask.
class AskResponse(BaseModel):
    answer: str  # generated answer text
    sources: list[Source]  # de-duplicated references for the retrieved chunks
    model: str  # name of the generation model that was used
    question: str  # the question, echoed back
# ── App ───────────────────────────────────────────────────────────────────────
# FastAPI application with its title, description and version metadata.
app = FastAPI(
    title="Odoo 18 RAG API",
    description="Retrieval-Augmented Generation over Odoo 18 documentation",
    version="1.0.0",
)

# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level Qdrant client used by the retrieval helper and the
# health/modules/stats endpoints.
qdrant = QdrantClient(url=QDRANT_URL)
# ── Helpers ───────────────────────────────────────────────────────────────────
async def embed_query(text: str) -> list:
    """Return the embedding vector for ``text`` from Ollama.

    Posts ``text`` as a one-item batch to ``{OLLAMA_URL}/api/embed`` using
    ``EMBED_MODEL`` and returns the first entry of the ``embeddings`` array.

    Raises:
        httpx.HTTPStatusError: if Ollama answers with an error status.
        HTTPException: 500 when the response contains no embeddings.
    """
    endpoint = f"{OLLAMA_URL}/api/embed"
    body = {"model": EMBED_MODEL, "input": [text]}
    async with httpx.AsyncClient(timeout=30) as http:
        reply = await http.post(endpoint, json=body)
    reply.raise_for_status()

    vectors = reply.json().get("embeddings", [])
    if vectors:
        return vectors[0]
    raise HTTPException(500, "Empty embedding response from Ollama")
def retrieve(vector: list, top_k: int, module: str | None) -> list:
    """Return the payloads of the nearest chunks in ``COLLECTION_NAME``.

    Args:
        vector: Query embedding to search with.
        top_k: Maximum number of hits to request from Qdrant.
        module: When truthy, only points whose ``module`` payload field
            equals this value are considered.

    Returns:
        The payload dicts of the hits, in the order Qdrant returned them.
        Hits that carry no payload are dropped, so fewer than ``top_k``
        items (possibly none) may come back.
    """
    query_filter = None
    if module:
        query_filter = Filter(
            must=[FieldCondition(key="module", match=MatchValue(value=module))]
        )

    hits = qdrant.search(
        collection_name=COLLECTION_NAME,
        query_vector=vector,
        limit=top_k,
        query_filter=query_filter,
        with_payload=True,
    )
    # A point stored without payload comes back with payload=None (or {});
    # callers use dict access on every item, so such hits are unusable and
    # would otherwise raise AttributeError downstream.
    return [hit.payload for hit in hits if hit.payload]
def build_prompt(question: str, chunks: list) -> str:
    """Assemble the generation prompt from the question and retrieved chunks.

    Chunks are added in order as ``[Source i: title | section]`` blocks until
    the next block would push the context past ``MAX_CONTEXT`` characters.
    If the very first chunk alone exceeds the budget, its text is truncated
    to fit rather than dropped, so the prompt never silently ends up with no
    documentation when chunks were retrieved.

    Args:
        question: The user's question.
        chunks: Payload dicts; the keys ``title``, ``section``, ``text`` and
            ``url`` are read, each defaulting to an empty string.

    Returns:
        The prompt: ``SYSTEM_PROMPT``, the documentation context, the
        question, and an empty "Answer" heading for the model to complete.
    """
    context_parts = []
    used = 0
    for i, chunk in enumerate(chunks, 1):
        header = f"[Source {i}: {chunk.get('title', '')} | {chunk.get('section', '')}]\n"
        footer = f"URL: {chunk.get('url', '')}\n"
        text = str(chunk.get("text", ""))

        # Characters left for the text once header, footer and the newline
        # that follows the text are accounted for.
        room = MAX_CONTEXT - used - len(header) - len(footer) - 1
        if len(text) > room:
            if context_parts or room <= 0:
                break
            # First chunk is larger than the whole budget: keep its beginning.
            text = text[:room]

        block = f"{header}{text}\n{footer}"
        context_parts.append(block)
        used += len(block)

    # Each block already ends with a newline; put the rule on its own line so
    # it does not run into the next source header.
    context = "---\n".join(context_parts)
    return (
        f"{SYSTEM_PROMPT}\n\n"
        f"## Relevant documentation\n\n"
        f"{context}\n\n"
        f"---\n\n"
        f"## Question\n\n{question}\n\n"
        f"## Answer\n"
    )
def dedupe_sources(chunks: list) -> list[Source]:
    """Build one ``Source`` per distinct URL found in ``chunks``.

    The first chunk seen for a URL supplies the title, module and section;
    later chunks with the same URL are ignored. Missing payload keys default
    to empty strings. Order of first appearance is preserved.
    """
    by_url: dict = {}
    for payload in chunks:
        link = payload.get("url", "")
        if link in by_url:
            continue
        by_url[link] = Source(
            url=link,
            title=payload.get("title", ""),
            module=payload.get("module", ""),
            section=payload.get("section", ""),
        )
    return list(by_url.values())
async def generate_blocking(prompt: str, model: str, temperature: float) -> str:
    """Request a complete (non-streamed) answer from Ollama.

    Posts to ``{OLLAMA_URL}/api/generate`` with ``stream`` disabled and
    returns the ``response`` field with surrounding whitespace stripped
    (an empty string when the field is absent).

    Raises:
        httpx.HTTPStatusError: if Ollama answers with an error status.
    """
    body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": temperature, "num_ctx": 8192},
    }
    async with httpx.AsyncClient(timeout=120) as http:
        reply = await http.post(f"{OLLAMA_URL}/api/generate", json=body)
    reply.raise_for_status()
    answer = reply.json().get("response", "")
    return answer.strip()
async def generate_stream(prompt: str, model: str, temperature: float) -> AsyncIterator[str]:
    """Yield answer tokens from Ollama as they are generated.

    Posts to ``{OLLAMA_URL}/api/generate`` with ``stream`` enabled and reads
    the response line by line, each non-blank line being a JSON object.
    Non-empty ``response`` values are yielded; iteration stops at the first
    object whose ``done`` field is truthy. Lines that are not valid JSON are
    skipped.

    Raises:
        httpx.HTTPStatusError: if Ollama answers with an error status.
            Without this check an error body has no ``response`` field and
            would be consumed as an empty, seemingly successful stream.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": True,
        "options": {"temperature": temperature, "num_ctx": 8192},
    }
    async with httpx.AsyncClient(timeout=120) as client:
        async with client.stream(
            "POST", f"{OLLAMA_URL}/api/generate", json=payload
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.strip():
                    continue
                # Keep the try narrow: only parsing may be skipped, and the
                # yield must not sit inside the handler's scope.
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue
                token = data.get("response", "")
                if token:
                    yield token
                if data.get("done"):
                    break
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
    """Report whether Qdrant and Ollama are reachable.

    Never raises: each dependency is probed independently and a failure is
    recorded as ``"error: ..."`` in the returned dict.

    Returns:
        A dict with the keys ``api``, ``qdrant`` and ``ollama``.
    """
    status = {"api": "ok", "qdrant": "unknown", "ollama": "unknown"}

    try:
        info = qdrant.get_collection(COLLECTION_NAME)
        status["qdrant"] = f"ok ({info.points_count} vectors)"
    except Exception as e:
        status["qdrant"] = f"error: {e}"

    try:
        async with httpx.AsyncClient(timeout=5) as client:
            resp = await client.get(f"{OLLAMA_URL}/api/tags")
        # An error status with a JSON body would otherwise be reported as
        # "ok (0 models)".
        resp.raise_for_status()
        models = [m["name"] for m in resp.json().get("models", [])]
        status["ollama"] = f"ok ({len(models)} models)"
    except Exception as e:
        status["ollama"] = f"error: {e}"

    return status
@app.get("/modules")
async def list_modules():
    """List the distinct ``module`` payload values in the collection.

    Scrolls through the whole collection page by page (1000 points per
    request) so modules beyond the first page are not missed. Points without
    a ``module`` value are counted under ``"general"``.

    Returns:
        ``{"modules": [...]}`` with the names sorted.

    Raises:
        HTTPException: 500 with the underlying error text on any failure.
    """
    try:
        modules = set()
        offset = None
        while True:
            points, offset = qdrant.scroll(
                collection_name=COLLECTION_NAME,
                limit=1000,
                offset=offset,
                with_payload=["module"],
            )
            modules.update(
                (p.payload or {}).get("module", "general") for p in points
            )
            # scroll() returns None as the next offset once the last page
            # has been read.
            if offset is None:
                break
        return {"modules": sorted(modules)}
    except Exception as e:
        raise HTTPException(500, str(e)) from e
@app.get("/stats")
async def stats():
    """Return basic facts about the collection and the configured models.

    ``vector_size`` is the literal 768 and is not read from the collection.

    Raises:
        HTTPException: 500 with the underlying error text on any failure.
    """
    try:
        collection = qdrant.get_collection(COLLECTION_NAME)
        summary = {
            "collection": COLLECTION_NAME,
            "vectors": collection.points_count,
            "vector_size": 768,
            "embed_model": EMBED_MODEL,
            "gen_model": GEN_MODEL,
        }
        return summary
    except Exception as err:
        raise HTTPException(500, str(err))
@app.post("/ask", response_model=AskResponse)
async def ask(req: AskRequest):
    """Answer a question in one blocking round trip.

    Embeds the question, retrieves matching chunks, builds the prompt and
    waits for the complete answer from the generation model
    (``req.model`` when given, otherwise ``GEN_MODEL``).

    Raises:
        HTTPException: 500 when embedding, retrieval or generation fails;
            404 when retrieval returns no chunks.
    """
    model = req.model or GEN_MODEL

    try:
        vector = await embed_query(req.question)
    except HTTPException:
        # Already a well-formed API error; re-wrapping it would produce
        # "Embedding failed: 500: ..." as the detail.
        raise
    except Exception as e:
        raise HTTPException(500, f"Embedding failed: {e}") from e

    # Guard retrieval like the other pipeline steps so a Qdrant failure
    # yields a descriptive error instead of an unhandled exception.
    try:
        chunks = retrieve(vector, req.top_k, req.module)
    except Exception as e:
        raise HTTPException(500, f"Retrieval failed: {e}") from e
    if not chunks:
        raise HTTPException(404, "No relevant documentation found.")

    prompt = build_prompt(req.question, chunks)
    try:
        answer = await generate_blocking(prompt, model, req.temperature)
    except Exception as e:
        raise HTTPException(500, f"Generation failed: {e}") from e

    return AskResponse(
        answer=answer,
        sources=dedupe_sources(chunks),
        model=model,
        question=req.question,
    )
@app.post("/ask/stream")
async def ask_stream(req: AskRequest):
    """Answer a question as a Server-Sent Events stream.

    Embedding and retrieval happen before the response starts, so their
    failures are reported as ordinary HTTP errors. The stream then carries
    one ``{"type": "token"}`` event per generated token, followed by a
    ``{"type": "sources"}`` event and a final ``[DONE]`` marker. If
    generation fails part-way, a ``{"type": "error"}`` event is sent in
    place of the sources event, still followed by ``[DONE]``.

    Raises:
        HTTPException: 500 when embedding or retrieval fails; 404 when
            retrieval returns no chunks.
    """
    model = req.model or GEN_MODEL

    try:
        vector = await embed_query(req.question)
    except HTTPException:
        # Already a well-formed API error; do not wrap it a second time.
        raise
    except Exception as e:
        raise HTTPException(500, f"Embedding failed: {e}") from e

    try:
        chunks = retrieve(vector, req.top_k, req.module)
    except Exception as e:
        raise HTTPException(500, f"Retrieval failed: {e}") from e
    if not chunks:
        raise HTTPException(404, "No relevant documentation found.")

    prompt = build_prompt(req.question, chunks)
    sources = [s.model_dump() for s in dedupe_sources(chunks)]

    async def sse():
        try:
            async for token in generate_stream(prompt, model, req.temperature):
                yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
        except Exception as e:
            # The status line has already been sent, so the failure can only
            # be reported in-band; always finish with [DONE] so clients stop
            # waiting.
            error_event = {"type": "error", "content": f"Generation failed: {e}"}
            yield f"data: {json.dumps(error_event)}\n\n"
        else:
            yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(sse(), media_type="text/event-stream")
@app.post("/agent/ask")
async def agent_ask(req: AskRequest):
    """ActiveBlue AI agent endpoint — compatible with PeerBus message format.

    Delegates to ``ask`` and reshapes its result: sources are reduced to
    their URLs and the requested module filter is echoed back.
    """
    reply = await ask(req)
    source_urls = [src.url for src in reply.sources]
    return {
        "answer": reply.answer,
        "sources": source_urls,
        "module_context": req.module,
        "model_used": reply.model,
    }