"""
BAN YARO — rate limiter + IP blocklist.

Sliding-window limiter kept in memory (no Redis needed for a
single-container deployment), plus a blocklist for honeypot hits.
"""
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone

from fastapi import HTTPException, Request

# "<key>:<ip>" -> timestamps of the requests counted in the window, oldest first.
_buckets: dict[str, deque] = defaultdict(deque)
# ip -> naive UTC datetime until which the IP stays blocked.
_blocklist: dict[str, datetime] = {}
# Guards every read and write of _buckets and _blocklist.
_lock = threading.Lock()

# NOTE(review): keys are never removed from _buckets, and an expired
# _blocklist entry is only dropped when the same IP is seen again, so both
# maps grow with the number of distinct client IPs.


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Stands in for ``datetime.utcnow()`` (deprecated since Python 3.12) while
    keeping the stored timestamps naive, as they were before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _is_ip_blocked(ip: str, now: datetime) -> bool:
    """Report whether *ip* is blocked at *now*, dropping an expired entry.

    The caller must hold ``_lock``.
    """
    blocked_until = _blocklist.get(ip)
    if blocked_until is None:
        return False
    if now < blocked_until:
        return True
    # The block has run out: forget it so the map does not keep it around.
    del _blocklist[ip]
    return False


def check(request: Request, *, max_requests: int, window_seconds: int, key: str = "") -> None:
    """Enforce the blocklist and a sliding-window rate limit for the client IP.

    Args:
        request: Incoming request; the client host is used as the IP. When
            the request has no client, the shared IP ``"unknown"`` is used.
        max_requests: Number of requests allowed within the window.
        window_seconds: Length of the sliding window in seconds.
        key: Optional prefix that keeps different limits apart
            (e.g. ``'register'``, ``'login'``).

    Raises:
        HTTPException: 403 if the IP is currently on the blocklist, 429 if
            ``max_requests`` has already been reached within the window.
    """
    ip = request.client.host if request.client else "unknown"
    bucket_key = f"{key}:{ip}"
    now = _utcnow()
    cutoff = now - timedelta(seconds=window_seconds)

    # One critical section and one timestamp for both checks.
    with _lock:
        if _is_ip_blocked(ip, now):
            raise HTTPException(403, "Zugriff gesperrt.")

        hits = _buckets[bucket_key]
        # Drop timestamps that have slid out of the window.
        while hits and hits[0] < cutoff:
            hits.popleft()

        if len(hits) >= max_requests:
            # Round up and never report less than one minute; plain floor
            # division told the user to wait "0" minutes for short windows.
            minutes = max(1, (window_seconds + 59) // 60)
            raise HTTPException(
                429,
                f"Zu viele Versuche. Bitte warte {minutes} Minute(n) und versuche es erneut."
            )
        # Rejected requests are not recorded; only accepted ones count.
        hits.append(now)


def block_ip(request: Request, hours: int = 24) -> None:
    """Block the client IP for *hours* hours (honeypot hit).

    Requests without a client and the loopback addresses ``127.0.0.1`` and
    ``::1`` are never blocked.
    """
    ip = request.client.host if request.client else None
    if not ip or ip in ("127.0.0.1", "::1"):
        return
    with _lock:
        _blocklist[ip] = _utcnow() + timedelta(hours=hours)


def is_blocked(request: Request) -> bool:
    """Return True if the client IP is currently on the blocklist.

    An entry whose block time has passed is removed as a side effect.
    """
    ip = request.client.host if request.client else "unknown"
    with _lock:
        return _is_ip_blocked(ip, _utcnow())