"""BAN YARO — User-Profil Routes""" import io import os import uuid from typing import Optional from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from pydantic import BaseModel from auth import get_current_user from database import db router = APIRouter() MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media") VALID_ERFAHRUNG = {"einsteiger", "erfahren", "trainer", "zuechter"} VALID_SICHTBARKEIT = {"public", "friends", "private"} class ProfileUpdate(BaseModel): real_name: Optional[str] = None bio: Optional[str] = None wohnort: Optional[str] = None erfahrung: Optional[str] = None social_link: Optional[str] = None profil_sichtbarkeit: Optional[str] = None notes_ki_enabled: Optional[int] = None gassi_stunde_push: Optional[int] = None preferred_theme: Optional[str] = None billing_address: Optional[str] = None def _load_user(user_id: int) -> dict: with db() as conn: row = conn.execute( """SELECT id, name, real_name, email, rolle, is_premium, email_verified, bio, wohnort, erfahrung, social_link, profil_sichtbarkeit, avatar_url, created_at, billing_address FROM users WHERE id=?""", (user_id,) ).fetchone() if not row: raise HTTPException(404, "User nicht gefunden.") data = dict(row) data["is_premium"] = bool(data["is_premium"]) return data @router.patch("") async def update_profile(data: ProfileUpdate, user=Depends(get_current_user)): fields = data.model_dump(exclude_none=True) # Validierungen if "erfahrung" in fields and fields["erfahrung"] not in VALID_ERFAHRUNG: raise HTTPException(400, f"erfahrung muss eines von {sorted(VALID_ERFAHRUNG)} sein.") if "profil_sichtbarkeit" in fields and fields["profil_sichtbarkeit"] not in VALID_SICHTBARKEIT: raise HTTPException(400, f"profil_sichtbarkeit muss eines von {sorted(VALID_SICHTBARKEIT)} sein.") if "preferred_theme" in fields and fields["preferred_theme"] not in ("system", "light", "dark"): raise HTTPException(400, "preferred_theme muss 'system', 'light' oder 'dark' sein.") if "bio" in fields and len(fields["bio"]) > 300: raise HTTPException(400, "bio darf maximal 300 Zeichen lang sein.") if "wohnort" in fields and len(fields["wohnort"]) > 60: raise HTTPException(400, "wohnort darf maximal 60 Zeichen lang sein.") if "social_link" in fields and len(fields["social_link"]) > 120: raise HTTPException(400, "social_link darf maximal 120 Zeichen lang sein.") if not fields: return _load_user(user["id"]) set_clause = ", ".join(f"{k}=?" for k in fields) values = list(fields.values()) + [user["id"]] with db() as conn: conn.execute( f"UPDATE users SET {set_clause} WHERE id=?", values ) return _load_user(user["id"]) @router.post("/avatar") async def upload_avatar( file: UploadFile = File(...), user=Depends(get_current_user), ): # HEIC-Support registrieren falls vorhanden try: import pillow_heif pillow_heif.register_heif_opener() except ImportError: pass from PIL import Image, ImageOps content = await file.read() try: img = Image.open(io.BytesIO(content)) img = ImageOps.exif_transpose(img) # EXIF-Orientierung anwenden img = img.convert("RGB") buf = io.BytesIO() img.save(buf, format="JPEG", quality=90) content = buf.getvalue() except Exception: pass # Fallback: Originaldaten speichern filename = f"avatar_{user['id']}_{uuid.uuid4().hex[:8]}.jpg" path = os.path.join(MEDIA_DIR, "avatars", filename) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: f.write(content) avatar_url = f"/media/avatars/{filename}" with db() as conn: conn.execute( "UPDATE users SET avatar_url=? WHERE id=?", (avatar_url, user["id"]) ) return {"avatar_url": avatar_url} # ---------------------------------------------------------- # GET /profile/world-config — Welten-Chip-Konfiguration laden # PUT /profile/world-config — Welten-Chip-Konfiguration speichern # ---------------------------------------------------------- import json as _json @router.get('/world-config') async def get_world_config(user=Depends(get_current_user)): with db() as conn: row = conn.execute("SELECT world_config FROM users WHERE id=?", (user['id'],)).fetchone() cfg = row['world_config'] if row and row['world_config'] else None return {"config": _json.loads(cfg) if cfg else None} class WorldConfigIn(BaseModel): config: dict @router.put('/world-config') async def put_world_config(body: WorldConfigIn, user=Depends(get_current_user)): with db() as conn: conn.execute("UPDATE users SET world_config=? WHERE id=?", (_json.dumps(body.config), user['id'])) return {"status": "ok"} # ---------------------------------------------------------- # DELETE /profile/account — Konto unwiderruflich löschen # ---------------------------------------------------------- @router.delete('/account') async def delete_account(user=Depends(get_current_user)): """Löscht das Konto und alle zugehörigen Daten unwiderruflich.""" uid = user['id'] with db() as conn: # Alle Hunde-IDs des Users dog_ids = [r['id'] for r in conn.execute( "SELECT id FROM dogs WHERE user_id=?", (uid,)).fetchall()] for did in dog_ids: conn.execute("DELETE FROM diary WHERE dog_id=?", (did,)) conn.execute("DELETE FROM health WHERE dog_id=?", (did,)) conn.execute("DELETE FROM training_sessions WHERE dog_id=?", (did,)) conn.execute("DELETE FROM training_streaks WHERE dog_id=?", (did,)) conn.execute("DELETE FROM expenses WHERE dog_id=?", (did,)) conn.execute("DELETE FROM dogs WHERE user_id=?", (uid,)) conn.execute("DELETE FROM push_subscriptions WHERE user_id=?", (uid,)) conn.execute("DELETE FROM notifications WHERE user_id=?", (uid,)) conn.execute("DELETE FROM forum_posts WHERE user_id=?", (uid,)) conn.execute("DELETE FROM users WHERE id=?", (uid,)) return {"status": "deleted"} # ---------------------------------------------------------- # GET /profile/export — DSGVO Datenexport (Art. 20) # ---------------------------------------------------------- @router.get('/export') async def export_user_data(user=Depends(get_current_user)): """Gibt alle personenbezogenen Daten des Users als JSON zurück (Art. 20 DSGVO).""" import json as _json from datetime import datetime as _dt from fastapi.responses import Response as _Response def _q(conn, sql, params=()): """Sicheres Query — gibt leere Liste zurück wenn Tabelle/Spalte fehlt.""" try: return [dict(r) for r in conn.execute(sql, params).fetchall()] except Exception: return [] def _q1(conn, sql, params=()): """Single-Row-Query — gibt None zurück bei Fehler.""" try: r = conn.execute(sql, params).fetchone() return dict(r) if r else None except Exception: return None uid = user['id'] with db() as conn: # Nutzerprofil u = _q1(conn, "SELECT id, name, email, bio, wohnort, erfahrung, social_link, " "email_verified, is_premium, subscription_tier, created_at " "FROM users WHERE id=?", (uid,)) or {} # Hunde dogs_raw = _q(conn, "SELECT * FROM dogs WHERE user_id=?", (uid,)) dogs_out = [] for dog in dogs_raw: did = dog['id'] # Tagebuch (nur vorhandene Spalten) diary_rows = _q(conn, "SELECT id, datum, typ, titel, text, gps_lat, gps_lon, " "is_milestone, created_at FROM diary WHERE dog_id=?", (did,)) for de in diary_rows: # diary_media: preview_url existiert nicht → url + media_type de['media'] = _q(conn, "SELECT url, media_type FROM diary_media WHERE diary_id=?", (de['id'],)) # Gesundheit (alle via Migration ergänzten Spalten schützen) health_rows = _q(conn, "SELECT id, typ, bezeichnung, datum, naechstes, notiz FROM health " "WHERE dog_id=?", (did,)) for he in health_rows: he['media'] = _q(conn, "SELECT url, media_type FROM health_media WHERE health_id=?", (he['id'],)) dog['tagebuch'] = diary_rows dog['gesundheit'] = health_rows dog['trainingsfortschritt'] = _q(conn, "SELECT exercise_id, status, updated_at FROM exercise_progress " "WHERE dog_id=?", (did,)) dog['ausgaben'] = _q(conn, "SELECT datum, betrag, kategorie, notiz FROM expenses " "WHERE dog_id=?", (did,)) dog['verhaltensprotokoll'] = _q(conn, "SELECT datum, uhrzeit, kategorie, intensitaet, trigger, notiz " "FROM behavior_log WHERE dog_id=?", (did,)) dog['versicherung'] = _q(conn, "SELECT anbieter, police_nr, jahresbeitrag, kontakt, ablaufdatum, notizen " "FROM dog_insurance WHERE dog_id=?", (did,)) dog['ernaehrungsprofil'] = _q1(conn, "SELECT futter_typ, marke, kcal_tag, portionen, notizen " "FROM futter_profil WHERE dog_id=?", (did,)) dog['futter_eintraege'] = _q(conn, "SELECT datum, uhrzeit, futter_name, futter_typ, menge_g, notiz " "FROM futter_eintraege WHERE dog_id=?", (did,)) dog['futter_reaktionen'] = _q(conn, "SELECT datum, uhrzeit, reaktion_typ, intensitaet, notiz " "FROM futter_reaktionen WHERE dog_id=?", (did,)) dog['routen'] = _q(conn, "SELECT r.name, r.distanz_km, date(r.created_at) AS datum " "FROM routes r JOIN route_dogs rd ON rd.route_id=r.id " "WHERE rd.dog_id=?", (did,)) dogs_out.append(dog) forum = _q(conn, "SELECT ft.title, fp.content, fp.created_at, " "CASE WHEN fp.parent_id IS NULL THEN 'Thread' ELSE 'Antwort' END AS art " "FROM forum_posts fp LEFT JOIN forum_threads ft ON ft.id=fp.thread_id " "WHERE fp.user_id=? ORDER BY fp.created_at DESC", (uid,)) walk_participations = _q(conn, "SELECT w.titel, w.datum, w.uhrzeit, w.ort_name " "FROM walk_participants wp JOIN walks w ON w.id=wp.walk_id " "WHERE wp.user_id=?", (uid,)) walk_photos = _q(conn, "SELECT wp.url, w.datum AS walk_datum, w.titel AS walk_titel, wp.created_at " "FROM walk_photos wp JOIN walks w ON w.id=wp.walk_id " "WHERE wp.user_id=?", (uid,)) push_count = _q1(conn, "SELECT COUNT(*) AS n FROM push_subscriptions WHERE user_id=?", (uid,)) push_count = (push_count or {}).get('n', 0) export = { "export_erstellt": _dt.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), "hinweis": "Dieser Export enthält alle personenbezogenen Daten deines Ban-Yaro-Kontos gemäß Art. 20 DSGVO.", "profil": u, "hunde": dogs_out, "forum_beitraege": [dict(f) for f in forum], "gassi_teilnahmen": [dict(w) for w in walk_participations], "gassi_fotos": [dict(p) for p in walk_photos], "push_subscriptions": push_count, } content = _json.dumps(export, ensure_ascii=False, indent=2, default=str) today = _dt.utcnow().strftime("%Y-%m-%d") filename = f"banyaro-export-{today}.json" return _Response( content = content, media_type = "application/json", headers = {"Content-Disposition": f'attachment; filename="{filename}"'}, )