odoo-ai/tests/conftest.py
ActiveBlue Build 7487fc73f9 feat(infra): add sweep coordinator, structured logging, test suite, and README
Sweep coordinator (Step 16):
- SweepCoordinator runs all 8 agents in parallel with 60s per-agent / 300s total timeout
- Aggregates findings, actions, errors into SweepCoordinatorResult
- Registered in FastAPI lifespan; triggered via POST /sweep
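
The two-level timeout described above can be sketched with plain asyncio. This is an illustration under stated assumptions, not the SweepCoordinator from this commit: `run_sweep`, `SweepResult`, `_run_one`, and the shape of the agent callables (zero-argument coroutines returning a list of findings) are all hypothetical.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class SweepResult:
    # Hypothetical aggregate; the real SweepCoordinatorResult also tracks actions.
    findings: list = field(default_factory=list)
    errors: dict = field(default_factory=dict)


async def _run_one(name, agent, per_agent_timeout):
    # Bound each agent individually so one slow agent cannot stall the rest.
    try:
        return name, await asyncio.wait_for(agent(), per_agent_timeout), None
    except asyncio.TimeoutError:
        return name, None, f"timed out after {per_agent_timeout}s"
    except Exception as exc:
        return name, None, repr(exc)


async def run_sweep(agents, per_agent_timeout=60.0, total_timeout=300.0):
    result = SweepResult()
    if not agents:
        return result
    tasks = {
        asyncio.create_task(_run_one(name, agent, per_agent_timeout)): name
        for name, agent in agents.items()
    }
    # The overall deadline applies to the whole batch of agents.
    done, pending = await asyncio.wait(list(tasks), timeout=total_timeout)
    for task in pending:
        task.cancel()
        result.errors[tasks[task]] = "cancelled: total sweep timeout reached"
    # Let cancelled tasks finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        name, findings, error = task.result()
        if error is None:
            result.findings.extend(findings)
        else:
            result.errors[name] = error
    return result
```

Catching failures inside `_run_one` keeps one agent's exception or timeout from discarding the findings of the others, which is one way to get the "aggregates findings, actions, errors" behaviour.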

Structured logging (Step 18):
- logging_utils/structured.py: JSONFormatter emitting ts/level/logger/msg + custom fields
- log_directive_event() for structured directive lifecycle logging
- push_to_loki() async Loki push (graceful no-op if LOKI_URL unset)
- configure_logging() replaces root handler at startup
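
A JSON formatter of the kind listed above might look roughly like the following. The class and function names mirror the commit message, but the bodies are an assumed sketch built only on the standard library and are not the contents of logging_utils/structured.py.

```python
import json
import logging
from datetime import datetime, timezone

# Attributes present on every LogRecord; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {
    "message",
    "asctime",
}


class JSONFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        # Copy custom fields passed through `extra=` into the JSON object.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)
        # default=str keeps non-serialisable values from raising.
        return json.dumps(payload, default=str)


def configure_logging(level=logging.INFO):
    # Replace whatever handlers the root logger has with a single JSON handler.
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    root = logging.getLogger()
    root.handlers[:] = [handler]
    root.setLevel(level)
```

With this sketch, a call such as `logging.getLogger("svc").info("dispatched", extra={"directive_id": 7})` would emit one JSON object per line that carries `directive_id` alongside the four fixed keys.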

Tests (Steps 17+19):
- conftest.py: mock_odoo, mock_pool, mock_llm fixtures
- test_tool_validator.py: 9 tests covering validation, coercion, hallucination stripping
- test_llm_router.py: 6 tests covering local/cloud/hybrid modes and HIPAA enforcement
- test_peer_bus.py: 6 tests covering registration, timeout, depth, circular detection
- test_finance_agent.py: 10 tests covering all 6 steps + sweep + peer request
- test_memory_manager.py: 3 tests covering context build + hard cap enforcement
- test_dispatch_router.py: 3 tests covering dispatch, rate limit, health endpoint
- test_odoo_client.py: 4 tests covering search_read, write result, unlink warning
- test_e2e_dispatch.py: 2 E2E tests - full dispatch cycle + peer bus communication

README (Step 20): architecture diagram, privacy modes, quick start, env vars, structure

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 18:08:11 -04:00

48 lines
1.3 KiB
Python

import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock


@pytest.fixture(scope='session')
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest.fixture
def mock_odoo():
    odoo = MagicMock()
    odoo.search_read = AsyncMock(return_value=[])
    odoo.write = AsyncMock()
    odoo.call = AsyncMock(return_value=True)
    odoo.ping = AsyncMock(return_value=True)
    return odoo


@pytest.fixture
def mock_pool():
    pool = MagicMock()
    conn = AsyncMock()
    conn.fetchval = AsyncMock(return_value=1)
    conn.fetch = AsyncMock(return_value=[])
    conn.fetchrow = AsyncMock(return_value=None)
    conn.execute = AsyncMock()
    pool.acquire = MagicMock(return_value=conn)
    conn.__aenter__ = AsyncMock(return_value=conn)
    conn.__aexit__ = AsyncMock(return_value=False)
    return pool


@pytest.fixture
def mock_llm():
    from agent_service.llm.llm_types import LLMResponse
    llm = MagicMock()
    llm.get_backend = AsyncMock(return_value='ollama')
    llm.complete = AsyncMock(return_value=LLMResponse(
        content='{"intent": "finance_query", "agents": ["finance_agent"], "confidence": 0.9}',
        tool_calls=[], backend_used='ollama', model_used='llama3',
        tokens_in=10, tokens_out=20, latency_ms=100,
    ))
    return llm
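
The `mock_pool` wiring works because `pool.acquire()` returns an object that acts both as the async context manager and as the connection it yields. The sketch below reproduces that wiring in a plain helper so it runs without pytest. `make_mock_pool`, `count_directives`, and the SQL string are hypothetical and do not appear in the repository.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_pool():
    # Same wiring as the mock_pool fixture, as a plain function for illustration.
    pool = MagicMock()
    conn = AsyncMock()
    conn.fetchval = AsyncMock(return_value=1)
    conn.__aenter__ = AsyncMock(return_value=conn)
    conn.__aexit__ = AsyncMock(return_value=False)
    pool.acquire = MagicMock(return_value=conn)
    return pool


async def count_directives(pool):
    # Hypothetical code under test: borrow a connection and run one query.
    async with pool.acquire() as conn:
        return await conn.fetchval("SELECT count(*) FROM directives")


def check_count_directives():
    pool = make_mock_pool()
    assert asyncio.run(count_directives(pool)) == 1
    conn = pool.acquire.return_value
    # The query reached the mocked connection and the context manager exited.
    conn.fetchval.assert_awaited_once_with("SELECT count(*) FROM directives")
    conn.__aexit__.assert_awaited_once()


check_count_directives()
```

In the real suite the same check would presumably be an `async def` test that takes `mock_pool` as a parameter and runs under an asyncio pytest plugin, which the `event_loop` fixture suggests is in use.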