Files
odoo-ai/tests/test_peer_bus.py
2026-05-20 04:00:45 +00:00

172 lines
6.9 KiB
Python

"""Unit tests for PeerBus — request routing, circular detection, depth limit, timeout."""
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock
from agent_service.agents.peer_bus import PeerBus, PeerResponse, PeerCircularRequestError
def _make_registry(agents=None, active=None):
registry = MagicMock()
agents = agents or {}
active_set = set(active) if active is not None else set(agents.keys())
async def is_active(key):
return key in active_set
registry.is_active = is_active
registry.get_agent_instance = lambda key: agents.get(key)
return registry
def _make_agent(response=None):
agent = MagicMock()
agent.handle_peer_request = AsyncMock(
return_value=response if response is not None else {'answer': 42, 'success': True}
)
return agent
# ── basic request routing ────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_request_routes_to_agent():
agent = _make_agent({'answer': 42, 'success': True})
reg = _make_registry(agents={'mock_agent': agent})
bus = PeerBus(reg, directive_id='d1')
resp = await bus.request('from_agent', 'mock_agent', 'some_request', {}, 'reason')
assert isinstance(resp, PeerResponse)
assert resp.available is True
assert resp.success is True
assert resp.data.get('answer') == 42
@pytest.mark.asyncio
async def test_request_passes_correct_args_to_handler():
agent = _make_agent()
reg = _make_registry(agents={'mock_agent': agent})
bus = PeerBus(reg, directive_id='d1')
await bus.request('from_agent', 'mock_agent', 'query_type', {'key': 'val'}, 'reason')
agent.handle_peer_request.assert_awaited_once_with('query_type', {'key': 'val'}, 'd1')
@pytest.mark.asyncio
async def test_request_records_call_log():
agent = _make_agent()
reg = _make_registry(agents={'mock_agent': agent})
bus = PeerBus(reg, directive_id='d1')
await bus.request('from_agent', 'mock_agent', 'some_type', {}, 'reason')
assert len(bus.call_log) == 1
assert bus.call_log[0]['from'] == 'from_agent'
assert bus.call_log[0]['to'] == 'mock_agent'
assert bus.call_log[0]['type'] == 'some_type'
# ── inactive / missing agent ─────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_request_inactive_agent_returns_unavailable():
reg = _make_registry(agents={}, active=set())
bus = PeerBus(reg, directive_id='d1')
resp = await bus.request('from_agent', 'inactive_agent', 'some_type', {}, 'reason')
assert resp.available is False
@pytest.mark.asyncio
async def test_request_no_instance_returns_unavailable():
reg = _make_registry(agents={}, active={'ghost_agent'})
bus = PeerBus(reg, directive_id='d1')
resp = await bus.request('from_agent', 'ghost_agent', 'some_type', {}, 'reason')
assert resp.available is False
# ── circular detection ───────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_circular_request_raises():
agent = _make_agent()
reg = _make_registry(agents={'mock_agent': agent})
bus = PeerBus(reg, directive_id='d1')
bus._call_chain = ['mock_agent']
with pytest.raises(PeerCircularRequestError):
await bus.request('some_agent', 'mock_agent', 'some_type', {}, 'reason')
# ── max depth limit ──────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_max_depth_returns_unavailable():
agent = _make_agent()
reg = _make_registry(agents={'mock_agent': agent})
bus = PeerBus(reg, directive_id='d1')
bus._call_chain = ['a', 'b', 'c'] # already at MAX_DEPTH=3
resp = await bus.request('some_agent', 'mock_agent', 'some_type', {}, 'reason')
assert resp.available is False
assert 'depth' in str(resp.error).lower() or 'max' in str(resp.error).lower()
# ── timeout ──────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_timeout_returns_success_false():
agent = MagicMock()
async def slow_handler(request_type, params, directive_id):
await asyncio.sleep(35)
return {}
agent.handle_peer_request = slow_handler
reg = _make_registry(agents={'slow_agent': agent})
bus = PeerBus(reg, directive_id='d1')
import unittest.mock as um
with um.patch('asyncio.wait_for', side_effect=asyncio.TimeoutError):
resp = await bus.request('from_agent', 'slow_agent', 'some_type', {}, 'reason')
assert resp.available is True
assert resp.success is False
assert 'timeout' in str(resp.error).lower()
# ── exception from agent ─────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_agent_exception_returns_success_false():
agent = MagicMock()
agent.handle_peer_request = AsyncMock(side_effect=RuntimeError('boom'))
reg = _make_registry(agents={'bad_agent': agent})
bus = PeerBus(reg, directive_id='d1')
resp = await bus.request('from_agent', 'bad_agent', 'some_type', {}, 'reason')
assert resp.available is True
assert resp.success is False
assert 'boom' in str(resp.error)
# ── call_log ─────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_call_log_grows_with_requests():
agent = _make_agent()
reg = _make_registry(agents={'mock_agent': agent})
bus = PeerBus(reg, directive_id='d1')
await bus.request('a', 'mock_agent', 't1', {}, 'r1')
await bus.request('b', 'mock_agent', 't2', {}, 'r2')
assert len(bus.call_log) == 2
@pytest.mark.asyncio
async def test_call_log_empty_initially():
reg = _make_registry()
bus = PeerBus(reg, directive_id='d1')
assert bus.call_log == []
# ── PeerResponse fields ──────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_response_has_agent_and_request_type():
agent = _make_agent()
reg = _make_registry(agents={'mock_agent': agent})
bus = PeerBus(reg, directive_id='d1')
resp = await bus.request('from_agent', 'mock_agent', 'my_type', {}, 'reason')
assert resp.agent == 'mock_agent'
assert resp.request_type == 'my_type'