From 0e13b93e3a1b0decb67fb454ddb290ae16d6bfd8 Mon Sep 17 00:00:00 2001 From: ActiveBlue Build Date: Sun, 12 Apr 2026 16:46:13 -0400 Subject: [PATCH] feat: add Odoo XML-RPC client with connection pool and retry --- agent_service/tools/__init__.py | 0 agent_service/tools/odoo_client.py | 201 +++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 agent_service/tools/__init__.py create mode 100644 agent_service/tools/odoo_client.py diff --git a/agent_service/tools/__init__.py b/agent_service/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/agent_service/tools/odoo_client.py b/agent_service/tools/odoo_client.py new file mode 100644 index 0000000..6749b8a --- /dev/null +++ b/agent_service/tools/odoo_client.py @@ -0,0 +1,201 @@ +from __future__ import annotations +import asyncio, logging, os, socket, xmlrpc.client +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass, field +from typing import Any +import asyncpg + +logger = logging.getLogger(__name__) +XMLRPC_TIMEOUT = int(os.environ.get('XMLRPC_TIMEOUT', '60')) +_POOL_MAX = int(os.environ.get('POSTGRES_POOL_MAX', '10')) +_executor = ThreadPoolExecutor(max_workers=max(10, _POOL_MAX * 2)) + + +class _TimeoutTransport(xmlrpc.client.Transport): + def __init__(self, timeout=None): + super().__init__() + self._timeout = timeout or XMLRPC_TIMEOUT + def make_connection(self, host): + conn = super().make_connection(host) + conn.timeout = self._timeout + return conn + + +@dataclass +class WriteResult: + success: bool + model: str + record_id: object + action: str + before: dict = field(default_factory=dict) + after: dict = field(default_factory=dict) + error: object = None + + +class OdooAuthError(Exception): pass +class OdooWriteError(Exception): pass + + +class OdooClient: + def __init__(self, url, db, api_key, pg_dsn=None, pg_pool_min=2, pg_pool_max=10): + self._url = url.rstrip('/') + self._db = db + self._api_key = api_key + self._pg_dsn = pg_dsn + self._pg_pool_min = pg_pool_min + self._pg_pool_max = pg_pool_max + self._uid = None + self._auth_lock = asyncio.Lock() + self._pg_pool = None + self._health_task = None + transport = _TimeoutTransport() + self._common = xmlrpc.client.ServerProxy( + f'{self._url}/xmlrpc/2/common', transport=transport, allow_none=True) + self._models = xmlrpc.client.ServerProxy( + f'{self._url}/xmlrpc/2/object', transport=transport, allow_none=True) + + async def connect(self): + await self._authenticate() + if self._pg_dsn: + await self._init_pg_pool() + + async def close(self): + if self._health_task: + self._health_task.cancel() + if self._pg_pool: + await self._pg_pool.close() + + async def _xmlrpc_call(self, proxy, method, *args): + loop = asyncio.get_event_loop() + fn = getattr(proxy, method) + return await loop.run_in_executor(_executor, fn, *args) + + async def _authenticate(self): + async with self._auth_lock: + if self._uid is not None: + return + try: + uid = await self._xmlrpc_call( + self._common, 'authenticate', + self._db, '__system__', self._api_key, {}) + if not uid: + raise OdooAuthError('Authentication failed - check ODOO_API_KEY') + self._uid = uid + logger.info('Odoo authenticated uid=%s', uid) + except OdooAuthError: + raise + except Exception as exc: + raise OdooAuthError(f'Odoo auth error: {exc}') from exc + + async def _call(self, model, method, args, kwargs=None): + if kwargs is None: + kwargs = {} + last_exc = None + for attempt in range(3): + try: + if self._uid is None: + await self._authenticate() + return await self._xmlrpc_call( + self._models, 'execute_kw', + self._db, self._uid, self._api_key, model, method, args, kwargs) + except xmlrpc.client.Fault as exc: + if 'Session expired' in str(exc) or 'AccessDenied' in str(exc): + logger.warning('Odoo session expired, re-authenticating') + self._uid = None + await self._authenticate() + continue + raise OdooWriteError(str(exc)) from exc + except (ConnectionRefusedError, OSError, socket.timeout) as exc: + last_exc = exc + wait = 2 ** attempt + logger.warning('Odoo connection error (attempt %d/3), retrying in %ds: %s', + attempt + 1, wait, exc) + await asyncio.sleep(wait) + raise ConnectionError(f'Odoo unreachable after 3 attempts: {last_exc}') + + async def search_read(self, model, domain, fields, limit=None, order=None): + kwargs = {'fields': fields} + if limit is not None: kwargs['limit'] = limit + if order is not None: kwargs['order'] = order + return await self._call(model, 'search_read', [domain], kwargs) + + async def read(self, model, ids, fields): + return await self._call(model, 'read', [ids], {'fields': fields}) + + async def call(self, model, method, args, kwargs=None): + return await self._call(model, method, args, kwargs or {}) + + async def write(self, model, ids, values): + before_records = await self.read(model, ids, list(values.keys())) + before = {r['id']: r for r in before_records} + try: + await self._call(model, 'write', [ids, values]) + after_records = await self.read(model, ids, list(values.keys())) + after = {r['id']: r for r in after_records} + logger.info('OdooClient.write model=%s ids=%s', model, ids) + return WriteResult(True, model, ids[0] if ids else None, 'write', before, after) + except Exception as exc: + logger.error('OdooClient.write failed model=%s ids=%s: %s', model, ids, exc) + return WriteResult(False, model, ids[0] if ids else None, 'write', before, error=str(exc)) + + async def create(self, model, values): + try: + record_id = await self._call(model, 'create', [values]) + after_records = await self.read(model, [record_id], list(values.keys())) + after = after_records[0] if after_records else {} + logger.info('OdooClient.create model=%s record_id=%s', model, record_id) + return WriteResult(True, model, record_id, 'create', after=after) + except Exception as exc: + logger.error('OdooClient.create failed model=%s: %s', model, exc) + return WriteResult(False, model, None, 'create', error=str(exc)) + + async def unlink(self, model, ids): + logger.warning('OdooClient.unlink called - model=%s ids=%s. Irreversible.', model, ids) + before_records = await self.read(model, ids, ['id', 'display_name']) + before = {r['id']: r for r in before_records} + try: + await self._call(model, 'unlink', [ids]) + return WriteResult(True, model, ids[0] if ids else None, 'unlink', before) + except Exception as exc: + logger.error('OdooClient.unlink failed model=%s ids=%s: %s', model, ids, exc) + return WriteResult(False, model, ids[0] if ids else None, 'unlink', before, error=str(exc)) + + async def post_chatter(self, model, record_id, body, subtype='mail.mt_note'): + return await self._call(model, 'message_post', [[record_id]], + {'body': body, 'subtype_xmlid': subtype}) + + async def _init_pg_pool(self): + self._pg_pool = await asyncpg.create_pool( + self._pg_dsn, min_size=self._pg_pool_min, max_size=self._pg_pool_max, + max_inactive_connection_lifetime=300) + self._health_task = asyncio.create_task(self._pg_health_loop()) + logger.info('asyncpg pool created min=%d max=%d', self._pg_pool_min, self._pg_pool_max) + + async def _pg_health_loop(self): + while True: + await asyncio.sleep(60) + try: + async with self._pg_pool.acquire(timeout=10) as conn: + await conn.execute('SELECT 1') + except asyncpg.TooManyConnectionsError: + logger.critical('asyncpg pool: TooManyConnectionsError') + except Exception as exc: + logger.warning('asyncpg pool health check failed: %s', exc) + + @property + def pg_pool(self): return self._pg_pool + + async def pg_fetchrow(self, query, *args): + if not self._pg_pool: raise RuntimeError('Postgres pool not initialized') + async with self._pg_pool.acquire(timeout=10) as conn: + return await conn.fetchrow(query, *args) + + async def pg_fetch(self, query, *args): + if not self._pg_pool: raise RuntimeError('Postgres pool not initialized') + async with self._pg_pool.acquire(timeout=10) as conn: + return await conn.fetch(query, *args) + + async def pg_execute(self, query, *args): + if not self._pg_pool: raise RuntimeError('Postgres pool not initialized') + async with self._pg_pool.acquire(timeout=10) as conn: + return await conn.execute(query, *args)