Files
duplicate-finder/app/scanner.py
tocmo 356f922940 feat: replace Cancel with Pause/Resume — survives server restarts
- scanner.py: replace cancel_requested with pause_requested throughout;
  pause during walk drains in-flight futures gracefully then saves state;
  phash phase processes in 500-image chunks with pause check between each;
  _save_pause_state() persists files_indexed/phashes_done/last_phase to DB;
  init_db() already detects killed-mid-scan (running→paused) on startup

- main.py: add POST /api/scan/pause and POST /api/scan/resume endpoints;
  /api/scan/cancel kept as alias; scan_status now returns folder_path,
  files_indexed, phashes_done; scan_reset clears all new fields

- index.html: "Cancel" → "⏸ Pause" button; new #paused-area banner shows
  folder, files indexed, phashes done with "▶ Resume" and "Full reset"
  buttons; updateScanUI handles paused status; pauseScan()/resumeScan()
  JS functions added; chip gains .paused amber style

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 02:11:00 -04:00

951 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
File scanner: discovery, per-file extraction, and all 4 duplicate detection passes.
"""
import hashlib
import mimetypes
import os
import sqlite3
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
import imagehash
from PIL import Image, ExifTags, UnidentifiedImageError
try:
from pillow_heif import register_heif_opener
register_heif_opener()
except ImportError:
pass
from takeout import is_takeout_folder, process_takeout
from gpu_hasher import get_phasher
PHOTO_EXT = {
".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif",
".webp", ".heic", ".heif", ".raw", ".cr2", ".nef", ".arw",
".dng", ".orf", ".rw2", ".pef", ".srw", ".x3f",
}
VIDEO_EXT = {
".mp4", ".mov", ".avi", ".mkv", ".m4v", ".3gp",
".wmv", ".mts", ".m2ts",
}
SUPPORTED_EXT = PHOTO_EXT | VIDEO_EXT
_DATA_DIR = Path("/data") if Path("/data").exists() else Path(__file__).parent.parent / "data"
_DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = str(_DATA_DIR / "dupfinder.db")
# Shared scan state (updated by background thread, read by status endpoint)
scan_state = {
"scan_id": None,
"status": "idle", # idle|running|paused|complete|error
"phase": "idle", # takeout|indexing|phash|grouping|done
"progress": 0,
"total": 0,
"message": "",
"folder_path": None, # persists so resume knows where to continue
"pause_requested": False,
"files_indexed": 0, # cumulative across phases
"phashes_done": 0,
"stats": {},
}
# ── DB helpers ────────────────────────────────────────────────────────────────
def get_db() -> sqlite3.Connection:
    """Open a connection to the app database.

    Every connection gets a Row factory (dict-style access), WAL journaling
    for concurrent reader/writer access, and foreign-key enforcement.
    """
    connection = sqlite3.connect(DB_PATH, timeout=30)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
def init_db():
    """Create/upgrade the schema and recover any scan interrupted by a restart.

    Steps:
      1. Create all tables and indexes if missing (idempotent).
      2. Migrate older ``scans`` tables by adding the pause/resume columns.
      3. Flip any scan still marked 'running' to 'paused' — a scan left
         'running' means the server was killed mid-scan, so the UI should
         offer a resume button.
      4. Restore ``scan_state`` from the most recent paused scan, if any.
    """
    con = get_db()
    cur = con.cursor()
    cur.executescript("""
    CREATE TABLE IF NOT EXISTS files (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        path TEXT UNIQUE NOT NULL,
        filename TEXT NOT NULL,
        extension TEXT,
        file_size INTEGER,
        mime_type TEXT,
        sha256 TEXT,
        phash TEXT,
        exif_datetime TEXT,
        exif_device TEXT,
        width INTEGER,
        height INTEGER,
        is_takeout INTEGER DEFAULT 0,
        is_edited INTEGER DEFAULT 0,
        takeout_json TEXT,
        scan_id INTEGER,
        status TEXT DEFAULT 'pending',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS scans (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        folder_path TEXT NOT NULL,
        started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        completed_at TIMESTAMP,
        total_files INTEGER DEFAULT 0,
        files_indexed INTEGER DEFAULT 0,
        phashes_done INTEGER DEFAULT 0,
        last_phase TEXT DEFAULT 'indexing',
        status TEXT DEFAULT 'running'
    );
    CREATE TABLE IF NOT EXISTS duplicate_groups (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        method TEXT NOT NULL,
        method_value TEXT,
        reviewed INTEGER DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS duplicate_members (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        group_id INTEGER REFERENCES duplicate_groups(id) ON DELETE CASCADE,
        file_id INTEGER REFERENCES files(id) ON DELETE CASCADE,
        is_keeper INTEGER DEFAULT 0,
        suggested INTEGER DEFAULT 0
    );
    CREATE INDEX IF NOT EXISTS idx_sha256 ON files(sha256);
    CREATE INDEX IF NOT EXISTS idx_phash ON files(phash);
    CREATE INDEX IF NOT EXISTS idx_exif_dt ON files(exif_datetime, exif_device);
    CREATE INDEX IF NOT EXISTS idx_size_dim ON files(file_size, width, height);
    CREATE INDEX IF NOT EXISTS idx_status ON files(status);
    """)
    # Migration: add pause/resume columns when upgrading an older schema.
    for col, defn in [
        ("files_indexed", "INTEGER DEFAULT 0"),
        ("phashes_done", "INTEGER DEFAULT 0"),
        ("last_phase", "TEXT DEFAULT 'indexing'"),
    ]:
        try:
            cur.execute(f"ALTER TABLE scans ADD COLUMN {col} {defn}")
        except sqlite3.OperationalError:
            # "duplicate column name" — already migrated. Narrowed from a
            # bare `except Exception` so unrelated DB errors surface.
            pass
    con.commit()
    # ── Detect interrupted scans from previous run ────────────────────────
    # Any scan left as 'running' means the server was killed mid-scan.
    # Mark them 'paused' so the UI offers a resume button.
    cur.execute("""
        UPDATE scans SET status = 'paused'
        WHERE status = 'running'
    """)
    con.commit()
    # Restore scan_state from the most recent paused scan, if any.
    cur.execute("""
        SELECT id, folder_path, files_indexed, phashes_done, last_phase
        FROM scans WHERE status = 'paused'
        ORDER BY started_at DESC LIMIT 1
    """)
    row = cur.fetchone()
    if row:
        # Coalesce NULL counters once; the original formatted the raw
        # column values with ":," which raises TypeError on NULL.
        files_indexed = row["files_indexed"] or 0
        phashes_done = row["phashes_done"] or 0
        scan_state.update(
            scan_id=row["id"],
            status="paused",
            phase=row["last_phase"] or "indexing",
            folder_path=row["folder_path"],
            files_indexed=files_indexed,
            phashes_done=phashes_done,
            message=(
                f"Paused — {files_indexed:,} files indexed, "
                f"{phashes_done:,} phashes done"
            ),
        )
    con.close()
# ── Per-file extraction ───────────────────────────────────────────────────────
def _sha256(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
while chunk := f.read(65536):
h.update(chunk)
return h.hexdigest()
def _exif_data(path: str) -> tuple[str | None, str | None]:
"""Returns (exif_datetime, exif_device) or (None, None)."""
try:
img = Image.open(path)
exif_raw = img._getexif()
if not exif_raw:
return None, None
exif = {ExifTags.TAGS.get(k, k): v for k, v in exif_raw.items()}
dt = exif.get("DateTimeOriginal") or exif.get("DateTime")
if dt:
try:
dt = datetime.strptime(dt, "%Y:%m:%d %H:%M:%S").strftime("%Y-%m-%dT%H:%M:%S")
except ValueError:
dt = None
make = str(exif.get("Make", "")).strip()
model = str(exif.get("Model", "")).strip()
device = (make + " " + model).strip() if (make or model) else None
return dt, device
except Exception:
return None, None
def _image_dims(path: str) -> tuple[int | None, int | None]:
try:
with Image.open(path) as img:
return img.size # (width, height)
except Exception:
return None, None
def _phash(path: str) -> str | None:
try:
with Image.open(path) as img:
return str(imagehash.phash(img))
except Exception:
return None
def _video_dims(path: str) -> tuple[int | None, int | None]:
try:
result = subprocess.run(
[
"ffprobe", "-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=width,height",
"-of", "csv=p=0",
path,
],
capture_output=True, text=True, timeout=10,
)
parts = result.stdout.strip().split(",")
if len(parts) == 2:
return int(parts[0]), int(parts[1])
except Exception:
pass
return None, None
def _mtime_str(path: str) -> str | None:
try:
ts = os.path.getmtime(path)
return datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S")
except Exception:
return None
def extract_file(path: str) -> dict:
    """Build the metadata record for one file: size, mime, sha256, dims, EXIF.

    The perceptual hash is intentionally NOT computed here — it runs in a
    dedicated later phase so its progress can be reported separately.
    Any field that cannot be determined stays None.
    """
    p = Path(path)
    ext = p.suffix.lower()
    record: dict = {
        "path": path,
        "filename": p.name,
        "extension": ext,
        **dict.fromkeys((
            "file_size", "mime_type", "sha256", "phash",
            "exif_datetime", "exif_device", "width", "height",
        )),
    }
    try:
        record["file_size"] = os.path.getsize(path)
    except OSError:
        pass
    record["mime_type"] = mimetypes.guess_type(path)[0]
    try:
        record["sha256"] = _sha256(path)
    except OSError:
        pass
    if ext in PHOTO_EXT:
        record["width"], record["height"] = _image_dims(path)
        dt, device = _exif_data(path)
        # Fall back to filesystem mtime when EXIF has no capture date.
        record["exif_datetime"] = dt or _mtime_str(path)
        record["exif_device"] = device
    elif ext in VIDEO_EXT:
        record["width"], record["height"] = _video_dims(path)
        record["exif_datetime"] = _mtime_str(path)
    return record
# ── Union-Find for phash grouping ────────────────────────────────────────────
class UnionFind:
    """Disjoint-set forest over int file ids, with path compression."""

    def __init__(self):
        self.parent: dict[int, int] = {}

    def find(self, x: int) -> int:
        """Return the root of x's set, registering x on first sight."""
        root = self.parent.setdefault(x, x)
        if root != x:
            root = self.find(root)
            self.parent[x] = root  # path compression
        return root

    def union(self, x: int, y: int):
        """Merge the sets containing x and y (no-op if already together)."""
        root_x, root_y = self.find(x), self.find(y)
        if root_x != root_y:
            self.parent[root_x] = root_y

    def groups(self) -> dict[int, list[int]]:
        """Map each root to its members, keeping only sets of size >= 2."""
        from collections import defaultdict
        clusters: dict[int, list[int]] = defaultdict(list)
        for node in self.parent:
            clusters[self.find(node)].append(node)
        return {root: members for root, members in clusters.items()
                if len(members) > 1}
# ── Detection passes ──────────────────────────────────────────────────────────
def _suggested_keeper_by_resolution(members: list[dict]) -> int:
"""Return file_id of highest resolution member; tie-break by size then oldest date."""
def score(m):
w = m["width"] or 0
h = m["height"] or 0
size = m["file_size"] or 0
dt = m["exif_datetime"] or "9999"
return (w * h, size, dt)
best = max(members, key=lambda m: (
(m["width"] or 0) * (m["height"] or 0),
m["file_size"] or 0,
# older date = better; invert by negating epoch or use str comparison inverted
))
return best["id"]
def _suggested_keeper_oldest(members: list[dict]) -> int:
def key(m):
return m["exif_datetime"] or "9999"
return min(members, key=key)["id"]
def _run_sha256_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 1: group files whose SHA-256 digests are identical (exact dupes)."""
    cur = con.cursor()
    cur.execute("""
        SELECT sha256, COUNT(*) as cnt
        FROM files
        WHERE sha256 IS NOT NULL
        GROUP BY sha256
        HAVING cnt > 1
    """)
    for dup in cur.fetchall():
        digest = dup["sha256"]
        cur.execute("""
            SELECT id, width, height, file_size, exif_datetime
            FROM files WHERE sha256 = ?
        """, (digest,))
        members = list(map(dict, cur.fetchall()))
        suggested_id = _suggested_keeper_by_resolution(members)
        cur.execute(
            "INSERT INTO duplicate_groups (method, method_value) VALUES ('sha256', ?)",
            (digest,),
        )
        gid = cur.lastrowid
        cur.executemany(
            "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
            [(gid, m["id"], 1 if m["id"] == suggested_id else 0) for m in members],
        )
def _run_phash_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 2: group visually similar photos by perceptual-hash distance.

    Only considers rows that have a phash, are not videos, and were not
    already claimed by the sha256 pass. Pairs within Hamming distance
    THRESHOLD are merged via union-find, and each resulting cluster becomes
    one 'phash' duplicate group (method_value = the keeper's phash).
    """
    cur = con.cursor()
    # Exclude files already in sha256 groups
    cur.execute("""
        SELECT f.id, f.phash, f.width, f.height, f.file_size, f.exif_datetime
        FROM files f
        WHERE f.phash IS NOT NULL
        AND f.extension NOT IN (
        '.mp4','.mov','.avi','.mkv','.m4v','.3gp','.wmv','.mts','.m2ts'
        )
        AND f.id NOT IN (
        SELECT dm.file_id FROM duplicate_members dm
        JOIN duplicate_groups dg ON dg.id = dm.group_id
        WHERE dg.method = 'sha256'
        )
    """)
    rows = [dict(r) for r in cur.fetchall()]
    if len(rows) < 2:
        return  # nothing to compare
    # Bucket by first 2 hex chars to reduce O(n²) comparisons
    # NOTE(review): hashes that differ within the first byte land in
    # different buckets and are never compared — a recall/speed trade-off,
    # presumably intentional; confirm if exact recall matters.
    buckets: dict[str, list[dict]] = {}
    for r in rows:
        key = r["phash"][:2]
        buckets.setdefault(key, []).append(r)
    uf = UnionFind()
    # Ensure all IDs are registered
    for r in rows:
        uf.find(r["id"])
    THRESHOLD = 10  # max Hamming distance treated as "similar"
    for bucket in buckets.values():
        for i in range(len(bucket)):
            for j in range(i + 1, len(bucket)):
                a, b = bucket[i], bucket[j]
                try:
                    # imagehash overloads "-" as Hamming distance
                    dist = imagehash.hex_to_hash(a["phash"]) - imagehash.hex_to_hash(b["phash"])
                    if dist <= THRESHOLD:
                        uf.union(a["id"], b["id"])
                except Exception:
                    pass  # malformed hash string — skip this pair
    id_map = {r["id"]: r for r in rows}
    for _, member_ids in uf.groups().items():
        members = [id_map[mid] for mid in member_ids if mid in id_map]
        if len(members) < 2:
            continue
        keeper_id = _suggested_keeper_by_resolution(members)
        keeper = id_map[keeper_id]
        cur.execute(
            "INSERT INTO duplicate_groups (method, method_value) VALUES ('phash', ?)",
            (keeper["phash"],),
        )
        group_id = cur.lastrowid
        for m in members:
            cur.execute(
                "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
                (group_id, m["id"], 1 if m["id"] == keeper_id else 0),
            )
def _run_exif_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 3: group files sharing the exact EXIF timestamp AND device.

    Files already claimed by the sha256 or phash passes are excluded — both
    when counting candidate (datetime, device) pairs AND when collecting each
    group's members. The original applied the exclusion only to the count, so
    an already-grouped file could be re-inserted into an exif group.
    """
    cur = con.cursor()
    # Shared exclusion fragment so the count and member queries agree.
    exclusion = """
        id NOT IN (
            SELECT dm.file_id FROM duplicate_members dm
            JOIN duplicate_groups dg ON dg.id = dm.group_id
            WHERE dg.method IN ('sha256', 'phash')
        )
    """
    cur.execute(f"""
        SELECT exif_datetime, exif_device, COUNT(*) as cnt
        FROM files
        WHERE exif_datetime IS NOT NULL
          AND exif_device IS NOT NULL
          AND {exclusion}
        GROUP BY exif_datetime, exif_device
        HAVING cnt > 1
    """)
    rows = cur.fetchall()
    for row in rows:
        dt, dev = row["exif_datetime"], row["exif_device"]
        cur.execute(f"""
            SELECT id, width, height, file_size, exif_datetime
            FROM files
            WHERE exif_datetime = ? AND exif_device = ?
              AND {exclusion}
        """, (dt, dev))
        members = [dict(r) for r in cur.fetchall()]
        if len(members) < 2:
            continue  # exclusion may shrink a candidate group below a pair
        keeper_id = _suggested_keeper_by_resolution(members)
        method_value = f"{dt}::{dev}"
        cur.execute(
            "INSERT INTO duplicate_groups (method, method_value) VALUES ('exif', ?)",
            (method_value,),
        )
        group_id = cur.lastrowid
        for m in members:
            cur.execute(
                "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
                (group_id, m["id"], 1 if m["id"] == keeper_id else 0),
            )
def _run_filesize_pass(con: sqlite3.Connection, scan_id: int):
    """Pass 4: group files with identical (file_size, width, height).

    This is the weakest heuristic, so it only considers files not already
    grouped by any earlier pass. The exclusion is applied both to the
    candidate count AND to the member query — the original excluded grouped
    files only from the count, so a file already in a sha256/phash/exif
    group could be re-inserted into a filesize group.
    """
    cur = con.cursor()
    # Shared exclusion fragment so the count and member queries agree.
    exclusion = """
        id NOT IN (
            SELECT dm.file_id FROM duplicate_members dm
            JOIN duplicate_groups dg ON dg.id = dm.group_id
            WHERE dg.method IN ('sha256', 'phash', 'exif')
        )
    """
    cur.execute(f"""
        SELECT file_size, width, height, COUNT(*) as cnt
        FROM files
        WHERE file_size IS NOT NULL
          AND width IS NOT NULL
          AND height IS NOT NULL
          AND {exclusion}
        GROUP BY file_size, width, height
        HAVING cnt > 1
    """)
    rows = cur.fetchall()
    for row in rows:
        fs, w, h = row["file_size"], row["width"], row["height"]
        cur.execute(f"""
            SELECT id, width, height, file_size, exif_datetime
            FROM files
            WHERE file_size = ? AND width = ? AND height = ?
              AND {exclusion}
        """, (fs, w, h))
        members = [dict(r) for r in cur.fetchall()]
        if len(members) < 2:
            continue  # exclusion may shrink a candidate group below a pair
        keeper_id = _suggested_keeper_oldest(members)
        method_value = f"{fs}::{w}x{h}"
        cur.execute(
            "INSERT INTO duplicate_groups (method, method_value) VALUES ('filesize', ?)",
            (method_value,),
        )
        group_id = cur.lastrowid
        for m in members:
            cur.execute(
                "INSERT INTO duplicate_members (group_id, file_id, suggested) VALUES (?, ?, ?)",
                (group_id, m["id"], 1 if m["id"] == keeper_id else 0),
            )
# ── Pause helpers ────────────────────────────────────────────────────────────
def _save_pause_state(cur, scan_id: int, phase: str,
files_indexed: int, phashes_done: int):
"""Persist pause progress so the scan survives a server restart."""
cur.execute("""
UPDATE scans SET
status = 'paused',
last_phase = ?,
files_indexed = ?,
phashes_done = ?
WHERE id = ?
""", (phase, files_indexed, phashes_done, scan_id))
# ── Main scan entry point ─────────────────────────────────────────────────────
def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
    """Main scan function — runs in background thread.

    Phases (reflected in scan_state["phase"]): takeout → indexing →
    phash → grouping → done. scan_state["pause_requested"] is checked at
    phase boundaries (and per-directory / per-chunk inside the long
    phases); when set, progress is persisted via _save_pause_state and the
    function returns with status 'paused'.

    Args:
        folder_path: root directory to walk.
        scan_id: row id in the `scans` table for this run.
        mode: 'incremental' (skip unchanged files, regroup everything),
              'full_reset' (wipe files + groups first), 'new_files'
              (skip all known paths, only clear groups touching new files),
              or 'regroup' (as used by the grouping phase).
    """
    global scan_state
    scan_state["folder_path"] = folder_path  # persist so resume knows where to continue
    con = get_db()
    cur = con.cursor()
    try:
        # ── Mode: full reset ──────────────────────────────────────────────
        if mode == "full_reset":
            cur.execute("DELETE FROM duplicate_members")
            cur.execute("DELETE FROM duplicate_groups")
            cur.execute("DELETE FROM files")
            con.commit()
        # ── Phase: takeout check (quick sample, ≤50 dirs) ─────────────────
        scan_state.update(phase="takeout",
                          message="Checking for Google Takeout structure...")
        if is_takeout_folder(folder_path):
            scan_state["message"] = "Processing Google Takeout sidecars..."
            process_takeout(folder_path, DB_PATH)
        else:
            scan_state["message"] = "Not a Takeout folder — skipping"
        # Pause checkpoint between phases: nothing indexed yet, so counters are 0.
        if scan_state["pause_requested"]:
            _save_pause_state(cur, scan_id, "takeout", 0, 0)
            con.commit()
            scan_state.update(
                status="paused", pause_requested=False,
                message="Paused during Takeout check",
            )
            return
        # ── Phases: discovery + indexing (pipelined) ──────────────────────
        # Workers start hashing files the instant they are discovered —
        # no waiting for the full directory walk to finish first.
        #
        # Workers: 2× CPU count, capped at 16. Tune via DUPFINDER_WORKERS.
        N_WORKERS = int(os.environ.get(
            "DUPFINDER_WORKERS",
            min(max((os.cpu_count() or 4) * 2, 4), 16)
        ))
        scan_state.update(
            phase="indexing", progress=0, total=0,
            message=f"Scanning — discovering & indexing in parallel ({N_WORKERS} workers)..."
        )
        # Pre-load existing DB records once (avoids per-file queries)
        cur.execute("SELECT path, id, file_size FROM files")
        existing_db: dict[str, dict] = {
            row["path"]: {"id": row["id"], "file_size": row["file_size"]}
            for row in cur.fetchall()
        }
        # Shared counters (updated from multiple threads).
        # One-element lists are used so the nested closures can mutate them.
        _lock = threading.Lock()
        _discovered = [0]  # total files found by walker so far
        _done = [0]        # files fully indexed (skipped + processed)
        _walk_done = [False]
        _pause_at_end = False  # set True when pause requested mid-walk
        all_files: list[str] = []
        to_skip: list[str] = []      # unchanged paths, bulk-stamped later
        changed_ids: list[int] = []  # ids whose groups must be invalidated

        def _index_file(path: str) -> dict | None:
            # Worker-thread body: metadata extraction only, no DB access.
            try:
                return extract_file(path)
            except Exception:
                return None

        def _write_result(path: str, record: dict | None, existing: dict | None):
            """Write one file result to DB. Called on main thread only."""
            if record is None:
                # Extraction failed: keep a row so the file is visible as 'error'.
                cur.execute(
                    "INSERT OR IGNORE INTO files "
                    " (path, filename, extension, scan_id, status) "
                    "VALUES (?, ?, ?, ?, 'error')",
                    (path, Path(path).name, Path(path).suffix.lower(), scan_id),
                )
                cur.execute(
                    "UPDATE files SET status='error', scan_id=?, "
                    " updated_at=CURRENT_TIMESTAMP WHERE path=?",
                    (scan_id, path),
                )
            else:
                record["scan_id"] = scan_id
                if existing:
                    cur.execute("""
                        UPDATE files SET
                            filename=:filename, extension=:extension,
                            file_size=:file_size, mime_type=:mime_type,
                            sha256=:sha256, exif_datetime=:exif_datetime,
                            exif_device=:exif_device, width=:width,
                            height=:height, scan_id=:scan_id,
                            status='pending', updated_at=CURRENT_TIMESTAMP
                        WHERE path=:path
                    """, record)
                else:
                    cur.execute("""
                        INSERT OR IGNORE INTO files
                            (path, filename, extension, file_size, mime_type,
                             sha256, exif_datetime, exif_device, width,
                             height, scan_id, status)
                        VALUES
                            (:path, :filename, :extension, :file_size,
                             :mime_type, :sha256, :exif_datetime,
                             :exif_device, :width, :height, :scan_id,
                             'pending')
                    """, record)

        with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
            pending: dict = {}  # future → (path, existing)

            def _drain(limit: int = 50):
                """Collect up to `limit` completed futures and write to DB."""
                done_futures = [f for f in list(pending) if f.done()][:limit]
                for f in done_futures:
                    path, existing = pending.pop(f)
                    _write_result(path, f.result(), existing)
                    with _lock:
                        _done[0] += 1
                        d = _done[0]
                        disc = _discovered[0]
                        walking = not _walk_done[0]
                    scan_state["progress"] = d
                    scan_state["total"] = disc
                    scan_state["message"] = (
                        f"{'Discovering & i' if walking else 'I'}ndexing "
                        f"({N_WORKERS}w): {d:,}"
                        + (f" / {disc:,}" if not walking else f"{disc:,} found so far")
                    )
                # Periodic commit keeps WAL growth bounded on huge scans.
                if done_futures and _done[0] % 200 == 0:
                    con.commit()

            # ── Walk + submit ─────────────────────────────────────────────
            for root, dirs, files in os.walk(folder_path):
                dirs[:] = [d for d in dirs if not d.startswith(".")]  # skip hidden dirs
                if scan_state["pause_requested"]:
                    _pause_at_end = True
                    break  # stop walking; in-flight futures drain normally
                for fname in files:
                    if fname.endswith(".json"):
                        continue  # Takeout sidecars handled in takeout phase
                    ext = Path(fname).suffix.lower()
                    if ext not in SUPPORTED_EXT:
                        continue
                    path = os.path.join(root, fname)
                    all_files.append(path)
                    with _lock:
                        _discovered[0] += 1
                    existing = existing_db.get(path)
                    try:
                        current_size = os.path.getsize(path)
                    except OSError:
                        continue  # vanished between walk and stat
                    # Skip unchanged files
                    if existing and mode in ("incremental", "new_files"):
                        if mode == "new_files" or existing["file_size"] == current_size:
                            to_skip.append(path)
                            with _lock:
                                _done[0] += 1
                            continue
                        # Size changed → re-index; its old groups are stale.
                        changed_ids.append(existing["id"])
                    # Submit to thread pool immediately
                    future = pool.submit(_index_file, path)
                    pending[future] = (path, existing)
                    # Drain completed results regularly to avoid memory buildup
                    if len(pending) >= N_WORKERS * 4:
                        _drain(N_WORKERS * 2)
                # Drain after each directory
                _drain(20)
            _walk_done[0] = True
            # ── Bulk-stamp skipped files ──────────────────────────────────
            for chunk_start in range(0, len(to_skip), 500):
                chunk = to_skip[chunk_start : chunk_start + 500]
                cur.executemany(
                    "UPDATE files SET scan_id = ? WHERE path = ?",
                    [(scan_id, p) for p in chunk],
                )
            for fid in changed_ids:
                cur.execute(
                    "DELETE FROM duplicate_members WHERE file_id = ?", (fid,)
                )
            con.commit()
            # ── Wait for remaining futures ────────────────────────────────
            scan_state["total"] = len(all_files)
            for future in as_completed(pending):
                path, existing = pending[future]
                _write_result(path, future.result(), existing)
                with _lock:
                    _done[0] += 1
                    d = _done[0]
                scan_state["progress"] = d
                scan_state["message"] = (
                    f"Indexing ({N_WORKERS}w): {d:,} / {len(all_files):,}"
                )
                if d % 200 == 0:
                    con.commit()
            con.commit()
        # ── Pause checkpoint: after indexing ──────────────────────────────
        scan_state["files_indexed"] = _done[0]
        if _pause_at_end:
            _save_pause_state(cur, scan_id, "indexing", _done[0], 0)
            con.commit()
            scan_state.update(
                status="paused", pause_requested=False,
                message=f"Paused — {_done[0]:,} files indexed",
            )
            return
        # ── Phase: phash ──────────────────────────────────────────────────
        phasher = get_phasher()
        hw_label = "GPU" if phasher.using_gpu else "CPU"
        scan_state.update(phase="phash", progress=0,
                          message=f"Computing perceptual hashes ({hw_label})...")
        # Only photos (never videos) and only rows still missing a phash —
        # this is what makes resume-after-pause idempotent.
        cur.execute("""
            SELECT id, path FROM files
            WHERE extension IN (
                '.jpg','.jpeg','.png','.gif','.bmp','.tiff','.tif',
                '.webp','.heic','.heif','.raw','.cr2','.nef','.arw',
                '.dng','.orf','.rw2','.pef','.srw','.x3f'
            ) AND phash IS NULL AND status != 'error'
        """)
        photo_rows = cur.fetchall()
        scan_state["total"] = len(photo_rows)
        if photo_rows:
            path_to_id = {row["path"]: row["id"] for row in photo_rows}
            all_paths = list(path_to_id.keys())
            # Process in chunks so pause requests are honoured between batches
            PHASH_CHUNK = 500
            phashes_written = 0
            for chunk_start in range(0, len(all_paths), PHASH_CHUNK):
                if scan_state["pause_requested"]:
                    _save_pause_state(
                        cur, scan_id, "phash",
                        scan_state["files_indexed"], phashes_written,
                    )
                    con.commit()
                    scan_state.update(
                        status="paused", pause_requested=False,
                        phashes_done=phashes_written,
                        message=(
                            f"Paused — {phashes_written:,} / {len(all_paths):,} "
                            "perceptual hashes computed"
                        ),
                    )
                    return
                chunk = all_paths[chunk_start : chunk_start + PHASH_CHUNK]
                chunk_results = phasher.hash_files(chunk, progress_cb=None)
                for path, ph in chunk_results.items():
                    fid = path_to_id.get(path)
                    if fid and ph:
                        cur.execute(
                            "UPDATE files SET phash=? WHERE id=?", (ph, fid)
                        )
                con.commit()
                phashes_written += len(chunk)
                scan_state["phashes_done"] = phashes_written
                scan_state["progress"] = phashes_written
                scan_state["message"] = (
                    f"Phash ({hw_label}): {phashes_written:,} / {len(all_paths):,}"
                )
            con.commit()
        # ── Phase: grouping ───────────────────────────────────────────────
        scan_state.update(phase="grouping", progress=0, total=4,
                          message="Running duplicate detection...")
        if mode in ("incremental", "full_reset", "regroup"):
            # Groups are rebuilt from scratch by the four passes below.
            cur.execute("DELETE FROM duplicate_members")
            cur.execute("DELETE FROM duplicate_groups")
            con.commit()
        elif mode == "new_files":
            # Only clear groups containing new files
            cur.execute("""
                DELETE FROM duplicate_groups WHERE id IN (
                    SELECT DISTINCT dm.group_id FROM duplicate_members dm
                    JOIN files f ON f.id = dm.file_id
                    WHERE f.scan_id = ?
                )
            """, (scan_id,))
            con.commit()
        scan_state["message"] = "Pass 1/4: SHA-256 exact duplicates..."
        _run_sha256_pass(con, scan_id)
        scan_state["progress"] = 1
        con.commit()
        scan_state["message"] = "Pass 2/4: Perceptual hash similarity..."
        _run_phash_pass(con, scan_id)
        scan_state["progress"] = 2
        con.commit()
        scan_state["message"] = "Pass 3/4: EXIF timestamp + device..."
        _run_exif_pass(con, scan_id)
        scan_state["progress"] = 3
        con.commit()
        scan_state["message"] = "Pass 4/4: File size + dimensions..."
        _run_filesize_pass(con, scan_id)
        scan_state["progress"] = 4
        con.commit()
        # ── Restore keeper statuses for mode=incremental ──────────────────
        if mode == "incremental":
            # If a previously marked keeper no longer appears in any group, reset to pending
            cur.execute("""
                UPDATE files SET status='pending'
                WHERE status='keeper'
                AND id NOT IN (
                    SELECT file_id FROM duplicate_members WHERE is_keeper=1
                )
            """)
            con.commit()
        # Update scan record
        cur.execute(
            "UPDATE scans SET completed_at=CURRENT_TIMESTAMP, total_files=?, status='complete' "
            "WHERE id=?",
            (len(all_files), scan_id),
        )
        con.commit()
        scan_state.update(status="complete", phase="done",
                          message="Scan complete.", progress=scan_state["total"])
        _update_stats()
    except Exception as e:
        # Surface the error via the status endpoint and mark the scan row.
        scan_state.update(status="error", message=str(e))
        try:
            _mark_scan(cur, scan_id, "error")
            con.commit()
        except Exception:
            pass  # DB itself may be the thing that failed
    finally:
        con.close()
def _mark_scan(cur, scan_id: int, status: str):
cur.execute(
"UPDATE scans SET completed_at=CURRENT_TIMESTAMP, status=? WHERE id=?",
(status, scan_id),
)
def _update_stats():
    """Refresh the aggregate stats cached in scan_state["stats"].

    Best-effort by design: any DB error leaves the previous stats in
    place rather than surfacing to the status endpoint.
    """
    try:
        con = get_db()
        cur = con.cursor()
        cur.execute("SELECT COUNT(*) FROM files WHERE status != 'error'")
        total_files = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*), SUM(file_size) FROM files WHERE status='redundant'")
        r = cur.fetchone()
        dup_count = r[0] or 0
        dup_size = r[1] or 0
        # One grouped query covers all methods. (The original also ran a
        # per-method COUNT loop whose results were never read — dead work,
        # removed.)
        cur.execute("""
            SELECT method,
                   COUNT(*) as groups,
                   (SELECT COUNT(*) FROM duplicate_members dm2
                    JOIN duplicate_groups dg2 ON dg2.id=dm2.group_id
                    WHERE dg2.method=dg.method) as files
            FROM duplicate_groups dg
            GROUP BY method
        """)
        by_method = {r["method"]: {"groups": r["groups"], "files": r["files"]}
                     for r in cur.fetchall()}
        cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=1")
        reviewed = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=0")
        pending = cur.fetchone()[0]
        scan_state["stats"] = {
            "total_files": total_files,
            "duplicate_files": dup_count,
            "duplicate_size_bytes": dup_size,
            "groups_by_method": by_method,
            "reviewed": reviewed,
            "pending": pending,
        }
        con.close()
    except Exception:
        pass