banyaro/backend/routes/dogs.py

1400 lines
49 KiB
Python

"""BAN YARO — Hunde-Profil Routes"""
import os
import uuid
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import Optional
from database import db
from auth import get_current_user, has_pro_access
from routes.push import send_push_to_user
from media_utils import safe_media_path, preview_url_from
router = APIRouter()
MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media")
class DogCreate(BaseModel):
name: str
rasse: Optional[str] = None
geburtstag: Optional[str] = None
geschlecht: Optional[str] = None
gewicht_kg: Optional[float] = None
widerrist_cm: Optional[float] = None
chip_nr: Optional[str] = None
bio: Optional[str] = None
is_public: bool = False
class DogUpdate(BaseModel):
name: Optional[str] = None
rasse: Optional[str] = None
rasse_id: Optional[int] = None
geburtstag: Optional[str] = None
geschlecht: Optional[str] = None
gewicht_kg: Optional[float] = None
widerrist_cm: Optional[float] = None
chip_nr: Optional[str] = None
bio: Optional[str] = None
is_public: Optional[bool] = None
@router.get("")
async def list_dogs(user=Depends(get_current_user)):
with db() as conn:
own = conn.execute(
"SELECT *, NULL AS shared_by, NULL AS share_role FROM dogs WHERE user_id=? AND (verstorben_am IS NULL) AND (is_active IS NULL OR is_active=1) ORDER BY id",
(user["id"],)
).fetchall()
shared = conn.execute(
"""SELECT d.*, u.name AS shared_by, ds.role AS share_role
FROM dog_shares ds
JOIN dogs d ON d.id = ds.dog_id
JOIN users u ON u.id = ds.owner_id
WHERE ds.shared_with_id = ? AND ds.accepted_at IS NOT NULL""",
(user["id"],)
).fetchall()
guest_rows = conn.execute("""
SELECT d.*, ss.id AS sub_id, ss.valid_until AS sitting_until,
u.name AS owner_name, NULL AS shared_by, NULL AS share_role
FROM sitting_subscriptions ss
JOIN dogs d ON d.id = ss.dog_id
JOIN users u ON u.id = ss.owner_id
WHERE ss.sitter_id = ?
AND ss.valid_until >= date('now')
""", (user["id"],)).fetchall()
result = []
for r in own:
d = dict(r)
d["is_guest"] = False
result.append(d)
for r in shared:
d = dict(r)
d["is_guest"] = False
result.append(d)
for r in guest_rows:
d = dict(r)
d["is_guest"] = True
result.append(d)
# HdM-Siege pro Hund anhängen
if result:
dog_ids = [d["id"] for d in result]
with db() as conn:
wins_rows = conn.execute(
f"SELECT dog_id, monat FROM hund_des_monats_wins WHERE dog_id IN ({','.join('?'*len(dog_ids))}) ORDER BY monat DESC",
dog_ids,
).fetchall()
wins_map: dict[int, list[str]] = {}
for w in wins_rows:
wins_map.setdefault(w["dog_id"], []).append(w["monat"])
for d in result:
d["hdm_wins"] = wins_map.get(d["id"], [])
return result
def _is_plausible_dog(name: str, rasse: str, geburtstag) -> tuple[bool, str]:
"""Einfache Plausibilitätsprüfung für Hunde-Profile."""
import re, datetime
name = (name or "").strip()
rasse = (rasse or "").strip()
if len(name) < 2:
return False, "Der Name muss mindestens 2 Zeichen haben."
if not re.search(r'[a-zA-ZäöüÄÖÜß]', name):
return False, "Der Name muss mindestens einen Buchstaben enthalten."
if len(set(name.lower())) < 2:
return False, "Bitte einen echten Namen eingeben."
if rasse and len(rasse) < 2:
return False, "Bitte eine gültige Rasse eingeben."
if rasse and not re.search(r'[a-zA-ZäöüÄÖÜß]', rasse):
return False, "Die Rasse muss Buchstaben enthalten."
if geburtstag:
try:
if isinstance(geburtstag, str):
year = int(geburtstag[:4])
else:
year = geburtstag.year
now = datetime.date.today().year
if year > now:
return False, "Das Geburtsdatum liegt in der Zukunft."
if year < now - 30:
return False, "Das Geburtsdatum ist unrealistisch."
except Exception:
pass
return True, ""
@router.post("")
async def create_dog(data: DogCreate, user=Depends(get_current_user)):
with db() as conn:
existing = conn.execute(
"SELECT COUNT(*) FROM dogs WHERE user_id=?", (user["id"],)
).fetchone()[0]
if existing >= 1 and not has_pro_access(user):
raise HTTPException(
status_code=403,
detail="Mehrere Hunde sind ein Pro-Feature. Upgrade auf Ban Yaro Pro, um weitere Hunde anzulegen."
)
conn.execute(
"""INSERT INTO dogs (user_id, name, rasse, geburtstag, geschlecht,
gewicht_kg, widerrist_cm, chip_nr, bio, is_public)
VALUES (?,?,?,?,?,?,?,?,?,?)""",
(user["id"], data.name, data.rasse, data.geburtstag,
data.geschlecht, data.gewicht_kg, data.widerrist_cm,
data.chip_nr, data.bio, int(data.is_public))
)
dog = conn.execute(
"SELECT * FROM dogs WHERE user_id=? ORDER BY id DESC LIMIT 1",
(user["id"],)
).fetchone()
# Gründer-Aktivierung: erstes Hunde-Profil + is_founder_pending
user_row = conn.execute(
"SELECT is_founder_pending, is_founder FROM users WHERE id=?",
(user["id"],)
).fetchone()
if user_row and user_row["is_founder_pending"] and not user_row["is_founder"]:
dog_count = conn.execute(
"SELECT COUNT(*) FROM dogs WHERE user_id=?", (user["id"],)
).fetchone()[0]
if dog_count == 1: # genau dieser erste Hund
plausible, reason = _is_plausible_dog(data.name, data.rasse, data.geburtstag)
if plausible:
total = conn.execute(
"SELECT COUNT(*) FROM users WHERE is_founder=1"
).fetchone()[0]
if total < 100:
conn.execute(
"UPDATE users SET is_founder=1, founder_number=?, is_founder_pending=0 WHERE id=?",
(total + 1, user["id"])
)
return dict(dog)
@router.get("/{dog_id}/welcome-dashboard")
async def get_welcome_dashboard(dog_id: int, user=Depends(get_current_user)):
"""Liefert kompakte Dashboard-Daten für die Welcome-Ansicht eines Hundes."""
import random as _random
with db() as conn:
# Besitz prüfen
dog = conn.execute(
"SELECT id FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
# Hintergrundfoto: Querformat-Bilder bevorzugt, tagesweise rotierend
# Ownership bereits durch Dog-Check oben gesichert (dog gehört user)
photos = conn.execute(
"""SELECT dm.url FROM diary_media dm
JOIN diary d ON d.id = dm.diary_id
WHERE d.dog_id=? AND dm.media_type='image'
AND dm.img_width IS NOT NULL AND dm.img_width > dm.img_height
ORDER BY d.datum DESC, d.id DESC, dm.id ASC""",
(dog_id,)
).fetchall()
# Fallback: Bilder ohne Dimensionsdaten (vor dem Backfill hochgeladen)
if not photos:
photos = conn.execute(
"""SELECT dm.url FROM diary_media dm
JOIN diary d ON d.id = dm.diary_id
WHERE d.dog_id=? AND dm.media_type='image'
AND dm.img_width IS NULL
ORDER BY d.datum DESC, d.id DESC, dm.id ASC""",
(dog_id,)
).fetchall()
random_photo = None
if photos:
import datetime as _dt2
tick = (_dt2.date.today() - _dt2.date(2024, 1, 1)).days
chosen_url = photos[tick % len(photos)]["url"]
random_photo = {
"url": chosen_url,
"preview_url": preview_url_from(chosen_url),
}
# Neuester Tagebucheintrag
last_diary_row = conn.execute(
"SELECT titel, datum FROM diary WHERE dog_id=? ORDER BY datum DESC LIMIT 1",
(dog_id,)
).fetchone()
last_diary = dict(last_diary_row) if last_diary_row else None
# Nächster Termin (kein Gewicht, nur innerhalb 60 Tage)
next_appt_row = conn.execute(
"""SELECT bezeichnung, naechstes, typ FROM health
WHERE dog_id=? AND naechstes IS NOT NULL
AND naechstes >= date('now')
AND naechstes <= date('now', '+60 days')
AND typ != 'gewicht'
ORDER BY naechstes ASC LIMIT 1""",
(dog_id,)
).fetchone()
next_appointment = dict(next_appt_row) if next_appt_row else None
# Letztes Gewicht
last_weight_row = conn.execute(
"""SELECT wert, einheit, datum FROM health
WHERE dog_id=? AND typ='gewicht'
ORDER BY datum DESC LIMIT 1""",
(dog_id,)
).fetchone()
last_weight = dict(last_weight_row) if last_weight_row else None
# Anzahl Tagebucheinträge
diary_count = conn.execute(
"SELECT COUNT(*) AS n FROM diary WHERE dog_id=?", (dog_id,)
).fetchone()["n"]
# Tagesübung — JOIN mit training_exercises via js_exercise_id, tagesstabil
import datetime as _dt
day_num = (_dt.date.today() - _dt.date(2024, 1, 1)).days
# Versuche JOIN (funktioniert wenn js_exercise_id-Spalte vorhanden)
# Nur Übungen des aktiven Hundes, 'sitzt' ausschließen
try:
joined = conn.execute(
"""SELECT ep.exercise_id, te.name, te.kategorie AS kategorie_raw,
te.schwierigkeit, te.js_exercise_id
FROM exercise_progress ep
JOIN training_exercises te ON te.js_exercise_id = ep.exercise_id
WHERE ep.dog_id = ? AND ep.status IN ('noch-nicht', 'manchmal', 'meistens')
ORDER BY ep.updated_at ASC LIMIT 50""",
(dog_id,)
).fetchall()
except Exception:
joined = []
daily_exercise = None
if joined:
row = joined[day_num % len(joined)]
# Tab-ID aus exercise_id ableiten (alles vor erstem '_' + '_')
ex_id = row["exercise_id"]
tab = ex_id.split('_')[0] if '_' in ex_id else ex_id
daily_exercise = {
"exercise_id": ex_id,
"name": row["name"],
"kategorie": tab,
"schwierigkeit": row["schwierigkeit"],
}
else:
# Fallback: exercise_progress ohne JOIN (Legacy-IDs ohne Matching in DB)
_KNOWN_PREFIXES = (
'grundkommandos_', 'tricks_', 'problemverhalten_',
'mentale-auslastung_', 'hundesport_', 'koerperpflege_', 'welpe-basics_',
)
raw = conn.execute(
"""SELECT exercise_id FROM exercise_progress
WHERE dog_id = ? AND status IN ('noch-nicht', 'manchmal', 'meistens')
ORDER BY updated_at ASC LIMIT 50""",
(dog_id,)
).fetchall()
valid = [r["exercise_id"] for r in raw
if any(r["exercise_id"].startswith(p) for p in _KNOWN_PREFIXES)]
if valid:
ex_id = valid[day_num % len(valid)]
for prefix in _KNOWN_PREFIXES:
if ex_id.startswith(prefix):
tab = prefix.rstrip('_')
name = ex_id[len(prefix):].replace('_', ' ')
daily_exercise = {"exercise_id": ex_id, "name": name, "kategorie": tab}
break
return {
"random_photo": random_photo,
"last_diary": last_diary,
"next_appointment": next_appointment,
"last_weight": last_weight,
"diary_count": diary_count,
"daily_exercise": daily_exercise,
}
@router.get("/{dog_id}/wrapped")
async def get_dog_wrapped(dog_id: int, year: int = None, user=Depends(get_current_user)):
"""Jahresrückblick ('Wrapped') für einen Hund."""
import json as _json
from datetime import date as _date
if year is None:
year = _date.today().year
with db() as conn:
dog = conn.execute(
"SELECT id, name, user_id FROM dogs WHERE id=? AND user_id=?",
(dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
# km gelaufen (eigene Routen des Users)
gesamt_km_row = conn.execute(
"SELECT ROUND(COALESCE(SUM(distanz_km),0),1) AS km FROM routes "
"WHERE user_id=? AND strftime('%Y', created_at)=?",
(user["id"], str(year))
).fetchone()
gesamt_km = gesamt_km_row["km"] or 0.0
# Gassi-Tage (Distinct Datum in Diary)
gassi_tage = conn.execute(
"SELECT COUNT(DISTINCT datum) AS n FROM diary "
"WHERE dog_id=? AND strftime('%Y', datum)=?",
(dog_id, str(year))
).fetchone()["n"]
# Gesamte Einträge
eintraege_gesamt = conn.execute(
"SELECT COUNT(*) AS n FROM diary "
"WHERE dog_id=? AND strftime('%Y', datum)=?",
(dog_id, str(year))
).fetchone()["n"]
# Fotos gesamt
fotos_gesamt = conn.execute(
"SELECT COUNT(*) AS n FROM diary_media dm "
"JOIN diary d ON d.id=dm.diary_id "
"WHERE d.dog_id=? AND strftime('%Y', d.datum)=? AND dm.media_type='image'",
(dog_id, str(year))
).fetchone()["n"]
# Beste Route (längste distanz)
beste_route_row = conn.execute(
"SELECT MAX(distanz_km) AS km FROM routes "
"WHERE user_id=? AND strftime('%Y', created_at)=?",
(user["id"], str(year))
).fetchone()
beste_route = beste_route_row["km"] or 0.0
# Lieblingsmonat (meiste diary-Einträge)
monat_rows = conn.execute(
"SELECT strftime('%m', datum) AS monat, COUNT(*) AS n FROM diary "
"WHERE dog_id=? AND strftime('%Y', datum)=? "
"GROUP BY monat ORDER BY n DESC LIMIT 1",
(dog_id, str(year))
).fetchone()
lieblings_monat = None
if monat_rows:
_MONATE = ['Jan','Feb','Mär','Apr','Mai','Jun','Jul','Aug','Sep','Okt','Nov','Dez']
try:
lieblings_monat = _MONATE[int(monat_rows["monat"]) - 1]
except Exception:
pass
# Lieblingsaktivität (häufigster typ)
typ_row = conn.execute(
"SELECT typ, COUNT(*) AS n FROM diary "
"WHERE dog_id=? AND strftime('%Y', datum)=? "
"GROUP BY typ ORDER BY n DESC LIMIT 1",
(dog_id, str(year))
).fetchone()
lieblings_aktivitaet = typ_row["typ"] if typ_row else None
# Training-Sessions
training_sessions = conn.execute(
"SELECT COUNT(*) AS n FROM training_sessions "
"WHERE dog_id=? AND strftime('%Y', created_at)=?",
(dog_id, str(year))
).fetchone()["n"]
# Gesundheits-Einträge
gesundheit_eintraege = conn.execute(
"SELECT COUNT(*) AS n FROM health "
"WHERE dog_id=? AND strftime('%Y', datum)=?",
(dog_id, str(year))
).fetchone()["n"]
# Wetter-Tapferkeit: Tagebuch-Einträge mit weather_json
wetter_kalt = 0
wetter_warm = 0
wetter_rows = conn.execute(
"SELECT weather_json FROM diary "
"WHERE dog_id=? AND strftime('%Y', datum)=? AND weather_json IS NOT NULL",
(dog_id, str(year))
).fetchall()
for wr in wetter_rows:
try:
wj = _json.loads(wr["weather_json"])
temp = wj.get("temp_c") or wj.get("temperature") or wj.get("temp")
if temp is not None:
if float(temp) < 5:
wetter_kalt += 1
elif float(temp) > 25:
wetter_warm += 1
except Exception:
pass
return {
"dog_id": dog_id,
"dog_name": dog["name"],
"year": year,
"gesamt_km": gesamt_km,
"gassi_tage": gassi_tage,
"eintraege_gesamt": eintraege_gesamt,
"fotos_gesamt": fotos_gesamt,
"beste_route": beste_route,
"lieblings_monat": lieblings_monat,
"lieblings_aktivitaet": lieblings_aktivitaet,
"training_sessions": training_sessions,
"gesundheit_eintraege": gesundheit_eintraege,
"wetter_kalt": wetter_kalt,
"wetter_warm": wetter_warm,
}
@router.get("/{dog_id}/buch")
async def get_hunde_buch(
dog_id: int,
jahr: int = None,
limit: int = 50,
nur_fotos: bool = False,
nur_meilensteine: bool = False,
user=Depends(get_current_user),
):
"""Hunde-Buch: druckbare HTML-Ansicht der schoensten Tagebucheintraege."""
import json as _json
from datetime import date as _date
from fastapi.responses import HTMLResponse
from html import escape as _esc
with db() as conn:
dog = conn.execute(
"SELECT id, name, rasse, geburtstag, foto_url FROM dogs WHERE id=? AND user_id=?",
(dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
dog = dict(dog)
# --- Eintraege laden ---
conditions = ["(d.dog_id=? OR dd.dog_id=?)"]
params: list = [dog_id, dog_id]
if jahr:
conditions.append("strftime('%Y', d.datum) = ?")
params.append(str(jahr))
if nur_meilensteine:
conditions.append("d.is_milestone = 1")
where = " AND ".join(conditions)
rows = conn.execute(
f"""SELECT DISTINCT d.id, d.datum, d.titel, d.text, d.tags,
d.gps_lat, d.gps_lon, d.location_name, d.weather_json,
d.is_milestone,
(SELECT dm.url FROM diary_media dm
WHERE dm.diary_id=d.id AND dm.media_type='image'
ORDER BY dm.is_cover DESC, dm.sort_order LIMIT 1) AS cover_url
FROM diary d
LEFT JOIN diary_dogs dd ON dd.diary_id = d.id
WHERE {where}
AND d.datum IS NOT NULL
ORDER BY d.datum ASC""",
params
).fetchall()
rows = [dict(r) for r in rows]
# Filtern: Eintraege mit Foto bevorzugen / nur Fotos-Modus
if nur_fotos:
rows = [r for r in rows if r.get("cover_url")]
else:
# Prioritaet: Meilensteine + Foto-Eintraege; Rest auffuellen bis limit
with_photo = [r for r in rows if r.get("cover_url")]
milestones = [r for r in rows if r.get("is_milestone") and not r.get("cover_url")]
rest = [r for r in rows if not r.get("cover_url") and not r.get("is_milestone")]
rows = with_photo + milestones + rest
rows.sort(key=lambda r: r["datum"] or "")
rows = rows[:limit]
# --- Hund-Alter berechnen ---
alter_str = ""
if dog.get("geburtstag"):
try:
geb = _date.fromisoformat(dog["geburtstag"])
heute = _date.today()
jahre = (heute - geb).days // 365
alter_str = f"{jahre} Jahre"
except Exception:
pass
# --- HTML bauen ---
dog_name = _esc(dog["name"] or "Mein Hund")
rasse_str = _esc(dog.get("rasse") or "")
jahr_str = str(jahr) if jahr else "Alle Jahre"
foto_url = dog.get("foto_url") or ""
cover_img = (
f'<img src="{_esc(foto_url)}" alt="{dog_name}" '
f'style="border-radius:50%;width:200px;height:200px;object-fit:cover;'
f'border:4px solid #b97c2a;margin-bottom:24px;" onerror="this.style.display=\'none\'">'
if foto_url else
f'<div style="width:200px;height:200px;border-radius:50%;background:#f0e8d8;'
f'display:flex;align-items:center;justify-content:center;font-size:5rem;'
f'margin:0 auto 24px">&#x1F43E;</div>'
)
subtitle_parts = [p for p in [rasse_str, alter_str] if p]
subtitle = " &middot; ".join(subtitle_parts)
_MONATE = ["Januar","Februar","März","April","Mai","Juni",
"Juli","August","September","Oktober","November","Dezember"]
def _fmt_datum(iso: str) -> str:
try:
d = _date.fromisoformat(iso)
return f"{d.day}. {_MONATE[d.month - 1]} {d.year}"
except Exception:
return iso or ""
def _wetter_chip(wj_str: str) -> str:
if not wj_str:
return ""
try:
wj = _json.loads(wj_str)
temp = wj.get("temp_c") or wj.get("temperature") or wj.get("temp")
if temp is None:
return ""
temp_i = int(float(temp))
emoji = "&#x2600;&#xFE0F;" if temp_i > 20 else ("&#x1F327;&#xFE0F;" if temp_i < 10 else "&#x26C5;")
return f'<span class="chip">{emoji} {temp_i}&deg;C</span>'
except Exception:
return ""
entries_html = ""
for e in rows:
milestone_class = "milestone" if e.get("is_milestone") else ""
datum_fmt = _fmt_datum(e.get("datum") or "")
titel = _esc(e.get("titel") or "")
text_raw = e.get("text") or ""
text = _esc(text_raw).replace("\n", "<br>")
wetter = _wetter_chip(e.get("weather_json") or "")
loc = _esc(e.get("location_name") or "")
cover = e.get("cover_url") or ""
foto_html = ""
if cover:
foto_html = (
f'<div class="entry-photo">'
f'<img src="{_esc(cover)}" alt="" loading="lazy">'
f'</div>'
)
loc_html = f'<span class="chip">&#x1F4CD; {loc}</span>' if loc else ""
chips_html = f'<div class="chips">{wetter}{loc_html}</div>' if (wetter or loc_html) else ""
titel_html = f'<div class="entry-title">{titel}</div>' if titel else ""
text_html = f'<div class="entry-text">{text}</div>' if text_raw else ""
entries_html += f"""
<div class="entry {milestone_class}">
{foto_html}
<div class="entry-date">{datum_fmt}</div>
{titel_html}
{text_html}
{chips_html}
</div>
"""
anzahl = len(rows)
html_page = f"""<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Hunde-Buch &mdash; {dog_name}</title>
<style>
*, *::before, *::after {{ box-sizing: border-box; }}
body {{
font-family: Georgia, 'Times New Roman', serif;
max-width: 800px;
margin: 0 auto;
padding: 0 20px 60px;
color: #2c2c2c;
background: #fff;
line-height: 1.6;
}}
/* Druck-Button */
.print-btn {{
position: fixed;
top: 20px;
right: 20px;
background: #b97c2a;
color: #fff;
border: none;
border-radius: 50px;
padding: 12px 24px;
font-size: 1rem;
font-family: system-ui, sans-serif;
cursor: pointer;
box-shadow: 0 4px 16px rgba(185,124,42,0.35);
z-index: 100;
display: flex;
align-items: center;
gap: 8px;
}}
.print-btn:hover {{ background: #a06820; }}
/* Titelseite */
.cover {{
text-align: center;
padding: 80px 40px 100px;
min-height: 90vh;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
}}
.cover h1 {{
font-size: 3em;
color: #b97c2a;
margin: 0 0 12px;
line-height: 1.2;
}}
.cover .subtitle {{
font-size: 1.15em;
color: #666;
margin-bottom: 8px;
}}
.cover .year-label {{
font-size: 1em;
color: #aaa;
margin-bottom: 32px;
}}
.cover .stat-line {{
font-size: 0.95em;
color: #888;
font-family: system-ui, sans-serif;
margin-top: 24px;
}}
/* Eintraege */
.entry {{
padding: 32px 0;
border-bottom: 1px solid #e8e0d0;
break-inside: avoid;
}}
.entry:last-child {{ border-bottom: none; }}
.entry-photo {{ margin-bottom: 20px; }}
.entry-photo img {{
width: 100%;
max-height: 420px;
object-fit: cover;
border-radius: 10px;
display: block;
}}
.entry-date {{
color: #999;
font-size: 0.85em;
font-family: system-ui, sans-serif;
margin-bottom: 6px;
text-transform: uppercase;
letter-spacing: 0.04em;
}}
.entry-title {{
font-size: 1.45em;
font-weight: bold;
color: #1a1a1a;
margin-bottom: 10px;
line-height: 1.3;
}}
.entry-text {{
font-size: 1em;
line-height: 1.75;
color: #333;
}}
/* Meilenstein */
.milestone {{
border-left: 4px solid #b97c2a;
padding-left: 24px;
}}
.milestone .entry-title::before {{
content: "\\2605 ";
color: #b97c2a;
}}
/* Chips */
.chips {{
margin-top: 12px;
display: flex;
flex-wrap: wrap;
gap: 8px;
}}
.chip {{
background: #f5f0e8;
border-radius: 50px;
padding: 4px 12px;
font-size: 0.82em;
font-family: system-ui, sans-serif;
color: #666;
}}
/* Drucken */
@media print {{
.print-btn {{ display: none !important; }}
.cover {{ page-break-after: always; }}
.entry {{ break-inside: avoid; }}
body {{ padding: 0; }}
@page {{ margin: 2cm; }}
}}
</style>
</head>
<body>
<div class="print-btn" style="display:flex;gap:10px;flex-wrap:wrap">
<button onclick="window.history.length>1?window.history.back():window.close()"
style="background:#f0e6d3;color:#7a4a1e;border:none;border-radius:100px;
padding:10px 20px;font-size:0.9rem;cursor:pointer;font-weight:600">
← Zurück zur App
</button>
<button onclick="window.print()"
style="background:#C4843A;color:#fff;border:none;border-radius:100px;
padding:10px 20px;font-size:0.9rem;cursor:pointer;font-weight:600">
&#x1F5A8; Drucken / Als PDF speichern
</button>
</div>
<div class="cover">
{cover_img}
<h1>{dog_name}</h1>
{'<div class="subtitle">' + subtitle + '</div>' if subtitle else ''}
<div class="year-label">{jahr_str}</div>
<div class="stat-line">{anzahl} Eintr&auml;ge</div>
</div>
{entries_html}
</body>
</html>"""
return HTMLResponse(content=html_page)
# ------------------------------------------------------------------
# GET /api/dogs/verstorben — Alle verstorbenen Hunde des Users
# ------------------------------------------------------------------
@router.get("/verstorben")
async def get_verstorbene_hunde(user=Depends(get_current_user)):
with db() as conn:
rows = conn.execute(
"""SELECT id, name, rasse, foto_url, verstorben_am, geburtstag
FROM dogs WHERE user_id=? AND verstorben_am IS NOT NULL
ORDER BY verstorben_am DESC""",
(user["id"],)
).fetchall()
return [dict(r) for r in rows]
@router.get("/{dog_id}")
async def get_dog(dog_id: int, user=Depends(get_current_user)):
with db() as conn:
dog = conn.execute(
"SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
return dict(dog)
@router.patch("/{dog_id}")
async def update_dog(dog_id: int, data: DogUpdate, user=Depends(get_current_user)):
fields = {k: v for k, v in data.model_dump().items() if v is not None}
if not fields:
raise HTTPException(400, "Keine Änderungen angegeben.")
set_clause = ", ".join(f"{k}=?" for k in fields)
values = list(fields.values()) + [dog_id, user["id"]]
with db() as conn:
updated = conn.execute(
f"UPDATE dogs SET {set_clause} WHERE id=? AND user_id=?", values
).rowcount
if not updated:
raise HTTPException(404, "Hund nicht gefunden.")
dog = conn.execute(
"SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"])
).fetchone()
return dict(dog)
@router.delete("/{dog_id}", status_code=204)
async def delete_dog(dog_id: int, user=Depends(get_current_user)):
with db() as conn:
conn.execute(
"DELETE FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"])
)
@router.post("/{dog_id}/photo")
async def upload_photo(
dog_id: int,
file: UploadFile = File(...),
user=Depends(get_current_user)
):
# Hund gehört dem User? Altes Foto merken für späteres Löschen.
with db() as conn:
dog = conn.execute(
"SELECT id, foto_url FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
old_foto_url = dog["foto_url"]
# Datei immer als JPEG speichern (HEIC/PNG/WebP → kompatibel für alle Browser)
import io
from PIL import Image
try:
import pillow_heif
pillow_heif.register_heif_opener()
except ImportError:
pass
content = await file.read()
try:
from PIL import ImageOps
img = Image.open(io.BytesIO(content))
img = ImageOps.exif_transpose(img) # EXIF-Orientierung anwenden
img = img.convert("RGB")
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=90)
content = buf.getvalue()
except Exception:
pass # Fallback: Originaldaten speichern
filename = f"dog_{dog_id}_{uuid.uuid4().hex[:8]}.jpg"
path = os.path.join(MEDIA_DIR, "dogs", filename)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
f.write(content)
foto_url = f"/media/dogs/{filename}"
with db() as conn:
conn.execute("UPDATE dogs SET foto_url=? WHERE id=?", (foto_url, dog_id))
# Altes Foto von Disk löschen
if old_foto_url:
try:
old_path = safe_media_path(MEDIA_DIR, old_foto_url)
if old_path and os.path.isfile(old_path):
os.remove(old_path)
except Exception:
pass
return {"foto_url": foto_url}
class PhotoPosition(BaseModel):
zoom: float = 1.0
offset_x: float = 0.0
offset_y: float = 0.0
@router.patch("/{dog_id}/photo-position")
async def update_photo_position(dog_id: int, pos: PhotoPosition, user=Depends(get_current_user)):
with db() as conn:
updated = conn.execute(
"UPDATE dogs SET foto_zoom=?, foto_offset_x=?, foto_offset_y=? WHERE id=? AND user_id=?",
(pos.zoom, pos.offset_x, pos.offset_y, dog_id, user["id"])
).rowcount
if not updated:
raise HTTPException(404, "Hund nicht gefunden.")
return {"ok": True}
@router.delete("/{dog_id}/photo", status_code=204)
async def delete_photo(dog_id: int, user=Depends(get_current_user)):
with db() as conn:
row = conn.execute(
"SELECT foto_url FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"])
).fetchone()
if not row:
raise HTTPException(404, "Hund nicht gefunden.")
if row["foto_url"]:
path = safe_media_path(MEDIA_DIR, row["foto_url"])
if path and os.path.exists(path):
os.remove(path)
with db() as conn:
conn.execute(
"UPDATE dogs SET foto_url=NULL, foto_zoom=1.0, foto_offset_x=0.0, foto_offset_y=0.0 WHERE id=? AND user_id=?",
(dog_id, user["id"])
)
# ------------------------------------------------------------------
# Fähigkeiten / Kommandos (für Profil + öffentliche Seite)
# ------------------------------------------------------------------
def _parse_exercise_name(exercise_id: str) -> str:
"""grundkommandos_Hier__Komm → 'Hier / Komm'"""
parts = exercise_id.split("_", 1)
if len(parts) < 2:
return exercise_id
return parts[1].replace("__", " / ").replace("_", " ")
def _load_skills(conn, dog_id: int, user_id: int) -> list:
"""Gibt Übungen mit Status 'sitzt' oder 'meistens' zurück, die mit diesem Hund trainiert wurden."""
rows = conn.execute(
"""
SELECT ep.exercise_id, ep.status,
(SELECT ts.exercise_name FROM training_sessions ts
WHERE ts.user_id = ep.user_id AND ts.dog_id = ?
AND ts.exercise_id = ep.exercise_id
ORDER BY ts.datum DESC, ts.created_at DESC LIMIT 1) AS exercise_name
FROM exercise_progress ep
WHERE ep.user_id = ?
AND ep.status IN ('sitzt', 'meistens')
AND EXISTS (SELECT 1 FROM training_sessions ts2
WHERE ts2.user_id = ep.user_id AND ts2.dog_id = ?
AND ts2.exercise_id = ep.exercise_id)
ORDER BY ep.status DESC, ep.exercise_id
""",
(dog_id, user_id, dog_id)
).fetchall()
return [
{
"exercise_id": r["exercise_id"],
"exercise_name": r["exercise_name"] or _parse_exercise_name(r["exercise_id"]),
"status": r["status"],
"tab": r["exercise_id"].split("_")[0],
}
for r in rows
]
@router.get("/{dog_id}/skills")
async def get_dog_skills(dog_id: int, user=Depends(get_current_user)):
uid = user["id"]
with db() as conn:
dog = conn.execute(
"SELECT id, user_id FROM dogs WHERE id=? AND (user_id=? OR id IN (SELECT dog_id FROM sitting_subscriptions WHERE sitter_id=? AND valid_until >= date('now')))",
(dog_id, uid, uid)
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
return _load_skills(conn, dog_id, dog["user_id"])
# Öffentliches Profil (für NFC-Tag, kein Login nötig)
@router.get("/public/{dog_id}")
async def public_dog_profile(dog_id: int):
with db() as conn:
dog = conn.execute(
"""SELECT d.id, d.name, d.rasse, d.geburtstag, d.foto_url, d.bio,
d.user_id, u.name as besitzer_name
FROM dogs d JOIN users u ON d.user_id=u.id
WHERE d.id=? AND d.is_public=1""",
(dog_id,)
).fetchone()
if not dog:
raise HTTPException(404, "Profil nicht gefunden oder nicht öffentlich.")
skills = _load_skills(conn, dog_id, dog["user_id"])
result = dict(dog)
result.pop("user_id", None)
result["skills"] = skills
return result
class FoundReport(BaseModel):
message: Optional[str] = None
kontakt: Optional[str] = None
# Gefunden-Meldung (kein Login nötig)
@router.post("/public/{dog_id}/found")
async def report_found(dog_id: int, data: FoundReport = FoundReport()):
with db() as conn:
row = conn.execute(
"""SELECT d.id, d.name, d.user_id
FROM dogs d
WHERE d.id=? AND d.is_public=1""",
(dog_id,)
).fetchone()
if not row:
raise HTTPException(404, "Profil nicht gefunden oder nicht öffentlich.")
dog_name = row["name"]
user_id = row["user_id"]
body = data.message.strip() if data.message and data.message.strip() \
else "Jemand hat deinen Hund gefunden. Öffne die App für Details."
if data.kontakt and data.kontakt.strip():
body += f" Kontakt: {data.kontakt.strip()}"
send_push_to_user(user_id, {
"title": f"🐾 {dog_name} wurde gefunden!",
"body": body,
"data": {"page": "diary", "found": True},
"tag": f"found-{dog_id}",
})
return {"ok": True}
# ------------------------------------------------------------------
# GET /api/dogs/{id}/pflege — Pflegetipps für diesen Hund
# ------------------------------------------------------------------
@router.get("/{dog_id}/pflege")
async def get_pflege_tipps(dog_id: int, user=Depends(get_current_user)):
import json as _json
with db() as conn:
dog = conn.execute(
"SELECT id, name, rasse, rasse_id FROM dogs WHERE id=? AND user_id=?",
(dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
# Rassen-Infos für Fell-Typ
rasse_info = None
with db() as conn:
if dog["rasse_id"]:
rasse_info = conn.execute(
"SELECT name, groesse, beschreibung FROM wiki_rassen WHERE id=?",
(dog["rasse_id"],)
).fetchone()
elif dog["rasse"]:
rasse_info = conn.execute(
"SELECT name, groesse, beschreibung FROM wiki_rassen WHERE name LIKE ? LIMIT 1",
(f"%{dog['rasse']}%",)
).fetchone()
# Fell-Typ und Pflegeart ableiten
fell_filter = None
fell_pflege_art_filter = None
if rasse_info:
beschr = (rasse_info["beschreibung"] or "").lower()
if any(w in beschr for w in ["lockig", "wellig", "kraus", "pudel", "doodle"]):
fell_filter = "lockig"
elif any(w in beschr for w in ["langhaar", "seidiges", "fließendes", "langes fell"]):
fell_filter = "lang"
elif any(w in beschr for w in ["kurzhaar", "kurzes fell", "glatthaarig"]):
fell_filter = "kurz"
elif rasse_info["groesse"] in ("gross", "sehr_gross"):
fell_filter = "doppel"
# Pflegeart: Trimmen vs. Schneiden
if any(w in beschr for w in ["trimm", "hand-stripping", "stripping", "rauhhaar", "drahthaar", "rauhaar"]):
fell_pflege_art_filter = "trimmen"
elif any(w in beschr for w in ["schneid", "geschoren", "schere", "clipper"]):
fell_pflege_art_filter = "schneiden"
with db() as conn:
alle_tipps = conn.execute(
"SELECT * FROM pflege_tipps ORDER BY kategorie, titel"
).fetchall()
# Relevante Tipps: kein Fell-Filter oder passend
from datetime import date
heute_saison = {1:"winter",2:"winter",3:"fruehling",4:"fruehling",5:"fruehling",
6:"sommer",7:"sommer",8:"sommer",9:"herbst",10:"herbst",
11:"herbst",12:"winter"}[date.today().month]
result = []
for t in alle_tipps:
t = dict(t)
# Fell-Typ-Filter
if fell_filter and t["fell_typ"] and t["fell_typ"] != "alle":
if fell_filter not in t["fell_typ"].split(","):
continue
# Pflegeart-Filter: Trimm-Tipps nicht bei Schneidehunden und umgekehrt
tipp_art = t.get("fell_pflege_art")
if tipp_art and tipp_art != "alle" and fell_pflege_art_filter:
if tipp_art != fell_pflege_art_filter:
continue
t["schritte"] = _json.loads(t["schritte"] or "[]")
t["saisonal_aktuell"] = bool(t["saison"] and heute_saison in t["saison"])
result.append(t)
# Tipp des Tages: erster aktuell-saisonaler oder zufällig deterministisch
from hashlib import md5
day_hash = int(md5(str(date.today()).encode()).hexdigest(), 16)
saisonal = [t for t in result if t["saisonal_aktuell"]]
tipp_des_tages = (saisonal or result)[day_hash % len(saisonal or result)] if result else None
return {
"dog_name": dog["name"],
"rasse_name": rasse_info["name"] if rasse_info else dog["rasse"],
"tipp_des_tages": tipp_des_tages,
"tipps": result,
"kategorien": list(dict.fromkeys(t["kategorie"] for t in result)),
"fell_pflege_art": fell_pflege_art_filter, # 'schneiden' | 'trimmen' | None
}
# ------------------------------------------------------------------
# LEBENS-TIMELINE
# ------------------------------------------------------------------
@router.get("/{dog_id}/timeline")
async def get_dog_timeline(dog_id: int, user=Depends(get_current_user)):
"""Aggregierte Lebens-Timeline eines Hundes aus allen Datenquellen."""
import json as _json
with db() as conn:
dog = conn.execute(
"SELECT id, name, user_id, geburtstag FROM dogs WHERE id=? AND user_id=?",
(dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404, "Hund nicht gefunden.")
events = []
with db() as conn:
# --- Tagebuch ---
diary_rows = conn.execute(
"""SELECT d.id, d.datum, d.titel, d.typ, d.is_milestone,
dm.url AS foto_url
FROM diary d
LEFT JOIN diary_media dm ON dm.diary_id = d.id AND dm.sort_order = 0
WHERE d.dog_id=?
ORDER BY d.datum ASC, d.id ASC""",
(dog_id,)
).fetchall()
for i, r in enumerate(diary_rows):
events.append({
"datum": r["datum"],
"kategorie": "tagebuch",
"titel": r["titel"] or ("Tagebucheintrag" if r["typ"] == "eintrag" else str(r["typ"]).capitalize()),
"typ": r["typ"],
"is_first": i == 0,
"is_milestone": bool(r["is_milestone"]),
"foto_url": r["foto_url"],
"ref_id": r["id"],
})
# --- Gesundheit ---
health_rows = conn.execute(
"""SELECT id, datum, bezeichnung, typ
FROM health
WHERE dog_id=?
ORDER BY datum ASC, id ASC""",
(dog_id,)
).fetchall()
typ_seen = {}
for r in health_rows:
t = r["typ"]
is_first = t not in typ_seen
if is_first:
typ_seen[t] = True
events.append({
"datum": r["datum"],
"kategorie": "gesundheit",
"titel": r["bezeichnung"],
"typ": t,
"is_first": is_first,
"is_milestone": False,
"foto_url": None,
"ref_id": r["id"],
})
# --- Training-Sessions ---
ts_rows = conn.execute(
"""SELECT id, datum, exercise_name, erfolgsquote, ist_top
FROM training_sessions
WHERE dog_id=? AND user_id=?
ORDER BY datum ASC, id ASC""",
(dog_id, user["id"])
).fetchall()
ts_first = True
ts_best = None
ts_best_score = -1
for r in ts_rows:
if r["erfolgsquote"] is not None and r["erfolgsquote"] > ts_best_score:
ts_best_score = r["erfolgsquote"]
ts_best = r
for i, r in enumerate(ts_rows):
is_first = (i == 0)
is_best = ts_best and r["id"] == ts_best["id"] and i > 0
events.append({
"datum": r["datum"],
"kategorie": "training",
"titel": r["exercise_name"],
"typ": "training",
"is_first": is_first,
"is_milestone": bool(r["ist_top"]) or is_best,
"foto_url": None,
"ref_id": r["id"],
})
# --- Routen (nur Routen wo dieser Hund mitgegangen ist) ---
route_rows = conn.execute(
"""SELECT r.id, r.name, r.distanz_km,
date(r.created_at) AS datum
FROM routes r
JOIN route_dogs rd ON rd.route_id = r.id AND rd.dog_id = ?
WHERE r.user_id = ?
ORDER BY r.created_at ASC""",
(dog_id, user["id"])
).fetchall()
route_first = True
route_longest = None
route_max_km = -1
for r in route_rows:
km = r["distanz_km"] or 0
if km > route_max_km:
route_max_km = km
route_longest = r
for i, r in enumerate(route_rows):
is_first = (i == 0)
is_longest = route_longest and r["id"] == route_longest["id"] and i > 0
events.append({
"datum": r["datum"],
"kategorie": "route",
"titel": r["name"],
"typ": "route",
"is_first": is_first,
"is_milestone": is_longest,
"foto_url": None,
"ref_id": r["id"],
"distanz_km": r["distanz_km"],
})
# Geburtstag des Hundes als erster Eintrag
if dog["geburtstag"]:
events.append({
"datum": dog["geburtstag"],
"kategorie": "meilenstein",
"titel": f"{dog['name']} wird geboren",
"typ": "geburtstag",
"is_first": True,
"is_milestone": True,
"foto_url": None,
"ref_id": None,
})
# Chronologisch sortieren
events.sort(key=lambda e: (e["datum"] or "0000-00-00", e["kategorie"]))
return {
"dog_name": dog["name"],
"geburtstag": dog["geburtstag"],
"events": events,
}
# ------------------------------------------------------------------
# POST /api/dogs/{id}/gedenken — Hund als verstorben markieren
# ------------------------------------------------------------------
class GedenkenData(BaseModel):
verstorben_am: str # YYYY-MM-DD
@router.post("/{dog_id}/gedenken")
async def mark_verstorben(dog_id: int, data: GedenkenData, user=Depends(get_current_user)):
with db() as conn:
updated = conn.execute(
"UPDATE dogs SET verstorben_am=? WHERE id=? AND user_id=?",
(data.verstorben_am, dog_id, user["id"])
).rowcount
if not updated:
raise HTTPException(404, "Hund nicht gefunden.")
return {"ok": True}
# ------------------------------------------------------------------
# GET /api/dogs/{id}/gedenkseite — Memorial-Daten
# ------------------------------------------------------------------
@router.get("/{dog_id}/gedenkseite")
async def get_gedenkseite(dog_id: int, user=Depends(get_current_user)):
with db() as conn:
dog = conn.execute(
"SELECT * FROM dogs WHERE id=? AND user_id=?", (dog_id, user["id"])
).fetchone()
if not dog:
raise HTTPException(404)
dog = dict(dog)
# Statistiken
km_total = conn.execute(
"SELECT COALESCE(ROUND(SUM(distanz_km),1),0) AS km FROM routes r "
"JOIN route_dogs rd ON rd.route_id=r.id WHERE rd.dog_id=?", (dog_id,)
).fetchone()["km"]
diary_count = conn.execute(
"SELECT COUNT(*) FROM diary WHERE dog_id=?", (dog_id,)
).fetchone()[0]
media_count = conn.execute(
"SELECT COUNT(*) FROM diary_media dm JOIN diary d ON d.id=dm.diary_id "
"WHERE d.dog_id=? AND dm.media_type='image'", (dog_id,)
).fetchone()[0]
training_count = conn.execute(
"SELECT COUNT(*) FROM training_sessions WHERE dog_id=?", (dog_id,)
).fetchone()[0]
# Letzter Tagebucheintrag
last_entry = conn.execute(
"SELECT titel, datum FROM diary WHERE dog_id=? ORDER BY datum DESC LIMIT 1",
(dog_id,)
).fetchone()
# Erste und letzte Aufnahme
first_entry = conn.execute(
"SELECT datum FROM diary WHERE dog_id=? ORDER BY datum ASC LIMIT 1",
(dog_id,)
).fetchone()
# Letzte 6 Fotos für Galerie
photos = conn.execute(
"""SELECT dm.url FROM diary_media dm
JOIN diary d ON d.id=dm.diary_id
WHERE d.dog_id=? AND dm.media_type='image'
AND dm.img_width IS NOT NULL AND dm.img_width > dm.img_height
ORDER BY d.datum DESC, dm.id DESC LIMIT 6""",
(dog_id,)
).fetchall()
if not photos:
photos = conn.execute(
"""SELECT dm.url FROM diary_media dm
JOIN diary d ON d.id=dm.diary_id
WHERE d.dog_id=? AND dm.media_type='image'
ORDER BY d.datum DESC, dm.id DESC LIMIT 6""",
(dog_id,)
).fetchall()
# Gemeinsame Zeit berechnen
joined = dog.get("geburtstag") or (first_entry["datum"] if first_entry else None)
passed = dog.get("verstorben_am")
gemeinsam_tage = None
if joined and passed:
try:
from datetime import date as _date
d1 = _date.fromisoformat(joined)
d2 = _date.fromisoformat(passed)
gemeinsam_tage = (d2 - d1).days
except Exception:
pass
return {
"dog": dog,
"km_total": km_total,
"diary_count": diary_count,
"media_count": media_count,
"training_count": training_count,
"last_entry": dict(last_entry) if last_entry else None,
"gemeinsam_tage": gemeinsam_tage,
"photos": [r["url"] for r in photos],
}