banyaro/backend/routes/webcal.py

381 lines
15 KiB
Python

"""
BAN YARO — WebCal / iCal feed
Calendar subscription for health reminders, events, dog-walk meetups and sittings.
Endpoints:
GET /api/webcal/token → returns (or creates) the personal calendar token
DELETE /api/webcal/token → replaces the token, invalidating existing subscriptions
GET /api/webcal/{token}.ics → iCal feed (no auth; the token is the secret)
"""
import secrets
from datetime import datetime, date, timedelta, timezone
from fastapi import APIRouter, HTTPException
from fastapi.responses import Response
from auth import get_current_user
from database import db
from fastapi import Depends
# Router that collects the calendar-token and iCal-feed endpoints defined in this module.
router = APIRouter()
# ------------------------------------------------------------------
# Hilfsfunktionen — RFC-5545-konformes iCal
# ------------------------------------------------------------------
def _esc(text: str) -> str:
"""Escape-Sonderzeichen für iCal-Textwerte."""
if not text:
return ''
return (text
.replace('\\', '\\\\')
.replace(',', '\\,')
.replace(';', '\\;')
.replace('\n', '\\n')
.replace('\r', ''))
def _fold(line: str) -> str:
"""Zeilenumbruch nach max. 75 Bytes (RFC 5545 §3.1)."""
encoded = line.encode('utf-8')
if len(encoded) <= 75:
return line
parts = []
while True:
chunk = line.encode('utf-8')
if len(chunk) <= 75:
parts.append(line)
break
# Schneide bei max. 75 Bytes ab, ohne Multibyte-Zeichen zu zerreißen
i = 75
while len(line[:i].encode('utf-8')) > 75:
i -= 1
parts.append(line[:i])
line = ' ' + line[i:]
return '\r\n'.join(parts)
def _date_str(d: str) -> str:
"""YYYY-MM-DD → YYYYMMDD"""
return d.replace('-', '')
def _datetime_str(d: str, t: str | None = None) -> str:
    """Combine a date and an optional ``HH:MM`` time into an iCal DATE-TIME.

    The result has no trailing ``Z`` (floating/local time). Seconds are
    always ``00``; without a time the result is midnight.
    """
    day = _date_str(d)
    if not t:
        return f"{day}T000000"
    # Keep only the first four digits (hours and minutes) of the time.
    digits = t.replace(':', '')
    return f"{day}T{digits[:4]}00"
def _uid(prefix: str, entry_id: int) -> str:
return f"{prefix}-{entry_id}@banyaro.app"
def _vevent(**kwargs) -> str:
    """Render one VEVENT block as CRLF-joined lines.

    Recognised kwargs:
        uid, dtstart, dtend -- required. ``dtstart``/``dtend`` are the property
            tail including parameters and colon, e.g. ``";VALUE=DATE:20240101"``.
        summary -- event title (escaped and folded).
        description, location -- optional text; emitted only when truthy.
        rrule -- optional recurrence rule; emitted verbatim when truthy.

    Raises:
        KeyError: if ``uid``, ``dtstart`` or ``dtend`` is missing.
    """
    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    out = [
        'BEGIN:VEVENT',
        f'DTSTAMP:{stamp}',
        f'UID:{kwargs["uid"]}',
        f'DTSTART{kwargs["dtstart"]}',
        f'DTEND{kwargs["dtend"]}',
        _fold(f'SUMMARY:{_esc(kwargs.get("summary", ""))}'),
    ]
    # Optional free-text properties, in fixed output order.
    for prop, key in (('DESCRIPTION', 'description'), ('LOCATION', 'location')):
        value = kwargs.get(key)
        if value:
            out.append(_fold(f'{prop}:{_esc(value)}'))
    rule = kwargs.get('rrule')
    if rule:
        out.append(f'RRULE:{rule}')
    out.append('END:VEVENT')
    return '\r\n'.join(out)
# ------------------------------------------------------------------
# Token erzeugen / abrufen
# ------------------------------------------------------------------
@router.get("/token")
async def get_calendar_token(user=Depends(get_current_user)):
    """Return the caller's personal calendar token, creating one on first use.

    Returns:
        ``{"token": <str>}`` with the stored token or a freshly generated one.
    """
    user_id = user["id"]
    with db() as conn:
        stored = conn.execute(
            "SELECT calendar_token FROM users WHERE id=?", (user_id,)
        ).fetchone()
        current = stored["calendar_token"] if stored else None
        if current:
            return {"token": current}
        # Nothing stored yet: generate a URL-safe random token and save it.
        fresh = secrets.token_urlsafe(32)
        conn.execute(
            "UPDATE users SET calendar_token=? WHERE id=?",
            (fresh, user_id)
        )
    return {"token": fresh}
@router.delete("/token")
async def reset_calendar_token(user=Depends(get_current_user)):
    """Replace the caller's calendar token with a new random one.

    The previous token is overwritten, so feed URLs built from it no longer
    match a user.

    Returns:
        ``{"token": <str>}`` with the new token.
    """
    replacement = secrets.token_urlsafe(32)
    statement = "UPDATE users SET calendar_token=? WHERE id=?"
    with db() as conn:
        conn.execute(statement, (replacement, user["id"]))
    return {"token": replacement}
# ------------------------------------------------------------------
# iCal-Feed
# ------------------------------------------------------------------
# Display labels for the health entry types that carry a "next due" date.
_HEALTH_TYPE_LABELS = {
    'impfung': 'Impfung',
    'entwurmung': 'Entwurmung',
    'medikament': 'Medikament',
    'laeufigkeit': 'Läufigkeit',
}


def _parse_iso_date(value) -> date | None:
    """Parse an ISO date string; return None when it is missing or malformed."""
    try:
        return date.fromisoformat(value)
    except (TypeError, ValueError):
        return None


def _all_day_span(first: date, last: date | None = None) -> dict[str, str]:
    """Return dtstart/dtend kwargs for an all-day event covering first..last inclusive."""
    final_day = first if last is None else last
    # DTEND is written as the day after the last covered day.
    end_exclusive = final_day + timedelta(days=1)
    return {
        "dtstart": f";VALUE=DATE:{_date_str(first.isoformat())}",
        "dtend": f";VALUE=DATE:{_date_str(end_exclusive.isoformat())}",
    }


def _row_span(datum, uhrzeit) -> dict[str, str] | None:
    """Return dtstart/dtend kwargs for a row with a date and optional time, or None if unusable."""
    if uhrzeit:
        # NOTE(review): start and end coincide because no duration is stored;
        # strict RFC 5545 validators expect DTEND to be later than DTSTART.
        stamp = f":{_datetime_str(datum, uhrzeit)}"
        return {"dtstart": stamp, "dtend": stamp}
    day = _parse_iso_date(datum)
    if day is None:
        return None
    return _all_day_span(day)


def _next_birthday(born: date, today: date) -> date:
    """Return the first anniversary of *born* that falls on or after *today*."""
    def anniversary(year: int) -> date:
        try:
            return born.replace(year=year)
        except ValueError:
            # Born on 29 February and *year* is not a leap year.
            return born.replace(year=year, day=28)

    candidate = anniversary(today.year)
    if candidate < today:
        candidate = anniversary(today.year + 1)
    return candidate


def _health_reminder_events(conn, dog_map: dict) -> list[str]:
    """Build all-day events for health entries that have a next-due date."""
    dog_ids = list(dog_map)
    ph = ','.join('?' * len(dog_ids))
    rows = conn.execute(
        f"""SELECT h.id, h.typ, h.bezeichnung, h.naechstes,
                   h.notiz, h.intervall_tage, h.dog_id
            FROM health h
            WHERE h.dog_id IN ({ph})
              AND h.naechstes IS NOT NULL
              AND h.typ IN ('impfung','entwurmung','medikament','laeufigkeit')
            ORDER BY h.naechstes""",
        dog_ids
    ).fetchall()
    events = []
    for e in rows:
        due = _parse_iso_date(e["naechstes"])
        if due is None:
            continue  # one bad row must not break the whole feed
        dog_name = dog_map.get(e["dog_id"], "Hund")
        typ_label = _HEALTH_TYPE_LABELS.get(e["typ"], e["typ"])
        rrule = None
        if e["intervall_tage"]:
            rrule = f"FREQ=DAILY;INTERVAL={e['intervall_tage']}"
        events.append(_vevent(
            uid=_uid('health', e["id"]),
            **_all_day_span(due),
            summary=f"{typ_label}: {e['bezeichnung']} ({dog_name})",
            description=e["notiz"] or '',
            rrule=rrule,
        ))
    return events


def _vet_appointment_events(conn, dog_map: dict) -> list[str]:
    """Build all-day events for vet appointments dated today or later."""
    dog_ids = list(dog_map)
    ph = ','.join('?' * len(dog_ids))
    rows = conn.execute(
        f"""SELECT h.id, h.bezeichnung, h.datum, h.notiz,
                   h.tierarzt_name, h.dog_id
            FROM health h
            WHERE h.dog_id IN ({ph})
              AND h.typ = 'tierarzt'
              AND h.datum >= date('now')
            ORDER BY h.datum""",
        dog_ids
    ).fetchall()
    events = []
    for e in rows:
        day = _parse_iso_date(e["datum"])
        if day is None:
            continue
        dog_name = dog_map.get(e["dog_id"], "Hund")
        desc_parts = []
        if e["tierarzt_name"]:
            desc_parts.append(f"Praxis: {e['tierarzt_name']}")
        if e["notiz"]:
            desc_parts.append(e["notiz"])
        events.append(_vevent(
            uid=_uid('tierarzt', e["id"]),
            **_all_day_span(day),
            summary=f"Tierarzt: {e['bezeichnung']} ({dog_name})",
            description='\n'.join(desc_parts),
        ))
    return events


def _medication_end_events(conn, dog_map: dict) -> list[str]:
    """Build all-day events marking the end date of medications that have not yet ended."""
    dog_ids = list(dog_map)
    ph = ','.join('?' * len(dog_ids))
    rows = conn.execute(
        f"""SELECT h.id, h.bezeichnung, h.bis_datum, h.notiz,
                   h.dosierung, h.dog_id
            FROM health h
            WHERE h.dog_id IN ({ph})
              AND h.typ = 'medikament'
              AND h.bis_datum IS NOT NULL
              AND h.bis_datum >= date('now')
            ORDER BY h.bis_datum""",
        dog_ids
    ).fetchall()
    events = []
    for e in rows:
        day = _parse_iso_date(e["bis_datum"])
        if day is None:
            continue
        dog_name = dog_map.get(e["dog_id"], "Hund")
        desc_parts = []
        if e["dosierung"]:
            desc_parts.append(f"Dosierung: {e['dosierung']}")
        if e["notiz"]:
            desc_parts.append(e["notiz"])
        events.append(_vevent(
            uid=_uid('medi-end', e["id"]),
            **_all_day_span(day),
            summary=f"Medikament Ende: {e['bezeichnung']} ({dog_name})",
            description='\n'.join(desc_parts),
        ))
    return events


def _birthday_events(conn, dog_map: dict) -> list[str]:
    """Build yearly recurring all-day events for each dog's next birthday."""
    dog_ids = list(dog_map)
    ph = ','.join('?' * len(dog_ids))
    rows = conn.execute(
        f"""SELECT id, name, geburtstag
            FROM dogs
            WHERE id IN ({ph})
              AND geburtstag IS NOT NULL""",
        dog_ids
    ).fetchall()
    today = date.today()
    events = []
    for d in rows:
        born = _parse_iso_date(d["geburtstag"])
        if born is None:
            continue
        upcoming = _next_birthday(born, today)
        age = upcoming.year - born.year
        events.append(_vevent(
            uid=_uid('bday', d["id"]),
            **_all_day_span(upcoming),
            summary=f"Geburtstag: {d['name']} ({age}. Geburtstag)",
            rrule="FREQ=YEARLY",
        ))
    return events


def _user_event_events(conn, user_id) -> list[str]:
    """Build events for the user's own entries in ``events`` dated today or later."""
    rows = conn.execute(
        """SELECT id, titel, datum, uhrzeit, ort_name, beschreibung
           FROM events
           WHERE user_id=? AND datum >= date('now')
           ORDER BY datum, uhrzeit""",
        (user_id,)
    ).fetchall()
    events = []
    for e in rows:
        span = _row_span(e["datum"], e["uhrzeit"])
        if span is None:
            continue
        events.append(_vevent(
            uid=_uid('event', e["id"]),
            **span,
            summary=e["titel"],
            location=e["ort_name"] or '',
            description=e["beschreibung"] or '',
        ))
    return events


def _walk_events(conn, user_id) -> list[str]:
    """Build events for upcoming walks the user created or joined."""
    rows = conn.execute(
        """SELECT DISTINCT w.id, w.titel, w.datum, w.uhrzeit, w.ort_name
           FROM walks w
           LEFT JOIN walk_participants wp
                  ON wp.walk_id = w.id AND wp.user_id = ?
           WHERE (w.user_id = ? OR wp.user_id = ?)
             AND w.datum >= date('now')
           ORDER BY w.datum, w.uhrzeit""",
        (user_id, user_id, user_id)
    ).fetchall()
    events = []
    for w in rows:
        span = _row_span(w["datum"], w["uhrzeit"])
        if span is None:
            continue
        events.append(_vevent(
            uid=_uid('walk', w["id"]),
            **span,
            summary=f"Gassi-Treffen: {w['titel']}",
            location=w["ort_name"] or '',
        ))
    return events


def _sitting_events(conn, user_id) -> list[str]:
    """Build multi-day all-day events for the user's accepted sitting requests that have not ended."""
    rows = conn.execute(
        """SELECT sr.id, sr.von, sr.bis, u.name AS sitter_name
           FROM sitting_requests sr
           JOIN sitters s ON s.id = sr.sitter_id
           JOIN users u ON u.id = s.user_id
           WHERE sr.user_id = ?
             AND sr.status = 'angenommen'
             AND sr.bis >= date('now')
           ORDER BY sr.von""",
        (user_id,)
    ).fetchall()
    events = []
    for s in rows:
        first = _parse_iso_date(s["von"])
        last = _parse_iso_date(s["bis"])
        if first is None or last is None:
            continue
        events.append(_vevent(
            uid=_uid('sitting', s["id"]),
            **_all_day_span(first, last),
            summary=f"Hundesitting bei {s['sitter_name'] or 'Sitter'}",
        ))
    return events


@router.get("/{token}.ics")
async def ical_feed(token: str):
    """Serve the iCal feed of the user identified by *token*.

    No session cookie is required; the token in the URL is the only
    credential. The feed contains health reminders, vet appointments,
    medication end dates and dog birthdays (only when the user has dogs),
    followed by the user's events, walks and accepted sittings. Rows whose
    stored date cannot be parsed are left out instead of failing the feed.

    Raises:
        HTTPException: 404 when no user has this calendar token.
    """
    with db() as conn:
        user_row = conn.execute(
            "SELECT id FROM users WHERE calendar_token=?", (token,)
        ).fetchone()
        if not user_row:
            raise HTTPException(404, "Kalender nicht gefunden.")
        user_id = user_row["id"]

        dogs = conn.execute(
            "SELECT id, name FROM dogs WHERE user_id=?", (user_id,)
        ).fetchall()
        dog_map = {d["id"]: d["name"] for d in dogs}

        vevents = []
        # Dog-bound sections are skipped without dogs (the IN list would be empty).
        if dog_map:
            vevents += _health_reminder_events(conn, dog_map)
            vevents += _vet_appointment_events(conn, dog_map)
            vevents += _medication_end_events(conn, dog_map)
            vevents += _birthday_events(conn, dog_map)
        vevents += _user_event_events(conn, user_id)
        vevents += _walk_events(conn, user_id)
        vevents += _sitting_events(conn, user_id)

    # Assemble the calendar; every line ends with CRLF.
    body = '\r\n'.join(vevents)
    cal = (
        "BEGIN:VCALENDAR\r\n"
        "VERSION:2.0\r\n"
        "PRODID:-//Ban Yaro//DE\r\n"
        "CALSCALE:GREGORIAN\r\n"
        "METHOD:PUBLISH\r\n"
        "X-WR-CALNAME:Ban Yaro\r\n"
        "X-WR-CALDESC:Impfungen\\, Events & Treffen aus Ban Yaro\r\n"
        "X-WR-TIMEZONE:Europe/Berlin\r\n"
        + (body + "\r\n" if body else "")
        + "END:VCALENDAR\r\n"
    )
    return Response(
        content=cal.encode('utf-8'),
        media_type="text/calendar; charset=utf-8",
        headers={"Content-Disposition": 'inline; filename="banyaro.ics"'},
    )