feat: add base agent and peer bus
This commit is contained in:
66
agent_service/agents/peer_bus.py
Normal file
66
agent_service/agents/peer_bus.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from __future__ import annotations
|
||||
import asyncio, logging
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
MAX_DEPTH = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class PeerResponse:
|
||||
available: bool
|
||||
success: bool = False
|
||||
data: dict = field(default_factory=dict)
|
||||
agent: str = ''
|
||||
request_type: str = ''
|
||||
error: object = None
|
||||
|
||||
|
||||
class PeerCircularRequestError(Exception): pass
|
||||
|
||||
|
||||
class PeerBus:
|
||||
def __init__(self, registry, directive_id):
|
||||
self._registry = registry
|
||||
self._directive_id = directive_id
|
||||
self._call_log: list[dict] = []
|
||||
self._call_chain: list[str] = []
|
||||
|
||||
async def request(self, from_agent, to_agent, request_type, params, reason):
|
||||
if to_agent in self._call_chain:
|
||||
logger.warning('PeerBus: circular request blocked %s->%s chain=%s',
|
||||
from_agent, to_agent, self._call_chain)
|
||||
raise PeerCircularRequestError(f'Circular peer request: {from_agent}->{to_agent}')
|
||||
if len(self._call_chain) >= MAX_DEPTH:
|
||||
logger.warning('PeerBus: max depth %d reached', MAX_DEPTH)
|
||||
return PeerResponse(available=False, error=f'Max peer depth {MAX_DEPTH} reached')
|
||||
if not await self._registry.is_active(to_agent):
|
||||
logger.debug('PeerBus: agent %s inactive, from=%s', to_agent, from_agent)
|
||||
return PeerResponse(available=False, agent=to_agent, request_type=request_type)
|
||||
agent = self._registry.get_agent_instance(to_agent)
|
||||
if agent is None:
|
||||
return PeerResponse(available=False, agent=to_agent, request_type=request_type)
|
||||
self._call_chain.append(from_agent)
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
agent.handle_peer_request(request_type, params, self._directive_id),
|
||||
timeout=30)
|
||||
entry = {'from': from_agent, 'to': to_agent, 'type': request_type,
|
||||
'params': params, 'reason': reason, 'success': result.get('success', True)}
|
||||
self._call_log.append(entry)
|
||||
logger.debug('PeerBus: %s->%s type=%s ok', from_agent, to_agent, request_type)
|
||||
return PeerResponse(available=True, success=True, data=result,
|
||||
agent=to_agent, request_type=request_type)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning('PeerBus: timeout %s->%s', from_agent, to_agent)
|
||||
return PeerResponse(available=True, success=False, agent=to_agent,
|
||||
request_type=request_type, error='Peer timeout after 30s')
|
||||
except Exception as exc:
|
||||
logger.error('PeerBus: error %s->%s: %s', from_agent, to_agent, exc)
|
||||
return PeerResponse(available=True, success=False, agent=to_agent,
|
||||
request_type=request_type, error=str(exc))
|
||||
finally:
|
||||
self._call_chain.pop()
|
||||
|
||||
@property
|
||||
def call_log(self): return self._call_log
|
||||
Reference in New Issue
Block a user