"""BAN YARO — Hunde-Profil Routes""" import os import uuid from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from pydantic import BaseModel, Field from typing import Optional from database import db from auth import get_current_user, has_pro_access from routes.push import send_push_to_user from media_utils import safe_media_path, preview_url_from from cache import ttl_cache # ------------------------------------------------------------------ # Pflege-Tipps sind statische Stamm-Daten → 1h TTL-Cache # (Filterung pro Hund passiert weiter unten in-memory, NICHT gecached) # ------------------------------------------------------------------ @ttl_cache(ttl=3600) def _load_all_pflege_tipps() -> list[dict]: with db() as conn: rows = conn.execute( "SELECT * FROM pflege_tipps ORDER BY kategorie, titel" ).fetchall() return [dict(r) for r in rows] router = APIRouter() MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media") class DogCreate(BaseModel): name: str = Field(..., min_length=1, max_length=80) rasse: Optional[str] = Field(None, max_length=80) geburtstag: Optional[str] = Field(None, max_length=32) geschlecht: Optional[str] = Field(None, max_length=20) gewicht_kg: Optional[float] = None widerrist_cm: Optional[float] = None chip_nr: Optional[str] = Field(None, max_length=50) bio: Optional[str] = Field(None, max_length=2000) is_public: bool = False class DogUpdate(BaseModel): name: Optional[str] = Field(None, max_length=80) rasse: Optional[str] = Field(None, max_length=80) rasse_id: Optional[int] = None geburtstag: Optional[str] = Field(None, max_length=32) geschlecht: Optional[str] = Field(None, max_length=20) gewicht_kg: Optional[float] = None widerrist_cm: Optional[float] = None chip_nr: Optional[str] = Field(None, max_length=50) bio: Optional[str] = Field(None, max_length=2000) is_public: Optional[bool] = None @router.get("") async def list_dogs(user=Depends(get_current_user)): with db() as conn: own = conn.execute( "SELECT *, NULL AS shared_by, NULL AS share_role FROM dogs WHERE user_id=? AND (verstorben_am IS NULL) AND (is_active IS NULL OR is_active=1) ORDER BY id", (user["id"],) ).fetchall() shared = conn.execute( """SELECT d.*, u.name AS shared_by, ds.role AS share_role FROM dog_shares ds JOIN dogs d ON d.id = ds.dog_id JOIN users u ON u.id = ds.owner_id WHERE ds.shared_with_id = ? AND ds.accepted_at IS NOT NULL""", (user["id"],) ).fetchall() guest_rows = conn.execute(""" SELECT d.*, ss.id AS sub_id, ss.valid_until AS sitting_until, u.name AS owner_name, NULL AS shared_by, NULL AS share_role FROM sitting_subscriptions ss JOIN dogs d ON d.id = ss.dog_id JOIN users u ON u.id = ss.owner_id WHERE ss.sitter_id = ? AND ss.valid_until >= date('now') """, (user["id"],)).fetchall() result = [] for r in own: d = dict(r) d["is_guest"] = False result.append(d) for r in shared: d = dict(r) d["is_guest"] = False result.append(d) for r in guest_rows: d = dict(r) d["is_guest"] = True result.append(d) # HdM-Siege pro Hund anhängen if result: dog_ids = [d["id"] for d in result] with db() as conn: wins_rows = conn.execute( f"SELECT dog_id, monat FROM hund_des_monats_wins WHERE dog_id IN ({','.join('?'*len(dog_ids))}) ORDER BY monat DESC", dog_ids, ).fetchall() wins_map: dict[int, list[str]] = {} for w in wins_rows: wins_map.setdefault(w["dog_id"], []).append(w["monat"]) for d in result: d["hdm_wins"] = wins_map.get(d["id"], []) return result def _is_plausible_dog(name: str, rasse: str, geburtstag) -> tuple[bool, str]: """Einfache Plausibilitätsprüfung für Hunde-Profile.""" import re, datetime name = (name or "").strip() rasse = (rasse or "").strip() if len(name) < 2: return False, "Der Name muss mindestens 2 Zeichen haben." if not re.search(r'[a-zA-ZäöüÄÖÜß]', name): return False, "Der Name muss mindestens einen Buchstaben enthalten." if len(set(name.lower())) < 2: return False, "Bitte einen echten Namen eingeben." if rasse and len(rasse) < 2: return False, "Bitte eine gültige Rasse eingeben." if rasse and not re.search(r'[a-zA-ZäöüÄÖÜß]', rasse): return False, "Die Rasse muss Buchstaben enthalten." if geburtstag: try: if isinstance(geburtstag, str): year = int(geburtstag[:4]) else: year = geburtstag.year now = datetime.date.today().year if year > now: return False, "Das Geburtsdatum liegt in der Zukunft." if year < now - 30: return False, "Das Geburtsdatum ist unrealistisch." except Exception: pass return True, "" @router.post("") async def create_dog(data: DogCreate, user=Depends(get_current_user)): with db() as conn: existing = conn.execute( "SELECT COUNT(*) FROM dogs WHERE user_id=?", (user["id"],) ).fetchone()[0] if existing >= 1 and not has_pro_access(user): raise HTTPException( status_code=403, detail="Mehrere Hunde sind ein Pro-Feature. Upgrade auf Ban Yaro Pro, um weitere Hunde anzulegen." ) conn.execute( """INSERT INTO dogs (user_id, name, rasse, geburtstag, geschlecht, gewicht_kg, widerrist_cm, chip_nr, bio, is_public) VALUES (?,?,?,?,?,?,?,?,?,?)""", (user["id"], data.name, data.rasse, data.geburtstag, data.geschlecht, data.gewicht_kg, data.widerrist_cm, data.chip_nr, data.bio, int(data.is_public)) ) dog = conn.execute( "SELECT * FROM dogs WHERE user_id=? ORDER BY id DESC LIMIT 1", (user["id"],) ).fetchone() # Gründer-Aktivierung: erstes Hunde-Profil + is_founder_pending user_row = conn.execute( "SELECT is_founder_pending, is_founder FROM users WHERE id=?", (user["id"],) ).fetchone() if user_row and user_row["is_founder_pending"] and not user_row["is_founder"]: dog_count = conn.execute( "SELECT COUNT(*) FROM dogs WHERE user_id=?", (user["id"],) ).fetchone()[0] if dog_count == 1: # genau dieser erste Hund plausible, reason = _is_plausible_dog(data.name, data.rasse, data.geburtstag) if plausible: # Atomare Gründer-Vergabe — Race-frei via Sub-Query im UPDATE. # Wenn schon 100 Founder oder User schon is_founder=1 → kein Update (rowcount=0) conn.execute( """UPDATE users SET is_founder = 1, founder_number = ( SELECT IFNULL(MAX(founder_number), 0) + 1 FROM users WHERE is_founder = 1 ), is_founder_pending = 0 WHERE id = ? AND is_founder_pending = 1 AND (is_founder IS NULL OR is_founder = 0) AND (SELECT COUNT(*) FROM users WHERE is_founder = 1) < 100""", (user["id"],) ) return dict(dog) @router.get("/{dog_id}/welcome-dashboard") async def get_welcome_dashboard(dog_id: int, user=Depends(get_current_user)): """Liefert kompakte Dashboard-Daten für die Welcome-Ansicht eines Hundes.""" import random as _random with db() as conn: # Besitz prüfen dog = conn.execute( "SELECT id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") # Hintergrundfoto: Querformat-Bilder bevorzugt, tagesweise rotierend # Ownership bereits durch Dog-Check oben gesichert (dog gehört user) photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id = dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' AND dm.img_width IS NOT NULL AND dm.img_width > dm.img_height ORDER BY d.datum DESC, d.id DESC, dm.id ASC""", (dog_id,) ).fetchall() # Fallback: Bilder ohne Dimensionsdaten (vor dem Backfill hochgeladen) if not photos: photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id = dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' AND dm.img_width IS NULL ORDER BY d.datum DESC, d.id DESC, dm.id ASC""", (dog_id,) ).fetchall() random_photo = None if photos: import datetime as _dt2 tick = (_dt2.date.today() - _dt2.date(2024, 1, 1)).days chosen_url = photos[tick % len(photos)]["url"] random_photo = { "url": chosen_url, "preview_url": preview_url_from(chosen_url), } # Neuester Tagebucheintrag last_diary_row = conn.execute( "SELECT titel, datum FROM diary WHERE dog_id=? ORDER BY datum DESC LIMIT 1", (dog_id,) ).fetchone() last_diary = dict(last_diary_row) if last_diary_row else None # Nächster Termin (kein Gewicht, nur innerhalb 60 Tage) next_appt_row = conn.execute( """SELECT bezeichnung, naechstes, typ FROM health WHERE dog_id=? AND naechstes IS NOT NULL AND naechstes >= date('now') AND naechstes <= date('now', '+60 days') AND typ != 'gewicht' ORDER BY naechstes ASC LIMIT 1""", (dog_id,) ).fetchone() next_appointment = dict(next_appt_row) if next_appt_row else None # Letztes Gewicht last_weight_row = conn.execute( """SELECT wert, einheit, datum FROM health WHERE dog_id=? AND typ='gewicht' ORDER BY datum DESC LIMIT 1""", (dog_id,) ).fetchone() last_weight = dict(last_weight_row) if last_weight_row else None # Anzahl Tagebucheinträge diary_count = conn.execute( "SELECT COUNT(*) AS n FROM diary WHERE dog_id=?", (dog_id,) ).fetchone()["n"] # Tagesübung — JOIN mit training_exercises via js_exercise_id, tagesstabil import datetime as _dt day_num = (_dt.date.today() - _dt.date(2024, 1, 1)).days # Versuche JOIN (funktioniert wenn js_exercise_id-Spalte vorhanden) # Nur Übungen des aktiven Hundes, 'sitzt' ausschließen try: joined = conn.execute( """SELECT ep.exercise_id, te.name, te.kategorie AS kategorie_raw, te.schwierigkeit, te.js_exercise_id FROM exercise_progress ep JOIN training_exercises te ON te.js_exercise_id = ep.exercise_id WHERE ep.dog_id = ? AND ep.status IN ('noch-nicht', 'manchmal', 'meistens') ORDER BY ep.updated_at ASC LIMIT 50""", (dog_id,) ).fetchall() except Exception: joined = [] daily_exercise = None if joined: row = joined[day_num % len(joined)] # Tab-ID aus exercise_id ableiten (alles vor erstem '_' + '_') ex_id = row["exercise_id"] tab = ex_id.split('_')[0] if '_' in ex_id else ex_id daily_exercise = { "exercise_id": ex_id, "name": row["name"], "kategorie": tab, "schwierigkeit": row["schwierigkeit"], } else: # Fallback: exercise_progress ohne JOIN (Legacy-IDs ohne Matching in DB) _KNOWN_PREFIXES = ( 'grundkommandos_', 'tricks_', 'problemverhalten_', 'mentale-auslastung_', 'hundesport_', 'koerperpflege_', 'welpe-basics_', ) raw = conn.execute( """SELECT exercise_id FROM exercise_progress WHERE dog_id = ? AND status IN ('noch-nicht', 'manchmal', 'meistens') ORDER BY updated_at ASC LIMIT 50""", (dog_id,) ).fetchall() valid = [r["exercise_id"] for r in raw if any(r["exercise_id"].startswith(p) for p in _KNOWN_PREFIXES)] if valid: ex_id = valid[day_num % len(valid)] for prefix in _KNOWN_PREFIXES: if ex_id.startswith(prefix): tab = prefix.rstrip('_') name = ex_id[len(prefix):].replace('_', ' ') daily_exercise = {"exercise_id": ex_id, "name": name, "kategorie": tab} break return { "random_photo": random_photo, "last_diary": last_diary, "next_appointment": next_appointment, "last_weight": last_weight, "diary_count": diary_count, "daily_exercise": daily_exercise, } @router.get("/{dog_id}/wrapped") async def get_dog_wrapped(dog_id: int, year: int = None, user=Depends(get_current_user)): """Jahresrückblick ('Wrapped') für einen Hund.""" import json as _json from datetime import date as _date if year is None: year = _date.today().year with db() as conn: dog = conn.execute( "SELECT id, name, user_id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") # km gelaufen (eigene Routen des Users) gesamt_km_row = conn.execute( "SELECT ROUND(COALESCE(SUM(distanz_km),0),1) AS km FROM routes " "WHERE user_id=? AND strftime('%Y', created_at)=?", (user["id"], str(year)) ).fetchone() gesamt_km = gesamt_km_row["km"] or 0.0 # Gassi-Tage (Distinct Datum in Diary) gassi_tage = conn.execute( "SELECT COUNT(DISTINCT datum) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=?", (dog_id, str(year)) ).fetchone()["n"] # Gesamte Einträge eintraege_gesamt = conn.execute( "SELECT COUNT(*) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=?", (dog_id, str(year)) ).fetchone()["n"] # Fotos gesamt fotos_gesamt = conn.execute( "SELECT COUNT(*) AS n FROM diary_media dm " "JOIN diary d ON d.id=dm.diary_id " "WHERE d.dog_id=? AND strftime('%Y', d.datum)=? AND dm.media_type='image'", (dog_id, str(year)) ).fetchone()["n"] # Beste Route (längste distanz) beste_route_row = conn.execute( "SELECT MAX(distanz_km) AS km FROM routes " "WHERE user_id=? AND strftime('%Y', created_at)=?", (user["id"], str(year)) ).fetchone() beste_route = beste_route_row["km"] or 0.0 # Lieblingsmonat (meiste diary-Einträge) monat_rows = conn.execute( "SELECT strftime('%m', datum) AS monat, COUNT(*) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=? " "GROUP BY monat ORDER BY n DESC LIMIT 1", (dog_id, str(year)) ).fetchone() lieblings_monat = None if monat_rows: _MONATE = ['Jan','Feb','Mär','Apr','Mai','Jun','Jul','Aug','Sep','Okt','Nov','Dez'] try: lieblings_monat = _MONATE[int(monat_rows["monat"]) - 1] except Exception: pass # Lieblingsaktivität (häufigster typ) typ_row = conn.execute( "SELECT typ, COUNT(*) AS n FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=? " "GROUP BY typ ORDER BY n DESC LIMIT 1", (dog_id, str(year)) ).fetchone() lieblings_aktivitaet = typ_row["typ"] if typ_row else None # Training-Sessions training_sessions = conn.execute( "SELECT COUNT(*) AS n FROM training_sessions " "WHERE dog_id=? AND strftime('%Y', created_at)=?", (dog_id, str(year)) ).fetchone()["n"] # Gesundheits-Einträge gesundheit_eintraege = conn.execute( "SELECT COUNT(*) AS n FROM health " "WHERE dog_id=? AND strftime('%Y', datum)=?", (dog_id, str(year)) ).fetchone()["n"] # Wetter-Tapferkeit: Tagebuch-Einträge mit weather_json wetter_kalt = 0 wetter_warm = 0 wetter_rows = conn.execute( "SELECT weather_json FROM diary " "WHERE dog_id=? AND strftime('%Y', datum)=? AND weather_json IS NOT NULL", (dog_id, str(year)) ).fetchall() for wr in wetter_rows: try: wj = _json.loads(wr["weather_json"]) temp = wj.get("temp_c") or wj.get("temperature") or wj.get("temp") if temp is not None: if float(temp) < 5: wetter_kalt += 1 elif float(temp) > 25: wetter_warm += 1 except Exception: pass return { "dog_id": dog_id, "dog_name": dog["name"], "year": year, "gesamt_km": gesamt_km, "gassi_tage": gassi_tage, "eintraege_gesamt": eintraege_gesamt, "fotos_gesamt": fotos_gesamt, "beste_route": beste_route, "lieblings_monat": lieblings_monat, "lieblings_aktivitaet": lieblings_aktivitaet, "training_sessions": training_sessions, "gesundheit_eintraege": gesundheit_eintraege, "wetter_kalt": wetter_kalt, "wetter_warm": wetter_warm, } @router.get("/{dog_id}/buch") async def get_hunde_buch( dog_id: int, jahr: int = None, limit: int = 50, nur_fotos: bool = False, nur_meilensteine: bool = False, user=Depends(get_current_user), ): """Hunde-Buch: druckbare HTML-Ansicht der schoensten Tagebucheintraege.""" import json as _json from datetime import date as _date from fastapi.responses import HTMLResponse from html import escape as _esc with db() as conn: dog = conn.execute( "SELECT id, name, rasse, geburtstag, foto_url FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") dog = dict(dog) # --- Eintraege laden --- conditions = ["(d.dog_id=? OR dd.dog_id=?)"] params: list = [dog_id, dog_id] if jahr: conditions.append("strftime('%Y', d.datum) = ?") params.append(str(jahr)) if nur_meilensteine: conditions.append("d.is_milestone = 1") where = " AND ".join(conditions) rows = conn.execute( f"""SELECT DISTINCT d.id, d.datum, d.titel, d.text, d.tags, d.gps_lat, d.gps_lon, d.location_name, d.weather_json, d.is_milestone, (SELECT dm.url FROM diary_media dm WHERE dm.diary_id=d.id AND dm.media_type='image' ORDER BY dm.is_cover DESC, dm.sort_order LIMIT 1) AS cover_url FROM diary d LEFT JOIN diary_dogs dd ON dd.diary_id = d.id WHERE {where} AND d.datum IS NOT NULL ORDER BY d.datum ASC""", params ).fetchall() rows = [dict(r) for r in rows] # Filtern: Eintraege mit Foto bevorzugen / nur Fotos-Modus if nur_fotos: rows = [r for r in rows if r.get("cover_url")] else: # Prioritaet: Meilensteine + Foto-Eintraege; Rest auffuellen bis limit with_photo = [r for r in rows if r.get("cover_url")] milestones = [r for r in rows if r.get("is_milestone") and not r.get("cover_url")] rest = [r for r in rows if not r.get("cover_url") and not r.get("is_milestone")] rows = with_photo + milestones + rest rows.sort(key=lambda r: r["datum"] or "") rows = rows[:limit] # --- Hund-Alter berechnen --- alter_str = "" if dog.get("geburtstag"): try: geb = _date.fromisoformat(dog["geburtstag"]) heute = _date.today() jahre = (heute - geb).days // 365 alter_str = f"{jahre} Jahre" except Exception: pass # --- HTML bauen --- dog_name = _esc(dog["name"] or "Mein Hund") rasse_str = _esc(dog.get("rasse") or "") jahr_str = str(jahr) if jahr else "Alle Jahre" foto_url = dog.get("foto_url") or "" cover_img = ( f'{dog_name}' if foto_url else f'
🐾
' ) subtitle_parts = [p for p in [rasse_str, alter_str] if p] subtitle = " · ".join(subtitle_parts) _MONATE = ["Januar","Februar","März","April","Mai","Juni", "Juli","August","September","Oktober","November","Dezember"] def _fmt_datum(iso: str) -> str: try: d = _date.fromisoformat(iso) return f"{d.day}. {_MONATE[d.month - 1]} {d.year}" except Exception: return iso or "" def _wetter_chip(wj_str: str) -> str: if not wj_str: return "" try: wj = _json.loads(wj_str) temp = wj.get("temp_c") or wj.get("temperature") or wj.get("temp") if temp is None: return "" temp_i = int(float(temp)) emoji = "☀️" if temp_i > 20 else ("🌧️" if temp_i < 10 else "⛅") return f'{emoji} {temp_i}°C' except Exception: return "" entries_html = "" for e in rows: milestone_class = "milestone" if e.get("is_milestone") else "" datum_fmt = _fmt_datum(e.get("datum") or "") titel = _esc(e.get("titel") or "") text_raw = e.get("text") or "" text = _esc(text_raw).replace("\n", "
") wetter = _wetter_chip(e.get("weather_json") or "") loc = _esc(e.get("location_name") or "") cover = e.get("cover_url") or "" foto_html = "" if cover: foto_html = ( f'
' f'' f'
' ) loc_html = f'📍 {loc}' if loc else "" chips_html = f'
{wetter}{loc_html}
' if (wetter or loc_html) else "" titel_html = f'
{titel}
' if titel else "" text_html = f'
{text}
' if text_raw else "" entries_html += f"""
{foto_html}
{datum_fmt}
{titel_html} {text_html} {chips_html}
""" anzahl = len(rows) html_page = f""" Hunde-Buch — {dog_name}
{cover_img}

{dog_name}

{'
' + subtitle + '
' if subtitle else ''}
{jahr_str}
{anzahl} Einträge
{entries_html} """ return HTMLResponse(content=html_page) # ------------------------------------------------------------------ # GET /api/dogs/verstorben — Alle verstorbenen Hunde des Users # ------------------------------------------------------------------ @router.get("/verstorben") async def get_verstorbene_hunde(user=Depends(get_current_user)): with db() as conn: rows = conn.execute( """SELECT id, name, rasse, foto_url, verstorben_am, geburtstag FROM dogs WHERE user_id=? AND verstorben_am IS NOT NULL ORDER BY verstorben_am DESC""", (user["id"],) ).fetchall() return [dict(r) for r in rows] @router.get("/{dog_id}") async def get_dog(dog_id: int, user=Depends(get_current_user)): with db() as conn: dog = conn.execute( "SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") return dict(dog) @router.patch("/{dog_id}") async def update_dog(dog_id: int, data: DogUpdate, user=Depends(get_current_user)): fields = {k: v for k, v in data.model_dump().items() if v is not None} if not fields: raise HTTPException(400, "Keine Änderungen angegeben.") set_clause = ", ".join(f"{k}=?" for k in fields) values = list(fields.values()) + [dog_id, user["id"]] with db() as conn: updated = conn.execute( f"UPDATE dogs SET {set_clause} WHERE id=? AND user_id=?", values ).rowcount if not updated: raise HTTPException(404, "Hund nicht gefunden.") dog = conn.execute( "SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() return dict(dog) @router.delete("/{dog_id}", status_code=204) async def delete_dog(dog_id: int, user=Depends(get_current_user)): with db() as conn: conn.execute( "DELETE FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ) @router.post("/{dog_id}/photo") async def upload_photo( dog_id: int, file: UploadFile = File(...), user=Depends(get_current_user) ): # Hund gehört dem User? Altes Foto merken für späteres Löschen. with db() as conn: dog = conn.execute( "SELECT id, foto_url FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") old_foto_url = dog["foto_url"] # Datei immer als JPEG speichern (HEIC/PNG/WebP → kompatibel für alle Browser) import io from PIL import Image try: import pillow_heif pillow_heif.register_heif_opener() except ImportError: pass content = await file.read() try: from PIL import ImageOps img = Image.open(io.BytesIO(content)) img = ImageOps.exif_transpose(img) # EXIF-Orientierung anwenden img = img.convert("RGB") buf = io.BytesIO() img.save(buf, format="JPEG", quality=90) content = buf.getvalue() except Exception: pass # Fallback: Originaldaten speichern filename = f"dog_{dog_id}_{uuid.uuid4().hex[:8]}.jpg" path = os.path.join(MEDIA_DIR, "dogs", filename) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as f: f.write(content) foto_url = f"/media/dogs/{filename}" with db() as conn: conn.execute("UPDATE dogs SET foto_url=? WHERE id=?", (foto_url, dog_id)) # Altes Foto von Disk löschen if old_foto_url: try: old_path = safe_media_path(MEDIA_DIR, old_foto_url) if old_path and os.path.isfile(old_path): os.remove(old_path) except Exception: pass return {"foto_url": foto_url} class PhotoPosition(BaseModel): zoom: float = 1.0 offset_x: float = 0.0 offset_y: float = 0.0 @router.patch("/{dog_id}/photo-position") async def update_photo_position(dog_id: int, pos: PhotoPosition, user=Depends(get_current_user)): with db() as conn: updated = conn.execute( "UPDATE dogs SET foto_zoom=?, foto_offset_x=?, foto_offset_y=? WHERE id=? AND user_id=?", (pos.zoom, pos.offset_x, pos.offset_y, dog_id, user["id"]) ).rowcount if not updated: raise HTTPException(404, "Hund nicht gefunden.") return {"ok": True} @router.delete("/{dog_id}/photo", status_code=204) async def delete_photo(dog_id: int, user=Depends(get_current_user)): with db() as conn: row = conn.execute( "SELECT foto_url FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not row: raise HTTPException(404, "Hund nicht gefunden.") if row["foto_url"]: path = safe_media_path(MEDIA_DIR, row["foto_url"]) if path and os.path.exists(path): os.remove(path) with db() as conn: conn.execute( "UPDATE dogs SET foto_url=NULL, foto_zoom=1.0, foto_offset_x=0.0, foto_offset_y=0.0 WHERE id=? AND user_id=?", (dog_id, user["id"]) ) # ------------------------------------------------------------------ # Fähigkeiten / Kommandos (für Profil + öffentliche Seite) # ------------------------------------------------------------------ def _parse_exercise_name(exercise_id: str) -> str: """grundkommandos_Hier__Komm → 'Hier / Komm'""" parts = exercise_id.split("_", 1) if len(parts) < 2: return exercise_id return parts[1].replace("__", " / ").replace("_", " ") def _load_skills(conn, dog_id: int, user_id: int) -> list: """Gibt Übungen mit Status 'sitzt' oder 'meistens' zurück, die mit diesem Hund trainiert wurden.""" rows = conn.execute( """ SELECT ep.exercise_id, ep.status, (SELECT ts.exercise_name FROM training_sessions ts WHERE ts.user_id = ep.user_id AND ts.dog_id = ? AND ts.exercise_id = ep.exercise_id ORDER BY ts.datum DESC, ts.created_at DESC LIMIT 1) AS exercise_name FROM exercise_progress ep WHERE ep.user_id = ? AND ep.status IN ('sitzt', 'meistens') AND EXISTS (SELECT 1 FROM training_sessions ts2 WHERE ts2.user_id = ep.user_id AND ts2.dog_id = ? AND ts2.exercise_id = ep.exercise_id) ORDER BY ep.status DESC, ep.exercise_id """, (dog_id, user_id, dog_id) ).fetchall() return [ { "exercise_id": r["exercise_id"], "exercise_name": r["exercise_name"] or _parse_exercise_name(r["exercise_id"]), "status": r["status"], "tab": r["exercise_id"].split("_")[0], } for r in rows ] @router.get("/{dog_id}/skills") async def get_dog_skills(dog_id: int, user=Depends(get_current_user)): uid = user["id"] with db() as conn: dog = conn.execute( "SELECT id, user_id FROM dogs WHERE id=? AND (user_id=? OR id IN (SELECT dog_id FROM sitting_subscriptions WHERE sitter_id=? AND valid_until >= date('now')))", (dog_id, uid, uid) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") return _load_skills(conn, dog_id, dog["user_id"]) # Öffentliches Profil (für NFC-Tag, kein Login nötig) @router.get("/public/{dog_id}") async def public_dog_profile(dog_id: int): with db() as conn: dog = conn.execute( """SELECT d.id, d.name, d.rasse, d.geburtstag, d.foto_url, d.bio, d.user_id, u.name as besitzer_name FROM dogs d JOIN users u ON d.user_id=u.id WHERE d.id=? AND d.is_public=1""", (dog_id,) ).fetchone() if not dog: raise HTTPException(404, "Profil nicht gefunden oder nicht öffentlich.") skills = _load_skills(conn, dog_id, dog["user_id"]) result = dict(dog) result.pop("user_id", None) result["skills"] = skills return result class FoundReport(BaseModel): message: Optional[str] = Field(None, max_length=1000) kontakt: Optional[str] = Field(None, max_length=300) # Gefunden-Meldung (kein Login nötig) @router.post("/public/{dog_id}/found") async def report_found(dog_id: int, data: FoundReport = FoundReport()): with db() as conn: row = conn.execute( """SELECT d.id, d.name, d.user_id FROM dogs d WHERE d.id=? AND d.is_public=1""", (dog_id,) ).fetchone() if not row: raise HTTPException(404, "Profil nicht gefunden oder nicht öffentlich.") dog_name = row["name"] user_id = row["user_id"] body = data.message.strip() if data.message and data.message.strip() \ else "Jemand hat deinen Hund gefunden. Öffne die App für Details." if data.kontakt and data.kontakt.strip(): body += f" Kontakt: {data.kontakt.strip()}" send_push_to_user(user_id, { "title": f"🐾 {dog_name} wurde gefunden!", "body": body, "data": {"page": "diary", "found": True}, "tag": f"found-{dog_id}", }) return {"ok": True} # ------------------------------------------------------------------ # GET /api/dogs/{id}/pflege — Pflegetipps für diesen Hund # ------------------------------------------------------------------ @router.get("/{dog_id}/pflege") async def get_pflege_tipps(dog_id: int, user=Depends(get_current_user)): import json as _json with db() as conn: dog = conn.execute( "SELECT id, name, rasse, rasse_id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") # Rassen-Infos für Fell-Typ rasse_info = None with db() as conn: if dog["rasse_id"]: rasse_info = conn.execute( "SELECT name, groesse, beschreibung FROM wiki_rassen WHERE id=?", (dog["rasse_id"],) ).fetchone() elif dog["rasse"]: rasse_info = conn.execute( "SELECT name, groesse, beschreibung FROM wiki_rassen WHERE name LIKE ? LIMIT 1", (f"%{dog['rasse']}%",) ).fetchone() # Fell-Typ und Pflegeart ableiten fell_filter = None fell_pflege_art_filter = None if rasse_info: beschr = (rasse_info["beschreibung"] or "").lower() if any(w in beschr for w in ["lockig", "wellig", "kraus", "pudel", "doodle"]): fell_filter = "lockig" elif any(w in beschr for w in ["langhaar", "seidiges", "fließendes", "langes fell"]): fell_filter = "lang" elif any(w in beschr for w in ["kurzhaar", "kurzes fell", "glatthaarig"]): fell_filter = "kurz" elif rasse_info["groesse"] in ("gross", "sehr_gross"): fell_filter = "doppel" # Pflegeart: Trimmen vs. Schneiden if any(w in beschr for w in ["trimm", "hand-stripping", "stripping", "rauhhaar", "drahthaar", "rauhaar"]): fell_pflege_art_filter = "trimmen" elif any(w in beschr for w in ["schneid", "geschoren", "schere", "clipper"]): fell_pflege_art_filter = "schneiden" # Statische Tipps aus Cache (1h TTL) – Filterung passiert in-memory alle_tipps = _load_all_pflege_tipps() # Relevante Tipps: kein Fell-Filter oder passend from datetime import date heute_saison = {1:"winter",2:"winter",3:"fruehling",4:"fruehling",5:"fruehling", 6:"sommer",7:"sommer",8:"sommer",9:"herbst",10:"herbst", 11:"herbst",12:"winter"}[date.today().month] result = [] for t in alle_tipps: t = dict(t) # Fell-Typ-Filter if fell_filter and t["fell_typ"] and t["fell_typ"] != "alle": if fell_filter not in t["fell_typ"].split(","): continue # Pflegeart-Filter: Trimm-Tipps nicht bei Schneidehunden und umgekehrt tipp_art = t.get("fell_pflege_art") if tipp_art and tipp_art != "alle" and fell_pflege_art_filter: if tipp_art != fell_pflege_art_filter: continue t["schritte"] = _json.loads(t["schritte"] or "[]") t["saisonal_aktuell"] = bool(t["saison"] and heute_saison in t["saison"]) result.append(t) # Tipp des Tages: erster aktuell-saisonaler oder zufällig deterministisch from hashlib import md5 day_hash = int(md5(str(date.today()).encode()).hexdigest(), 16) saisonal = [t for t in result if t["saisonal_aktuell"]] tipp_des_tages = (saisonal or result)[day_hash % len(saisonal or result)] if result else None return { "dog_name": dog["name"], "rasse_name": rasse_info["name"] if rasse_info else dog["rasse"], "tipp_des_tages": tipp_des_tages, "tipps": result, "kategorien": list(dict.fromkeys(t["kategorie"] for t in result)), "fell_pflege_art": fell_pflege_art_filter, # 'schneiden' | 'trimmen' | None } # ------------------------------------------------------------------ # LEBENS-TIMELINE # ------------------------------------------------------------------ @router.get("/{dog_id}/timeline") async def get_dog_timeline(dog_id: int, user=Depends(get_current_user)): """Aggregierte Lebens-Timeline eines Hundes aus allen Datenquellen.""" import json as _json with db() as conn: dog = conn.execute( "SELECT id, name, user_id, geburtstag FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404, "Hund nicht gefunden.") events = [] with db() as conn: # --- Tagebuch --- diary_rows = conn.execute( """SELECT d.id, d.datum, d.titel, d.typ, d.is_milestone, dm.url AS foto_url FROM diary d LEFT JOIN diary_media dm ON dm.diary_id = d.id AND dm.sort_order = 0 WHERE d.dog_id=? ORDER BY d.datum ASC, d.id ASC""", (dog_id,) ).fetchall() for i, r in enumerate(diary_rows): events.append({ "datum": r["datum"], "kategorie": "tagebuch", "titel": r["titel"] or ("Tagebucheintrag" if r["typ"] == "eintrag" else str(r["typ"]).capitalize()), "typ": r["typ"], "is_first": i == 0, "is_milestone": bool(r["is_milestone"]), "foto_url": r["foto_url"], "ref_id": r["id"], }) # --- Gesundheit --- health_rows = conn.execute( """SELECT id, datum, bezeichnung, typ FROM health WHERE dog_id=? ORDER BY datum ASC, id ASC""", (dog_id,) ).fetchall() typ_seen = {} for r in health_rows: t = r["typ"] is_first = t not in typ_seen if is_first: typ_seen[t] = True events.append({ "datum": r["datum"], "kategorie": "gesundheit", "titel": r["bezeichnung"], "typ": t, "is_first": is_first, "is_milestone": False, "foto_url": None, "ref_id": r["id"], }) # --- Training-Sessions --- ts_rows = conn.execute( """SELECT id, datum, exercise_name, erfolgsquote, ist_top FROM training_sessions WHERE dog_id=? AND user_id=? ORDER BY datum ASC, id ASC""", (dog_id, user["id"]) ).fetchall() ts_first = True ts_best = None ts_best_score = -1 for r in ts_rows: if r["erfolgsquote"] is not None and r["erfolgsquote"] > ts_best_score: ts_best_score = r["erfolgsquote"] ts_best = r for i, r in enumerate(ts_rows): is_first = (i == 0) is_best = ts_best and r["id"] == ts_best["id"] and i > 0 events.append({ "datum": r["datum"], "kategorie": "training", "titel": r["exercise_name"], "typ": "training", "is_first": is_first, "is_milestone": bool(r["ist_top"]) or is_best, "foto_url": None, "ref_id": r["id"], }) # --- Routen (nur Routen wo dieser Hund mitgegangen ist) --- route_rows = conn.execute( """SELECT r.id, r.name, r.distanz_km, date(r.created_at) AS datum FROM routes r JOIN route_dogs rd ON rd.route_id = r.id AND rd.dog_id = ? WHERE r.user_id = ? ORDER BY r.created_at ASC""", (dog_id, user["id"]) ).fetchall() route_first = True route_longest = None route_max_km = -1 for r in route_rows: km = r["distanz_km"] or 0 if km > route_max_km: route_max_km = km route_longest = r for i, r in enumerate(route_rows): is_first = (i == 0) is_longest = route_longest and r["id"] == route_longest["id"] and i > 0 events.append({ "datum": r["datum"], "kategorie": "route", "titel": r["name"], "typ": "route", "is_first": is_first, "is_milestone": is_longest, "foto_url": None, "ref_id": r["id"], "distanz_km": r["distanz_km"], }) # Geburtstag des Hundes als erster Eintrag if dog["geburtstag"]: events.append({ "datum": dog["geburtstag"], "kategorie": "meilenstein", "titel": f"{dog['name']} wird geboren", "typ": "geburtstag", "is_first": True, "is_milestone": True, "foto_url": None, "ref_id": None, }) # Chronologisch sortieren events.sort(key=lambda e: (e["datum"] or "0000-00-00", e["kategorie"])) return { "dog_name": dog["name"], "geburtstag": dog["geburtstag"], "events": events, } # ------------------------------------------------------------------ # POST /api/dogs/{id}/gedenken — Hund als verstorben markieren # ------------------------------------------------------------------ class GedenkenData(BaseModel): verstorben_am: str = Field(..., max_length=32) # YYYY-MM-DD @router.post("/{dog_id}/gedenken") async def mark_verstorben(dog_id: int, data: GedenkenData, user=Depends(get_current_user)): with db() as conn: updated = conn.execute( "UPDATE dogs SET verstorben_am=? WHERE id=? AND user_id=?", (data.verstorben_am, dog_id, user["id"]) ).rowcount if not updated: raise HTTPException(404, "Hund nicht gefunden.") return {"ok": True} # ------------------------------------------------------------------ # GET /api/dogs/{id}/gedenkseite — Memorial-Daten # ------------------------------------------------------------------ @router.get("/{dog_id}/gedenkseite") async def get_gedenkseite(dog_id: int, user=Depends(get_current_user)): with db() as conn: dog = conn.execute( "SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"]) ).fetchone() if not dog: raise HTTPException(404) dog = dict(dog) # Statistiken km_total = conn.execute( "SELECT COALESCE(ROUND(SUM(distanz_km),1),0) AS km FROM routes r " "JOIN route_dogs rd ON rd.route_id=r.id WHERE rd.dog_id=?", (dog_id,) ).fetchone()["km"] diary_count = conn.execute( "SELECT COUNT(*) FROM diary WHERE dog_id=?", (dog_id,) ).fetchone()[0] media_count = conn.execute( "SELECT COUNT(*) FROM diary_media dm JOIN diary d ON d.id=dm.diary_id " "WHERE d.dog_id=? AND dm.media_type='image'", (dog_id,) ).fetchone()[0] training_count = conn.execute( "SELECT COUNT(*) FROM training_sessions WHERE dog_id=?", (dog_id,) ).fetchone()[0] # Letzter Tagebucheintrag last_entry = conn.execute( "SELECT titel, datum FROM diary WHERE dog_id=? ORDER BY datum DESC LIMIT 1", (dog_id,) ).fetchone() # Erste und letzte Aufnahme first_entry = conn.execute( "SELECT datum FROM diary WHERE dog_id=? ORDER BY datum ASC LIMIT 1", (dog_id,) ).fetchone() # Letzte 6 Fotos für Galerie photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id=dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' AND dm.img_width IS NOT NULL AND dm.img_width > dm.img_height ORDER BY d.datum DESC, dm.id DESC LIMIT 6""", (dog_id,) ).fetchall() if not photos: photos = conn.execute( """SELECT dm.url FROM diary_media dm JOIN diary d ON d.id=dm.diary_id WHERE d.dog_id=? AND dm.media_type='image' ORDER BY d.datum DESC, dm.id DESC LIMIT 6""", (dog_id,) ).fetchall() # Gemeinsame Zeit berechnen joined = dog.get("geburtstag") or (first_entry["datum"] if first_entry else None) passed = dog.get("verstorben_am") gemeinsam_tage = None if joined and passed: try: from datetime import date as _date d1 = _date.fromisoformat(joined) d2 = _date.fromisoformat(passed) gemeinsam_tage = (d2 - d1).days except Exception: pass return { "dog": dog, "km_total": km_total, "diary_count": diary_count, "media_count": media_count, "training_count": training_count, "last_entry": dict(last_entry) if last_entry else None, "gemeinsam_tage": gemeinsam_tage, "photos": [r["url"] for r in photos], }