Sprint 0: Backend, Docker, KI-Layer mit Free/Premium-Trennung
This commit is contained in:
parent
84f49fafcf
commit
00be2bbcd5
17 changed files with 1107 additions and 0 deletions
119
backend/auth.py
Normal file
119
backend/auth.py
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
"""
|
||||
BAN YARO — Auth
|
||||
JWT + Bcrypt. Einmal gebaut, von allen Routes genutzt.
|
||||
"""
|
||||
|
||||
import os
|
||||
import jwt
|
||||
import bcrypt
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from fastapi import Depends, HTTPException, status, Request
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
|
||||
from database import db
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
JWT_SECRET = os.getenv("JWT_SECRET", "change-me-in-production")
|
||||
JWT_ALGO = "HS256"
|
||||
JWT_EXPIRY = int(os.getenv("JWT_EXPIRY_DAYS", "30"))
|
||||
|
||||
security = HTTPBearer(auto_error=False)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Passwort
|
||||
# ------------------------------------------------------------------
|
||||
def hash_password(password: str) -> str:
|
||||
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
||||
|
||||
|
||||
def verify_password(password: str, hashed: str) -> bool:
|
||||
return bcrypt.checkpw(password.encode(), hashed.encode())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# JWT
|
||||
# ------------------------------------------------------------------
|
||||
def create_token(user_id: int, rolle: str) -> str:
|
||||
payload = {
|
||||
"sub": str(user_id),
|
||||
"rolle": rolle,
|
||||
"exp": datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRY),
|
||||
"iat": datetime.now(timezone.utc),
|
||||
}
|
||||
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO)
|
||||
|
||||
|
||||
def decode_token(token: str) -> dict:
|
||||
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# FastAPI Dependencies
|
||||
# ------------------------------------------------------------------
|
||||
def _get_token_from_request(
|
||||
request: Request,
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
) -> str | None:
|
||||
"""Token aus Bearer-Header oder HttpOnly-Cookie."""
|
||||
if credentials:
|
||||
return credentials.credentials
|
||||
return request.cookies.get("by_token")
|
||||
|
||||
|
||||
def get_current_user(
|
||||
request: Request,
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
):
|
||||
"""Dependency: gibt den eingeloggten User zurück oder wirft 401."""
|
||||
token = _get_token_from_request(request, credentials)
|
||||
if not token:
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Nicht eingeloggt.")
|
||||
|
||||
try:
|
||||
payload = decode_token(token)
|
||||
except jwt.ExpiredSignatureError:
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Session abgelaufen.")
|
||||
except jwt.InvalidTokenError:
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Ungültiges Token.")
|
||||
|
||||
user_id = int(payload["sub"])
|
||||
with db() as conn:
|
||||
row = conn.execute(
|
||||
"SELECT id, email, name, rolle, is_premium FROM users WHERE id=?",
|
||||
(user_id,)
|
||||
).fetchone()
|
||||
|
||||
if not row:
|
||||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User nicht gefunden.")
|
||||
|
||||
return dict(row)
|
||||
|
||||
|
||||
def get_current_user_optional(
|
||||
request: Request,
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
):
|
||||
"""Dependency: gibt User zurück falls eingeloggt, sonst None."""
|
||||
try:
|
||||
return get_current_user(request, credentials)
|
||||
except HTTPException:
|
||||
return None
|
||||
|
||||
|
||||
def require_premium(user=Depends(get_current_user)):
|
||||
"""Dependency: nur für Premium-User."""
|
||||
if not user["is_premium"]:
|
||||
raise HTTPException(
|
||||
status.HTTP_402_PAYMENT_REQUIRED,
|
||||
"Dieses Feature erfordert Ban Yaro Premium."
|
||||
)
|
||||
return user
|
||||
|
||||
|
||||
def require_admin(user=Depends(get_current_user)):
|
||||
"""Dependency: nur für Admins."""
|
||||
if user["rolle"] != "admin":
|
||||
raise HTTPException(status.HTTP_403_FORBIDDEN, "Kein Zugriff.")
|
||||
return user
|
||||
Loading…
Add table
Add a link
Reference in a new issue