Files
duplicate-finder/app/takeout.py
tocmo 868da9016d Initial implementation of duplicate finder
Full project per spec: FastAPI backend, 4-method duplicate detection
(SHA-256, phash, EXIF, filesize), Google Takeout pre-processor,
4 scan modes, and dark-theme vanilla JS gallery frontend.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 23:42:58 -04:00

150 lines
4.6 KiB
Python

"""
Google Takeout pre-processor.
Detects Takeout folder structures, reads JSON sidecars, and enriches
the files table with corrected timestamps, normalized filenames, and
edit-version flags.
"""
import json
import os
import re
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
# Google edit suffixes appended to filenames
EDIT_SUFFIXES = ("-edited", "-effects", "-smile", "-mix")
def _find_sidecar(media_path: str) -> str | None:
"""Return path to the JSON sidecar for a media file, or None."""
p = Path(media_path)
# Try filename.ext.json first, then filename.json
candidates = [
str(p) + ".json",
str(p.with_suffix(".json")),
]
for c in candidates:
if os.path.isfile(c):
return c
return None
def _strip_collision_suffix(filename: str) -> str:
"""Strip Google's (1), (2) collision suffixes from a filename."""
stem = Path(filename).stem
ext = Path(filename).suffix
cleaned = re.sub(r"\(\d+\)$", "", stem).rstrip()
return cleaned + ext
def _is_edited(filename: str) -> bool:
    """Return True if the filename carries one of Google's edit suffixes."""
    # str.endswith accepts a tuple, testing all suffixes in one call.
    return Path(filename).stem.lower().endswith(EDIT_SUFFIXES)
def is_takeout_folder(folder_path: str) -> bool:
    """
    Heuristic: walk folder looking for .json files whose names match
    adjacent media files. If we find at least 5 such pairs, call it Takeout.

    Recognizes both sidecar naming forms that _find_sidecar handles:
    "name.ext.json" (base equals the full media filename) and
    "name.json" (base equals the media file's stem).
    """
    count = 0
    for root, dirs, files in os.walk(folder_path):
        # Skip hidden dirs
        dirs[:] = [d for d in dirs if not d.startswith(".")]
        # Only non-JSON files can be media; a ".json" should never count
        # as its own sidecar target.
        media = [f for f in files if not f.endswith(".json")]
        media_set = set(media)
        media_stems = {Path(f).stem for f in media}
        for f in files:
            if not f.endswith(".json"):
                continue
            base = f[:-5]  # strip ".json"
            # Form 1: "IMG.jpg.json" -> base is a full media filename.
            # Form 2: "IMG.json"     -> base matches a media file's stem.
            if base in media_set or base in media_stems:
                count += 1
                if count >= 5:
                    return True
    return False
def process_takeout(folder_path: str, db_path: str) -> int:
    """
    Walk folder_path, find all media files with JSON sidecars,
    and enrich their DB records. Returns count of files enriched.

    For each media file with a readable sidecar, the matching row in the
    files table gets: is_takeout=1, a normalized filename, the raw sidecar
    JSON, and (when the sidecar carries one) the photo-taken timestamp
    copied into exif_datetime. Unreadable sidecars are skipped silently.
    """
    con = sqlite3.connect(db_path)
    con.row_factory = sqlite3.Row
    try:
        cur = con.cursor()
        enriched = 0
        for root, dirs, files in os.walk(folder_path):
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            for fname in files:
                if fname.endswith(".json"):
                    continue  # sidecars themselves are not media
                media_path = os.path.join(root, fname)
                sidecar = _find_sidecar(media_path)
                if not sidecar:
                    continue
                try:
                    with open(sidecar, "r", encoding="utf-8") as f:
                        data = json.load(f)
                except (json.JSONDecodeError, OSError):
                    continue  # best-effort: skip unreadable sidecars
                # Google stores capture time as a Unix-epoch string; a
                # missing or malformed timestamp is non-fatal.
                photo_taken_ts = None
                try:
                    ts = int(data["photoTakenTime"]["timestamp"])
                    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
                    photo_taken_ts = dt.strftime("%Y-%m-%dT%H:%M:%S")
                except (KeyError, ValueError, TypeError):
                    pass
                title = data.get("title", "")
                takeout_json_str = json.dumps(data)
                # Normalized filename: prefer the sidecar title (the original
                # upload name) over the on-disk name.
                if title:
                    normalized = _strip_collision_suffix(title)
                else:
                    normalized = _strip_collision_suffix(fname)
                edited = _is_edited(fname)
                # Update the DB record for this file
                updates = {
                    "is_takeout": 1,
                    "filename": normalized,
                    "takeout_json": takeout_json_str,
                }
                if photo_taken_ts:
                    updates["exif_datetime"] = photo_taken_ts
                set_clause = ", ".join(f"{k} = ?" for k in updates)
                values = list(updates.values()) + [media_path]
                cur.execute(
                    f"UPDATE files SET {set_clause}, updated_at = CURRENT_TIMESTAMP "
                    f"WHERE path = ?",
                    values,
                )
                # BUG FIX: capture the main UPDATE's rowcount now — the
                # optional is_edited UPDATE below overwrites cur.rowcount,
                # which previously miscounted edited files.
                matched = cur.rowcount > 0
                # Handle edited flag — is_edited column may not exist yet
                # (migration-safe), so tolerate the missing column.
                if edited:
                    try:
                        cur.execute(
                            "UPDATE files SET is_edited = 1 WHERE path = ?",
                            (media_path,),
                        )
                    except sqlite3.OperationalError:
                        pass  # column doesn't exist yet, skip
                if matched:
                    enriched += 1
        con.commit()
        return enriched
    finally:
        # Always release the connection, even if the walk or an UPDATE raises.
        con.close()