Fix correctness bugs in scanner and reset endpoint

- Defer Takeout sidecar enrichment until after indexing so its UPDATE
  statements actually match rows. Previously it ran first and silently
  no-op'd on the very first scan because no files existed in the DB yet.

- Preserve user review decisions across incremental and regroup rescans.
  The grouping phase wipes duplicate_groups/duplicate_members, which
  also wiped reviewed=1 / is_keeper flags. Now snapshots reviewed groups
  by (method, frozenset of member file_ids) before the wipe and re-applies
  them to any post-regrouping group whose member set is unchanged.

- Replace 2-hex-char phash bucketing with multi-index pigeonhole
  (16 nibble buckets per hash). At threshold=10, the previous bucketing
  missed any near-duplicate pair that differed in the first byte, since
  they landed in different buckets and were never compared. Caches
  imagehash.hex_to_hash() per phash and dedups pair comparisons.

- Rewrite _suggested_keeper_by_resolution: previous implementation had
  a dead inner score() function and the lambda was missing the date
  tie-breaker (left as a TODO comment). Now picks largest pixels, ties
  by file size, then by oldest exif_datetime.

- Filter phash candidates to length(phash)=16 to skip malformed hashes
  rather than relying on the silent except in the comparison loop.

- Reject /api/scan/reset while a scan is running. Resetting mid-scan
  wiped tables the running scan thread was still writing to.

- Also clears stale 'redundant' file status (not just 'keeper') when
  a file no longer appears in any group after regrouping.
This commit is contained in:
Carlos
2026-04-24 00:42:13 -04:00
parent 356f922940
commit 3001be3a92
2 changed files with 121 additions and 33 deletions

View File

@@ -223,6 +223,10 @@ def scan_resume():
def scan_reset(confirm: str = Query("")):
if confirm != "RESET":
raise HTTPException(400, "Pass ?confirm=RESET to confirm")
if sc.scan_state["status"] == "running":
raise HTTPException(
400, "A scan is currently running — pause it before resetting"
)
con = get_db()
cur = con.cursor()
cur.execute("DELETE FROM duplicate_members")

View File

@@ -326,20 +326,16 @@ class UnionFind:
# ── Detection passes ──────────────────────────────────────────────────────────
def _suggested_keeper_by_resolution(members: list[dict]) -> int:
"""Return file_id of highest resolution member; tie-break by size then oldest date."""
def score(m):
w = m["width"] or 0
h = m["height"] or 0
size = m["file_size"] or 0
dt = m["exif_datetime"] or "9999"
return (w * h, size, dt)
"""Return file_id of best keeper: largest pixels, tie-break by file size,
final tie-break by oldest exif_datetime (likely the original)."""
def res_size(m):
return ((m["width"] or 0) * (m["height"] or 0), m["file_size"] or 0)
best = max(members, key=lambda m: (
(m["width"] or 0) * (m["height"] or 0),
m["file_size"] or 0,
# older date = better; invert by negating epoch or use str comparison inverted
))
return best["id"]
top = max(res_size(m) for m in members)
tied = [m for m in members if res_size(m) == top]
return min(
tied, key=lambda m: m.get("exif_datetime") or "9999-99-99T99:99:99"
)["id"]
def _suggested_keeper_oldest(members: list[dict]) -> int:
@@ -387,6 +383,7 @@ def _run_phash_pass(con: sqlite3.Connection, scan_id: int):
SELECT f.id, f.phash, f.width, f.height, f.file_size, f.exif_datetime
FROM files f
WHERE f.phash IS NOT NULL
AND length(f.phash) = 16
AND f.extension NOT IN (
'.mp4','.mov','.avi','.mkv','.m4v','.3gp','.wmv','.mts','.m2ts'
)
@@ -401,25 +398,43 @@ def _run_phash_pass(con: sqlite3.Connection, scan_id: int):
if len(rows) < 2:
return
# Bucket by first 2 hex chars to reduce O(n²) comparisons
buckets: dict[str, list[dict]] = {}
THRESHOLD = 10
# Multi-index pigeonhole: split each 64-bit phash into 16 nibble positions.
# If two hashes differ by ≤K bits, at least 16-K nibble positions are
# untouched, so any candidate pair shares at least one (position, nibble)
# bucket. Catches pairs the previous 2-hex-prefix bucketing missed.
buckets: dict[tuple[int, str], list[dict]] = {}
for r in rows:
key = r["phash"][:2]
buckets.setdefault(key, []).append(r)
for i, ch in enumerate(r["phash"]):
buckets.setdefault((i, ch), []).append(r)
uf = UnionFind()
# Ensure all IDs are registered
for r in rows:
uf.find(r["id"])
THRESHOLD = 10
hash_cache: dict[str, "imagehash.ImageHash"] = {}
def _h(s: str):
h = hash_cache.get(s)
if h is None:
h = imagehash.hex_to_hash(s)
hash_cache[s] = h
return h
seen_pairs: set[tuple[int, int]] = set()
for bucket in buckets.values():
if len(bucket) < 2:
continue
for i in range(len(bucket)):
for j in range(i + 1, len(bucket)):
a, b = bucket[i], bucket[j]
pair = (a["id"], b["id"]) if a["id"] < b["id"] else (b["id"], a["id"])
if pair in seen_pairs:
continue
seen_pairs.add(pair)
try:
dist = imagehash.hex_to_hash(a["phash"]) - imagehash.hex_to_hash(b["phash"])
if dist <= THRESHOLD:
if _h(a["phash"]) - _h(b["phash"]) <= THRESHOLD:
uf.union(a["id"], b["id"])
except Exception:
pass
@@ -552,14 +567,15 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
cur.execute("DELETE FROM files")
con.commit()
# ── Phase: takeout check (quick sample, ≤50 dirs) ─────────────────
# ── Phase: takeout detection (sidecar processing deferred until after
# indexing — sidecars enrich existing DB rows, so files must be there). ─
scan_state.update(phase="takeout",
message="Checking for Google Takeout structure...")
if is_takeout_folder(folder_path):
scan_state["message"] = "Processing Google Takeout sidecars..."
process_takeout(folder_path, DB_PATH)
else:
scan_state["message"] = "Not a Takeout folder — skipping"
is_takeout = is_takeout_folder(folder_path)
scan_state["message"] = (
"Takeout detected — sidecars will be processed after indexing"
if is_takeout else "Not a Takeout folder — skipping"
)
if scan_state["pause_requested"]:
_save_pause_state(cur, scan_id, "takeout", 0, 0)
@@ -760,6 +776,16 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
)
return
# ── Takeout sidecar enrichment (now that files exist in DB) ───────
if is_takeout:
scan_state.update(phase="takeout",
message="Processing Google Takeout sidecars...")
try:
enriched = process_takeout(folder_path, DB_PATH)
scan_state["message"] = f"Takeout: enriched {enriched:,} files"
except Exception as exc:
scan_state["message"] = f"Takeout enrichment failed: {exc}"
# ── Phase: phash ──────────────────────────────────────────────────
phasher = get_phasher()
hw_label = "GPU" if phasher.using_gpu else "CPU"
@@ -826,6 +852,28 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
scan_state.update(phase="grouping", progress=0, total=4,
message="Running duplicate detection...")
# Snapshot reviewed groups so we can re-apply decisions to any
# post-regrouping group whose member-set is unchanged.
prior_reviewed: dict[tuple[str, frozenset], int | None] = {}
if mode in ("incremental", "regroup"):
cur.execute("""
SELECT dg.id, dg.method, dm.file_id, dm.is_keeper
FROM duplicate_groups dg
JOIN duplicate_members dm ON dm.group_id = dg.id
WHERE dg.reviewed = 1
""")
snap: dict[int, dict] = {}
for r in cur.fetchall():
g = snap.setdefault(
r["id"],
{"method": r["method"], "members": set(), "keeper": None},
)
g["members"].add(r["file_id"])
if r["is_keeper"]:
g["keeper"] = r["file_id"]
for g in snap.values():
prior_reviewed[(g["method"], frozenset(g["members"]))] = g["keeper"]
if mode in ("incremental", "full_reset", "regroup"):
cur.execute("DELETE FROM duplicate_members")
cur.execute("DELETE FROM duplicate_groups")
@@ -861,15 +909,51 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
scan_state["progress"] = 4
con.commit()
# ── Restore keeper statuses for mode=incremental ──────────────────
# ── Re-apply prior review decisions where membership unchanged ────
if prior_reviewed:
cur.execute("""
SELECT dg.id, dg.method, dm.file_id
FROM duplicate_groups dg
JOIN duplicate_members dm ON dm.group_id = dg.id
""")
new_groups: dict[int, dict] = {}
for r in cur.fetchall():
g = new_groups.setdefault(
r["id"], {"method": r["method"], "members": set()}
)
g["members"].add(r["file_id"])
restored = 0
for gid, g in new_groups.items():
key = (g["method"], frozenset(g["members"]))
if key not in prior_reviewed:
continue
keeper = prior_reviewed[key]
cur.execute(
"UPDATE duplicate_groups SET reviewed=1 WHERE id=?", (gid,)
)
for fid in g["members"]:
is_k = 1 if fid == keeper else 0
cur.execute(
"UPDATE duplicate_members "
"SET is_keeper=?, suggested=? "
"WHERE group_id=? AND file_id=?",
(is_k, is_k, gid, fid),
)
cur.execute(
"UPDATE files SET status=? WHERE id=?",
("keeper" if is_k else "redundant", fid),
)
restored += 1
con.commit()
scan_state["message"] = f"Restored {restored:,} prior review decisions"
# Reset orphaned keeper status for files no longer in any group
if mode == "incremental":
# If a previously marked keeper no longer appears in any group, reset to pending
cur.execute("""
UPDATE files SET status='pending'
WHERE status='keeper'
AND id NOT IN (
SELECT file_id FROM duplicate_members WHERE is_keeper=1
)
WHERE status IN ('keeper', 'redundant')
AND id NOT IN (SELECT file_id FROM duplicate_members)
""")
con.commit()