"""BAN YARO — Partner-Codes + Gründer-Lizenz""" from typing import Optional from fastapi import APIRouter, HTTPException, Depends from pydantic import BaseModel, Field from database import db from auth import require_admin, get_current_user router = APIRouter() class PartnerCodeCreate(BaseModel): code: str = Field(..., min_length=1, max_length=50) label: str = Field(..., min_length=1, max_length=200) grants_founder: int = 1 max_uses: Optional[int] = None class GrantRequest(BaseModel): is_founder: Optional[int] = None is_partner: Optional[int] = None # ------------------------------------------------------------------ # Admin: Partner-Codes verwalten # ------------------------------------------------------------------ @router.get("/admin/partner/codes") def list_partner_codes(user=Depends(require_admin)): """Alle Partner-Codes mit Stats (admin only).""" with db() as conn: rows = conn.execute( """SELECT pc.id, pc.code, pc.label, pc.grants_founder, pc.max_uses, pc.uses, pc.created_at, u.name AS created_by_name FROM partner_codes pc LEFT JOIN users u ON u.id = pc.created_by ORDER BY pc.created_at DESC""" ).fetchall() return [dict(r) for r in rows] @router.post("/admin/partner/codes", status_code=201) def create_partner_code(data: PartnerCodeCreate, user=Depends(require_admin)): """Neuen Partner-Code erstellen (admin only).""" code = data.code.strip().upper() if not code: raise HTTPException(400, "Code darf nicht leer sein.") with db() as conn: existing = conn.execute( "SELECT id FROM partner_codes WHERE code=?", (code,) ).fetchone() if existing: raise HTTPException(400, "Dieser Code existiert bereits.") conn.execute( """INSERT INTO partner_codes (code, label, grants_founder, max_uses, created_by) VALUES (?, ?, ?, ?, ?)""", (code, data.label.strip(), data.grants_founder, data.max_uses, user["id"]) ) row = conn.execute( "SELECT * FROM partner_codes WHERE code=?", (code,) ).fetchone() return dict(row) @router.delete("/admin/partner/codes/{code_id}", status_code=204) def delete_partner_code(code_id: int, user=Depends(require_admin)): """Partner-Code löschen (admin only).""" with db() as conn: existing = conn.execute( "SELECT id FROM partner_codes WHERE id=?", (code_id,) ).fetchone() if not existing: raise HTTPException(404, "Partner-Code nicht gefunden.") conn.execute("DELETE FROM partner_codes WHERE id=?", (code_id,)) return None @router.post("/admin/partner/users/{user_id}/grant") def grant_user_status(user_id: int, data: GrantRequest, user=Depends(require_admin)): """Founder- und/oder Partner-Status für einen User setzen (admin only).""" updates = {} if data.is_founder is not None: updates["is_founder"] = data.is_founder if data.is_partner is not None: updates["is_partner"] = data.is_partner if not updates: raise HTTPException(400, "Mindestens is_founder oder is_partner muss angegeben werden.") with db() as conn: target = conn.execute( "SELECT id, is_founder, founder_number FROM users WHERE id=?", (user_id,) ).fetchone() if not target: raise HTTPException(404, "User nicht gefunden.") if updates.get("is_founder") == 1 and not target["founder_number"]: # Atomare Gründer-Vergabe — kein TOCTOU mehr zwischen COUNT und UPDATE. # Sub-Query wird gegen Snapshot vor dem UPDATE evaluiert (SQL-Spec). cur = conn.execute( """UPDATE users SET is_founder = 1, founder_number = ( SELECT IFNULL(MAX(founder_number), 0) + 1 FROM users WHERE is_founder = 1 ) WHERE id = ? AND (SELECT COUNT(*) FROM users WHERE is_founder = 1) < ? AND (is_founder IS NULL OR is_founder = 0)""", (user_id, FOUNDER_MAX) ) if cur.rowcount == 0: raise HTTPException(400, f"Alle {FOUNDER_MAX} Gründer-Plätze sind vergeben.") # is_founder + founder_number sind atomar gesetzt — aus updates entfernen updates.pop("is_founder", None) updates.pop("founder_number", None) elif updates.get("is_founder") == 0: # Gründer-Status entfernen → founder_number ebenfalls leeren updates["founder_number"] = None if updates: # nach atomarer Founder-Vergabe ggf. leer set_clause = ", ".join(f"{k}=?" for k in updates) conn.execute( f"UPDATE users SET {set_clause} WHERE id=?", (*updates.values(), user_id) ) row = conn.execute( "SELECT id, name, email, is_founder, is_partner, founder_number FROM users WHERE id=?", (user_id,) ).fetchone() return dict(row) @router.get("/admin/users/search") def search_users(q: str, user=Depends(require_admin)): """User-Suche für Admin (Name-Präfix, max. 10 Ergebnisse).""" with db() as conn: rows = conn.execute( """SELECT id, name, email, is_founder, is_partner, rolle FROM users WHERE name LIKE ? COLLATE NOCASE ORDER BY name LIMIT 10""", (f"{q}%",) ).fetchall() return [dict(r) for r in rows] # ------------------------------------------------------------------ # Öffentlich: Gründer-Leaderboard + Code-Info # ------------------------------------------------------------------ FOUNDER_MAX = 100 @router.get("/partner/founders/stats") def founders_stats(): """Öffentliches Gründer-Leaderboard: Slots, Partner-Ranking, Gründer-Liste.""" with db() as conn: total = conn.execute( "SELECT COUNT(*) FROM users WHERE is_founder=1" ).fetchone()[0] # Partner-Ranking: nach uses absteigend partners = conn.execute( """SELECT pc.id, pc.label, pc.uses, pc.code, u.name AS partner_username, u.is_partner FROM partner_codes pc LEFT JOIN users u ON u.referred_by = -pc.id AND u.is_partner=1 WHERE pc.grants_founder=1 ORDER BY pc.uses DESC""" ).fetchall() # Erste 100 Gründer (für Galerie) founders = conn.execute( """SELECT u.name, u.founder_number, pc.label AS via_partner FROM users u LEFT JOIN partner_codes pc ON u.referred_by = -pc.id WHERE u.is_founder=1 AND u.founder_number IS NOT NULL ORDER BY u.founder_number ASC LIMIT 100""" ).fetchall() return { "total": total, "max": FOUNDER_MAX, "open": max(0, FOUNDER_MAX - total), "closed": total >= FOUNDER_MAX, "partners": [dict(p) for p in partners], "founders": [dict(f) for f in founders], } @router.get("/partner/codes/{code}/info") def partner_code_info(code: str): """Gibt zurück ob ein Partner-Code existiert und dessen Label (öffentlich).""" with db() as conn: row = conn.execute( """SELECT code, label, grants_founder, max_uses, uses FROM partner_codes WHERE code=?""", (code.strip().upper(),) ).fetchone() if not row: raise HTTPException(404, "Partner-Code nicht gefunden.") r = dict(row) if r["grants_founder"]: with db() as conn2: total = conn2.execute( "SELECT COUNT(*) FROM users WHERE is_founder=1" ).fetchone()[0] r["founder_slots_open"] = max(0, FOUNDER_MAX - total) r["redeemable"] = (r["max_uses"] is None or r["uses"] < r["max_uses"]) and total < FOUNDER_MAX else: r["founder_slots_open"] = None r["redeemable"] = r["max_uses"] is None or r["uses"] < r["max_uses"] return r