from __future__ import annotations import asyncio, logging, time from .llm_types import LLMResponse, OllamaTimeoutError, OllamaUnavailableError logger = logging.getLogger(__name__) class OllamaBackend: def __init__(self, url, model, timeout=120, max_concurrent=2): self._url = url self._model = model self._timeout = timeout self._semaphore = asyncio.Semaphore(max_concurrent) self._active = 0 async def submit(self, messages, tools=None, caller='unknown'): import ollama wait_start = time.monotonic() async with self._semaphore: wait_ms = int((time.monotonic() - wait_start) * 1000) self._active += 1 t0 = time.monotonic() try: kwargs = {'model': self._model, 'messages': messages} if tools: kwargs['tools'] = tools # Force structured JSON output for callers that parse JSON responses. # Without this llama3.1:8b returns plain English instead of JSON. _JSON_CALLERS = {'master', 'expenses_agent_receipt_parser'} if caller in _JSON_CALLERS and not tools: kwargs['format'] = 'json' client = ollama.AsyncClient(host=self._url) try: response = await asyncio.wait_for(client.chat(**kwargs), timeout=self._timeout) except asyncio.TimeoutError: raise OllamaTimeoutError(f'Ollama timeout after {self._timeout}s caller={caller}') except Exception as exc: s = str(exc).lower() if 'connect' in s or 'refused' in s or 'unreachable' in s: raise OllamaUnavailableError(f'Ollama unreachable: {exc}') from exc raise OllamaUnavailableError(f'Ollama error: {exc}') from exc ms = int((time.monotonic() - t0) * 1000) # ollama-python returns dicts in 0.3.x, pydantic objects in newer # releases — accept either shape. def _get(obj, key, default=None): if isinstance(obj, dict): return obj.get(key, default) return getattr(obj, key, default) msg = _get(response, 'message') or {} raw_tool_calls = _get(msg, 'tool_calls') tool_calls = None if raw_tool_calls: tool_calls = [] for tc in raw_tool_calls: fn = _get(tc, 'function') or {} tool_calls.append({ 'name': _get(fn, 'name'), 'arguments': _get(fn, 'arguments'), }) tin = _get(response, 'prompt_eval_count') or 0 tout = _get(response, 'eval_count') or 0 logger.info('ollama caller=%s wait_ms=%d inf_ms=%d tin=%d tout=%d', caller, wait_ms, ms, tin, tout) return LLMResponse(content=_get(msg, 'content') or '', tool_calls=tool_calls, backend_used='ollama', model_used=self._model, tokens_in=tin, tokens_out=tout, latency_ms=ms) finally: self._active -= 1 async def ping(self) -> None: """Raise if Ollama is unreachable.""" import ollama client = ollama.AsyncClient(host=self._url) try: await asyncio.wait_for(client.list(), timeout=5) except asyncio.TimeoutError: raise OllamaUnavailableError('Ollama ping timed out') except Exception as exc: raise OllamaUnavailableError(f'Ollama ping failed: {exc}') from exc @property def active_count(self): return self._active