diff --git a/app/scanner.py b/app/scanner.py
index 30e3244..34b487b 100644
--- a/app/scanner.py
+++ b/app/scanner.py
@@ -482,35 +482,6 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
 
     cur = con.cursor()
     try:
-        # ── Phase: discovery ──────────────────────────────────────────────
-        scan_state.update(phase="discovery", progress=0, total=0,
-                          message="Discovering files...")
-
-        all_files = []
-        for root, dirs, files in os.walk(folder_path):
-            dirs[:] = [d for d in dirs if not d.startswith(".")]
-            for fname in files:
-                if fname.endswith(".json"):
-                    continue
-                ext = Path(fname).suffix.lower()
-                if ext in SUPPORTED_EXT:
-                    all_files.append(os.path.join(root, fname))
-                    # Live count update every 250 files so UI doesn't look frozen
-                    if len(all_files) % 250 == 0:
-                        scan_state["message"] = f"Discovering... {len(all_files):,} files found"
-
-            if scan_state["cancel_requested"]:
-                break
-
-        scan_state["total"] = len(all_files)
-        scan_state["message"] = f"Found {len(all_files):,} files"
-
-        if scan_state["cancel_requested"]:
-            _mark_scan(cur, scan_id, "cancelled")
-            con.commit()
-            scan_state["status"] = "cancelled"
-            return
-
         # ── Mode: full reset ──────────────────────────────────────────────
         if mode == "full_reset":
             cur.execute("DELETE FROM duplicate_members")
@@ -518,10 +489,9 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
             cur.execute("DELETE FROM files")
             con.commit()
 
-        # ── Phase: takeout pre-processing ─────────────────────────────────
-        # Detection samples ≤50 dirs so it never blocks on large libraries
+        # ── Phase: takeout check (quick sample, ≤50 dirs) ─────────────────
         scan_state.update(phase="takeout",
-                          message="Checking for Google Takeout structure (sampling)...")
+                          message="Checking for Google Takeout structure...")
         if is_takeout_folder(folder_path):
             scan_state["message"] = "Processing Google Takeout sidecars..."
             process_takeout(folder_path, DB_PATH)
@@ -534,78 +504,110 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
             scan_state["status"] = "cancelled"
             return
 
-        # ── Phase: indexing ───────────────────────────────────────────────
-        # I/O-bound — use a thread pool so SHA-256 reads run in parallel.
-        # Workers: 2× CPU count, capped at 16 (good for NAS/SSD; HDDs may
-        # benefit from tuning down via DUPFINDER_WORKERS env var).
+        # ── Phases: discovery + indexing (pipelined) ──────────────────────
+        # Workers start hashing files the instant they are discovered —
+        # no waiting for the full directory walk to finish first.
+        #
+        # Workers: 2× CPU count, capped at 16. Tune via DUPFINDER_WORKERS.
         N_WORKERS = int(os.environ.get(
             "DUPFINDER_WORKERS",
             min(max((os.cpu_count() or 4) * 2, 4), 16)
        ))
 
         scan_state.update(
-            phase="indexing", progress=0,
-            message=f"Indexing files — {N_WORKERS} parallel workers..."
+            phase="indexing", progress=0, total=0,
+            message=f"Scanning — discovering & indexing in parallel ({N_WORKERS} workers)..."
         )
 
-        # Pre-load all existing DB records in one query (avoids N per-file queries)
+        # Pre-load existing DB records once (avoids per-file queries)
         cur.execute("SELECT path, id, file_size FROM files")
         existing_db: dict[str, dict] = {
             row["path"]: {"id": row["id"], "file_size": row["file_size"]}
             for row in cur.fetchall()
         }
 
-        # Split files into "skip" (unchanged) and "process" (new or changed)
-        to_process: list[str] = []
-        to_skip: list[str] = []
-        changed_ids: list[int] = []  # file IDs whose group memberships must be cleared
-
-        for path in all_files:
-            existing = existing_db.get(path)
-            try:
-                current_size = os.path.getsize(path)
-            except OSError:
-                continue
-
-            if existing and mode in ("incremental", "new_files"):
-                if mode == "new_files" or existing["file_size"] == current_size:
-                    to_skip.append(path)
-                    continue
-                # File changed — clear stale group memberships
-                changed_ids.append(existing["id"])
-
-            to_process.append(path)
-
-        # Bulk-stamp skipped files with current scan_id
-        for chunk_start in range(0, len(to_skip), 500):
-            chunk = to_skip[chunk_start : chunk_start + 500]
-            cur.executemany(
-                "UPDATE files SET scan_id = ? WHERE path = ?",
-                [(scan_id, p) for p in chunk],
-            )
-        # Clear group memberships for changed files
-        for fid in changed_ids:
-            cur.execute("DELETE FROM duplicate_members WHERE file_id = ?", (fid,))
-        con.commit()
-
-        scan_state["total"] = len(all_files)
-        scan_state["progress"] = len(to_skip)
-
-        # Thread-safe progress counter
-        _lock = threading.Lock()
-        _done = [len(to_skip)]  # mutable int via list
-        _db_queue: list[dict] = []  # records to write; drained on main thread
+        # Shared counters (updated from multiple threads)
+        _lock = threading.Lock()
+        _discovered = [0]    # total files found by walker so far
+        _done = [0]          # files fully indexed (skipped + processed)
+        _walk_done = [False]
+        all_files: list[str] = []
+        to_skip: list[str] = []
+        changed_ids: list[int] = []
 
         def _index_file(path: str) -> dict | None:
-            """Worker: extract file metadata. Returns record dict or None on error."""
             try:
                 return extract_file(path)
             except Exception:
                 return None
 
-        with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
-            futures = {pool.submit(_index_file, p): p for p in to_process}
+        def _write_result(path: str, record: dict | None, existing: dict | None):
+            """Write one file result to DB. Called on main thread only."""
+            if record is None:
+                cur.execute(
+                    "INSERT OR IGNORE INTO files "
+                    "  (path, filename, extension, scan_id, status) "
+                    "VALUES (?, ?, ?, ?, 'error')",
+                    (path, Path(path).name, Path(path).suffix.lower(), scan_id),
+                )
+                cur.execute(
+                    "UPDATE files SET status='error', scan_id=?, "
+                    "  updated_at=CURRENT_TIMESTAMP WHERE path=?",
+                    (scan_id, path),
+                )
+            else:
+                record["scan_id"] = scan_id
+                if existing:
+                    cur.execute("""
+                        UPDATE files SET
+                            filename=:filename, extension=:extension,
+                            file_size=:file_size, mime_type=:mime_type,
+                            sha256=:sha256, exif_datetime=:exif_datetime,
+                            exif_device=:exif_device, width=:width,
+                            height=:height, scan_id=:scan_id,
+                            status='pending', updated_at=CURRENT_TIMESTAMP
+                        WHERE path=:path
+                    """, record)
+                else:
+                    cur.execute("""
+                        INSERT OR IGNORE INTO files
+                            (path, filename, extension, file_size, mime_type,
+                             sha256, exif_datetime, exif_device, width,
+                             height, scan_id, status)
+                        VALUES
+                            (:path, :filename, :extension, :file_size,
+                             :mime_type, :sha256, :exif_datetime,
+                             :exif_device, :width, :height, :scan_id,
+                             'pending')
+                    """, record)
+
+        with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
+            pending: dict = {}  # future → (path, existing)
+
+            def _drain(limit: int = 50):
+                """Collect up to `limit` completed futures and write to DB."""
+                done_futures = [f for f in list(pending) if f.done()][:limit]
+                for f in done_futures:
+                    path, existing = pending.pop(f)
+                    _write_result(path, f.result(), existing)
+                    with _lock:
+                        _done[0] += 1
+                        d = _done[0]
+                        disc = _discovered[0]
+                        walking = not _walk_done[0]
+                    scan_state["progress"] = d
+                    scan_state["total"] = disc
+                    scan_state["message"] = (
+                        f"{'Discovering & i' if walking else 'I'}ndexing "
+                        f"({N_WORKERS}w): {d:,}"
+                        + (f" / {disc:,}" if not walking else f" — {disc:,} found so far")
+                    )
+                if done_futures and _done[0] % 200 == 0:
+                    con.commit()
+
+            # ── Walk + submit ─────────────────────────────────────────────
+            for root, dirs, files in os.walk(folder_path):
+                dirs[:] = [d for d in dirs if not d.startswith(".")]
 
-            for future in as_completed(futures):
                 if scan_state["cancel_requested"]:
                     pool.shutdown(wait=False, cancel_futures=True)
                     _mark_scan(cur, scan_id, "cancelled")
@@ -613,59 +615,78 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
                     scan_state["status"] = "cancelled"
                     return
 
-                path = futures[future]
-                record = future.result()
+                for fname in files:
+                    if fname.endswith(".json"):
+                        continue
+                    ext = Path(fname).suffix.lower()
+                    if ext not in SUPPORTED_EXT:
+                        continue
+                    path = os.path.join(root, fname)
+                    all_files.append(path)
+                    with _lock:
+                        _discovered[0] += 1
+
+                    existing = existing_db.get(path)
+                    try:
+                        current_size = os.path.getsize(path)
+                    except OSError:
+                        continue
+
+                    # Skip unchanged files
+                    if existing and mode in ("incremental", "new_files"):
+                        if mode == "new_files" or existing["file_size"] == current_size:
+                            to_skip.append(path)
+                            with _lock:
+                                _done[0] += 1
+                            continue
+                        changed_ids.append(existing["id"])
+
+                    # Submit to thread pool immediately
+                    future = pool.submit(_index_file, path)
+                    pending[future] = (path, existing)
+
+                    # Drain completed results regularly to avoid memory buildup
+                    if len(pending) >= N_WORKERS * 4:
+                        _drain(N_WORKERS * 2)
+
+                # Drain after each directory
+                _drain(20)
+
+            _walk_done[0] = True
+
+            # ── Bulk-stamp skipped files ──────────────────────────────────
+            for chunk_start in range(0, len(to_skip), 500):
+                chunk = to_skip[chunk_start : chunk_start + 500]
+                cur.executemany(
+                    "UPDATE files SET scan_id = ? WHERE path = ?",
+                    [(scan_id, p) for p in chunk],
+                )
+            for fid in changed_ids:
+                cur.execute(
+                    "DELETE FROM duplicate_members WHERE file_id = ?", (fid,)
+                )
+            con.commit()
+
+            # ── Wait for remaining futures ────────────────────────────────
+            scan_state["total"] = len(all_files)
+            for future in as_completed(pending):
+                if scan_state["cancel_requested"]:
+                    pool.shutdown(wait=False, cancel_futures=True)
+                    _mark_scan(cur, scan_id, "cancelled")
+                    con.commit()
+                    scan_state["status"] = "cancelled"
+                    return
+                path, existing = pending[future]
+                _write_result(path, future.result(), existing)
                 with _lock:
                     _done[0] += 1
-                    done_now = _done[0]
-
-                scan_state["progress"] = done_now
+                    d = _done[0]
+                scan_state["progress"] = d
                 scan_state["message"] = (
-                    f"Indexing ({N_WORKERS}w): {done_now:,} / {len(all_files):,}"
+                    f"Indexing ({N_WORKERS}w): {d:,} / {len(all_files):,}"
                 )
-
-                if record is None:
-                    cur.execute(
-                        "INSERT OR IGNORE INTO files "
-                        "  (path, filename, extension, scan_id, status) "
-                        "VALUES (?, ?, ?, ?, 'error')",
-                        (path, Path(path).name, Path(path).suffix.lower(), scan_id),
-                    )
-                    cur.execute(
-                        "UPDATE files SET status='error', scan_id=?, "
-                        "  updated_at=CURRENT_TIMESTAMP WHERE path=?",
-                        (scan_id, path),
-                    )
-                else:
-                    record["scan_id"] = scan_id
-                    existing = existing_db.get(path)
-                    if existing:
-                        cur.execute("""
-                            UPDATE files SET
-                                filename=:filename, extension=:extension,
-                                file_size=:file_size, mime_type=:mime_type,
-                                sha256=:sha256, exif_datetime=:exif_datetime,
-                                exif_device=:exif_device, width=:width,
-                                height=:height, scan_id=:scan_id,
-                                status='pending', updated_at=CURRENT_TIMESTAMP
-                            WHERE path=:path
-                        """, record)
-                    else:
-                        cur.execute("""
-                            INSERT OR IGNORE INTO files
-                                (path, filename, extension, file_size, mime_type,
-                                 sha256, exif_datetime, exif_device, width,
-                                 height, scan_id, status)
-                            VALUES
-                                (:path, :filename, :extension, :file_size,
-                                 :mime_type, :sha256, :exif_datetime,
-                                 :exif_device, :width, :height, :scan_id,
-                                 'pending')
-                        """, record)
-
-                # Commit every 200 completions to keep memory in check
-                if done_now % 200 == 0:
+                if d % 200 == 0:
                     con.commit()
 
         con.commit()
diff --git a/templates/index.html b/templates/index.html
index 3204388..894eb99 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -759,9 +759,8 @@
-        Discovery
         Takeout
-        Indexing
+        Discover + Index
         Phash
         Grouping
@@ -1025,7 +1024,7 @@ async function refreshStats() {
 
 // ── Scan polling ──────────────────────────────────────────────────────────────
 let scanPoller = null;
-const PHASES = ['discovery','takeout','indexing','phash','grouping'];
+const PHASES = ['takeout','indexing','phash','grouping'];
 
 function startPoller() {
   if (scanPoller) return;
@@ -1065,7 +1064,7 @@ function updateScanUI(s) {
 
   if (isRunning) {
     el('progress-msg').textContent = s.message || '';
-    const indeterminate = s.phase === 'discovery' || s.phase === 'takeout' || s.total === 0;
+    const indeterminate = s.phase === 'takeout' || s.total === 0;
     const fill = el('progress-fill');
     fill.classList.toggle('indeterminate', indeterminate);
     if (!indeterminate) {