"""
File scanner: discovery, per-file extraction, and all 4 duplicate detection passes.
"""
import hashlib
import mimetypes
import os
import sqlite3
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path

import imagehash
from PIL import Image, ExifTags, UnidentifiedImageError

try:
    from pillow_heif import register_heif_opener
    register_heif_opener()
except ImportError:
    pass

from takeout import is_takeout_folder, process_takeout
from gpu_hasher import get_phasher

PHOTO_EXT = {
    ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif",
    ".webp", ".heic", ".heif", ".raw", ".cr2", ".nef", ".arw",
    ".dng", ".orf", ".rw2", ".pef", ".srw", ".x3f",
}
VIDEO_EXT = {
    ".mp4", ".mov", ".avi", ".mkv", ".m4v", ".3gp", ".wmv", ".mts", ".m2ts",
}
SUPPORTED_EXT = PHOTO_EXT | VIDEO_EXT

_DATA_DIR = Path("/data") if Path("/data").exists() else Path(__file__).parent.parent / "data"
_DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = str(_DATA_DIR / "dupfinder.db")

# Shared scan state (updated by background thread, read by status endpoint)
scan_state = {
    "scan_id": None,
    "status": "idle",   # idle | running | complete | error | cancelled
    "phase": "idle",    # discovery | takeout | indexing | phash | grouping | done
    "progress": 0,
    "total": 0,
    "message": "",
    "cancel_requested": False,
    "stats": {},
}


# ── DB helpers ────────────────────────────────────────────────────────────────

def get_db() -> sqlite3.Connection:
    """Open a new connection to the scanner database.

    Rows are returned as ``sqlite3.Row`` (addressable by column name); WAL
    journaling and foreign-key enforcement are switched on for the connection.
    The caller owns the connection and must close it.
    """
    con = sqlite3.connect(DB_PATH, timeout=30)
    con.row_factory = sqlite3.Row
    con.execute("PRAGMA journal_mode=WAL")
    con.execute("PRAGMA foreign_keys=ON")
    return con


def init_db():
    """Create the tables and indexes if they do not exist yet."""
    con = get_db()
    try:
        con.executescript("""
            CREATE TABLE IF NOT EXISTS files (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                path            TEXT UNIQUE NOT NULL,
                filename        TEXT NOT NULL,
                extension       TEXT,
                file_size       INTEGER,
                mime_type       TEXT,
                sha256          TEXT,
                phash           TEXT,
                exif_datetime   TEXT,
                exif_device     TEXT,
                width           INTEGER,
                height          INTEGER,
                is_takeout      INTEGER DEFAULT 0,
                is_edited       INTEGER DEFAULT 0,
                takeout_json    TEXT,
                scan_id         INTEGER,
                status          TEXT DEFAULT 'pending',
                created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS scans (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                folder_path     TEXT NOT NULL,
                started_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at    TIMESTAMP,
                total_files     INTEGER DEFAULT 0,
                status          TEXT DEFAULT 'running'
            );

            CREATE TABLE IF NOT EXISTS duplicate_groups (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                method          TEXT NOT NULL,
                method_value    TEXT,
                reviewed        INTEGER DEFAULT 0,
                created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS duplicate_members (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                group_id        INTEGER REFERENCES duplicate_groups(id) ON DELETE CASCADE,
                file_id         INTEGER REFERENCES files(id) ON DELETE CASCADE,
                is_keeper       INTEGER DEFAULT 0,
                suggested       INTEGER DEFAULT 0
            );

            CREATE INDEX IF NOT EXISTS idx_sha256   ON files(sha256);
            CREATE INDEX IF NOT EXISTS idx_phash    ON files(phash);
            CREATE INDEX IF NOT EXISTS idx_exif_dt  ON files(exif_datetime, exif_device);
            CREATE INDEX IF NOT EXISTS idx_size_dim ON files(file_size, width, height);
            CREATE INDEX IF NOT EXISTS idx_status   ON files(status);
        """)
        con.commit()
    finally:
        # Release the connection even when schema creation fails.
        con.close()


# ── Per-file extraction ───────────────────────────────────────────────────────

def _sha256(path: str) -> str:
    """Return the hex SHA-256 digest of the file, read in 64 KiB chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(65536):
            h.update(chunk)
    return h.hexdigest()


def _exif_data(path: str) -> tuple[str | None, str | None]:
    """Returns (exif_datetime, exif_device) or (None, None).

    The datetime is ``DateTimeOriginal`` (falling back to ``DateTime``)
    reformatted as ``YYYY-MM-DDTHH:MM:SS``; the device is "Make Model".
    Any failure to open or parse the image yields (None, None).
    """
    try:
        # Context manager so the underlying file handle is always released.
        with Image.open(path) as img:
            exif_raw = img._getexif()
        if not exif_raw:
            return None, None
        exif = {ExifTags.TAGS.get(k, k): v for k, v in exif_raw.items()}

        dt = exif.get("DateTimeOriginal") or exif.get("DateTime")
        if dt:
            try:
                dt = datetime.strptime(dt, "%Y:%m:%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S")
            except (ValueError, TypeError):
                # Unparseable or non-string date: drop the date, keep the device.
                dt = None

        make = str(exif.get("Make", "")).strip()
        model = str(exif.get("Model", "")).strip()
        device = (make + " " + model).strip() if (make or model) else None
        return dt, device
    except Exception:
        # Best effort: formats without EXIF support or corrupt files land here.
        return None, None


def _image_dims(path: str) -> tuple[int | None, int | None]:
    """Return (width, height) of an image, or (None, None) if it cannot be opened."""
    try:
        with Image.open(path) as img:
            return img.size  # (width, height)
    except Exception:
        return None, None


def _phash(path: str) -> str | None:
    """Return the perceptual hash of an image as a hex string, or None on failure."""
    try:
        with Image.open(path) as img:
            return str(imagehash.phash(img))
    except Exception:
        return None


def _video_dims(path: str) -> tuple[int | None, int | None]:
    """Return (width, height) of the first video stream via ffprobe.

    Returns (None, None) when ffprobe is unavailable, times out (10 s), or
    prints something that does not start with two integers.
    """
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-select_streams", "v:0",
                "-show_entries", "stream=width,height",
                "-of", "csv=p=0",
                path,
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
        lines = result.stdout.strip().splitlines()
        if lines:
            # Only the first line is used; empty fields (e.g. from a trailing
            # separator) are ignored so "1920,1080," still parses.
            parts = [p for p in lines[0].strip().split(",") if p]
            if len(parts) >= 2:
                return int(parts[0]), int(parts[1])
    except Exception:
        pass
    return None, None


def _mtime_str(path: str) -> str | None:
    """Return the file's modification time as ``YYYY-MM-DDTHH:MM:SS`` (local time), or None."""
    try:
        ts = os.path.getmtime(path)
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S")
    except (OSError, OverflowError, ValueError):
        return None


def extract_file(path: str) -> dict:
    """Collect the metadata record for one media file.

    The returned dict always has the keys path, filename, extension,
    file_size, mime_type, sha256, phash, exif_datetime, exif_device, width
    and height; a value is None when it could not be determined. ``phash``
    is always None here because it is computed in a separate phase.
    """
    ext = Path(path).suffix.lower()
    filename = Path(path).name
    is_photo = ext in PHOTO_EXT
    is_video = ext in VIDEO_EXT

    record = {
        "path": path,
        "filename": filename,
        "extension": ext,
        "file_size": None,
        "mime_type": None,
        "sha256": None,
        "phash": None,
        "exif_datetime": None,
        "exif_device": None,
        "width": None,
        "height": None,
    }

    try:
        record["file_size"] = os.path.getsize(path)
    except OSError:
        pass

    record["mime_type"] = mimetypes.guess_type(path)[0]

    try:
        record["sha256"] = _sha256(path)
    except OSError:
        pass

    if is_photo:
        w, h = _image_dims(path)
        record["width"], record["height"] = w, h
        dt, device = _exif_data(path)
        # Fall back to the filesystem mtime when EXIF carries no usable date.
        record["exif_datetime"] = dt or _mtime_str(path)
        record["exif_device"] = device
        # phash computed in separate phase for progress reporting
    elif is_video:
        w, h = _video_dims(path)
        record["width"], record["height"] = w, h
        record["exif_datetime"] = _mtime_str(path)

    return record


# ── Union-Find for phash grouping ────────────────────────────────────────────

class UnionFind:
    """Disjoint-set forest over integer IDs with path compression."""

    def __init__(self):
        self.parent: dict[int, int] = {}

    def find(self, x: int) -> int:
        """Return the representative of x's set, registering x if it is new."""
        parent = self.parent
        if x not in parent:
            parent[x] = x
            return x
        # Iterative on purpose: a long parent chain must not hit the
        # interpreter's recursion limit.
        root = x
        while parent[root] != root:
            root = parent[root]
        # Path compression: point every node on the walked path at the root.
        while parent[x] != root:
            parent[x], x = root, parent[x]
        return root

    def union(self, x: int, y: int):
        """Merge the sets containing x and y."""
        px, py = self.find(x), self.find(y)
        if px != py:
            self.parent[px] = py

    def groups(self) -> dict[int, list[int]]:
        """Return {representative: members} for every set with at least 2 members."""
        from collections import defaultdict

        result: dict[int, list[int]] = defaultdict(list)
        for x in self.parent:
            result[self.find(x)].append(x)
        return {k: v for k, v in result.items() if len(v) >= 2}


# ── Detection passes ──────────────────────────────────────────────────────────

# WHERE-clause fragment that keeps only files not yet placed in a group by one
# of the given methods. Used for both candidate and member queries so that the
# two always agree on which files are eligible.
_EXCLUDE_GROUPED = (
    "id NOT IN ("
    " SELECT dm.file_id FROM duplicate_members dm"
    " JOIN duplicate_groups dg ON dg.id = dm.group_id"
    " WHERE dg.method IN ({placeholders}))"
)


def _exclude_grouped_clause(methods: tuple[str, ...]) -> str:
    """Return the exclusion fragment with one ``?`` placeholder per method."""
    return _EXCLUDE_GROUPED.format(placeholders=", ".join("?" for _ in methods))


def _suggested_keeper_by_resolution(members: list[dict]) -> int:
    """Return file_id of highest resolution member; tie-break by size then oldest date.

    Members without a date sort as newest ("9999"). When every criterion
    ties, the first member in the list wins.
    """
    def rank(m):
        area = (m["width"] or 0) * (m["height"] or 0)
        size = m["file_size"] or 0
        # Negate the numeric fields so one ascending sort prefers larger
        # area/size and an older (lexicographically smaller) ISO date.
        return (-area, -size, m["exif_datetime"] or "9999")

    return min(members, key=rank)["id"]


def _suggested_keeper_oldest(members: list[dict]) -> int:
    """Return file_id of the member with the oldest date (missing dates sort last)."""
    def key(m):
        return m["exif_datetime"] or "9999"
    return min(members, key=key)["id"]


def _insert_group(cur, method: str, method_value, members: list[dict], keeper_id: int):
    """Insert one duplicate group and its member rows, flagging the suggested keeper."""
    cur.execute(
        "INSERT INTO duplicate_groups (method, method_value) VALUES (?, ?)",
        (method, method_value),
    )
    group_id = cur.lastrowid
    cur.executemany(
        "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
        [(group_id, m["id"], 1 if m["id"] == keeper_id else 0) for m in members],
    )


def _run_sha256_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 1: group files whose SHA-256 digests are identical.

    ``scan_id`` is accepted for signature parity with the other passes and
    is not used. The caller is responsible for committing.
    """
    cur = con.cursor()
    cur.execute("""
        SELECT sha256, COUNT(*) as cnt
        FROM files
        WHERE sha256 IS NOT NULL
        GROUP BY sha256
        HAVING cnt > 1
    """)
    for row in cur.fetchall():
        sha = row["sha256"]
        cur.execute("""
            SELECT id, width, height, file_size, exif_datetime
            FROM files WHERE sha256 = ?
        """, (sha,))
        members = [dict(r) for r in cur.fetchall()]
        keeper_id = _suggested_keeper_by_resolution(members)
        _insert_group(cur, "sha256", sha, members, keeper_id)


def _run_phash_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 2: group non-video files whose perceptual hashes are within Hamming distance 10.

    Files already in a sha256 group are excluded. Candidates are bucketed by
    the first two hex characters of the hash, so only hashes sharing that
    prefix are compared. ``scan_id`` is not used; the caller commits.
    """
    from itertools import combinations

    cur = con.cursor()
    # Exclude files already in sha256 groups
    cur.execute("""
        SELECT f.id, f.phash, f.width, f.height, f.file_size, f.exif_datetime
        FROM files f
        WHERE f.phash IS NOT NULL
          AND f.extension NOT IN (
              '.mp4','.mov','.avi','.mkv','.m4v','.3gp','.wmv','.mts','.m2ts'
          )
          AND f.id NOT IN (
              SELECT dm.file_id FROM duplicate_members dm
              JOIN duplicate_groups dg ON dg.id = dm.group_id
              WHERE dg.method = 'sha256'
          )
    """)
    rows = [dict(r) for r in cur.fetchall()]
    if len(rows) < 2:
        return

    # Parse each hex hash once instead of on every pairwise comparison.
    # Rows whose stored hash cannot be parsed never match anything.
    parsed = {}
    for r in rows:
        try:
            parsed[r["id"]] = imagehash.hex_to_hash(r["phash"])
        except (ValueError, TypeError):
            continue

    # Bucket by first 2 hex chars to reduce O(n²) comparisons
    buckets: dict[str, list[int]] = {}
    for r in rows:
        if r["id"] in parsed:
            buckets.setdefault(r["phash"][:2], []).append(r["id"])

    uf = UnionFind()
    # Ensure all IDs are registered
    for r in rows:
        uf.find(r["id"])

    THRESHOLD = 10
    for bucket_ids in buckets.values():
        for a, b in combinations(bucket_ids, 2):
            try:
                if parsed[a] - parsed[b] <= THRESHOLD:
                    uf.union(a, b)
            except TypeError:
                # Hashes of different sizes cannot be compared.
                continue

    id_map = {r["id"]: r for r in rows}
    for member_ids in uf.groups().values():
        members = [id_map[mid] for mid in member_ids if mid in id_map]
        if len(members) < 2:
            continue
        keeper_id = _suggested_keeper_by_resolution(members)
        _insert_group(cur, "phash", id_map[keeper_id]["phash"], members, keeper_id)


def _run_exif_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 3: group files sharing the same (exif_datetime, exif_device).

    Files already in a sha256 or phash group are excluded both when finding
    candidate keys and when collecting the members of each group.
    ``scan_id`` is not used; the caller commits.
    """
    earlier = ("sha256", "phash")
    not_grouped = _exclude_grouped_clause(earlier)

    cur = con.cursor()
    cur.execute(f"""
        SELECT exif_datetime, exif_device, COUNT(*) as cnt
        FROM files
        WHERE exif_datetime IS NOT NULL
          AND exif_device IS NOT NULL
          AND {not_grouped}
        GROUP BY exif_datetime, exif_device
        HAVING cnt > 1
    """, earlier)
    for row in cur.fetchall():
        dt, dev = row["exif_datetime"], row["exif_device"]
        cur.execute(f"""
            SELECT id, width, height, file_size, exif_datetime
            FROM files
            WHERE exif_datetime = ? AND exif_device = ?
              AND {not_grouped}
        """, (dt, dev, *earlier))
        members = [dict(r) for r in cur.fetchall()]
        if len(members) < 2:
            continue
        keeper_id = _suggested_keeper_by_resolution(members)
        _insert_group(cur, "exif", f"{dt}::{dev}", members, keeper_id)


def _run_filesize_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 4: group files sharing the same (file_size, width, height).

    Files already in a sha256, phash or exif group are excluded both when
    finding candidate keys and when collecting the members of each group.
    The suggested keeper is the oldest member. ``scan_id`` is not used; the
    caller commits.
    """
    earlier = ("sha256", "phash", "exif")
    not_grouped = _exclude_grouped_clause(earlier)

    cur = con.cursor()
    cur.execute(f"""
        SELECT file_size, width, height, COUNT(*) as cnt
        FROM files
        WHERE file_size IS NOT NULL
          AND width IS NOT NULL
          AND height IS NOT NULL
          AND {not_grouped}
        GROUP BY file_size, width, height
        HAVING cnt > 1
    """, earlier)
    for row in cur.fetchall():
        fs, w, h = row["file_size"], row["width"], row["height"]
        cur.execute(f"""
            SELECT id, width, height, file_size, exif_datetime
            FROM files
            WHERE file_size = ? AND width = ? AND height = ?
              AND {not_grouped}
        """, (fs, w, h, *earlier))
        members = [dict(r) for r in cur.fetchall()]
        if len(members) < 2:
            continue
        keeper_id = _suggested_keeper_oldest(members)
        _insert_group(cur, "filesize", f"{fs}::{w}x{h}", members, keeper_id)


# ── Main scan entry point ─────────────────────────────────────────────────────

def _resolve_worker_count() -> int:
    """Return the indexing thread count.

    Default is 2× CPU count clamped to [4, 16]. ``DUPFINDER_WORKERS``
    overrides it; a non-numeric value falls back to the default and values
    below 1 are raised to 1 (the executor rejects ``max_workers <= 0``).
    """
    default = min(max((os.cpu_count() or 4) * 2, 4), 16)
    raw = os.environ.get("DUPFINDER_WORKERS")
    if raw is None:
        return default
    try:
        return max(1, int(raw))
    except ValueError:
        return default


def _finish_if_cancelled(con: sqlite3.Connection, cur, scan_id: int) -> bool:
    """If cancellation was requested, record it and return True; otherwise return False."""
    if not scan_state["cancel_requested"]:
        return False
    _mark_scan(cur, scan_id, "cancelled")
    con.commit()
    scan_state["status"] = "cancelled"
    return True


def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
    """Main scan function — runs in background thread.

    Phases: discovery → takeout pre-processing → indexing → phash → grouping.
    Progress is published through the module-level ``scan_state`` dict, and
    ``scan_state["cancel_requested"]`` is polled between units of work.

    Args:
        folder_path: Root directory to walk.
        scan_id: Row id in ``scans`` that this run updates.
        mode: "incremental" (skip files whose size is unchanged),
            "new_files" (skip every file already in the DB), "full_reset"
            (wipe files and groups first); any other value, e.g. "regroup",
            re-indexes every discovered file.

    Any exception is caught: the scan is marked 'error' and the message is
    stored in ``scan_state["message"]``.
    """
    con = get_db()
    cur = con.cursor()

    try:
        # ── Phase: discovery ──────────────────────────────────────────────
        scan_state.update(phase="discovery", progress=0, total=0,
                          message="Discovering files...")
        all_files = []
        for root, dirs, files in os.walk(folder_path):
            # Prune hidden directories in place so os.walk does not descend.
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            for fname in files:
                # Sidecar .json files are skipped implicitly: .json is not
                # a supported extension.
                if Path(fname).suffix.lower() in SUPPORTED_EXT:
                    all_files.append(os.path.join(root, fname))
                    # Live count update every 250 files so UI doesn't look frozen
                    if len(all_files) % 250 == 0:
                        scan_state["message"] = f"Discovering... {len(all_files):,} files found"
            if scan_state["cancel_requested"]:
                break

        scan_state["total"] = len(all_files)
        scan_state["message"] = f"Found {len(all_files):,} files"

        if _finish_if_cancelled(con, cur, scan_id):
            return

        # ── Mode: full reset ──────────────────────────────────────────────
        if mode == "full_reset":
            cur.execute("DELETE FROM duplicate_members")
            cur.execute("DELETE FROM duplicate_groups")
            cur.execute("DELETE FROM files")
            con.commit()

        # ── Phase: takeout pre-processing ─────────────────────────────────
        # Detection samples ≤50 dirs so it never blocks on large libraries
        scan_state.update(phase="takeout",
                          message="Checking for Google Takeout structure (sampling)...")
        if is_takeout_folder(folder_path):
            scan_state["message"] = "Processing Google Takeout sidecars..."
            process_takeout(folder_path, DB_PATH)
        else:
            scan_state["message"] = "Not a Takeout folder — skipping"

        if _finish_if_cancelled(con, cur, scan_id):
            return

        # ── Phase: indexing ───────────────────────────────────────────────
        # I/O-bound — use a thread pool so SHA-256 reads run in parallel.
        # Workers: 2× CPU count, capped at 16 (good for NAS/SSD; HDDs may
        # benefit from tuning down via DUPFINDER_WORKERS env var).
        N_WORKERS = _resolve_worker_count()
        scan_state.update(
            phase="indexing", progress=0,
            message=f"Indexing files — {N_WORKERS} parallel workers..."
        )

        # Pre-load all existing DB records in one query (avoids N per-file queries)
        cur.execute("SELECT path, id, file_size FROM files")
        existing_db: dict[str, dict] = {
            row["path"]: {"id": row["id"], "file_size": row["file_size"]}
            for row in cur.fetchall()
        }

        # Split files into "skip" (unchanged) and "process" (new or changed)
        to_process: list[str] = []
        to_skip: list[str] = []
        changed_ids: list[int] = []  # file IDs whose group memberships must be cleared

        for path in all_files:
            existing = existing_db.get(path)
            try:
                current_size = os.path.getsize(path)
            except OSError:
                # Vanished or unreadable since discovery: leave it out.
                continue

            if existing and mode in ("incremental", "new_files"):
                if mode == "new_files" or existing["file_size"] == current_size:
                    to_skip.append(path)
                    continue
                # File changed — clear stale group memberships
                changed_ids.append(existing["id"])

            to_process.append(path)

        # Bulk-stamp skipped files with current scan_id
        for chunk_start in range(0, len(to_skip), 500):
            chunk = to_skip[chunk_start : chunk_start + 500]
            cur.executemany(
                "UPDATE files SET scan_id = ? WHERE path = ?",
                [(scan_id, p) for p in chunk],
            )

        # Clear group memberships for changed files
        cur.executemany(
            "DELETE FROM duplicate_members WHERE file_id = ?",
            [(fid,) for fid in changed_ids],
        )
        con.commit()

        scan_state["total"] = len(all_files)
        scan_state["progress"] = len(to_skip)

        # Only this thread consumes completed futures and writes to the DB,
        # so a plain counter is sufficient.
        done_now = len(to_skip)

        def _index_file(path: str) -> dict | None:
            """Worker: extract file metadata. Returns record dict or None on error."""
            try:
                return extract_file(path)
            except Exception:
                return None

        with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
            futures = {pool.submit(_index_file, p): p for p in to_process}

            for future in as_completed(futures):
                if scan_state["cancel_requested"]:
                    # Drop queued work; leaving the with-block then waits
                    # only for extractions that are already running.
                    pool.shutdown(wait=False, cancel_futures=True)
                    _finish_if_cancelled(con, cur, scan_id)
                    return

                path = futures[future]
                record = future.result()

                done_now += 1
                scan_state["progress"] = done_now
                scan_state["message"] = (
                    f"Indexing ({N_WORKERS}w): {done_now:,} / {len(all_files):,}"
                )

                if record is None:
                    cur.execute(
                        "INSERT OR IGNORE INTO files "
                        "  (path, filename, extension, scan_id, status) "
                        "VALUES (?, ?, ?, ?, 'error')",
                        (path, Path(path).name, Path(path).suffix.lower(), scan_id),
                    )
                    cur.execute(
                        "UPDATE files SET status='error', scan_id=?, "
                        "  updated_at=CURRENT_TIMESTAMP WHERE path=?",
                        (scan_id, path),
                    )
                else:
                    record["scan_id"] = scan_id
                    if existing_db.get(path):
                        # Column references on the right-hand side see the
                        # pre-update row, so phash is cleared exactly when
                        # the content hash changed; the phash phase (which
                        # selects phash IS NULL) then recomputes it.
                        cur.execute("""
                            UPDATE files SET
                                filename=:filename, extension=:extension,
                                file_size=:file_size, mime_type=:mime_type,
                                phash=CASE WHEN sha256 IS :sha256 THEN phash ELSE NULL END,
                                sha256=:sha256, exif_datetime=:exif_datetime,
                                exif_device=:exif_device, width=:width,
                                height=:height, scan_id=:scan_id,
                                status='pending', updated_at=CURRENT_TIMESTAMP
                            WHERE path=:path
                        """, record)
                    else:
                        cur.execute("""
                            INSERT OR IGNORE INTO files
                                (path, filename, extension, file_size, mime_type,
                                 sha256, exif_datetime, exif_device, width, height,
                                 scan_id, status)
                            VALUES
                                (:path, :filename, :extension, :file_size, :mime_type,
                                 :sha256, :exif_datetime, :exif_device, :width, :height,
                                 :scan_id, 'pending')
                        """, record)

                # Commit every 200 completions to keep memory in check
                if done_now % 200 == 0:
                    con.commit()

        con.commit()

        # ── Phase: phash ──────────────────────────────────────────────────
        phasher = get_phasher()
        hw_label = "GPU" if phasher.using_gpu else "CPU"
        scan_state.update(phase="phash", progress=0,
                          message=f"Computing perceptual hashes ({hw_label})...")

        cur.execute("""
            SELECT id, path FROM files
            WHERE extension IN (
                '.jpg','.jpeg','.png','.gif','.bmp','.tiff','.tif',
                '.webp','.heic','.heif','.raw','.cr2','.nef','.arw',
                '.dng','.orf','.rw2','.pef','.srw','.x3f'
            ) AND phash IS NULL AND status != 'error'
        """)
        photo_rows = cur.fetchall()
        scan_state["total"] = len(photo_rows)

        if photo_rows:
            # Build id lookup so we can write results back efficiently
            path_to_id = {row["path"]: row["id"] for row in photo_rows}
            all_paths = list(path_to_id)

            def _phash_progress(n_done: int):
                if scan_state["cancel_requested"]:
                    return
                scan_state["progress"] = n_done
                scan_state["message"] = (
                    f"Phash ({hw_label}): {n_done:,} / {len(all_paths):,}"
                )

            results = phasher.hash_files(all_paths, progress_cb=_phash_progress)

            # Bulk write to DB in chunks of 500
            items = list(results.items())
            for chunk_start in range(0, len(items), 500):
                if _finish_if_cancelled(con, cur, scan_id):
                    return
                for path, ph in items[chunk_start : chunk_start + 500]:
                    fid = path_to_id.get(path)
                    if fid and ph:
                        cur.execute(
                            "UPDATE files SET phash=? WHERE id=?", (ph, fid)
                        )
                con.commit()

        con.commit()

        # ── Phase: grouping ───────────────────────────────────────────────
        scan_state.update(phase="grouping", progress=0, total=4,
                          message="Running duplicate detection...")

        if mode in ("incremental", "full_reset", "regroup"):
            cur.execute("DELETE FROM duplicate_members")
            cur.execute("DELETE FROM duplicate_groups")
            con.commit()
        elif mode == "new_files":
            # Only clear groups containing new files
            cur.execute("""
                DELETE FROM duplicate_groups WHERE id IN (
                    SELECT DISTINCT dm.group_id
                    FROM duplicate_members dm
                    JOIN files f ON f.id = dm.file_id
                    WHERE f.scan_id = ?
                )
            """, (scan_id,))
            con.commit()

        passes = (
            ("Pass 1/4: SHA-256 exact duplicates...", _run_sha256_pass),
            ("Pass 2/4: Perceptual hash similarity...", _run_phash_pass),
            ("Pass 3/4: EXIF timestamp + device...", _run_exif_pass),
            ("Pass 4/4: File size + dimensions...", _run_filesize_pass),
        )
        # Order matters: each pass excludes files grouped by earlier passes.
        for step, (label, detect) in enumerate(passes, start=1):
            scan_state["message"] = label
            detect(con, scan_id)
            scan_state["progress"] = step
            con.commit()

        # ── Restore keeper statuses for mode=incremental ──────────────────
        if mode == "incremental":
            # If a previously marked keeper no longer appears in any group, reset to pending
            cur.execute("""
                UPDATE files SET status='pending'
                WHERE status='keeper'
                  AND id NOT IN (
                      SELECT file_id FROM duplicate_members WHERE is_keeper=1
                  )
            """)
            con.commit()

        # Update scan record
        cur.execute(
            "UPDATE scans SET completed_at=CURRENT_TIMESTAMP, total_files=?, status='complete' "
            "WHERE id=?",
            (len(all_files), scan_id),
        )
        con.commit()

        scan_state.update(status="complete", phase="done",
                          message="Scan complete.", progress=scan_state["total"])
        _update_stats()

    except Exception as e:
        scan_state.update(status="error", message=str(e))
        try:
            _mark_scan(cur, scan_id, "error")
            con.commit()
        except Exception:
            # The DB itself may be the failure; the error is already in scan_state.
            pass
    finally:
        con.close()


def _mark_scan(cur, scan_id: int, status: str):
    """Stamp the scan row with a completion time and the given status (no commit)."""
    cur.execute(
        "UPDATE scans SET completed_at=CURRENT_TIMESTAMP, status=? WHERE id=?",
        (status, scan_id),
    )


def _update_stats():
    """Refresh stats in scan_state.

    Best effort: on any failure the previous stats are left untouched.
    """
    try:
        con = get_db()
    except Exception:
        return
    try:
        cur = con.cursor()
        cur.execute("SELECT COUNT(*) FROM files WHERE status != 'error'")
        total_files = cur.fetchone()[0]

        cur.execute("SELECT COUNT(*), SUM(file_size) FROM files WHERE status='redundant'")
        redundant = cur.fetchone()
        dup_count = redundant[0] or 0
        dup_size = redundant[1] or 0  # SUM() is NULL when no rows match

        cur.execute("""
            SELECT method,
                   COUNT(*) as group_count,
                   (SELECT COUNT(*) FROM duplicate_members dm2
                    JOIN duplicate_groups dg2 ON dg2.id=dm2.group_id
                    WHERE dg2.method=dg.method) as file_count
            FROM duplicate_groups dg
            GROUP BY method
        """)
        by_method = {
            row["method"]: {"groups": row["group_count"], "files": row["file_count"]}
            for row in cur.fetchall()
        }

        cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=1")
        reviewed = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=0")
        pending = cur.fetchone()[0]

        scan_state["stats"] = {
            "total_files": total_files,
            "duplicate_files": dup_count,
            "duplicate_size_bytes": dup_size,
            "groups_by_method": by_method,
            "reviewed": reviewed,
            "pending": pending,
        }
    except Exception:
        # Stats are advisory; never let them fail the caller.
        pass
    finally:
        con.close()