banyaro/backend/auth.py
rene 553e9e7854 Sprint 12+13: Tagebuch Day-One-Redesign, Notiz-Feature, Icon-Fixes, SW by-v405
Tagebuch:
- Day-One-Listenansicht: Wochentag + Tageszahl + Meta-Zeile (Zeit/Ort/Wetter)
- 4 Ansichten: Liste, Medien-Mosaik, Kalender (mit Sprungbuttons), Karte (GPS-Marker)
- Detail-Ansicht inline im Content-Bereich (kein Fullscreen-Overlay mehr)
- Hero-Bild vollständig sichtbar (object-fit:contain), Lightbox mit Safe-Area
- 2-Spalten-Layout Desktop: Text + Leaflet-Karte + POI-Liste
- EXIF-GPS-Extraktion bei Foto-Upload, historisches Wetter via Archive-API
- NoteStation-Import: Fotos in diary_media (80 Einträge migriert, 94 Medien)
- Stats-Endpoints: /diary/stats, /diary/calendar, /diary/locations

Notiz-Feature:
- Generische notes-Tabelle (parent_type + parent_id + meta_json)
- 📝-Button in 8 Bereichen, Notizblock-Seite mit KI-Analyse
- KI-Toggle in Einstellungen, notes_ki_enabled in User-Profil

Icons & Design:
- fill:currentColor Fix für welcome/onboarding/friends.js
- --c-icon Variable, --c-text-muted Dark Mode aufgehellt
- 15+ neue Phosphor-Icons aus lokaler Kopie
- CSS Network-First im SW, Cache-Control-Middleware

Infrastruktur:
- Wiki-Anreicherungs-Scheduler-Jobs entfernt (abgeschlossen)
- auth.py: notes_ki_enabled + is_social_media im User-Response
2026-04-25 20:44:46 +02:00

137 lines
4.4 KiB
Python

"""
BAN YARO — Auth
JWT + Bcrypt. Einmal gebaut, von allen Routes genutzt.
"""
import os
import jwt
import bcrypt
import logging
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from database import db
logger = logging.getLogger(__name__)
JWT_SECRET = os.getenv("JWT_SECRET", "change-me-in-production")
JWT_ALGO = "HS256"
JWT_EXPIRY = int(os.getenv("JWT_EXPIRY_DAYS", "30"))
if JWT_SECRET == "change-me-in-production" and os.getenv("ENV") == "production":
raise RuntimeError(
"SICHERHEITSFEHLER: JWT_SECRET ist nicht gesetzt. "
"Bitte JWT_SECRET in .env setzen und Container neu starten."
)
security = HTTPBearer(auto_error=False)
# ------------------------------------------------------------------
# Passwort
# ------------------------------------------------------------------
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode(), hashed.encode())
# ------------------------------------------------------------------
# JWT
# ------------------------------------------------------------------
def create_token(user_id: int, rolle: str) -> str:
payload = {
"sub": str(user_id),
"rolle": rolle,
"exp": datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRY),
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO)
def decode_token(token: str) -> dict:
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
# ------------------------------------------------------------------
# FastAPI Dependencies
# ------------------------------------------------------------------
def _get_token_from_request(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str | None:
"""Token aus Bearer-Header oder HttpOnly-Cookie."""
if credentials:
return credentials.credentials
return request.cookies.get("by_token")
def get_current_user(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
):
"""Dependency: gibt den eingeloggten User zurück oder wirft 401."""
token = _get_token_from_request(request, credentials)
if not token:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Nicht eingeloggt.")
try:
payload = decode_token(token)
except jwt.ExpiredSignatureError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Session abgelaufen.")
except jwt.InvalidTokenError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Ungültiges Token.")
user_id = int(payload["sub"])
with db() as conn:
row = conn.execute(
"SELECT id, email, name, rolle, is_premium, is_moderator, is_banned, ban_reason, is_social_media, notes_ki_enabled FROM users WHERE id=?",
(user_id,)
).fetchone()
if not row:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User nicht gefunden.")
user = dict(row)
if user.get("is_banned"):
reason = user.get("ban_reason") or "Kein Grund angegeben."
raise HTTPException(status.HTTP_403_FORBIDDEN, f"Account gesperrt: {reason}")
return user
def get_current_user_optional(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
):
"""Dependency: gibt User zurück falls eingeloggt, sonst None."""
try:
return get_current_user(request, credentials)
except HTTPException:
return None
def require_premium(user=Depends(get_current_user)):
"""Dependency: nur für Premium-User."""
if not user["is_premium"]:
raise HTTPException(
status.HTTP_402_PAYMENT_REQUIRED,
"Dieses Feature erfordert Ban Yaro Premium."
)
return user
def require_admin(user=Depends(get_current_user)):
"""Dependency: nur für Admins."""
if user["rolle"] != "admin":
raise HTTPException(status.HTTP_403_FORBIDDEN, "Kein Zugriff.")
return user
def require_social_media(user=Depends(get_current_user)):
"""Dependency: Social-Media-Manager oder Admin."""
if not (user.get("is_social_media") or user["rolle"] == "admin"):
raise HTTPException(status.HTTP_403_FORBIDDEN, "Kein Zugriff.")
return user