""" File scanner: discovery, per-file extraction, and all 4 duplicate detection passes. """ import hashlib import mimetypes import os import sqlite3 import subprocess import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path import imagehash from PIL import Image, ExifTags, UnidentifiedImageError try: from pillow_heif import register_heif_opener register_heif_opener() except ImportError: pass from takeout import is_takeout_folder, process_takeout from gpu_hasher import get_phasher PHOTO_EXT = { ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif", ".webp", ".heic", ".heif", ".raw", ".cr2", ".nef", ".arw", ".dng", ".orf", ".rw2", ".pef", ".srw", ".x3f", } VIDEO_EXT = { ".mp4", ".mov", ".avi", ".mkv", ".m4v", ".3gp", ".wmv", ".mts", ".m2ts", } SUPPORTED_EXT = PHOTO_EXT | VIDEO_EXT _DATA_DIR = Path("/data") if Path("/data").exists() else Path(__file__).parent.parent / "data" _DATA_DIR.mkdir(parents=True, exist_ok=True) DB_PATH = str(_DATA_DIR / "dupfinder.db") # Shared scan state (updated by background thread, read by status endpoint) scan_state = { "scan_id": None, "status": "idle", # idle|running|paused|complete|error "phase": "idle", # takeout|indexing|phash|grouping|done "progress": 0, "total": 0, "message": "", "folder_path": None, # persists so resume knows where to continue "pause_requested": False, "files_indexed": 0, # cumulative across phases "phashes_done": 0, "stats": {}, } # ── DB helpers ──────────────────────────────────────────────────────────────── def get_db() -> sqlite3.Connection: con = sqlite3.connect(DB_PATH, timeout=30) con.row_factory = sqlite3.Row con.execute("PRAGMA journal_mode=WAL") con.execute("PRAGMA foreign_keys=ON") return con def log_decision(cur, file_id: int, group_id: int | None, action: str, reason: str): """Append a row to the decisions audit log. Captures the file's sha256 at decision time so a future move/delete tool can detect when a file has changed since the user reviewed it. """ cur.execute("SELECT sha256 FROM files WHERE id=?", (file_id,)) row = cur.fetchone() sha = row["sha256"] if row else None cur.execute( "INSERT INTO decisions (file_id, group_id, action, reason, sha256_at_decision) " "VALUES (?, ?, ?, ?, ?)", (file_id, group_id, action, reason, sha), ) def init_db(): con = get_db() cur = con.cursor() cur.executescript(""" CREATE TABLE IF NOT EXISTS files ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT UNIQUE NOT NULL, filename TEXT NOT NULL, extension TEXT, file_size INTEGER, mime_type TEXT, sha256 TEXT, phash TEXT, exif_datetime TEXT, exif_device TEXT, width INTEGER, height INTEGER, file_mtime TEXT, is_takeout INTEGER DEFAULT 0, is_edited INTEGER DEFAULT 0, takeout_json TEXT, scan_id INTEGER, status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS scans ( id INTEGER PRIMARY KEY AUTOINCREMENT, folder_path TEXT NOT NULL, started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, completed_at TIMESTAMP, total_files INTEGER DEFAULT 0, files_indexed INTEGER DEFAULT 0, phashes_done INTEGER DEFAULT 0, last_phase TEXT DEFAULT 'indexing', status TEXT DEFAULT 'running' ); CREATE TABLE IF NOT EXISTS duplicate_groups ( id INTEGER PRIMARY KEY AUTOINCREMENT, method TEXT NOT NULL, method_value TEXT, reviewed INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS duplicate_members ( id INTEGER PRIMARY KEY AUTOINCREMENT, group_id INTEGER REFERENCES duplicate_groups(id) ON DELETE CASCADE, file_id INTEGER REFERENCES files(id) ON DELETE CASCADE, is_keeper INTEGER DEFAULT 0, suggested INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS decisions ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_id INTEGER NOT NULL, group_id INTEGER, action TEXT NOT NULL, reason TEXT, sha256_at_decision TEXT, decided_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE, FOREIGN KEY (group_id) REFERENCES duplicate_groups(id) ON DELETE SET NULL ); CREATE INDEX IF NOT EXISTS idx_sha256 ON files(sha256); CREATE INDEX IF NOT EXISTS idx_phash ON files(phash); CREATE INDEX IF NOT EXISTS idx_exif_dt ON files(exif_datetime, exif_device); CREATE INDEX IF NOT EXISTS idx_size_dim ON files(file_size, width, height); CREATE INDEX IF NOT EXISTS idx_status ON files(status); CREATE INDEX IF NOT EXISTS idx_decisions_file ON decisions(file_id); CREATE INDEX IF NOT EXISTS idx_decisions_group ON decisions(group_id); """) # Migration: add new columns to scans if upgrading from older schema for col, defn in [ ("files_indexed", "INTEGER DEFAULT 0"), ("phashes_done", "INTEGER DEFAULT 0"), ("last_phase", "TEXT DEFAULT 'indexing'"), ]: try: cur.execute(f"ALTER TABLE scans ADD COLUMN {col} {defn}") except Exception: pass # column already exists # Migration: file_mtime added in v1.0.3 for keeper-selection scoring try: cur.execute("ALTER TABLE files ADD COLUMN file_mtime TEXT") except Exception: pass con.commit() # ── Detect interrupted scans from previous run ──────────────────────────── # Any scan left as 'running' means the server was killed mid-scan. # Mark them 'paused' so the UI offers a resume button. cur.execute(""" UPDATE scans SET status = 'paused' WHERE status = 'running' """) con.commit() # Restore scan_state if there's a paused scan cur.execute(""" SELECT id, folder_path, files_indexed, phashes_done, last_phase FROM scans WHERE status = 'paused' ORDER BY started_at DESC LIMIT 1 """) row = cur.fetchone() if row: scan_state.update( scan_id=row["id"], status="paused", phase=row["last_phase"] or "indexing", folder_path=row["folder_path"], files_indexed=row["files_indexed"] or 0, phashes_done=row["phashes_done"] or 0, message=( f"Paused — {row['files_indexed']:,} files indexed, " f"{row['phashes_done']:,} phashes done" ), ) con.close() # ── Per-file extraction ─────────────────────────────────────────────────────── def _sha256(path: str) -> str: h = hashlib.sha256() with open(path, "rb") as f: while chunk := f.read(65536): h.update(chunk) return h.hexdigest() def _exif_data(path: str) -> tuple[str | None, str | None]: """Returns (exif_datetime, exif_device) or (None, None).""" try: img = Image.open(path) exif_raw = img._getexif() if not exif_raw: return None, None exif = {ExifTags.TAGS.get(k, k): v for k, v in exif_raw.items()} dt = exif.get("DateTimeOriginal") or exif.get("DateTime") if dt: try: dt = datetime.strptime(dt, "%Y:%m:%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S") except ValueError: dt = None make = str(exif.get("Make", "")).strip() model = str(exif.get("Model", "")).strip() device = (make + " " + model).strip() if (make or model) else None return dt, device except Exception: return None, None def _image_dims(path: str) -> tuple[int | None, int | None]: try: with Image.open(path) as img: return img.size # (width, height) except Exception: return None, None def _phash(path: str) -> str | None: try: with Image.open(path) as img: return str(imagehash.phash(img)) except Exception: return None def _video_dims(path: str) -> tuple[int | None, int | None]: try: result = subprocess.run( [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=p=0", path, ], capture_output=True, text=True, timeout=10, ) parts = result.stdout.strip().split(",") if len(parts) == 2: return int(parts[0]), int(parts[1]) except Exception: pass return None, None def _mtime_str(path: str) -> str | None: try: ts = os.path.getmtime(path) return datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S") except Exception: return None def extract_file(path: str) -> dict: ext = Path(path).suffix.lower() filename = Path(path).name is_photo = ext in PHOTO_EXT is_video = ext in VIDEO_EXT record = { "path": path, "filename": filename, "extension": ext, "file_size": None, "mime_type": None, "sha256": None, "phash": None, "exif_datetime": None, "exif_device": None, "width": None, "height": None, "file_mtime": _mtime_str(path), } try: record["file_size"] = os.path.getsize(path) except OSError: pass record["mime_type"] = mimetypes.guess_type(path)[0] try: record["sha256"] = _sha256(path) except OSError: pass if is_photo: w, h = _image_dims(path) record["width"], record["height"] = w, h dt, device = _exif_data(path) record["exif_datetime"] = dt or _mtime_str(path) record["exif_device"] = device # phash computed in separate phase for progress reporting elif is_video: w, h = _video_dims(path) record["width"], record["height"] = w, h record["exif_datetime"] = _mtime_str(path) return record # ── Union-Find for phash grouping ──────────────────────────────────────────── class UnionFind: def __init__(self): self.parent: dict[int, int] = {} def find(self, x: int) -> int: if x not in self.parent: self.parent[x] = x if self.parent[x] != x: self.parent[x] = self.find(self.parent[x]) return self.parent[x] def union(self, x: int, y: int): px, py = self.find(x), self.find(y) if px != py: self.parent[px] = py def groups(self) -> dict[int, list[int]]: from collections import defaultdict result: dict[int, list[int]] = defaultdict(list) for x in self.parent: result[self.find(x)].append(x) return {k: v for k, v in result.items() if len(v) >= 2} # ── Detection passes ────────────────────────────────────────────────────────── # Explicit folder-priority ranking. Lower number = higher priority (preferred # keeper). Higher number = mark redundant. Tokens match case-insensitively as # substrings of the full path. When a path matches multiple tokens the WORST # (highest) number wins — so /photos/#recycle/MobileBackup/foo.jpg ranks as # #recycle (10), not MobileBackup (1). # # Override at runtime by writing /data/folder_priority.json: # {"priorities": {"my_folder": 5, "trash": 10}, "default": 2} _FOLDER_PRIORITY_DEFAULTS = ( ("google photos", 11), ("googlephotos", 11), ("google_photos", 11), ("google-photos", 11), ("takeout", 11), ("google takeout", 11), ("googletakeout", 11), ("google backup", 11), ("googlebackup", 11), ("google_backup", 11), ("#recycle", 10), ("photoprism", 9), ("photoprizm", 8), ("photolibrary", 7), ("albumsbackup", 6), ("organized", 5), ("moved", 4), ("random", 3), ("mobilebackup", 1), ) _FOLDER_PRIORITY_DEFAULT_BUCKET = 2 # "anything else" _folder_priority_cache: tuple[tuple[tuple[str, int], ...], int] | None = None def _load_folder_priority() -> tuple[tuple[tuple[str, int], ...], int]: """Load folder priority list from /data/folder_priority.json if present, else fall back to defaults. Cached after first call per process.""" global _folder_priority_cache if _folder_priority_cache is not None: return _folder_priority_cache entries: tuple[tuple[str, int], ...] = _FOLDER_PRIORITY_DEFAULTS default_bucket = _FOLDER_PRIORITY_DEFAULT_BUCKET try: import json path = "/data/folder_priority.json" if os.path.exists(path): with open(path) as f: data = json.load(f) entries = tuple( (k.lower(), int(v)) for k, v in (data.get("priorities") or {}).items() ) default_bucket = int(data.get("default", default_bucket)) except Exception: pass _folder_priority_cache = (entries, default_bucket) return _folder_priority_cache def _folder_priority(path: str) -> int: """Return the worst (highest) priority bucket matching any DIRECTORY segment of this path, or default. Filename basename is intentionally excluded — only folder names influence priority.""" entries, default_bucket = _load_folder_priority() if not path: return default_bucket # Split on /, drop empty segments, drop the last (filename basename). segments = [s.lower() for s in path.split("/") if s] if len(segments) <= 1: return default_bucket # no parent folder dir_segments = segments[:-1] worst: int | None = None for seg in dir_segments: for token, prio in entries: if token in seg and (worst is None or prio > worst): worst = prio return worst if worst is not None else default_bucket # Generic copy/backup signal — applies on top of explicit folder priority as a # tiebreaker. Tokens match as whole-word-ish substrings of each path segment. _DUP_FOLDER_TOKENS = ( "trash", "trashed", "dup", "dups", "duplicate", "duplicates", "backup", "backups", "copy", "copies", "old", "archive", "archived", ) def _path_penalty(path: str) -> int: """Higher = worse keeper candidate. Penalises FOLDERS (not filenames) that look like copies/backups, plus repeated segments and very deep paths.""" if not path: return 0 segments = [s for s in path.split("/") if s] if not segments: return 0 # Folder segments only — exclude filename basename dir_segments = segments[:-1] score = 0 for seg in dir_segments: low = seg.lower() for tok in _DUP_FOLDER_TOKENS: if (tok in low.split() or tok == low or f"_{tok}" in low or f"{tok}_" in low or low.startswith(tok) or low.endswith(tok)): score += 100 break # Repeated folder segments like "Desktop/Desktop/Files" suggest a nested backup seen: set[str] = set() for seg in dir_segments: low = seg.lower() if low in seen: score += 30 seen.add(low) # Slight penalty for very deep paths (originals tend to live shallower) score += max(0, len(dir_segments) - 6) * 5 return score def _suggested_keeper_by_resolution(members: list[dict]) -> int: """Return file_id of best keeper. Ranking, in order (lower wins): 1. Folder priority bucket (explicit list, e.g. #recycle = worst) 2. Highest pixel count (tie → largest file_size) 3. Lowest path penalty (Trashed/, Dups/, Backup/, deep nesting) 4. Earliest mtime (originals are usually older than their copies) 5. Earliest exif_datetime """ def res_size(m): # Negate for descending sort with min() return (-(m["width"] or 0) * (m["height"] or 0), -(m["file_size"] or 0)) def rank(m): path = m.get("path") or "" return ( _folder_priority(path), res_size(m), _path_penalty(path), m.get("file_mtime") or "9999", m.get("exif_datetime") or "9999-99-99T99:99:99", ) return min(members, key=rank)["id"] def _suggested_keeper_oldest(members: list[dict]) -> int: def key(m): return m["exif_datetime"] or "9999" return min(members, key=key)["id"] def _run_sha256_pass(con: sqlite3.Connection, scan_id: int): cur = con.cursor() cur.execute(""" SELECT sha256, COUNT(*) as cnt FROM files WHERE sha256 IS NOT NULL GROUP BY sha256 HAVING cnt > 1 """) rows = cur.fetchall() for row in rows: sha = row["sha256"] cur.execute(""" SELECT id, path, width, height, file_size, exif_datetime, file_mtime FROM files WHERE sha256 = ? """, (sha,)) members = [dict(r) for r in cur.fetchall()] keeper_id = _suggested_keeper_by_resolution(members) cur.execute( "INSERT INTO duplicate_groups (method, method_value) VALUES ('sha256', ?)", (sha,), ) group_id = cur.lastrowid for m in members: cur.execute( "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)", (group_id, m["id"], 1 if m["id"] == keeper_id else 0), ) def _run_phash_pass(con: sqlite3.Connection, scan_id: int): cur = con.cursor() # Exclude files already in sha256 groups cur.execute(""" SELECT f.id, f.path, f.phash, f.width, f.height, f.file_size, f.exif_datetime, f.file_mtime FROM files f WHERE f.phash IS NOT NULL AND length(f.phash) = 16 AND f.extension NOT IN ( '.mp4','.mov','.avi','.mkv','.m4v','.3gp','.wmv','.mts','.m2ts' ) AND f.id NOT IN ( SELECT dm.file_id FROM duplicate_members dm JOIN duplicate_groups dg ON dg.id = dm.group_id WHERE dg.method = 'sha256' ) """) rows = [dict(r) for r in cur.fetchall()] if len(rows) < 2: return THRESHOLD = 10 # Multi-index pigeonhole: split each 64-bit phash into 16 nibble positions. # If two hashes differ by ≤K bits, at least 16-K nibble positions are # untouched, so any candidate pair shares at least one (position, nibble) # bucket. Catches pairs the previous 2-hex-prefix bucketing missed. buckets: dict[tuple[int, str], list[dict]] = {} for r in rows: for i, ch in enumerate(r["phash"]): buckets.setdefault((i, ch), []).append(r) uf = UnionFind() for r in rows: uf.find(r["id"]) hash_cache: dict[str, "imagehash.ImageHash"] = {} def _h(s: str): h = hash_cache.get(s) if h is None: h = imagehash.hex_to_hash(s) hash_cache[s] = h return h seen_pairs: set[tuple[int, int]] = set() for bucket in buckets.values(): if len(bucket) < 2: continue for i in range(len(bucket)): for j in range(i + 1, len(bucket)): a, b = bucket[i], bucket[j] pair = (a["id"], b["id"]) if a["id"] < b["id"] else (b["id"], a["id"]) if pair in seen_pairs: continue seen_pairs.add(pair) try: if _h(a["phash"]) - _h(b["phash"]) <= THRESHOLD: uf.union(a["id"], b["id"]) except Exception: pass id_map = {r["id"]: r for r in rows} for _, member_ids in uf.groups().items(): members = [id_map[mid] for mid in member_ids if mid in id_map] if len(members) < 2: continue keeper_id = _suggested_keeper_by_resolution(members) keeper = id_map[keeper_id] cur.execute( "INSERT INTO duplicate_groups (method, method_value) VALUES ('phash', ?)", (keeper["phash"],), ) group_id = cur.lastrowid for m in members: cur.execute( "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)", (group_id, m["id"], 1 if m["id"] == keeper_id else 0), ) def _run_exif_pass(con: sqlite3.Connection, scan_id: int): cur = con.cursor() cur.execute(""" SELECT exif_datetime, exif_device, COUNT(*) as cnt FROM files WHERE exif_datetime IS NOT NULL AND exif_device IS NOT NULL AND id NOT IN ( SELECT file_id FROM duplicate_members dm JOIN duplicate_groups dg ON dg.id = dm.group_id WHERE dg.method IN ('sha256', 'phash') ) GROUP BY exif_datetime, exif_device HAVING cnt > 1 """) rows = cur.fetchall() for row in rows: dt, dev = row["exif_datetime"], row["exif_device"] cur.execute(""" SELECT id, path, width, height, file_size, exif_datetime, file_mtime FROM files WHERE exif_datetime = ? AND exif_device = ? """, (dt, dev)) members = [dict(r) for r in cur.fetchall()] keeper_id = _suggested_keeper_by_resolution(members) method_value = f"{dt}::{dev}" cur.execute( "INSERT INTO duplicate_groups (method, method_value) VALUES ('exif', ?)", (method_value,), ) group_id = cur.lastrowid for m in members: cur.execute( "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)", (group_id, m["id"], 1 if m["id"] == keeper_id else 0), ) def _run_filesize_pass(con: sqlite3.Connection, scan_id: int): cur = con.cursor() cur.execute(""" SELECT file_size, width, height, COUNT(*) as cnt FROM files WHERE file_size IS NOT NULL AND width IS NOT NULL AND height IS NOT NULL AND id NOT IN ( SELECT file_id FROM duplicate_members dm JOIN duplicate_groups dg ON dg.id = dm.group_id WHERE dg.method IN ('sha256', 'phash', 'exif') ) GROUP BY file_size, width, height HAVING cnt > 1 """) rows = cur.fetchall() for row in rows: fs, w, h = row["file_size"], row["width"], row["height"] cur.execute(""" SELECT id, path, width, height, file_size, exif_datetime, file_mtime FROM files WHERE file_size = ? AND width = ? AND height = ? """, (fs, w, h)) members = [dict(r) for r in cur.fetchall()] # Filesize+dim is the weakest signal — folder/mtime tiebreak helps a lot here keeper_id = _suggested_keeper_by_resolution(members) method_value = f"{fs}::{w}x{h}" cur.execute( "INSERT INTO duplicate_groups (method, method_value) VALUES ('filesize', ?)", (method_value,), ) group_id = cur.lastrowid for m in members: cur.execute( "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)", (group_id, m["id"], 1 if m["id"] == keeper_id else 0), ) # ── Pause helpers ──────────────────────────────────────────────────────────── def _save_pause_state(cur, scan_id: int, phase: str, files_indexed: int, phashes_done: int): """Persist pause progress so the scan survives a server restart.""" cur.execute(""" UPDATE scans SET status = 'paused', last_phase = ?, files_indexed = ?, phashes_done = ? WHERE id = ? """, (phase, files_indexed, phashes_done, scan_id)) # ── Main scan entry point ───────────────────────────────────────────────────── def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"): """Main scan function — runs in background thread.""" global scan_state scan_state["folder_path"] = folder_path # persist so resume knows where to continue con = get_db() cur = con.cursor() try: # ── Mode: full reset ────────────────────────────────────────────── if mode == "full_reset": cur.execute("DELETE FROM duplicate_members") cur.execute("DELETE FROM duplicate_groups") cur.execute("DELETE FROM files") con.commit() # ── Phase: takeout detection (sidecar processing deferred until after # indexing — sidecars enrich existing DB rows, so files must be there). ─ scan_state.update(phase="takeout", message="Checking for Google Takeout structure...") is_takeout = is_takeout_folder(folder_path) scan_state["message"] = ( "Takeout detected — sidecars will be processed after indexing" if is_takeout else "Not a Takeout folder — skipping" ) if scan_state["pause_requested"]: _save_pause_state(cur, scan_id, "takeout", 0, 0) con.commit() scan_state.update( status="paused", pause_requested=False, message="Paused during Takeout check", ) return # ── Phases: discovery + indexing (pipelined) ────────────────────── # Workers start hashing files the instant they are discovered — # no waiting for the full directory walk to finish first. # # Workers: 2× CPU count, capped at 16. Tune via DUPFINDER_WORKERS. N_WORKERS = int(os.environ.get( "DUPFINDER_WORKERS", min(max((os.cpu_count() or 4) * 2, 4), 16) )) scan_state.update( phase="indexing", progress=0, total=0, message=f"Scanning — discovering & indexing in parallel ({N_WORKERS} workers)..." ) # Pre-load existing DB records once (avoids per-file queries) cur.execute("SELECT path, id, file_size FROM files") existing_db: dict[str, dict] = { row["path"]: {"id": row["id"], "file_size": row["file_size"]} for row in cur.fetchall() } # Shared counters (updated from multiple threads) _lock = threading.Lock() _discovered = [0] # total files found by walker so far _done = [0] # files fully indexed (skipped + processed) _walk_done = [False] _pause_at_end = False # set True when pause requested mid-walk all_files: list[str] = [] to_skip: list[str] = [] changed_ids: list[int] = [] def _index_file(path: str) -> dict | None: try: return extract_file(path) except Exception: return None def _write_result(path: str, record: dict | None, existing: dict | None): """Write one file result to DB. Called on main thread only.""" if record is None: cur.execute( "INSERT OR IGNORE INTO files " " (path, filename, extension, scan_id, status) " "VALUES (?, ?, ?, ?, 'error')", (path, Path(path).name, Path(path).suffix.lower(), scan_id), ) cur.execute( "UPDATE files SET status='error', scan_id=?, " " updated_at=CURRENT_TIMESTAMP WHERE path=?", (scan_id, path), ) else: record["scan_id"] = scan_id if existing: cur.execute(""" UPDATE files SET filename=:filename, extension=:extension, file_size=:file_size, mime_type=:mime_type, sha256=:sha256, exif_datetime=:exif_datetime, exif_device=:exif_device, width=:width, height=:height, file_mtime=:file_mtime, scan_id=:scan_id, status='pending', updated_at=CURRENT_TIMESTAMP WHERE path=:path """, record) else: cur.execute(""" INSERT OR IGNORE INTO files (path, filename, extension, file_size, mime_type, sha256, exif_datetime, exif_device, width, height, file_mtime, scan_id, status) VALUES (:path, :filename, :extension, :file_size, :mime_type, :sha256, :exif_datetime, :exif_device, :width, :height, :file_mtime, :scan_id, 'pending') """, record) with ThreadPoolExecutor(max_workers=N_WORKERS) as pool: pending: dict = {} # future → (path, existing) def _drain(limit: int = 50): """Collect up to `limit` completed futures and write to DB.""" done_futures = [f for f in list(pending) if f.done()][:limit] for f in done_futures: path, existing = pending.pop(f) _write_result(path, f.result(), existing) with _lock: _done[0] += 1 d = _done[0] disc = _discovered[0] walking = not _walk_done[0] scan_state["progress"] = d scan_state["total"] = disc scan_state["message"] = ( f"{'Discovering & i' if walking else 'I'}ndexing " f"({N_WORKERS}w): {d:,}" + (f" / {disc:,}" if not walking else f" — {disc:,} found so far") ) if done_futures and _done[0] % 200 == 0: con.commit() # ── Walk + submit ───────────────────────────────────────────── for root, dirs, files in os.walk(folder_path): dirs[:] = [d for d in dirs if not d.startswith(".")] if scan_state["pause_requested"]: _pause_at_end = True break # stop walking; in-flight futures drain normally for fname in files: if fname.endswith(".json"): continue ext = Path(fname).suffix.lower() if ext not in SUPPORTED_EXT: continue path = os.path.join(root, fname) all_files.append(path) with _lock: _discovered[0] += 1 existing = existing_db.get(path) try: current_size = os.path.getsize(path) except OSError: continue # Skip unchanged files if existing and mode in ("incremental", "new_files"): if mode == "new_files" or existing["file_size"] == current_size: to_skip.append(path) with _lock: _done[0] += 1 continue changed_ids.append(existing["id"]) # Submit to thread pool immediately future = pool.submit(_index_file, path) pending[future] = (path, existing) # Drain completed results regularly to avoid memory buildup if len(pending) >= N_WORKERS * 4: _drain(N_WORKERS * 2) # Drain after each directory _drain(20) _walk_done[0] = True # ── Bulk-stamp skipped files ────────────────────────────────── for chunk_start in range(0, len(to_skip), 500): chunk = to_skip[chunk_start : chunk_start + 500] cur.executemany( "UPDATE files SET scan_id = ? WHERE path = ?", [(scan_id, p) for p in chunk], ) for fid in changed_ids: cur.execute( "DELETE FROM duplicate_members WHERE file_id = ?", (fid,) ) con.commit() # ── Wait for remaining futures ──────────────────────────────── scan_state["total"] = len(all_files) for future in as_completed(pending): path, existing = pending[future] _write_result(path, future.result(), existing) with _lock: _done[0] += 1 d = _done[0] scan_state["progress"] = d scan_state["message"] = ( f"Indexing ({N_WORKERS}w): {d:,} / {len(all_files):,}" ) if d % 200 == 0: con.commit() con.commit() # ── Pause checkpoint: after indexing ────────────────────────────── scan_state["files_indexed"] = _done[0] if _pause_at_end: _save_pause_state(cur, scan_id, "indexing", _done[0], 0) con.commit() scan_state.update( status="paused", pause_requested=False, message=f"Paused — {_done[0]:,} files indexed", ) return # ── Takeout sidecar enrichment (now that files exist in DB) ─────── if is_takeout: scan_state.update(phase="takeout", message="Processing Google Takeout sidecars...") try: enriched = process_takeout(folder_path, DB_PATH) scan_state["message"] = f"Takeout: enriched {enriched:,} files" except Exception as exc: scan_state["message"] = f"Takeout enrichment failed: {exc}" # ── Phase: phash ────────────────────────────────────────────────── phasher = get_phasher() hw_label = "GPU" if phasher.using_gpu else "CPU" scan_state.update(phase="phash", progress=0, message=f"Computing perceptual hashes ({hw_label})...") cur.execute(""" SELECT id, path FROM files WHERE extension IN ( '.jpg','.jpeg','.png','.gif','.bmp','.tiff','.tif', '.webp','.heic','.heif','.raw','.cr2','.nef','.arw', '.dng','.orf','.rw2','.pef','.srw','.x3f' ) AND phash IS NULL AND status != 'error' """) photo_rows = cur.fetchall() scan_state["total"] = len(photo_rows) if photo_rows: path_to_id = {row["path"]: row["id"] for row in photo_rows} all_paths = list(path_to_id.keys()) # Process in chunks so pause requests are honoured between batches PHASH_CHUNK = 500 phashes_written = 0 for chunk_start in range(0, len(all_paths), PHASH_CHUNK): if scan_state["pause_requested"]: _save_pause_state( cur, scan_id, "phash", scan_state["files_indexed"], phashes_written, ) con.commit() scan_state.update( status="paused", pause_requested=False, phashes_done=phashes_written, message=( f"Paused — {phashes_written:,} / {len(all_paths):,} " "perceptual hashes computed" ), ) return chunk = all_paths[chunk_start : chunk_start + PHASH_CHUNK] chunk_results = phasher.hash_files(chunk, progress_cb=None) for path, ph in chunk_results.items(): fid = path_to_id.get(path) if fid and ph: cur.execute( "UPDATE files SET phash=? WHERE id=?", (ph, fid) ) con.commit() phashes_written += len(chunk) scan_state["phashes_done"] = phashes_written scan_state["progress"] = phashes_written scan_state["message"] = ( f"Phash ({hw_label}): {phashes_written:,} / {len(all_paths):,}" ) con.commit() # ── Phase: grouping ─────────────────────────────────────────────── scan_state.update(phase="grouping", progress=0, total=4, message="Running duplicate detection...") # Snapshot reviewed groups so we can re-apply decisions to any # post-regrouping group whose member-set is unchanged. prior_reviewed: dict[tuple[str, frozenset], int | None] = {} if mode in ("incremental", "regroup"): cur.execute(""" SELECT dg.id, dg.method, dm.file_id, dm.is_keeper FROM duplicate_groups dg JOIN duplicate_members dm ON dm.group_id = dg.id WHERE dg.reviewed = 1 """) snap: dict[int, dict] = {} for r in cur.fetchall(): g = snap.setdefault( r["id"], {"method": r["method"], "members": set(), "keeper": None}, ) g["members"].add(r["file_id"]) if r["is_keeper"]: g["keeper"] = r["file_id"] for g in snap.values(): prior_reviewed[(g["method"], frozenset(g["members"]))] = g["keeper"] if mode in ("incremental", "full_reset", "regroup"): cur.execute("DELETE FROM duplicate_members") cur.execute("DELETE FROM duplicate_groups") con.commit() elif mode == "new_files": # Only clear groups containing new files cur.execute(""" DELETE FROM duplicate_groups WHERE id IN ( SELECT DISTINCT dm.group_id FROM duplicate_members dm JOIN files f ON f.id = dm.file_id WHERE f.scan_id = ? ) """, (scan_id,)) con.commit() scan_state["message"] = "Pass 1/4: SHA-256 exact duplicates..." _run_sha256_pass(con, scan_id) scan_state["progress"] = 1 con.commit() scan_state["message"] = "Pass 2/4: Perceptual hash similarity..." _run_phash_pass(con, scan_id) scan_state["progress"] = 2 con.commit() scan_state["message"] = "Pass 3/4: EXIF timestamp + device..." _run_exif_pass(con, scan_id) scan_state["progress"] = 3 con.commit() scan_state["message"] = "Pass 4/4: File size + dimensions..." _run_filesize_pass(con, scan_id) scan_state["progress"] = 4 con.commit() # ── Re-apply prior review decisions where membership unchanged ──── if prior_reviewed: cur.execute(""" SELECT dg.id, dg.method, dm.file_id FROM duplicate_groups dg JOIN duplicate_members dm ON dm.group_id = dg.id """) new_groups: dict[int, dict] = {} for r in cur.fetchall(): g = new_groups.setdefault( r["id"], {"method": r["method"], "members": set()} ) g["members"].add(r["file_id"]) restored = 0 for gid, g in new_groups.items(): key = (g["method"], frozenset(g["members"])) if key not in prior_reviewed: continue keeper = prior_reviewed[key] cur.execute( "UPDATE duplicate_groups SET reviewed=1 WHERE id=?", (gid,) ) for fid in g["members"]: is_k = 1 if fid == keeper else 0 cur.execute( "UPDATE duplicate_members " "SET is_keeper=?, suggested=? " "WHERE group_id=? AND file_id=?", (is_k, is_k, gid, fid), ) cur.execute( "UPDATE files SET status=? WHERE id=?", ("keeper" if is_k else "redundant", fid), ) log_decision( cur, fid, gid, "keeper" if is_k else "redundant", "rescan-restore", ) restored += 1 con.commit() scan_state["message"] = f"Restored {restored:,} prior review decisions" # Reset orphaned keeper status for files no longer in any group if mode == "incremental": cur.execute(""" UPDATE files SET status='pending' WHERE status IN ('keeper', 'redundant') AND id NOT IN (SELECT file_id FROM duplicate_members) """) con.commit() # Update scan record cur.execute( "UPDATE scans SET completed_at=CURRENT_TIMESTAMP, total_files=?, status='complete' " "WHERE id=?", (len(all_files), scan_id), ) con.commit() scan_state.update(status="complete", phase="done", message="Scan complete.", progress=scan_state["total"]) _update_stats() except Exception as e: scan_state.update(status="error", message=str(e)) try: _mark_scan(cur, scan_id, "error") con.commit() except Exception: pass finally: con.close() def _mark_scan(cur, scan_id: int, status: str): cur.execute( "UPDATE scans SET completed_at=CURRENT_TIMESTAMP, status=? WHERE id=?", (status, scan_id), ) def _update_stats(): """Refresh stats in scan_state.""" try: con = get_db() cur = con.cursor() cur.execute("SELECT COUNT(*) FROM files WHERE status != 'error'") total_files = cur.fetchone()[0] cur.execute("SELECT COUNT(*), SUM(file_size) FROM files WHERE status='redundant'") r = cur.fetchone() dup_count = r[0] or 0 dup_size = r[1] or 0 for method in ("sha256", "phash", "exif", "filesize"): cur.execute( "SELECT COUNT(*) FROM duplicate_groups WHERE method=?", (method,) ) cur.execute(""" SELECT method, COUNT(*) as groups, (SELECT COUNT(*) FROM duplicate_members dm2 JOIN duplicate_groups dg2 ON dg2.id=dm2.group_id WHERE dg2.method=dg.method) as files FROM duplicate_groups dg GROUP BY method """) by_method = {r["method"]: {"groups": r["groups"], "files": r["files"]} for r in cur.fetchall()} cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=1") reviewed = cur.fetchone()[0] cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=0") pending = cur.fetchone()[0] scan_state["stats"] = { "total_files": total_files, "duplicate_files": dup_count, "duplicate_size_bytes": dup_size, "groups_by_method": by_method, "reviewed": reviewed, "pending": pending, } con.close() except Exception: pass