Files
irc-bot/bot/memory.py
tocmo0nlord ff3f6fe05b Add Debian .deb package support
- debian/control, changelog, conffiles — package metadata
- debian/postinst — creates irc-bot system user, installs Python venv,
  symlinks runtime dirs, enables systemd services
- debian/prerm/postrm — clean stop/uninstall with purge support
- debian/systemd/ — hardened systemd units for bot and portal
- build_deb.sh — assembles and builds the .deb via dpkg-deb
- Path resolver in irc_client.py, memory.py, config_manager.py, portal/app.py:
  uses /etc/irc-bot + /var/lib/irc-bot when installed as .deb, relative
  paths otherwise (Docker/venv unchanged)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:16:05 -04:00

169 lines
5.3 KiB
Python

import sqlite3
import os
import logging
import re
logger = logging.getLogger(__name__)
HISTORY_DIR = (
"/var/lib/irc-bot/history"
if os.path.isdir("/var/lib/irc-bot")
else "data/history"
)
def _sanitize_channel(channel: str) -> str:
name = channel.lstrip("#")
name = re.sub(r"[#&+!]", "_", name)
return name
def _db_path(channel: str, nick: str) -> str:
chan_dir = os.path.join(HISTORY_DIR, _sanitize_channel(channel))
os.makedirs(chan_dir, exist_ok=True)
return os.path.join(chan_dir, f"{nick}.db")
def _get_conn(path: str) -> sqlite3.Connection:
conn = sqlite3.connect(path)
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("""
CREATE TABLE IF NOT EXISTS exchanges (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
user_input TEXT NOT NULL,
bot_reply TEXT NOT NULL
)
""")
conn.commit()
return conn
def load_history(channel: str, nick: str, limit: int) -> list[dict]:
path = _db_path(channel, nick)
if not os.path.exists(path):
return []
try:
conn = _get_conn(path)
cursor = conn.execute(
"SELECT user_input, bot_reply FROM exchanges ORDER BY id DESC LIMIT ?",
(limit,),
)
rows = cursor.fetchall()
conn.close()
return [{"user": r[0], "assistant": r[1]} for r in reversed(rows)]
except Exception as e:
logger.error(f"[MEMORY] Failed to load history for {nick} in {channel}: {e}")
return []
def save_exchange(channel: str, nick: str, user_input: str, bot_reply: str) -> None:
path = _db_path(channel, nick)
try:
conn = _get_conn(path)
conn.execute(
"INSERT INTO exchanges (user_input, bot_reply) VALUES (?, ?)",
(user_input, bot_reply),
)
conn.commit()
conn.close()
except Exception as e:
logger.error(f"[MEMORY] Failed to write exchange for {nick} in {channel}: {e}")
def delete_user_history(channel: str, nick: str) -> None:
path = _db_path(channel, nick)
if os.path.exists(path):
os.remove(path)
logger.info(f"[MEMORY] Deleted history for {nick} in {channel}")
def delete_channel_history(channel: str) -> None:
chan_dir = os.path.join(HISTORY_DIR, _sanitize_channel(channel))
if os.path.isdir(chan_dir):
for f in os.listdir(chan_dir):
if f.endswith(".db"):
os.remove(os.path.join(chan_dir, f))
logger.info(f"[MEMORY] Cleared all history for {channel}")
def delete_all_history() -> None:
for root, dirs, files in os.walk(HISTORY_DIR):
for f in files:
if f.endswith(".db"):
os.remove(os.path.join(root, f))
logger.info("[MEMORY] All history deleted")
def prune_old_exchanges(max_age_days: int) -> None:
if max_age_days <= 0:
return
pruned = 0
for root, dirs, files in os.walk(HISTORY_DIR):
for f in files:
if not f.endswith(".db"):
continue
path = os.path.join(root, f)
try:
conn = sqlite3.connect(path)
conn.execute("PRAGMA journal_mode=WAL;")
cursor = conn.execute(
"DELETE FROM exchanges WHERE timestamp < datetime('now', ?)",
(f"-{max_age_days} days",),
)
pruned += cursor.rowcount
conn.commit()
conn.close()
except Exception as e:
logger.error(f"[MEMORY] Pruning failed for {path}: {e}")
if pruned:
logger.info(f"[MEMORY] Pruned {pruned} old exchanges (>{max_age_days} days)")
def list_channels() -> list[str]:
if not os.path.isdir(HISTORY_DIR):
return []
return [d for d in os.listdir(HISTORY_DIR) if os.path.isdir(os.path.join(HISTORY_DIR, d))]
def list_nicks(channel_dir: str) -> list[str]:
path = os.path.join(HISTORY_DIR, channel_dir)
if not os.path.isdir(path):
return []
return [f[:-3] for f in os.listdir(path) if f.endswith(".db")]
def get_all_exchanges(channel_dir: str, nick: str) -> list[dict]:
path = os.path.join(HISTORY_DIR, channel_dir, f"{nick}.db")
if not os.path.exists(path):
return []
try:
conn = sqlite3.connect(path)
cursor = conn.execute(
"SELECT id, timestamp, user_input, bot_reply FROM exchanges ORDER BY id ASC"
)
rows = cursor.fetchall()
conn.close()
return [{"id": r[0], "timestamp": r[1], "user": r[2], "assistant": r[3]} for r in rows]
except Exception as e:
logger.error(f"[MEMORY] Failed to read all exchanges: {e}")
return []
def get_stats() -> dict:
total = 0
total_size = 0
for root, dirs, files in os.walk(HISTORY_DIR):
for f in files:
if f.endswith(".db"):
path = os.path.join(root, f)
total_size += os.path.getsize(path)
try:
conn = sqlite3.connect(path)
row = conn.execute("SELECT COUNT(*) FROM exchanges").fetchone()
total += row[0] if row else 0
conn.close()
except Exception:
pass
return {"total_exchanges": total, "total_size_bytes": total_size}