diff --git a/app/scanner.py b/app/scanner.py
index 12ebe00..30e3244 100644
--- a/app/scanner.py
+++ b/app/scanner.py
@@ -7,6 +7,8 @@ import mimetypes
 import os
 import sqlite3
 import subprocess
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime
 from pathlib import Path
@@ -533,81 +535,138 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
         return
 
     # ── Phase: indexing ───────────────────────────────────────────────
-    scan_state.update(phase="indexing", progress=0,
-                      message="Indexing files (SHA-256 + EXIF + dimensions)...")
+    # I/O-bound — use a thread pool so SHA-256 reads run in parallel.
+    # Workers: 2× CPU count, capped at 16 (good for NAS/SSD; HDDs may
+    # benefit from tuning down via DUPFINDER_WORKERS env var).
+    N_WORKERS = int(os.environ.get(
+        "DUPFINDER_WORKERS",
+        min(max((os.cpu_count() or 4) * 2, 4), 16)
+    ))
+    scan_state.update(
+        phase="indexing", progress=0,
+        message=f"Indexing files — {N_WORKERS} parallel workers..."
+    )
 
-    for i, path in enumerate(all_files):
-        if scan_state["cancel_requested"]:
-            _mark_scan(cur, scan_id, "cancelled")
-            con.commit()
-            scan_state["status"] = "cancelled"
-            return
+    # Pre-load all existing DB records in one query (avoids N per-file queries)
+    cur.execute("SELECT path, id, file_size FROM files")
+    existing_db: dict[str, dict] = {
+        row["path"]: {"id": row["id"], "file_size": row["file_size"]}
+        for row in cur.fetchall()
+    }
 
-        scan_state["progress"] = i + 1
-        scan_state["message"] = f"Indexing: {Path(path).name}"
-
-        # Check existing record
-        cur.execute("SELECT id, file_size, updated_at FROM files WHERE path = ?", (path,))
-        existing = cur.fetchone()
+    # Split files into "skip" (unchanged) and "process" (new or changed)
+    to_process: list[str] = []
+    to_skip: list[str] = []
+    changed_ids: list[int] = []  # file IDs whose group memberships must be cleared
+    for path in all_files:
+        existing = existing_db.get(path)
         try:
             current_size = os.path.getsize(path)
         except OSError:
             continue
         if existing and mode in ("incremental", "new_files"):
-            if mode == "new_files":
-                # Skip entirely — don't re-hash existing files
-                cur.execute("UPDATE files SET scan_id = ? WHERE path = ?", (scan_id, path))
+            if mode == "new_files" or existing["file_size"] == current_size:
+                to_skip.append(path)
                 continue
-            # Incremental: skip if size unchanged (use size as proxy for change)
-            if existing["file_size"] == current_size:
-                cur.execute("UPDATE files SET scan_id = ? WHERE path = ?", (scan_id, path))
-                continue
-            # File changed — re-hash, clear group memberships
-            cur.execute(
-                "DELETE FROM duplicate_members WHERE file_id = ?", (existing["id"],)
-            )
+            # File changed — clear stale group memberships
+            changed_ids.append(existing["id"])
+        to_process.append(path)
+
+    # Bulk-stamp skipped files with current scan_id
+    for chunk_start in range(0, len(to_skip), 500):
+        chunk = to_skip[chunk_start : chunk_start + 500]
+        cur.executemany(
+            "UPDATE files SET scan_id = ? WHERE path = ?",
+            [(scan_id, p) for p in chunk],
+        )
+    # Clear group memberships for changed files
+    for fid in changed_ids:
+        cur.execute("DELETE FROM duplicate_members WHERE file_id = ?", (fid,))
+    con.commit()
+
+    scan_state["total"] = len(all_files)
+    scan_state["progress"] = len(to_skip)
+
+    # Thread-safe progress counter
+    _lock = threading.Lock()
+    _done = [len(to_skip)]  # mutable int via list
+    _db_queue: list[dict] = []  # records to write; drained on main thread
+
+    def _index_file(path: str) -> dict | None:
+        """Worker: extract file metadata. Returns record dict or None on error."""
         try:
-            record = extract_file(path)
-        except Exception as e:
-            cur.execute(
-                "INSERT OR IGNORE INTO files (path, filename, extension, scan_id, status) "
-                "VALUES (?, ?, ?, ?, 'error')",
-                (path, Path(path).name, Path(path).suffix.lower(), scan_id),
-            )
-            cur.execute(
-                "UPDATE files SET status='error', scan_id=?, updated_at=CURRENT_TIMESTAMP "
-                "WHERE path=?",
-                (scan_id, path),
-            )
-            con.commit()
-            continue
+            return extract_file(path)
+        except Exception:
+            return None
 
-        record["scan_id"] = scan_id
-        if existing:
-            cur.execute("""
-                UPDATE files SET
-                    filename=:filename, extension=:extension, file_size=:file_size,
-                    mime_type=:mime_type, sha256=:sha256,
-                    exif_datetime=:exif_datetime, exif_device=:exif_device,
-                    width=:width, height=:height, scan_id=:scan_id,
-                    status='pending', updated_at=CURRENT_TIMESTAMP
-                WHERE path=:path
-            """, record)
-        else:
-            cur.execute("""
-                INSERT OR IGNORE INTO files
-                (path, filename, extension, file_size, mime_type, sha256,
-                 exif_datetime, exif_device, width, height, scan_id, status)
-                VALUES
-                (:path, :filename, :extension, :file_size, :mime_type, :sha256,
-                 :exif_datetime, :exif_device, :width, :height, :scan_id, 'pending')
-            """, record)
+    with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
+        futures = {pool.submit(_index_file, p): p for p in to_process}
 
-        if (i + 1) % 100 == 0:
-            con.commit()
+        for future in as_completed(futures):
+            if scan_state["cancel_requested"]:
+                pool.shutdown(wait=False, cancel_futures=True)
+                _mark_scan(cur, scan_id, "cancelled")
+                con.commit()
+                scan_state["status"] = "cancelled"
+                return
+
+            path = futures[future]
+            record = future.result()
+
+            with _lock:
+                _done[0] += 1
+                done_now = _done[0]
+
+            scan_state["progress"] = done_now
+            scan_state["message"] = (
+                f"Indexing ({N_WORKERS}w): {done_now:,} / {len(all_files):,}"
+            )
+
+            if record is None:
+                cur.execute(
+                    "INSERT OR IGNORE INTO files "
+                    "  (path, filename, extension, scan_id, status) "
+                    "VALUES (?, ?, ?, ?, 'error')",
+                    (path, Path(path).name, Path(path).suffix.lower(), scan_id),
+                )
+                cur.execute(
+                    "UPDATE files SET status='error', scan_id=?, "
+                    "  updated_at=CURRENT_TIMESTAMP WHERE path=?",
+                    (scan_id, path),
+                )
+            else:
+                record["scan_id"] = scan_id
+                existing = existing_db.get(path)
+                if existing:
+                    cur.execute("""
+                        UPDATE files SET
+                            filename=:filename, extension=:extension,
+                            file_size=:file_size, mime_type=:mime_type,
+                            sha256=:sha256, exif_datetime=:exif_datetime,
+                            exif_device=:exif_device, width=:width,
+                            height=:height, scan_id=:scan_id,
+                            status='pending', updated_at=CURRENT_TIMESTAMP
+                        WHERE path=:path
+                    """, record)
+                else:
+                    cur.execute("""
+                        INSERT OR IGNORE INTO files
+                        (path, filename, extension, file_size, mime_type,
+                         sha256, exif_datetime, exif_device, width,
+                         height, scan_id, status)
+                        VALUES
+                        (:path, :filename, :extension, :file_size,
+                         :mime_type, :sha256, :exif_datetime,
+                         :exif_device, :width, :height, :scan_id,
+                         'pending')
+                    """, record)
+
+            # Commit every 200 completions to keep memory in check
+            if done_now % 200 == 0:
+                con.commit()
 
     con.commit()