""" BAN YARO — FastAPI Hauptanwendung """ import os import html import logging from collections import deque import httpx from fastapi import FastAPI, Request from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse, JSONResponse, Response from starlette.middleware.base import BaseHTTPMiddleware from fastapi.middleware.gzip import GZipMiddleware from brotli_asgi import BrotliMiddleware from contextlib import asynccontextmanager from database import init_db import ki import scheduler as sched # In-Memory Log-Buffer (letzte 500 Zeilen) log_buffer: deque = deque(maxlen=500) class _BufferHandler(logging.Handler): _fmt = logging.Formatter() def emit(self, record): log_buffer.append({ 't': self._fmt.formatTime(record, '%H:%M:%S'), 'l': record.levelname, 'm': record.getMessage(), 'n': record.name, }) logging.basicConfig( level = logging.INFO, format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logging.getLogger().addHandler(_BufferHandler()) logger = logging.getLogger(__name__) # ------------------------------------------------------------------ # Startup / Shutdown # ------------------------------------------------------------------ def _backfill_image_sizes(): """Füllt img_width/img_height für alle diary_media-Bilder ohne Maße nach.""" import io from database import db from media_utils import get_image_size MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media") with db() as conn: rows = conn.execute( "SELECT id, url FROM diary_media WHERE media_type='image' AND img_width IS NULL" ).fetchall() if not rows: return logger.info("Backfill Bildmaße: %d Einträge...", len(rows)) updated = 0 for row in rows: # url ist z.B. /media/diary/xxx.jpg → Pfad: MEDIA_DIR/diary/xxx.jpg rel = row["url"].removeprefix("/media/") path = os.path.join(MEDIA_DIR, rel) try: with open(path, "rb") as f: data = f.read() size = get_image_size(data) if size: with db() as conn: conn.execute( "UPDATE diary_media SET img_width=?, img_height=? WHERE id=?", (size[0], size[1], row["id"]) ) updated += 1 except Exception: pass logger.info("Backfill Bildmaße abgeschlossen: %d/%d aktualisiert.", updated, len(rows)) @asynccontextmanager async def lifespan(app: FastAPI): logger.info("Ban Yaro startet...") init_db() _backfill_image_sizes() from routes.movies import seed_movies seed_movies() logger.info(f"KI-Modus: {ki.KI_MODE}") sched.start() yield sched.stop() logger.info("Ban Yaro beendet.") # ------------------------------------------------------------------ # App # ------------------------------------------------------------------ app = FastAPI( title = "Ban Yaro API", version = "0.1.0", lifespan = lifespan, docs_url = "/api/docs" if os.getenv("ENV") != "production" else None, redoc_url = None, ) class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" response.headers["Permissions-Policy"] = "camera=(), microphone=(self), geolocation=(self)" response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" response.headers["Content-Security-Policy"] = ( "default-src 'self'; " "script-src 'self' https://umami.motocamp.de; " # ohne unsafe-inline/eval — alle Inline-Scripts extrahiert "worker-src 'self' blob:; " # 'self' = Service Worker (sw.js); blob: = MapLibre-GL-Worker "style-src 'self' 'unsafe-inline'; " # Inline-Styles bleiben (zu viele Fundstellen für jetzt) "img-src 'self' data: blob: https:; " "media-src 'self' blob:; " # Audio/Video-Wiedergabe + lokale blob:-Vorschau (Sprachnotizen) "connect-src 'self' https:; " "frame-ancestors 'none'; " "base-uri 'self'; " "form-action 'self';" ) return response app.add_middleware(SecurityHeadersMiddleware) class _SlidingTokenRefreshMiddleware(BaseHTTPMiddleware): """Sliding-Session: wenn get_current_user ein neues Token vorbereitet hat (request.state.refresh_token), schreiben wir es als HttpOnly-Cookie zurück. So bleibt der User eingeloggt solange er aktiv ist, ohne langlaufendes Token.""" async def dispatch(self, request: Request, call_next): response = await call_next(request) new_token = getattr(request.state, "refresh_token", None) if new_token: from auth import JWT_EXPIRY as _JWT_EXPIRY response.set_cookie( key="by_token", value=new_token, httponly=True, secure=True, samesite="lax", max_age=_JWT_EXPIRY * 24 * 3600, ) return response app.add_middleware(_SlidingTokenRefreshMiddleware) # Globales File-Upload-Limit (20 MB) _MAX_UPLOAD_BYTES = 20 * 1024 * 1024 class _UploadSizeMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): if request.method in ("POST", "PUT", "PATCH"): cl = request.headers.get("content-length") if cl and int(cl) > _MAX_UPLOAD_BYTES: return JSONResponse( status_code=413, content={"detail": f"Datei zu groß (max. {_MAX_UPLOAD_BYTES // 1024 // 1024} MB)."} ) return await call_next(request) app.add_middleware(_UploadSizeMiddleware) class _AppVersionMiddleware(BaseHTTPMiddleware): """Fügt X-App-Version zu allen /api/-Antworten hinzu. api.js erkennt damit sofort wenn eine neue Version deployed wurde und lädt beim nächsten Seitenwechsel automatisch neu — kein Banner nötig. """ async def dispatch(self, request: Request, call_next): response = await call_next(request) if request.url.path.startswith('/api/'): response.headers['X-App-Version'] = APP_VER return response app.add_middleware(_AppVersionMiddleware) class _CacheControlMiddleware(BaseHTTPMiddleware): """Setzt Cache-Control-Header für statische Assets. JS/CSS: immer no-cache — SW übernimmt Caching. Immutable wäre gefährlich, weil Browser-HTTP-Cache nach force-update nicht geleert wird und veraltete app.js mit falschem APP_VER eine Update-Dauerschleife verursacht. """ async def dispatch(self, request: Request, call_next): response = await call_next(request) path = request.url.path if path.startswith(("/css/", "/js/", "/icons/phosphor.svg")): response.headers["Cache-Control"] = "no-cache, must-revalidate" return response app.add_middleware(_CacheControlMiddleware) class MediaCacheMiddleware(BaseHTTPMiddleware): """Setzt aggressive Cache-Header für /media/-Requests. UUID-basierte Dateinamen ändern sich nie → immutable caching. """ async def dispatch(self, request: Request, call_next): response = await call_next(request) if request.url.path.startswith('/media/'): if os.getenv('STAGING') == 'true': response.headers['Cache-Control'] = 'no-cache' else: response.headers['Cache-Control'] = 'public, max-age=31536000, immutable' return response app.add_middleware(MediaCacheMiddleware) app.add_middleware(BrotliMiddleware, minimum_size=1000, quality=4) app.add_middleware(GZipMiddleware, minimum_size=1000) # ------------------------------------------------------------------ # API-Router registrieren (werden nach und nach hinzugefügt) # ------------------------------------------------------------------ from routes.auth import router as auth_router from routes.dogs import router as dogs_router from routes.diary import router as diary_router from routes.health import router as health_router from routes.poison import router as poison_router from routes.push import router as push_router from routes.ki import router as ki_router from routes.tieraerzte import router as tieraerzte_router from routes.places import router as places_router from routes.routen import router as routen_router from routes.walks import router as walks_router from routes.events import router as events_router from routes.sitting import router as sitting_router from routes.osm import router as osm_router from routes.osm_auth import router as osm_auth_router from routes.osm_contrib import router as osm_contrib_router from routes.forum import router as forum_router from routes.lost import router as lost_router from routes.knigge import router as knigge_router from routes.wiki import router as wiki_router from routes.movies import router as movies_router from routes.friends import router as friends_router from routes.chat import router as chat_router from routes.media import router as media_router from routes.admin import router as admin_router from routes.webcal import router as webcal_router from routes.profile import router as profile_router from routes.import_data import router as import_router from routes.sharing import dog_router as sharing_dog_router, share_router as sharing_share_router from routes.widget import router as widget_router from routes.notifications import router as notifications_router from routes.alerts import router as alerts_router from routes.services import router as services_router from routes.ratings import router as ratings_router from routes.sitting_access import router as sitting_access_router from routes.stats import router as stats_router from routes.achievements import router as achievements_router from routes.training import router as training_router from routes.praise import router as praise_router from routes.weather import router as weather_router from routes.social import router as social_router from routes.moderation import router as moderation_router from routes.notes import router as notes_router from routes.breeder import router as breeder_router from routes.litters import router as litters_router from routes.laeufi import router as laeufi_router from routes.breeder_photos import router as breeder_photos_router from routes.zucht_hunde import router as zucht_hunde_router from routes.breeder_export import router as breeder_export_router from routes.zucht_ki import router as zucht_ki_router from routes.partner import router as partner_router from routes.outreach import router as outreach_router from routes.jobs import router as jobs_router from routes.streak import router as streak_router from routes.expenses import router as expenses_router from routes.recalls import router as recalls_router from routes.adoption import router as adoption_router from routes.health_docs import router as health_docs_router from routes.passport import router as passport_router from routes.playdate import router as playdate_router from routes.ernaehrung import router as ernaehrung_router from routes.challenges import router as challenges_router from routes.gassi_zeiten import router as gassi_zeiten_router from routes.help import router as help_router from routes.feedback import router as feedback_router from routes.contact import router as contact_router from routes.invoices import router as invoices_router app.include_router(auth_router, prefix="/api/auth", tags=["Auth"]) app.include_router(dogs_router, prefix="/api/dogs", tags=["Hunde"]) app.include_router(diary_router, prefix="/api/dogs", tags=["Tagebuch"]) app.include_router(health_router, prefix="/api/dogs", tags=["Gesundheit"]) app.include_router(poison_router, prefix="/api/poison", tags=["Giftköder"]) app.include_router(push_router, prefix="/api/push", tags=["Push"]) app.include_router(ki_router, prefix="/api/ki", tags=["KI"]) app.include_router(tieraerzte_router, prefix="/api/tieraerzte", tags=["Tierärzte"]) app.include_router(places_router, prefix="/api/places", tags=["Orte"]) app.include_router(routen_router, prefix="/api/routes", tags=["Routen"]) app.include_router(walks_router, prefix="/api/walks", tags=["Gassi-Treffen"]) app.include_router(events_router, prefix="/api/events", tags=["Events"]) app.include_router(sitting_router, prefix="/api/sitting", tags=["Sitting"]) app.include_router(osm_router, prefix="/api/osm", tags=["OSM"]) app.include_router(osm_auth_router, prefix="/api/osm-auth", tags=["OSM-Auth"]) app.include_router(osm_contrib_router, prefix="/api/osm-contrib", tags=["OSM-Beiträge"]) app.include_router(weather_router, prefix="/api/weather", tags=["Wetter"]) app.include_router(social_router, prefix="/api/social", tags=["Social"]) app.include_router(forum_router, prefix="/api/forum", tags=["Forum"]) app.include_router(lost_router, prefix="/api/lost", tags=["Verlorener Hund"]) app.include_router(knigge_router, prefix="/api/knigge", tags=["Knigge"]) app.include_router(wiki_router, prefix="/api/wiki", tags=["Wiki"]) app.include_router(movies_router, prefix="/api/movies", tags=["Filme"]) app.include_router(friends_router, prefix="/api/friends", tags=["Freunde"]) app.include_router(chat_router, prefix="/api/chat", tags=["Chat"]) app.include_router(media_router, prefix="/api/media", tags=["Medien"]) app.include_router(admin_router, prefix="/api/admin", tags=["Admin"]) app.include_router(breeder_router, prefix="/api", tags=["Züchter"]) app.include_router(litters_router, prefix="/api", tags=["Würfe"]) app.include_router(laeufi_router, prefix="/api", tags=["Läufigkeit"]) app.include_router(breeder_photos_router, prefix="/api", tags=["Züchter-Fotos"]) app.include_router(zucht_hunde_router, prefix="/api", tags=["Zuchtkartei"]) app.include_router(breeder_export_router, prefix="/api", tags=["Export"]) app.include_router(zucht_ki_router, prefix="/api", tags=["Züchter-KI"]) app.include_router(partner_router, prefix="/api", tags=["Partner"]) app.include_router(outreach_router, prefix="/api/outreach", tags=["Outreach"]) app.include_router(jobs_router, prefix="/api/jobs", tags=["Jobs"]) app.include_router(webcal_router, prefix="/api/webcal", tags=["WebCal"]) app.include_router(profile_router, prefix="/api/profile", tags=["Profil"]) app.include_router(import_router, prefix="/api/import", tags=["Import"]) app.include_router(sharing_dog_router, prefix="/api/dogs", tags=["Teilen"]) app.include_router(sharing_share_router, prefix="/api/share", tags=["Teilen"]) app.include_router(widget_router, prefix="/api/widget", tags=["Widget"]) app.include_router(notifications_router, prefix="/api/notifications", tags=["Notifications"]) app.include_router(alerts_router, prefix="/api/alerts", tags=["Alerts"]) app.include_router(services_router, prefix="/api/services", tags=["Services"]) app.include_router(ratings_router, prefix="/api/ratings", tags=["Ratings"]) app.include_router(sitting_access_router, prefix="/api/sitting-access", tags=["SittingAccess"]) app.include_router(stats_router, prefix="/api/stats", tags=["Stats"]) app.include_router(achievements_router, prefix="/api/achievements", tags=["Achievements"]) app.include_router(training_router, prefix="/api/training", tags=["Training"]) app.include_router(praise_router, prefix="/api/praise", tags=["Praise"]) app.include_router(moderation_router, prefix="/api/moderation", tags=["Moderation"]) app.include_router(notes_router, prefix="/api/notes", tags=["Notes"]) app.include_router(streak_router, prefix="/api", tags=["Streak"]) app.include_router(expenses_router, prefix="/api/expenses", tags=["Ausgaben"]) app.include_router(recalls_router, prefix="/api/recalls", tags=["Rückrufe"]) app.include_router(adoption_router, prefix="/api/adoption", tags=["Adoption"]) app.include_router(health_docs_router, prefix="/api/health-docs", tags=["Gesundheitsdokumente"]) app.include_router(passport_router, prefix="/api/passport", tags=["Hundepass"]) app.include_router(playdate_router, prefix="/api/playdate", tags=["Playdate"]) app.include_router(ernaehrung_router, prefix="/api/dogs", tags=["Ernährung"]) app.include_router(challenges_router, prefix="/api/challenges", tags=["Foto-Challenge"]) app.include_router(gassi_zeiten_router, prefix="/api/gassi-zeiten", tags=["Gassi-Zeiten"]) app.include_router(help_router, prefix="/api/help", tags=["Hilfe/FAQ"]) app.include_router(feedback_router, prefix="/api/feedback", tags=["Feedback"]) app.include_router(contact_router, prefix="/api/contact", tags=["Kontakt"]) app.include_router(invoices_router) # ------------------------------------------------------------------ # Fehlerbehandlung — einheitliches JSON-Format # ------------------------------------------------------------------ @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): logger.error(f"Unbehandelter Fehler: {exc}", exc_info=True) return JSONResponse( status_code=500, content={"detail": "Interner Serverfehler."} ) # ------------------------------------------------------------------ # Statische Dateien + SPA-Fallback # ------------------------------------------------------------------ STATIC_DIR = os.path.join(os.path.dirname(__file__), "static") app.mount("/css", StaticFiles(directory=f"{STATIC_DIR}/css"), name="css") app.mount("/js", StaticFiles(directory=f"{STATIC_DIR}/js"), name="js") app.mount("/icons", StaticFiles(directory=f"{STATIC_DIR}/icons"), name="icons") app.mount("/img", StaticFiles(directory=f"{STATIC_DIR}/img"), name="img") app.mount("/sounds", StaticFiles(directory=f"{STATIC_DIR}/sounds"), name="sounds") # Yaro-Navi-Sounds os.makedirs(f"{STATIC_DIR}/downloads", exist_ok=True) # Album-ZIPs (vom Build-Skript erzeugt) app.mount("/downloads", StaticFiles(directory=f"{STATIC_DIR}/downloads"), name="downloads") # Selbst-gehostete Vektor-Tiles (.pmtiles) — liegen im data-Volume, NICHT im Image. # WICHTIG: Starlettes StaticFiles/FileResponse liefert hinter unserer BaseHTTPMiddleware # KEINE Range-Requests (206) — app-weit kommt nur 200 ohne Accept-Ranges zurück. # MapLibre/pmtiles BRAUCHT aber Byte-Ranges (liest einzelne Tiles aus dem Single-File). # Daher eine eigene Route, die 206 als normales Response (Byte-Slice) zurückgibt — das # überlebt die Middleware. Für Produktion/Skalierung gehört das hinter nginx/NPM direkt # (Range nativ, keine App-CPU) — siehe docs/TILE_SERVER_HANDOVER.md, Entscheidung #2. _TILES_DIR = os.getenv("TILES_DIR", "/data/tiles") # Glyphs (Font-PBFs) für MapLibre-Labels — kleine Static-Files (kein Range nötig), # liegen im data-Volume unter tiles/fonts/{fontstack}/{range}.pbf. _FONTS_DIR = os.getenv("FONTS_DIR", os.path.join(_TILES_DIR, "fonts")) if os.path.isdir(_FONTS_DIR): app.mount("/fonts", StaticFiles(directory=_FONTS_DIR), name="fonts") # DWD-Regenvorhersage (tools/dwd-radar, Cron alle 5 Min): run-/rv_*.pmtiles + manifest.json. _RADAR_DIR = os.getenv("RADAR_DIR", "/data/radar") @app.get("/radar/manifest.json") async def radar_manifest(): path = os.path.join(_RADAR_DIR, "manifest.json") if not os.path.isfile(path): return Response(status_code=404) # Immer frisch: der Inhalt wechselt alle 5 Min bei GLEICHER URL return FileResponse(path, media_type="application/json", headers={"Cache-Control": "no-store"}) @app.api_route("/radar/{run}/{filename}", methods=["GET", "HEAD"]) async def serve_radar(run: str, filename: str, request: Request): # Kein Path-Traversal; Run-Verzeichnis ist content-stabil (Run-Id im Pfad) → lange cachebar. for part in (run, filename): if "/" in part or "\\" in part or ".." in part: return Response(status_code=404) path = os.path.join(_RADAR_DIR, run, filename) if not os.path.isfile(path): return Response(status_code=404) file_size = os.path.getsize(path) _etag = f'"{file_size:x}-{int(os.path.getmtime(path)):x}"' base_headers = {"Accept-Ranges": "bytes", "ETag": _etag, "Cache-Control": "public, max-age=3600, immutable"} if request.method == "HEAD": return Response(status_code=200, media_type="application/octet-stream", headers={**base_headers, "Content-Length": str(file_size)}) range_header = request.headers.get("range") if range_header and range_header.startswith("bytes="): rng = range_header[6:].split(",")[0] start_s, _, end_s = rng.partition("-") try: if start_s == "": length = int(end_s) start = max(0, file_size - length) end = file_size - 1 else: start = int(start_s) end = int(end_s) if end_s else file_size - 1 except ValueError: return Response(status_code=416, headers={**base_headers, "Content-Range": f"bytes */{file_size}"}) end = min(end, file_size - 1) if start > end or start >= file_size: return Response(status_code=416, headers={**base_headers, "Content-Range": f"bytes */{file_size}"}) with open(path, "rb") as f: f.seek(start) data = f.read(end - start + 1) return Response(data, status_code=206, media_type="application/octet-stream", headers={**base_headers, "Content-Range": f"bytes {start}-{end}/{file_size}"}) return FileResponse(path, media_type="application/octet-stream", headers=base_headers) @app.api_route("/tiles/{filename}", methods=["GET", "HEAD"]) async def serve_tile(filename: str, request: Request): # Kein Path-Traversal if "/" in filename or "\\" in filename or ".." in filename: return Response(status_code=404) path = os.path.join(_TILES_DIR, filename) if not os.path.isfile(path): return Response(status_code=404) file_size = os.path.getsize(path) _mtime = int(os.path.getmtime(path)) _etag = f'"{file_size:x}-{_mtime:x}"' # Versionierte URL (?v=…) ist inhaltsstabil → lange + immutable cachen. OHNE Version nur kurz cachen, # damit ein Tile-Swap (gleiche URL, neuer Inhalt) sich innerhalb ~1 Min von selbst heilt — sonst # liefert der Browser bis zu 24h die alten PMTiles-Bytes (alte Abdeckung). _versioned = "v" in request.query_params _cache = "public, max-age=31536000, immutable" if _versioned else "public, max-age=60" base_headers = {"Accept-Ranges": "bytes", "Cache-Control": _cache, "ETag": _etag} if request.method == "HEAD": return Response( status_code=200, media_type="application/octet-stream", headers={**base_headers, "Content-Length": str(file_size)}, ) range_header = request.headers.get("range") if range_header and range_header.startswith("bytes="): rng = range_header[6:].split(",")[0] # nur erster Range (pmtiles nutzt single-range) start_s, _, end_s = rng.partition("-") try: if start_s == "": # Suffix-Range "bytes=-N" length = int(end_s) start = max(0, file_size - length) end = file_size - 1 else: start = int(start_s) end = int(end_s) if end_s else file_size - 1 except ValueError: return Response(status_code=416, headers={**base_headers, "Content-Range": f"bytes */{file_size}"}) end = min(end, file_size - 1) if start > end or start >= file_size: return Response(status_code=416, headers={**base_headers, "Content-Range": f"bytes */{file_size}"}) with open(path, "rb") as f: f.seek(start) data = f.read(end - start + 1) return Response( data, status_code=206, media_type="application/octet-stream", headers={**base_headers, "Content-Range": f"bytes {start}-{end}/{file_size}"}, ) # Kein Range → ganze Datei streamen (pmtiles macht das normalerweise nicht). return FileResponse(path, media_type="application/octet-stream", headers=base_headers) @app.get("/maplibre-test") async def maplibre_test(): # Spike-Testseite: MapLibre rendert /tiles/*.pmtiles (Geometrie-Style, kein Glyph). return FileResponse(os.path.join(STATIC_DIR, "maplibre-test.html"), media_type="text/html") @app.get("/leaflet-vector-test") async def leaflet_vector_test(): # Isolationstest: protomaps-leaflet + map-vector.js + DACH-PMTiles, ohne App-Shell/Flag. return FileResponse(os.path.join(STATIC_DIR, "leaflet-vector-test.html"), media_type="text/html") @app.get("/ui-vector-test") async def ui_vector_test(): # Testet den echten ui.js-Vektor-Pfad (UI.map.create) ohne Auth/App-Shell. return FileResponse(os.path.join(STATIC_DIR, "ui-vector-test.html"), media_type="text/html") @app.get("/maplibre-perf-test") async def maplibre_perf_test(): # Wegwerf-Perf-Test: MapLibre GPU + 600 Cluster-Marker auf DACH-Basemap (Handy-Test). return FileResponse(os.path.join(STATIC_DIR, "maplibre-perf-test.html"), media_type="text/html") @app.get("/maplibre-markers-test") async def maplibre_markers_test(): # Headless-Proof für map-gl-markers.js (Cluster/Icons/Danger/Toggle/Popup, ohne Auth). return FileResponse(os.path.join(STATIC_DIR, "maplibre-markers-test.html"), media_type="text/html") # User-generierte Medien (Fotos aus Tagebuch, Giftköder-Alarm, etc.) MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media") os.makedirs(MEDIA_DIR, exist_ok=True) STAGING = os.getenv("STAGING", "false").lower() == "true" PROD_MEDIA_DIR = "/prod-media" _MIME_MAP = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".webp": "image/webp", ".gif": "image/gif", ".mp4": "video/mp4", ".webm": "video/webm", ".pdf": "application/pdf", } from fastapi import Request as _Request from fastapi.responses import FileResponse as _FileResponse from auth import decode_token as _decode_token # Pfade die Login erfordern (kein DB-Lookup — UUID in Dateiname schützt ausreichend) _AUTH_REQUIRED = ("diary/", "health/", "walks/") def _is_logged_in(request: _Request) -> bool: token = request.cookies.get("by_token") if not token: return False try: _decode_token(token) return True except Exception: return False def _media_response(filepath: str): ext = os.path.splitext(filepath)[1].lower() mt = _MIME_MAP.get(ext, "application/octet-stream") return _FileResponse(filepath, media_type=mt, headers={"Cache-Control": "private, max-age=3600"}) def _resolve_media_path(path: str) -> str | None: primary = os.path.join(MEDIA_DIR, path) if os.path.isfile(primary): return primary if STAGING and os.path.isdir(PROD_MEDIA_DIR): fallback = os.path.join(PROD_MEDIA_DIR, path) if os.path.isfile(fallback): return fallback return None @app.api_route("/media/{path:path}", methods=["GET", "HEAD"]) async def serve_media(path: str, request: _Request): from fastapi import HTTPException as _HE prefix = path.split("/")[0] + "/" # Sensible Pfade: Login erforderlich — UUID-basierte Dateinamen verhindern Raten if prefix in _AUTH_REQUIRED and not _is_logged_in(request): raise _HE(401, "Anmeldung erforderlich.") filepath = _resolve_media_path(path) if not filepath: # iCloud-Hybrid (M0): Original liegt in der privaten iCloud des Besitzers, # auf dem Server existiert nur das Preview. Marker-404, damit Clients # unterscheiden können — Web fällt via onerror aufs Preview zurück. # Lookup nur im Miss-Pfad → keine Kosten für normale Medien-Requests. from database import db as _db from fastapi.responses import JSONResponse as _JSONResponse with _db() as conn: reg = conn.execute( "SELECT storage FROM media_registry WHERE url=?", ("/media/" + path,) ).fetchone() if reg and reg["storage"] == "icloud": return _JSONResponse( {"detail": "Original liegt in der iCloud des Besitzers.", "storage": "icloud"}, status_code=404, ) raise _HE(404, "Nicht gefunden.") return _media_response(filepath) # APP_VER wird zentral aus der VERSION-Datei im Projekt-Root gelesen. # Bumpe ausschliesslich via `make bump` — bumpt VERSION + sw.js + app.js + index.html atomar. def _read_app_ver() -> str: from pathlib import Path candidates = [ Path(__file__).resolve().parent.parent / "VERSION", # Projekt-Root (lokal/dev) Path("/app/VERSION"), # Container-Layout Path("/data/VERSION"), # falls als Volume gemountet ] for p in candidates: try: if p.is_file(): txt = p.read_text(encoding="utf-8").strip() if txt: return txt except Exception: pass return "0" APP_VER = _read_app_ver() # muss mit APP_VER in app.js übereinstimmen (siehe VERSION + `make bump`) @app.get("/.well-known/assetlinks.json") async def assetlinks(): """TWA-Verifikation für Google Play Store (app.banyaro.twa).""" return Response( content='[{"relation":["delegate_permission/common.handle_all_urls"],"target":{"namespace":"android_app","package_name":"app.banyaro.twa","sha256_cert_fingerprints":["49:02:DC:5B:63:C0:D7:42:7F:A4:DC:2F:EB:78:73:11:CC:B9:36:22:00:01:A0:03:1C:0A:F9:41:35:9F:D4:B7"]}}]', media_type="application/json", headers={"Cache-Control": "no-cache"}, ) @app.get("/api/version") async def app_version(): """Aktuelle Frontend-Version — wird beim App-Start gecheckt.""" return Response( content=f'{{"version":"{APP_VER}"}}', media_type="application/json", headers={"Cache-Control": "no-store"}, ) @app.get("/stats/script.js") async def umami_script_proxy(): async with httpx.AsyncClient(timeout=10) as client: r = await client.get("https://umami.motocamp.de/script.js") return Response(content=r.content, media_type="application/javascript", headers={"Cache-Control": "public, max-age=86400"}) @app.post("/stats/api/send") async def umami_send_proxy(request: Request): body = await request.body() async with httpx.AsyncClient(timeout=10) as client: r = await client.post( "https://umami.motocamp.de/api/send", content=body, headers={"Content-Type": "application/json", "User-Agent": request.headers.get("user-agent", "")}, ) return Response(content=r.content, status_code=r.status_code, media_type="application/json") @app.get("/robots.txt") async def robots(): return FileResponse(f"{STATIC_DIR}/robots.txt", media_type="text/plain") @app.get("/llms.txt") async def llms(): return FileResponse(f"{STATIC_DIR}/llms.txt", media_type="text/plain") @app.get("/sitemap.xml") async def sitemap(): from fastapi.responses import Response from database import db as _db from datetime import date today = date.today().isoformat() urls = [ ("https://banyaro.app/", "weekly", "1.0"), ("https://banyaro.app/zuechter", "weekly", "0.9"), ("https://banyaro.app/wurfboerse", "daily", "0.8"), ("https://banyaro.app/wiki/rassen", "weekly", "0.8"), ("https://banyaro.app/help", "monthly", "0.7"), ("https://banyaro.app/knigge", "monthly", "0.7"), ("https://banyaro.app/partner", "monthly", "0.6"), ("https://banyaro.app/datenschutz", "yearly", "0.3"), ("https://banyaro.app/agb", "yearly", "0.3"), ("https://banyaro.app/impressum", "yearly", "0.3"), ] try: with _db() as conn: rassen = conn.execute( "SELECT slug FROM wiki_rassen WHERE slug IS NOT NULL AND slug != '' LIMIT 500" ).fetchall() for r in rassen: urls.append((f"https://banyaro.app/wiki/rasse/{r['slug']}", "monthly", "0.7")) # Öffentliche Züchter-Profile breeders = conn.execute( "SELECT bp.zwingername FROM breeder_profiles bp " "JOIN users u ON u.id = bp.user_id " "WHERE bp.verified_at IS NOT NULL AND u.rolle = 'breeder'" ).fetchall() for b in breeders: if b["zwingername"]: from urllib.parse import quote urls.append(( f"https://banyaro.app/breeder/{quote(b['zwingername'])}", "weekly", "0.7" )) except Exception: pass entries = "\n".join( f""" {loc} {today} {freq} {prio} """ for loc, freq, prio in urls ) xml = f""" {entries} """ return Response(content=xml, media_type="application/xml") @app.get("/info") async def info_page(): return FileResponse(f"{STATIC_DIR}/landing.html", headers={"Cache-Control": "max-age=3600"}) @app.get("/zuechter") async def zuechter_landing(request: _Request): from fastapi.responses import RedirectResponse as _RR if os.getenv("STAGING") == "true": return _RR("https://banyaro.app/zuechter", status_code=301) return FileResponse(f"{STATIC_DIR}/zuechter.html", headers={"Cache-Control": "max-age=3600"}) # ------------------------------------------------------------------ # SEO: Server-gerenderete Wiki-Rassen-Übersicht /wiki/rassen # ------------------------------------------------------------------ @app.get("/wiki/rassen") async def wiki_rassen_page(): from fastapi.responses import HTMLResponse from database import db as _db with _db() as conn: rows = conn.execute( """SELECT name, slug, gruppe, groesse, aktivitaet, foto_url, kinder_geeignet, wohnung_geeignet FROM wiki_rassen WHERE slug IS NOT NULL ORDER BY name ASC""" ).fetchall() rassen = [dict(r) for r in rows] total = len(rassen) groessen_map = {"klein": "Klein", "mittel": "Mittel", "gross": "Groß", "sehr_gross": "Sehr groß"} aktivitaet_map = {"niedrig": "Niedrig", "mittel": "Mittel", "hoch": "Hoch", "sehr_hoch": "Sehr hoch"} cards = "" for r in rassen: foto = r["foto_url"] or "" img = f'{r[' if foto else '
🐕
' groesse = groessen_map.get(r.get("groesse") or "", r.get("groesse") or "") kinder = "✓ Kinder" if r.get("kinder_geeignet") else "" wohnung = "✓ Wohnung" if r.get("wohnung_geeignet") else "" tags = " ".join(f'{t}' for t in [groesse, kinder, wohnung] if t) cards += f""" {img}

{r['name']}

{r.get('gruppe') or ''}

{tags}
\n""" html = f""" Hunderassen-Wiki — {total} Rassen im Überblick | Ban Yaro

Hunderassen-Wiki

{total} Rassen — Charakter, Eignung, Pflege auf einen Blick

{cards}
""" return HTMLResponse(content=html, headers={"Cache-Control": "max-age=3600"}) # ------------------------------------------------------------------ # SEO: Server-gerenderete Wiki-Rassen-Detailseite /wiki/rasse/{slug} # ------------------------------------------------------------------ @app.get("/wiki/rasse/{slug}") async def wiki_rasse_page(slug: str): from fastapi.responses import HTMLResponse from database import db as _db def esc(s): if not s: return "" return str(s).replace("&","&").replace("<","<").replace(">",">").replace('"',""") with _db() as conn: rasse = conn.execute("SELECT * FROM wiki_rassen WHERE slug=?", (slug,)).fetchone() if not rasse: return HTMLResponse("

Rasse nicht gefunden

Alle Rassen", status_code=404) berichte = conn.execute( """SELECT wb.titel, wb.text, wb.created_at, u.name AS autor FROM wiki_berichte wb JOIN users u ON u.id=wb.user_id WHERE wb.rasse=? ORDER BY wb.created_at DESC LIMIT 20""", (slug,) ).fetchall() r = dict(rasse) try: dogs_count = conn.execute( "SELECT COUNT(DISTINCT d.user_id) FROM dogs d WHERE LOWER(d.rasse) LIKE ?", (f"%{r.get('name','').lower()}%",) ).fetchone()[0] except Exception: dogs_count = 0 try: zuchter_count = conn.execute( "SELECT COUNT(*) FROM wiki_zuchter WHERE rasse_slug=? AND verified=1", (slug,) ).fetchone()[0] except Exception: zuchter_count = 0 berichte = [dict(b) for b in berichte] berichte_count = len(berichte) groessen_map = {"klein":"Klein","mittel":"Mittel","gross":"Groß","sehr_gross":"Sehr groß"} aktivitaet_map = {"niedrig":"Niedrig","mittel":"Mittel","hoch":"Hoch","sehr_hoch":"Sehr hoch"} erfahrung_map = {"anfaenger":"Anfänger geeignet","fortgeschritten":"Für Erfahrene","experte":"Nur für Experten"} groesse = groessen_map.get(r.get("groesse") or "", r.get("groesse") or "–") aktivitaet = aktivitaet_map.get(r.get("aktivitaet") or "", r.get("aktivitaet") or "–") erfahrung = erfahrung_map.get(r.get("erfahrung") or "", r.get("erfahrung") or "–") kinder = "Ja" if r.get("kinder_geeignet") else "Bedingt" wohnung = "Ja" if r.get("wohnung_geeignet") else "Besser Haus mit Garten" gewicht = "" if r.get("gewicht_min_kg") and r.get("gewicht_max_kg"): gewicht = f"{r['gewicht_min_kg']}–{r['gewicht_max_kg']} kg" elif r.get("gewicht_max_kg"): gewicht = f"bis {r['gewicht_max_kg']} kg" foto_html = f'{esc(r[' if r.get("foto_url") else '
🐕
' facts = [ ("Gruppe / FCI", esc(r.get("gruppe") or "")), ("Herkunft", esc(r.get("herkunft") or "")), ("Größe", groesse), ("Gewicht", gewicht), ("Lebensdauer", esc(r.get("lebensdauer") or "")), ("Aktivitätslevel", aktivitaet), ("Für Anfänger", erfahrung), ("Kinder geeignet", kinder), ("Wohnungsgeeignet", wohnung), ("Ursprüngliche Aufgabe", esc(r.get("bred_for") or "")), ] facts_html = "".join( f'
{label}{val}
' for label, val in facts if val ) temperament_html = "" if r.get("temperament"): tags = [t.strip() for t in str(r["temperament"]).split(",") if t.strip()] temperament_html = ( '
' + "".join( f'{esc(t)}' for t in tags ) + '
' ) berichte_html = "" if berichte: for b in berichte: datum = b.get("created_at","")[:10] if b.get("created_at") else "" berichte_html += f"""
{esc(b.get('autor',''))} · {datum}

{esc(b.get('titel',''))}

{esc(b.get('text',''))}

""" beschreibung_html = "" if r.get("beschreibung"): beschreibung_html = ( '
' '

Charakter & Wesen

' f'

{esc(r["beschreibung"])}

' '
' ) vorkommen_html = "" if r.get("vorkommen_de"): vorkommen_html = ( '
' '

Vorkommen in Deutschland

' f'

{esc(r["vorkommen_de"])}

' '
' ) stats_html = ( '
' f'🐕 {dogs_count} Nutzer haben diesen Hund' f'🏆 {zuchter_count} Züchter eingetragen' f'💬 {berichte_count} Community-Berichte' '
' ) name = esc(r.get("name","")) gruppe = esc(r.get("gruppe") or "") herkunft = esc(r.get("herkunft") or "") temp_str = esc(r.get("temperament") or "") beschr_str = esc(r.get("beschreibung") or "") if beschr_str: desc = f"{name} — {beschr_str[:160]}".strip().rstrip(".") else: desc = f"{name} — Hunderasse aus {herkunft}. Größe: {groesse}. Aktivität: {aktivitaet}. {temp_str[:120] if temp_str else ''}".strip().rstrip(".") # Optionale Dog-Schema-Felder dog_schema_extras = [] if r.get("lebensdauer"): dog_schema_extras.append(f'"typicalAgeAtDeath":"{esc(r["lebensdauer"])}"') if herkunft: dog_schema_extras.append(f'"countryOfOrigin":"{herkunft}"') if r.get("gruppe"): dog_schema_extras.append(f'"breedGroup":"{gruppe}"') if gewicht: dog_schema_extras.append(f'"weight":"{gewicht}"') dog_extras_str = (", " + ", ".join(dog_schema_extras)) if dog_schema_extras else "" json_ld = f"""{{ "@context":"https://schema.org", "@type":"ItemPage", "headline":"{name} — Hunderasse Profil", "description":"{desc}", "url":"https://banyaro.app/wiki/rasse/{slug}", "inLanguage":"de", "publisher":{{"@type":"Organization","name":"Ban Yaro","url":"https://banyaro.app"}}, "mainEntityOfPage":{{"@type":"WebPage","@id":"https://banyaro.app/wiki/rasse/{slug}"}}, "about":{{ "@type":"Dog", "name":"{name}", "description":"{desc}"{dog_extras_str} }} }}""" html = f""" {name} — Hunderasse Profil | Ban Yaro Wiki
{foto_html}

{name}

{'

' + gruppe + '

' if gruppe else ''} {temperament_html} In der App öffnen
{beschreibung_html} {vorkommen_html}

Steckbrief

{facts_html}
{stats_html} {'

Erfahrungsberichte der Community (' + str(berichte_count) + ')

' + berichte_html + '
' if berichte else ''}

In der App

Ban Yaro ist die kostenlose Hunde-App für Deutschland, Österreich und die Schweiz. Tagebuch, Impfpass, Giftköder-Alarm, Gassi-Community und mehr — DSGVO-konform, ohne App Store.

Kostenlos starten {f'{name}-Welpen auf Ban Yaro' if zuchter_count > 0 else ''}

{f'{zuchter_count} verifizierte {name}-Züchter · ' if zuchter_count > 0 else ''}{dogs_count} Nutzer haben diesen Hund · Alle 1003 Rassen

""" return HTMLResponse(content=html, headers={"Cache-Control": "max-age=3600"}) @app.get("/favicon.ico") async def favicon(): return FileResponse(f"{STATIC_DIR}/icons/favicon.ico") @app.get("/manifest.json") async def manifest(): import json as _json IS_STAGING = os.getenv("STAGING", "false").lower() == "true" with open(f"{STATIC_DIR}/manifest.json", encoding="utf-8") as f: data = _json.load(f) if IS_STAGING: data["name"] = "Ban Yaro Staging" data["short_name"] = "BY ⚗️" data["theme_color"] = "#7c3aed" data["background_color"] = "#2d1b69" data["id"] = "/staging" # Icons mit Staging-Variante überschreiben falls vorhanden staging_icon = "/icons/icon-192-staging.png" import os as _os if _os.path.exists(f"{STATIC_DIR}/icons/icon-192-staging.png"): data["icons"] = [ {"src": "/icons/icon-192-staging.png", "sizes": "192x192", "type": "image/png", "purpose": "any maskable"}, {"src": "/icons/icon-512-staging.png", "sizes": "512x512", "type": "image/png", "purpose": "any maskable"}, ] from fastapi.responses import JSONResponse return JSONResponse(content=data, media_type="application/manifest+json") @app.get("/sw.js") async def service_worker(): return FileResponse( f"{STATIC_DIR}/sw.js", headers={"Cache-Control": "no-cache, no-store, must-revalidate"} ) # Web Share Target @app.post("/share") async def share_target(request: Request): # Empfängt geteilte Inhalte vom Handy (Fotos, Links, Text) # Weiterleitung zur App mit den Daten return FileResponse( f"{STATIC_DIR}/index.html", headers={"Cache-Control": "no-store, no-cache"} ) # Öffentliche Hunde-Profilseite (für NFC-Tags, kein Login nötig) @app.get("/hund/{dog_id}") async def public_dog_page(dog_id: int): from database import db as _db _og_name = "Hunde-Profil" _og_desc = "Hunde-Profil auf Ban Yaro — der deutschen Hunde-Plattform" _og_img = "https://banyaro.app/icons/icon-512.png" try: with _db() as conn: _dog = conn.execute( "SELECT name, rasse, foto_url, bio FROM dogs WHERE id=? AND is_public=1", (dog_id,) ).fetchone() if _dog: _og_name = _dog["name"] _rasse = f" · {_dog['rasse']}" if _dog.get("rasse") else "" _og_desc = f"{_dog['name']}{_rasse} — Profil auf Ban Yaro" if _dog.get("bio"): _og_desc = f"{_dog['name']}{_rasse}: {str(_dog['bio'])[:120]}" if _dog.get("foto_url"): _og_img = f"https://banyaro.app{_dog['foto_url']}" except Exception: pass _s = html.escape # XSS-Schutz für OG-Meta-Tags _html = f""" {_s(_og_name)} — Ban Yaro
Lade Profil…
""" from fastapi.responses import HTMLResponse return HTMLResponse(content=_html) # ------------------------------------------------------------------ # Einladungsseite /teilen/{token} — SPA lädt + nimmt Einladung an # ------------------------------------------------------------------ @app.get("/teilen/{token}") async def invite_page(token: str): from fastapi.responses import HTMLResponse with open(f"{STATIC_DIR}/index.html", encoding="utf-8") as _f: _html = _f.read() _html = _html.replace( '', '' ) return HTMLResponse(content=_html, headers={"Cache-Control": "no-store, no-cache"}) @app.get("/breeder/{zwingername}") async def breeder_profile_page(zwingername: str): from fastapi.responses import HTMLResponse from urllib.parse import unquote from database import db as _db import html as _html_mod name = unquote(zwingername) desc = f"Hundezüchter {_html_mod.escape(name)} auf Ban Yaro — Wurfbörse, Stammbaum und mehr." try: with _db() as conn: bp = conn.execute( "SELECT bp.rasse, bp.beschreibung FROM breeder_profiles bp " "JOIN users u ON u.id = bp.user_id WHERE bp.zwingername=? AND u.rolle='breeder' LIMIT 1", (name,) ).fetchone() if bp and bp["beschreibung"]: desc = _html_mod.escape(bp["beschreibung"][:160]) except Exception: pass with open(f"{STATIC_DIR}/index.html", encoding="utf-8") as _f: _page = _f.read() _page = _page.replace( '', f'' ).replace( 'Ban Yaro', f'{_html_mod.escape(name)} — Hundezüchter auf Ban Yaro' f'\n ' f'\n ' ) return HTMLResponse(content=_page, headers={"Cache-Control": "no-store, no-cache"}) @app.get("/litters") async def litters_page(): return FileResponse(f"{STATIC_DIR}/index.html", headers={"Cache-Control": "no-store, no-cache"}) # ------------------------------------------------------------------ # Widget-Vorschau /widget # ------------------------------------------------------------------ @app.get("/widget") async def widget_page(): return FileResponse(f"{STATIC_DIR}/index.html", headers={"Cache-Control": "no-store, no-cache"}) # ------------------------------------------------------------------ # Digitaler Heimtierausweis /ausweis/{dog_id} # ------------------------------------------------------------------ @app.get("/ausweis/{dog_id}") async def ausweis_page(dog_id: int, request: Request): from fastapi.responses import HTMLResponse from auth import get_current_user_optional, decode_token import json as _json # Auth via Cookie token = request.cookies.get("by_token") user_id = None if token: try: payload = decode_token(token) user_id = int(payload["sub"]) except Exception: pass if not user_id: return HTMLResponse( '

' 'Bitte einloggen um den Ausweis anzuzeigen.

', status_code=401 ) from database import db as _db with _db() as conn: dog = conn.execute( """SELECT d.* FROM dogs d LEFT JOIN dog_shares ds ON ds.dog_id=d.id AND ds.shared_with_id=? AND ds.accepted_at IS NOT NULL WHERE d.id=? AND (d.user_id=? OR ds.id IS NOT NULL)""", (user_id, dog_id, user_id) ).fetchone() if not dog: return HTMLResponse("

Hund nicht gefunden.

", status_code=404) owner = conn.execute("SELECT name, email FROM users WHERE id=?", (dog["user_id"],)).fetchone() health_rows = conn.execute( "SELECT * FROM health WHERE dog_id=? ORDER BY datum DESC", (dog_id,) ).fetchall() vets = conn.execute( """SELECT DISTINCT t.name, t.strasse, t.plz, t.ort, t.telefon FROM tieraerzte t JOIN health h ON h.tierarzt_id = t.id WHERE h.dog_id=?""", (dog_id,) ).fetchall() dog = dict(dog) vets = [dict(v) for v in vets] def esc(s): if not s: return "" return str(s).replace("&","&").replace("<","<").replace(">",">").replace('"',""") def fmt_date(d): if not d: return "–" try: from datetime import date parts = d.split("-") return f"{int(parts[2])}.{int(parts[1])}.{parts[0]}" except Exception: return d geschlecht = {"m": "Rüde", "w": "Hündin"}.get(dog.get("geschlecht",""), "–") # Impfungen impfungen = [r for r in health_rows if r["typ"] == "impfung"] # Medikamente (aktiv) medis = [r for r in health_rows if r["typ"] == "medikament" and r["aktiv"]] # Allergien allergien = [r for r in health_rows if r["typ"] == "allergie"] def health_rows_html(rows, cols): if not rows: return 'Keine Einträge' out = "" for r in rows: out += "" + "".join(f"{esc(r[c])}" for c in cols) + "" return out photo_html = f'{esc(dog[' if dog.get("foto_url") else '
🐕
' vets_html = "" for v in vets: addr = ", ".join(filter(None, [v.get("strasse"), v.get("plz"), v.get("ort")])) vets_html += f'
{esc(v["name"])}' if addr: vets_html += f'
{esc(addr)}' if v.get("telefon"): vets_html += f'
☎ {esc(v["telefon"])}' vets_html += "
" if not vets_html: vets_html = 'Keine Tierärzte eingetragen' html = f""" Heimtierausweis – {esc(dog["name"])}
{photo_html}

{esc(dog["name"])}

{esc(dog.get("rasse") or "Rasse unbekannt")}
Geburtstag
{fmt_date(dog.get("geburtstag"))}
Geschlecht
{geschlecht}
Gewicht
{f'{dog["gewicht_kg"]} kg' if dog.get("gewicht_kg") else "–"}
Transponder
{esc(dog.get("chip_nr")) or "–"}
{f'
Widerrist
{dog["widerrist_cm"]} cm
' if dog.get("widerrist_cm") else ''}
Besitzer
{esc(owner["name"]) if owner else "–"}

Impfungen

{health_rows_html(impfungen, ["bezeichnung","datum","naechstes","charge_nr","tierarzt_name"])}
ImpfungDatumNächste FälligkeitChargeTierarzt

Aktive Medikamente

{health_rows_html(medis, ["bezeichnung","datum","dosierung","haeufigkeit"])}
MedikamentSeitDosierungHäufigkeit

Allergien & Unverträglichkeiten

{health_rows_html(allergien, ["bezeichnung","schweregrad","reaktion","datum"])}
AllergenSchweregradReaktionSeit

Tierärzte

{vets_html}
""" return HTMLResponse(html) # ------------------------------------------------------------------ # SEO: Hunde-Knigge als server-gerenderete Seite /knigge # ------------------------------------------------------------------ @app.get("/knigge") async def knigge_page(): from fastapi.responses import HTMLResponse begegnungen = [ ("Fremder Hund", "Kurze Leine, ruhig bleiben. Hunde schnüffeln lassen wenn beide entspannt sind. Bei Eskalation: weglenken, Richtung wechseln. Freilaufende Hunde auf angeleine Hunde zulaufen lassen ist unhöflich — der angeleine Hund kann in Stress (Leinenfrust) geraten."), ("Kinder", "Hund nie unbeaufsichtigt mit fremden Kindern lassen. Kind fragen ob es streicheln darf. Hund seitlich positionieren, nicht zwischen Kind und Weg. Kinder sollten keinen direkten Augenkontakt mit unbekannten Hunden halten."), ("Radfahrer", "Hund rechtzeitig an die Seite nehmen. Fahrräder können für manche Hunde bedrohlich wirken. Frühzeitig Abstand gewinnen, ruhig bleiben, Hund bei sich halten."), ("Jogger", "Kurze Leine, Abstand halten, Hund nicht anspringen lassen. Jogger bewegen sich schnell — das kann Jagdinstinkt auslösen."), ("ÖPNV (Bus & Bahn)", "In Deutschland gilt im ÖPNV grundsätzlich Maulkorbpflicht für Hunde. Kleine Hunde in Transportbox fahren kostenlos oder zum Kindertarif. Große Hunde brauchen in den meisten Städten einen eigenen Fahrschein. Regeln variieren je nach Verkehrsverbund."), ("Supermarkt & Geschäfte", "Es gilt das Hausrecht des Betreibers. Ein 'Hunde willkommen'-Schild ist eine explizite Einladung. Im Zweifel immer fragen. Außen anbinden ist nur kurzzeitig und mit Sichtkontakt akzeptabel."), ("Restaurant", "Im Außenbereich erlauben viele Restaurants Hunde — aber Hausrecht gilt. Wenn ein anderer Gast Angst hat, ist Kompromissbereitschaft ein Zeichen guter Hundehaltung. Im Zweifelsfall Personal entscheiden lassen."), ("Spielplätze", "Hunde sind auf Kinderspielplätzen generell verboten (§ 2 Abs. 4 BImSchG und Gemeindesatzungen). Gilt auch für gut erzogene Hunde. Abstand halten, auch beim Vorbeigehen."), ] szenarien = [ ("Muss Kot immer aufgesammelt werden?", "Ja — auch im Gebüsch abseits des Weges. Kinder spielen überall, und Parasiten wie Spulwurm können für Menschen gefährlich sein. Bußgelder für liegengelassenen Hundekot liegen je nach Stadt zwischen 50 € und 500 €."), ("Leinenpflicht ohne Schild?", "Leinenpflicht ist Ländersache. Viele Bundesländer haben eine allgemeine Anleinpflicht in Ortschaften und öffentlichen Grünanlagen. Im Zweifel anleinen und die Gemeindesatzung prüfen."), ("Hund frei laufen trotz angeleintem Hund?", "Nein. Freilaufende Hunde auf angeleine Hunde zuzulassen ist unhöflich und kann den angeleinten Hund in Stress versetzen (Leinenfrust). Immer erst anleinen und fragen ob ein Treffen gewünscht ist."), ("Haftung bei Beißvorfall?", "Hundehalter haften verschuldensunabhängig für Schäden durch ihren Hund (§ 833 BGB). Eine Hunde-Haftpflichtversicherung ist in vielen Bundesländern Pflicht und in jedem Fall empfehlenswert."), ] begs_html = "".join( f'

{titel}

{text}

' for titel, text in begegnungen ) szen_html = "".join( f'

{frage}

{antwort}

' for frage, antwort in szenarien ) html = """ Hunde-Knigge — Regeln für Hundebesitzer | Ban Yaro

Hunde-Knigge

Regeln, Tipps und häufige Fragen für Hundebesitzer — im Alltag, im ÖPNV und in der Community

Begegnungen im Alltag

""" + begs_html + """

Häufige Fragen & Regeln

""" + szen_html + """

Mehr Tipps in der Ban Yaro App

Community-Abstimmungen zu kniffligen Situationen, KI-Situationsberater und alle Hunde-Funktionen kostenlos nutzen.

Kostenlos starten
""" return HTMLResponse(content=html, headers={"Cache-Control": "max-age=7200"}) # ------------------------------------------------------------------ # /presse — Presseseite # ------------------------------------------------------------------ @app.get("/presse") async def presse(): return FileResponse(f"{STATIC_DIR}/presse.html", headers={"Cache-Control": "max-age=3600"}) @app.get("/help") async def help_page(): """Öffentliche, server-gerenderte Hilfe/FAQ-Seite. Kein Login nötig — Apple-Reviewer und Suchmaschinen kommen hier direkt rein.""" from fastapi.responses import HTMLResponse import html as _html from routes.help import _load_active_help_articles KAT_LABEL = { "installation": "Installation & PWA", "erste_schritte": "Erste Schritte", "standort": "Standort & Wetter", "account": "Account & Passwort", "features": "Features erklärt", "probleme": "Technische Probleme", } articles = _load_active_help_articles() # Gruppieren nach Kategorie, Reihenfolge aus KAT_LABEL by_kat: dict[str, list] = {} for a in articles: by_kat.setdefault(a["kategorie"], []).append(a) kat_order = [k for k in KAT_LABEL.keys() if k in by_kat] + [ k for k in by_kat.keys() if k not in KAT_LABEL ] import json as _json sections_html = "" faq_items = [] for kat in kat_order: label = KAT_LABEL.get(kat, kat.replace("_", " ").title()) items = "".join( f'
{_html.escape(a["frage"])}' f'
{_html.escape(a["antwort"]).replace(chr(10), "
")}
' for a in by_kat[kat] ) sections_html += f'

{_html.escape(label)}

{items}
' for a in by_kat[kat]: faq_items.append({ "@type": "Question", "name": a["frage"], "acceptedAnswer": {"@type": "Answer", "text": a["antwort"]} }) faq_json_ld = _json.dumps(faq_items, ensure_ascii=False) html = f""" Hilfe & FAQ — Ban Yaro

Hilfe & FAQ

Antworten zu Ban Yaro und Ban Yaro Go — der nativen iOS-App für unterwegs.

{sections_html}

Direkt-Kontakt

Wenn du hier nichts findest, schreib uns:

support@banyaro.app

Mehr Hilfe gibt es nach dem Anmelden im Suchbereich von banyaro.app/#hilfe.

Impressum · Datenschutz · AGB

""" return HTMLResponse(content=html, headers={"Cache-Control": "max-age=600"}) @app.get("/konto-loeschen") async def konto_loeschen(): from fastapi.responses import HTMLResponse html = """ Konto löschen — Ban Yaro

← Zurück zu Ban Yaro

Konto löschen

Du kannst dein Ban Yaro-Konto und alle zugehörigen Daten dauerhaft löschen.

⚠️ Diese Aktion ist unwiderruflich. Alle Daten (Tagebuch, Gesundheit, Training, Fotos) werden dauerhaft gelöscht.

So löschst du dein Konto:

  1. Öffne banyaro.app und melde dich an
  2. Tippe auf das Menü-Symbol oben rechts
  3. Gehe zu Einstellungen
  4. Scrolle nach unten zu „Konto löschen"
  5. Bestätige die Löschung
Ban Yaro öffnen

Alternativ kannst du die Löschung per E-Mail an support@banyaro.app beantragen.

""" return HTMLResponse(content=html, headers={"Cache-Control": "max-age=3600"}) # /force-update — SW + Cache-Killer für hartnäckige alte Versionen # ------------------------------------------------------------------ @app.get("/force-update") async def force_update(): from fastapi.responses import HTMLResponse html = """ Ban Yaro — Update
⏳ Einen Moment…

Wir besorgen neue Leckerlis 🦴

""" return HTMLResponse(content=html, headers={"Cache-Control": "no-store"}) # /q/{token} — Partner-QR-Scan: zählen + auf Registrierung mit Code umleiten # ------------------------------------------------------------------ @app.get("/q/{token}") async def partner_qr_scan(token: str): from fastapi.responses import RedirectResponse as _Redirect from database import db as _db token = token.strip() with _db() as conn: row = conn.execute( "SELECT token FROM partner_qr_codes WHERE token = ?", (token,) ).fetchone() if not row: return _Redirect("/", status_code=302) conn.execute( """UPDATE partner_qr_codes SET scans = scans + 1, first_scan_at = COALESCE(first_scan_at, datetime('now')), last_scan_at = datetime('now') WHERE token = ?""", (token,) ) # Bewusst NUR der Token in der URL — der tippbare Partner-Code bleibt verborgen # (sonst könnte jeder Sticker-Scanner den Code ablesen und beliebig weitergeben). # Die Registrierung löst den Code server-seitig aus dem Token auf. return _Redirect(f"/?qr={row['token']}", status_code=302) # ------------------------------------------------------------------ # /partner — Influencer-Landingpage # ------------------------------------------------------------------ @app.get("/partner") async def partner_landing(): from fastapi.responses import HTMLResponse from database import db as _db with _db() as conn: total_founders = conn.execute("SELECT COUNT(*) FROM users WHERE is_founder=1").fetchone()[0] partners = conn.execute( """SELECT label, uses FROM partner_codes WHERE grants_founder=1 ORDER BY uses DESC LIMIT 5""" ).fetchall() open_slots = max(0, 100 - total_founders) partner_rows = ''.join([ f'
{p["label"]}' f'{p["uses"]} Gründer
' for p in partners ]) or '
Noch keine Partner aktiv — sei der Erste.
' html = f""" Ban Yaro Partner — Werde Teil der ersten 100
Ban Yaro · Influencer-Programm

Gib deiner Community
etwas für immer.

100 Gründer-Plätze. Weltweit. Nie wieder erhältlich.
Als Partner bringst du deine Follower nach vorne — und steigst im Ranking auf.

Jetzt Partner werden
{open_slots}
Gründer-Plätze noch frei (von 100)
0{total_founders} vergeben100
Was du und deine Community bekommen
🏆
Gründer-Lizenz für deine Follower
Jeder der sich mit deinem Code registriert bekommt einen der 100 Gründer-Plätze — mit einer nummerierten Badge „Gründer #N" die dauerhaft im Profil und im Forum sichtbar ist. Nie wieder erhältlich.
🤝
Dein persönlicher Partner-Code
Du bekommst einen eigenen Code (z.B. HUNDEBLOG). Follower die sich damit registrieren werden automatisch Gründer — du siehst in Echtzeit wie viele du gebracht hast.
📊
Öffentliches Partner-Ranking
Auf der Gründer-Seite siehen alle wer die meisten Gründer gebracht hat. Das Ranking motiviert deine Follower mitzumachen — und stärkt deine Position gegenüber anderen Influencern.
💜
Partner-Badge für dich
Du selbst bekommst ein „Partner"-Badge in deinem Profil — sichtbar für alle Nutzer der App.
🎁
Lebenslang kostenlos — für immer
Gründer zahlen nie für Premium-Features — egal was wir in Zukunft einführen. Das ist ein echtes Dankeschön für die Pioniere.
Wie es funktioniert
1
Kontakt aufnehmenSchreib uns kurz an partner@banyaro.app — wir richten deinen persönlichen Code ein.
2
Code teilenDu postest deinen Code in Story, Reel oder Post — deine Follower registrieren sich auf banyaro.app.
3
Gründer werdenJede Registrierung mit deinem Code sichert automatisch einen der 100 Gründer-Plätze. Du siehst deinen Fortschritt in Echtzeit.
4
Im Ranking aufsteigenJe mehr Gründer du bringst, desto höher dein Platz auf der öffentlichen Gründer-Seite.
{'
🏅 Aktuelles Partner-Ranking
' + partner_rows + '
' if partners else ''}
Was ist Ban Yaro?

Ban Yaro ist die Hunde-App für alles was Halter brauchen — Tagebuch, Gesundheit, Routen, Giftköder-Alarm, Community. Kostenlos, ohne App Store, direkt im Browser oder als PWA.

banyaro.app entdecken →

Bereit dabei zu sein?

Schreib uns kurz wer du bist und auf welchem Kanal du aktiv bist — wir richten deinen Code binnen 24h ein.

📧 partner@banyaro.app
""" return HTMLResponse(content=html, headers={"Cache-Control": "no-store, no-cache"}) # ------------------------------------------------------------------ # Honeypot-Fallen für Scanner und Bots # Jeder Aufruf → 24h IP-Sperre # ------------------------------------------------------------------ from ratelimit import block_ip as _block_ip _HONEYPOT_PATHS = [ "/api/admin/users", "/api/v1/users", "/api/users", "/api/.env", "/api/config", "/api/setup", "/api/install", "/api/phpinfo", "/api/debug", "/api/actuator", "/api/actuator/health", "/api/swagger", "/api/graphql", ] async def _honeypot_handler(request: Request): import logging as _log _log.getLogger("banyaro.security").warning( "Honeypot getroffen: %s %s — IP %s", request.method, request.url.path, request.client.host if request.client else "?" ) _block_ip(request, hours=24) from fastapi.responses import JSONResponse return JSONResponse(status_code=404, content={"detail": "Not Found"}) for _hp in _HONEYPOT_PATHS: app.add_api_route(_hp, _honeypot_handler, methods=["GET", "POST", "PUT", "DELETE"], include_in_schema=False) # ------------------------------------------------------------------ # Digitaler Hundepass — öffentlicher Share-Link (kein Login nötig) # ------------------------------------------------------------------ @app.get("/pass/{token}") async def passport_share_page(token: str): from fastapi.responses import HTMLResponse from database import db as _db from datetime import date as _date with _db() as conn: share = conn.execute( "SELECT * FROM passport_shares WHERE token=?", (token,) ).fetchone() if not share: return HTMLResponse( '' '

Link nicht gefunden

Dieser Hundepass-Link ist ungültig.

', status_code=404 ) if share["valid_until"] < _date.today().isoformat(): return HTMLResponse( '' '

Link abgelaufen

Dieser Hundepass-Link ist nicht mehr gültig.

', status_code=410 ) dog_id = share["dog_id"] dog = conn.execute("SELECT * FROM dogs WHERE id=?", (dog_id,)).fetchone() meta = conn.execute("SELECT * FROM dog_passport_meta WHERE dog_id=?", (dog_id,)).fetchone() vaccs = conn.execute( "SELECT * FROM vaccinations WHERE dog_id=? ORDER BY datum DESC", (dog_id,) ).fetchall() meds = conn.execute( "SELECT * FROM medications WHERE dog_id=? ORDER BY von DESC, id DESC", (dog_id,) ).fetchall() def _fmt(d): if not d: return "–" try: from datetime import datetime as _dt return _dt.strptime(d[:10], "%Y-%m-%d").strftime("%d.%m.%Y") except Exception: return d dog = dict(dog) meta = dict(meta) if meta else {} vaccs = [dict(v) for v in vaccs] meds = [dict(m) for m in meds] _g_map = {"m": "Rüde", "w": "Hündin"} vacc_rows = "".join(f""" {v['krankheit'] or ''} {_fmt(v['datum'])} {_fmt(v['naechste'])} {v['tierarzt'] or '–'} {v['charge_nr'] or '–'} """ for v in vaccs) or "Keine Einträge" med_rows = "".join(f""" {m['name'] or ''} {m['dosierung'] or '–'} {_fmt(m['von'])} {_fmt(m['bis']) if m['bis'] else 'dauerhaft'} {m['notiz'] or '–'} """ for m in meds) or "Keine Einträge" html = f""" Hundepass — {dog['name']}

Ban Yaro

Digitaler Hundepass — {dog['name']}

Hundeangaben

{dog['name']}
{dog.get('rasse') or '–'}
{_fmt(dog.get('geburtstag'))}
{_g_map.get(dog.get('geschlecht',''), '–')}
{dog.get('chip_nr') or '–'}
{meta.get('blutgruppe') or '–'}
{('
' f'
{meta["allergien"]}
') if meta.get("allergien") else ''} {('
' f'
{meta["besonderheiten"]}
') if meta.get("besonderheiten") else ''}

Impfungen

{vacc_rows}
KrankheitDatumNächsteTierarztCharge

Medikamente

{med_rows}
MedikamentDosierungVonBisNotiz
""" return HTMLResponse(html) # ------------------------------------------------------------------ # Wurfbörse /wurfboerse — SSR-Seite mit korrektem Canonical # ------------------------------------------------------------------ @app.get("/wurfboerse") async def wurfboerse_page(): from fastapi.responses import HTMLResponse from database import db as _db import html as _h import json as _json litters_html = "" rows = [] ld_items = [] try: with _db() as conn: rows = conn.execute( """SELECT l.id, l.welpen_verfuegbar, l.preis_spanne, l.status, bp.zwingername, bp.rasse, wr.name AS rasse_name FROM litters l JOIN breeder_profiles bp ON bp.id = l.breeder_id JOIN users u ON u.id = bp.user_id LEFT JOIN wiki_rassen wr ON wr.id = bp.breed_id WHERE l.sichtbar=1 AND u.rolle='breeder' AND (l.sichtbar_bis IS NULL OR l.sichtbar_bis >= date('now')) ORDER BY l.created_at DESC LIMIT 60""" ).fetchall() for i, r in enumerate(rows, 1): rasse_label = _h.escape(r["rasse_name"] or r["rasse"] or "") zw = _h.escape(r["zwingername"] or "") verfueg = r["welpen_verfuegbar"] preis = _h.escape(r["preis_spanne"] or "") status_map = {"geplant": "Geplant", "geboren": "Geboren", "verfuegbar": "Verfügbar"} status_label = status_map.get(r["status"], r["status"]) litters_html += f"""
{rasse_label or "Unbekannte Rasse"}
Züchter: {zw}
{status_label} {f'{verfueg} Welpen verfügbar' if verfueg else ''} {f'{preis}' if preis else ''}
""" ld_items.append({ "@type": "ListItem", "position": i, "name": f"{r['rasse_name'] or r['rasse'] or 'Welpen'} — {r['zwingername'] or 'Züchter'}", "url": f"https://banyaro.app/breeder/{r['zwingername']}" }) except Exception: pass count_text = f"{len(rows)} Würfe" if litters_html else "Aktuell keine Würfe eingetragen" ld_json = _json.dumps({ "@context": "https://schema.org", "@type": "ItemList", "name": "Wurfbörse — Hundewelpen bei Ban Yaro", "description": "Aktuelle Würfe von geprüften Züchtern auf Ban Yaro", "url": "https://banyaro.app/wurfboerse", "numberOfItems": len(rows), "itemListElement": ld_items }, ensure_ascii=False) html = f""" Wurfbörse — Hundewelpen bei Ban Yaro

Wurfbörse

Hundewelpen von geprüften Züchtern

{count_text}

{litters_html or '

Aktuell keine Würfe eingetragen.
Schau bald wieder vorbei!

'}
""" return HTMLResponse(content=html, headers={"Cache-Control": "max-age=1800"}) # Rechtsseiten: Pfad-URLs (SEO-Footer, App-Store-Metadaten, E-Mails) auf die # Rechtsseiten als eigenständige, crawlbare HTML-Seiten ausliefern (einzige Inhaltsquelle). # Die SPA-Module (#agb, #datenschutz, #impressum) holen denselben Inhalt per fetch und # injizieren ihn — so bleibt die In-App-Ansicht erhalten, ohne Text-Duplikat. # Muss VOR dem SPA-Fallback registriert sein. @app.get("/agb") @app.get("/datenschutz") @app.get("/impressum") async def legal_page(request: _Request): page = request.url.path.strip("/") return FileResponse(f"{STATIC_DIR}/{page}.html", headers={"Cache-Control": "max-age=3600"}) # SPA Fallback — ALLE nicht-API-Routen gehen zur index.html @app.get("/{full_path:path}") async def spa_fallback(full_path: str): IS_STAGING = os.getenv("STAGING", "false").lower() == "true" if IS_STAGING: from fastapi.responses import HTMLResponse with open(f"{STATIC_DIR}/index.html", encoding="utf-8") as f: html = f.read() html = html.replace( '', '', ) return HTMLResponse(content=html, headers={"Cache-Control": "no-store, no-cache"}) return FileResponse( f"{STATIC_DIR}/index.html", headers={"Cache-Control": "no-store, no-cache"} )