Initial commit: Odoo 18.0-20251222 extra-addons
This commit is contained in:
5
tracking_manager/models/__init__.py
Executable file
5
tracking_manager/models/__init__.py
Executable file
@@ -0,0 +1,5 @@
|
||||
from . import mail_thread
|
||||
from . import ir_model
|
||||
from . import ir_model_fields
|
||||
from . import models
|
||||
from . import mail_tracking_value
|
||||
162
tracking_manager/models/ir_model.py
Executable file
162
tracking_manager/models/ir_model.py
Executable file
@@ -0,0 +1,162 @@
|
||||
# Copyright (C) 2022 Akretion (<http://www.akretion.com>).
|
||||
# @author Kévin Roche <kevin.roche@akretion.com>
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
from ast import literal_eval
|
||||
|
||||
from odoo import api, fields, models, tools
|
||||
from odoo.osv import expression
|
||||
|
||||
|
||||
class IrModel(models.Model):
|
||||
_inherit = "ir.model"
|
||||
|
||||
active_custom_tracking = fields.Boolean()
|
||||
tracked_field_count = fields.Integer(compute="_compute_tracked_field_count")
|
||||
automatic_custom_tracking = fields.Boolean(
|
||||
compute="_compute_automatic_custom_tracking",
|
||||
readonly=False,
|
||||
store=True,
|
||||
help=(
|
||||
"If marked, the fields matching the matched by the domain"
|
||||
" below will be automatically tracked for this model."
|
||||
),
|
||||
)
|
||||
automatic_custom_tracking_domain = fields.Char(
|
||||
string="Domain",
|
||||
compute="_compute_automatic_custom_tracking_domain",
|
||||
store=True,
|
||||
readonly=False,
|
||||
)
|
||||
|
||||
@tools.ormcache()
|
||||
def _get_custom_tracked_fields_per_model(self):
|
||||
models = self.sudo().search([("active_custom_tracking", "=", True)])
|
||||
return {
|
||||
model.model: model.field_id.filtered(
|
||||
lambda f, model=model: f.custom_tracking
|
||||
and self.env[model.model]._fields.get(f.name)
|
||||
).mapped("name")
|
||||
for model in models
|
||||
if model.model in self.env
|
||||
}
|
||||
|
||||
@tools.ormcache()
|
||||
def _get_model_tracked_by_o2m(self):
|
||||
"""For each model tracked due to a o2m relation
|
||||
compute the information of
|
||||
- the fields to track
|
||||
- the 'notify" field to found the related record to post the message
|
||||
return example
|
||||
{
|
||||
"res.partner.bank": {
|
||||
"fields": ["acc_holder_name", "acc_number", ...],
|
||||
"notify": [["partner_id", "bank_ids"]],
|
||||
}
|
||||
}
|
||||
"""
|
||||
self = self.sudo()
|
||||
fields = self.env["ir.model.fields"].search(
|
||||
[
|
||||
("custom_tracking", "=", True),
|
||||
("model_id.active_custom_tracking", "=", True),
|
||||
("ttype", "=", "one2many"),
|
||||
]
|
||||
)
|
||||
related_models = self.env["ir.model"].search(
|
||||
[
|
||||
("model", "in", fields.mapped("relation")),
|
||||
]
|
||||
)
|
||||
custom_tracked_fields = self._get_custom_tracked_fields_per_model()
|
||||
res = {}
|
||||
for model in related_models:
|
||||
if model.model not in self.env:
|
||||
# If the model do not exist skip it (ex: during module update)
|
||||
continue
|
||||
if model.model in custom_tracked_fields:
|
||||
tracked_fields = custom_tracked_fields[model.model]
|
||||
else:
|
||||
tracked_fields = model.field_id.filtered(
|
||||
lambda s, model=model: not s.readonly
|
||||
and not s.related
|
||||
and not s.ttype == "one2many"
|
||||
and s.name in self.env[model.model]._fields
|
||||
).mapped("name")
|
||||
res[model.model] = {"fields": tracked_fields, "notify": []}
|
||||
|
||||
for field in fields:
|
||||
model_name = field.model_id.model
|
||||
if (
|
||||
model_name in self.env
|
||||
and self.env[model_name]._fields.get(field.name)
|
||||
and field.relation in res
|
||||
):
|
||||
res[field.relation]["notify"].append(
|
||||
[self.env[model_name]._fields[field.name].inverse_name, field.name]
|
||||
)
|
||||
return res
|
||||
|
||||
@api.depends("active_custom_tracking")
|
||||
def _compute_automatic_custom_tracking(self):
|
||||
for record in self:
|
||||
record.automatic_custom_tracking = False
|
||||
|
||||
def _default_automatic_custom_tracking_domain_rules(self):
|
||||
return {
|
||||
"product.product": [
|
||||
("readonly", "=", False),
|
||||
"|",
|
||||
("ttype", "!=", "one2many"),
|
||||
("name", "in", ["barcode_ids"]),
|
||||
],
|
||||
"sale.order": [
|
||||
("readonly", "=", False),
|
||||
"|",
|
||||
("ttype", "!=", "one2many"),
|
||||
("name", "in", ["order_line"]),
|
||||
],
|
||||
"account.move": [
|
||||
("readonly", "=", False),
|
||||
"|",
|
||||
("ttype", "!=", "one2many"),
|
||||
("name", "in", ["invoice_line_ids"]),
|
||||
],
|
||||
"default_automatic_rule": [
|
||||
("ttype", "!=", "one2many"),
|
||||
("readonly", "=", False),
|
||||
],
|
||||
}
|
||||
|
||||
@api.depends("automatic_custom_tracking")
|
||||
def _compute_automatic_custom_tracking_domain(self):
|
||||
rules = self._default_automatic_custom_tracking_domain_rules()
|
||||
for record in self:
|
||||
automatic_custom_tracking_domain = rules.get(record.model) or rules.get(
|
||||
"default_automatic_rule", []
|
||||
)
|
||||
automatic_custom_tracking_domain = expression.AND(
|
||||
[automatic_custom_tracking_domain, [("model", "=", record.model)]]
|
||||
)
|
||||
record.automatic_custom_tracking_domain = str(
|
||||
automatic_custom_tracking_domain
|
||||
)
|
||||
|
||||
def update_custom_tracking(self):
|
||||
for record in self:
|
||||
fields = record.field_id.filtered("trackable").filtered_domain(
|
||||
literal_eval(record.automatic_custom_tracking_domain)
|
||||
)
|
||||
fields.write({"custom_tracking": True})
|
||||
untrack_fields = record.field_id - fields
|
||||
untrack_fields.write({"custom_tracking": False})
|
||||
|
||||
@api.depends("field_id.custom_tracking")
|
||||
def _compute_tracked_field_count(self):
|
||||
for rec in self:
|
||||
rec.tracked_field_count = len(rec.field_id.filtered("custom_tracking"))
|
||||
|
||||
def write(self, vals):
|
||||
if "active_custom_tracking" in vals:
|
||||
self.env.registry.clear_cache()
|
||||
return super().write(vals)
|
||||
62
tracking_manager/models/ir_model_fields.py
Executable file
62
tracking_manager/models/ir_model_fields.py
Executable file
@@ -0,0 +1,62 @@
|
||||
# Copyright (C) 2022 Akretion (<http://www.akretion.com>).
|
||||
# @author Kévin Roche <kevin.roche@akretion.com>
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
from ast import literal_eval
|
||||
|
||||
from odoo import api, fields, models
|
||||
|
||||
|
||||
class IrModelFields(models.Model):
|
||||
_inherit = "ir.model.fields"
|
||||
|
||||
custom_tracking = fields.Boolean(
|
||||
compute="_compute_custom_tracking",
|
||||
store=True,
|
||||
readonly=False,
|
||||
)
|
||||
native_tracking = fields.Boolean(
|
||||
compute="_compute_native_tracking",
|
||||
store=True,
|
||||
)
|
||||
trackable = fields.Boolean(
|
||||
compute="_compute_trackable",
|
||||
store=True,
|
||||
)
|
||||
|
||||
@api.depends("native_tracking")
|
||||
def _compute_custom_tracking(self):
|
||||
for record in self:
|
||||
if record.model_id.automatic_custom_tracking:
|
||||
domain = literal_eval(record.model_id.automatic_custom_tracking_domain)
|
||||
record.custom_tracking = bool(record.filtered_domain(domain))
|
||||
else:
|
||||
record.custom_tracking = record.native_tracking
|
||||
|
||||
@api.depends("tracking")
|
||||
def _compute_native_tracking(self):
|
||||
for record in self:
|
||||
record.native_tracking = bool(record.tracking)
|
||||
|
||||
@api.depends("related", "store")
|
||||
def _compute_trackable(self):
|
||||
blacklists = [
|
||||
"activity_ids",
|
||||
"message_ids",
|
||||
"message_last_post",
|
||||
"message_main_attachment",
|
||||
"message_main_attachement_id",
|
||||
]
|
||||
|
||||
for rec in self:
|
||||
rec.trackable = rec.name not in blacklists and rec.store and not rec.related
|
||||
|
||||
def write(self, vals):
|
||||
custom_tracking = None
|
||||
if "custom_tracking" in vals:
|
||||
self.env.registry.clear_cache()
|
||||
self.check_access("write")
|
||||
custom_tracking = vals.pop("custom_tracking")
|
||||
self._write({"custom_tracking": custom_tracking})
|
||||
self.invalidate_model(fnames=["custom_tracking"])
|
||||
return super().write(vals)
|
||||
17
tracking_manager/models/mail_thread.py
Executable file
17
tracking_manager/models/mail_thread.py
Executable file
@@ -0,0 +1,17 @@
|
||||
# Copyright 2022 Akretion (https://www.akretion.com).
|
||||
# @author Kévin Roche <kevin.roche@akretion.com>
|
||||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
|
||||
|
||||
from odoo import models, tools
|
||||
|
||||
|
||||
class MailThread(models.AbstractModel):
|
||||
_inherit = "mail.thread"
|
||||
|
||||
@tools.ormcache("self.env.uid", "self.env.su")
|
||||
def _track_get_fields(self):
|
||||
fields_per_models = self.env["ir.model"]._get_custom_tracked_fields_per_model()
|
||||
if self._name in fields_per_models:
|
||||
return set(self.fields_get(fields_per_models[self._name]))
|
||||
else:
|
||||
return super()._track_get_fields()
|
||||
68
tracking_manager/models/mail_tracking_value.py
Executable file
68
tracking_manager/models/mail_tracking_value.py
Executable file
@@ -0,0 +1,68 @@
|
||||
# Copyright 2025 Tecnativa - Víctor Martínez
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
from odoo import api, models
|
||||
from odoo.tools import html2plaintext
|
||||
|
||||
|
||||
class MailTracking(models.Model):
|
||||
_inherit = "mail.tracking.value"
|
||||
|
||||
# TODO: Remove if merged https://github.com/odoo/odoo/pull/156236
|
||||
def _create_tracking_values_property(
|
||||
self, initial_value, new_value, col_name, col_info, record
|
||||
):
|
||||
field = self.env["ir.model.fields"]._get(record._name, col_name)
|
||||
field_info = {
|
||||
"desc": f"{field.field_description}: {col_info['string']}",
|
||||
"name": col_name,
|
||||
"type": col_info["type"],
|
||||
}
|
||||
if col_info["type"] in ("many2one", "many2many"):
|
||||
comodel = self.env[col_info["comodel"]]
|
||||
initial_value = comodel.browse(initial_value) if initial_value else False
|
||||
new_value = comodel.browse(new_value) if new_value else False
|
||||
values = self.env["mail.tracking.value"]._create_tracking_values(
|
||||
initial_value, new_value, col_name, col_info, record
|
||||
)
|
||||
del values["field_id"]
|
||||
return {**values, "field_info": field_info}
|
||||
|
||||
@api.model
|
||||
def _create_tracking_values(
|
||||
self, initial_value, new_value, col_name, col_info, record
|
||||
):
|
||||
try:
|
||||
return super()._create_tracking_values(
|
||||
initial_value, new_value, col_name, col_info, record
|
||||
)
|
||||
except NotImplementedError:
|
||||
if col_info["type"] == "html":
|
||||
field = self.env["ir.model.fields"]._get(record._name, col_name)
|
||||
values = {"field_id": field.id}
|
||||
values.update(
|
||||
{
|
||||
"old_value_char": html2plaintext(initial_value) or "",
|
||||
"new_value_char": html2plaintext(new_value) or "",
|
||||
}
|
||||
)
|
||||
return values
|
||||
elif col_info["type"] == "properties":
|
||||
# TODO: Remove if merged https://github.com/odoo/odoo/pull/156236
|
||||
# A return is necessary to avoid the NotImplementedError error
|
||||
field = self.env["ir.model.fields"]._get(record._name, col_name)
|
||||
return {"field_id": field.id}
|
||||
elif col_info["type"] == "tags":
|
||||
# TODO: Remove if merged https://github.com/odoo/odoo/pull/156236
|
||||
field = self.env["ir.model.fields"]._get(record._name, col_name)
|
||||
return {
|
||||
"field_id": field.id,
|
||||
"old_value_char": (
|
||||
", ".join(value for value in initial_value)
|
||||
if initial_value
|
||||
else ""
|
||||
),
|
||||
"new_value_char": (
|
||||
", ".join(value for value in new_value) if new_value else ""
|
||||
),
|
||||
}
|
||||
raise
|
||||
224
tracking_manager/models/models.py
Executable file
224
tracking_manager/models/models.py
Executable file
@@ -0,0 +1,224 @@
|
||||
# Copyright 2023 Akretion (https://www.akretion.com).
|
||||
# @author Sébastien BEAU <sebastien.beau@akretion.com>
|
||||
# Copyright 2025 Tecnativa - Víctor Martínez
|
||||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
|
||||
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from odoo import Command, api, models, tools
|
||||
from odoo.exceptions import AccessError
|
||||
|
||||
from ..tools import format_m2m
|
||||
|
||||
# To avoid conflict with other module and avoid too long function name
|
||||
# specific tracking_manager method are prefixed with _tm
|
||||
|
||||
|
||||
class Base(models.AbstractModel):
|
||||
_inherit = "base"
|
||||
|
||||
@tools.ormcache()
|
||||
def is_tracked_by_o2m(self):
|
||||
return self._name in self.env["ir.model"]._get_model_tracked_by_o2m()
|
||||
|
||||
def _tm_get_fields_to_notify(self):
|
||||
return (
|
||||
self.env["ir.model"]
|
||||
._get_model_tracked_by_o2m()
|
||||
.get(self._name, {})
|
||||
.get("notify", [])
|
||||
)
|
||||
|
||||
def _tm_get_fields_to_track(self):
|
||||
# We track manually
|
||||
# all fields that belong to a model tracked via a one2many
|
||||
# all the many2many fields
|
||||
return (
|
||||
self.env["ir.model"]
|
||||
._get_model_tracked_by_o2m()
|
||||
.get(self._name, {})
|
||||
.get("fields", [])
|
||||
)
|
||||
|
||||
def _tm_notify_owner(self, mode, changes=None):
|
||||
"""Notify all model that have a one2many linked to the record changed"""
|
||||
self.ensure_one()
|
||||
data = self.env.cr.precommit.data.setdefault(
|
||||
"tracking.manager.data",
|
||||
defaultdict(lambda: defaultdict(lambda: defaultdict(list))),
|
||||
)
|
||||
for field_name, owner_field_name in self._tm_get_fields_to_notify():
|
||||
owner = self[field_name]
|
||||
# Skip processing if the owner is not a valid Odoo recordset or is empty.
|
||||
if not (owner and isinstance(owner, models.BaseModel)):
|
||||
continue
|
||||
data[owner._name][owner.id][owner_field_name].append(
|
||||
{
|
||||
"mode": mode,
|
||||
"record": self.display_name,
|
||||
"changes": changes,
|
||||
}
|
||||
)
|
||||
|
||||
def _tm_get_field_description(self, field_name):
|
||||
return self._fields[field_name].get_description(self.env)["string"]
|
||||
|
||||
def _tm_get_changes(self, values):
|
||||
self.ensure_one()
|
||||
changes = []
|
||||
for field_name, before in values.items():
|
||||
field = self._fields[field_name]
|
||||
if before != self[field_name]:
|
||||
if field.type == "many2many":
|
||||
old = format_m2m(before)
|
||||
new = format_m2m(self[field_name])
|
||||
elif field.type == "many2one":
|
||||
old = before.display_name
|
||||
new = self[field_name]["display_name"]
|
||||
else:
|
||||
old = before
|
||||
new = self[field_name]
|
||||
changes.append(
|
||||
{
|
||||
"name": self._tm_get_field_description(field_name),
|
||||
"old": old,
|
||||
"new": new,
|
||||
}
|
||||
)
|
||||
return changes
|
||||
|
||||
def _tm_post_message(self, data):
|
||||
for model_name, model_data in data.items():
|
||||
# check if record has mail.thread mixin
|
||||
if not getattr(self.env[model_name], "message_post_with_source", False):
|
||||
continue
|
||||
for record_id, messages_by_field in model_data.items():
|
||||
# Avoid error if no record is linked (example: child_ids of res.partner)
|
||||
if not record_id:
|
||||
continue
|
||||
record = self.env[model_name].browse(record_id)
|
||||
messages = [
|
||||
{
|
||||
"name": record._tm_get_field_description(field_name),
|
||||
"messages": messages,
|
||||
}
|
||||
for field_name, messages in messages_by_field.items()
|
||||
]
|
||||
# We do not use message_post_with_view() because emails would be sent
|
||||
rendered_template = self.env["ir.qweb"]._render(
|
||||
"tracking_manager.track_o2m_m2m_template",
|
||||
{"lines": messages, "object": record},
|
||||
minimal_qcontext=True,
|
||||
)
|
||||
record._message_log(body=rendered_template)
|
||||
|
||||
def _tm_prepare_o2m_tracking(self):
|
||||
fnames = self._tm_get_fields_to_track()
|
||||
if not fnames:
|
||||
return
|
||||
self.env.cr.precommit.add(self._tm_finalize_o2m_tracking)
|
||||
initial_values = self.env.cr.precommit.data.setdefault(
|
||||
f"tracking.manager.before.{self._name}", {}
|
||||
)
|
||||
for record in self:
|
||||
values = initial_values.setdefault(record.id, {})
|
||||
if values is not None:
|
||||
for fname in fnames:
|
||||
try:
|
||||
values.setdefault(fname, record[fname])
|
||||
except AccessError:
|
||||
# User does not have access to the field (example with groups)
|
||||
continue
|
||||
|
||||
def _tm_finalize_o2m_tracking(self):
|
||||
initial_values = self.env.cr.precommit.data.pop(
|
||||
f"tracking.manager.before.{self._name}", {}
|
||||
)
|
||||
for _id, values in initial_values.items():
|
||||
# Always use sudo in case that the record have been modified using sudo
|
||||
record = self.sudo().browse(_id)
|
||||
if not record.exists():
|
||||
# if a record have been modify and then deleted
|
||||
# it's not need to track the change so skip it
|
||||
continue
|
||||
changes = record._tm_get_changes(values)
|
||||
if changes:
|
||||
record._tm_notify_owner("update", changes)
|
||||
data = self.env.cr.precommit.data.pop("tracking.manager.data", {})
|
||||
self._tm_post_message(data)
|
||||
self.flush_model()
|
||||
|
||||
def _tm_track_create_unlink(self, mode):
|
||||
self.env.cr.precommit.add(self._tm_finalize_o2m_tracking)
|
||||
for record in self:
|
||||
record._tm_notify_owner(mode)
|
||||
|
||||
def write(self, vals):
|
||||
if self.is_tracked_by_o2m():
|
||||
self._tm_prepare_o2m_tracking()
|
||||
return super().write(vals)
|
||||
|
||||
@api.model_create_multi
|
||||
def create(self, list_vals):
|
||||
records = super().create(list_vals)
|
||||
if self.is_tracked_by_o2m():
|
||||
records._tm_track_create_unlink("create")
|
||||
return records
|
||||
|
||||
def unlink(self):
|
||||
if self.is_tracked_by_o2m():
|
||||
self._tm_track_create_unlink("unlink")
|
||||
return super().unlink()
|
||||
|
||||
# TODO: Remove if merged https://github.com/odoo/odoo/pull/156236
|
||||
def _mail_track(self, tracked_fields, initial_values):
|
||||
_tracked_fields = tracked_fields
|
||||
tracked_fields_properties = {}
|
||||
for tf_key in list(_tracked_fields.keys()):
|
||||
tracked_field = tracked_fields[tf_key]
|
||||
if tracked_field["type"] == "properties":
|
||||
tracked_fields_properties[tf_key] = tracked_field
|
||||
updated, tracking_value_ids = super()._mail_track(
|
||||
tracked_fields, initial_values
|
||||
)
|
||||
# Remove unnecessary tracking_value_ids
|
||||
tracking_value_ids_keys_to_delete = []
|
||||
for tf_key in list(tracked_fields_properties.keys()):
|
||||
field = self.env["ir.model.fields"]._get(self._name, tf_key)
|
||||
for key, vals in enumerate(tracking_value_ids):
|
||||
if vals[2]["field_id"] == field.id:
|
||||
tracking_value_ids_keys_to_delete.append(key)
|
||||
for key in tracking_value_ids_keys_to_delete:
|
||||
tracking_value_ids.pop(key)
|
||||
# Extra things for properties
|
||||
for col_name, _sequence in self._mail_track_order_fields(
|
||||
tracked_fields_properties
|
||||
):
|
||||
if col_name not in initial_values:
|
||||
continue
|
||||
initial_value, new_value = initial_values[col_name], self[col_name]
|
||||
if new_value == initial_value or (not new_value and not initial_value):
|
||||
continue
|
||||
p_keys = list(initial_value.keys()) if initial_value else []
|
||||
properties_data = {}
|
||||
for definition in self.read([col_name])[0][col_name]:
|
||||
properties_data[definition["name"]] = definition
|
||||
tracking_value_ids.extend(
|
||||
Command.create(
|
||||
self.env["mail.tracking.value"]._create_tracking_values_property(
|
||||
initial_value[p_key],
|
||||
new_value[p_key],
|
||||
col_name,
|
||||
properties_data[p_key],
|
||||
self,
|
||||
),
|
||||
)
|
||||
for p_key in p_keys
|
||||
if (
|
||||
p_key in properties_data
|
||||
and properties_data[p_key]["type"] != "separator"
|
||||
and initial_value[p_key] != new_value[p_key]
|
||||
)
|
||||
)
|
||||
return updated, tracking_value_ids
|
||||
Reference in New Issue
Block a user