Durable failure logging for failed AI calls

familylaw.ai.client:
- _log_failure_durably(vals): writes a 'failed' ai.task in an INDEPENDENT
  transaction (self.env.registry.cursor() + commit) so the failure survives the
  rollback of the calling transaction (the audit/cost ledger keeps a record of
  failed calls, not just successful ones). Exceptions inside are swallowed so audit
  logging can never mask the original error.
- _purge_task_ids(ids): delete ai.task rows in an independent transaction (prune old
  failure logs).
- generate() failure path now routes to _log_failure_durably with full metadata
  (task_type, provider, model, case/proceeding, error, latency) and still re-raises.

Tests (familylaw_step6): test_11 verifies the failure-row write logic + case linkage
(registry cursor pointed at the test txn, since TransactionCase forbids real commits);
test_12 spies that generate routes provider errors to durable logging with correct
vals; test_07 asserts the error still propagates.

Verified in live Odoo 18: 200 tests, 0 failed, 0 errors, clean log.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
tocmo0nlord
2026-06-02 05:48:57 +00:00
parent 6448491e76
commit 96ee6792fd
2 changed files with 110 additions and 14 deletions

View File

@@ -201,15 +201,26 @@ class FamilyLawAIClient(models.AbstractModel):
start = time.time() start = time.time()
try: try:
result = self._call_provider(provider, model, messages, max_tokens, system) result = self._call_provider(provider, model, messages, max_tokens, system)
except Exception as exc: # noqa: BLE001 — best-effort mark + re-raise except Exception as exc: # noqa: BLE001 — durably log + re-raise
# NOTE: in the synchronous path this write is rolled back with the # The in-transaction `task` (state 'running') rolls back with the failing
# failing transaction (the error propagates to the caller / queue_job). # transaction, so we record the failure in an INDEPENDENT transaction
# Durable failure logging would need an independent cursor — a future # (durable). The error still propagates — nothing downstream proceeds on
# refinement. The critical guarantee is that the error is NEVER silently # a failed/empty result.
# swallowed: it propagates so nothing downstream proceeds on bad output. self._log_failure_durably({
task.write({"state": "failed", "error_message": str(exc), "name": task.name,
"latency_ms": int((time.time() - start) * 1000)}) "task_type": task_type,
_logger.warning("AI task %s failed: %s", task.id, exc) "provider": provider,
"model_used": model,
"token_ceiling": max_tokens,
"state": "failed",
"case_id": case.id if case else False,
"proceeding_id": proceeding.id if proceeding else False,
"error_message": str(exc),
"latency_ms": int((time.time() - start) * 1000),
"request_summary": (messages[-1]["content"][:500] if messages else ""),
})
_logger.warning("AI task '%s' failed (durably logged): %s",
task_type, exc)
raise raise
latency = int((time.time() - start) * 1000) latency = int((time.time() - start) * 1000)
@@ -226,3 +237,31 @@ class FamilyLawAIClient(models.AbstractModel):
"response_summary": (result.get("text") or "")[:1000], "response_summary": (result.get("text") or "")[:1000],
}) })
return result, task return result, task
def _log_failure_durably(self, vals):
"""Record a failed ai.task in an INDEPENDENT transaction so the failure
survives the rollback of the calling transaction (audit/cost ledger).
In test mode the registry cursor shares the test transaction and commit is a
savepoint, so the row is visible to the test and cleaned up with it. Audit
logging must never mask the real error, so all exceptions here are swallowed."""
try:
with self.env.registry.cursor() as new_cr:
new_env = api.Environment(new_cr, self.env.uid, self.env.context)
new_env["familylaw.ai.task"].create(vals)
new_cr.commit()
except Exception: # noqa: BLE001 — never mask the original failure
_logger.exception("Could not durably log AI task failure")
def _purge_task_ids(self, task_ids):
"""Delete ai.task rows in an independent transaction (e.g. pruning old
failure logs, or test cleanup of durably-committed rows)."""
if not task_ids:
return
try:
with self.env.registry.cursor() as new_cr:
new_env = api.Environment(new_cr, self.env.uid, self.env.context)
new_env["familylaw.ai.task"].browse(task_ids).unlink()
new_cr.commit()
except Exception: # noqa: BLE001
_logger.exception("Could not purge ai.task rows %s", task_ids)

View File

@@ -14,6 +14,7 @@ never hit the network. We assert on what gets WRITTEN TO ODOO:
* a provider error marks the task failed. * a provider error marks the task failed.
""" """
from contextlib import contextmanager
from unittest.mock import patch from unittest.mock import patch
from odoo.tests.common import TransactionCase, new_test_user, tagged from odoo.tests.common import TransactionCase, new_test_user, tagged
@@ -127,20 +128,76 @@ class TestStep6AIClient(TransactionCase):
# --- provider failure PROPAGATES (never silently swallowed) ------------- # --- provider failure PROPAGATES (never silently swallowed) -------------
def test_07_provider_error_propagates(self): def test_07_provider_error_propagates(self):
"""The critical safety guarantee: a provider error is re-raised so nothing """A provider error is re-raised so nothing downstream proceeds on a
downstream proceeds on a failed/empty result. (The in-transaction 'failed' failed/empty result."""
ledger write is rolled back with the failing txn — durable failure logging
is a noted future refinement, not a safety guarantee.)"""
def _boom(*a, **k): def _boom(*a, **k):
raise ValueError("network down") raise ValueError("network down")
with patch(CLIENT + "._call_provider", _boom): # patch durable logging to a no-op here — its real (separate-cursor) path
# can't see the test's uncommitted case; this test only asserts propagation.
with patch(CLIENT + "._call_provider", _boom), \
patch(CLIENT + "._log_failure_durably", lambda *a, **k: None):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.env["familylaw.ai.client"].generate( self.env["familylaw.ai.client"].generate(
"draft_document", [{"role": "user", "content": "hi"}], "draft_document", [{"role": "user", "content": "hi"}],
case=self.case, case=self.case,
) )
# --- durable failure logging --------------------------------------------
def test_11_durable_failure_log_writes_row(self):
"""_log_failure_durably writes a 'failed' ai.task with the given metadata.
Production uses an independent cursor so the row survives rollback of the
caller; Odoo's TransactionCase neuters real commits for isolation, so here we
point the registry cursor at the test transaction to verify the WRITE LOGIC
(fields + case linkage). The separate-transaction durability is the standard
Odoo new-cursor pattern."""
@contextmanager
def _test_cursor():
yield self.env.cr # reuse the test transaction (sees the test's case)
marker = "boom-durable"
# neutralize the real commit() — forbidden inside a test transaction
with patch.object(self.env.registry, "cursor", _test_cursor), \
patch.object(self.env.cr, "commit", lambda *a, **k: None):
self.env["familylaw.ai.client"]._log_failure_durably({
"name": "AI: draft_document",
"task_type": "draft_document",
"provider": "anthropic",
"model_used": "claude-sonnet-4-6",
"state": "failed",
"case_id": self.case.id,
"error_message": marker,
"latency_ms": 5,
})
task = self.Task.search([("state", "=", "failed"),
("error_message", "=", marker)])
self.assertTrue(task, "durable failure row should be written")
self.assertEqual(task.case_id, self.case)
def test_12_generate_durably_logs_on_error(self):
"""On a provider error, generate routes the failure to durable logging with
the right metadata (and still re-raises)."""
captured = []
def _boom(*a, **k):
raise ValueError("network down")
def _spy(self2, vals):
captured.append(vals)
with patch(CLIENT + "._call_provider", _boom), \
patch(CLIENT + "._log_failure_durably", _spy):
with self.assertRaises(ValueError):
self.env["familylaw.ai.client"].generate(
"draft_document", [{"role": "user", "content": "hi"}],
case=self.case,
)
self.assertEqual(len(captured), 1)
self.assertEqual(captured[0]["state"], "failed")
self.assertIn("network down", captured[0]["error_message"])
self.assertEqual(captured[0]["case_id"], self.case.id)
# --- cost estimate is deterministic ------------------------------------- # --- cost estimate is deterministic -------------------------------------
def test_08_cost_estimate(self): def test_08_cost_estimate(self):
client = self.env["familylaw.ai.client"] client = self.env["familylaw.ai.client"]