Files
duplicate-finder/app/gpu_hasher.py
tocmo c110a8e4f9 GPU-accelerated phash + fix discovery/takeout hang
GPU:
- Switch Dockerfile base to pytorch/pytorch:2.3.1-cuda12.1-cudnn8-runtime
- Add gpu_hasher.py: batched 2D DCT on GPU via PyTorch matrix multiply,
  256 images/batch, produces imagehash-compatible 64-bit hex hashes,
  auto-falls back to CPU when CUDA unavailable
- Replace per-image phash loop in scanner.py with phasher.hash_files()
- docker-compose.yml: add nvidia GPU device reservation

Hang fix:
- takeout.is_takeout_folder() now caps at 50 directories (was walking
  entire tree — blocked for minutes on 65k+ file libraries)
- Add "Not a Takeout folder" status message so takeout phase is never silent

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 01:37:28 -04:00

163 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
GPU-accelerated perceptual hashing via PyTorch + CUDA.
Implements the same pHash algorithm as the `imagehash` library (DCT-II,
8×8 low-frequency block, 64-bit hash) so hashes produced here are
directly comparable with any existing imagehash-generated hashes in the DB.
Falls back to CPU if CUDA is not available — no code changes needed.
"""
import logging
import math
from pathlib import Path
import numpy as np
import torch
from PIL import Image, UnidentifiedImageError
try:
from pillow_heif import register_heif_opener
register_heif_opener()
except ImportError:
pass
log = logging.getLogger(__name__)
# Must match imagehash defaults: hash_size=8, highfreq_factor=4
HASH_SIZE = 8
IMG_SIZE = HASH_SIZE * 4 # 32
BATCH_SIZE = 256 # images per GPU batch; lower if VRAM is tight
class GpuPhasher:
    """
    Batched perceptual hasher. Uses CUDA when available, CPU otherwise.

    The 2D DCT is implemented as two matrix multiplications:

        DCT2D(X) = D @ X @ Dᵀ

    where D is the precomputed DCT-II matrix of size IMG_SIZE, so an entire
    batch of images is hashed with two GEMMs on the device.

    Compatibility: `imagehash.phash` runs scipy.fftpack's *un-normalized*
    DCT-II and thresholds the 8×8 low-frequency block against its *median*.
    This class reproduces both choices so the hex strings are bit-compatible
    with `str(imagehash.phash(img))`. (imagehash computes in float64; we use
    float32 for GPU throughput, so coefficients sitting exactly on the median
    could, very rarely, flip a bit.)
    """

    def __init__(self, batch_size: int = BATCH_SIZE):
        """batch_size: images per forward pass; lower it if VRAM is tight."""
        self.batch_size = batch_size
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
            dev_name = torch.cuda.get_device_name(0)
            log.info("GpuPhasher: using CUDA device — %s", dev_name)
        else:
            self.device = torch.device("cpu")
            log.info("GpuPhasher: CUDA not available, using CPU")
        # Precompute the DCT-II matrix (IMG_SIZE × IMG_SIZE) once; it is
        # reused for every batch for the lifetime of the process.
        self._dct = self._build_dct_matrix(IMG_SIZE).to(self.device)

    # ── DCT matrix ────────────────────────────────────────────────────────────
    @staticmethod
    def _build_dct_matrix(n: int) -> torch.Tensor:
        """DCT-II matrix of size n×n with scipy.fftpack scaling (norm=None).

        scipy's un-normalized DCT-II is  y[k] = 2·Σ_i x[i]·cos(πk(2i+1)/(2n)),
        which is what imagehash applies. The *orthonormal* variant scales
        row 0 by a different factor than rows 1..n-1; because the hash bit
        threshold is the median of mixed coefficients, that per-row rescaling
        flips bits relative to imagehash — so we must match scipy exactly.
        """
        k = torch.arange(n, dtype=torch.float32).unsqueeze(1)  # (n, 1)
        i = torch.arange(n, dtype=torch.float32).unsqueeze(0)  # (1, n)
        return 2.0 * torch.cos(math.pi * k * (2.0 * i + 1.0) / (2.0 * n))  # (n, n)

    # ── Image loading ─────────────────────────────────────────────────────────
    @staticmethod
    def _load_image(path: str) -> np.ndarray | None:
        """Load image → greyscale float32 numpy array of shape (IMG_SIZE, IMG_SIZE).

        Returns None for anything PIL cannot open or decode: hashing is
        best-effort and unreadable files are simply skipped by the caller.
        """
        try:
            img = (
                Image.open(path)
                .convert("L")
                .resize((IMG_SIZE, IMG_SIZE), Image.Resampling.LANCZOS)
            )
            return np.asarray(img, dtype=np.float32)
        except Exception:
            # UnidentifiedImageError / OSError are the common cases, but some
            # codecs raise other types; deliberately broad, but logged so
            # failures are diagnosable instead of silently swallowed.
            log.debug("GpuPhasher: could not load %s", path, exc_info=True)
            return None

    # ── Core GPU batch ────────────────────────────────────────────────────────
    def _phash_batch(self, arrays: list[np.ndarray]) -> list[str]:
        """
        Compute pHash for a list of (IMG_SIZE, IMG_SIZE) float32 numpy arrays.
        Returns a list of 16-char hex strings (64-bit hashes), bit-compatible
        with str(imagehash.phash(...)).
        """
        # Stack into a device tensor (B, H, W)
        batch = torch.from_numpy(np.stack(arrays)).to(self.device)  # (B, 32, 32)
        # Separable 2D DCT as two GEMMs: D @ X @ Dᵀ
        dct2d = self._dct @ batch @ self._dct.T  # (B, 32, 32)
        # Keep only the top-left HASH_SIZE × HASH_SIZE low-frequency block
        low = dct2d[:, :HASH_SIZE, :HASH_SIZE]  # (B, 8, 8)
        flat = low.reshape(low.shape[0], -1)  # (B, 64)
        # imagehash thresholds on the MEDIAN of the 64 coefficients (not the
        # mean). torch.quantile(0.5) linearly interpolates the two middle
        # values, matching numpy.median on this even-sized block.
        medians = torch.quantile(flat, 0.5, dim=1, keepdim=True)
        bits = (flat > medians).cpu().numpy()  # (B, 64) bool
        # MSB-first bit packing → bytes → hex matches imagehash's __str__.
        return [np.packbits(b).tobytes().hex() for b in bits]

    # ── Public API ────────────────────────────────────────────────────────────
    def hash_files(
        self,
        paths: list[str],
        progress_cb=None,
    ) -> dict[str, str]:
        """
        Compute pHash for every path in `paths`.

        Returns {path: hex_hash_string}. Paths that fail to open are omitted,
        as are all paths of a batch whose GPU pass fails (the failure is
        logged and the scan continues).

        progress_cb(n_done: int), if given, is called after each batch with
        the cumulative number of *attempted* paths, so a progress bar keeps
        moving even when some files are unreadable.
        """
        results: dict[str, str] = {}
        done = 0
        for start in range(0, len(paths), self.batch_size):
            chunk = paths[start : start + self.batch_size]
            arrays: list[np.ndarray] = []
            valid: list[str] = []
            for p in chunk:
                arr = self._load_image(p)
                if arr is not None:
                    arrays.append(arr)
                    valid.append(p)
            if arrays:
                try:
                    hashes = self._phash_batch(arrays)
                    results.update(zip(valid, hashes))
                except Exception as exc:
                    log.warning("GPU batch failed (%s); skipping batch", exc)
            done += len(chunk)
            if progress_cb:
                progress_cb(done)
        return results

    @property
    def using_gpu(self) -> bool:
        """True when hashing runs on a CUDA device."""
        return self.device.type == "cuda"
# ── Module-level singleton (created once, reused across scan phases) ──────────
_phasher: GpuPhasher | None = None


def get_phasher() -> GpuPhasher:
    """Return the process-wide GpuPhasher, constructing it on first use."""
    global _phasher
    if _phasher is not None:
        return _phasher
    _phasher = GpuPhasher()
    return _phasher