feat: add Odoo XML-RPC client with connection pool and retry
This commit is contained in:
0
agent_service/tools/__init__.py
Normal file
0
agent_service/tools/__init__.py
Normal file
201
agent_service/tools/odoo_client.py
Normal file
201
agent_service/tools/odoo_client.py
Normal file
@@ -0,0 +1,201 @@
|
||||
from __future__ import annotations
|
||||
import asyncio, logging, os, socket, xmlrpc.client
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
import asyncpg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
XMLRPC_TIMEOUT = int(os.environ.get('XMLRPC_TIMEOUT', '60'))
|
||||
_POOL_MAX = int(os.environ.get('POSTGRES_POOL_MAX', '10'))
|
||||
_executor = ThreadPoolExecutor(max_workers=max(10, _POOL_MAX * 2))
|
||||
|
||||
|
||||
class _TimeoutTransport(xmlrpc.client.Transport):
|
||||
def __init__(self, timeout=None):
|
||||
super().__init__()
|
||||
self._timeout = timeout or XMLRPC_TIMEOUT
|
||||
def make_connection(self, host):
|
||||
conn = super().make_connection(host)
|
||||
conn.timeout = self._timeout
|
||||
return conn
|
||||
|
||||
|
||||
@dataclass
|
||||
class WriteResult:
|
||||
success: bool
|
||||
model: str
|
||||
record_id: object
|
||||
action: str
|
||||
before: dict = field(default_factory=dict)
|
||||
after: dict = field(default_factory=dict)
|
||||
error: object = None
|
||||
|
||||
|
||||
class OdooAuthError(Exception): pass
|
||||
class OdooWriteError(Exception): pass
|
||||
|
||||
|
||||
class OdooClient:
|
||||
def __init__(self, url, db, api_key, pg_dsn=None, pg_pool_min=2, pg_pool_max=10):
|
||||
self._url = url.rstrip('/')
|
||||
self._db = db
|
||||
self._api_key = api_key
|
||||
self._pg_dsn = pg_dsn
|
||||
self._pg_pool_min = pg_pool_min
|
||||
self._pg_pool_max = pg_pool_max
|
||||
self._uid = None
|
||||
self._auth_lock = asyncio.Lock()
|
||||
self._pg_pool = None
|
||||
self._health_task = None
|
||||
transport = _TimeoutTransport()
|
||||
self._common = xmlrpc.client.ServerProxy(
|
||||
f'{self._url}/xmlrpc/2/common', transport=transport, allow_none=True)
|
||||
self._models = xmlrpc.client.ServerProxy(
|
||||
f'{self._url}/xmlrpc/2/object', transport=transport, allow_none=True)
|
||||
|
||||
async def connect(self):
|
||||
await self._authenticate()
|
||||
if self._pg_dsn:
|
||||
await self._init_pg_pool()
|
||||
|
||||
async def close(self):
|
||||
if self._health_task:
|
||||
self._health_task.cancel()
|
||||
if self._pg_pool:
|
||||
await self._pg_pool.close()
|
||||
|
||||
async def _xmlrpc_call(self, proxy, method, *args):
|
||||
loop = asyncio.get_event_loop()
|
||||
fn = getattr(proxy, method)
|
||||
return await loop.run_in_executor(_executor, fn, *args)
|
||||
|
||||
async def _authenticate(self):
|
||||
async with self._auth_lock:
|
||||
if self._uid is not None:
|
||||
return
|
||||
try:
|
||||
uid = await self._xmlrpc_call(
|
||||
self._common, 'authenticate',
|
||||
self._db, '__system__', self._api_key, {})
|
||||
if not uid:
|
||||
raise OdooAuthError('Authentication failed - check ODOO_API_KEY')
|
||||
self._uid = uid
|
||||
logger.info('Odoo authenticated uid=%s', uid)
|
||||
except OdooAuthError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise OdooAuthError(f'Odoo auth error: {exc}') from exc
|
||||
|
||||
async def _call(self, model, method, args, kwargs=None):
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
last_exc = None
|
||||
for attempt in range(3):
|
||||
try:
|
||||
if self._uid is None:
|
||||
await self._authenticate()
|
||||
return await self._xmlrpc_call(
|
||||
self._models, 'execute_kw',
|
||||
self._db, self._uid, self._api_key, model, method, args, kwargs)
|
||||
except xmlrpc.client.Fault as exc:
|
||||
if 'Session expired' in str(exc) or 'AccessDenied' in str(exc):
|
||||
logger.warning('Odoo session expired, re-authenticating')
|
||||
self._uid = None
|
||||
await self._authenticate()
|
||||
continue
|
||||
raise OdooWriteError(str(exc)) from exc
|
||||
except (ConnectionRefusedError, OSError, socket.timeout) as exc:
|
||||
last_exc = exc
|
||||
wait = 2 ** attempt
|
||||
logger.warning('Odoo connection error (attempt %d/3), retrying in %ds: %s',
|
||||
attempt + 1, wait, exc)
|
||||
await asyncio.sleep(wait)
|
||||
raise ConnectionError(f'Odoo unreachable after 3 attempts: {last_exc}')
|
||||
|
||||
async def search_read(self, model, domain, fields, limit=None, order=None):
|
||||
kwargs = {'fields': fields}
|
||||
if limit is not None: kwargs['limit'] = limit
|
||||
if order is not None: kwargs['order'] = order
|
||||
return await self._call(model, 'search_read', [domain], kwargs)
|
||||
|
||||
async def read(self, model, ids, fields):
|
||||
return await self._call(model, 'read', [ids], {'fields': fields})
|
||||
|
||||
async def call(self, model, method, args, kwargs=None):
|
||||
return await self._call(model, method, args, kwargs or {})
|
||||
|
||||
async def write(self, model, ids, values):
|
||||
before_records = await self.read(model, ids, list(values.keys()))
|
||||
before = {r['id']: r for r in before_records}
|
||||
try:
|
||||
await self._call(model, 'write', [ids, values])
|
||||
after_records = await self.read(model, ids, list(values.keys()))
|
||||
after = {r['id']: r for r in after_records}
|
||||
logger.info('OdooClient.write model=%s ids=%s', model, ids)
|
||||
return WriteResult(True, model, ids[0] if ids else None, 'write', before, after)
|
||||
except Exception as exc:
|
||||
logger.error('OdooClient.write failed model=%s ids=%s: %s', model, ids, exc)
|
||||
return WriteResult(False, model, ids[0] if ids else None, 'write', before, error=str(exc))
|
||||
|
||||
async def create(self, model, values):
|
||||
try:
|
||||
record_id = await self._call(model, 'create', [values])
|
||||
after_records = await self.read(model, [record_id], list(values.keys()))
|
||||
after = after_records[0] if after_records else {}
|
||||
logger.info('OdooClient.create model=%s record_id=%s', model, record_id)
|
||||
return WriteResult(True, model, record_id, 'create', after=after)
|
||||
except Exception as exc:
|
||||
logger.error('OdooClient.create failed model=%s: %s', model, exc)
|
||||
return WriteResult(False, model, None, 'create', error=str(exc))
|
||||
|
||||
async def unlink(self, model, ids):
|
||||
logger.warning('OdooClient.unlink called - model=%s ids=%s. Irreversible.', model, ids)
|
||||
before_records = await self.read(model, ids, ['id', 'display_name'])
|
||||
before = {r['id']: r for r in before_records}
|
||||
try:
|
||||
await self._call(model, 'unlink', [ids])
|
||||
return WriteResult(True, model, ids[0] if ids else None, 'unlink', before)
|
||||
except Exception as exc:
|
||||
logger.error('OdooClient.unlink failed model=%s ids=%s: %s', model, ids, exc)
|
||||
return WriteResult(False, model, ids[0] if ids else None, 'unlink', before, error=str(exc))
|
||||
|
||||
async def post_chatter(self, model, record_id, body, subtype='mail.mt_note'):
|
||||
return await self._call(model, 'message_post', [[record_id]],
|
||||
{'body': body, 'subtype_xmlid': subtype})
|
||||
|
||||
async def _init_pg_pool(self):
|
||||
self._pg_pool = await asyncpg.create_pool(
|
||||
self._pg_dsn, min_size=self._pg_pool_min, max_size=self._pg_pool_max,
|
||||
max_inactive_connection_lifetime=300)
|
||||
self._health_task = asyncio.create_task(self._pg_health_loop())
|
||||
logger.info('asyncpg pool created min=%d max=%d', self._pg_pool_min, self._pg_pool_max)
|
||||
|
||||
async def _pg_health_loop(self):
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
try:
|
||||
async with self._pg_pool.acquire(timeout=10) as conn:
|
||||
await conn.execute('SELECT 1')
|
||||
except asyncpg.TooManyConnectionsError:
|
||||
logger.critical('asyncpg pool: TooManyConnectionsError')
|
||||
except Exception as exc:
|
||||
logger.warning('asyncpg pool health check failed: %s', exc)
|
||||
|
||||
@property
|
||||
def pg_pool(self): return self._pg_pool
|
||||
|
||||
async def pg_fetchrow(self, query, *args):
|
||||
if not self._pg_pool: raise RuntimeError('Postgres pool not initialized')
|
||||
async with self._pg_pool.acquire(timeout=10) as conn:
|
||||
return await conn.fetchrow(query, *args)
|
||||
|
||||
async def pg_fetch(self, query, *args):
|
||||
if not self._pg_pool: raise RuntimeError('Postgres pool not initialized')
|
||||
async with self._pg_pool.acquire(timeout=10) as conn:
|
||||
return await conn.fetch(query, *args)
|
||||
|
||||
async def pg_execute(self, query, *args):
|
||||
if not self._pg_pool: raise RuntimeError('Postgres pool not initialized')
|
||||
async with self._pg_pool.acquire(timeout=10) as conn:
|
||||
return await conn.execute(query, *args)
|
||||
Reference in New Issue
Block a user