NEUE HELPER in auth.py:
require_moderator(user=Depends(get_current_user))
Konsequente Dependency statt inline
'if user["rolle"] not in ("admin", "moderator")'.
Achtung: Der Helper lässt zusätzlich User mit gesetztem is_moderator-Flag durch.
require_breeder(user=Depends(get_current_user))
Konsequente Dependency statt inline
'if user["subscription_tier"] not in ("breeder", "breeder_test")'.
Achtung: Admins (rolle == "admin") werden unabhängig vom Tier durchgelassen.
require_owner(row, user, owner_field='user_id',
not_found_msg, forbidden_msg) -> row
Zentralisiert das häufigste Pattern (54 Stellen im Audit):
Statt:
row = conn.execute(...).fetchone()
if not row: raise HTTPException(404, ...)
if row['user_id'] != user['id']: raise HTTPException(403, ...)
Jetzt:
row = require_owner(conn.execute(...).fetchone(), user,
not_found_msg='Ort nicht gefunden.')
is_owner_or_admin(row, user, owner_field='user_id') -> bool
True wenn Owner ODER Admin/Moderator (Admin-Override für
Moderations-Endpoints)
DEMO-MIGRATION:
places.py PATCH /places/{id} + DELETE /places/{id} migriert auf
require_owner() — als Style-Referenz für künftige Migrationen.
KEINE Massen-Migration der 54 Stellen — bewusste Entscheidung
weil security-kritisch. Helper sind bereitgestellt, neuer Code
nutzt sie, bestehender bleibt funktional identisch.
Tests 19/19 grün.
Hinweis: Die Massen-Migration der Owner-Checks ist ein eigener Sprint mit
sehr sorgfältigem Testing — bei jeder migrierten Route muss die
404→403-Kaskade durchgeprüft werden, damit sich Owner, Non-Owner und Admin
identisch zum Vorher verhalten.
291 lines
11 KiB
Python
291 lines
11 KiB
Python
"""
BAN YARO — Auth

JWT + bcrypt. Built once, used by all routes.
"""
|
|
|
|
import os
|
|
import uuid
|
|
import jwt
|
|
import bcrypt
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
from fastapi import Depends, HTTPException, status, Request, Response
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
|
|
from database import db
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Signing key for all tokens; the fallback value is a placeholder only.
JWT_SECRET = os.environ.get("JWT_SECRET", "change-me-in-production")
JWT_ALGO = "HS256"

# Token lifetime in days. Default is 7 (previously 30) — sensitive dog data
# calls for shorter sessions.
JWT_EXPIRY = int(os.environ.get("JWT_EXPIRY_DAYS", "7"))

# Sliding session: once a token is older than
# (JWT_EXPIRY / JWT_REFRESH_FRACTION) days, an authenticated request gets a
# fresh cookie. Default 2 → refresh after more than half the lifetime is gone.
JWT_REFRESH_FRACTION = int(os.environ.get("JWT_REFRESH_FRACTION", "2"))

# Refuse to start in production while the placeholder secret is still active.
if os.environ.get("ENV") == "production" and JWT_SECRET == "change-me-in-production":
    raise RuntimeError(
        "SICHERHEITSFEHLER: JWT_SECRET ist nicht gesetzt. "
        "Bitte JWT_SECRET in .env setzen und Container neu starten."
    )
|
|
|
|
# Bearer-header extractor used as a FastAPI dependency by the functions below.
# NOTE(review): auto_error=False presumably makes it yield None instead of
# rejecting requests without an Authorization header, which is what lets
# _get_token_from_request fall back to the "by_token" cookie — confirm against
# the FastAPI docs.
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# JWT-Blacklist (Logout invalidiert Token serverseitig)
|
|
# ------------------------------------------------------------------
|
|
def _is_jti_blacklisted(jti: str) -> bool:
|
|
if not jti:
|
|
return False
|
|
with db() as conn:
|
|
row = conn.execute(
|
|
"SELECT 1 FROM jwt_blacklist WHERE jti=?", (jti,)
|
|
).fetchone()
|
|
return bool(row)
|
|
|
|
|
|
def blacklist_jti(jti: str, expires_at_iso: str):
    """Put a token ID on the blacklist.

    ``expires_at_iso`` is stored alongside the ID and corresponds to the
    token's original ``exp``. The call is a no-op when either argument is
    empty; an already listed ID is left untouched (INSERT OR IGNORE).
    """
    if not (jti and expires_at_iso):
        return
    entry = (jti, expires_at_iso)
    with db() as conn:
        conn.execute(
            "INSERT OR IGNORE INTO jwt_blacklist (jti, expires_at) VALUES (?,?)",
            entry,
        )
|
|
|
|
|
|
def _purge_expired_jwt() -> int:
    """Delete blacklist rows whose ``expires_at`` lies before the current UTC time.

    Returns the number of deleted rows (0 when the cursor reports none) and
    logs an info line when at least one row was removed. Intended to be
    callable from a scheduler later on.
    """
    cutoff = datetime.now(timezone.utc).isoformat()
    with db() as conn:
        cursor = conn.execute(
            "DELETE FROM jwt_blacklist WHERE expires_at < ?", (cutoff,)
        )
        removed = cursor.rowcount or 0
    if removed:
        logger.info("JWT-Blacklist Cleanup: %d abgelaufene Einträge entfernt.", removed)
    return removed
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Passwort
|
|
# ------------------------------------------------------------------
|
|
def hash_password(password: str) -> str:
    """Hash ``password`` with bcrypt using a freshly generated salt.

    The result is returned as text so it can be stored in a string column.
    """
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password.encode(), salt)
    return digest.decode()
|
|
|
|
|
|
def verify_password(password: str, hashed: str) -> bool:
    """Check ``password`` against a stored bcrypt hash.

    Returns True only when the password matches. A missing password, a
    missing/empty stored hash, or a stored value that bcrypt rejects as
    malformed all count as "does not match" instead of raising, so a broken
    or absent hash results in a failed login rather than a server error.
    """
    if password is None or not hashed:
        return False
    try:
        return bcrypt.checkpw(password.encode(), hashed.encode())
    except ValueError:
        # bcrypt signals an unusable hash/salt (or unusable input) with
        # ValueError; treat that as a failed verification.
        return False
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# JWT
|
|
# ------------------------------------------------------------------
|
|
def create_token(user_id: int, rolle: str) -> str:
    """Issue a signed JWT for the given user.

    Claims: ``sub`` (user id as string), ``rolle``, ``exp`` (now +
    JWT_EXPIRY days), ``iat`` (now, UTC) and a random ``jti`` that the
    blacklist functions key on.
    """
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=JWT_EXPIRY)
    claims = dict(
        sub=str(user_id),
        rolle=rolle,
        exp=expires_at,
        iat=issued_at,
        jti=uuid.uuid4().hex,
    )
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGO)
|
|
|
|
|
|
def decode_token(token: str) -> dict:
    """Decode a token and return its claims.

    Decoding is delegated to ``jwt.decode`` with JWT_SECRET and JWT_ALGO, so
    whatever that call raises propagates unchanged. Afterwards the ``jti``
    claim (if present) is checked against the blacklist.

    Raises:
        jwt.InvalidTokenError: when the token's ``jti`` is blacklisted.
    """
    claims = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
    token_id = claims.get("jti")
    if not token_id:
        # Tokens without an ID cannot be blacklisted; nothing more to check.
        return claims
    if _is_jti_blacklisted(token_id):
        raise jwt.InvalidTokenError("Token wurde invalidiert (Logout).")
    return claims
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# FastAPI Dependencies
|
|
# ------------------------------------------------------------------
|
|
def _get_token_from_request(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str | None:
    """Pick the token from the Bearer header, else from the ``by_token`` cookie.

    Returns None when neither source carries a token.
    """
    if not credentials:
        return request.cookies.get("by_token")
    return credentials.credentials
|
|
|
|
|
|
def _maybe_refresh_token(request: Request, payload: dict):
    """Sliding session: prepare a fresh token once the current one is old enough.

    A new token is created when more than 1/JWT_REFRESH_FRACTION of the
    token's lifetime (``exp - iat``) has passed. It is stored on
    ``request.state.refresh_token`` so that middleware can write it into the
    response cookie. Nothing happens unless the request carries a
    ``by_token`` cookie — Bearer clients manage their tokens themselves.

    Any error is logged and swallowed: a failed refresh must never block the
    actual request.
    """
    try:
        if not request.cookies.get("by_token"):
            return

        issued_ts = payload.get("iat")
        expiry_ts = payload.get("exp")
        if not (issued_ts and expiry_ts):
            return

        span = expiry_ts - issued_ts
        if span <= 0 or JWT_REFRESH_FRACTION <= 0:
            return

        refresh_after = issued_ts + (span / JWT_REFRESH_FRACTION)
        if datetime.now(timezone.utc).timestamp() >= refresh_after:
            request.state.refresh_token = create_token(
                int(payload["sub"]), payload.get("rolle", "user")
            )
    except Exception:
        logger.exception("Sliding-Token-Refresh fehlgeschlagen")
|
|
|
|
|
|
def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Dependency: return the logged-in user as a dict.

    The token is taken from the Bearer header or the ``by_token`` cookie,
    decoded, and the matching ``users`` row is loaded. On success a
    sliding-session refresh is prepared via ``_maybe_refresh_token``.

    Raises:
        HTTPException(401): no token, expired token, invalid token, a token
            whose ``sub`` claim is missing or not an integer, or no user row
            for that id.
        HTTPException(403): the user row has ``is_banned`` set; the message
            includes ``ban_reason``.
    """
    token = _get_token_from_request(request, credentials)
    if not token:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Nicht eingeloggt.")

    try:
        payload = decode_token(token)
    except jwt.ExpiredSignatureError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Session abgelaufen.")
    except jwt.InvalidTokenError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Ungültiges Token.")

    # A correctly signed token can still lack a usable subject; answer with
    # 401 instead of letting KeyError/ValueError surface as a 500.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Ungültiges Token.")

    with db() as conn:
        row = conn.execute(
            "SELECT id, email, name, rolle, is_premium, is_moderator, is_banned, "
            "ban_reason, is_social_media, notes_ki_enabled, gassi_stunde_push, "
            "breeder_status, is_founder, is_partner, founder_number, email_verified, "
            "luna_trial_until, subscription_tier FROM users WHERE id=?",
            (user_id,),
        ).fetchone()

    if not row:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User nicht gefunden.")

    user = dict(row)
    if user.get("is_banned"):
        reason = user.get("ban_reason") or "Kein Grund angegeben."
        raise HTTPException(status.HTTP_403_FORBIDDEN, f"Account gesperrt: {reason}")

    # Sliding session: possibly prepare a new token (middleware writes the cookie).
    _maybe_refresh_token(request, payload)

    return user
|
|
|
|
|
|
def get_current_user_optional(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Dependency: return the user when logged in, otherwise None.

    Every HTTPException raised by ``get_current_user`` is turned into None.
    """
    try:
        current = get_current_user(request, credentials)
    except HTTPException:
        current = None
    return current
|
|
|
|
|
|
def require_premium(user=Depends(get_current_user)):
    """Dependency: pass the user through only if ``is_premium`` is set.

    Raises:
        HTTPException(402): the user is not a premium user.
    """
    if user["is_premium"]:
        return user
    raise HTTPException(
        status.HTTP_402_PAYMENT_REQUIRED,
        "Dieses Feature erfordert Ban Yaro Premium.",
    )
|
|
|
|
|
|
def require_admin(user=Depends(get_current_user)):
    """Dependency: pass the user through only if their role is ``admin``.

    Raises:
        HTTPException(403): any other role.
    """
    if user["rolle"] == "admin":
        return user
    raise HTTPException(status.HTTP_403_FORBIDDEN, "Kein Zugriff.")
|
|
|
|
|
|
def require_moderator(user=Depends(get_current_user)):
    """Dependency: admin or moderator.

    Accepts role ``admin`` or ``moderator`` as well as any user whose
    ``is_moderator`` flag is set. Meant to be used consistently instead of an
    inline role check in the routes.

    Raises:
        HTTPException(403): none of the above applies.
    """
    if user["rolle"] in ("admin", "moderator") or user.get("is_moderator"):
        return user
    raise HTTPException(status.HTTP_403_FORBIDDEN, "Moderator-Zugriff erforderlich.")
|
|
|
|
|
|
def require_breeder(user=Depends(get_current_user)):
    """Dependency: admin or breeder.

    Admins pass regardless of tier; everyone else needs a
    ``subscription_tier`` of ``breeder`` or ``breeder_test``.

    Raises:
        HTTPException(403): neither condition holds.
    """
    is_admin = user["rolle"] == "admin"
    if is_admin or user.get("subscription_tier") in ("breeder", "breeder_test"):
        return user
    raise HTTPException(status.HTTP_403_FORBIDDEN, "Züchter-Zugriff erforderlich.")
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Owner-Checks — zentral, statt 54x inline `if row['user_id'] != user['id']: 403`
|
|
# ------------------------------------------------------------------
|
|
def require_owner(row, user: dict, owner_field: str = "user_id",
                  not_found_msg: str = "Nicht gefunden",
                  forbidden_msg: str = "Kein Zugriff"):
    """Ensure ``row`` exists and belongs to ``user``; hand it back on success.

    The existence check runs first, so a missing row is always reported as
    404 before ownership is looked at. Returning the row allows chaining:

        dog = require_owner(conn.execute(...).fetchone(), user,
                            'user_id', 'Hund nicht gefunden')

    Raises:
        HTTPException(404): ``row`` is None or otherwise falsy.
        HTTPException(403): ``row[owner_field]`` differs from ``user["id"]``.
    """
    if not row:
        raise HTTPException(status.HTTP_404_NOT_FOUND, not_found_msg)
    owner_id = row[owner_field]
    if owner_id != user["id"]:
        raise HTTPException(status.HTTP_403_FORBIDDEN, forbidden_msg)
    return row
|
|
|
|
|
|
def is_owner_or_admin(row, user: dict, owner_field: str = "user_id") -> bool:
    """Tell whether ``user`` may act on ``row`` as owner or as staff.

    A missing row yields False. Staff means role ``admin``/``moderator`` or a
    set ``is_moderator`` flag; otherwise ``row[owner_field]`` must equal
    ``user["id"]``.
    """
    if not row:
        return False
    is_staff = user["rolle"] in ("admin", "moderator") or user.get("is_moderator")
    if is_staff:
        return True
    return row[owner_field] == user["id"]
|
|
|
|
|
|
def has_pro_access(user: dict) -> bool:
    """Tell whether ``user`` may use Pro features.

    Granted to roles ``admin``/``moderator``, to users flagged
    ``is_moderator`` or ``is_social_media``, and to the subscription tiers
    ``pro``, ``breeder``, ``pro_test`` and ``breeder_test``. A missing or
    empty user yields False.
    """
    if not user:
        return False
    if user.get("rolle", "user") in ("admin", "moderator"):
        return True
    if user.get("is_moderator") or user.get("is_social_media"):
        return True
    pro_tiers = ("pro", "breeder", "pro_test", "breeder_test")
    return user.get("subscription_tier", "standard") in pro_tiers
|
|
|
|
|
|
def has_breeder_access(user: dict) -> bool:
    """Tell whether ``user`` may use breeder features.

    Granted to roles ``admin``/``moderator``/``breeder``, to users flagged
    ``is_moderator`` or ``is_social_media``, and to the subscription tiers
    ``breeder`` and ``breeder_test``. A missing or empty user yields False.
    """
    if not user:
        return False
    user_role = user.get("rolle", "user")
    if user_role in ("admin", "moderator"):
        return True
    if user.get("is_moderator") or user.get("is_social_media"):
        return True
    breeder_tiers = ("breeder", "breeder_test")
    return user.get("subscription_tier", "standard") in breeder_tiers or user_role == "breeder"
|
|
|
|
|
|
def _luna_trial_active(trial_until) -> bool:
    """Return True while the given trial end timestamp lies in the future.

    ``trial_until`` is parsed as an ISO-8601 timestamp and compared as a real
    point in time rather than as text, so the separator (``T`` vs. space) and
    a UTC offset no longer distort the result. Values without an offset are
    interpreted as UTC. Empty or unparseable values count as "no active
    trial"; the latter is logged.
    """
    if not trial_until:
        return False
    raw = str(trial_until).strip()
    # Older Python versions do not accept a trailing "Z" in fromisoformat().
    if raw.endswith(("Z", "z")):
        raw = raw[:-1] + "+00:00"
    try:
        trial_end = datetime.fromisoformat(raw)
    except ValueError:
        logger.warning("luna_trial_until ist kein gültiger ISO-Zeitstempel: %r", trial_until)
        return False
    if trial_end.tzinfo is None:
        trial_end = trial_end.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) < trial_end


def require_social_media(user=Depends(get_current_user)):
    """Dependency: social-media manager, active Luna trial, or admin.

    Passes the user through when ``is_social_media`` is set, the role is
    ``admin``, or ``luna_trial_until`` is still in the future.

    Raises:
        HTTPException(403): none of the three conditions holds.
    """
    trial_active = _luna_trial_active(user.get("luna_trial_until"))
    if not (user.get("is_social_media") or user["rolle"] == "admin" or trial_active):
        raise HTTPException(status.HTTP_403_FORBIDDEN, "Kein Zugriff.")
    return user
|