Files
duplicate-finder/app/scanner.py
Carlos 7436b23db3 Stage 2 #1: SFTP destinations CRUD + connection test
Foundation for the move/quarantine pipeline. Lets users register one or
more remote SFTP destinations through the API, store credentials at rest
under /data/sftp/{id}.{password|key} (mode 600), and verify connectivity
+ write access via a test endpoint.

Endpoints:
  GET    /api/sftp/destinations
  POST   /api/sftp/destinations             — create
  PUT    /api/sftp/destinations/{id}        — update
  DELETE /api/sftp/destinations/{id}
  POST   /api/sftp/destinations/{id}/test   — connect, stat base_path, mkdir probe
  POST   /api/sftp/keypair                  — generate ED25519 keypair

Host keys pinned per-destination on first connect (TOFU); subsequent
mismatches are rejected. paramiko added to requirements.
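A minimal sketch (not this commit's code) of how the TOFU flow can look with
paramiko; the function name and the pinned_key_b64 field are illustrative:

    import paramiko

    def connect_with_tofu(host, port, username, password, pinned_key_b64=None):
        """Illustrative trust-on-first-use check; field names are made up."""
        client = paramiko.SSHClient()
        # Accept unknown keys at the transport level; pinning is enforced below.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(host, port=port, username=username,
                       password=password, timeout=10)
        server_key = client.get_transport().get_remote_server_key()
        key_b64 = server_key.get_base64()
        if pinned_key_b64 is None:
            return client, key_b64             # first connect: pin this key
        if key_b64 != pinned_key_b64:          # any later mismatch is fatal
            client.close()
            raise RuntimeError(f"host key mismatch for {host}:{port}")
        return client, pinned_key_b64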

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-26 20:04:42 -04:00

1229 lines · 46 KiB · Python

"""
File scanner: discovery, per-file extraction, and all 4 duplicate detection passes.
"""
import hashlib
import mimetypes
import os
import sqlite3
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
import imagehash
from PIL import Image, ExifTags, UnidentifiedImageError
try:
from pillow_heif import register_heif_opener
register_heif_opener()
except ImportError:
pass
from takeout import is_takeout_folder, process_takeout
from gpu_hasher import get_phasher
PHOTO_EXT = {
".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif",
".webp", ".heic", ".heif", ".raw", ".cr2", ".nef", ".arw",
".dng", ".orf", ".rw2", ".pef", ".srw", ".x3f",
}
VIDEO_EXT = {
".mp4", ".mov", ".avi", ".mkv", ".m4v", ".3gp",
".wmv", ".mts", ".m2ts",
}
SUPPORTED_EXT = PHOTO_EXT | VIDEO_EXT
_DATA_DIR = Path("/data") if Path("/data").exists() else Path(__file__).parent.parent / "data"
_DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = str(_DATA_DIR / "dupfinder.db")
# Shared scan state (updated by background thread, read by status endpoint)
scan_state = {
"scan_id": None,
"status": "idle", # idle|running|paused|complete|error
"phase": "idle", # takeout|indexing|phash|grouping|done
"progress": 0,
"total": 0,
"message": "",
"folder_path": None, # persists so resume knows where to continue
"pause_requested": False,
"files_indexed": 0, # cumulative across phases
"phashes_done": 0,
"stats": {},
}
# ── DB helpers ────────────────────────────────────────────────────────────────
def get_db() -> sqlite3.Connection:
con = sqlite3.connect(DB_PATH, timeout=30)
con.row_factory = sqlite3.Row
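# WAL mode lets the status endpoint keep reading while the scan thread writes.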
con.execute("PRAGMA journal_mode=WAL")
con.execute("PRAGMA foreign_keys=ON")
return con
def log_decision(cur, file_id: int, group_id: int | None, action: str, reason: str):
"""Append a row to the decisions audit log.
Captures the file's sha256 at decision time so a future move/delete tool
can detect when a file has changed since the user reviewed it.
"""
cur.execute("SELECT sha256 FROM files WHERE id=?", (file_id,))
row = cur.fetchone()
sha = row["sha256"] if row else None
cur.execute(
"INSERT INTO decisions (file_id, group_id, action, reason, sha256_at_decision) "
"VALUES (?, ?, ?, ?, ?)",
(file_id, group_id, action, reason, sha),
)
def init_db():
con = get_db()
cur = con.cursor()
cur.executescript("""
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT UNIQUE NOT NULL,
filename TEXT NOT NULL,
extension TEXT,
file_size INTEGER,
mime_type TEXT,
sha256 TEXT,
phash TEXT,
exif_datetime TEXT,
exif_device TEXT,
width INTEGER,
height INTEGER,
file_mtime TEXT,
is_takeout INTEGER DEFAULT 0,
is_edited INTEGER DEFAULT 0,
takeout_json TEXT,
scan_id INTEGER,
status TEXT DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS scans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
folder_path TEXT NOT NULL,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
total_files INTEGER DEFAULT 0,
files_indexed INTEGER DEFAULT 0,
phashes_done INTEGER DEFAULT 0,
last_phase TEXT DEFAULT 'indexing',
status TEXT DEFAULT 'running'
);
CREATE TABLE IF NOT EXISTS duplicate_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
method TEXT NOT NULL,
method_value TEXT,
reviewed INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS duplicate_members (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id INTEGER REFERENCES duplicate_groups(id) ON DELETE CASCADE,
file_id INTEGER REFERENCES files(id) ON DELETE CASCADE,
is_keeper INTEGER DEFAULT 0,
suggested INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS sftp_destinations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL DEFAULT 22,
username TEXT NOT NULL,
auth_method TEXT NOT NULL, -- 'password' | 'key'
base_path TEXT NOT NULL,
mirror_structure INTEGER NOT NULL DEFAULT 1,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_tested_at TIMESTAMP,
last_test_result TEXT
);
CREATE TABLE IF NOT EXISTS decisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_id INTEGER NOT NULL,
group_id INTEGER,
action TEXT NOT NULL,
reason TEXT,
sha256_at_decision TEXT,
decided_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE,
FOREIGN KEY (group_id) REFERENCES duplicate_groups(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_sha256 ON files(sha256);
CREATE INDEX IF NOT EXISTS idx_phash ON files(phash);
CREATE INDEX IF NOT EXISTS idx_exif_dt ON files(exif_datetime, exif_device);
CREATE INDEX IF NOT EXISTS idx_size_dim ON files(file_size, width, height);
CREATE INDEX IF NOT EXISTS idx_status ON files(status);
CREATE INDEX IF NOT EXISTS idx_decisions_file ON decisions(file_id);
CREATE INDEX IF NOT EXISTS idx_decisions_group ON decisions(group_id);
""")
# Migration: add new columns to scans if upgrading from older schema
for col, defn in [
("files_indexed", "INTEGER DEFAULT 0"),
("phashes_done", "INTEGER DEFAULT 0"),
("last_phase", "TEXT DEFAULT 'indexing'"),
]:
try:
cur.execute(f"ALTER TABLE scans ADD COLUMN {col} {defn}")
except Exception:
pass # column already exists
# Migration: file_mtime added in v1.0.3 for keeper-selection scoring
try:
cur.execute("ALTER TABLE files ADD COLUMN file_mtime TEXT")
except Exception:
pass
con.commit()
# ── Detect interrupted scans from previous run ────────────────────────────
# Any scan left as 'running' means the server was killed mid-scan.
# Mark them 'paused' so the UI offers a resume button.
cur.execute("""
UPDATE scans SET status = 'paused'
WHERE status = 'running'
""")
con.commit()
# Restore scan_state if there's a paused scan
cur.execute("""
SELECT id, folder_path, files_indexed, phashes_done, last_phase
FROM scans WHERE status = 'paused'
ORDER BY started_at DESC LIMIT 1
""")
row = cur.fetchone()
if row:
scan_state.update(
scan_id=row["id"],
status="paused",
phase=row["last_phase"] or "indexing",
folder_path=row["folder_path"],
files_indexed=row["files_indexed"] or 0,
phashes_done=row["phashes_done"] or 0,
message=(
f"Paused — {row['files_indexed'] or 0:,} files indexed, "
f"{row['phashes_done'] or 0:,} phashes done"
),
)
con.close()
# ── Per-file extraction ───────────────────────────────────────────────────────
def _sha256(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
while chunk := f.read(65536):
h.update(chunk)
return h.hexdigest()
def _exif_data(path: str) -> tuple[str | None, str | None]:
"""Returns (exif_datetime, exif_device) or (None, None)."""
try:
img = Image.open(path)
exif_raw = img._getexif()
if not exif_raw:
return None, None
exif = {ExifTags.TAGS.get(k, k): v for k, v in exif_raw.items()}
dt = exif.get("DateTimeOriginal") or exif.get("DateTime")
if dt:
try:
dt = datetime.strptime(dt, "%Y:%m:%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S")
except ValueError:
dt = None
make = str(exif.get("Make", "")).strip()
model = str(exif.get("Model", "")).strip()
device = (make + " " + model).strip() if (make or model) else None
return dt, device
except Exception:
return None, None
def _image_dims(path: str) -> tuple[int | None, int | None]:
try:
with Image.open(path) as img:
return img.size # (width, height)
except Exception:
return None, None
def _phash(path: str) -> str | None:
try:
with Image.open(path) as img:
return str(imagehash.phash(img))
except Exception:
return None
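# imagehash.phash() defaults to a 64-bit hash, serialised by str() as 16 hex
# chars — the phash grouping pass later filters on length(phash) = 16.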
def _video_dims(path: str) -> tuple[int | None, int | None]:
try:
result = subprocess.run(
[
"ffprobe", "-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=width,height",
"-of", "csv=p=0",
path,
],
capture_output=True, text=True, timeout=10,
)
parts = result.stdout.strip().split(",")
if len(parts) == 2:
return int(parts[0]), int(parts[1])
except Exception:
pass
return None, None
def _mtime_str(path: str) -> str | None:
try:
ts = os.path.getmtime(path)
return datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S")
except Exception:
return None
def extract_file(path: str) -> dict:
ext = Path(path).suffix.lower()
filename = Path(path).name
is_photo = ext in PHOTO_EXT
is_video = ext in VIDEO_EXT
record = {
"path": path,
"filename": filename,
"extension": ext,
"file_size": None,
"mime_type": None,
"sha256": None,
"phash": None,
"exif_datetime": None,
"exif_device": None,
"width": None,
"height": None,
"file_mtime": _mtime_str(path),
}
try:
record["file_size"] = os.path.getsize(path)
except OSError:
pass
record["mime_type"] = mimetypes.guess_type(path)[0]
try:
record["sha256"] = _sha256(path)
except OSError:
pass
if is_photo:
w, h = _image_dims(path)
record["width"], record["height"] = w, h
dt, device = _exif_data(path)
record["exif_datetime"] = dt or _mtime_str(path)
record["exif_device"] = device
# phash computed in separate phase for progress reporting
elif is_video:
w, h = _video_dims(path)
record["width"], record["height"] = w, h
record["exif_datetime"] = _mtime_str(path)
return record
# ── Union-Find for phash grouping ────────────────────────────────────────────
class UnionFind:
def __init__(self):
self.parent: dict[int, int] = {}
def find(self, x: int) -> int:
if x not in self.parent:
self.parent[x] = x
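# Path compression: point every node on the lookup chain straight at its
# root so repeated find() calls flatten the tree.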
if self.parent[x] != x:
self.parent[x] = self.find(self.parent[x])
return self.parent[x]
def union(self, x: int, y: int):
px, py = self.find(x), self.find(y)
if px != py:
self.parent[px] = py
def groups(self) -> dict[int, list[int]]:
from collections import defaultdict
result: dict[int, list[int]] = defaultdict(list)
for x in self.parent:
result[self.find(x)].append(x)
return {k: v for k, v in result.items() if len(v) >= 2}
# ── Detection passes ──────────────────────────────────────────────────────────
# Explicit folder-priority ranking. Lower number = higher priority (preferred
# keeper). Higher number = mark redundant. Tokens match case-insensitively as
# substrings of the full path. When a path matches multiple tokens the WORST
# (highest) number wins — so /photos/#recycle/MobileBackup/foo.jpg ranks as
# #recycle (10), not MobileBackup (1).
#
# Override at runtime by writing /data/folder_priority.json:
# {"priorities": {"my_folder": 5, "trash": 10}, "default": 2}
_FOLDER_PRIORITY_DEFAULTS = (
("google photos", 11),
("googlephotos", 11),
("google_photos", 11),
("google-photos", 11),
("takeout", 11),
("google takeout", 11),
("googletakeout", 11),
("google backup", 11),
("googlebackup", 11),
("google_backup", 11),
("#recycle", 10),
("photoprism", 9),
("photoprizm", 8),
("photolibrary", 7),
("albumsbackup", 6),
("organized", 5),
("moved", 4),
("random", 3),
("mobilebackup", 1),
)
_FOLDER_PRIORITY_DEFAULT_BUCKET = 2 # "anything else"
_folder_priority_cache: tuple[tuple[tuple[str, int], ...], int] | None = None
def _load_folder_priority() -> tuple[tuple[tuple[str, int], ...], int]:
"""Load folder priority list from /data/folder_priority.json if present,
else fall back to defaults. Cached after first call per process."""
global _folder_priority_cache
if _folder_priority_cache is not None:
return _folder_priority_cache
entries: tuple[tuple[str, int], ...] = _FOLDER_PRIORITY_DEFAULTS
default_bucket = _FOLDER_PRIORITY_DEFAULT_BUCKET
try:
import json
path = "/data/folder_priority.json"
if os.path.exists(path):
with open(path) as f:
data = json.load(f)
entries = tuple(
(k.lower(), int(v))
for k, v in (data.get("priorities") or {}).items()
)
default_bucket = int(data.get("default", default_bucket))
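# Note: a user-supplied "priorities" map replaces the built-in defaults
# wholesale; it is not merged with _FOLDER_PRIORITY_DEFAULTS.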
except Exception:
pass
_folder_priority_cache = (entries, default_bucket)
return _folder_priority_cache
def _folder_priority(path: str) -> int:
"""Return the worst (highest) priority bucket matching any DIRECTORY segment
of this path, or default. Filename basename is intentionally excluded —
only folder names influence priority."""
entries, default_bucket = _load_folder_priority()
if not path:
return default_bucket
# Split on /, drop empty segments, drop the last (filename basename).
segments = [s.lower() for s in path.split("/") if s]
if len(segments) <= 1:
return default_bucket # no parent folder
dir_segments = segments[:-1]
worst: int | None = None
for seg in dir_segments:
for token, prio in entries:
if token in seg and (worst is None or prio > worst):
worst = prio
return worst if worst is not None else default_bucket
# Generic copy/backup signal — applies on top of explicit folder priority as a
# tiebreaker. Tokens match as whole-word-ish substrings of each path segment.
_DUP_FOLDER_TOKENS = (
"trash", "trashed", "dup", "dups", "duplicate", "duplicates",
"backup", "backups", "copy", "copies", "old", "archive", "archived",
)
def _path_penalty(path: str) -> int:
"""Higher = worse keeper candidate. Penalises FOLDERS (not filenames) that
look like copies/backups, plus repeated segments and very deep paths."""
if not path:
return 0
segments = [s for s in path.split("/") if s]
if not segments:
return 0
# Folder segments only — exclude filename basename
dir_segments = segments[:-1]
score = 0
for seg in dir_segments:
low = seg.lower()
for tok in _DUP_FOLDER_TOKENS:
if (tok in low.split() or tok == low
or f"_{tok}" in low or f"{tok}_" in low
or low.startswith(tok) or low.endswith(tok)):
score += 100
break
# Repeated folder segments like "Desktop/Desktop/Files" suggest a nested backup
seen: set[str] = set()
for seg in dir_segments:
low = seg.lower()
if low in seen:
score += 30
seen.add(low)
# Slight penalty for very deep paths (originals tend to live shallower)
score += max(0, len(dir_segments) - 6) * 5
return score
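# Example: "/photos/Backup/Desktop/Desktop/img.jpg" scores 100 ("backup" folder)
# + 30 (repeated "Desktop" segment) = 130; "/photos/2019/img.jpg" scores 0.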
def _suggested_keeper_by_resolution(members: list[dict]) -> int:
"""Return file_id of best keeper.
Ranking, in order (lower wins):
1. Folder priority bucket (explicit list, e.g. #recycle = worst)
2. Highest pixel count (tie → largest file_size)
3. Lowest path penalty (Trashed/, Dups/, Backup/, deep nesting)
4. Earliest mtime (originals are usually older than their copies)
5. Earliest exif_datetime
"""
def res_size(m):
# Negate for descending sort with min()
return (-(m["width"] or 0) * (m["height"] or 0), -(m["file_size"] or 0))
def rank(m):
path = m.get("path") or ""
return (
_folder_priority(path),
res_size(m),
_path_penalty(path),
m.get("file_mtime") or "9999",
m.get("exif_datetime") or "9999-99-99T99:99:99",
)
return min(members, key=rank)["id"]
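# Example: a copy under ".../#recycle/..." (bucket 10) always loses to one in an
# ordinary folder (bucket 2) even if it has more pixels; between two same-bucket
# copies the higher pixel count wins, and only ties reach path penalty / mtime.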
def _suggested_keeper_oldest(members: list[dict]) -> int:
def key(m):
return m["exif_datetime"] or "9999"
return min(members, key=key)["id"]
def _run_sha256_pass(con: sqlite3.Connection, scan_id: int):
cur = con.cursor()
cur.execute("""
SELECT sha256, COUNT(*) as cnt
FROM files
WHERE sha256 IS NOT NULL
GROUP BY sha256
HAVING cnt > 1
""")
rows = cur.fetchall()
for row in rows:
sha = row["sha256"]
cur.execute("""
SELECT id, path, width, height, file_size, exif_datetime, file_mtime
FROM files WHERE sha256 = ?
""", (sha,))
members = [dict(r) for r in cur.fetchall()]
keeper_id = _suggested_keeper_by_resolution(members)
cur.execute(
"INSERT INTO duplicate_groups (method, method_value) VALUES ('sha256', ?)",
(sha,),
)
group_id = cur.lastrowid
for m in members:
cur.execute(
"INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
(group_id, m["id"], 1 if m["id"] == keeper_id else 0),
)
def _run_phash_pass(con: sqlite3.Connection, scan_id: int):
cur = con.cursor()
# Exclude files already in sha256 groups
cur.execute("""
SELECT f.id, f.path, f.phash, f.width, f.height, f.file_size,
f.exif_datetime, f.file_mtime
FROM files f
WHERE f.phash IS NOT NULL
AND length(f.phash) = 16
AND f.extension NOT IN (
'.mp4','.mov','.avi','.mkv','.m4v','.3gp','.wmv','.mts','.m2ts'
)
AND f.id NOT IN (
SELECT dm.file_id FROM duplicate_members dm
JOIN duplicate_groups dg ON dg.id = dm.group_id
WHERE dg.method = 'sha256'
)
""")
rows = [dict(r) for r in cur.fetchall()]
if len(rows) < 2:
return
THRESHOLD = 10
# Multi-index pigeonhole: split each 64-bit phash into 16 nibble positions.
# If two hashes differ by ≤K bits, at least 16-K nibble positions are
# untouched, so any candidate pair shares at least one (position, nibble)
# bucket. Catches pairs the previous 2-hex-prefix bucketing missed.
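# Worked example: with THRESHOLD = 10, two hashes within 10 bits of each other
# differ in at most 10 of the 16 nibble positions, so they agree on at least 6
# (position, nibble) buckets — landing together in any one bucket is enough
# for the pair to be Hamming-checked below.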
buckets: dict[tuple[int, str], list[dict]] = {}
for r in rows:
for i, ch in enumerate(r["phash"]):
buckets.setdefault((i, ch), []).append(r)
uf = UnionFind()
for r in rows:
uf.find(r["id"])
hash_cache: dict[str, "imagehash.ImageHash"] = {}
def _h(s: str):
h = hash_cache.get(s)
if h is None:
h = imagehash.hex_to_hash(s)
hash_cache[s] = h
return h
seen_pairs: set[tuple[int, int]] = set()
for bucket in buckets.values():
if len(bucket) < 2:
continue
for i in range(len(bucket)):
for j in range(i + 1, len(bucket)):
a, b = bucket[i], bucket[j]
pair = (a["id"], b["id"]) if a["id"] < b["id"] else (b["id"], a["id"])
if pair in seen_pairs:
continue
seen_pairs.add(pair)
try:
if _h(a["phash"]) - _h(b["phash"]) <= THRESHOLD:
uf.union(a["id"], b["id"])
except Exception:
pass
id_map = {r["id"]: r for r in rows}
for _, member_ids in uf.groups().items():
members = [id_map[mid] for mid in member_ids if mid in id_map]
if len(members) < 2:
continue
keeper_id = _suggested_keeper_by_resolution(members)
keeper = id_map[keeper_id]
cur.execute(
"INSERT INTO duplicate_groups (method, method_value) VALUES ('phash', ?)",
(keeper["phash"],),
)
group_id = cur.lastrowid
for m in members:
cur.execute(
"INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
(group_id, m["id"], 1 if m["id"] == keeper_id else 0),
)
def _run_exif_pass(con: sqlite3.Connection, scan_id: int):
cur = con.cursor()
cur.execute("""
SELECT exif_datetime, exif_device, COUNT(*) as cnt
FROM files
WHERE exif_datetime IS NOT NULL
AND exif_device IS NOT NULL
AND id NOT IN (
SELECT file_id FROM duplicate_members dm
JOIN duplicate_groups dg ON dg.id = dm.group_id
WHERE dg.method IN ('sha256', 'phash')
)
GROUP BY exif_datetime, exif_device
HAVING cnt > 1
""")
rows = cur.fetchall()
for row in rows:
dt, dev = row["exif_datetime"], row["exif_device"]
cur.execute("""
SELECT id, path, width, height, file_size, exif_datetime, file_mtime
FROM files
WHERE exif_datetime = ? AND exif_device = ?
""", (dt, dev))
members = [dict(r) for r in cur.fetchall()]
keeper_id = _suggested_keeper_by_resolution(members)
method_value = f"{dt}::{dev}"
cur.execute(
"INSERT INTO duplicate_groups (method, method_value) VALUES ('exif', ?)",
(method_value,),
)
group_id = cur.lastrowid
for m in members:
cur.execute(
"INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
(group_id, m["id"], 1 if m["id"] == keeper_id else 0),
)
def _run_filesize_pass(con: sqlite3.Connection, scan_id: int):
cur = con.cursor()
cur.execute("""
SELECT file_size, width, height, COUNT(*) as cnt
FROM files
WHERE file_size IS NOT NULL
AND width IS NOT NULL
AND height IS NOT NULL
AND id NOT IN (
SELECT file_id FROM duplicate_members dm
JOIN duplicate_groups dg ON dg.id = dm.group_id
WHERE dg.method IN ('sha256', 'phash', 'exif')
)
GROUP BY file_size, width, height
HAVING cnt > 1
""")
rows = cur.fetchall()
for row in rows:
fs, w, h = row["file_size"], row["width"], row["height"]
cur.execute("""
SELECT id, path, width, height, file_size, exif_datetime, file_mtime
FROM files
WHERE file_size = ? AND width = ? AND height = ?
""", (fs, w, h))
members = [dict(r) for r in cur.fetchall()]
# Filesize+dim is the weakest signal — folder/mtime tiebreak helps a lot here
keeper_id = _suggested_keeper_by_resolution(members)
method_value = f"{fs}::{w}x{h}"
cur.execute(
"INSERT INTO duplicate_groups (method, method_value) VALUES ('filesize', ?)",
(method_value,),
)
group_id = cur.lastrowid
for m in members:
cur.execute(
"INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
(group_id, m["id"], 1 if m["id"] == keeper_id else 0),
)
# ── Pause helpers ────────────────────────────────────────────────────────────
def _save_pause_state(cur, scan_id: int, phase: str,
files_indexed: int, phashes_done: int):
"""Persist pause progress so the scan survives a server restart."""
cur.execute("""
UPDATE scans SET
status = 'paused',
last_phase = ?,
files_indexed = ?,
phashes_done = ?
WHERE id = ?
""", (phase, files_indexed, phashes_done, scan_id))
# ── Main scan entry point ─────────────────────────────────────────────────────
def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
"""Main scan function — runs in background thread."""
global scan_state
scan_state["folder_path"] = folder_path # persist so resume knows where to continue
con = get_db()
cur = con.cursor()
try:
# ── Mode: full reset ──────────────────────────────────────────────
if mode == "full_reset":
cur.execute("DELETE FROM duplicate_members")
cur.execute("DELETE FROM duplicate_groups")
cur.execute("DELETE FROM files")
con.commit()
# ── Phase: takeout detection (sidecar processing deferred until after
# indexing — sidecars enrich existing DB rows, so files must be there). ─
scan_state.update(phase="takeout",
message="Checking for Google Takeout structure...")
is_takeout = is_takeout_folder(folder_path)
scan_state["message"] = (
"Takeout detected — sidecars will be processed after indexing"
if is_takeout else "Not a Takeout folder — skipping"
)
if scan_state["pause_requested"]:
_save_pause_state(cur, scan_id, "takeout", 0, 0)
con.commit()
scan_state.update(
status="paused", pause_requested=False,
message="Paused during Takeout check",
)
return
# ── Phases: discovery + indexing (pipelined) ──────────────────────
# Workers start hashing files the instant they are discovered —
# no waiting for the full directory walk to finish first.
#
# Workers: 2× CPU count, capped at 16. Tune via DUPFINDER_WORKERS.
N_WORKERS = int(os.environ.get(
"DUPFINDER_WORKERS",
min(max((os.cpu_count() or 4) * 2, 4), 16)
))
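# e.g. an 8-core machine defaults to 16 workers; set DUPFINDER_WORKERS=4 to
# throttle hashing I/O.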
scan_state.update(
phase="indexing", progress=0, total=0,
message=f"Scanning — discovering & indexing in parallel ({N_WORKERS} workers)..."
)
# Pre-load existing DB records once (avoids per-file queries)
cur.execute("SELECT path, id, file_size FROM files")
existing_db: dict[str, dict] = {
row["path"]: {"id": row["id"], "file_size": row["file_size"]}
for row in cur.fetchall()
}
# Shared counters (updated from multiple threads)
_lock = threading.Lock()
_discovered = [0] # total files found by walker so far
_done = [0] # files fully indexed (skipped + processed)
_walk_done = [False]
_pause_at_end = False # set True when pause requested mid-walk
all_files: list[str] = []
to_skip: list[str] = []
changed_ids: list[int] = []
def _index_file(path: str) -> dict | None:
try:
return extract_file(path)
except Exception:
return None
def _write_result(path: str, record: dict | None, existing: dict | None):
"""Write one file result to DB. Called on main thread only."""
if record is None:
cur.execute(
"INSERT OR IGNORE INTO files "
" (path, filename, extension, scan_id, status) "
"VALUES (?, ?, ?, ?, 'error')",
(path, Path(path).name, Path(path).suffix.lower(), scan_id),
)
cur.execute(
"UPDATE files SET status='error', scan_id=?, "
" updated_at=CURRENT_TIMESTAMP WHERE path=?",
(scan_id, path),
)
else:
record["scan_id"] = scan_id
if existing:
cur.execute("""
UPDATE files SET
filename=:filename, extension=:extension,
file_size=:file_size, mime_type=:mime_type,
sha256=:sha256, exif_datetime=:exif_datetime,
exif_device=:exif_device, width=:width,
height=:height, file_mtime=:file_mtime,
scan_id=:scan_id,
status='pending', updated_at=CURRENT_TIMESTAMP
WHERE path=:path
""", record)
else:
cur.execute("""
INSERT OR IGNORE INTO files
(path, filename, extension, file_size, mime_type,
sha256, exif_datetime, exif_device, width,
height, file_mtime, scan_id, status)
VALUES
(:path, :filename, :extension, :file_size,
:mime_type, :sha256, :exif_datetime,
:exif_device, :width, :height, :file_mtime,
:scan_id, 'pending')
""", record)
with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
pending: dict = {} # future → (path, existing)
def _drain(limit: int = 50):
"""Collect up to `limit` completed futures and write to DB."""
done_futures = [f for f in list(pending) if f.done()][:limit]
for f in done_futures:
path, existing = pending.pop(f)
_write_result(path, f.result(), existing)
with _lock:
_done[0] += 1
d = _done[0]
disc = _discovered[0]
walking = not _walk_done[0]
scan_state["progress"] = d
scan_state["total"] = disc
scan_state["message"] = (
f"{'Discovering & i' if walking else 'I'}ndexing "
f"({N_WORKERS}w): {d:,}"
+ (f" / {disc:,}" if not walking else f"{disc:,} found so far")
)
if done_futures and _done[0] % 200 == 0:
con.commit()
# ── Walk + submit ─────────────────────────────────────────────
for root, dirs, files in os.walk(folder_path):
dirs[:] = [d for d in dirs if not d.startswith(".")]
if scan_state["pause_requested"]:
_pause_at_end = True
break # stop walking; in-flight futures drain normally
for fname in files:
if fname.endswith(".json"):
continue
ext = Path(fname).suffix.lower()
if ext not in SUPPORTED_EXT:
continue
path = os.path.join(root, fname)
all_files.append(path)
with _lock:
_discovered[0] += 1
existing = existing_db.get(path)
try:
current_size = os.path.getsize(path)
except OSError:
continue
# Skip unchanged files
if existing and mode in ("incremental", "new_files"):
if mode == "new_files" or existing["file_size"] == current_size:
to_skip.append(path)
with _lock:
_done[0] += 1
continue
changed_ids.append(existing["id"])
# Submit to thread pool immediately
future = pool.submit(_index_file, path)
pending[future] = (path, existing)
# Drain completed results regularly to avoid memory buildup
if len(pending) >= N_WORKERS * 4:
_drain(N_WORKERS * 2)
# Drain after each directory
_drain(20)
_walk_done[0] = True
# ── Bulk-stamp skipped files ──────────────────────────────────
for chunk_start in range(0, len(to_skip), 500):
chunk = to_skip[chunk_start : chunk_start + 500]
cur.executemany(
"UPDATE files SET scan_id = ? WHERE path = ?",
[(scan_id, p) for p in chunk],
)
for fid in changed_ids:
cur.execute(
"DELETE FROM duplicate_members WHERE file_id = ?", (fid,)
)
con.commit()
# ── Wait for remaining futures ────────────────────────────────
scan_state["total"] = len(all_files)
for future in as_completed(pending):
path, existing = pending[future]
_write_result(path, future.result(), existing)
with _lock:
_done[0] += 1
d = _done[0]
scan_state["progress"] = d
scan_state["message"] = (
f"Indexing ({N_WORKERS}w): {d:,} / {len(all_files):,}"
)
if d % 200 == 0:
con.commit()
con.commit()
# ── Pause checkpoint: after indexing ──────────────────────────────
scan_state["files_indexed"] = _done[0]
if _pause_at_end:
_save_pause_state(cur, scan_id, "indexing", _done[0], 0)
con.commit()
scan_state.update(
status="paused", pause_requested=False,
message=f"Paused — {_done[0]:,} files indexed",
)
return
# ── Takeout sidecar enrichment (now that files exist in DB) ───────
if is_takeout:
scan_state.update(phase="takeout",
message="Processing Google Takeout sidecars...")
try:
enriched = process_takeout(folder_path, DB_PATH)
scan_state["message"] = f"Takeout: enriched {enriched:,} files"
except Exception as exc:
scan_state["message"] = f"Takeout enrichment failed: {exc}"
# ── Phase: phash ──────────────────────────────────────────────────
phasher = get_phasher()
hw_label = "GPU" if phasher.using_gpu else "CPU"
scan_state.update(phase="phash", progress=0,
message=f"Computing perceptual hashes ({hw_label})...")
cur.execute("""
SELECT id, path FROM files
WHERE extension IN (
'.jpg','.jpeg','.png','.gif','.bmp','.tiff','.tif',
'.webp','.heic','.heif','.raw','.cr2','.nef','.arw',
'.dng','.orf','.rw2','.pef','.srw','.x3f'
) AND phash IS NULL AND status != 'error'
""")
photo_rows = cur.fetchall()
scan_state["total"] = len(photo_rows)
if photo_rows:
path_to_id = {row["path"]: row["id"] for row in photo_rows}
all_paths = list(path_to_id.keys())
# Process in chunks so pause requests are honoured between batches
PHASH_CHUNK = 500
phashes_written = 0
for chunk_start in range(0, len(all_paths), PHASH_CHUNK):
if scan_state["pause_requested"]:
_save_pause_state(
cur, scan_id, "phash",
scan_state["files_indexed"], phashes_written,
)
con.commit()
scan_state.update(
status="paused", pause_requested=False,
phashes_done=phashes_written,
message=(
f"Paused — {phashes_written:,} / {len(all_paths):,} "
"perceptual hashes computed"
),
)
return
chunk = all_paths[chunk_start : chunk_start + PHASH_CHUNK]
chunk_results = phasher.hash_files(chunk, progress_cb=None)
for path, ph in chunk_results.items():
fid = path_to_id.get(path)
if fid and ph:
cur.execute(
"UPDATE files SET phash=? WHERE id=?", (ph, fid)
)
con.commit()
phashes_written += len(chunk)
scan_state["phashes_done"] = phashes_written
scan_state["progress"] = phashes_written
scan_state["message"] = (
f"Phash ({hw_label}): {phashes_written:,} / {len(all_paths):,}"
)
con.commit()
# ── Phase: grouping ───────────────────────────────────────────────
scan_state.update(phase="grouping", progress=0, total=4,
message="Running duplicate detection...")
# Snapshot reviewed groups so we can re-apply decisions to any
# post-regrouping group whose member-set is unchanged.
prior_reviewed: dict[tuple[str, frozenset], int | None] = {}
if mode in ("incremental", "regroup"):
cur.execute("""
SELECT dg.id, dg.method, dm.file_id, dm.is_keeper
FROM duplicate_groups dg
JOIN duplicate_members dm ON dm.group_id = dg.id
WHERE dg.reviewed = 1
""")
snap: dict[int, dict] = {}
for r in cur.fetchall():
g = snap.setdefault(
r["id"],
{"method": r["method"], "members": set(), "keeper": None},
)
g["members"].add(r["file_id"])
if r["is_keeper"]:
g["keeper"] = r["file_id"]
for g in snap.values():
prior_reviewed[(g["method"], frozenset(g["members"]))] = g["keeper"]
if mode in ("incremental", "full_reset", "regroup"):
cur.execute("DELETE FROM duplicate_members")
cur.execute("DELETE FROM duplicate_groups")
con.commit()
elif mode == "new_files":
# Only clear groups containing new files
cur.execute("""
DELETE FROM duplicate_groups WHERE id IN (
SELECT DISTINCT dm.group_id FROM duplicate_members dm
JOIN files f ON f.id = dm.file_id
WHERE f.scan_id = ?
)
""", (scan_id,))
con.commit()
scan_state["message"] = "Pass 1/4: SHA-256 exact duplicates..."
_run_sha256_pass(con, scan_id)
scan_state["progress"] = 1
con.commit()
scan_state["message"] = "Pass 2/4: Perceptual hash similarity..."
_run_phash_pass(con, scan_id)
scan_state["progress"] = 2
con.commit()
scan_state["message"] = "Pass 3/4: EXIF timestamp + device..."
_run_exif_pass(con, scan_id)
scan_state["progress"] = 3
con.commit()
scan_state["message"] = "Pass 4/4: File size + dimensions..."
_run_filesize_pass(con, scan_id)
scan_state["progress"] = 4
con.commit()
# ── Re-apply prior review decisions where membership unchanged ────
if prior_reviewed:
cur.execute("""
SELECT dg.id, dg.method, dm.file_id
FROM duplicate_groups dg
JOIN duplicate_members dm ON dm.group_id = dg.id
""")
new_groups: dict[int, dict] = {}
for r in cur.fetchall():
g = new_groups.setdefault(
r["id"], {"method": r["method"], "members": set()}
)
g["members"].add(r["file_id"])
restored = 0
for gid, g in new_groups.items():
key = (g["method"], frozenset(g["members"]))
if key not in prior_reviewed:
continue
keeper = prior_reviewed[key]
cur.execute(
"UPDATE duplicate_groups SET reviewed=1 WHERE id=?", (gid,)
)
for fid in g["members"]:
is_k = 1 if fid == keeper else 0
cur.execute(
"UPDATE duplicate_members "
"SET is_keeper=?, suggested=? "
"WHERE group_id=? AND file_id=?",
(is_k, is_k, gid, fid),
)
cur.execute(
"UPDATE files SET status=? WHERE id=?",
("keeper" if is_k else "redundant", fid),
)
log_decision(
cur, fid, gid,
"keeper" if is_k else "redundant",
"rescan-restore",
)
restored += 1
con.commit()
scan_state["message"] = f"Restored {restored:,} prior review decisions"
# Reset orphaned keeper status for files no longer in any group
if mode == "incremental":
cur.execute("""
UPDATE files SET status='pending'
WHERE status IN ('keeper', 'redundant')
AND id NOT IN (SELECT file_id FROM duplicate_members)
""")
con.commit()
# Update scan record
cur.execute(
"UPDATE scans SET completed_at=CURRENT_TIMESTAMP, total_files=?, status='complete' "
"WHERE id=?",
(len(all_files), scan_id),
)
con.commit()
scan_state.update(status="complete", phase="done",
message="Scan complete.", progress=scan_state["total"])
_update_stats()
except Exception as e:
scan_state.update(status="error", message=str(e))
try:
_mark_scan(cur, scan_id, "error")
con.commit()
except Exception:
pass
finally:
con.close()
def _mark_scan(cur, scan_id: int, status: str):
cur.execute(
"UPDATE scans SET completed_at=CURRENT_TIMESTAMP, status=? WHERE id=?",
(status, scan_id),
)
def _update_stats():
"""Refresh stats in scan_state."""
try:
con = get_db()
cur = con.cursor()
cur.execute("SELECT COUNT(*) FROM files WHERE status != 'error'")
total_files = cur.fetchone()[0]
cur.execute("SELECT COUNT(*), SUM(file_size) FROM files WHERE status='redundant'")
r = cur.fetchone()
dup_count = r[0] or 0
dup_size = r[1] or 0
for method in ("sha256", "phash", "exif", "filesize"):
cur.execute(
"SELECT COUNT(*) FROM duplicate_groups WHERE method=?", (method,)
)
cur.execute("""
SELECT method,
COUNT(*) as groups,
(SELECT COUNT(*) FROM duplicate_members dm2
JOIN duplicate_groups dg2 ON dg2.id=dm2.group_id
WHERE dg2.method=dg.method) as files
FROM duplicate_groups dg
GROUP BY method
""")
by_method = {r["method"]: {"groups": r["groups"], "files": r["files"]}
for r in cur.fetchall()}
cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=1")
reviewed = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=0")
pending = cur.fetchone()[0]
scan_state["stats"] = {
"total_files": total_files,
"duplicate_files": dup_count,
"duplicate_size_bytes": dup_size,
"groups_by_method": by_method,
"reviewed": reviewed,
"pending": pending,
}
con.close()
except Exception:
pass