from __future__ import annotations import asyncio, logging, os, socket, xmlrpc.client from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from typing import Any import asyncpg logger = logging.getLogger(__name__) XMLRPC_TIMEOUT = int(os.environ.get('XMLRPC_TIMEOUT', '60')) _POOL_MAX = int(os.environ.get('POSTGRES_POOL_MAX', '10')) _executor = ThreadPoolExecutor(max_workers=max(10, _POOL_MAX * 2)) class _TimeoutTransport(xmlrpc.client.Transport): def __init__(self, timeout=None): super().__init__() self._timeout = timeout or XMLRPC_TIMEOUT def make_connection(self, host): conn = super().make_connection(host) conn.timeout = self._timeout return conn @dataclass class WriteResult: success: bool model: str record_id: object action: str before: dict = field(default_factory=dict) after: dict = field(default_factory=dict) error: object = None class OdooAuthError(Exception): pass class OdooWriteError(Exception): pass class OdooClient: def __init__(self, url, db, api_key, user='__system__', pg_dsn=None, pg_pool_min=2, pg_pool_max=10): self._url = url.rstrip('/') self._db = db self._api_key = api_key self._user = user self._pg_dsn = pg_dsn self._pg_pool_min = pg_pool_min self._pg_pool_max = pg_pool_max self._uid = None self._auth_lock = asyncio.Lock() self._pg_pool = None self._health_task = None transport = _TimeoutTransport() self._common = xmlrpc.client.ServerProxy( f'{self._url}/xmlrpc/2/common', transport=transport, allow_none=True) self._models = xmlrpc.client.ServerProxy( f'{self._url}/xmlrpc/2/object', transport=transport, allow_none=True) async def ping(self) -> None: """Raise if Odoo is unreachable.""" await self._xmlrpc_call(self._common, 'version') async def connect(self): await self._authenticate() if self._pg_dsn: await self._init_pg_pool() async def close(self): if self._health_task: self._health_task.cancel() if self._pg_pool: await self._pg_pool.close() async def _xmlrpc_call(self, proxy, method, *args): loop = asyncio.get_event_loop() fn = getattr(proxy, method) return await loop.run_in_executor(_executor, fn, *args) async def _authenticate(self): async with self._auth_lock: if self._uid is not None: return try: uid = await self._xmlrpc_call( self._common, 'authenticate', self._db, self._user, self._api_key, {}) if not uid: raise OdooAuthError('Authentication failed - check ODOO_API_KEY') self._uid = uid logger.info('Odoo authenticated uid=%s', uid) except OdooAuthError: raise except Exception as exc: raise OdooAuthError(f'Odoo auth error: {exc}') from exc async def _call(self, model, method, args, kwargs=None): if kwargs is None: kwargs = {} last_exc = None for attempt in range(3): try: if self._uid is None: await self._authenticate() return await self._xmlrpc_call( self._models, 'execute_kw', self._db, self._uid, self._api_key, model, method, args, kwargs) except xmlrpc.client.Fault as exc: if 'Session expired' in str(exc) or 'AccessDenied' in str(exc): logger.warning('Odoo session expired, re-authenticating') self._uid = None await self._authenticate() continue raise OdooWriteError(str(exc)) from exc except (ConnectionRefusedError, OSError, socket.timeout) as exc: last_exc = exc wait = 2 ** attempt logger.warning('Odoo connection error (attempt %d/3), retrying in %ds: %s', attempt + 1, wait, exc) await asyncio.sleep(wait) raise ConnectionError(f'Odoo unreachable after 3 attempts: {last_exc}') async def search_read(self, model, domain, fields, limit=None, order=None): kwargs = {'fields': fields} if limit is not None: kwargs['limit'] = limit if order is not None: kwargs['order'] = order return await self._call(model, 'search_read', [domain], kwargs) async def read(self, model, ids, fields): return await self._call(model, 'read', [ids], {'fields': fields}) async def call(self, model, method, args, kwargs=None): return await self._call(model, method, args, kwargs or {}) async def write(self, model, ids, values): before_records = await self.read(model, ids, list(values.keys())) before = {r['id']: r for r in before_records} try: await self._call(model, 'write', [ids, values]) after_records = await self.read(model, ids, list(values.keys())) after = {r['id']: r for r in after_records} logger.info('OdooClient.write model=%s ids=%s', model, ids) return WriteResult(True, model, ids[0] if ids else None, 'write', before, after) except Exception as exc: logger.error('OdooClient.write failed model=%s ids=%s: %s', model, ids, exc) return WriteResult(False, model, ids[0] if ids else None, 'write', before, error=str(exc)) async def create(self, model, values): try: record_id = await self._call(model, 'create', [values]) after_records = await self.read(model, [record_id], list(values.keys())) after = after_records[0] if after_records else {} logger.info('OdooClient.create model=%s record_id=%s', model, record_id) return WriteResult(True, model, record_id, 'create', after=after) except Exception as exc: logger.error('OdooClient.create failed model=%s: %s', model, exc) return WriteResult(False, model, None, 'create', error=str(exc)) async def unlink(self, model, ids): logger.warning('OdooClient.unlink called - model=%s ids=%s. Irreversible.', model, ids) before_records = await self.read(model, ids, ['id', 'display_name']) before = {r['id']: r for r in before_records} try: await self._call(model, 'unlink', [ids]) return WriteResult(True, model, ids[0] if ids else None, 'unlink', before) except Exception as exc: logger.error('OdooClient.unlink failed model=%s ids=%s: %s', model, ids, exc) return WriteResult(False, model, ids[0] if ids else None, 'unlink', before, error=str(exc)) async def post_chatter(self, model, record_id, body, subtype='mail.mt_note'): return await self._call(model, 'message_post', [[record_id]], {'body': body, 'subtype_xmlid': subtype}) async def _init_pg_pool(self): self._pg_pool = await asyncpg.create_pool( self._pg_dsn, min_size=self._pg_pool_min, max_size=self._pg_pool_max, max_inactive_connection_lifetime=300) self._health_task = asyncio.create_task(self._pg_health_loop()) logger.info('asyncpg pool created min=%d max=%d', self._pg_pool_min, self._pg_pool_max) async def _pg_health_loop(self): while True: await asyncio.sleep(60) try: async with self._pg_pool.acquire(timeout=10) as conn: await conn.execute('SELECT 1') except asyncpg.TooManyConnectionsError: logger.critical('asyncpg pool: TooManyConnectionsError') except Exception as exc: logger.warning('asyncpg pool health check failed: %s', exc) @property def pg_pool(self): return self._pg_pool async def pg_fetchrow(self, query, *args): if not self._pg_pool: raise RuntimeError('Postgres pool not initialized') async with self._pg_pool.acquire(timeout=10) as conn: return await conn.fetchrow(query, *args) async def pg_fetch(self, query, *args): if not self._pg_pool: raise RuntimeError('Postgres pool not initialized') async with self._pg_pool.acquire(timeout=10) as conn: return await conn.fetch(query, *args) async def pg_execute(self, query, *args): if not self._pg_pool: raise RuntimeError('Postgres pool not initialized') async with self._pg_pool.acquire(timeout=10) as conn: return await conn.execute(query, *args)