"""In-process bus that lets one agent send a request to a peer agent.

The bus guards against circular requests and runaway nesting, applies a
timeout to every peer call, and records each completed call in a log.
"""

from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

# Maximum number of requests that may be in flight on one bus at a time.
MAX_DEPTH = 3

# Default number of seconds a peer gets to answer before the call is abandoned.
DEFAULT_PEER_TIMEOUT = 30


@dataclass
class PeerResponse:
    """Outcome of a single ``PeerBus.request`` call."""

    # False when the request was never dispatched (depth limit reached, target
    # inactive, or no agent instance); True once the peer handler was invoked.
    available: bool
    # True when the peer handler returned; False on timeout or exception.
    success: bool = False
    # The value returned by the peer handler (empty when nothing was returned).
    data: dict = field(default_factory=dict)
    # Name of the agent the request was addressed to.
    agent: str = ''
    # The request type that was sent.
    request_type: str = ''
    # Human-readable failure description, or None when there was no failure.
    error: object = None


class PeerCircularRequestError(Exception):
    """Raised when the target of a request is already in the call chain."""


class PeerBus:
    """Routes requests between agents looked up through a registry.

    The call chain is stored on the instance, so every request made through
    the same bus (including concurrently running ones) counts toward the
    same ``MAX_DEPTH`` limit and the same circularity check.
    """

    def __init__(self, registry, directive_id=None, *,
                 timeout=DEFAULT_PEER_TIMEOUT):
        """Create a bus.

        Args:
            registry: Object providing ``await is_active(name)`` and
                ``get_agent_instance(name)``.
            directive_id: Opaque value passed through to every peer handler.
            timeout: Seconds to wait for a peer handler before giving up.
        """
        self._registry = registry
        self._directive_id = directive_id
        # Normalised to float so the timeout message formats uniformly and a
        # non-numeric value fails here rather than in the middle of a request.
        self._timeout = float(timeout)
        self._call_log: list[dict] = []
        self._call_chain: list[str] = []

    async def request(self, from_agent: str, to_agent: str, request_type: str,
                      params, reason) -> PeerResponse:
        """Send ``request_type`` from ``from_agent`` to ``to_agent``.

        Args:
            from_agent: Name of the requesting agent.
            to_agent: Name of the agent that should handle the request.
            request_type: Identifier handed to the peer's handler.
            params: Payload handed to the peer's handler.
            reason: Free-form explanation, stored in the call log only.

        Returns:
            A ``PeerResponse``. ``available`` is False when the request was
            not dispatched (depth limit, inactive target, missing instance).
            Otherwise ``available`` is True and ``success`` tells whether the
            handler returned (True) or timed out / raised (False).

        Raises:
            PeerCircularRequestError: If ``to_agent`` is already a requester
                in the current call chain.
        """
        if to_agent in self._call_chain:
            logger.warning(
                'PeerBus: circular request blocked %s->%s chain=%s',
                from_agent, to_agent, self._call_chain,
            )
            raise PeerCircularRequestError(
                f'Circular peer request: {from_agent}->{to_agent}'
            )

        if len(self._call_chain) >= MAX_DEPTH:
            logger.warning('PeerBus: max depth %d reached', MAX_DEPTH)
            return PeerResponse(
                available=False,
                error=f'Max peer depth {MAX_DEPTH} reached',
            )

        if not await self._registry.is_active(to_agent):
            logger.debug(
                'PeerBus: agent %s inactive, from=%s', to_agent, from_agent
            )
            return PeerResponse(
                available=False, agent=to_agent, request_type=request_type
            )

        agent = self._registry.get_agent_instance(to_agent)
        if agent is None:
            return PeerResponse(
                available=False, agent=to_agent, request_type=request_type
            )

        # The chain records requesters; a nested request that targets any of
        # them is what the circularity check above rejects.
        self._call_chain.append(from_agent)
        try:
            result = await asyncio.wait_for(
                agent.handle_peer_request(
                    request_type, params, self._directive_id
                ),
                timeout=self._timeout,
            )
            # A result without ``.get`` raises here and is reported through
            # the generic ``except Exception`` branch below.
            entry = {
                'from': from_agent,
                'to': to_agent,
                'type': request_type,
                'params': params,
                'reason': reason,
                'success': result.get('success', True),
            }
            self._call_log.append(entry)
            logger.debug(
                'PeerBus: %s->%s type=%s ok', from_agent, to_agent, request_type
            )
            # NOTE(review): ``success`` is True whenever the handler returned,
            # even if the payload itself carries 'success': False (only the
            # call log reflects that) - confirm this is intended.
            return PeerResponse(
                available=True,
                success=True,
                data=result,
                agent=to_agent,
                request_type=request_type,
            )
        except asyncio.TimeoutError:
            logger.warning('PeerBus: timeout %s->%s', from_agent, to_agent)
            return PeerResponse(
                available=True,
                success=False,
                agent=to_agent,
                request_type=request_type,
                error=f'Peer timeout after {self._timeout:g}s',
            )
        except Exception as exc:
            # logger.exception keeps the traceback, which the bare message
            # would otherwise lose.
            logger.exception(
                'PeerBus: error %s->%s: %s', from_agent, to_agent, exc
            )
            return PeerResponse(
                available=True,
                success=False,
                agent=to_agent,
                request_type=request_type,
                error=str(exc),
            )
        finally:
            self._call_chain.pop()

    @property
    def call_log(self):
        """Return the list of log entries for calls whose handler returned."""
        return self._call_log