Files
odoo-ai/addons/activeblue_ai/models/ab_ai_mail.py
Carlos Garcia d87f3c3e99 Non-blocking agent dispatch: run LLM call in background thread
message_post now returns immediately after collecting attachment data.
The agent HTTP call and reply posting happen in a daemon thread, so
Odoo commits the user's message and the browser confirms receipt right
away -- instead of waiting 10+ seconds for Ollama to respond.

File clarification (no LLM) still posts inline since it's instant.
The background thread opens its own DB cursor to post the bot reply.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 12:27:03 -04:00

275 lines
10 KiB
Python

from __future__ import annotations
import base64
import io
import logging
import re
import threading
import zipfile
import requests as _requests
from markupsafe import Markup, escape
from odoo import SUPERUSER_ID, api, registry as odoo_registry, models
_logger = logging.getLogger(__name__)
_HTML_TAG = re.compile(r'<[^>]+>')
# How many recent messages to scan when looking for a pending file upload
_LOOKBACK_MESSAGES = 10
# File type labels shown in the clarification message
_EXT_LABELS = {
'jpg': 'image', 'jpeg': 'image', 'png': 'image', 'gif': 'image',
'bmp': 'image', 'tiff': 'image', 'tif': 'image', 'webp': 'image',
'pdf': 'PDF', 'html': 'HTML', 'htm': 'HTML',
'txt': 'text', 'csv': 'spreadsheet', 'xlsx': 'spreadsheet',
'zip': 'ZIP archive',
}
def _strip_html(html: str) -> str:
return _HTML_TAG.sub(' ', html or '').strip()
def _ext(filename: str) -> str:
return filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
def _text_to_html(text: str) -> Markup:
"""Convert plain text to HTML -- escapes content, turns newlines into <br>."""
return Markup('<br>').join(Markup(escape(line)) for line in text.split('\n'))
def _describe_zip(datas_b64: str, zip_name: str) -> str:
"""Return a plain-text summary of a ZIP archive's contents."""
try:
raw = base64.b64decode(datas_b64)
with zipfile.ZipFile(io.BytesIO(raw)) as zf:
members = [m for m in zf.namelist() if not m.endswith('/')]
if not members:
return f'{zip_name} (empty archive)'
counts: dict[str, int] = {}
for m in members:
label = _EXT_LABELS.get(_ext(m), 'file')
counts[label] = counts.get(label, 0) + 1
type_summary = ', '.join(f'{n} {t}(s)' for t, n in counts.items())
lines = [f'{zip_name} -- {len(members)} item(s): {type_summary}']
for m in members[:8]:
lines.append(f' - {m}')
if len(members) > 8:
lines.append(f' ... and {len(members) - 8} more')
return '\n'.join(lines)
except Exception as exc:
_logger.warning('_describe_zip failed for %s: %s', zip_name, exc)
return f'{zip_name} (could not inspect contents)'
def _post_bot_reply(db: str, channel_id: int, bot_partner_id: int, reply_text: str):
"""Open a fresh DB cursor and post the bot reply to the channel."""
try:
with odoo_registry(db).cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
env['discuss.channel'].browse(channel_id).sudo().message_post(
body=_text_to_html(reply_text),
author_id=bot_partner_id,
message_type='comment',
subtype_xmlid='mail.mt_comment',
)
cr.commit()
except Exception as exc:
_logger.error('_post_bot_reply failed channel=%s: %s', channel_id, exc)
def _agent_thread(db: str, uid: int, text: str, att_data: list,
bot_partner_id: int, channel_id: int,
bot_url: str, bot_secret: str):
"""
Background thread: calls the agent service and posts the reply.
Runs entirely outside the Odoo HTTP request so message_post returns
immediately and the user sees their message without waiting for the LLM.
"""
try:
headers = {}
if bot_secret:
headers['X-ActiveBlue-Signature'] = bot_secret
if att_data:
files = [('files', (name, data, mime)) for name, data, mime in att_data]
if not files:
files = [('files', ('empty', b'', 'text/plain'))]
form = {
'user_id': str(uid),
'message': text or 'Create an employee expense report from these receipts.',
'session_id': '',
}
resp = _requests.post(bot_url + '/upload', data=form, files=files,
headers=headers, timeout=600)
else:
payload = {'user_id': str(uid), 'message': text,
'context': {'source': 'discuss'}}
headers['Content-Type'] = 'application/json'
resp = _requests.post(bot_url + '/dispatch', json=payload,
headers=headers, timeout=600)
resp.raise_for_status()
response = resp.json()
reply_text = (response or {}).get('reply') or (response or {}).get('message') or \
'I could not process your request right now.'
except _requests.exceptions.Timeout:
reply_text = 'The request timed out. Please try again.'
except Exception as exc:
_logger.error('Agent thread error channel=%s: %s', channel_id, exc)
reply_text = 'I encountered an error. Please try again or contact your administrator.'
_post_bot_reply(db, channel_id, bot_partner_id, reply_text)
class DiscussChannel(models.Model):
_inherit = 'discuss.channel'
@api.model
def _ai_bot_partner(self):
return self.env.ref('activeblue_ai.partner_activeblue_ai', raise_if_not_found=False)
def message_post(self, *, body='', author_id=None, **kwargs):
result = super().message_post(body=body, author_id=author_id, **kwargs)
# Only intercept direct-message channels
if self.channel_type != 'chat':
return result
bot_partner = self._ai_bot_partner()
if not bot_partner:
return result
member_partners = self.channel_member_ids.partner_id
if bot_partner not in member_partners:
return result
# Don't react to the bot's own messages
if author_id == bot_partner.id:
return result
text = _strip_html(body)
_logger.info(
'AB AI mail hook: body=%r kwargs_keys=%s '
'attachment_ids_kwarg=%r result.attachment_ids=%s',
(body or '')[:80],
list(kwargs.keys()),
kwargs.get('attachment_ids'),
result.attachment_ids.ids,
)
attachments = result.attachment_ids
if not text and not attachments:
return result
# -- Case 1: file(s) with no instruction --------------------------------
# Clarification is quick (no LLM) -- post inline, no thread needed.
if attachments and not text:
self._post_file_clarification(attachments, bot_partner)
return result
# -- Case 2: text (possibly with pending files from earlier upload) -----
pending = self.env['ir.attachment'].browse()
if text and not attachments:
pending = self._find_pending_attachments(bot_partner)
effective_attachments = attachments or pending
human_partner = member_partners.filtered(lambda p: p != bot_partner)[:1]
user = self.env['res.users'].search([('partner_id', '=', human_partner.id)], limit=1)
uid = user.id if user else self.env.uid
bot = self.env['ab.ai.bot'].sudo().search([('active', '=', True)], limit=1)
if not bot:
return result
# Read everything we need from the DB NOW (current transaction) before
# the background thread starts. The thread must not touch ORM objects
# from this transaction.
db = self.env.cr.dbname
bot_url = bot._get_service_url()
bot_secret = bot.webhook_secret or ''
channel_id = self.id
bot_partner_id = bot_partner.id
att_data: list[tuple[str, bytes, str]] = []
for att in effective_attachments:
try:
data = base64.b64decode(att.datas) if att.datas else b''
att_data.append((att.name or 'attachment', data,
att.mimetype or 'application/octet-stream'))
except Exception as exc:
_logger.warning('Could not read attachment %s: %s', att.id, exc)
# Launch the agent call in a daemon thread so this message_post returns
# immediately -- the user sees their message without waiting for the LLM.
threading.Thread(
target=_agent_thread,
args=(db, uid, text, att_data, bot_partner_id, channel_id,
bot_url, bot_secret),
daemon=True,
).start()
return result
def _post_file_clarification(self, attachments, bot_partner):
"""Describe the uploaded file(s) and ask the user what to do with them."""
file_lines = []
for att in attachments:
name = att.name or 'file'
ext = _ext(name)
if ext == 'zip' and att.datas:
file_lines.append(_describe_zip(att.datas, name))
else:
label = _EXT_LABELS.get(ext, 'file')
file_lines.append(f'{name} ({label})')
file_summary = '\n'.join(file_lines)
question = (
f'I received the following file(s):\n'
f'{file_summary}\n'
f'\n'
f'What would you like me to do with them? Some options:\n'
f' - Create an expense report from these receipts\n'
f' - Import products from this data\n'
f' - Something else -- just tell me what you need'
)
self.sudo().message_post(
body=_text_to_html(question),
author_id=bot_partner.id,
message_type='comment',
subtype_xmlid='mail.mt_comment',
)
def _find_pending_attachments(self, bot_partner):
"""
Scan the last _LOOKBACK_MESSAGES messages in this channel for the most
recent human-sent message that has attachments. Only returns them if
the immediately following bot message looks like a clarification question
(i.e. the bot hasn't already acted on those files).
"""
messages = self.message_ids.sorted('date', reverse=True)[:_LOOKBACK_MESSAGES]
_bot_question_phrases = (
'what would you like me to do',
'suspected duplicate',
'skip duplicates',
'keep all',
)
prev_was_bot_question = False
for msg in messages:
is_bot = msg.author_id == bot_partner
if is_bot:
body_lower = _strip_html(msg.body or '').lower()
if any(p in body_lower for p in _bot_question_phrases):
prev_was_bot_question = True
continue
# Human message
if msg.attachment_ids and prev_was_bot_question:
return msg.attachment_ids
prev_was_bot_question = False
return self.env['ir.attachment'].browse()