paramiko's SSHClient.open_sftp() allocates an exec channel before the SFTP subsystem request, which Synology DSM closes immediately with 'Channel closed'. Manual sftp(1) and WinSCP avoid this by going straight to the SFTP subsystem on a fresh channel, so SSHClient has been replaced with a direct paramiko.Transport + SFTPClient.from_transport, matching the OpenSSH/WinSCP flow. The flow-control window is also raised to 128 MB, since Synology has been observed to bail mid-handshake with the default 1 MB. test_connection_verbose now reports per-step status (connect+auth, open_sftp, listdir /, stat base_path, write probe), and the API returns the steps array so the UI can show exactly which step failed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
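For reference, a minimal sketch of the Transport-first flow described above. This is illustrative only, not the actual sftp.py implementation (which is not part of this file); the host, credentials, and base path are placeholders:

import paramiko

WINDOW = 128 * 1024 * 1024  # large flow-control window, per the note above

# Open the transport directly instead of going through SSHClient, so the only
# channel the server sees is the SFTP subsystem request itself.
transport = paramiko.Transport(("nas.example.lan", 22),        # placeholder host/port
                               default_window_size=WINDOW)
transport.connect(username="backup", password="placeholder")  # or pkey=... for key auth
sftp = paramiko.SFTPClient.from_transport(transport, window_size=WINDOW)
try:
    sftp.listdir("/")              # same probes the verbose test walks through
    sftp.stat("/volume1/photos")   # placeholder base_path
finally:
    sftp.close()
    transport.close()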
"""
|
|
FastAPI application — all API routes for the duplicate finder.
|
|
"""
|
|
|
|
import csv
|
|
import io
|
|
import os
|
|
import sqlite3
|
|
import subprocess
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from fastapi import FastAPI, HTTPException, Query, Request
|
|
from fastapi.responses import (
|
|
FileResponse, JSONResponse, Response, StreamingResponse
|
|
)
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel
|
|
|
|
import scanner as sc
|
|
import sftp as sftp_mod
|
|
|
|
app = FastAPI(title="Duplicate Finder")
|
|
|
|
# Resolve paths relative to this file so it works both in Docker and locally
|
|
_BASE = Path(__file__).parent
|
|
_TEMPLATES_DIR = (
|
|
str(_BASE / "templates") if (_BASE / "templates").exists()
|
|
else str(_BASE.parent / "templates") if (_BASE.parent / "templates").exists()
|
|
else "/app/templates"
|
|
)
|
|
_STATIC_DIR = str(_BASE / "static")
|
|
_STATIC_DIR = _STATIC_DIR if Path(_STATIC_DIR).exists() else "/app/static"
|
|
# Ensure static dir exists
|
|
Path(_STATIC_DIR).mkdir(parents=True, exist_ok=True)
|
|
|
|
templates = Jinja2Templates(directory=_TEMPLATES_DIR)
|
|
|
|
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
|
|
|
|
METHOD_META = {
|
|
"sha256": {"color": "#378ADD", "label": "Exact copy"},
|
|
"phash": {"color": "#9b7de8", "label": "Visual match"},
|
|
"exif": {"color": "#e2a43a", "label": "Same moment"},
|
|
"filesize": {"color": "#888780", "label": "Possible match"},
|
|
}
|
|
|
|
# ── Startup ───────────────────────────────────────────────────────────────────

@app.on_event("startup")
def startup():
    sc.init_db()


# ── Frontend ──────────────────────────────────────────────────────────────────

@app.get("/")
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


# ── DB helper ─────────────────────────────────────────────────────────────────

def get_db() -> sqlite3.Connection:
    return sc.get_db()


# ── Scan management ───────────────────────────────────────────────────────────

class ScanStartBody(BaseModel):
    folder_path: str
    mode: str = "incremental"


@app.post("/api/scan/start")
def scan_start(body: ScanStartBody):
    if sc.scan_state["status"] == "running":
        raise HTTPException(400, "A scan is already running")

    mode = body.mode
    if mode not in ("incremental", "full_reset", "new_files", "regroup"):
        raise HTTPException(400, f"Unknown scan mode: {mode}")

    if mode == "full_reset":
        con = get_db()
        cur = con.cursor()
        cur.execute("DELETE FROM duplicate_members")
        cur.execute("DELETE FROM duplicate_groups")
        cur.execute("DELETE FROM files")
        cur.execute("DELETE FROM scans")
        con.commit()
        con.close()

    con = get_db()
    cur = con.cursor()
    cur.execute(
        "INSERT INTO scans (folder_path, status) VALUES (?, 'running')",
        (body.folder_path,),
    )
    scan_id = cur.lastrowid
    con.commit()
    con.close()

    sc.scan_state.update(
        scan_id=scan_id,
        status="running",
        phase="takeout",
        progress=0,
        total=0,
        message="Starting...",
        pause_requested=False,
        files_indexed=0,
        phashes_done=0,
        folder_path=body.folder_path,
        stats={},
    )

    def _scan_then_thumbs():
        try:
            sc.run_scan(body.folder_path, scan_id, mode)
        finally:
            # Kick off thumbnail pre-generation immediately when scan ends.
            # Limited to files actually in duplicate groups — that's the gallery
            # view and the only place thumbs are looked at.
            _start_thumb_thread(only_in_groups=True)

    thread = threading.Thread(target=_scan_then_thumbs, daemon=True)
    thread.start()

    return {"scan_id": scan_id}


@app.get("/api/scan/status")
def scan_status():
    state = sc.scan_state
    con = get_db()
    cur = con.cursor()

    # Build group counts per method
    stats = {}
    for method in ("sha256", "phash", "exif", "filesize"):
        cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE method=?", (method,))
        stats[f"{method}_groups"] = cur.fetchone()[0]

    cur.execute("SELECT COUNT(*) FROM duplicate_groups")
    stats["total_groups"] = cur.fetchone()[0]
    cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=1")
    stats["reviewed"] = cur.fetchone()[0]
    cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=0")
    stats["pending"] = cur.fetchone()[0]
    cur.execute("SELECT COUNT(*) FROM files WHERE status != 'error'")
    stats["total_files"] = cur.fetchone()[0]
    con.close()

    return {
        "scan_id": state["scan_id"],
        "status": state["status"],
        "phase": state["phase"],
        "progress": state["progress"],
        "total": state["total"],
        "message": state["message"],
        "folder_path": state.get("folder_path"),
        "files_indexed": state.get("files_indexed", 0),
        "phashes_done": state.get("phashes_done", 0),
        "stats": stats,
    }


@app.post("/api/scan/pause")
def scan_pause():
    if sc.scan_state["status"] != "running":
        raise HTTPException(400, "No scan is currently running")
    sc.scan_state["pause_requested"] = True
    return {"success": True}


# Keep /cancel as an alias so any lingering clients still work
@app.post("/api/scan/cancel")
def scan_cancel():
    return scan_pause()


@app.post("/api/scan/resume")
def scan_resume():
    if sc.scan_state["status"] != "paused":
        raise HTTPException(400, "No paused scan to resume")

    folder_path = sc.scan_state.get("folder_path")
    if not folder_path:
        raise HTTPException(400, "No folder path saved — please start a new scan")

    con = get_db()
    cur = con.cursor()
    cur.execute(
        "INSERT INTO scans (folder_path, status) VALUES (?, 'running')",
        (folder_path,),
    )
    scan_id = cur.lastrowid
    con.commit()
    con.close()

    sc.scan_state.update(
        scan_id=scan_id,
        status="running",
        phase="takeout",
        progress=0,
        total=0,
        message="Resuming scan...",
        pause_requested=False,
        files_indexed=0,
        phashes_done=0,
        folder_path=folder_path,
        stats={},
    )

    thread = threading.Thread(
        target=sc.run_scan,
        args=(folder_path, scan_id, "incremental"),
        daemon=True,
    )
    thread.start()

    return {"scan_id": scan_id}


@app.delete("/api/scan/reset")
def scan_reset(confirm: str = Query("")):
    if confirm != "RESET":
        raise HTTPException(400, "Pass ?confirm=RESET to confirm")
    if sc.scan_state["status"] == "running":
        raise HTTPException(
            400, "A scan is currently running — pause it before resetting"
        )
    con = get_db()
    cur = con.cursor()
    cur.execute("DELETE FROM duplicate_members")
    cur.execute("DELETE FROM duplicate_groups")
    cur.execute("DELETE FROM files")
    cur.execute("DELETE FROM scans")
    con.commit()
    con.close()
    sc.scan_state.update(
        scan_id=None, status="idle", phase="idle",
        progress=0, total=0, message="",
        pause_requested=False, files_indexed=0,
        phashes_done=0, folder_path=None, stats={},
    )
    return {"success": True}

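# Typical scan lifecycle as driven by the frontend (illustrative: the folder
# path and polling cadence are examples, not fixed by this module):
#   POST /api/scan/start    {"folder_path": "/photos", "mode": "incremental"}
#   GET  /api/scan/status   polled while status == "running" for progress/phase
#   POST /api/scan/pause    sets pause_requested, which the scanner is expected to honour
#   POST /api/scan/resume   inserts a new scans row and re-runs incrementally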
# ── Duplicate groups ──────────────────────────────────────────────────────────

@app.get("/api/groups")
def list_groups(
    method: str = "all",
    reviewed: str = "false",
    sort: str = "count",
    offset: int = 0,
    limit: int = 50,
):
    con = get_db()
    cur = con.cursor()

    where = []
    params: list = []

    if method != "all":
        where.append("dg.method = ?")
        params.append(method)

    if reviewed == "false":
        where.append("dg.reviewed = 0")
    elif reviewed == "true":
        where.append("dg.reviewed = 1")

    where_clause = ("WHERE " + " AND ".join(where)) if where else ""

    order = {
        "count": "member_count DESC",
        "method": "dg.method, member_count DESC",
        "date": "dg.created_at DESC",
    }.get(sort, "member_count DESC")

    cur.execute(f"""
        SELECT COUNT(*) FROM duplicate_groups dg {where_clause}
    """, params)
    total = cur.fetchone()[0]

    cur.execute(f"""
        SELECT dg.id, dg.method, dg.reviewed,
               COUNT(dm.id) as member_count,
               (SELECT dm2.file_id FROM duplicate_members dm2
                WHERE dm2.group_id = dg.id AND dm2.suggested = 1
                LIMIT 1) as suggested_file_id
        FROM duplicate_groups dg
        LEFT JOIN duplicate_members dm ON dm.group_id = dg.id
        {where_clause}
        GROUP BY dg.id
        ORDER BY {order}
        LIMIT ? OFFSET ?
    """, params + [limit, offset])

    groups = []
    for row in cur.fetchall():
        meta = METHOD_META.get(row["method"], {"color": "#888", "label": row["method"]})
        suggested = None
        if row["suggested_file_id"]:
            cur.execute(
                "SELECT id, filename, width, height FROM files WHERE id=?",
                (row["suggested_file_id"],),
            )
            f = cur.fetchone()
            if f:
                suggested = {
                    "file_id": f["id"],
                    "filename": f["filename"],
                    "width": f["width"],
                    "height": f["height"],
                    "thumb_url": f"/api/thumb/{f['id']}",
                }
        groups.append({
            "id": row["id"],
            "method": row["method"],
            "method_color": meta["color"],
            "method_label": meta["label"],
            "member_count": row["member_count"],
            "reviewed": bool(row["reviewed"]),
            "suggested_keeper": suggested,
        })

    con.close()
    return {"total": total, "groups": groups}


@app.get("/api/groups/{group_id}")
def get_group(group_id: int):
    con = get_db()
    cur = con.cursor()

    cur.execute("SELECT * FROM duplicate_groups WHERE id=?", (group_id,))
    grp = cur.fetchone()
    if not grp:
        raise HTTPException(404, "Group not found")

    meta = METHOD_META.get(grp["method"], {"color": "#888", "label": grp["method"]})

    cur.execute("""
        SELECT f.id, f.filename, f.path, f.file_size, f.width, f.height,
               f.mime_type, f.exif_datetime, f.exif_device,
               f.is_takeout, f.is_edited,
               dm.is_keeper, dm.suggested
        FROM duplicate_members dm
        JOIN files f ON f.id = dm.file_id
        WHERE dm.group_id = ?
        ORDER BY dm.suggested DESC, f.width * f.height DESC
    """, (group_id,))

    members = []
    for r in cur.fetchall():
        members.append({
            "file_id": r["id"],
            "filename": r["filename"],
            "path": r["path"],
            "file_size": r["file_size"],
            "width": r["width"],
            "height": r["height"],
            "mime_type": r["mime_type"],
            "exif_datetime": r["exif_datetime"],
            "exif_device": r["exif_device"],
            "is_takeout": bool(r["is_takeout"]),
            "is_edited": bool(r["is_edited"]) if r["is_edited"] is not None else False,
            "is_suggested": bool(r["suggested"]),
            "is_keeper": bool(r["is_keeper"]),
            "thumb_url": f"/api/thumb/{r['id']}",
        })

    con.close()
    return {
        "id": grp["id"],
        "method": grp["method"],
        "method_color": meta["color"],
        "method_label": meta["label"],
        "method_value": grp["method_value"],
        "reviewed": bool(grp["reviewed"]),
        "members": members,
    }

# ── Decisions ─────────────────────────────────────────────────────────────────

class DecideBody(BaseModel):
    keeper_file_id: int


@app.post("/api/groups/{group_id}/decide")
def decide(group_id: int, body: DecideBody):
    con = get_db()
    cur = con.cursor()

    cur.execute("SELECT id FROM duplicate_groups WHERE id=?", (group_id,))
    if not cur.fetchone():
        raise HTTPException(404, "Group not found")

    cur.execute("SELECT file_id FROM duplicate_members WHERE group_id=?", (group_id,))
    all_members = [r["file_id"] for r in cur.fetchall()]

    if body.keeper_file_id not in all_members:
        raise HTTPException(400, "keeper_file_id is not a member of this group")

    for fid in all_members:
        is_k = 1 if fid == body.keeper_file_id else 0
        cur.execute(
            "UPDATE duplicate_members SET is_keeper=? WHERE group_id=? AND file_id=?",
            (is_k, group_id, fid),
        )
        status = "keeper" if is_k else "redundant"
        cur.execute("UPDATE files SET status=? WHERE id=?", (status, fid))
        sc.log_decision(cur, fid, group_id, status, "manual")

    cur.execute("UPDATE duplicate_groups SET reviewed=1 WHERE id=?", (group_id,))
    con.commit()
    con.close()
    return {"success": True}


@app.post("/api/groups/{group_id}/skip")
def skip_group(group_id: int):
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT id FROM duplicate_groups WHERE id=?", (group_id,))
    if not cur.fetchone():
        raise HTTPException(404, "Group not found")
    cur.execute("SELECT file_id FROM duplicate_members WHERE group_id=?", (group_id,))
    for r in cur.fetchall():
        sc.log_decision(cur, r["file_id"], group_id, "skip", "manual")
    cur.execute("UPDATE duplicate_groups SET reviewed=1 WHERE id=?", (group_id,))
    con.commit()
    con.close()
    return {"success": True}


@app.post("/api/groups/{group_id}/keep-all")
def keep_all(group_id: int):
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT id FROM duplicate_groups WHERE id=?", (group_id,))
    if not cur.fetchone():
        raise HTTPException(404, "Group not found")
    cur.execute("SELECT file_id FROM duplicate_members WHERE group_id=?", (group_id,))
    for r in cur.fetchall():
        cur.execute(
            "UPDATE duplicate_members SET is_keeper=1 WHERE group_id=? AND file_id=?",
            (group_id, r["file_id"]),
        )
        cur.execute("UPDATE files SET status='keeper' WHERE id=?", (r["file_id"],))
        sc.log_decision(cur, r["file_id"], group_id, "keeper", "keep-all")
    cur.execute("UPDATE duplicate_groups SET reviewed=1 WHERE id=?", (group_id,))
    con.commit()
    con.close()
    return {"success": True}


@app.post("/api/groups/{group_id}/unreview")
def unreview_group(group_id: int):
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT id FROM duplicate_groups WHERE id=?", (group_id,))
    if not cur.fetchone():
        raise HTTPException(404, "Group not found")
    cur.execute("SELECT file_id FROM duplicate_members WHERE group_id=?", (group_id,))
    for r in cur.fetchall():
        cur.execute(
            "UPDATE duplicate_members SET is_keeper=0 WHERE group_id=? AND file_id=?",
            (group_id, r["file_id"]),
        )
        cur.execute("UPDATE files SET status='pending' WHERE id=?", (r["file_id"],))
        sc.log_decision(cur, r["file_id"], group_id, "unreview", "manual")
    cur.execute("UPDATE duplicate_groups SET reviewed=0 WHERE id=?", (group_id,))
    con.commit()
    con.close()
    return {"success": True}


@app.post("/api/groups/auto-resolve-exact")
def auto_resolve_exact():
    con = get_db()
    cur = con.cursor()
    cur.execute("""
        SELECT id FROM duplicate_groups
        WHERE method='sha256' AND reviewed=0
    """)
    groups = [r["id"] for r in cur.fetchall()]
    resolved = 0

    for gid in groups:
        cur.execute("""
            SELECT f.id, f.path, f.width, f.height, f.file_size,
                   f.exif_datetime, f.file_mtime
            FROM duplicate_members dm
            JOIN files f ON f.id = dm.file_id
            WHERE dm.group_id = ?
        """, (gid,))
        members = [dict(r) for r in cur.fetchall()]
        if not members:
            continue

        keeper_id = sc._suggested_keeper_by_resolution(members)
        for m in members:
            is_k = 1 if m["id"] == keeper_id else 0
            cur.execute(
                "UPDATE duplicate_members SET is_keeper=? WHERE group_id=? AND file_id=?",
                (is_k, gid, m["id"]),
            )
            cur.execute(
                "UPDATE files SET status=? WHERE id=?",
                ("keeper" if is_k else "redundant", m["id"]),
            )
            sc.log_decision(
                cur, m["id"], gid,
                "keeper" if is_k else "redundant",
                "auto-resolve-exact",
            )
        cur.execute("UPDATE duplicate_groups SET reviewed=1 WHERE id=?", (gid,))
        resolved += 1

    con.commit()
    con.close()
    return {"resolved": resolved}

# ── Files + thumbnails ────────────────────────────────────────────────────────

VIDEO_PLACEHOLDER_SVG = """<svg xmlns="http://www.w3.org/2000/svg" width="200" height="200"
    viewBox="0 0 200 200">
  <rect width="200" height="200" fill="#1e1e2e"/>
  <polygon points="75,55 75,145 145,100" fill="#9b7de8"/>
</svg>"""

VIDEO_EXT = {".mp4", ".mov", ".avi", ".mkv", ".m4v", ".3gp", ".wmv", ".mts", ".m2ts"}


THUMB_CACHE_DIR = "/data/thumbs"
THUMB_MAX = 256  # square bounding box; preserves aspect


def _thumb_cache_path(file_id: int) -> str:
    """Sharded cache path so no directory holds more than ~1000 files."""
    shard = file_id // 1000
    d = os.path.join(THUMB_CACHE_DIR, str(shard))
    os.makedirs(d, exist_ok=True)
    return os.path.join(d, f"{file_id}.jpg")
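# Resulting cache layout (illustrative): /data/thumbs/0/999.jpg,
# /data/thumbs/1/1000.jpg, ... — one subdirectory per 1000 file ids.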


def _generate_thumb(src_path: str, dest_path: str, ext: str) -> bool:
    """Generate a 256px JPEG thumbnail at dest_path. Returns True on success."""
    try:
        if ext in VIDEO_EXT:
            # ffmpeg first frame, scaled to fit
            result = subprocess.run(
                [
                    "ffmpeg", "-y", "-i", src_path,
                    "-vframes", "1",
                    "-vf", f"scale='min({THUMB_MAX},iw)':'-1'",
                    "-q:v", "5",
                    dest_path,
                ],
                capture_output=True, timeout=20,
            )
            return result.returncode == 0 and os.path.getsize(dest_path) > 0
        # Image branch — Pillow handles JPEG/PNG/GIF/WebP/TIFF/BMP natively;
        # pillow-heif registers HEIC/HEIF as a Pillow-readable format.
        from PIL import Image, ImageOps
        try:
            import pillow_heif  # noqa: F401 (registers HEIF opener)
            pillow_heif.register_heif_opener()
        except Exception:
            pass
        with Image.open(src_path) as im:
            im = ImageOps.exif_transpose(im)  # respect EXIF rotation
            im.thumbnail((THUMB_MAX, THUMB_MAX))
            if im.mode not in ("RGB", "L"):
                im = im.convert("RGB")
            im.save(dest_path, "JPEG", quality=80, optimize=True)
        return True
    except Exception:
        # Cleanup partial write
        try:
            if os.path.exists(dest_path):
                os.unlink(dest_path)
        except Exception:
            pass
        return False


@app.get("/api/thumb/{file_id}")
def get_thumb(file_id: int):
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT path, mime_type, extension FROM files WHERE id=?", (file_id,))
    row = cur.fetchone()
    con.close()

    if not row:
        raise HTTPException(404, "File not found")

    ext = (row["extension"] or "").lower()
    cached = _thumb_cache_path(file_id)

    # Cache hit — serve the local JPEG, never touches the NAS
    if os.path.isfile(cached) and os.path.getsize(cached) > 0:
        return FileResponse(cached, media_type="image/jpeg")

    src = row["path"]
    if not os.path.isfile(src):
        raise HTTPException(404, "File not on disk")

    if _generate_thumb(src, cached, ext):
        return FileResponse(cached, media_type="image/jpeg")

    # Final fallback: video placeholder for videos, original file for photos
    if ext in VIDEO_EXT:
        return Response(content=VIDEO_PLACEHOLDER_SVG, media_type="image/svg+xml")
    mime = row["mime_type"] or "application/octet-stream"
    return FileResponse(src, media_type=mime)


@app.delete("/api/thumb/cache")
def clear_thumb_cache():
    """Wipe the thumbnail cache. Safe to call any time — they regenerate on demand."""
    import shutil
    if os.path.isdir(THUMB_CACHE_DIR):
        shutil.rmtree(THUMB_CACHE_DIR, ignore_errors=True)
    return {"cleared": True}

# ── Bulk thumbnail pre-generation ────────────────────────────────────────────

thumb_state: dict = {
    "status": "idle",   # idle | running | done | error
    "total": 0,
    "done": 0,
    "skipped": 0,       # already cached
    "failed": 0,
    "current": "",
    "started_at": None,
    "completed_at": None,
}
_thumb_thread_lock = threading.Lock()


def _generate_all_thumbs(only_in_groups: bool = False):
    """Walk every file and generate any missing thumbnail.

    Runs in a background thread. Idempotent — already-cached files are
    counted as skipped, not regenerated.
    """
    import time
    from datetime import datetime
    thumb_state.update(
        status="running", total=0, done=0, skipped=0, failed=0,
        current="", started_at=datetime.utcnow().isoformat() + "Z",
        completed_at=None,
    )
    try:
        con = get_db()
        cur = con.cursor()
        if only_in_groups:
            cur.execute("""
                SELECT DISTINCT f.id, f.path, f.extension
                FROM files f
                JOIN duplicate_members dm ON dm.file_id = f.id
            """)
        else:
            cur.execute("SELECT id, path, extension FROM files")
        files = cur.fetchall()
        con.close()
        thumb_state["total"] = len(files)

        for r in files:
            fid = r["id"]
            path = r["path"]
            ext = (r["extension"] or "").lower()
            cached = _thumb_cache_path(fid)

            thumb_state["current"] = path or ""

            if os.path.isfile(cached) and os.path.getsize(cached) > 0:
                thumb_state["skipped"] += 1
            elif not path or not os.path.isfile(path):
                thumb_state["failed"] += 1
            elif _generate_thumb(path, cached, ext):
                thumb_state["done"] += 1
            else:
                thumb_state["failed"] += 1

            # Yield occasionally so the API stays responsive
            if (thumb_state["done"] + thumb_state["skipped"] + thumb_state["failed"]) % 50 == 0:
                time.sleep(0)

        from datetime import datetime as _dt
        thumb_state["status"] = "done"
        thumb_state["completed_at"] = _dt.utcnow().isoformat() + "Z"
        thumb_state["current"] = ""
    except Exception as e:
        thumb_state["status"] = "error"
        thumb_state["current"] = f"error: {e}"


def _start_thumb_thread(only_in_groups: bool = False) -> bool:
    """Start the background generator if not already running. Returns True if started."""
    with _thumb_thread_lock:
        if thumb_state["status"] == "running":
            return False
        t = threading.Thread(
            target=_generate_all_thumbs,
            args=(only_in_groups,),
            daemon=True,
        )
        t.start()
        return True


@app.post("/api/thumbs/generate")
def generate_thumbs(only_in_groups: bool = Query(False)):
    """Pre-generate thumbnails for every file (or only files in a duplicate group)."""
    if not _start_thumb_thread(only_in_groups):
        raise HTTPException(409, "Thumbnail generation already in progress")
    return {"status": "started"}


@app.get("/api/thumbs/status")
def thumbs_status():
    return dict(thumb_state)


@app.get("/api/files/{file_id}")
def get_file_meta(file_id: int):
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT * FROM files WHERE id=?", (file_id,))
    row = cur.fetchone()
    con.close()
    if not row:
        raise HTTPException(404, "File not found")
    return dict(row)

# ── Stats ─────────────────────────────────────────────────────────────────────

@app.get("/api/browse")
def browse(path: str = Query("/")):
    """List subdirectories at the given path for the folder picker."""
    try:
        p = Path(path).resolve()
    except Exception:
        raise HTTPException(400, "Invalid path")
    if not p.exists() or not p.is_dir():
        raise HTTPException(404, "Path not found")

    dirs = []
    try:
        for entry in sorted(p.iterdir()):
            if entry.is_dir() and not entry.name.startswith("."):
                dirs.append(entry.name)
    except PermissionError:
        pass

    parent = str(p.parent) if p != p.parent else None
    return {
        "current": str(p),
        "parent": parent,
        "dirs": dirs,
    }


@app.get("/api/stats")
def get_stats():
    con = get_db()
    cur = con.cursor()

    cur.execute("SELECT COUNT(*), SUM(file_size) FROM files WHERE status != 'error'")
    r = cur.fetchone()
    total_files = r[0] or 0
    total_size = r[1] or 0

    cur.execute("""
        SELECT COUNT(*), SUM(f.file_size)
        FROM files f
        JOIN duplicate_members dm ON dm.file_id = f.id
        WHERE dm.is_keeper = 0
    """)
    r = cur.fetchone()
    dup_files = r[0] or 0
    dup_size = r[1] or 0

    by_method = {}
    for method in ("sha256", "phash", "exif", "filesize"):
        cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE method=?", (method,))
        groups = cur.fetchone()[0]
        cur.execute("""
            SELECT COUNT(*) FROM duplicate_members dm
            JOIN duplicate_groups dg ON dg.id = dm.group_id
            WHERE dg.method = ?
        """, (method,))
        files = cur.fetchone()[0]
        by_method[method] = {"groups": groups, "files": files}

    cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=1")
    reviewed = cur.fetchone()[0]
    cur.execute("SELECT COUNT(*) FROM duplicate_groups WHERE reviewed=0")
    pending = cur.fetchone()[0]

    cur.execute("SELECT COUNT(*) FROM files WHERE is_takeout=1")
    takeout_files = cur.fetchone()[0]

    con.close()
    return {
        "total_files": total_files,
        "total_size_bytes": total_size,
        "duplicate_files": dup_files,
        "duplicate_size_bytes": dup_size,
        "groups_by_method": by_method,
        "reviewed": reviewed,
        "pending": pending,
        "takeout_files": takeout_files,
    }

# ── Export ────────────────────────────────────────────────────────────────────

@app.get("/api/export/csv")
def export_csv():
    con = get_db()
    cur = con.cursor()
    cur.execute("""
        SELECT dg.id as group_id, dg.method, f.id as file_id,
               f.path, f.filename, f.file_size,
               f.width, f.height, f.exif_datetime, f.exif_device,
               dm.is_keeper,
               CASE WHEN dm.is_keeper=0 AND dg.reviewed=1 THEN 1 ELSE 0 END as is_redundant,
               dg.reviewed
        FROM duplicate_groups dg
        JOIN duplicate_members dm ON dm.group_id = dg.id
        JOIN files f ON f.id = dm.file_id
        ORDER BY dg.id, dm.is_keeper DESC
    """)
    rows = cur.fetchall()
    con.close()

    output = io.StringIO()
    # QUOTE_ALL + explicit lineterminator handles paths/filenames containing
    # embedded \r, \n, quotes, or NULs — which the default dialect refuses.
    writer = csv.writer(output, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow([
        "group_id", "method", "file_id", "path", "filename",
        "size", "width", "height", "exif_date", "device",
        "is_keeper", "is_redundant", "reviewed",
    ])

    def _clean(v):
        # Strip NULs (csv writer rejects them) and normalise embedded line breaks
        if isinstance(v, str):
            return v.replace("\x00", "").replace("\r\n", " ").replace("\r", " ").replace("\n", " ")
        return v

    for r in rows:
        # path column = directory only; filename has the basename already
        full = r["path"] or ""
        dir_only = full.rsplit("/", 1)[0] if "/" in full else ""
        writer.writerow([
            r["group_id"], r["method"], r["file_id"],
            _clean(dir_only), _clean(r["filename"]), r["file_size"],
            r["width"], r["height"], _clean(r["exif_datetime"]),
            _clean(r["exif_device"]),
            r["is_keeper"], r["is_redundant"], r["reviewed"],
        ])

    output.seek(0)
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=dup-finder-export.csv"},
    )

# ── SFTP destinations ────────────────────────────────────────────────────────

class SFTPDestBody(BaseModel):
    name: str
    host: str
    port: int = 22
    username: str
    auth_method: str  # 'password' | 'key'
    base_path: str
    mirror_structure: bool = True
    # Either password (for password auth) or private_key (for key auth).
    # Optional on update — omit to leave existing credential untouched.
    password: Optional[str] = None
    private_key: Optional[str] = None


def _dest_row_to_dict(row) -> dict:
    return {
        "id": row["id"],
        "name": row["name"],
        "host": row["host"],
        "port": row["port"],
        "username": row["username"],
        "auth_method": row["auth_method"],
        "base_path": row["base_path"],
        "mirror_structure": bool(row["mirror_structure"]),
        "enabled": bool(row["enabled"]),
        "created_at": row["created_at"],
        "last_tested_at": row["last_tested_at"],
        "last_test_result": row["last_test_result"],
        "has_credentials": sftp_mod.has_credentials(row["id"], row["auth_method"]),
    }


@app.get("/api/sftp/destinations")
def list_destinations():
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT * FROM sftp_destinations ORDER BY name")
    out = [_dest_row_to_dict(r) for r in cur.fetchall()]
    con.close()
    return out


@app.post("/api/sftp/destinations", status_code=201)
def create_destination(body: SFTPDestBody):
    if body.auth_method not in ("password", "key"):
        raise HTTPException(400, "auth_method must be 'password' or 'key'")
    if body.auth_method == "password" and not body.password:
        raise HTTPException(400, "password required for password auth")
    if body.auth_method == "key" and not body.private_key:
        raise HTTPException(400, "private_key required for key auth")

    con = get_db()
    cur = con.cursor()
    try:
        cur.execute("""
            INSERT INTO sftp_destinations
                (name, host, port, username, auth_method, base_path, mirror_structure)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (body.name, body.host, body.port, body.username,
              body.auth_method, body.base_path, 1 if body.mirror_structure else 0))
        dest_id = cur.lastrowid
        con.commit()
    except sqlite3.IntegrityError:
        con.close()
        raise HTTPException(409, f"Destination name already in use: {body.name}")

    if body.auth_method == "password":
        sftp_mod.write_password(dest_id, body.password)
    else:
        sftp_mod.write_private_key(dest_id, body.private_key)

    cur.execute("SELECT * FROM sftp_destinations WHERE id=?", (dest_id,))
    out = _dest_row_to_dict(cur.fetchone())
    con.close()
    return out


@app.put("/api/sftp/destinations/{dest_id}")
def update_destination(dest_id: int, body: SFTPDestBody):
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT * FROM sftp_destinations WHERE id=?", (dest_id,))
    row = cur.fetchone()
    if not row:
        con.close()
        raise HTTPException(404, "Destination not found")

    cur.execute("""
        UPDATE sftp_destinations
        SET name=?, host=?, port=?, username=?, auth_method=?,
            base_path=?, mirror_structure=?
        WHERE id=?
    """, (body.name, body.host, body.port, body.username,
          body.auth_method, body.base_path,
          1 if body.mirror_structure else 0, dest_id))

    # If auth method changed, drop old creds
    if row["auth_method"] != body.auth_method:
        sftp_mod.delete_credentials(dest_id)

    if body.auth_method == "password" and body.password:
        sftp_mod.write_password(dest_id, body.password)
    elif body.auth_method == "key" and body.private_key:
        sftp_mod.write_private_key(dest_id, body.private_key)

    con.commit()
    cur.execute("SELECT * FROM sftp_destinations WHERE id=?", (dest_id,))
    out = _dest_row_to_dict(cur.fetchone())
    con.close()
    return out


@app.delete("/api/sftp/destinations/{dest_id}", status_code=204)
def delete_destination(dest_id: int):
    con = get_db()
    cur = con.cursor()
    cur.execute("DELETE FROM sftp_destinations WHERE id=?", (dest_id,))
    if cur.rowcount == 0:
        con.close()
        raise HTTPException(404, "Destination not found")
    con.commit()
    con.close()
    sftp_mod.delete_credentials(dest_id)
    return Response(status_code=204)


@app.post("/api/sftp/destinations/{dest_id}/test")
def test_destination(dest_id: int):
    con = get_db()
    cur = con.cursor()
    cur.execute("SELECT * FROM sftp_destinations WHERE id=?", (dest_id,))
    row = cur.fetchone()
    if not row:
        con.close()
        raise HTTPException(404, "Destination not found")
    dest = _dest_row_to_dict(row)
    if not dest["has_credentials"]:
        con.close()
        raise HTTPException(400, "No credentials stored for this destination")

    ok, message, steps = sftp_mod.test_connection_verbose(dest)
    cur.execute("""
        UPDATE sftp_destinations
        SET last_tested_at=CURRENT_TIMESTAMP, last_test_result=?
        WHERE id=?
    """, ("ok" if ok else message, dest_id))
    con.commit()
    cur.execute("SELECT * FROM sftp_destinations WHERE id=?", (dest_id,))
    out = _dest_row_to_dict(cur.fetchone())
    con.close()
    return {"ok": ok, "message": message, "steps": steps, "destination": out}


@app.post("/api/sftp/keypair")
def generate_keypair():
    """Generate a fresh ED25519 keypair. Returns the private + public halves;
    the caller is expected to paste the private key into a destination's
    private_key field on create/update."""
    private_pem, public_openssh, fingerprint = sftp_mod.generate_keypair()
    return {
        "private_key": private_pem,
        "public_key": public_openssh,
        "fingerprint": fingerprint,
    }