banyaro/backend/ratelimit.py
rene de1677154f Security + E-Mail-HTML + Quartalsbericht + Registrierungspflicht
Registrierung & Login:
- E-Mail-Verifikation jetzt Pflicht vor erstem Login
- Register gibt keinen Token mehr zurück → "Postfach prüfen"-Screen
- Login blockt mit EMAIL_NOT_VERIFIED (403) wenn unverifiziert
- Resend-Verification ohne Auth (email-basiert)
- Frontend: _renderVerifyPending() nach Register und Login-Fehler
- Account-Lockout: 5 Fehlversuche → 15 Min gesperrt (ratelimit.py)
- Login Rate-Limit zusätzlich per E-Mail-Adresse (5/5 Min)
- Fehler-Tracking wird bei erfolgreichem Login zurückgesetzt

E-Mail-Templates (alle Mails jetzt HTML):
- email_html() Shared-Template in mailer.py (Gradient-Header, Warm-Beige)
- Verifikations-Mail, Passwort-Reset → HTML mit CTA-Button
- Admin-Outreach: plain text auto-wrapped in HTML
- Züchter-Mails (Antrag/Genehmigung/Ablehnung) → Template
- Tierschutz-Alert (litters.py) → Template
- send_support_mail → HTML
- outreach._build_message() + _send_smtp() unterstützen jetzt html= Parameter

Forum-Schutz:
- Post-Cooldown: 30 Sek zwischen beliebigen Posts (DB-Check)
- Stunden-Limit: 5 Threads / 20 Antworten pro User/Stunde
- Duplikat-Erkennung: gleicher Text in 5 Min blockiert (in-memory)
- content_filter.py: Spam-Keywords, URL-Sperre für Accounts < 7 Tage,
  Sonderzeichen-Ratio-Check

Security-Headers:
- HSTS: max-age=31536000; includeSubDomains
- Content-Security-Policy: frame-ancestors none, base-uri self, …
- X-Frame-Options entfernt (CSP frame-ancestors ist moderner)

Honeypot-Fallen (13 Scanner-Pfade → 24h IP-Sperre):
- /api/admin/users, /api/v1/users, /api/.env, /api/config,
  /api/setup, /api/install, /api/phpinfo, /api/debug,
  /api/actuator, /api/swagger, /api/graphql u.a.

Quartalsbericht-System:
- backend/scripts/generate_reports.py: 6 Sections
  (Sicherheit, Funktionsumfang, Dateien, Nutzer, Partner, Server)
- make reports: generiert alle Berichte aus dem Container, committed
- Scheduler: quarterly_report Job (1. Feb/Mai/Aug/Nov 07:00)
  → vollständige HTML-Mail an ADMIN_EMAIL
- quarterly_report erscheint im täglichen Status-Report

Admin-Panel:
- "Forum & Meldungen" → "Forum"
2026-05-01 08:20:53 +02:00

132 lines
4.5 KiB
Python

"""
BAN YARO — Rate Limiter + IP-Blocklist + Account-Lockout + Duplikat-Erkennung
Sliding-Window-Limiter, in-memory (kein Redis nötig für Single-Container).
"""
import hashlib
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta
from fastapi import HTTPException, Request
_buckets: dict[str, deque] = defaultdict(deque)
_blocklist: dict[str, datetime] = {} # ip → gesperrt bis
_login_failures: dict[str, list] = defaultdict(list) # email → [datetime, ...]
_post_hashes: dict[int, dict] = defaultdict(dict) # user_id → {hash: datetime}
_lock = threading.Lock()
_LOCKOUT_WINDOW = 15 # Minuten
_LOCKOUT_ATTEMPTS = 5 # Fehlversuche bis Sperre
_DUPLICATE_WINDOW = 300 # Sekunden (5 Minuten)
# ------------------------------------------------------------------
# IP-basiertes Rate Limiting
# ------------------------------------------------------------------
def check(request: Request, *, max_requests: int, window_seconds: int, key: str = ""):
"""Wirft HTTP 429 wenn max_requests im Zeitfenster überschritten."""
ip = (request.client.host if request.client else "unknown")
with _lock:
blocked_until = _blocklist.get(ip)
if blocked_until and datetime.utcnow() < blocked_until:
raise HTTPException(403, "Zugriff gesperrt.")
elif blocked_until:
del _blocklist[ip]
bucket_key = f"{key}:{ip}"
now = datetime.utcnow()
cutoff = now - timedelta(seconds=window_seconds)
with _lock:
dq = _buckets[bucket_key]
while dq and dq[0] < cutoff:
dq.popleft()
if len(dq) >= max_requests:
minutes = window_seconds // 60
raise HTTPException(
429,
f"Zu viele Versuche. Bitte warte {minutes} Minute(n) und versuche es erneut."
)
dq.append(now)
def block_ip(request: Request, hours: int = 24):
"""Sperrt eine IP für N Stunden (Honeypot-Treffer)."""
ip = request.client.host if request.client else None
if not ip or ip in ("127.0.0.1", "::1"):
return
with _lock:
_blocklist[ip] = datetime.utcnow() + timedelta(hours=hours)
def is_blocked(request: Request) -> bool:
ip = request.client.host if request.client else "unknown"
with _lock:
until = _blocklist.get(ip)
if until and datetime.utcnow() < until:
return True
elif until:
del _blocklist[ip]
return False
# ------------------------------------------------------------------
# Account-Lockout (per E-Mail)
# ------------------------------------------------------------------
def record_login_failure(email: str) -> int:
"""Failure aufzeichnen. Gibt Anzahl Fehlversuche im Fenster zurück."""
email = email.lower()
now = datetime.utcnow()
cutoff = now - timedelta(minutes=_LOCKOUT_WINDOW)
with _lock:
recent = [t for t in _login_failures[email] if t > cutoff]
recent.append(now)
_login_failures[email] = recent
return len(recent)
def is_account_locked(email: str) -> bool:
"""True wenn ≥5 Fehlversuche in den letzten 15 Minuten."""
email = email.lower()
now = datetime.utcnow()
cutoff = now - timedelta(minutes=_LOCKOUT_WINDOW)
with _lock:
recent = [t for t in _login_failures.get(email, []) if t > cutoff]
return len(recent) >= _LOCKOUT_ATTEMPTS
def clear_login_failures(email: str):
"""Bei erfolgreichem Login zurücksetzen."""
with _lock:
_login_failures.pop(email.lower(), None)
# ------------------------------------------------------------------
# Duplikat-Post-Erkennung (per User, in-memory)
# ------------------------------------------------------------------
def content_hash(text: str) -> str:
normalized = " ".join(text.lower().split())
return hashlib.sha256(normalized.encode()).hexdigest()[:20]
def is_duplicate_post(user_id: int, text: str) -> bool:
"""True wenn derselbe User denselben Text in den letzten 5 Minuten gepostet hat."""
h = content_hash(text)
now = datetime.utcnow()
cutoff = now - timedelta(seconds=_DUPLICATE_WINDOW)
with _lock:
hashes = _post_hashes[user_id]
# Alte Einträge bereinigen
expired = [k for k, ts in hashes.items() if ts < cutoff]
for k in expired:
del hashes[k]
return h in hashes
def record_post(user_id: int, text: str):
"""Post-Hash speichern nach erfolgreichem Erstellen."""
h = content_hash(text)
with _lock:
_post_hashes[user_id][h] = datetime.utcnow()