Pipeline discovery and indexing — workers start immediately
Instead of walk-everything-first then index, workers now receive files the instant os.walk yields them. The thread pool is open before the walk starts; each discovered file is submitted immediately. Completed futures are drained after each directory to keep memory flat. Progress message shows: "Discovering & indexing (8w): 1,234 — 5,678 found so far" then once walk finishes: "Indexing (8w): 8,000 / 9,100" UI: merged Discovery + Indexing into a single "Discover + Index" phase pill. Indeterminate progress bar stays on until total file count is known. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
283
app/scanner.py
283
app/scanner.py
@@ -482,35 +482,6 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
|
|||||||
cur = con.cursor()
|
cur = con.cursor()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# ── Phase: discovery ──────────────────────────────────────────────
|
|
||||||
scan_state.update(phase="discovery", progress=0, total=0,
|
|
||||||
message="Discovering files...")
|
|
||||||
|
|
||||||
all_files = []
|
|
||||||
for root, dirs, files in os.walk(folder_path):
|
|
||||||
dirs[:] = [d for d in dirs if not d.startswith(".")]
|
|
||||||
for fname in files:
|
|
||||||
if fname.endswith(".json"):
|
|
||||||
continue
|
|
||||||
ext = Path(fname).suffix.lower()
|
|
||||||
if ext in SUPPORTED_EXT:
|
|
||||||
all_files.append(os.path.join(root, fname))
|
|
||||||
# Live count update every 250 files so UI doesn't look frozen
|
|
||||||
if len(all_files) % 250 == 0:
|
|
||||||
scan_state["message"] = f"Discovering... {len(all_files):,} files found"
|
|
||||||
|
|
||||||
if scan_state["cancel_requested"]:
|
|
||||||
break
|
|
||||||
|
|
||||||
scan_state["total"] = len(all_files)
|
|
||||||
scan_state["message"] = f"Found {len(all_files):,} files"
|
|
||||||
|
|
||||||
if scan_state["cancel_requested"]:
|
|
||||||
_mark_scan(cur, scan_id, "cancelled")
|
|
||||||
con.commit()
|
|
||||||
scan_state["status"] = "cancelled"
|
|
||||||
return
|
|
||||||
|
|
||||||
# ── Mode: full reset ──────────────────────────────────────────────
|
# ── Mode: full reset ──────────────────────────────────────────────
|
||||||
if mode == "full_reset":
|
if mode == "full_reset":
|
||||||
cur.execute("DELETE FROM duplicate_members")
|
cur.execute("DELETE FROM duplicate_members")
|
||||||
@@ -518,10 +489,9 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
|
|||||||
cur.execute("DELETE FROM files")
|
cur.execute("DELETE FROM files")
|
||||||
con.commit()
|
con.commit()
|
||||||
|
|
||||||
# ── Phase: takeout pre-processing ─────────────────────────────────
|
# ── Phase: takeout check (quick sample, ≤50 dirs) ─────────────────
|
||||||
# Detection samples ≤50 dirs so it never blocks on large libraries
|
|
||||||
scan_state.update(phase="takeout",
|
scan_state.update(phase="takeout",
|
||||||
message="Checking for Google Takeout structure (sampling)...")
|
message="Checking for Google Takeout structure...")
|
||||||
if is_takeout_folder(folder_path):
|
if is_takeout_folder(folder_path):
|
||||||
scan_state["message"] = "Processing Google Takeout sidecars..."
|
scan_state["message"] = "Processing Google Takeout sidecars..."
|
||||||
process_takeout(folder_path, DB_PATH)
|
process_takeout(folder_path, DB_PATH)
|
||||||
@@ -534,78 +504,110 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
|
|||||||
scan_state["status"] = "cancelled"
|
scan_state["status"] = "cancelled"
|
||||||
return
|
return
|
||||||
|
|
||||||
# ── Phase: indexing ───────────────────────────────────────────────
|
# ── Phases: discovery + indexing (pipelined) ──────────────────────
|
||||||
# I/O-bound — use a thread pool so SHA-256 reads run in parallel.
|
# Workers start hashing files the instant they are discovered —
|
||||||
# Workers: 2× CPU count, capped at 16 (good for NAS/SSD; HDDs may
|
# no waiting for the full directory walk to finish first.
|
||||||
# benefit from tuning down via DUPFINDER_WORKERS env var).
|
#
|
||||||
|
# Workers: 2× CPU count, capped at 16. Tune via DUPFINDER_WORKERS.
|
||||||
N_WORKERS = int(os.environ.get(
|
N_WORKERS = int(os.environ.get(
|
||||||
"DUPFINDER_WORKERS",
|
"DUPFINDER_WORKERS",
|
||||||
min(max((os.cpu_count() or 4) * 2, 4), 16)
|
min(max((os.cpu_count() or 4) * 2, 4), 16)
|
||||||
))
|
))
|
||||||
scan_state.update(
|
scan_state.update(
|
||||||
phase="indexing", progress=0,
|
phase="indexing", progress=0, total=0,
|
||||||
message=f"Indexing files — {N_WORKERS} parallel workers..."
|
message=f"Scanning — discovering & indexing in parallel ({N_WORKERS} workers)..."
|
||||||
)
|
)
|
||||||
|
|
||||||
# Pre-load all existing DB records in one query (avoids N per-file queries)
|
# Pre-load existing DB records once (avoids per-file queries)
|
||||||
cur.execute("SELECT path, id, file_size FROM files")
|
cur.execute("SELECT path, id, file_size FROM files")
|
||||||
existing_db: dict[str, dict] = {
|
existing_db: dict[str, dict] = {
|
||||||
row["path"]: {"id": row["id"], "file_size": row["file_size"]}
|
row["path"]: {"id": row["id"], "file_size": row["file_size"]}
|
||||||
for row in cur.fetchall()
|
for row in cur.fetchall()
|
||||||
}
|
}
|
||||||
|
|
||||||
# Split files into "skip" (unchanged) and "process" (new or changed)
|
# Shared counters (updated from multiple threads)
|
||||||
to_process: list[str] = []
|
_lock = threading.Lock()
|
||||||
to_skip: list[str] = []
|
_discovered = [0] # total files found by walker so far
|
||||||
changed_ids: list[int] = [] # file IDs whose group memberships must be cleared
|
_done = [0] # files fully indexed (skipped + processed)
|
||||||
|
_walk_done = [False]
|
||||||
for path in all_files:
|
all_files: list[str] = []
|
||||||
existing = existing_db.get(path)
|
to_skip: list[str] = []
|
||||||
try:
|
changed_ids: list[int] = []
|
||||||
current_size = os.path.getsize(path)
|
|
||||||
except OSError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if existing and mode in ("incremental", "new_files"):
|
|
||||||
if mode == "new_files" or existing["file_size"] == current_size:
|
|
||||||
to_skip.append(path)
|
|
||||||
continue
|
|
||||||
# File changed — clear stale group memberships
|
|
||||||
changed_ids.append(existing["id"])
|
|
||||||
|
|
||||||
to_process.append(path)
|
|
||||||
|
|
||||||
# Bulk-stamp skipped files with current scan_id
|
|
||||||
for chunk_start in range(0, len(to_skip), 500):
|
|
||||||
chunk = to_skip[chunk_start : chunk_start + 500]
|
|
||||||
cur.executemany(
|
|
||||||
"UPDATE files SET scan_id = ? WHERE path = ?",
|
|
||||||
[(scan_id, p) for p in chunk],
|
|
||||||
)
|
|
||||||
# Clear group memberships for changed files
|
|
||||||
for fid in changed_ids:
|
|
||||||
cur.execute("DELETE FROM duplicate_members WHERE file_id = ?", (fid,))
|
|
||||||
con.commit()
|
|
||||||
|
|
||||||
scan_state["total"] = len(all_files)
|
|
||||||
scan_state["progress"] = len(to_skip)
|
|
||||||
|
|
||||||
# Thread-safe progress counter
|
|
||||||
_lock = threading.Lock()
|
|
||||||
_done = [len(to_skip)] # mutable int via list
|
|
||||||
_db_queue: list[dict] = [] # records to write; drained on main thread
|
|
||||||
|
|
||||||
def _index_file(path: str) -> dict | None:
|
def _index_file(path: str) -> dict | None:
|
||||||
"""Worker: extract file metadata. Returns record dict or None on error."""
|
|
||||||
try:
|
try:
|
||||||
return extract_file(path)
|
return extract_file(path)
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
|
def _write_result(path: str, record: dict | None, existing: dict | None):
|
||||||
futures = {pool.submit(_index_file, p): p for p in to_process}
|
"""Write one file result to DB. Called on main thread only."""
|
||||||
|
if record is None:
|
||||||
|
cur.execute(
|
||||||
|
"INSERT OR IGNORE INTO files "
|
||||||
|
" (path, filename, extension, scan_id, status) "
|
||||||
|
"VALUES (?, ?, ?, ?, 'error')",
|
||||||
|
(path, Path(path).name, Path(path).suffix.lower(), scan_id),
|
||||||
|
)
|
||||||
|
cur.execute(
|
||||||
|
"UPDATE files SET status='error', scan_id=?, "
|
||||||
|
" updated_at=CURRENT_TIMESTAMP WHERE path=?",
|
||||||
|
(scan_id, path),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
record["scan_id"] = scan_id
|
||||||
|
if existing:
|
||||||
|
cur.execute("""
|
||||||
|
UPDATE files SET
|
||||||
|
filename=:filename, extension=:extension,
|
||||||
|
file_size=:file_size, mime_type=:mime_type,
|
||||||
|
sha256=:sha256, exif_datetime=:exif_datetime,
|
||||||
|
exif_device=:exif_device, width=:width,
|
||||||
|
height=:height, scan_id=:scan_id,
|
||||||
|
status='pending', updated_at=CURRENT_TIMESTAMP
|
||||||
|
WHERE path=:path
|
||||||
|
""", record)
|
||||||
|
else:
|
||||||
|
cur.execute("""
|
||||||
|
INSERT OR IGNORE INTO files
|
||||||
|
(path, filename, extension, file_size, mime_type,
|
||||||
|
sha256, exif_datetime, exif_device, width,
|
||||||
|
height, scan_id, status)
|
||||||
|
VALUES
|
||||||
|
(:path, :filename, :extension, :file_size,
|
||||||
|
:mime_type, :sha256, :exif_datetime,
|
||||||
|
:exif_device, :width, :height, :scan_id,
|
||||||
|
'pending')
|
||||||
|
""", record)
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
|
||||||
|
pending: dict = {} # future → (path, existing)
|
||||||
|
|
||||||
|
def _drain(limit: int = 50):
|
||||||
|
"""Collect up to `limit` completed futures and write to DB."""
|
||||||
|
done_futures = [f for f in list(pending) if f.done()][:limit]
|
||||||
|
for f in done_futures:
|
||||||
|
path, existing = pending.pop(f)
|
||||||
|
_write_result(path, f.result(), existing)
|
||||||
|
with _lock:
|
||||||
|
_done[0] += 1
|
||||||
|
d = _done[0]
|
||||||
|
disc = _discovered[0]
|
||||||
|
walking = not _walk_done[0]
|
||||||
|
scan_state["progress"] = d
|
||||||
|
scan_state["total"] = disc
|
||||||
|
scan_state["message"] = (
|
||||||
|
f"{'Discovering & i' if walking else 'I'}ndexing "
|
||||||
|
f"({N_WORKERS}w): {d:,}"
|
||||||
|
+ (f" / {disc:,}" if not walking else f" — {disc:,} found so far")
|
||||||
|
)
|
||||||
|
if done_futures and _done[0] % 200 == 0:
|
||||||
|
con.commit()
|
||||||
|
|
||||||
|
# ── Walk + submit ─────────────────────────────────────────────
|
||||||
|
for root, dirs, files in os.walk(folder_path):
|
||||||
|
dirs[:] = [d for d in dirs if not d.startswith(".")]
|
||||||
|
|
||||||
for future in as_completed(futures):
|
|
||||||
if scan_state["cancel_requested"]:
|
if scan_state["cancel_requested"]:
|
||||||
pool.shutdown(wait=False, cancel_futures=True)
|
pool.shutdown(wait=False, cancel_futures=True)
|
||||||
_mark_scan(cur, scan_id, "cancelled")
|
_mark_scan(cur, scan_id, "cancelled")
|
||||||
@@ -613,59 +615,78 @@ def run_scan(folder_path: str, scan_id: int, mode: str = "incremental"):
|
|||||||
scan_state["status"] = "cancelled"
|
scan_state["status"] = "cancelled"
|
||||||
return
|
return
|
||||||
|
|
||||||
path = futures[future]
|
for fname in files:
|
||||||
record = future.result()
|
if fname.endswith(".json"):
|
||||||
|
continue
|
||||||
|
ext = Path(fname).suffix.lower()
|
||||||
|
if ext not in SUPPORTED_EXT:
|
||||||
|
continue
|
||||||
|
|
||||||
|
path = os.path.join(root, fname)
|
||||||
|
all_files.append(path)
|
||||||
|
with _lock:
|
||||||
|
_discovered[0] += 1
|
||||||
|
|
||||||
|
existing = existing_db.get(path)
|
||||||
|
try:
|
||||||
|
current_size = os.path.getsize(path)
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Skip unchanged files
|
||||||
|
if existing and mode in ("incremental", "new_files"):
|
||||||
|
if mode == "new_files" or existing["file_size"] == current_size:
|
||||||
|
to_skip.append(path)
|
||||||
|
with _lock:
|
||||||
|
_done[0] += 1
|
||||||
|
continue
|
||||||
|
changed_ids.append(existing["id"])
|
||||||
|
|
||||||
|
# Submit to thread pool immediately
|
||||||
|
future = pool.submit(_index_file, path)
|
||||||
|
pending[future] = (path, existing)
|
||||||
|
|
||||||
|
# Drain completed results regularly to avoid memory buildup
|
||||||
|
if len(pending) >= N_WORKERS * 4:
|
||||||
|
_drain(N_WORKERS * 2)
|
||||||
|
|
||||||
|
# Drain after each directory
|
||||||
|
_drain(20)
|
||||||
|
|
||||||
|
_walk_done[0] = True
|
||||||
|
|
||||||
|
# ── Bulk-stamp skipped files ──────────────────────────────────
|
||||||
|
for chunk_start in range(0, len(to_skip), 500):
|
||||||
|
chunk = to_skip[chunk_start : chunk_start + 500]
|
||||||
|
cur.executemany(
|
||||||
|
"UPDATE files SET scan_id = ? WHERE path = ?",
|
||||||
|
[(scan_id, p) for p in chunk],
|
||||||
|
)
|
||||||
|
for fid in changed_ids:
|
||||||
|
cur.execute(
|
||||||
|
"DELETE FROM duplicate_members WHERE file_id = ?", (fid,)
|
||||||
|
)
|
||||||
|
con.commit()
|
||||||
|
|
||||||
|
# ── Wait for remaining futures ────────────────────────────────
|
||||||
|
scan_state["total"] = len(all_files)
|
||||||
|
for future in as_completed(pending):
|
||||||
|
if scan_state["cancel_requested"]:
|
||||||
|
pool.shutdown(wait=False, cancel_futures=True)
|
||||||
|
_mark_scan(cur, scan_id, "cancelled")
|
||||||
|
con.commit()
|
||||||
|
scan_state["status"] = "cancelled"
|
||||||
|
return
|
||||||
|
path, existing = pending[future]
|
||||||
|
_write_result(path, future.result(), existing)
|
||||||
with _lock:
|
with _lock:
|
||||||
_done[0] += 1
|
_done[0] += 1
|
||||||
done_now = _done[0]
|
d = _done[0]
|
||||||
|
scan_state["progress"] = d
|
||||||
scan_state["progress"] = done_now
|
|
||||||
scan_state["message"] = (
|
scan_state["message"] = (
|
||||||
f"Indexing ({N_WORKERS}w): {done_now:,} / {len(all_files):,}"
|
f"Indexing ({N_WORKERS}w): {d:,} / {len(all_files):,}"
|
||||||
)
|
)
|
||||||
|
if d % 200 == 0:
|
||||||
if record is None:
|
|
||||||
cur.execute(
|
|
||||||
"INSERT OR IGNORE INTO files "
|
|
||||||
" (path, filename, extension, scan_id, status) "
|
|
||||||
"VALUES (?, ?, ?, ?, 'error')",
|
|
||||||
(path, Path(path).name, Path(path).suffix.lower(), scan_id),
|
|
||||||
)
|
|
||||||
cur.execute(
|
|
||||||
"UPDATE files SET status='error', scan_id=?, "
|
|
||||||
" updated_at=CURRENT_TIMESTAMP WHERE path=?",
|
|
||||||
(scan_id, path),
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
record["scan_id"] = scan_id
|
|
||||||
existing = existing_db.get(path)
|
|
||||||
if existing:
|
|
||||||
cur.execute("""
|
|
||||||
UPDATE files SET
|
|
||||||
filename=:filename, extension=:extension,
|
|
||||||
file_size=:file_size, mime_type=:mime_type,
|
|
||||||
sha256=:sha256, exif_datetime=:exif_datetime,
|
|
||||||
exif_device=:exif_device, width=:width,
|
|
||||||
height=:height, scan_id=:scan_id,
|
|
||||||
status='pending', updated_at=CURRENT_TIMESTAMP
|
|
||||||
WHERE path=:path
|
|
||||||
""", record)
|
|
||||||
else:
|
|
||||||
cur.execute("""
|
|
||||||
INSERT OR IGNORE INTO files
|
|
||||||
(path, filename, extension, file_size, mime_type,
|
|
||||||
sha256, exif_datetime, exif_device, width,
|
|
||||||
height, scan_id, status)
|
|
||||||
VALUES
|
|
||||||
(:path, :filename, :extension, :file_size,
|
|
||||||
:mime_type, :sha256, :exif_datetime,
|
|
||||||
:exif_device, :width, :height, :scan_id,
|
|
||||||
'pending')
|
|
||||||
""", record)
|
|
||||||
|
|
||||||
# Commit every 200 completions to keep memory in check
|
|
||||||
if done_now % 200 == 0:
|
|
||||||
con.commit()
|
con.commit()
|
||||||
|
|
||||||
con.commit()
|
con.commit()
|
||||||
|
|||||||
@@ -759,9 +759,8 @@
|
|||||||
<div class="progress-bar-fill" id="progress-fill" style="width:0%"></div>
|
<div class="progress-bar-fill" id="progress-fill" style="width:0%"></div>
|
||||||
</div>
|
</div>
|
||||||
<div class="phase-pills">
|
<div class="phase-pills">
|
||||||
<span class="phase-pill" data-phase="discovery">Discovery</span>
|
|
||||||
<span class="phase-pill" data-phase="takeout">Takeout</span>
|
<span class="phase-pill" data-phase="takeout">Takeout</span>
|
||||||
<span class="phase-pill" data-phase="indexing">Indexing</span>
|
<span class="phase-pill" data-phase="indexing">Discover + Index</span>
|
||||||
<span class="phase-pill" data-phase="phash">Phash</span>
|
<span class="phase-pill" data-phase="phash">Phash</span>
|
||||||
<span class="phase-pill" data-phase="grouping">Grouping</span>
|
<span class="phase-pill" data-phase="grouping">Grouping</span>
|
||||||
</div>
|
</div>
|
||||||
@@ -1025,7 +1024,7 @@ async function refreshStats() {
|
|||||||
|
|
||||||
// ── Scan polling ──────────────────────────────────────────────────────────────
|
// ── Scan polling ──────────────────────────────────────────────────────────────
|
||||||
let scanPoller = null;
|
let scanPoller = null;
|
||||||
const PHASES = ['discovery','takeout','indexing','phash','grouping'];
|
const PHASES = ['takeout','indexing','phash','grouping'];
|
||||||
|
|
||||||
function startPoller() {
|
function startPoller() {
|
||||||
if (scanPoller) return;
|
if (scanPoller) return;
|
||||||
@@ -1065,7 +1064,7 @@ function updateScanUI(s) {
|
|||||||
|
|
||||||
if (isRunning) {
|
if (isRunning) {
|
||||||
el('progress-msg').textContent = s.message || '';
|
el('progress-msg').textContent = s.message || '';
|
||||||
const indeterminate = s.phase === 'discovery' || s.phase === 'takeout' || s.total === 0;
|
const indeterminate = s.phase === 'takeout' || s.total === 0;
|
||||||
const fill = el('progress-fill');
|
const fill = el('progress-fill');
|
||||||
fill.classList.toggle('indeterminate', indeterminate);
|
fill.classList.toggle('indeterminate', indeterminate);
|
||||||
if (!indeterminate) {
|
if (!indeterminate) {
|
||||||
|
|||||||
Reference in New Issue
Block a user