feat(mcp): add MCP gateway — 14 tools over SSE, all agent calls forced local
Architecture:
- agent_service/mcp/tools.py: 14 Tool definitions with JSON schemas
dispatch, finance_query, accounting_query, crm_query, sales_query,
project_query, elearning_query, expenses_query, employees_query,
get_health, list_agents, trigger_sweep, get_pending_approvals, approve_directive
- agent_service/mcp/server.py: mcp.Server with list_tools + call_tool handlers
- agent_service/routers/mcp_router.py: Starlette routes at /mcp/sse + /mcp/messages
- main.py: mounts MCP routes alongside existing FastAPI routers (graceful fallback if mcp not installed)
Privacy guarantee (enforced in server.py, not by convention):
- _force_local_context() sets llm_router._privacy_mode = 'local' before EVERY agent call
- _restore_mode() restores original mode after the tool returns
- HIPAA agents (finance, accounting, expenses, employees) were already Ollama-only;
MCP adds a second enforcement layer for all 8 agents
- MCP client (e.g. Claude Code CLI) receives only tool results — no LLM completions cross the boundary
Usage (Claude Code CLI):
claude mcp add --transport sse http://192.168.2.47:8001/mcp/sse
or copy claude_mcp_config.json to ~/.claude/mcp_servers.json
requirements.txt: added mcp==1.3.0
tests/test_mcp_server.py: 13 tests covering tool count, schemas, HIPAA labelling, privacy override
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
304
agent_service/mcp/server.py
Normal file
304
agent_service/mcp/server.py
Normal file
@@ -0,0 +1,304 @@
|
||||
"""
|
||||
ActiveBlue AI — MCP Server
|
||||
|
||||
Exposes 14 tools over SSE transport so Claude Code CLI (or any MCP client)
|
||||
can invoke our specialist agents via the Model Context Protocol.
|
||||
|
||||
PRIVACY GUARANTEE
|
||||
-----------------
|
||||
Every tool call that runs an agent is intercepted here and the LLM router is
|
||||
temporarily overridden to 'local' mode for the duration of that call.
|
||||
No agent reasoning ever leaves the machine — only the tool *results*
|
||||
(Odoo data, agent reports) are returned to the MCP client.
|
||||
|
||||
Transport: HTTP + SSE
|
||||
GET /mcp/sse — client connects, receives session endpoint
|
||||
POST /mcp/messages — client posts JSON-RPC tool calls
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from mcp.server import Server
|
||||
from mcp.server.sse import SseServerTransport
|
||||
from mcp.types import TextContent, Tool
|
||||
|
||||
from .tools import MCP_TOOLS, AGENT_TOOL_MAP
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MCP_SERVER_NAME = 'activeblue-ai'
|
||||
MCP_SERVER_VERSION = '0.1.0'
|
||||
|
||||
|
||||
def _ok(data) -> list[TextContent]:
|
||||
"""Wrap a result as MCP TextContent."""
|
||||
if isinstance(data, str):
|
||||
return [TextContent(type='text', text=data)]
|
||||
return [TextContent(type='text', text=json.dumps(data, default=str, indent=2))]
|
||||
|
||||
|
||||
def _err(msg: str) -> list[TextContent]:
|
||||
return [TextContent(type='text', text=json.dumps({'error': msg}))]
|
||||
|
||||
|
||||
async def _force_local_context():
|
||||
"""
|
||||
Context manager that temporarily overrides the LLM router to local mode
|
||||
for the duration of an MCP tool call. Restores original mode on exit.
|
||||
"""
|
||||
from ..app_state import get_llm_router
|
||||
router = get_llm_router()
|
||||
if router is None:
|
||||
return None, None
|
||||
original_mode = getattr(router, '_privacy_mode', 'local')
|
||||
router._privacy_mode = 'local'
|
||||
return router, original_mode
|
||||
|
||||
|
||||
async def _restore_mode(router, original_mode):
|
||||
if router is not None and original_mode is not None:
|
||||
router._privacy_mode = original_mode
|
||||
|
||||
|
||||
def create_mcp_server() -> Server:
|
||||
server = Server(MCP_SERVER_NAME)
|
||||
|
||||
@server.list_tools()
|
||||
async def list_tools() -> list[Tool]:
|
||||
return MCP_TOOLS
|
||||
|
||||
@server.call_tool()
|
||||
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
||||
logger.info('MCP tool call: %s args=%s', name, list(arguments.keys()))
|
||||
t0 = time.monotonic()
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# 1. dispatch — MasterAgent
|
||||
# ----------------------------------------------------------
|
||||
if name == 'dispatch':
|
||||
from ..app_state import get_master_agent
|
||||
master = get_master_agent()
|
||||
if master is None:
|
||||
return _err('Agent service not ready')
|
||||
message = arguments.get('message', '')
|
||||
user_id = str(arguments.get('user_id', 'mcp_user'))
|
||||
context = arguments.get('context', {})
|
||||
|
||||
router, orig = await _force_local_context()
|
||||
try:
|
||||
response = await asyncio.wait_for(
|
||||
master.handle_message(
|
||||
user_id=user_id,
|
||||
message=message,
|
||||
context=context,
|
||||
),
|
||||
timeout=120,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
return _err('Dispatch timed out after 120s')
|
||||
except Exception as exc:
|
||||
logger.exception('dispatch error: %s', exc)
|
||||
return _err(str(exc))
|
||||
finally:
|
||||
await _restore_mode(router, orig)
|
||||
|
||||
result = {
|
||||
'directive_id': response.directive_id,
|
||||
'reply': response.reply,
|
||||
'escalations': response.escalations,
|
||||
'actions_taken': response.actions_taken,
|
||||
'duration_ms': round((time.monotonic() - t0) * 1000),
|
||||
}
|
||||
return _ok(result)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# 2. Specialist agent queries
|
||||
# ----------------------------------------------------------
|
||||
if name in AGENT_TOOL_MAP:
|
||||
agent_name = AGENT_TOOL_MAP[name]
|
||||
from ..app_state import get_master_agent
|
||||
master = get_master_agent()
|
||||
if master is None:
|
||||
return _err('Agent service not ready')
|
||||
|
||||
query = arguments.get('query', '')
|
||||
# Build context from remaining kwargs (partner_id, project_id, etc.)
|
||||
context = {k: v for k, v in arguments.items() if k != 'query'}
|
||||
|
||||
# Force local for this call
|
||||
router, orig = await _force_local_context()
|
||||
try:
|
||||
response = await asyncio.wait_for(
|
||||
master.handle_message(
|
||||
user_id='mcp_user',
|
||||
message=query,
|
||||
context={**context, '_preferred_agent': agent_name},
|
||||
),
|
||||
timeout=120,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
return _err(f'{agent_name} timed out after 120s')
|
||||
except Exception as exc:
|
||||
logger.exception('%s error: %s', agent_name, exc)
|
||||
return _err(str(exc))
|
||||
finally:
|
||||
await _restore_mode(router, orig)
|
||||
|
||||
result = {
|
||||
'agent': agent_name,
|
||||
'reply': response.reply,
|
||||
'escalations': response.escalations,
|
||||
'actions_taken': response.actions_taken,
|
||||
'duration_ms': round((time.monotonic() - t0) * 1000),
|
||||
'privacy': 'local (Ollama)',
|
||||
}
|
||||
return _ok(result)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# 3. get_health
|
||||
# ----------------------------------------------------------
|
||||
if name == 'get_health':
|
||||
from ..app_state import get_db_pool, get_master_agent, get_llm_router
|
||||
from ..config import get_settings
|
||||
settings = get_settings()
|
||||
pool = get_db_pool()
|
||||
db_status = 'unavailable'
|
||||
if pool:
|
||||
try:
|
||||
async with pool.acquire(timeout=5) as conn:
|
||||
await conn.fetchval('SELECT 1')
|
||||
db_status = 'ok'
|
||||
except Exception as exc:
|
||||
db_status = f'error: {exc}'
|
||||
master_status = 'ok' if get_master_agent() else 'unavailable'
|
||||
llm_mode = getattr(get_llm_router(), '_privacy_mode', 'unknown') if get_llm_router() else 'unavailable'
|
||||
return _ok({
|
||||
'status': 'ok' if db_status == 'ok' and master_status == 'ok' else 'degraded',
|
||||
'db': db_status,
|
||||
'master_agent': master_status,
|
||||
'llm_mode': llm_mode,
|
||||
'mcp_privacy': 'local (all MCP tool calls use Ollama)',
|
||||
'uptime_ms': round((time.monotonic() - t0) * 1000),
|
||||
})
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# 4. list_agents
|
||||
# ----------------------------------------------------------
|
||||
if name == 'list_agents':
|
||||
from ..app_state import get_agent_registry, get_llm_router
|
||||
registry = get_agent_registry()
|
||||
if registry is None:
|
||||
return _err('Registry not ready')
|
||||
agents = registry.get_all()
|
||||
result = []
|
||||
for a in agents:
|
||||
result.append({
|
||||
'name': a.get('name'),
|
||||
'domain': a.get('domain', ''),
|
||||
'active': a.get('active', True),
|
||||
'backend_in_mcp': 'ollama (forced local)',
|
||||
})
|
||||
return _ok({'agents': result, 'count': len(result)})
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# 5. trigger_sweep
|
||||
# ----------------------------------------------------------
|
||||
if name == 'trigger_sweep':
|
||||
from ..app_state import get_sweep_coordinator
|
||||
coordinator = get_sweep_coordinator()
|
||||
if coordinator is None:
|
||||
return _err('Sweep coordinator not ready')
|
||||
agent_names = arguments.get('agents', [])
|
||||
router, orig = await _force_local_context()
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
coordinator.run_sweep(agents=agent_names or None),
|
||||
timeout=300,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
return _err('Sweep timed out after 300s')
|
||||
except Exception as exc:
|
||||
logger.exception('sweep error: %s', exc)
|
||||
return _err(str(exc))
|
||||
finally:
|
||||
await _restore_mode(router, orig)
|
||||
result['duration_ms'] = round((time.monotonic() - t0) * 1000)
|
||||
return _ok(result)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# 6. get_pending_approvals
|
||||
# ----------------------------------------------------------
|
||||
if name == 'get_pending_approvals':
|
||||
from ..app_state import get_db_pool
|
||||
pool = get_db_pool()
|
||||
if pool is None:
|
||||
return _err('DB not ready')
|
||||
async with pool.acquire(timeout=10) as conn:
|
||||
rows = await conn.fetch(
|
||||
'SELECT directive_id, agent_name, action_type, description, created_at '
|
||||
'FROM ab_directive_log WHERE status = $1 ORDER BY created_at ASC',
|
||||
'pending_approval',
|
||||
)
|
||||
items = [
|
||||
{
|
||||
'directive_id': str(r['directive_id']),
|
||||
'agent': r['agent_name'] or '',
|
||||
'action': r['action_type'] or '',
|
||||
'description': r['description'] or '',
|
||||
'created_at': str(r['created_at']),
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
return _ok({'pending_approvals': items, 'count': len(items)})
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# 7. approve_directive
|
||||
# ----------------------------------------------------------
|
||||
if name == 'approve_directive':
|
||||
from ..app_state import get_db_pool, get_master_agent
|
||||
pool = get_db_pool()
|
||||
if pool is None:
|
||||
return _err('DB not ready')
|
||||
directive_id = arguments.get('directive_id', '')
|
||||
approved = bool(arguments.get('approved', False))
|
||||
note = arguments.get('note', '')
|
||||
new_status = 'approved' if approved else 'rejected'
|
||||
async with pool.acquire(timeout=10) as conn:
|
||||
row = await conn.fetchrow(
|
||||
'SELECT status FROM ab_directive_log WHERE directive_id = $1',
|
||||
directive_id,
|
||||
)
|
||||
if not row:
|
||||
return _err(f'Directive {directive_id} not found')
|
||||
if row['status'] != 'pending_approval':
|
||||
return _err(f'Directive is not pending approval (status={row["status"]})')
|
||||
await conn.execute(
|
||||
'UPDATE ab_directive_log SET status=$1, approval_note=$2, updated_at=NOW() '
|
||||
'WHERE directive_id=$3',
|
||||
new_status, note, directive_id,
|
||||
)
|
||||
if approved:
|
||||
master = get_master_agent()
|
||||
if master and hasattr(master, 'resume_directive'):
|
||||
try:
|
||||
router, orig = await _force_local_context()
|
||||
await master.resume_directive(directive_id)
|
||||
await _restore_mode(router, orig)
|
||||
except Exception as exc:
|
||||
logger.error('resume_directive failed: %s', exc)
|
||||
return _ok({'directive_id': directive_id, 'status': new_status, 'note': note})
|
||||
|
||||
return _err(f'Unknown tool: {name}')
|
||||
|
||||
return server
|
||||
|
||||
|
||||
def build_sse_transport() -> SseServerTransport:
|
||||
"""Return an SSE transport bound to /mcp/messages."""
|
||||
return SseServerTransport('/mcp/messages')
|
||||
Reference in New Issue
Block a user