""" BAN YARO — Rate Limiter + IP-Blocklist + Account-Lockout + Duplikat-Erkennung Sliding-Window-Limiter, in-memory (kein Redis nötig für Single-Container). """ import hashlib import threading from collections import defaultdict, deque from datetime import datetime, timedelta from fastapi import HTTPException, Request _buckets: dict[str, deque] = defaultdict(deque) _blocklist: dict[str, datetime] = {} # ip → gesperrt bis _login_failures: dict[str, list] = defaultdict(list) # email → [datetime, ...] _post_hashes: dict[int, dict] = defaultdict(dict) # user_id → {hash: datetime} _lock = threading.Lock() _LOCKOUT_WINDOW = 15 # Minuten _LOCKOUT_ATTEMPTS = 5 # Fehlversuche bis Sperre _DUPLICATE_WINDOW = 300 # Sekunden (5 Minuten) # ------------------------------------------------------------------ # IP-basiertes Rate Limiting # ------------------------------------------------------------------ def check(request: Request, *, max_requests: int, window_seconds: int, key: str = ""): """Wirft HTTP 429 wenn max_requests im Zeitfenster überschritten.""" ip = (request.client.host if request.client else "unknown") with _lock: blocked_until = _blocklist.get(ip) if blocked_until and datetime.utcnow() < blocked_until: raise HTTPException(403, "Zugriff gesperrt.") elif blocked_until: del _blocklist[ip] bucket_key = f"{key}:{ip}" now = datetime.utcnow() cutoff = now - timedelta(seconds=window_seconds) with _lock: dq = _buckets[bucket_key] while dq and dq[0] < cutoff: dq.popleft() if len(dq) >= max_requests: minutes = window_seconds // 60 raise HTTPException( 429, f"Zu viele Versuche. Bitte warte {minutes} Minute(n) und versuche es erneut." ) dq.append(now) def block_ip(request: Request, hours: int = 24): """Sperrt eine IP für N Stunden (Honeypot-Treffer).""" ip = request.client.host if request.client else None if not ip or ip in ("127.0.0.1", "::1"): return with _lock: _blocklist[ip] = datetime.utcnow() + timedelta(hours=hours) def is_blocked(request: Request) -> bool: ip = request.client.host if request.client else "unknown" with _lock: until = _blocklist.get(ip) if until and datetime.utcnow() < until: return True elif until: del _blocklist[ip] return False # ------------------------------------------------------------------ # Account-Lockout (per E-Mail) # ------------------------------------------------------------------ def record_login_failure(email: str) -> int: """Failure aufzeichnen. Gibt Anzahl Fehlversuche im Fenster zurück.""" email = email.lower() now = datetime.utcnow() cutoff = now - timedelta(minutes=_LOCKOUT_WINDOW) with _lock: recent = [t for t in _login_failures[email] if t > cutoff] recent.append(now) _login_failures[email] = recent return len(recent) def is_account_locked(email: str) -> bool: """True wenn ≥5 Fehlversuche in den letzten 15 Minuten.""" email = email.lower() now = datetime.utcnow() cutoff = now - timedelta(minutes=_LOCKOUT_WINDOW) with _lock: recent = [t for t in _login_failures.get(email, []) if t > cutoff] return len(recent) >= _LOCKOUT_ATTEMPTS def clear_login_failures(email: str): """Bei erfolgreichem Login zurücksetzen.""" with _lock: _login_failures.pop(email.lower(), None) # ------------------------------------------------------------------ # Duplikat-Post-Erkennung (per User, in-memory) # ------------------------------------------------------------------ def content_hash(text: str) -> str: normalized = " ".join(text.lower().split()) return hashlib.sha256(normalized.encode()).hexdigest()[:20] def is_duplicate_post(user_id: int, text: str) -> bool: """True wenn derselbe User denselben Text in den letzten 5 Minuten gepostet hat.""" h = content_hash(text) now = datetime.utcnow() cutoff = now - timedelta(seconds=_DUPLICATE_WINDOW) with _lock: hashes = _post_hashes[user_id] # Alte Einträge bereinigen expired = [k for k, ts in hashes.items() if ts < cutoff] for k in expired: del hashes[k] return h in hashes def record_post(user_id: int, text: str): """Post-Hash speichern nach erfolgreichem Erstellen.""" h = content_hash(text) with _lock: _post_hashes[user_id][h] = datetime.utcnow()