diff --git a/agent_service/agents/peer_bus.py b/agent_service/agents/peer_bus.py index 3670945..c4c6949 100644 --- a/agent_service/agents/peer_bus.py +++ b/agent_service/agents/peer_bus.py @@ -20,7 +20,7 @@ class PeerCircularRequestError(Exception): pass class PeerBus: - def __init__(self, registry, directive_id): + def __init__(self, registry, directive_id=None): self._registry = registry self._directive_id = directive_id self._call_log: list[dict] = [] diff --git a/agent_service/config.py b/agent_service/config.py index a31f349..fed19b9 100644 --- a/agent_service/config.py +++ b/agent_service/config.py @@ -14,10 +14,13 @@ class Settings(BaseSettings): ollama_url: str = 'http://localhost:11434' ollama_model: str = 'llama3' ollama_timeout: int = 120 + ollama_max_concurrent: int = 2 # Anthropic / Claude anthropic_api_key: str = '' claude_model: str = 'claude-sonnet-4-6' + claude_timeout: int = 120 + claude_max_concurrent: int = 2 # Privacy llm_privacy_mode: str = 'local' # local | hybrid | cloud diff --git a/agent_service/main.py b/agent_service/main.py index b61025b..dbac35b 100644 --- a/agent_service/main.py +++ b/agent_service/main.py @@ -16,18 +16,32 @@ logger = logging.getLogger(__name__) async def _init_db(settings) -> asyncpg.Pool: - pool = await asyncpg.create_pool( - host=settings.postgres_host, - port=settings.postgres_port, - database=settings.postgres_db, - user=settings.postgres_user, - password=settings.postgres_password, - min_size=settings.postgres_min_connections, - max_size=settings.postgres_max_connections, - max_inactive_connection_lifetime=300, + redacted_dsn = ( + f'postgresql://{settings.postgres_user}:***' + f'@{settings.postgres_host}:{settings.postgres_port}/{settings.postgres_db}' ) - logger.info('DB pool created (min=%d max=%d)', settings.postgres_min_connections, settings.postgres_max_connections) - return pool + last_exc: Exception | None = None + for attempt in range(1, 6): + try: + pool = await asyncpg.create_pool( + host=settings.postgres_host, + port=settings.postgres_port, + database=settings.postgres_db, + user=settings.postgres_user, + password=settings.postgres_password, + min_size=settings.postgres_min_connections, + max_size=settings.postgres_max_connections, + max_inactive_connection_lifetime=300, + ) + logger.info('DB pool created (min=%d max=%d dsn=%s)', + settings.postgres_min_connections, settings.postgres_max_connections, redacted_dsn) + return pool + except Exception as exc: + last_exc = exc + logger.warning('DB connect attempt %d/5 failed (dsn=%s): %s', attempt, redacted_dsn, exc) + if attempt < 5: + await asyncio.sleep(2) + raise last_exc # type: ignore[misc] async def _db_health_loop(pool: asyncpg.Pool) -> None: @@ -56,35 +70,8 @@ async def lifespan(app: FastAPI): # 2. LLM Router try: - from .llm.ollama_backend import OllamaBackend - from .llm.llm_config_store import LLMConfigStore from .llm.llm_router import LLMRouter - - ollama = OllamaBackend( - base_url=settings.ollama_url, - model=settings.ollama_model, - timeout=settings.ollama_timeout, - ) - config_store = LLMConfigStore(pool) if pool else None - claude = None - if settings.llm_privacy_mode != 'local' and settings.anthropic_api_key: - from .llm.claude_backend import ClaudeBackend - claude = ClaudeBackend(api_key=settings.anthropic_api_key, model=settings.claude_model) - - llm_router = LLMRouter( - ollama=ollama, - claude=claude, - config_store=config_store, - privacy_mode=settings.llm_privacy_mode, - env_overrides={ - name: settings.agent_backend_override(name) - for name in [ - 'finance_agent', 'accounting_agent', 'crm_agent', 'sales_agent', - 'project_agent', 'elearning_agent', 'expenses_agent', 'employees_agent', - ] - if settings.agent_backend_override(name) - }, - ) + llm_router = LLMRouter(config=settings, pg_pool=pool) app_state.set_llm_router(llm_router) logger.info('LLM router ready (mode=%s)', settings.llm_privacy_mode) except Exception as exc: @@ -107,7 +94,9 @@ async def lifespan(app: FastAPI): # 4. Agent registry try: from .agents.registry import AgentRegistry - agent_registry = AgentRegistry(odoo=odoo, pool=pool) + agent_registry = AgentRegistry() + if odoo: + await agent_registry.load_from_odoo(odoo) app_state.set_agent_registry(agent_registry) except Exception as exc: logger.error('Failed to init agent registry: %s', exc) @@ -124,8 +113,9 @@ async def lifespan(app: FastAPI): # 6. Peer bus + specialist agents try: from .agents.peer_bus import PeerBus - peer_bus = PeerBus() - _register_specialist_agents(peer_bus, odoo, llm_router) + peer_bus = PeerBus(registry=agent_registry) + if agent_registry: + _register_specialist_agents(agent_registry, peer_bus, odoo, llm_router) except Exception as exc: logger.error('Failed to init peer bus / specialist agents: %s', exc) peer_bus = None @@ -137,7 +127,6 @@ async def lifespan(app: FastAPI): odoo=odoo, llm=llm_router, memory=memory_mgr, - peer_bus=peer_bus, registry=agent_registry, ) app_state.set_master_agent(master) @@ -164,10 +153,10 @@ async def lifespan(app: FastAPI): logger.info('Agent service shut down') -def _register_specialist_agents(peer_bus, odoo, llm_router) -> None: +def _register_specialist_agents(agent_registry, peer_bus, odoo, llm_router) -> None: try: from .agents.finance_agent import FinanceAgent - peer_bus.register('finance_agent', FinanceAgent(odoo=odoo, llm=llm_router, peer_bus=peer_bus)) + agent_registry.register('finance_agent', FinanceAgent(odoo=odoo, llm=llm_router, peer_bus=peer_bus)) except Exception as exc: logger.warning('Could not register finance_agent: %s', exc) @@ -181,12 +170,11 @@ def _register_specialist_agents(peer_bus, odoo, llm_router) -> None: 'employees_agent': 'EmployeesAgent', } for agent_name, class_name in specialist_map.items(): - module_name = agent_name.replace('_agent', '_agent') try: import importlib mod = importlib.import_module(f'.agents.{agent_name}', package='agent_service') cls = getattr(mod, class_name) - peer_bus.register(agent_name, cls(odoo=odoo, llm=llm_router, peer_bus=peer_bus)) + agent_registry.register(agent_name, cls(odoo=odoo, llm=llm_router, peer_bus=peer_bus)) except ImportError: logger.debug('%s module not yet implemented, skipping', agent_name) except Exception as exc: diff --git a/agent_service/routers/mcp_router.py b/agent_service/routers/mcp_router.py index e388f67..4ff2ab8 100644 --- a/agent_service/routers/mcp_router.py +++ b/agent_service/routers/mcp_router.py @@ -10,6 +10,7 @@ from __future__ import annotations import logging from mcp.server.models import InitializationOptions +from mcp.server.lowlevel import NotificationOptions from starlette.requests import Request from starlette.responses import Response from starlette.routing import Route @@ -26,7 +27,7 @@ _INIT_OPTIONS = InitializationOptions( server_name='activeblue-ai', server_version='0.1.0', capabilities=_mcp_server.get_capabilities( - notification_options=None, + notification_options=NotificationOptions(), experimental_capabilities={}, ), )