"""BAN YARO — Hunde-Profil Routes""" import os import uuid from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from pydantic import BaseModel from typing import Optional from database import db from auth import get_current_user, has_pro_access from routes.push import send_push_to_user from media_utils import safe_media_path, preview_url_from router = APIRouter() MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media") class DogCreate(BaseModel): name: str rasse: Optional[str] = None geburtstag: Optional[str] = None geschlecht: Optional[str] = None gewicht_kg: Optional[float] = None widerrist_cm: Optional[float] = None chip_nr: Optional[str] = None bio: Optional[str] = None is_public: bool = False class DogUpdate(BaseModel): name: Optional[str] = None rasse: Optional[str] = None rasse_id: Optional[int] = None geburtstag: Optional[str] = None geschlecht: Optional[str] = None gewicht_kg: Optional[float] = None widerrist_cm: Optional[float] = None chip_nr: Optional[str] = None bio: Optional[str] = None is_public: Optional[bool] = None @router.get("") async def list_dogs(user=Depends(get_current_user)): with db() as conn: own = conn.execute( "SELECT *, NULL AS shared_by, NULL AS share_role FROM dogs WHERE user_id=? AND (verstorben_am IS NULL) ORDER BY id", (user["id"],) ).fetchall() shared = conn.execute( """SELECT d.*, u.name AS shared_by, ds.role AS share_role FROM dog_shares ds JOIN dogs d ON d.id = ds.dog_id JOIN users u ON u.id = ds.owner_id WHERE ds.shared_with_id = ? AND ds.accepted_at IS NOT NULL""", (user["id"],) ).fetchall() guest_rows = conn.execute(""" SELECT d.*, ss.id AS sub_id, ss.valid_until AS sitting_until, u.name AS owner_name, NULL AS shared_by, NULL AS share_role FROM sitting_subscriptions ss JOIN dogs d ON d.id = ss.dog_id JOIN users u ON u.id = ss.owner_id WHERE ss.sitter_id = ? AND ss.valid_until >= date('now') """, (user["id"],)).fetchall() result = [] for r in own: d = dict(r) d["is_guest"] = False result.append(d) for r in shared: d = dict(r) d["is_guest"] = False result.append(d) for r in guest_rows: d = dict(r) d["is_guest"] = True result.append(d) # HdM-Siege pro Hund anhängen if result: dog_ids = [d["id"] for d in result] with db() as conn: wins_rows = conn.execute( f"SELECT dog_id, monat FROM hund_des_monats_wins WHERE dog_id IN ({','.join('?'*len(dog_ids))}) ORDER BY monat DESC", dog_ids, ).fetchall() wins_map: dict[int, list[str]] = {} for w in wins_rows: wins_map.setdefault(w["dog_id"], []).append(w["monat"]) for d in result: d["hdm_wins"] = wins_map.get(d["id"], []) return result def _is_plausible_dog(name: str, rasse: str, geburtstag) -> tuple[bool, str]: """Einfache Plausibilitätsprüfung für Hunde-Profile.""" import re, datetime name = (name or "").strip() rasse = (rasse or "").strip() if len(name) < 2: return False, "Der Name muss mindestens 2 Zeichen haben." if not re.search(r'[a-zA-ZäöüÄÖÜß]', name): return False, "Der Name muss mindestens einen Buchstaben enthalten." if len(set(name.lower())) < 2: return False, "Bitte einen echten Namen eingeben." if rasse and len(rasse) < 2: return False, "Bitte eine gültige Rasse eingeben." if rasse and not re.search(r'[a-zA-ZäöüÄÖÜß]', rasse): return False, "Die Rasse muss Buchstaben enthalten." if geburtstag: try: if isinstance(geburtstag, str): year = int(geburtstag[:4]) else: year = geburtstag.year now = datetime.date.today().year if year > now: return False, "Das Geburtsdatum liegt in der Zukunft." if year < now - 30: return False, "Das Geburtsdatum ist unrealistisch." except Exception: pass return True, "" @router.post("") async def create_dog(data: DogCreate, user=Depends(get_current_user)): with db() as conn: existing = conn.execute( "SELECT COUNT(*) FROM dogs WHERE user_id=?", (user["id"],) ).fetchone()[0] if existing >= 1 and not has_pro_access(user): raise HTTPException( status_code=403, detail="Mehrere Hunde sind ein Pro-Feature. Upgrade auf Ban Yaro Pro, um weitere Hunde anzulegen." ) conn.execute( """INSERT INTO dogs (user_id, name, rasse, geburtstag, geschlecht, gewicht_kg, widerrist_cm, chip_nr, bio, is_public) VALUES (?,?,?,?,?,?,?,?,?,?)""", (user["id"], data.name, data.rasse, data.geburtstag, data.geschlecht, data.gewicht_kg, data.widerrist_cm, data.chip_nr, data.bio, int(data.is_public)) ) dog = conn.execute( "SELECT * FROM dogs WHERE user_id=? ORDER BY id DESC LIMIT 1", (user["id"],) ).fetchone() # Gründer-Aktivierung: erstes Hunde-Profil + is_founder_pending user_row = conn.execute( "SELECT is_founder_pending, is_founder FROM users WHERE id=?", (user["id"],) ).fetchone() if user_row and user_row["is_founder_pending"] and not user_row["is_founder"]: dog_count = conn.execute( "SELECT COUNT(*) FROM dogs WHERE user_id=?", (user["id"],) ).fetchone()[0] if dog_count == 1: # genau dieser erste Hund plausible, reason = _is_plausible_dog(data.name, data.rasse, data.geburtstag) if plausible: total = conn.execute( "SELECT COUNT(*) FROM users WHERE is_founder=1" ).fetchone()[0] if total < 100: conn.execute( "UPDATE users SET is_founder=1, founder_number=?, is_founder_pending=0 WHERE id=?", (total + 1, user["id"]) ) return dict(dog) @router.get("/{dog_id}/welcome-dashboard") async def get_welcome_dashboard(dog_id: int, user=Depends(get_current_user)): """Liefert kompakte Dashboard-Daten für die Welcome-Ansicht eines Hundes.""" import random as _random with db() as conn: # Besitz prüfen dog = conn.execute( "SELECT id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") # Hintergrundfoto: Querformat-Bilder bevorzugt, tagesweise rotierend photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id = dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' AND dm.img_width IS NOT NULL AND dm.img_width > dm.img_height ORDER BY d.datum DESC, d.id DESC, dm.id ASC""", (dog_id,) ).fetchall() # Fallback: Bilder ohne Dimensionsdaten (vor dem Backfill hochgeladen) if not photos: photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id = dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' AND dm.img_width IS NULL ORDER BY d.datum DESC, d.id DESC, dm.id ASC""", (dog_id,) ).fetchall() random_photo = None if photos: import datetime as _dt2 tick = (_dt2.date.today() - _dt2.date(2024, 1, 1)).days chosen_url = photos[tick % len(photos)]["url"] random_photo = { "url": chosen_url, "preview_url": preview_url_from(chosen_url), } # Neuester Tagebucheintrag last_diary_row = conn.execute( "SELECT titel, datum FROM diary WHERE dog_id=? ORDER BY datum DESC LIMIT 1", (dog_id,) ).fetchone() last_diary = dict(last_diary_row) if last_diary_row else None # Nächster Termin (kein Gewicht, nur innerhalb 60 Tage) next_appt_row = conn.execute( """SELECT bezeichnung, naechstes, typ FROM health WHERE dog_id=? AND naechstes IS NOT NULL AND naechstes >= date('now') AND naechstes <= date('now', '+60 days') AND typ != 'gewicht' ORDER BY naechstes ASC LIMIT 1""", (dog_id,) ).fetchone() next_appointment = dict(next_appt_row) if next_appt_row else None # Letztes Gewicht last_weight_row = conn.execute( """SELECT wert, einheit, datum FROM health WHERE dog_id=? AND typ='gewicht' ORDER BY datum DESC LIMIT 1""", (dog_id,) ).fetchone() last_weight = dict(last_weight_row) if last_weight_row else None # Anzahl Tagebucheinträge diary_count = conn.execute( "SELECT COUNT(*) AS n FROM diary WHERE dog_id=?", (dog_id,) ).fetchone()["n"] # Tagesübung — JOIN mit training_exercises via js_exercise_id, tagesstabil import datetime as _dt day_num = (_dt.date.today() - _dt.date(2024, 1, 1)).days # Versuche JOIN (funktioniert wenn js_exercise_id-Spalte vorhanden) # Nur Übungen des aktiven Hundes, 'sitzt' ausschließen try: joined = conn.execute( """SELECT ep.exercise_id, te.name, te.kategorie AS kategorie_raw, te.schwierigkeit, te.js_exercise_id FROM exercise_progress ep JOIN training_exercises te ON te.js_exercise_id = ep.exercise_id WHERE ep.dog_id = ? AND ep.status IN ('noch-nicht', 'manchmal', 'meistens') ORDER BY ep.updated_at ASC LIMIT 50""", (dog_id,) ).fetchall() except Exception: joined = [] daily_exercise = None if joined: row = joined[day_num % len(joined)] # Tab-ID aus exercise_id ableiten (alles vor erstem '_' + '_') ex_id = row["exercise_id"] tab = ex_id.split('_')[0] if '_' in ex_id else ex_id daily_exercise = { "exercise_id": ex_id, "name": row["name"], "kategorie": tab, "schwierigkeit": row["schwierigkeit"], } else: # Fallback: exercise_progress ohne JOIN (Legacy-IDs ohne Matching in DB) _KNOWN_PREFIXES = ( 'grundkommandos_', 'tricks_', 'problemverhalten_', 'mentale-auslastung_', 'hundesport_', 'koerperpflege_', 'welpe-basics_', ) raw = conn.execute( """SELECT exercise_id FROM exercise_progress WHERE dog_id = ? AND status IN ('noch-nicht', 'manchmal', 'meistens') ORDER BY updated_at ASC LIMIT 50""", (dog_id,) ).fetchall() valid = [r["exercise_id"] for r in raw if any(r["exercise_id"].startswith(p) for p in _KNOWN_PREFIXES)] if valid: ex_id = valid[day_num % len(valid)] for prefix in _KNOWN_PREFIXES: if ex_id.startswith(prefix): tab = prefix.rstrip('_') name = ex_id[len(prefix):].replace('_', ' ') daily_exercise = {"exercise_id": ex_id, "name": name, "kategorie": tab} break return { "random_photo": random_photo, "last_diary": last_diary, "next_appointment": next_appointment, "last_weight": last_weight, "diary_count": diary_count, "daily_exercise": daily_exercise, } @router.get("/{dog_id}/wrapped") async def get_dog_wrapped(dog_id: int, year: int = None, user=Depends(get_current_user)): """Jahresrückblick ('Wrapped') für einen Hund.""" import json as _json from datetime import date as _date if year is None: year = _date.today().year with db() as conn: dog = conn.execute( "SELECT id, name, user_id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") # km gelaufen (eigene Routen des Users) gesamt_km_row = conn.execute( "SELECT ROUND(COALESCE(SUM(distanz_km),0),1) AS km FROM routes " "WHERE user_id=? AND strftime('%Y', created_at)=?", (user["id"], str(year)) ).fetchone() gesamt_km = gesamt_km_row["km"] or 0.0 # Gassi-Tage (Distinct Datum in Diary) gassi_tage = conn.execute( "SELECT COUNT(DISTINCT datum) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=?", (dog_id, str(year)) ).fetchone()["n"] # Gesamte Einträge eintraege_gesamt = conn.execute( "SELECT COUNT(*) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=?", (dog_id, str(year)) ).fetchone()["n"] # Fotos gesamt fotos_gesamt = conn.execute( "SELECT COUNT(*) AS n FROM diary_media dm " "JOIN diary d ON d.id=dm.diary_id " "WHERE d.dog_id=? AND strftime('%Y', d.datum)=? AND dm.media_type='image'", (dog_id, str(year)) ).fetchone()["n"] # Beste Route (längste distanz) beste_route_row = conn.execute( "SELECT MAX(distanz_km) AS km FROM routes " "WHERE user_id=? AND strftime('%Y', created_at)=?", (user["id"], str(year)) ).fetchone() beste_route = beste_route_row["km"] or 0.0 # Lieblingsmonat (meiste diary-Einträge) monat_rows = conn.execute( "SELECT strftime('%m', datum) AS monat, COUNT(*) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=? " "GROUP BY monat ORDER BY n DESC LIMIT 1", (dog_id, str(year)) ).fetchone() lieblings_monat = None if monat_rows: _MONATE = ['Jan','Feb','Mär','Apr','Mai','Jun','Jul','Aug','Sep','Okt','Nov','Dez'] try: lieblings_monat = _MONATE[int(monat_rows["monat"]) - 1] except Exception: pass # Lieblingsaktivität (häufigster typ) typ_row = conn.execute( "SELECT typ, COUNT(*) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=? " "GROUP BY typ ORDER BY n DESC LIMIT 1", (dog_id, str(year)) ).fetchone() lieblings_aktivitaet = typ_row["typ"] if typ_row else None # Training-Sessions training_sessions = conn.execute( "SELECT COUNT(*) AS n FROM training_sessions " "WHERE dog_id=? AND strftime('%Y', created_at)=?", (dog_id, str(year)) ).fetchone()["n"] # Gesundheits-Einträge gesundheit_eintraege = conn.execute( "SELECT COUNT(*) AS n FROM health " "WHERE dog_id=? AND strftime('%Y', datum)=?", (dog_id, str(year)) ).fetchone()["n"] # Wetter-Tapferkeit: Tagebuch-Einträge mit weather_json wetter_kalt = 0 wetter_warm = 0 wetter_rows = conn.execute( "SELECT weather_json FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=? AND weather_json IS NOT NULL", (dog_id, str(year)) ).fetchall() for wr in wetter_rows: try: wj = _json.loads(wr["weather_json"]) temp = wj.get("temp_c") or wj.get("temperature") or wj.get("temp") if temp is not None: if float(temp) < 5: wetter_kalt += 1 elif float(temp) > 25: wetter_warm += 1 except Exception: pass return { "dog_id": dog_id, "dog_name": dog["name"], "year": year, "gesamt_km": gesamt_km, "gassi_tage": gassi_tage, "eintraege_gesamt": eintraege_gesamt, "fotos_gesamt": fotos_gesamt, "beste_route": beste_route, "lieblings_monat": lieblings_monat, "lieblings_aktivitaet": lieblings_aktivitaet, "training_sessions": training_sessions, "gesundheit_eintraege": gesundheit_eintraege, "wetter_kalt": wetter_kalt, "wetter_warm": wetter_warm, } @router.get("/{dog_id}/buch") async def get_hunde_buch( dog_id: int, jahr: int = None, limit: int = 50, nur_fotos: bool = False, nur_meilensteine: bool = False, user=Depends(get_current_user), ): """Hunde-Buch: druckbare HTML-Ansicht der schoensten Tagebucheintraege.""" import json as _json from datetime import date as _date from fastapi.responses import HTMLResponse from html import escape as _esc with db() as conn: dog = conn.execute( "SELECT id, name, rasse, geburtstag, foto_url FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") dog = dict(dog) # --- Eintraege laden --- conditions = ["(d.dog_id=? OR dd.dog_id=?)"] params: list = [dog_id, dog_id] if jahr: conditions.append("strftime('%Y', d.datum) = ?") params.append(str(jahr)) if nur_meilensteine: conditions.append("d.is_milestone = 1") where = " AND ".join(conditions) rows = conn.execute( f"""SELECT DISTINCT d.id, d.datum, d.titel, d.text, d.tags, d.gps_lat, d.gps_lon, d.location_name, d.weather_json, d.is_milestone, (SELECT dm.url FROM diary_media dm WHERE dm.diary_id=d.id AND dm.media_type='image' ORDER BY dm.is_cover DESC, dm.sort_order LIMIT 1) AS cover_url FROM diary d LEFT JOIN diary_dogs dd ON dd.diary_id = d.id WHERE {where} AND d.datum IS NOT NULL ORDER BY d.datum ASC""", params ).fetchall() rows = [dict(r) for r in rows] # Filtern: Eintraege mit Foto bevorzugen / nur Fotos-Modus if nur_fotos: rows = [r for r in rows if r.get("cover_url")] else: # Prioritaet: Meilensteine + Foto-Eintraege; Rest auffuellen bis limit with_photo = [r for r in rows if r.get("cover_url")] milestones = [r for r in rows if r.get("is_milestone") and not r.get("cover_url")] rest = [r for r in rows if not r.get("cover_url") and not r.get("is_milestone")] rows = with_photo + milestones + rest rows.sort(key=lambda r: r["datum"] or "") rows = rows[:limit] # --- Hund-Alter berechnen --- alter_str = "" if dog.get("geburtstag"): try: geb = _date.fromisoformat(dog["geburtstag"]) heute = _date.today() jahre = (heute - geb).days // 365 alter_str = f"{jahre} Jahre" except Exception: pass # --- HTML bauen --- dog_name = _esc(dog["name"] or "Mein Hund") rasse_str = _esc(dog.get("rasse") or "") jahr_str = str(jahr) if jahr else "Alle Jahre" foto_url = dog.get("foto_url") or "" cover_img = ( f'{dog_name}' if foto_url else f'
🐾
' ) subtitle_parts = [p for p in [rasse_str, alter_str] if p] subtitle = " · ".join(subtitle_parts) _MONATE = ["Januar","Februar","März","April","Mai","Juni", "Juli","August","September","Oktober","November","Dezember"] def _fmt_datum(iso: str) -> str: try: d = _date.fromisoformat(iso) return f"{d.day}. {_MONATE[d.month - 1]} {d.year}" except Exception: return iso or "" def _wetter_chip(wj_str: str) -> str: if not wj_str: return "" try: wj = _json.loads(wj_str) temp = wj.get("temp_c") or wj.get("temperature") or wj.get("temp") if temp is None: return "" temp_i = int(float(temp)) emoji = "☀️" if temp_i > 20 else ("🌧️" if temp_i < 10 else "⛅") return f'{emoji} {temp_i}°C' except Exception: return "" entries_html = "" for e in rows: milestone_class = "milestone" if e.get("is_milestone") else "" datum_fmt = _fmt_datum(e.get("datum") or "") titel = _esc(e.get("titel") or "") text_raw = e.get("text") or "" text = _esc(text_raw).replace("\n", "
") wetter = _wetter_chip(e.get("weather_json") or "") loc = _esc(e.get("location_name") or "") cover = e.get("cover_url") or "" foto_html = "" if cover: foto_html = ( f'
' f'' f'
' ) loc_html = f'📍 {loc}' if loc else "" chips_html = f'
{wetter}{loc_html}
' if (wetter or loc_html) else "" titel_html = f'
{titel}
' if titel else "" text_html = f'
{text}
' if text_raw else "" entries_html += f"""
{foto_html}
{datum_fmt}
{titel_html} {text_html} {chips_html}
""" anzahl = len(rows) html_page = f""" Hunde-Buch — {dog_name}
{cover_img}

{dog_name}

{'
' + subtitle + '
' if subtitle else ''}
{jahr_str}
{anzahl} Einträge
{entries_html} """ return HTMLResponse(content=html_page) # ------------------------------------------------------------------ # GET /api/dogs/verstorben — Alle verstorbenen Hunde des Users # ------------------------------------------------------------------ @router.get("/verstorben") async def get_verstorbene_hunde(user=Depends(get_current_user)): with db() as conn: rows = conn.execute( """SELECT id, name, rasse, foto_url, verstorben_am, geburtstag FROM dogs WHERE user_id=? AND verstorben_am IS NOT NULL ORDER BY verstorben_am DESC""", (user["id"],) ).fetchall() return [dict(r) for r in rows] @router.get("/{dog_id}") async def get_dog(dog_id: int, user=Depends(get_current_user)): with db() as conn: dog = conn.execute( "SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") return dict(dog) @router.patch("/{dog_id}") async def update_dog(dog_id: int, data: DogUpdate, user=Depends(get_current_user)): fields = {k: v for k, v in data.model_dump().items() if v is not None} if not fields: raise HTTPException(400, "Keine Änderungen angegeben.") set_clause = ", ".join(f"{k}=?" for k in fields) values = list(fields.values()) + [dog_id, user["id"]] with db() as conn: updated = conn.execute( f"UPDATE dogs SET {set_clause} WHERE id=? AND user_id=?", values ).rowcount if not updated: raise HTTPException(404, "Hund nicht gefunden.") dog = conn.execute( "SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() return dict(dog) @router.delete("/{dog_id}", status_code=204) async def delete_dog(dog_id: int, user=Depends(get_current_user)): with db() as conn: conn.execute( "DELETE FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ) @router.post("/{dog_id}/photo") async def upload_photo( dog_id: int, file: UploadFile = File(...), user=Depends(get_current_user) ): # Hund gehört dem User? with db() as conn: dog = conn.execute( "SELECT id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") # Datei immer als JPEG speichern (HEIC/PNG/WebP → kompatibel für alle Browser) import io from PIL import Image try: import pillow_heif pillow_heif.register_heif_opener() except ImportError: pass content = await file.read() try: from PIL import ImageOps img = Image.open(io.BytesIO(content)) img = ImageOps.exif_transpose(img) # EXIF-Orientierung anwenden img = img.convert("RGB") buf = io.BytesIO() img.save(buf, format="JPEG", quality=90) content = buf.getvalue() except Exception: pass # Fallback: Originaldaten speichern filename = f"dog_{dog_id}_{uuid.uuid4().hex[:8]}.jpg" path = os.path.join(MEDIA_DIR, "dogs", filename) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: f.write(content) foto_url = f"/media/dogs/{filename}" with db() as conn: conn.execute("UPDATE dogs SET foto_url=? WHERE id=?", (foto_url, dog_id)) return {"foto_url": foto_url} class PhotoPosition(BaseModel): zoom: float = 1.0 offset_x: float = 0.0 offset_y: float = 0.0 @router.patch("/{dog_id}/photo-position") async def update_photo_position(dog_id: int, pos: PhotoPosition, user=Depends(get_current_user)): with db() as conn: updated = conn.execute( "UPDATE dogs SET foto_zoom=?, foto_offset_x=?, foto_offset_y=? WHERE id=? AND user_id=?", (pos.zoom, pos.offset_x, pos.offset_y, dog_id, user["id"]) ).rowcount if not updated: raise HTTPException(404, "Hund nicht gefunden.") return {"ok": True} @router.delete("/{dog_id}/photo", status_code=204) async def delete_photo(dog_id: int, user=Depends(get_current_user)): with db() as conn: row = conn.execute( "SELECT foto_url FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not row: raise HTTPException(404, "Hund nicht gefunden.") if row["foto_url"]: path = safe_media_path(MEDIA_DIR, row["foto_url"]) if path and os.path.exists(path): os.remove(path) with db() as conn: conn.execute( "UPDATE dogs SET foto_url=NULL, foto_zoom=1.0, foto_offset_x=0.0, foto_offset_y=0.0 WHERE id=? AND user_id=?", (dog_id, user["id"]) ) # ------------------------------------------------------------------ # Fähigkeiten / Kommandos (für Profil + öffentliche Seite) # ------------------------------------------------------------------ def _parse_exercise_name(exercise_id: str) -> str: """grundkommandos_Hier__Komm → 'Hier / Komm'""" parts = exercise_id.split("_", 1) if len(parts) < 2: return exercise_id return parts[1].replace("__", " / ").replace("_", " ") def _load_skills(conn, dog_id: int, user_id: int) -> list: """Gibt Übungen mit Status 'sitzt' oder 'meistens' zurück, die mit diesem Hund trainiert wurden.""" rows = conn.execute( """ SELECT ep.exercise_id, ep.status, (SELECT ts.exercise_name FROM training_sessions ts WHERE ts.user_id = ep.user_id AND ts.dog_id = ? AND ts.exercise_id = ep.exercise_id ORDER BY ts.datum DESC, ts.created_at DESC LIMIT 1) AS exercise_name FROM exercise_progress ep WHERE ep.user_id = ? AND ep.status IN ('sitzt', 'meistens') AND EXISTS (SELECT 1 FROM training_sessions ts2 WHERE ts2.user_id = ep.user_id AND ts2.dog_id = ? AND ts2.exercise_id = ep.exercise_id) ORDER BY ep.status DESC, ep.exercise_id """, (dog_id, user_id, dog_id) ).fetchall() return [ { "exercise_id": r["exercise_id"], "exercise_name": r["exercise_name"] or _parse_exercise_name(r["exercise_id"]), "status": r["status"], "tab": r["exercise_id"].split("_")[0], } for r in rows ] @router.get("/{dog_id}/skills") async def get_dog_skills(dog_id: int, user=Depends(get_current_user)): uid = user["id"] with db() as conn: dog = conn.execute( "SELECT id, user_id FROM dogs WHERE id=? AND (user_id=? OR id IN (SELECT dog_id FROM sitting_subscriptions WHERE sitter_id=? AND valid_until >= date('now')))", (dog_id, uid, uid) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") return _load_skills(conn, dog_id, dog["user_id"]) # Öffentliches Profil (für NFC-Tag, kein Login nötig) @router.get("/public/{dog_id}") async def public_dog_profile(dog_id: int): with db() as conn: dog = conn.execute( """SELECT d.id, d.name, d.rasse, d.geburtstag, d.foto_url, d.bio, d.user_id, u.name as besitzer_name FROM dogs d JOIN users u ON d.user_id=u.id WHERE d.id=? AND d.is_public=1""", (dog_id,) ).fetchone() if not dog: raise HTTPException(404, "Profil nicht gefunden oder nicht öffentlich.") skills = _load_skills(conn, dog_id, dog["user_id"]) result = dict(dog) result.pop("user_id", None) result["skills"] = skills return result class FoundReport(BaseModel): message: Optional[str] = None kontakt: Optional[str] = None # Gefunden-Meldung (kein Login nötig) @router.post("/public/{dog_id}/found") async def report_found(dog_id: int, data: FoundReport = FoundReport()): with db() as conn: row = conn.execute( """SELECT d.id, d.name, d.user_id FROM dogs d WHERE d.id=? AND d.is_public=1""", (dog_id,) ).fetchone() if not row: raise HTTPException(404, "Profil nicht gefunden oder nicht öffentlich.") dog_name = row["name"] user_id = row["user_id"] body = data.message.strip() if data.message and data.message.strip() \ else "Jemand hat deinen Hund gefunden. Öffne die App für Details." if data.kontakt and data.kontakt.strip(): body += f" Kontakt: {data.kontakt.strip()}" send_push_to_user(user_id, { "title": f"🐾 {dog_name} wurde gefunden!", "body": body, "data": {"page": "diary", "found": True}, "tag": f"found-{dog_id}", }) return {"ok": True} # ------------------------------------------------------------------ # GET /api/dogs/{id}/pflege — Pflegetipps für diesen Hund # ------------------------------------------------------------------ @router.get("/{dog_id}/pflege") async def get_pflege_tipps(dog_id: int, user=Depends(get_current_user)): import json as _json with db() as conn: dog = conn.execute( "SELECT id, name, rasse, rasse_id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") # Rassen-Infos für Fell-Typ rasse_info = None with db() as conn: if dog["rasse_id"]: rasse_info = conn.execute( "SELECT name, groesse, beschreibung FROM wiki_rassen WHERE id=?", (dog["rasse_id"],) ).fetchone() elif dog["rasse"]: rasse_info = conn.execute( "SELECT name, groesse, beschreibung FROM wiki_rassen WHERE name LIKE ? LIMIT 1", (f"%{dog['rasse']}%",) ).fetchone() # Fell-Typ und Pflegeart ableiten fell_filter = None fell_pflege_art_filter = None if rasse_info: beschr = (rasse_info["beschreibung"] or "").lower() if any(w in beschr for w in ["lockig", "wellig", "kraus", "pudel", "doodle"]): fell_filter = "lockig" elif any(w in beschr for w in ["langhaar", "seidiges", "fließendes", "langes fell"]): fell_filter = "lang" elif any(w in beschr for w in ["kurzhaar", "kurzes fell", "glatthaarig"]): fell_filter = "kurz" elif rasse_info["groesse"] in ("gross", "sehr_gross"): fell_filter = "doppel" # Pflegeart: Trimmen vs. Schneiden if any(w in beschr for w in ["trimm", "hand-stripping", "stripping", "rauhhaar", "drahthaar", "rauhaar"]): fell_pflege_art_filter = "trimmen" elif any(w in beschr for w in ["schneid", "geschoren", "schere", "clipper"]): fell_pflege_art_filter = "schneiden" with db() as conn: alle_tipps = conn.execute( "SELECT * FROM pflege_tipps ORDER BY kategorie, titel" ).fetchall() # Relevante Tipps: kein Fell-Filter oder passend from datetime import date heute_saison = {1:"winter",2:"winter",3:"fruehling",4:"fruehling",5:"fruehling", 6:"sommer",7:"sommer",8:"sommer",9:"herbst",10:"herbst", 11:"herbst",12:"winter"}[date.today().month] result = [] for t in alle_tipps: t = dict(t) # Fell-Typ-Filter if fell_filter and t["fell_typ"] and t["fell_typ"] != "alle": if fell_filter not in t["fell_typ"].split(","): continue # Pflegeart-Filter: Trimm-Tipps nicht bei Schneidehunden und umgekehrt tipp_art = t.get("fell_pflege_art") if tipp_art and tipp_art != "alle" and fell_pflege_art_filter: if tipp_art != fell_pflege_art_filter: continue t["schritte"] = _json.loads(t["schritte"] or "[]") t["saisonal_aktuell"] = bool(t["saison"] and heute_saison in t["saison"]) result.append(t) # Tipp des Tages: erster aktuell-saisonaler oder zufällig deterministisch from hashlib import md5 day_hash = int(md5(str(date.today()).encode()).hexdigest(), 16) saisonal = [t for t in result if t["saisonal_aktuell"]] tipp_des_tages = (saisonal or result)[day_hash % len(saisonal or result)] if result else None return { "dog_name": dog["name"], "rasse_name": rasse_info["name"] if rasse_info else dog["rasse"], "tipp_des_tages": tipp_des_tages, "tipps": result, "kategorien": list(dict.fromkeys(t["kategorie"] for t in result)), "fell_pflege_art": fell_pflege_art_filter, # 'schneiden' | 'trimmen' | None } # ------------------------------------------------------------------ # LEBENS-TIMELINE # ------------------------------------------------------------------ @router.get("/{dog_id}/timeline") async def get_dog_timeline(dog_id: int, user=Depends(get_current_user)): """Aggregierte Lebens-Timeline eines Hundes aus allen Datenquellen.""" import json as _json with db() as conn: dog = conn.execute( "SELECT id, name, user_id, geburtstag FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") events = [] with db() as conn: # --- Tagebuch --- diary_rows = conn.execute( """SELECT d.id, d.datum, d.titel, d.typ, d.is_milestone, dm.url AS foto_url FROM diary d LEFT JOIN diary_media dm ON dm.diary_id = d.id AND dm.sort_order = 0 WHERE d.dog_id=? ORDER BY d.datum ASC, d.id ASC""", (dog_id,) ).fetchall() for i, r in enumerate(diary_rows): events.append({ "datum": r["datum"], "kategorie": "tagebuch", "titel": r["titel"] or ("Tagebucheintrag" if r["typ"] == "eintrag" else str(r["typ"]).capitalize()), "typ": r["typ"], "is_first": i == 0, "is_milestone": bool(r["is_milestone"]), "foto_url": r["foto_url"], "ref_id": r["id"], }) # --- Gesundheit --- health_rows = conn.execute( """SELECT id, datum, bezeichnung, typ FROM health WHERE dog_id=? ORDER BY datum ASC, id ASC""", (dog_id,) ).fetchall() typ_seen = {} for r in health_rows: t = r["typ"] is_first = t not in typ_seen if is_first: typ_seen[t] = True events.append({ "datum": r["datum"], "kategorie": "gesundheit", "titel": r["bezeichnung"], "typ": t, "is_first": is_first, "is_milestone": False, "foto_url": None, "ref_id": r["id"], }) # --- Training-Sessions --- ts_rows = conn.execute( """SELECT id, datum, exercise_name, erfolgsquote, ist_top FROM training_sessions WHERE dog_id=? AND user_id=? ORDER BY datum ASC, id ASC""", (dog_id, user["id"]) ).fetchall() ts_first = True ts_best = None ts_best_score = -1 for r in ts_rows: if r["erfolgsquote"] is not None and r["erfolgsquote"] > ts_best_score: ts_best_score = r["erfolgsquote"] ts_best = r for i, r in enumerate(ts_rows): is_first = (i == 0) is_best = ts_best and r["id"] == ts_best["id"] and i > 0 events.append({ "datum": r["datum"], "kategorie": "training", "titel": r["exercise_name"], "typ": "training", "is_first": is_first, "is_milestone": bool(r["ist_top"]) or is_best, "foto_url": None, "ref_id": r["id"], }) # --- Routen (nur Routen wo dieser Hund mitgegangen ist) --- route_rows = conn.execute( """SELECT r.id, r.name, r.distanz_km, date(r.created_at) AS datum FROM routes r JOIN route_dogs rd ON rd.route_id = r.id AND rd.dog_id = ? WHERE r.user_id = ? ORDER BY r.created_at ASC""", (dog_id, user["id"]) ).fetchall() route_first = True route_longest = None route_max_km = -1 for r in route_rows: km = r["distanz_km"] or 0 if km > route_max_km: route_max_km = km route_longest = r for i, r in enumerate(route_rows): is_first = (i == 0) is_longest = route_longest and r["id"] == route_longest["id"] and i > 0 events.append({ "datum": r["datum"], "kategorie": "route", "titel": r["name"], "typ": "route", "is_first": is_first, "is_milestone": is_longest, "foto_url": None, "ref_id": r["id"], "distanz_km": r["distanz_km"], }) # Geburtstag des Hundes als erster Eintrag if dog["geburtstag"]: events.append({ "datum": dog["geburtstag"], "kategorie": "meilenstein", "titel": f"{dog['name']} wird geboren", "typ": "geburtstag", "is_first": True, "is_milestone": True, "foto_url": None, "ref_id": None, }) # Chronologisch sortieren events.sort(key=lambda e: (e["datum"] or "0000-00-00", e["kategorie"])) return { "dog_name": dog["name"], "geburtstag": dog["geburtstag"], "events": events, } # ------------------------------------------------------------------ # POST /api/dogs/{id}/gedenken — Hund als verstorben markieren # ------------------------------------------------------------------ class GedenkenData(BaseModel): verstorben_am: str # YYYY-MM-DD @router.post("/{dog_id}/gedenken") async def mark_verstorben(dog_id: int, data: GedenkenData, user=Depends(get_current_user)): with db() as conn: updated = conn.execute( "UPDATE dogs SET verstorben_am=? WHERE id=? AND user_id=?", (data.verstorben_am, dog_id, user["id"]) ).rowcount if not updated: raise HTTPException(404, "Hund nicht gefunden.") return {"ok": True} # ------------------------------------------------------------------ # GET /api/dogs/{id}/gedenkseite — Memorial-Daten # ------------------------------------------------------------------ @router.get("/{dog_id}/gedenkseite") async def get_gedenkseite(dog_id: int, user=Depends(get_current_user)): with db() as conn: dog = conn.execute( "SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404) dog = dict(dog) # Statistiken km_total = conn.execute( "SELECT COALESCE(ROUND(SUM(distanz_km),1),0) AS km FROM routes r " "JOIN route_dogs rd ON rd.route_id=r.id WHERE rd.dog_id=?", (dog_id,) ).fetchone()["km"] diary_count = conn.execute( "SELECT COUNT(*) FROM diary WHERE dog_id=?", (dog_id,) ).fetchone()[0] media_count = conn.execute( "SELECT COUNT(*) FROM diary_media dm JOIN diary d ON d.id=dm.diary_id " "WHERE d.dog_id=? AND dm.media_type='image'", (dog_id,) ).fetchone()[0] training_count = conn.execute( "SELECT COUNT(*) FROM training_sessions WHERE dog_id=?", (dog_id,) ).fetchone()[0] # Letzter Tagebucheintrag last_entry = conn.execute( "SELECT titel, datum FROM diary WHERE dog_id=? ORDER BY datum DESC LIMIT 1", (dog_id,) ).fetchone() # Erste und letzte Aufnahme first_entry = conn.execute( "SELECT datum FROM diary WHERE dog_id=? ORDER BY datum ASC LIMIT 1", (dog_id,) ).fetchone() # Letzte 6 Fotos für Galerie photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id=dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' AND dm.img_width IS NOT NULL AND dm.img_width > dm.img_height ORDER BY d.datum DESC, dm.id DESC LIMIT 6""", (dog_id,) ).fetchall() if not photos: photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id=dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' ORDER BY d.datum DESC, dm.id DESC LIMIT 6""", (dog_id,) ).fetchall() # Gemeinsame Zeit berechnen joined = dog.get("geburtstag") or (first_entry["datum"] if first_entry else None) passed = dog.get("verstorben_am") gemeinsam_tage = None if joined and passed: try: from datetime import date as _date d1 = _date.fromisoformat(joined) d2 = _date.fromisoformat(passed) gemeinsam_tage = (d2 - d1).days except Exception: pass return { "dog": dog, "km_total": km_total, "diary_count": diary_count, "media_count": media_count, "training_count": training_count, "last_entry": dict(last_entry) if last_entry else None, "gemeinsam_tage": gemeinsam_tage, "photos": [r["url"] for r in photos], }