"""BAN YARO — Push-Notification Routes""" import os import json import logging from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from typing import Optional from pywebpush import webpush, WebPushException from database import db from auth import get_current_user router = APIRouter() logger = logging.getLogger(__name__) VAPID_PUBLIC_KEY = os.getenv("VAPID_PUBLIC_KEY", "") VAPID_PRIVATE_KEY = os.getenv("VAPID_PRIVATE_KEY", "") VAPID_CONTACT = os.getenv("VAPID_CONTACT", "mailto:admin@banyaro.app") # ------------------------------------------------------------------ # GET /api/push/vapid-key — Public Key für Frontend # ------------------------------------------------------------------ @router.get("/vapid-key") async def get_vapid_key(): if not VAPID_PUBLIC_KEY: raise HTTPException(503, "Push-Notifications nicht konfiguriert.") return {"vapid_public_key": VAPID_PUBLIC_KEY} # ------------------------------------------------------------------ # POST /api/push/subscribe — Subscription speichern # ------------------------------------------------------------------ class PushSubscription(BaseModel): endpoint: str keys: dict # { p256dh, auth } expirationTime: Optional[int] = None @router.post("/subscribe", status_code=201) async def subscribe(data: PushSubscription, user=Depends(get_current_user)): p256dh = data.keys.get("p256dh", "") auth = data.keys.get("auth", "") with db() as conn: conn.execute(""" INSERT INTO push_subscriptions (user_id, endpoint, p256dh, auth) VALUES (?, ?, ?, ?) ON CONFLICT(endpoint) DO UPDATE SET p256dh=excluded.p256dh, auth=excluded.auth, user_id=excluded.user_id """, (user["id"], data.endpoint, p256dh, auth)) logger.info(f"Push-Subscription gespeichert für user {user['id']}") return {"ok": True} # ------------------------------------------------------------------ # DELETE /api/push/subscribe — Subscription entfernen # ------------------------------------------------------------------ @router.delete("/subscribe") async def unsubscribe(user=Depends(get_current_user)): with db() as conn: conn.execute( "DELETE FROM push_subscriptions WHERE user_id=?", (user["id"],) ) return {"ok": True} # ------------------------------------------------------------------ # Interne Hilfsfunktion: Push an einen oder alle User schicken # ------------------------------------------------------------------ def send_push(subscription_row, payload: dict) -> bool: """Schickt eine Push-Notification. Gibt True bei Erfolg zurück.""" if not VAPID_PRIVATE_KEY: return False try: webpush( subscription_info={ "endpoint": subscription_row["endpoint"], "keys": { "p256dh": subscription_row["p256dh"], "auth": subscription_row["auth"], }, }, data=json.dumps(payload), vapid_private_key=VAPID_PRIVATE_KEY, vapid_claims={"sub": VAPID_CONTACT}, ) return True except WebPushException as e: status = e.response.status_code if e.response else 0 logger.warning(f"Push fehlgeschlagen (HTTP {status}): {e}") # 410 Gone = Subscription abgelaufen → aus DB löschen if status == 410: with db() as conn: conn.execute( "DELETE FROM push_subscriptions WHERE endpoint=?", (subscription_row["endpoint"],) ) return False def send_push_to_user(user_id: int, payload: dict): """Schickt Push an alle Subscriptions eines Users.""" with db() as conn: rows = conn.execute( "SELECT * FROM push_subscriptions WHERE user_id=?", (user_id,) ).fetchall() sent = 0 for row in rows: if send_push(row, payload): sent += 1 return sent def send_push_to_all(payload: dict): """Schickt Push an alle abonnierten User (z.B. Giftköder-Alarm).""" with db() as conn: rows = conn.execute("SELECT * FROM push_subscriptions").fetchall() sent = 0 for row in rows: if send_push(row, payload): sent += 1 logger.info(f"Push an {sent}/{len(rows)} Subscriptions gesendet.") return sent