Files
odoo-ai/addons/activeblue_ai/models/ab_ai_mail.py
Carlos Garcia af1d27be89 feat: pre-creation confirmation step with inline duplicate warnings
Before writing any expense records the bot now posts a numbered table
of parsed vendor/amount/date for every receipt, with duplicate entries
flagged inline. User replies 'confirm' (skips dups) or 'confirm, keep
all'. This catches OCR amount misreads before they land in Odoo.

Also removes the separate awaiting_dup_approval step; duplicate review
is now part of the single confirmation table.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 16:54:25 -04:00

292 lines
11 KiB
Python

from __future__ import annotations
import base64
import io
import logging
import re
import threading
import zipfile
import requests as _requests
from markupsafe import Markup, escape
from odoo import SUPERUSER_ID, api, registry as odoo_registry, models
_logger = logging.getLogger(__name__)
_HTML_TAG = re.compile(r'<[^>]+>')
# How many recent messages to scan when looking for a pending file upload
_LOOKBACK_MESSAGES = 10
# File type labels shown in the clarification message
_EXT_LABELS = {
'jpg': 'image', 'jpeg': 'image', 'png': 'image', 'gif': 'image',
'bmp': 'image', 'tiff': 'image', 'tif': 'image', 'webp': 'image',
'pdf': 'PDF', 'html': 'HTML', 'htm': 'HTML',
'txt': 'text', 'csv': 'spreadsheet', 'xlsx': 'spreadsheet',
'zip': 'ZIP archive',
}
def _strip_html(html: str) -> str:
return _HTML_TAG.sub(' ', html or '').strip()
def _ext(filename: str) -> str:
return filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
def _text_to_html(text: str) -> Markup:
"""Convert plain text to HTML -- escapes content, turns newlines into <br>."""
return Markup('<br>').join(Markup(escape(line)) for line in text.split('\n'))
def _describe_zip(datas_b64: str, zip_name: str) -> str:
"""Return a plain-text summary of a ZIP archive's contents."""
try:
raw = base64.b64decode(datas_b64)
with zipfile.ZipFile(io.BytesIO(raw)) as zf:
members = [m for m in zf.namelist() if not m.endswith('/')]
if not members:
return f'{zip_name} (empty archive)'
counts: dict[str, int] = {}
for m in members:
label = _EXT_LABELS.get(_ext(m), 'file')
counts[label] = counts.get(label, 0) + 1
type_summary = ', '.join(f'{n} {t}(s)' for t, n in counts.items())
lines = [f'{zip_name} -- {len(members)} item(s): {type_summary}']
for m in members[:8]:
lines.append(f' - {m}')
if len(members) > 8:
lines.append(f' ... and {len(members) - 8} more')
return '\n'.join(lines)
except Exception as exc:
_logger.warning('_describe_zip failed for %s: %s', zip_name, exc)
return f'{zip_name} (could not inspect contents)'
def _post_bot_reply(db: str, channel_id: int, bot_partner_id: int, reply_text: str):
"""Open a fresh DB cursor and post the bot reply to the channel."""
try:
with odoo_registry(db).cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
env['discuss.channel'].browse(channel_id).sudo().message_post(
body=_text_to_html(reply_text),
author_id=bot_partner_id,
message_type='comment',
subtype_xmlid='mail.mt_comment',
)
cr.commit()
except Exception as exc:
_logger.error('_post_bot_reply failed channel=%s: %s', channel_id, exc)
def _agent_thread(db: str, uid: int, text: str, att_data: list,
bot_partner_id: int, channel_id: int,
bot_url: str, bot_secret: str):
"""
Background thread: calls the agent service and posts the reply.
Runs entirely outside the Odoo HTTP request so message_post returns
immediately and the user sees their message without waiting for the LLM.
"""
try:
headers = {}
if bot_secret:
headers['X-ActiveBlue-Signature'] = bot_secret
if att_data:
files = [('files', (name, data, mime)) for name, data, mime in att_data]
if not files:
files = [('files', ('empty', b'', 'text/plain'))]
form = {
'user_id': str(uid),
'message': text or 'Create an employee expense report from these receipts.',
'session_id': '',
}
resp = _requests.post(bot_url + '/upload', data=form, files=files,
headers=headers, timeout=600)
else:
payload = {'user_id': str(uid), 'message': text,
'context': {'source': 'discuss'}}
headers['Content-Type'] = 'application/json'
resp = _requests.post(bot_url + '/dispatch', json=payload,
headers=headers, timeout=600)
resp.raise_for_status()
response = resp.json()
reply_text = (response or {}).get('reply') or (response or {}).get('message') or \
'I could not process your request right now.'
except _requests.exceptions.Timeout:
reply_text = 'The request timed out. Please try again.'
except _requests.exceptions.HTTPError as exc:
# Try to surface the server-side error detail so the user knows what failed
detail = ''
try:
detail = exc.response.json().get('detail') or ''
except Exception:
pass
_logger.error('Agent HTTP error channel=%s status=%s: %s',
channel_id, exc.response.status_code if exc.response else '?', exc)
if detail:
reply_text = f'The agent returned an error: {detail}'
else:
reply_text = (f'The agent service returned an unexpected error '
f'(HTTP {exc.response.status_code if exc.response else "?"}). '
f'Please try again or contact your administrator.')
except Exception as exc:
_logger.error('Agent thread error channel=%s: %s', channel_id, exc)
reply_text = f'I encountered an error: {exc}'
_post_bot_reply(db, channel_id, bot_partner_id, reply_text)
class DiscussChannel(models.Model):
_inherit = 'discuss.channel'
@api.model
def _ai_bot_partner(self):
return self.env.ref('activeblue_ai.partner_activeblue_ai', raise_if_not_found=False)
def message_post(self, *, body='', author_id=None, **kwargs):
result = super().message_post(body=body, author_id=author_id, **kwargs)
# Only intercept direct-message channels
if self.channel_type != 'chat':
return result
bot_partner = self._ai_bot_partner()
if not bot_partner:
return result
member_partners = self.channel_member_ids.partner_id
if bot_partner not in member_partners:
return result
# Don't react to the bot's own messages
if author_id == bot_partner.id:
return result
text = _strip_html(body)
_logger.info(
'AB AI mail hook: body=%r kwargs_keys=%s '
'attachment_ids_kwarg=%r result.attachment_ids=%s',
(body or '')[:80],
list(kwargs.keys()),
kwargs.get('attachment_ids'),
result.attachment_ids.ids,
)
attachments = result.attachment_ids
if not text and not attachments:
return result
# -- Case 1: file(s) with no instruction --------------------------------
# Clarification is quick (no LLM) -- post inline, no thread needed.
if attachments and not text:
self._post_file_clarification(attachments, bot_partner)
return result
# -- Case 2: text (possibly with pending files from earlier upload) -----
pending = self.env['ir.attachment'].browse()
if text and not attachments:
pending = self._find_pending_attachments(bot_partner)
effective_attachments = attachments or pending
human_partner = member_partners.filtered(lambda p: p != bot_partner)[:1]
user = self.env['res.users'].search([('partner_id', '=', human_partner.id)], limit=1)
uid = user.id if user else self.env.uid
bot = self.env['ab.ai.bot'].sudo().search([('active', '=', True)], limit=1)
if not bot:
return result
# Read everything we need from the DB NOW (current transaction) before
# the background thread starts. The thread must not touch ORM objects
# from this transaction.
db = self.env.cr.dbname
bot_url = bot._get_service_url()
bot_secret = bot.webhook_secret or ''
channel_id = self.id
bot_partner_id = bot_partner.id
att_data: list[tuple[str, bytes, str]] = []
for att in effective_attachments:
try:
data = base64.b64decode(att.datas) if att.datas else b''
att_data.append((att.name or 'attachment', data,
att.mimetype or 'application/octet-stream'))
except Exception as exc:
_logger.warning('Could not read attachment %s: %s', att.id, exc)
# Launch the agent call in a daemon thread so this message_post returns
# immediately -- the user sees their message without waiting for the LLM.
threading.Thread(
target=_agent_thread,
args=(db, uid, text, att_data, bot_partner_id, channel_id,
bot_url, bot_secret),
daemon=True,
).start()
return result
def _post_file_clarification(self, attachments, bot_partner):
"""Describe the uploaded file(s) and ask the user what to do with them."""
file_lines = []
for att in attachments:
name = att.name or 'file'
ext = _ext(name)
if ext == 'zip' and att.datas:
file_lines.append(_describe_zip(att.datas, name))
else:
label = _EXT_LABELS.get(ext, 'file')
file_lines.append(f'{name} ({label})')
file_summary = '\n'.join(file_lines)
question = (
f'I received the following file(s):\n'
f'{file_summary}\n'
f'\n'
f'What would you like me to do with them? Some options:\n'
f' - Create an expense report from these receipts\n'
f' - Import products from this data\n'
f' - Something else -- just tell me what you need'
)
self.sudo().message_post(
body=_text_to_html(question),
author_id=bot_partner.id,
message_type='comment',
subtype_xmlid='mail.mt_comment',
)
def _find_pending_attachments(self, bot_partner):
"""
Scan the last _LOOKBACK_MESSAGES messages in this channel for the most
recent human-sent message that has attachments. Only returns them if
the immediately following bot message looks like a clarification question
(i.e. the bot hasn't already acted on those files).
"""
messages = self.message_ids.sorted('date', reverse=True)[:_LOOKBACK_MESSAGES]
_bot_question_phrases = (
'what would you like me to do',
'suspected duplicate',
'skip duplicates',
'keep all',
'please review',
'reply "confirm"',
)
prev_was_bot_question = False
for msg in messages:
is_bot = msg.author_id == bot_partner
if is_bot:
body_lower = _strip_html(msg.body or '').lower()
if any(p in body_lower for p in _bot_question_phrases):
prev_was_bot_question = True
continue
# Human message
if msg.attachment_ids and prev_was_bot_question:
return msg.attachment_ids
prev_was_bot_question = False
return self.env['ir.attachment'].browse()