GPU: - Switch Dockerfile base to pytorch/pytorch:2.3.1-cuda12.1-cudnn8-runtime - Add gpu_hasher.py: batched 2D DCT on GPU via PyTorch matrix multiply, 256 images/batch, produces imagehash-compatible 64-bit hex hashes, auto-falls back to CPU when CUDA unavailable - Replace per-image phash loop in scanner.py with phasher.hash_files() - docker-compose.yml: add nvidia GPU device reservation Hang fix: - takeout.is_takeout_folder() now caps at 50 directories (was walking entire tree — blocked for minutes on 65k+ file libraries) - Add "Not a Takeout folder" status message so takeout phase is never silent Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
163 lines
5.8 KiB
Python
163 lines
5.8 KiB
Python
"""
|
||
GPU-accelerated perceptual hashing via PyTorch + CUDA.
|
||
|
||
Implements the same pHash algorithm as the `imagehash` library (DCT-II,
|
||
8×8 low-frequency block, 64-bit hash) so hashes produced here are
|
||
directly comparable with any existing imagehash-generated hashes in the DB.
|
||
|
||
Falls back to CPU if CUDA is not available — no code changes needed.
|
||
"""
|
||
|
||
import logging
|
||
import math
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
import torch
|
||
from PIL import Image, UnidentifiedImageError
|
||
|
||
try:
|
||
from pillow_heif import register_heif_opener
|
||
register_heif_opener()
|
||
except ImportError:
|
||
pass
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
# Must match imagehash defaults: hash_size=8, highfreq_factor=4
|
||
HASH_SIZE = 8
|
||
IMG_SIZE = HASH_SIZE * 4 # 32
|
||
BATCH_SIZE = 256 # images per GPU batch; lower if VRAM is tight
|
||
|
||
|
||
class GpuPhasher:
|
||
"""
|
||
Batched perceptual hasher. Uses CUDA when available, CPU otherwise.
|
||
|
||
The DCT is implemented as two matrix multiplications:
|
||
DCT2D(X) = D @ X @ Dᵀ
|
||
where D is the precomputed orthonormal DCT-II matrix of size IMG_SIZE.
|
||
This runs entirely on-GPU for the full batch.
|
||
"""
|
||
|
||
def __init__(self, batch_size: int = BATCH_SIZE):
|
||
self.batch_size = batch_size
|
||
if torch.cuda.is_available():
|
||
self.device = torch.device("cuda")
|
||
dev_name = torch.cuda.get_device_name(0)
|
||
log.info("GpuPhasher: using CUDA device — %s", dev_name)
|
||
else:
|
||
self.device = torch.device("cpu")
|
||
log.info("GpuPhasher: CUDA not available, using CPU")
|
||
|
||
# Precompute orthonormal DCT-II matrix (IMG_SIZE × IMG_SIZE)
|
||
self._dct = self._build_dct_matrix(IMG_SIZE).to(self.device)
|
||
|
||
# ── DCT matrix ────────────────────────────────────────────────────────────
|
||
|
||
@staticmethod
|
||
def _build_dct_matrix(n: int) -> torch.Tensor:
|
||
"""Orthonormal DCT-II matrix of size n×n."""
|
||
k = torch.arange(n, dtype=torch.float32).unsqueeze(1) # (n, 1)
|
||
i = torch.arange(n, dtype=torch.float32).unsqueeze(0) # (1, n)
|
||
mat = torch.cos(math.pi * k * (2.0 * i + 1.0) / (2.0 * n)) # (n, n)
|
||
mat[0] *= 1.0 / math.sqrt(n)
|
||
mat[1:] *= math.sqrt(2.0 / n)
|
||
return mat # (n, n)
|
||
|
||
# ── Image loading ─────────────────────────────────────────────────────────
|
||
|
||
@staticmethod
|
||
def _load_image(path: str) -> np.ndarray | None:
|
||
"""Load image → greyscale float32 numpy array of shape (IMG_SIZE, IMG_SIZE)."""
|
||
try:
|
||
img = (
|
||
Image.open(path)
|
||
.convert("L")
|
||
.resize((IMG_SIZE, IMG_SIZE), Image.Resampling.LANCZOS)
|
||
)
|
||
return np.asarray(img, dtype=np.float32)
|
||
except (UnidentifiedImageError, OSError, Exception):
|
||
return None
|
||
|
||
# ── Core GPU batch ────────────────────────────────────────────────────────
|
||
|
||
def _phash_batch(self, arrays: list[np.ndarray]) -> list[str]:
|
||
"""
|
||
Compute pHash for a list of (IMG_SIZE, IMG_SIZE) float32 numpy arrays.
|
||
Returns a list of 16-char hex strings (64-bit hashes).
|
||
"""
|
||
# Stack into GPU tensor (B, H, W)
|
||
batch = torch.from_numpy(np.stack(arrays)).to(self.device) # (B, 32, 32)
|
||
|
||
# 2D DCT: D @ X @ Dᵀ
|
||
dct2d = self._dct @ batch @ self._dct.T # (B, 32, 32)
|
||
|
||
# Keep only top-left HASH_SIZE × HASH_SIZE block
|
||
low = dct2d[:, :HASH_SIZE, :HASH_SIZE] # (B, 8, 8)
|
||
flat = low.reshape(low.shape[0], -1) # (B, 64)
|
||
|
||
# Each bit: is value > row mean?
|
||
means = flat.mean(dim=1, keepdim=True)
|
||
bits = (flat > means).cpu().numpy() # (B, 64) bool
|
||
|
||
# Pack bits → bytes → hex (matches imagehash's __str__ format)
|
||
return [np.packbits(b).tobytes().hex() for b in bits]
|
||
|
||
# ── Public API ────────────────────────────────────────────────────────────
|
||
|
||
def hash_files(
|
||
self,
|
||
paths: list[str],
|
||
progress_cb=None,
|
||
) -> dict[str, str]:
|
||
"""
|
||
Compute pHash for every path in `paths`.
|
||
|
||
Returns {path: hex_hash_string}. Paths that fail to open are omitted.
|
||
progress_cb(n_done: int) is called after each batch.
|
||
"""
|
||
results: dict[str, str] = {}
|
||
done = 0
|
||
|
||
for i in range(0, len(paths), self.batch_size):
|
||
chunk = paths[i : i + self.batch_size]
|
||
|
||
arrays: list[np.ndarray] = []
|
||
valid: list[str] = []
|
||
|
||
for p in chunk:
|
||
arr = self._load_image(p)
|
||
if arr is not None:
|
||
arrays.append(arr)
|
||
valid.append(p)
|
||
|
||
if arrays:
|
||
try:
|
||
hashes = self._phash_batch(arrays)
|
||
results.update(zip(valid, hashes))
|
||
except Exception as exc:
|
||
log.warning("GPU batch failed (%s); skipping batch", exc)
|
||
|
||
done += len(chunk)
|
||
if progress_cb:
|
||
progress_cb(done)
|
||
|
||
return results
|
||
|
||
@property
|
||
def using_gpu(self) -> bool:
|
||
return self.device.type == "cuda"
|
||
|
||
|
||
# ── Module-level singleton (created once, reused across scan phases) ──────────
|
||
|
||
_phasher: GpuPhasher | None = None


def get_phasher() -> GpuPhasher:
    """Return the shared GpuPhasher, constructing it on the first call."""
    global _phasher
    if _phasher is not None:
        return _phasher
    # First use: build the instance once and cache it for later callers.
    _phasher = GpuPhasher()
    return _phasher
|