"""Per-channel, per-nick conversation history stored in small SQLite files.

Layout on disk: ``HISTORY_DIR/<sanitized channel>/<nick>.db``. Each database
holds a single ``exchanges`` table with one row per user/bot exchange.
"""

import contextlib
import logging
import os
import re
import sqlite3

logger = logging.getLogger(__name__)

HISTORY_DIR = "data/history"

# Files SQLite may leave next to "<name>.db". The databases are switched to
# WAL mode, so "-wal"/"-shm" can hold data that is not yet in the main file.
_SIDECAR_SUFFIXES = ("-wal", "-shm", "-journal")
_DB_FILE_ENDINGS = (".db",) + tuple(".db" + suffix for suffix in _SIDECAR_SUFFIXES)

_CREATE_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS exchanges (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
        user_input TEXT NOT NULL,
        bot_reply TEXT NOT NULL
    )
"""


def _safe_component(name: str) -> str:
    """Return *name* made safe to use as a single path component.

    Only characters that would let the name escape its parent directory are
    touched (the platform's path separators and NUL), so names that were
    already valid map to exactly the same file as before.
    """
    unsafe = {os.sep, "\0"}
    if os.altsep:
        unsafe.add(os.altsep)
    for char in unsafe:
        name = name.replace(char, "_")
    # "." and ".." would resolve to the parent directories themselves.
    if name in (".", ".."):
        name = "_" * len(name)
    return name


def _sanitize_channel(channel: str) -> str:
    """Turn a channel name into a directory name.

    Leading ``#`` characters are stripped, any remaining ``#&+!`` become
    ``_``, and the result is made path-safe.
    """
    name = channel.lstrip("#")
    name = re.sub(r"[#&+!]", "_", name)
    return _safe_component(name)


def _db_path(channel: str, nick: str, *, create: bool = True) -> str:
    """Return the database path for *nick* in *channel*.

    Args:
        channel: Raw channel name.
        nick: Raw nick; sanitized so it cannot point outside the channel dir.
        create: When true (the default), make sure the channel directory
            exists. Read/delete callers pass ``False`` to avoid creating
            empty directories as a side effect.
    """
    chan_dir = os.path.join(HISTORY_DIR, _sanitize_channel(channel))
    if create:
        os.makedirs(chan_dir, exist_ok=True)
    return os.path.join(chan_dir, f"{_safe_component(nick)}.db")


def _get_conn(path: str) -> sqlite3.Connection:
    """Open *path* in WAL mode and make sure the ``exchanges`` table exists.

    The caller owns the returned connection and must close it. If the setup
    statements fail, the connection is closed before the error propagates.
    """
    conn = sqlite3.connect(path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute(_CREATE_TABLE_SQL)
        conn.commit()
    except Exception:
        conn.close()
        raise
    return conn


def _remove_if_exists(path: str) -> bool:
    """Delete *path*; return whether a file was actually removed."""
    try:
        os.remove(path)
    except FileNotFoundError:
        return False
    return True


def load_history(channel: str, nick: str, limit: int) -> list[dict]:
    """Return the most recent exchanges for *nick* in *channel*, oldest first.

    Args:
        channel: Raw channel name.
        nick: Raw nick.
        limit: Passed straight to the SQL ``LIMIT`` clause (SQLite treats a
            negative value as "no limit").

    Returns:
        A list of ``{"user": ..., "assistant": ...}`` dicts. Empty when no
        database exists yet or when reading fails (the failure is logged).
    """
    path = _db_path(channel, nick, create=False)
    if not os.path.exists(path):
        return []
    try:
        with contextlib.closing(_get_conn(path)) as conn:
            rows = conn.execute(
                "SELECT user_input, bot_reply FROM exchanges ORDER BY id DESC LIMIT ?",
                (limit,),
            ).fetchall()
    except Exception as e:
        logger.error(f"[MEMORY] Failed to load history for {nick} in {channel}: {e}")
        return []
    # The query returns newest-first; callers want chronological order.
    return [{"user": r[0], "assistant": r[1]} for r in reversed(rows)]


def save_exchange(channel: str, nick: str, user_input: str, bot_reply: str) -> None:
    """Append one exchange to the history of *nick* in *channel*.

    Database errors are logged and swallowed so a storage problem never
    interrupts the caller.
    """
    path = _db_path(channel, nick)
    try:
        with contextlib.closing(_get_conn(path)) as conn:
            conn.execute(
                "INSERT INTO exchanges (user_input, bot_reply) VALUES (?, ?)",
                (user_input, bot_reply),
            )
            conn.commit()
    except Exception as e:
        logger.error(f"[MEMORY] Failed to write exchange for {nick} in {channel}: {e}")


def delete_user_history(channel: str, nick: str) -> None:
    """Delete the database of *nick* in *channel*, including SQLite sidecars."""
    path = _db_path(channel, nick, create=False)
    # Evaluate every removal (no short-circuit) so sidecars go even when the
    # main file is already missing.
    removed = [_remove_if_exists(path + suffix) for suffix in ("",) + _SIDECAR_SUFFIXES]
    if any(removed):
        logger.info(f"[MEMORY] Deleted history for {nick} in {channel}")


def delete_channel_history(channel: str) -> None:
    """Delete every history database (and sidecar file) stored for *channel*."""
    chan_dir = os.path.join(HISTORY_DIR, _sanitize_channel(channel))
    if not os.path.isdir(chan_dir):
        return
    for name in os.listdir(chan_dir):
        if name.endswith(_DB_FILE_ENDINGS):
            _remove_if_exists(os.path.join(chan_dir, name))
    logger.info(f"[MEMORY] Cleared all history for {channel}")


def delete_all_history() -> None:
    """Delete every history database (and sidecar file) under ``HISTORY_DIR``."""
    for root, _dirs, files in os.walk(HISTORY_DIR):
        for name in files:
            if name.endswith(_DB_FILE_ENDINGS):
                _remove_if_exists(os.path.join(root, name))
    logger.info("[MEMORY] All history deleted")


def prune_old_exchanges(max_age_days: int) -> None:
    """Delete exchanges older than *max_age_days* from every database.

    A non-positive *max_age_days* disables pruning. A failure on one
    database is logged and does not stop the remaining ones.
    """
    if max_age_days <= 0:
        return
    pruned = 0
    for root, _dirs, files in os.walk(HISTORY_DIR):
        for name in files:
            if not name.endswith(".db"):
                continue
            path = os.path.join(root, name)
            try:
                with contextlib.closing(sqlite3.connect(path)) as conn:
                    conn.execute("PRAGMA journal_mode=WAL;")
                    cursor = conn.execute(
                        "DELETE FROM exchanges WHERE timestamp < datetime('now', ?)",
                        (f"-{max_age_days} days",),
                    )
                    pruned += cursor.rowcount
                    conn.commit()
            except Exception as e:
                logger.error(f"[MEMORY] Pruning failed for {path}: {e}")
    if pruned:
        logger.info(f"[MEMORY] Pruned {pruned} old exchanges (>{max_age_days} days)")


def list_channels() -> list[str]:
    """Return the names of the channel directories under ``HISTORY_DIR``."""
    if not os.path.isdir(HISTORY_DIR):
        return []
    return [
        entry
        for entry in os.listdir(HISTORY_DIR)
        if os.path.isdir(os.path.join(HISTORY_DIR, entry))
    ]


def list_nicks(channel_dir: str) -> list[str]:
    """Return the nicks that have a database inside *channel_dir*.

    *channel_dir* is a directory name as returned by ``list_channels``, not
    a raw channel name.
    """
    path = os.path.join(HISTORY_DIR, _safe_component(channel_dir))
    if not os.path.isdir(path):
        return []
    return [name[:-3] for name in os.listdir(path) if name.endswith(".db")]


def get_all_exchanges(channel_dir: str, nick: str) -> list[dict]:
    """Return every stored exchange for *nick* in *channel_dir*, oldest first.

    Returns:
        A list of dicts with keys ``id``, ``timestamp``, ``user`` and
        ``assistant``. Empty when the database is missing or unreadable
        (the failure is logged).
    """
    path = os.path.join(
        HISTORY_DIR, _safe_component(channel_dir), f"{_safe_component(nick)}.db"
    )
    if not os.path.exists(path):
        return []
    try:
        with contextlib.closing(sqlite3.connect(path)) as conn:
            rows = conn.execute(
                "SELECT id, timestamp, user_input, bot_reply FROM exchanges ORDER BY id ASC"
            ).fetchall()
    except Exception as e:
        logger.error(f"[MEMORY] Failed to read all exchanges: {e}")
        return []
    return [
        {"id": r[0], "timestamp": r[1], "user": r[2], "assistant": r[3]}
        for r in rows
    ]


def get_stats() -> dict:
    """Return the total exchange count and on-disk size of all databases.

    Best effort: a database that cannot be read is logged and skipped
    instead of failing the whole call.

    Returns:
        ``{"total_exchanges": int, "total_size_bytes": int}``; the size
        covers the ``.db`` files only.
    """
    total = 0
    total_size = 0
    for root, _dirs, files in os.walk(HISTORY_DIR):
        for name in files:
            if not name.endswith(".db"):
                continue
            path = os.path.join(root, name)
            try:
                total_size += os.path.getsize(path)
            except OSError as e:
                # The file can vanish between the walk and the stat call.
                logger.warning(f"[MEMORY] Could not stat {path}: {e}")
                continue
            try:
                with contextlib.closing(sqlite3.connect(path)) as conn:
                    row = conn.execute("SELECT COUNT(*) FROM exchanges").fetchone()
                total += row[0] if row else 0
            except Exception as e:
                logger.warning(f"[MEMORY] Could not count exchanges in {path}: {e}")
    return {"total_exchanges": total, "total_size_bytes": total_size}