banyaro/backend/ratelimit.py
rene 15f854d96c Session 2026-04-23: Security, content protection, wiki temperament migration
Security (9 fixes):
- Mandatory JWT_SECRET check at startup (production)
- Rate limits: login (10 per 5 min), register (5/h), AI training (10/h), poison bait (3/h)
- AI training endpoint: authentication now required
- Private profiles filtered out of the friend search
- OG tags secured against XSS with html.escape()
- Global file upload limit of 20 MB (middleware)
- Email masking for moderators in the admin panel
- IP blocklist in ratelimit.py
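
The OG-tag fix in the list above relies on html.escape() from the standard library. A minimal sketch of the idea, assuming the tags are rendered with string formatting; the helper name og_meta and the sample title are hypothetical, not taken from the repository:

```python
from html import escape


def og_meta(prop: str, content: str) -> str:
    """Render one Open Graph <meta> tag with escaped attribute values."""
    # escape() uses quote=True by default, so " and ' are escaped too,
    # which is what keeps a value from breaking out of the attribute.
    return f'<meta property="og:{escape(prop)}" content="{escape(content)}">'


# A title that tries to close the attribute and inject a script tag.
print(og_meta("title", 'Rex "><script>alert(1)</script>'))
```

With the escaping in place, the injected markup ends up as inert text inside the content attribute instead of being parsed as HTML.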

Content protection (4 layers):
- robots.txt: Disallow all of /api/, Allow SSR pages
- Rate limit on /api/wiki/rassen (60/min) and the detail endpoint (30/min)
- Honeypot /api/wiki/trap plus an invisible link in index.html
- Watermark in the AI enricher prompt
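
The honeypot layer can be pictured with a small framework-free model: a client that follows the invisible link to /api/wiki/trap gets blocked, and later requests from that IP are refused until the block expires. This is only a sketch under assumptions; the handle function, the 404 returned by the trap, and the example IP are hypothetical, and the real endpoint presumably calls block_ip() from ratelimit.py instead.

```python
from datetime import datetime, timedelta

TRAP_PATH = "/api/wiki/trap"  # path named in the commit message
blocked_until: dict[str, datetime] = {}  # hypothetical stand-in for _blocklist


def handle(path: str, ip: str, now: datetime, hours: int = 24) -> int:
    """Return the HTTP status a request would get in this simplified model."""
    until = blocked_until.get(ip)
    if until:
        if now < until:
            return 403  # still blocked
        del blocked_until[ip]  # block expired, clean up lazily
    if path == TRAP_PATH:
        # Only crawlers that ignore robots.txt and follow hidden links land here.
        if ip not in ("127.0.0.1", "::1"):
            blocked_until[ip] = now + timedelta(hours=hours)
        return 404  # assumed response; the real trap may answer differently
    return 200


t0 = datetime(2026, 4, 23, 12, 0)
ip = "203.0.113.7"  # documentation-range address
print(handle("/api/wiki/rassen", ip, t0))                        # normal request
print(handle(TRAP_PATH, ip, t0))                                 # trap hit
print(handle("/api/wiki/rassen", ip, t0 + timedelta(hours=1)))   # blocked
print(handle("/api/wiki/rassen", ip, t0 + timedelta(hours=25)))  # block expired
```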

Wiki temperament migration:
- 60-word EN→DE translation map
- Junk-data filter (hunderasse, dog breed, etc.)
- translate_existing_temperaments() plus admin button
- SW by-v318, APP_VER 306
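
The temperament migration combines a translation map with a junk filter. The sketch below shows how the two might work together, assuming temperaments are stored as a comma-separated string. The three map entries, the helper name translate_temperaments, and the storage format are illustrative assumptions; the real 60-word map and translate_existing_temperaments() are not shown in this file.

```python
# Illustrative subset; the actual map is said to hold about 60 words.
TEMPERAMENT_MAP = {
    "friendly": "freundlich",
    "loyal": "treu",
    "playful": "verspielt",
}
# Junk values named in the commit message.
JUNK = {"hunderasse", "dog breed"}


def translate_temperaments(raw: str) -> list[str]:
    """Split, drop junk, translate EN to DE, and de-duplicate in order."""
    result: list[str] = []
    for part in raw.split(","):
        word = part.strip().lower()
        if not word or word in JUNK:
            continue
        # Words missing from the map (e.g. already German) pass through unchanged.
        translated = TEMPERAMENT_MAP.get(word, word)
        if translated not in result:
            result.append(translated)
    return result


print(translate_temperaments("Friendly, dog breed, Loyal, verspielt, playful"))
# prints ['freundlich', 'treu', 'verspielt']
```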
2026-04-23 18:34:05 +02:00

67 lines
2.1 KiB
Python

"""
BAN YARO: rate limiter + IP blocklist
Sliding-window limiter, in-memory (no Redis needed for a single container).
Blocklist for honeypot hits.
"""
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta

from fastapi import HTTPException, Request

# One deque of request timestamps per "<key>:<ip>" bucket.
_buckets: dict[str, deque] = defaultdict(deque)
_blocklist: dict[str, datetime] = {}  # ip → blocked until (naive UTC)
# Guards both _buckets and _blocklist.
_lock = threading.Lock()


def check(request: Request, *, max_requests: int, window_seconds: int, key: str = ""):
    """
    Raises HTTP 429 when max_requests is exceeded within the time window
    (and HTTP 403 when the client IP is on the blocklist).
    key: optional prefix to keep different limits apart (e.g. 'register', 'login').
    """
    ip = request.client.host if request.client else "unknown"

    # Check the blocklist first; expired entries are removed lazily.
    with _lock:
        blocked_until = _blocklist.get(ip)
        if blocked_until and datetime.utcnow() < blocked_until:
            raise HTTPException(403, "Zugriff gesperrt.")  # "Access blocked."
        elif blocked_until:
            del _blocklist[ip]

    bucket_key = f"{key}:{ip}"
    now = datetime.utcnow()
    cutoff = now - timedelta(seconds=window_seconds)

    with _lock:
        dq = _buckets[bucket_key]
        # Drop timestamps that have slid out of the window.
        while dq and dq[0] < cutoff:
            dq.popleft()
        if len(dq) >= max_requests:
            # Round up so sub-minute windows never report "0 minutes".
            minutes = max(1, -(-window_seconds // 60))
            # Message: "Too many attempts. Please wait N minute(s) and try again."
            raise HTTPException(
                429,
                f"Zu viele Versuche. Bitte warte {minutes} Minute(n) und versuche es erneut."
            )
        dq.append(now)


def block_ip(request: Request, hours: int = 24):
    """Blocks an IP for N hours (honeypot hit)."""
    ip = request.client.host if request.client else None
    # Never block loopback, and skip requests without client information.
    if not ip or ip in ("127.0.0.1", "::1"):
        return
    with _lock:
        _blocklist[ip] = datetime.utcnow() + timedelta(hours=hours)


def is_blocked(request: Request) -> bool:
    """Returns True while the client IP is blocked; drops expired entries."""
    ip = request.client.host if request.client else "unknown"
    with _lock:
        until = _blocklist.get(ip)
        if until and datetime.utcnow() < until:
            return True
        elif until:
            del _blocklist[ip]
    return False
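
The sliding-window logic in check() can be traced without FastAPI or real time. The following is a stdlib-only sketch of the same deque technique with timestamps passed in as plain numbers. The function name allow and the numeric clock are assumptions made for illustration; the module itself uses datetime values and raises HTTPException instead of returning False.

```python
from collections import deque


def allow(dq: deque, now: float, max_requests: int, window_seconds: float) -> bool:
    """Record the hit and return True if it fits in the window, else return False."""
    cutoff = now - window_seconds
    # Timestamps older than the cutoff no longer count against the limit.
    while dq and dq[0] < cutoff:
        dq.popleft()
    if len(dq) >= max_requests:
        return False  # the module raises HTTP 429 at this point
    dq.append(now)
    return True


hits: deque = deque()
results = [
    allow(hits, t, max_requests=3, window_seconds=10)
    for t in (0, 1, 2, 3, 11, 12.5)
]
print(results)  # prints [True, True, True, False, True, True]
```

Rejected requests are not appended, so a client that keeps hammering the endpoint does not extend its own lockout; it becomes eligible again as soon as the oldest accepted hit leaves the window.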