Security (9 Fixes):
- JWT_SECRET Pflicht-Check beim Start (Production)
- Rate-Limit: Login (10/5min), Register (5/h), KI-Training (10/h), Giftköder (3/h)
- KI-Training-Endpoint: Auth-Pflicht hinzugefügt
- Private Profile aus Freunde-Suche gefiltert
- OG-Tags XSS mit html.escape() gesichert
- Globales File-Upload-Limit 20 MB (Middleware)
- E-Mail-Maskierung für Moderatoren im Admin-Panel
- IP-Blocklist in ratelimit.py

Content-Schutz (4 Schichten):
- robots.txt: /api/ komplett Disallow, SSR-Seiten Allow
- Rate-Limit auf /api/wiki/rassen (60/min) + Detail (30/min)
- Honeypot /api/wiki/trap + unsichtbarer Link in index.html
- Wasserzeichen in KI-Enricher-Prompt

Wiki Temperament-Migration:
- 60-Wort Übersetzungsmap EN→DE
- Datenmüll-Filter (hunderasse, dog breed etc.)
- translate_existing_temperaments() + Admin-Button
- SW by-v318, APP_VER 306
67 lines
2.1 KiB
Python
67 lines
2.1 KiB
Python
"""
|
|
BAN YARO — Rate Limiter + IP-Blocklist
|
|
Sliding-Window-Limiter, in-memory (kein Redis nötig für Single-Container).
|
|
Blocklist für Honeypot-Treffer.
|
|
"""
|
|
|
|
import threading
|
|
from collections import defaultdict, deque
|
|
from datetime import datetime, timedelta
|
|
|
|
from fastapi import HTTPException, Request
|
|
|
|
# Guards every read and write of the two tables below.
_lock = threading.Lock()

# "<key>:<ip>" -> timestamps of the hits recorded for that bucket.
_buckets: dict[str, deque[datetime]] = defaultdict(deque)

# ip -> moment until which the address stays blocked.
_blocklist: dict[str, datetime] = {}
|
|
|
|
|
|
def check(request: Request, *, max_requests: int, window_seconds: int, key: str = ""):
    """Enforce the IP blocklist and a sliding-window rate limit for the caller.

    Args:
        request: Incoming request. The client host identifies the caller;
            requests without client information share the identity "unknown".
        max_requests: Number of calls allowed inside one window.
        window_seconds: Length of the sliding window in seconds.
        key: Optional prefix that keeps independent limits apart
            (e.g. 'register', 'login').

    Raises:
        HTTPException: 403 while the caller's IP is on the blocklist, 429
            (with a ``Retry-After`` header) once ``max_requests`` calls have
            been recorded inside the current window.
    """
    ip = request.client.host if request.client else "unknown"
    bucket_key = f"{key}:{ip}"
    window = timedelta(seconds=window_seconds)

    with _lock:
        # Sample the clock while holding the lock so timestamps are appended
        # in the order the lock was acquired; pruning relies on the oldest
        # entry sitting at the left end of the deque.
        # Naive UTC is kept on purpose: blocklist entries are naive UTC too.
        now = datetime.utcnow()

        # Blocklist first: a blocked caller must not consume rate-limit slots.
        blocked_until = _blocklist.get(ip)
        if blocked_until:
            if now < blocked_until:
                raise HTTPException(403, "Zugriff gesperrt.")
            # Block has expired, drop the stale entry.
            del _blocklist[ip]

        dq = _buckets[bucket_key]
        cutoff = now - window
        while dq and dq[0] < cutoff:
            dq.popleft()

        if len(dq) >= max_requests:
            # Round up and never announce "0 minutes" for windows below 60 s.
            minutes = max(1, -(-window_seconds // 60))
            # A slot frees up when the oldest recorded hit leaves the window.
            # The value errs on the late side so clients never retry too early.
            oldest = dq[0] if dq else now
            remaining = (oldest + window - now).total_seconds()
            retry_after = max(1, int(remaining) + 1)
            raise HTTPException(
                429,
                f"Zu viele Versuche. Bitte warte {minutes} Minute(n) und versuche es erneut.",
                headers={"Retry-After": str(retry_after)},
            )

        dq.append(now)
|
|
|
|
|
|
def block_ip(request: Request, hours: int = 24):
    """Put the caller's IP on the blocklist for ``hours`` hours (honeypot hit).

    Nothing happens when the request carries no client address or when the
    caller is one of the loopback addresses ``127.0.0.1`` / ``::1``.
    An existing entry for the same address is overwritten.
    """
    loopback_hosts = ("127.0.0.1", "::1")

    client = request.client
    if not client:
        return

    address = client.host
    if not address or address in loopback_hosts:
        return

    duration = timedelta(hours=hours)
    with _lock:
        _blocklist[address] = datetime.utcnow() + duration
|
|
|
|
|
|
def is_blocked(request: Request) -> bool:
    """Return True while the caller's IP has an unexpired blocklist entry.

    Requests without client information are looked up under "unknown".
    An entry whose expiry has passed is removed as a side effect.
    """
    client = request.client
    address = client.host if client else "unknown"

    with _lock:
        expires_at = _blocklist.get(address)
        if not expires_at:
            return False
        if datetime.utcnow() < expires_at:
            return True
        # The block has run out: forget it so the table does not keep it.
        del _blocklist[address]
        return False
|