""" GPU-accelerated perceptual hashing via PyTorch + CUDA. Implements the same pHash algorithm as the `imagehash` library (DCT-II, 8×8 low-frequency block, 64-bit hash) so hashes produced here are directly comparable with any existing imagehash-generated hashes in the DB. Falls back to CPU if CUDA is not available — no code changes needed. """ import logging import math from pathlib import Path import numpy as np import torch from PIL import Image, UnidentifiedImageError try: from pillow_heif import register_heif_opener register_heif_opener() except ImportError: pass log = logging.getLogger(__name__) # Must match imagehash defaults: hash_size=8, highfreq_factor=4 HASH_SIZE = 8 IMG_SIZE = HASH_SIZE * 4 # 32 BATCH_SIZE = 256 # images per GPU batch; lower if VRAM is tight class GpuPhasher: """ Batched perceptual hasher. Uses CUDA when available, CPU otherwise. The DCT is implemented as two matrix multiplications: DCT2D(X) = D @ X @ Dᵀ where D is the precomputed orthonormal DCT-II matrix of size IMG_SIZE. This runs entirely on-GPU for the full batch. """ def __init__(self, batch_size: int = BATCH_SIZE): self.batch_size = batch_size if torch.cuda.is_available(): self.device = torch.device("cuda") dev_name = torch.cuda.get_device_name(0) log.info("GpuPhasher: using CUDA device — %s", dev_name) else: self.device = torch.device("cpu") log.info("GpuPhasher: CUDA not available, using CPU") # Precompute orthonormal DCT-II matrix (IMG_SIZE × IMG_SIZE) self._dct = self._build_dct_matrix(IMG_SIZE).to(self.device) # ── DCT matrix ──────────────────────────────────────────────────────────── @staticmethod def _build_dct_matrix(n: int) -> torch.Tensor: """Orthonormal DCT-II matrix of size n×n.""" k = torch.arange(n, dtype=torch.float32).unsqueeze(1) # (n, 1) i = torch.arange(n, dtype=torch.float32).unsqueeze(0) # (1, n) mat = torch.cos(math.pi * k * (2.0 * i + 1.0) / (2.0 * n)) # (n, n) mat[0] *= 1.0 / math.sqrt(n) mat[1:] *= math.sqrt(2.0 / n) return mat # (n, n) # ── Image loading ───────────────────────────────────────────────────────── @staticmethod def _load_image(path: str) -> np.ndarray | None: """Load image → greyscale float32 numpy array of shape (IMG_SIZE, IMG_SIZE).""" try: img = ( Image.open(path) .convert("L") .resize((IMG_SIZE, IMG_SIZE), Image.Resampling.LANCZOS) ) return np.asarray(img, dtype=np.float32) except (UnidentifiedImageError, OSError, Exception): return None # ── Core GPU batch ──────────────────────────────────────────────────────── def _phash_batch(self, arrays: list[np.ndarray]) -> list[str]: """ Compute pHash for a list of (IMG_SIZE, IMG_SIZE) float32 numpy arrays. Returns a list of 16-char hex strings (64-bit hashes). """ # Stack into GPU tensor (B, H, W) batch = torch.from_numpy(np.stack(arrays)).to(self.device) # (B, 32, 32) # 2D DCT: D @ X @ Dᵀ dct2d = self._dct @ batch @ self._dct.T # (B, 32, 32) # Keep only top-left HASH_SIZE × HASH_SIZE block low = dct2d[:, :HASH_SIZE, :HASH_SIZE] # (B, 8, 8) flat = low.reshape(low.shape[0], -1) # (B, 64) # Each bit: is value > row mean? means = flat.mean(dim=1, keepdim=True) bits = (flat > means).cpu().numpy() # (B, 64) bool # Pack bits → bytes → hex (matches imagehash's __str__ format) return [np.packbits(b).tobytes().hex() for b in bits] # ── Public API ──────────────────────────────────────────────────────────── def hash_files( self, paths: list[str], progress_cb=None, ) -> dict[str, str]: """ Compute pHash for every path in `paths`. Returns {path: hex_hash_string}. Paths that fail to open are omitted. progress_cb(n_done: int) is called after each batch. """ results: dict[str, str] = {} done = 0 for i in range(0, len(paths), self.batch_size): chunk = paths[i : i + self.batch_size] arrays: list[np.ndarray] = [] valid: list[str] = [] for p in chunk: arr = self._load_image(p) if arr is not None: arrays.append(arr) valid.append(p) if arrays: try: hashes = self._phash_batch(arrays) results.update(zip(valid, hashes)) except Exception as exc: log.warning("GPU batch failed (%s); skipping batch", exc) done += len(chunk) if progress_cb: progress_cb(done) return results @property def using_gpu(self) -> bool: return self.device.type == "cuda" # ── Module-level singleton (created once, reused across scan phases) ────────── _phasher: GpuPhasher | None = None def get_phasher() -> GpuPhasher: global _phasher if _phasher is None: _phasher = GpuPhasher() return _phasher