""" Google Takeout pre-processor. Detects Takeout folder structures, reads JSON sidecars, and enriches the files table with corrected timestamps, normalized filenames, and edit-version flags. """ import json import os import re import sqlite3 from datetime import datetime, timezone from pathlib import Path # Google edit suffixes appended to filenames EDIT_SUFFIXES = ("-edited", "-effects", "-smile", "-mix") def _find_sidecar(media_path: str) -> str | None: """Return path to the JSON sidecar for a media file, or None.""" p = Path(media_path) # Try filename.ext.json first, then filename.json candidates = [ str(p) + ".json", str(p.with_suffix(".json")), ] for c in candidates: if os.path.isfile(c): return c return None def _strip_collision_suffix(filename: str) -> str: """Strip Google's (1), (2) collision suffixes from a filename.""" stem = Path(filename).stem ext = Path(filename).suffix cleaned = re.sub(r"\(\d+\)$", "", stem).rstrip() return cleaned + ext def _is_edited(filename: str) -> bool: stem = Path(filename).stem.lower() return any(stem.endswith(s) for s in EDIT_SUFFIXES) def is_takeout_folder(folder_path: str) -> bool: """ Heuristic: walk folder looking for .json files whose names match adjacent media files. If we find at least 5 such pairs, call it Takeout. """ count = 0 dirs_checked = 0 MAX_DIRS = 50 # sample at most 50 directories — fast on any library size for root, dirs, files in os.walk(folder_path): dirs[:] = [d for d in dirs if not d.startswith(".")] dirs_checked += 1 if dirs_checked > MAX_DIRS: break file_set = set(files) for f in files: if not f.endswith(".json"): continue base = f[:-5] # strip .json if base in file_set: count += 1 if count >= 5: return True return False def process_takeout(folder_path: str, db_path: str) -> int: """ Walk folder_path, find all media files with JSON sidecars, and enrich their DB records. Returns count of files enriched. """ con = sqlite3.connect(db_path) con.row_factory = sqlite3.Row cur = con.cursor() enriched = 0 for root, dirs, files in os.walk(folder_path): dirs[:] = [d for d in dirs if not d.startswith(".")] for fname in files: if fname.endswith(".json"): continue media_path = os.path.join(root, fname) sidecar = _find_sidecar(media_path) if not sidecar: continue try: with open(sidecar, "r", encoding="utf-8") as f: data = json.load(f) except (json.JSONDecodeError, OSError): continue # Extract fields from sidecar photo_taken_ts = None try: ts = int(data["photoTakenTime"]["timestamp"]) dt = datetime.fromtimestamp(ts, tz=timezone.utc) photo_taken_ts = dt.strftime("%Y-%m-%dT%H:%M:%S") except (KeyError, ValueError, TypeError): pass title = data.get("title", "") takeout_json_str = json.dumps(data) # Normalized filename: use title if present, else strip suffix from fname if title: normalized = _strip_collision_suffix(title) else: normalized = _strip_collision_suffix(fname) edited = _is_edited(fname) # Update the DB record for this file updates = { "is_takeout": 1, "filename": normalized, "takeout_json": takeout_json_str, } if photo_taken_ts: updates["exif_datetime"] = photo_taken_ts set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [media_path] cur.execute( f"UPDATE files SET {set_clause}, updated_at = CURRENT_TIMESTAMP " f"WHERE path = ?", values, ) # Handle edited flag — add is_edited column if needed (migration-safe) if edited: try: cur.execute( "UPDATE files SET is_edited = 1 WHERE path = ?", (media_path,), ) except sqlite3.OperationalError: pass # column doesn't exist yet, skip if cur.rowcount > 0: enriched += 1 con.commit() con.close() return enriched