Health endpoint called .ping() on both but neither implemented it, causing ollama/odoo to always show as error and the bot to stay offline. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
207 lines
8.6 KiB
Python
207 lines
8.6 KiB
Python
from __future__ import annotations
|
|
import asyncio, logging, os, socket, xmlrpc.client
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
import asyncpg
|
|
|
|
logger = logging.getLogger(__name__)
|
|
XMLRPC_TIMEOUT = int(os.environ.get('XMLRPC_TIMEOUT', '60'))
|
|
_POOL_MAX = int(os.environ.get('POSTGRES_POOL_MAX', '10'))
|
|
_executor = ThreadPoolExecutor(max_workers=max(10, _POOL_MAX * 2))
|
|
|
|
|
|
class _TimeoutTransport(xmlrpc.client.Transport):
|
|
def __init__(self, timeout=None):
|
|
super().__init__()
|
|
self._timeout = timeout or XMLRPC_TIMEOUT
|
|
def make_connection(self, host):
|
|
conn = super().make_connection(host)
|
|
conn.timeout = self._timeout
|
|
return conn
|
|
|
|
|
|
@dataclass
|
|
class WriteResult:
|
|
success: bool
|
|
model: str
|
|
record_id: object
|
|
action: str
|
|
before: dict = field(default_factory=dict)
|
|
after: dict = field(default_factory=dict)
|
|
error: object = None
|
|
|
|
|
|
class OdooAuthError(Exception): pass
|
|
class OdooWriteError(Exception): pass
|
|
|
|
|
|
class OdooClient:
|
|
def __init__(self, url, db, api_key, user='__system__', pg_dsn=None, pg_pool_min=2, pg_pool_max=10):
|
|
self._url = url.rstrip('/')
|
|
self._db = db
|
|
self._api_key = api_key
|
|
self._user = user
|
|
self._pg_dsn = pg_dsn
|
|
self._pg_pool_min = pg_pool_min
|
|
self._pg_pool_max = pg_pool_max
|
|
self._uid = None
|
|
self._auth_lock = asyncio.Lock()
|
|
self._pg_pool = None
|
|
self._health_task = None
|
|
transport = _TimeoutTransport()
|
|
self._common = xmlrpc.client.ServerProxy(
|
|
f'{self._url}/xmlrpc/2/common', transport=transport, allow_none=True)
|
|
self._models = xmlrpc.client.ServerProxy(
|
|
f'{self._url}/xmlrpc/2/object', transport=transport, allow_none=True)
|
|
|
|
async def ping(self) -> None:
|
|
"""Raise if Odoo is unreachable."""
|
|
await self._xmlrpc_call(self._common, 'version')
|
|
|
|
async def connect(self):
|
|
await self._authenticate()
|
|
if self._pg_dsn:
|
|
await self._init_pg_pool()
|
|
|
|
async def close(self):
|
|
if self._health_task:
|
|
self._health_task.cancel()
|
|
if self._pg_pool:
|
|
await self._pg_pool.close()
|
|
|
|
async def _xmlrpc_call(self, proxy, method, *args):
|
|
loop = asyncio.get_event_loop()
|
|
fn = getattr(proxy, method)
|
|
return await loop.run_in_executor(_executor, fn, *args)
|
|
|
|
async def _authenticate(self):
|
|
async with self._auth_lock:
|
|
if self._uid is not None:
|
|
return
|
|
try:
|
|
uid = await self._xmlrpc_call(
|
|
self._common, 'authenticate',
|
|
self._db, self._user, self._api_key, {})
|
|
if not uid:
|
|
raise OdooAuthError('Authentication failed - check ODOO_API_KEY')
|
|
self._uid = uid
|
|
logger.info('Odoo authenticated uid=%s', uid)
|
|
except OdooAuthError:
|
|
raise
|
|
except Exception as exc:
|
|
raise OdooAuthError(f'Odoo auth error: {exc}') from exc
|
|
|
|
async def _call(self, model, method, args, kwargs=None):
|
|
if kwargs is None:
|
|
kwargs = {}
|
|
last_exc = None
|
|
for attempt in range(3):
|
|
try:
|
|
if self._uid is None:
|
|
await self._authenticate()
|
|
return await self._xmlrpc_call(
|
|
self._models, 'execute_kw',
|
|
self._db, self._uid, self._api_key, model, method, args, kwargs)
|
|
except xmlrpc.client.Fault as exc:
|
|
if 'Session expired' in str(exc) or 'AccessDenied' in str(exc):
|
|
logger.warning('Odoo session expired, re-authenticating')
|
|
self._uid = None
|
|
await self._authenticate()
|
|
continue
|
|
raise OdooWriteError(str(exc)) from exc
|
|
except (ConnectionRefusedError, OSError, socket.timeout) as exc:
|
|
last_exc = exc
|
|
wait = 2 ** attempt
|
|
logger.warning('Odoo connection error (attempt %d/3), retrying in %ds: %s',
|
|
attempt + 1, wait, exc)
|
|
await asyncio.sleep(wait)
|
|
raise ConnectionError(f'Odoo unreachable after 3 attempts: {last_exc}')
|
|
|
|
async def search_read(self, model, domain, fields, limit=None, order=None):
|
|
kwargs = {'fields': fields}
|
|
if limit is not None: kwargs['limit'] = limit
|
|
if order is not None: kwargs['order'] = order
|
|
return await self._call(model, 'search_read', [domain], kwargs)
|
|
|
|
async def read(self, model, ids, fields):
|
|
return await self._call(model, 'read', [ids], {'fields': fields})
|
|
|
|
async def call(self, model, method, args, kwargs=None):
|
|
return await self._call(model, method, args, kwargs or {})
|
|
|
|
async def write(self, model, ids, values):
|
|
before_records = await self.read(model, ids, list(values.keys()))
|
|
before = {r['id']: r for r in before_records}
|
|
try:
|
|
await self._call(model, 'write', [ids, values])
|
|
after_records = await self.read(model, ids, list(values.keys()))
|
|
after = {r['id']: r for r in after_records}
|
|
logger.info('OdooClient.write model=%s ids=%s', model, ids)
|
|
return WriteResult(True, model, ids[0] if ids else None, 'write', before, after)
|
|
except Exception as exc:
|
|
logger.error('OdooClient.write failed model=%s ids=%s: %s', model, ids, exc)
|
|
return WriteResult(False, model, ids[0] if ids else None, 'write', before, error=str(exc))
|
|
|
|
async def create(self, model, values):
|
|
try:
|
|
record_id = await self._call(model, 'create', [values])
|
|
after_records = await self.read(model, [record_id], list(values.keys()))
|
|
after = after_records[0] if after_records else {}
|
|
logger.info('OdooClient.create model=%s record_id=%s', model, record_id)
|
|
return WriteResult(True, model, record_id, 'create', after=after)
|
|
except Exception as exc:
|
|
logger.error('OdooClient.create failed model=%s: %s', model, exc)
|
|
return WriteResult(False, model, None, 'create', error=str(exc))
|
|
|
|
async def unlink(self, model, ids):
|
|
logger.warning('OdooClient.unlink called - model=%s ids=%s. Irreversible.', model, ids)
|
|
before_records = await self.read(model, ids, ['id', 'display_name'])
|
|
before = {r['id']: r for r in before_records}
|
|
try:
|
|
await self._call(model, 'unlink', [ids])
|
|
return WriteResult(True, model, ids[0] if ids else None, 'unlink', before)
|
|
except Exception as exc:
|
|
logger.error('OdooClient.unlink failed model=%s ids=%s: %s', model, ids, exc)
|
|
return WriteResult(False, model, ids[0] if ids else None, 'unlink', before, error=str(exc))
|
|
|
|
async def post_chatter(self, model, record_id, body, subtype='mail.mt_note'):
|
|
return await self._call(model, 'message_post', [[record_id]],
|
|
{'body': body, 'subtype_xmlid': subtype})
|
|
|
|
async def _init_pg_pool(self):
|
|
self._pg_pool = await asyncpg.create_pool(
|
|
self._pg_dsn, min_size=self._pg_pool_min, max_size=self._pg_pool_max,
|
|
max_inactive_connection_lifetime=300)
|
|
self._health_task = asyncio.create_task(self._pg_health_loop())
|
|
logger.info('asyncpg pool created min=%d max=%d', self._pg_pool_min, self._pg_pool_max)
|
|
|
|
async def _pg_health_loop(self):
|
|
while True:
|
|
await asyncio.sleep(60)
|
|
try:
|
|
async with self._pg_pool.acquire(timeout=10) as conn:
|
|
await conn.execute('SELECT 1')
|
|
except asyncpg.TooManyConnectionsError:
|
|
logger.critical('asyncpg pool: TooManyConnectionsError')
|
|
except Exception as exc:
|
|
logger.warning('asyncpg pool health check failed: %s', exc)
|
|
|
|
@property
|
|
def pg_pool(self): return self._pg_pool
|
|
|
|
async def pg_fetchrow(self, query, *args):
|
|
if not self._pg_pool: raise RuntimeError('Postgres pool not initialized')
|
|
async with self._pg_pool.acquire(timeout=10) as conn:
|
|
return await conn.fetchrow(query, *args)
|
|
|
|
async def pg_fetch(self, query, *args):
|
|
if not self._pg_pool: raise RuntimeError('Postgres pool not initialized')
|
|
async with self._pg_pool.acquire(timeout=10) as conn:
|
|
return await conn.fetch(query, *args)
|
|
|
|
async def pg_execute(self, query, *args):
|
|
if not self._pg_pool: raise RuntimeError('Postgres pool not initialized')
|
|
async with self._pg_pool.acquire(timeout=10) as conn:
|
|
return await conn.execute(query, *args)
|