banyaro/backend/routes/partner.py
rene 970480c1d6 QR-Stats: Registrierungen (bestätigt) vs. Versuche (unbestätigt) + Account-Detail-Liste
Rene: Statistik zählte alles in einen Topf (3 statt 2) und zeigte nicht,
WER sich registriert hat. Jetzt:
- registrations = email_verified=1, attempts = unbestätigte Versuche —
  Versuche werden bei späterer Bestätigung automatisch zu Registrierungen
- Admin: 👥-Button pro Kontingent klappt Account-Liste auf (Name, E-Mail,
  Datum, ✓ bestätigt/ Versuch, Sticker-Nr #seq) — lazy geladen, admin-only
  (personenbezogene Daten); Partner sehen weiter nur Zahlen (Registr. +N)
- Test deckt Versuch→Bestätigung-Übergang und Detail-Endpoint ab
2026-06-07 18:43:18 +02:00

792 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""BAN YARO — Partner-Codes + Gründer-Lizenz + Partner-Profile (Showcase)"""
import asyncio
import json
import os
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File
from pydantic import BaseModel, Field
from database import db
from auth import require_admin, get_current_user
from config import MEDIA_DIR
from media_utils import validate_upload, convert_media, safe_media_path
# Router object on which every endpoint in this module is registered.
router = APIRouter()
class PartnerCodeCreate(BaseModel):
    """Request body for creating a partner code (admin endpoint)."""

    code: str = Field(..., min_length=1, max_length=50)
    label: str = Field(..., min_length=1, max_length=200)
    # Boolean-style flag stored as an integer. Other queries in this module
    # compare it against exactly 1, so anything but 0/1 would be inconsistent.
    grants_founder: int = Field(1, ge=0, le=1)
    # None means "no usage cap". A negative cap could never be satisfied.
    max_uses: Optional[int] = Field(None, ge=0)
class GrantRequest(BaseModel):
    """Request body for granting or revoking founder / partner status.

    Each flag is optional; an omitted flag (None) is left unchanged by the
    endpoint. Supplied flags are restricted to 0 or 1 because the value is
    written to the ``users`` table as-is and the founder logic only handles
    those two values.
    """

    is_founder: Optional[int] = Field(None, ge=0, le=1)
    is_partner: Optional[int] = Field(None, ge=0, le=1)
# ------------------------------------------------------------------
# Admin: Partner-Codes verwalten
# ------------------------------------------------------------------
@router.get("/admin/partner/codes")
def list_partner_codes(user=Depends(require_admin)):
    """Return all partner codes with their usage stats, newest first (admin only)."""
    query = """SELECT pc.id, pc.code, pc.label, pc.grants_founder,
                      pc.max_uses, pc.uses, pc.created_at, pc.owner_user_id,
                      u.name AS created_by_name,
                      o.name AS owner_name
               FROM partner_codes pc
               LEFT JOIN users u ON u.id = pc.created_by
               LEFT JOIN users o ON o.id = pc.owner_user_id
               ORDER BY pc.created_at DESC"""
    with db() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))
@router.post("/admin/partner/codes", status_code=201)
def create_partner_code(data: PartnerCodeCreate, user=Depends(require_admin)):
    """Create a new partner code (admin only).

    The code is stored trimmed and upper-cased. Returns the inserted row as
    a dict.

    Raises:
        HTTPException 400: code or label is empty after trimming, or the
            code already exists.
    """
    code = data.code.strip().upper()
    if not code:
        raise HTTPException(400, "Code darf nicht leer sein.")
    # min_length on the model is satisfied by pure whitespace, so the label
    # needs the same post-trim check as the code.
    label = data.label.strip()
    if not label:
        raise HTTPException(400, "Label darf nicht leer sein.")
    with db() as conn:
        existing = conn.execute(
            "SELECT id FROM partner_codes WHERE code=?", (code,)
        ).fetchone()
        if existing:
            raise HTTPException(400, "Dieser Code existiert bereits.")
        conn.execute(
            """INSERT INTO partner_codes (code, label, grants_founder, max_uses, created_by)
               VALUES (?, ?, ?, ?, ?)""",
            (code, label, data.grants_founder, data.max_uses, user["id"]),
        )
        row = conn.execute(
            "SELECT * FROM partner_codes WHERE code=?", (code,)
        ).fetchone()
    return dict(row)
@router.delete("/admin/partner/codes/{code_id}", status_code=204)
def delete_partner_code(code_id: int, user=Depends(require_admin)):
    """Delete a partner code by id (admin only); 404 if it does not exist."""
    with db() as conn:
        found = conn.execute(
            "SELECT id FROM partner_codes WHERE id=?", (code_id,)
        ).fetchone()
        if not found:
            raise HTTPException(404, "Partner-Code nicht gefunden.")
        conn.execute("DELETE FROM partner_codes WHERE id=?", (code_id,))
    return None
@router.post("/admin/partner/users/{user_id}/grant")
def grant_user_status(user_id: int, data: GrantRequest, user=Depends(require_admin)):
    """Set the founder and/or partner flag of a user (admin only).

    Granting founder status to a user without a founder number is done in a
    single UPDATE that also assigns the next number and enforces the
    FOUNDER_MAX cap. Revoking founder status clears the founder number.
    Returns the user's id, name, email and status columns after the change.

    Raises:
        HTTPException 400: neither flag supplied, or no founder slot could
            be assigned.
        HTTPException 404: user does not exist.
    """
    changes = {
        column: value
        for column, value in (
            ("is_founder", data.is_founder),
            ("is_partner", data.is_partner),
        )
        if value is not None
    }
    if not changes:
        raise HTTPException(400, "Mindestens is_founder oder is_partner muss angegeben werden.")

    with db() as conn:
        target = conn.execute(
            "SELECT id, is_founder, founder_number FROM users WHERE id=?", (user_id,)
        ).fetchone()
        if not target:
            raise HTTPException(404, "User nicht gefunden.")

        founder_flag = changes.get("is_founder")
        if founder_flag == 1 and not target["founder_number"]:
            # One statement does the cap check, the numbering and the flag
            # update, so there is no gap between counting and writing.
            claimed = conn.execute(
                """UPDATE users
                   SET is_founder = 1,
                       founder_number = (
                           SELECT IFNULL(MAX(founder_number), 0) + 1
                           FROM users WHERE is_founder = 1
                       )
                   WHERE id = ?
                     AND (SELECT COUNT(*) FROM users WHERE is_founder = 1) < ?
                     AND (is_founder IS NULL OR is_founder = 0)""",
                (user_id, FOUNDER_MAX),
            )
            if claimed.rowcount == 0:
                raise HTTPException(400, f"Alle {FOUNDER_MAX} Gründer-Plätze sind vergeben.")
            # The founder columns are already written; keep them out of the
            # generic update below.
            changes.pop("is_founder", None)
        elif founder_flag == 0:
            # Revoking founder status also releases the founder number.
            changes["founder_number"] = None

        if changes:  # may be empty after the founder grant above
            assignments = ", ".join(f"{column}=?" for column in changes)
            conn.execute(
                f"UPDATE users SET {assignments} WHERE id=?",
                (*changes.values(), user_id),
            )

        row = conn.execute(
            "SELECT id, name, email, is_founder, is_partner, founder_number FROM users WHERE id=?",
            (user_id,),
        ).fetchone()
    return dict(row)
@router.get("/admin/users/search")
def search_users(q: str, user=Depends(require_admin)):
    """Admin user search: name prefix match, at most 10 results ordered by name.

    LIKE metacharacters in ``q`` are escaped so that ``%`` and ``_`` typed
    by the admin match literally instead of acting as wildcards.
    """
    # '!' is the escape character declared in the ESCAPE clause below, so it
    # has to be doubled first.
    literal = q.replace("!", "!!").replace("%", "!%").replace("_", "!_")
    with db() as conn:
        rows = conn.execute(
            """SELECT id, name, email, is_founder, is_partner, rolle
               FROM users WHERE name LIKE ? COLLATE NOCASE ESCAPE '!'
               ORDER BY name LIMIT 10""",
            (f"{literal}%",),
        ).fetchall()
    return [dict(r) for r in rows]
# ------------------------------------------------------------------
# Öffentlich: Gründer-Leaderboard + Code-Info
# ------------------------------------------------------------------
# Upper bound on the number of users that may hold founder status.
FOUNDER_MAX = 100
@router.get("/partner/founders/stats")
def founders_stats():
    """Public founder leaderboard: slot counters, partner ranking, founder list."""
    ranking_sql = """SELECT pc.id, pc.label, pc.uses, pc.code,
                            u.name AS partner_username, u.is_partner
                     FROM partner_codes pc
                     LEFT JOIN users u ON u.referred_by = -pc.id AND u.is_partner=1
                     WHERE pc.grants_founder=1
                     ORDER BY pc.uses DESC"""
    gallery_sql = """SELECT u.name, u.founder_number,
                            pc.label AS via_partner
                     FROM users u
                     LEFT JOIN partner_codes pc ON u.referred_by = -pc.id
                     WHERE u.is_founder=1 AND u.founder_number IS NOT NULL
                     ORDER BY u.founder_number ASC
                     LIMIT 100"""
    with db() as conn:
        founder_count = conn.execute(
            "SELECT COUNT(*) FROM users WHERE is_founder=1"
        ).fetchone()[0]
        # Partner codes ranked by number of uses, highest first.
        ranking = conn.execute(ranking_sql).fetchall()
        # Founders in the order of their founder number (gallery view).
        gallery = conn.execute(gallery_sql).fetchall()

    remaining = FOUNDER_MAX - founder_count
    return {
        "total": founder_count,
        "max": FOUNDER_MAX,
        "open": remaining if remaining > 0 else 0,
        "closed": founder_count >= FOUNDER_MAX,
        "partners": list(map(dict, ranking)),
        "founders": list(map(dict, gallery)),
    }
@router.get("/partner/codes/{code}/info")
def partner_code_info(code: str):
    """Public lookup of a partner code: label, usage and redeemability.

    The code is matched trimmed and upper-cased. For codes that grant
    founder status the response also carries the number of open founder
    slots, and the code only counts as redeemable while slots remain.

    Raises:
        HTTPException 404: no partner code with that value exists.
    """
    with db() as conn:
        row = conn.execute(
            """SELECT code, label, grants_founder, max_uses, uses
               FROM partner_codes WHERE code=?""",
            (code.strip().upper(),),
        ).fetchone()
        if not row:
            raise HTTPException(404, "Partner-Code nicht gefunden.")
        info = dict(row)
        founder_count = None
        if info["grants_founder"]:
            # Same connection as the code lookup; no second connection needed.
            founder_count = conn.execute(
                "SELECT COUNT(*) FROM users WHERE is_founder=1"
            ).fetchone()[0]

    under_cap = info["max_uses"] is None or info["uses"] < info["max_uses"]
    if founder_count is None:
        info["founder_slots_open"] = None
        info["redeemable"] = under_cap
    else:
        info["founder_slots_open"] = max(0, FOUNDER_MAX - founder_count)
        info["redeemable"] = under_cap and founder_count < FOUNDER_MAX
    return info
# ------------------------------------------------------------------
# Partner-Profile — Self-Service-Editor + öffentlicher Showcase
# Frontend: partner-profil.js (Editor), partner.js (Showcase)
# ------------------------------------------------------------------
_PP_STORAGE_LIMIT_MB = 200  # total storage budget per partner (the frontend renders a usage bar from it)
_PP_MAX_PHOTOS = 6  # maximum number of gallery entries (photos and videos) per profile
_PP_LOGO_MAX_MB = 5  # upload size limit for a logo
_PP_FILE_MAX_MB = 200  # upload size limit per gallery file (videos)
# File extensions accepted as images / videos by the upload endpoints.
_PP_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".heic", ".heif"}
_PP_VIDEO_EXTS = {".mp4", ".webm", ".mov", ".avi", ".m4v"}
def require_partner(user=Depends(get_current_user)):
    """Dependency that admits partners and admins; everyone else gets a 403."""
    allowed = user.get("is_partner") or user.get("rolle") == "admin"
    if allowed:
        return user
    raise HTTPException(403, "Nur für Partner.")
def _pp_dir(user_id: int) -> str:
    """Return the partner's media directory, creating it if it is missing."""
    target = os.path.join(MEDIA_DIR, "partner", f"{user_id}")
    os.makedirs(target, exist_ok=True)
    return target
def _pp_storage_mb(user_id: int) -> float:
    """Return the storage used by a partner in MB (logo + photos/videos).

    Only regular files directly inside the partner's media directory are
    counted; sub-directories are ignored. A missing directory counts as 0.
    Files that disappear while the directory is being scanned (e.g. a
    concurrent delete) are skipped instead of failing the whole request.
    """
    path = os.path.join(MEDIA_DIR, "partner", str(user_id))
    total = 0
    try:
        with os.scandir(path) as entries:
            for entry in entries:
                try:
                    if entry.is_file():
                        total += entry.stat().st_size
                except OSError:
                    # Entry vanished or became unreadable mid-scan.
                    continue
    except (FileNotFoundError, NotADirectoryError):
        return 0.0
    return round(total / (1024 * 1024), 4)
def _pp_profile_dict(row) -> dict:
    """Convert a profile row to a dict, replacing ``photos_json`` by ``photos``.

    ``photos`` is always a list: a missing column, NULL, malformed JSON, or
    JSON that decodes to something other than a list all yield ``[]``, so
    callers can safely use ``len()`` and ``append()`` on it.
    """
    d = dict(row)
    raw = d.pop("photos_json", None)
    try:
        photos = json.loads(raw or "[]")
    except (ValueError, TypeError):
        photos = []
    d["photos"] = photos if isinstance(photos, list) else []
    return d
def _pp_get_or_empty(conn, user_id: int) -> dict:
    """Load the partner profile of ``user_id``; an empty dict if none exists."""
    record = conn.execute(
        "SELECT * FROM partner_profiles WHERE user_id=?", (user_id,)
    ).fetchone()
    if not record:
        return {}
    return _pp_profile_dict(record)
class PartnerProfileUpdate(BaseModel):
    """Editable text fields of a partner profile; every field is optional."""

    display_name: Optional[str] = Field(default=None, max_length=60)
    tagline: Optional[str] = Field(default=None, max_length=80)
    bio: Optional[str] = Field(default=None, max_length=500)
    website: Optional[str] = Field(default=None, max_length=300)
    instagram: Optional[str] = Field(default=None, max_length=100)
@router.get("/partner/my-profile")
def get_my_partner_profile(user=Depends(require_partner)):
    """Return the caller's own profile together with storage usage and limit."""
    uid = user["id"]
    with db() as conn:
        own_profile = _pp_get_or_empty(conn, uid)
    used = _pp_storage_mb(uid)
    return {
        "profile": own_profile,
        "storage_mb": used,
        "storage_limit_mb": _PP_STORAGE_LIMIT_MB,
    }
@router.put("/partner/my-profile")
def update_my_partner_profile(data: PartnerProfileUpdate, user=Depends(require_partner)):
    """Create or overwrite the text fields of the caller's partner profile.

    All values are trimmed; empty values are stored as NULL. A website
    without an http(s) scheme gets ``https://`` prepended, and the Instagram
    handle is stored with exactly one leading ``@``. Returns the profile
    after the write.
    """
    website = (data.website or "").strip()
    # Scheme comparison is case-insensitive so "HTTPS://example.org" is not
    # turned into "https://HTTPS://example.org".
    if website and not website.lower().startswith(("http://", "https://")):
        website = "https://" + website
    instagram = (data.instagram or "").strip().lstrip("@")
    with db() as conn:
        conn.execute(
            """INSERT INTO partner_profiles
                   (user_id, display_name, tagline, bio, website, instagram, updated_at)
               VALUES (?,?,?,?,?,?, datetime('now'))
               ON CONFLICT(user_id) DO UPDATE SET
                   display_name=excluded.display_name, tagline=excluded.tagline,
                   bio=excluded.bio, website=excluded.website,
                   instagram=excluded.instagram, updated_at=datetime('now')""",
            (
                user["id"],
                (data.display_name or "").strip() or None,
                (data.tagline or "").strip() or None,
                (data.bio or "").strip() or None,
                website or None,
                ("@" + instagram) if instagram else None,
            ),
        )
        profile = _pp_get_or_empty(conn, user["id"])
    return {"profile": profile}
@router.post("/partner/my-profile/logo")
async def upload_partner_logo(file: UploadFile = File(...), user=Depends(require_partner)):
    """Upload a logo for the caller's profile, replacing any previous one.

    The image is validated, converted (HEIC/HEIF via ``convert_media``),
    EXIF-rotated, shrunk to fit 512x512 and stored as WebP. The previous
    logo file is removed from disk once the new URL is stored. Returns
    ``{"logo_url": ...}``.

    Raises:
        HTTPException 400: unsupported extension, file too large, failed
            validation, or the image could not be converted/processed.
    """
    filename = file.filename or "logo.png"
    ext = os.path.splitext(filename)[1].lower()
    if ext not in _PP_IMAGE_EXTS:
        raise HTTPException(400, "Nur Bilder (PNG, JPG, WebP) als Logo.")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large body into memory.
    max_bytes = _PP_LOGO_MAX_MB * 1024 * 1024
    raw = await file.read(max_bytes + 1)
    if len(raw) > max_bytes:
        raise HTTPException(400, f"Logo zu groß (max. {_PP_LOGO_MAX_MB} MB).")
    try:
        validate_upload(raw, filename)
    except ValueError as e:
        raise HTTPException(400, str(e)) from e

    save_dir = _pp_dir(user["id"])
    new_name = f"logo_{uuid.uuid4().hex[:8]}.webp"
    new_path = os.path.join(save_dir, new_name)
    loop = asyncio.get_running_loop()

    # Convert HEIC/HEIF first; if the extension is still HEIC afterwards the
    # conversion did not succeed.
    data, ext = await loop.run_in_executor(None, lambda: convert_media(raw, filename))
    if ext in (".heic", ".heif"):
        raise HTTPException(400, "HEIC-Bild konnte nicht konvertiert werden. Bitte als JPG/PNG exportieren.")

    def _save():
        import io
        from PIL import Image, ImageOps
        img = Image.open(io.BytesIO(data))
        img = ImageOps.exif_transpose(img)
        # Keep transparency (logos are often PNGs with an alpha channel).
        img = img.convert("RGBA" if "A" in (img.mode or "") or img.mode == "P" else "RGB")
        img.thumbnail((512, 512))
        img.save(new_path, format="WEBP", quality=85)

    try:
        await loop.run_in_executor(None, _save)
    except Exception as exc:
        # Do not leave a partially written file behind; it would count
        # against the partner's storage budget without being referenced.
        try:
            os.unlink(new_path)
        except OSError:
            pass
        raise HTTPException(400, "Bild konnte nicht verarbeitet werden.") from exc

    logo_url = f"/media/partner/{user['id']}/{new_name}"
    with db() as conn:
        old = conn.execute(
            "SELECT logo_url FROM partner_profiles WHERE user_id=?", (user["id"],)
        ).fetchone()
        conn.execute(
            """INSERT INTO partner_profiles (user_id, logo_url, updated_at)
               VALUES (?,?, datetime('now'))
               ON CONFLICT(user_id) DO UPDATE SET
                   logo_url=excluded.logo_url, updated_at=datetime('now')""",
            (user["id"], logo_url),
        )

    # Remove the previous logo from disk (best effort).
    if old and old["logo_url"]:
        old_path = safe_media_path(MEDIA_DIR, old["logo_url"])
        if old_path and os.path.isfile(old_path):
            try:
                os.unlink(old_path)
            except OSError:
                pass
    return {"logo_url": logo_url}
@router.post("/partner/my-profile/photos")
async def upload_partner_photo(file: UploadFile = File(...), user=Depends(require_partner)):
    """Add a photo or video to the caller's profile gallery.

    Images are EXIF-rotated, shrunk to fit 1600x1600 and stored as WebP;
    videos are stored as returned by ``convert_media``. Returns the updated
    ``{"photos": [...]}`` list.

    Raises:
        HTTPException 400: unsupported extension, file too large, storage
            budget exceeded, failed validation, gallery full, or the media
            could not be converted/processed.
    """
    filename = file.filename or "upload.jpg"
    ext = os.path.splitext(filename)[1].lower()
    if ext not in _PP_IMAGE_EXTS | _PP_VIDEO_EXTS:
        raise HTTPException(400, "Nur Bilder (JPG, PNG, HEIC) oder Videos (MP4, MOV).")

    # Read at most one byte past the limit so an oversized body is detected
    # without being loaded completely.
    max_bytes = _PP_FILE_MAX_MB * 1024 * 1024
    raw = await file.read(max_bytes + 1)
    if len(raw) > max_bytes:
        raise HTTPException(400, f"Datei zu groß (max. {_PP_FILE_MAX_MB} MB).")
    used_mb = _pp_storage_mb(user["id"])
    if used_mb + len(raw) / (1024 * 1024) > _PP_STORAGE_LIMIT_MB:
        raise HTTPException(400, f"Speicherlimit erreicht ({_PP_STORAGE_LIMIT_MB} MB). Bitte zuerst Dateien löschen.")
    try:
        validate_upload(raw, filename)
    except ValueError as e:
        raise HTTPException(400, str(e)) from e

    # Early rejection before the (potentially slow) conversion.
    with db() as conn:
        current = _pp_get_or_empty(conn, user["id"]).get("photos") or []
    if len(current) >= _PP_MAX_PHOTOS:
        raise HTTPException(400, f"Maximal {_PP_MAX_PHOTOS} Fotos/Videos.")

    loop = asyncio.get_running_loop()
    # convert_media is blocking (image/video conversion), hence the thread pool.
    data, ext = await loop.run_in_executor(None, lambda: convert_media(raw, filename))
    if ext in (".heic", ".heif"):
        raise HTTPException(400, "HEIC-Bild konnte nicht konvertiert werden. Bitte als JPG/PNG exportieren.")
    if ext in (".mov", ".avi", ".m4v"):
        # Conversion failed; the unconverted file would not play in browsers.
        raise HTTPException(400, "Video konnte nicht konvertiert werden. Bitte als MP4 hochladen.")

    save_dir = _pp_dir(user["id"])
    file_id = uuid.uuid4().hex[:12]

    def _discard(path):
        # Best-effort removal of a file that will not be referenced.
        try:
            os.unlink(path)
        except OSError:
            pass

    if ext in _PP_VIDEO_EXTS:
        new_name = f"media_{file_id}{ext}"
        new_path = os.path.join(save_dir, new_name)

        def _save_video():
            with open(new_path, "wb") as f:
                f.write(data)

        await loop.run_in_executor(None, _save_video)
    else:
        new_name = f"media_{file_id}.webp"
        new_path = os.path.join(save_dir, new_name)

        def _save_image():
            import io
            from PIL import Image, ImageOps
            img = Image.open(io.BytesIO(data))
            img = ImageOps.exif_transpose(img)
            img = img.convert("RGB")
            img.thumbnail((1600, 1600))
            img.save(new_path, format="WEBP", quality=85)

        try:
            await loop.run_in_executor(None, _save_image)
        except Exception as exc:
            _discard(new_path)
            raise HTTPException(400, "Bild konnte nicht verarbeitet werden.") from exc

    new_url = f"/media/partner/{user['id']}/{new_name}"
    with db() as conn:
        # Re-read the list right before writing: the conversion above can
        # take a while, and appending to the stale copy read earlier would
        # drop entries added (or resurrect entries deleted) in the meantime.
        photos = _pp_get_or_empty(conn, user["id"]).get("photos") or []
        gallery_full = len(photos) >= _PP_MAX_PHOTOS
        if not gallery_full:
            photos.append(new_url)
            conn.execute(
                """INSERT INTO partner_profiles (user_id, photos_json, updated_at)
                   VALUES (?,?, datetime('now'))
                   ON CONFLICT(user_id) DO UPDATE SET
                       photos_json=excluded.photos_json, updated_at=datetime('now')""",
                (user["id"], json.dumps(photos)),
            )
    if gallery_full:
        _discard(new_path)
        raise HTTPException(400, f"Maximal {_PP_MAX_PHOTOS} Fotos/Videos.")
    return {"photos": photos}
@router.post("/partner/my-profile/photos/{idx}/delete")
def delete_partner_photo(idx: int, user=Depends(require_partner)):
    """Remove the gallery entry at position ``idx`` and delete its file.

    Returns the remaining ``{"photos": [...]}`` list; 404 if ``idx`` is out
    of range.
    """
    uid = user["id"]
    with db() as conn:
        photos = _pp_get_or_empty(conn, uid).get("photos", [])
        if idx < 0 or idx >= len(photos):
            raise HTTPException(404, "Foto nicht gefunden.")
        removed_url = photos.pop(idx)
        conn.execute(
            "UPDATE partner_profiles SET photos_json=?, updated_at=datetime('now') WHERE user_id=?",
            (json.dumps(photos), uid),
        )
    disk_path = safe_media_path(MEDIA_DIR, removed_url)
    if disk_path and os.path.isfile(disk_path):
        # Best effort: a file that cannot be removed does not fail the request.
        try:
            os.unlink(disk_path)
        except OSError:
            pass
    return {"photos": photos}
@router.post("/partner/my-profile/submit")
def submit_partner_profile(user=Depends(require_partner)):
    """Submit the caller's profile for admin review.

    Requires a saved display name. An already approved profile stays
    approved; any other state becomes 0 (pending). Unless the profile is
    approved, the address in ADMIN_EMAIL is notified; a failed send is
    handed to ``_log_smtp_failure`` instead of being dropped.
    """
    uid = user["id"]
    with db() as conn:
        if not _pp_get_or_empty(conn, uid).get("display_name"):
            raise HTTPException(400, "Bitte zuerst einen Anzeigenamen speichern.")
        # A rejected profile that is submitted again goes back to "pending".
        conn.execute(
            """UPDATE partner_profiles
               SET submitted_at=datetime('now'),
                   approved=CASE WHEN approved=1 THEN 1 ELSE 0 END,
                   updated_at=datetime('now')
               WHERE user_id=?""",
            (uid,),
        )
        profile = _pp_get_or_empty(conn, uid)

    admin_email = os.getenv("ADMIN_EMAIL", "")
    if not admin_email or profile.get("approved") == 1:
        return {"profile": profile}

    subject = f"[Ban Yaro] Partner-Profil eingereicht: {profile.get('display_name')}"
    body = (f"Partner {user['name']} ({user['email']}) hat sein Profil zur "
            f"Freigabe eingereicht.\n\nAdmin-Panel: https://banyaro.app/#admin")
    try:
        from routes.outreach import _send_smtp
        _send_smtp(admin_email, subject, body, "support")
    except Exception as exc:
        from routes.auth import _log_smtp_failure
        _log_smtp_failure(admin_email, subject, body, exc, context="partner_profile_submit")
    return {"profile": profile}
@router.get("/partners/public")
def list_public_partners():
    """Approved profiles of current partners for the public partner page."""
    query = """SELECT pp.user_id, pp.display_name, pp.tagline, pp.bio, pp.website,
                      pp.instagram, pp.logo_url, pp.photos_json,
                      u.name, u.avatar_url
               FROM partner_profiles pp
               JOIN users u ON u.id = pp.user_id
               WHERE pp.approved=1 AND u.is_partner=1
               ORDER BY pp.submitted_at ASC"""
    with db() as conn:
        records = conn.execute(query).fetchall()
    return {"partners": list(map(_pp_profile_dict, records))}
# ---- Admin: Freigabe-Workflow ------------------------------------
class PartnerProfileReview(BaseModel):
    """Admin review decision; ``approved`` must lie between -1 and 1."""

    approved: int = Field(default=..., ge=-1, le=1)
@router.get("/admin/partner/profiles")
def list_partner_profiles(user=Depends(require_admin)):
    """All partner profiles for the admin tab.

    Submitted profiles with ``approved=0`` come first, then everything else;
    within each group the most recently updated profile leads.
    """
    query = """SELECT pp.*, u.name, u.email
               FROM partner_profiles pp
               JOIN users u ON u.id = pp.user_id
               ORDER BY CASE WHEN pp.submitted_at IS NOT NULL AND pp.approved=0 THEN 0 ELSE 1 END,
                        pp.updated_at DESC"""
    with db() as conn:
        records = conn.execute(query).fetchall()
    return list(map(_pp_profile_dict, records))
@router.post("/admin/partner/profiles/{user_id}/review")
def review_partner_profile(user_id: int, data: PartnerProfileReview, user=Depends(require_admin)):
    """Store the admin's review decision for a partner profile.

    Returns the profile after the update; 404 if the user has no profile.
    """
    with db() as conn:
        exists = conn.execute(
            "SELECT user_id FROM partner_profiles WHERE user_id=?", (user_id,)
        ).fetchone()
        if not exists:
            raise HTTPException(404, "Partner-Profil nicht gefunden.")
        conn.execute(
            "UPDATE partner_profiles SET approved=?, updated_at=datetime('now') WHERE user_id=?",
            (data.approved, user_id),
        )
        reviewed = _pp_get_or_empty(conn, user_id)
    return {"profile": reviewed}
# ------------------------------------------------------------------
# QR-Kontingente — gedruckte Sticker/Flyer mit Scan- und
# Registrierungs-Rückverfolgung pro Einzelcode und Kontingent.
# Bestellung: Admin legt Kontingent für einen Partner-Code an.
# Übergabe: PDF-Download (Admin + Partner im eigenen Profil).
# ------------------------------------------------------------------
# Largest number of codes a single QR batch may contain.
_QR_MAX_QUANTITY = 500
# No easily confused characters (0/O, 1/l/I) — tokens end up printed on stickers.
_QR_ALPHABET = "abcdefghjkmnpqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ23456789"
# Base URL encoded into the QR codes; overridable through the APP_URL environment variable.
_QR_BASE_URL = os.getenv("APP_URL", "https://banyaro.app")
def _qr_new_token(conn) -> str:
    """Generate an unused 8-character QR token.

    Up to 20 random candidates are tried against ``partner_qr_codes``; if
    every one of them is already taken, an HTTP 500 is raised.
    """
    import secrets
    attempts_left = 20
    while attempts_left > 0:
        attempts_left -= 1
        candidate = "".join(secrets.choice(_QR_ALPHABET) for _ in range(8))
        taken = conn.execute(
            "SELECT 1 FROM partner_qr_codes WHERE token=?", (candidate,)
        ).fetchone()
        if not taken:
            return candidate
    raise HTTPException(500, "Token-Generierung fehlgeschlagen.")
def _qr_batch_stats(conn, batch_id: int) -> dict:
    """Aggregate counters for one QR batch.

    Returns a dict with:
        codes: number of QR codes in the batch
        scans: sum of the per-code scan counters (0 for an empty batch)
        registrations: referred users whose e-mail is verified
        attempts: referred users whose e-mail is not (yet) verified

    Every referred user lands in exactly one of ``registrations`` or
    ``attempts``: a NULL ``email_verified`` counts as unverified, so the
    two counters always add up to the number of referred accounts.
    """
    row = conn.execute(
        """SELECT COUNT(*) AS codes, COALESCE(SUM(q.scans),0) AS scans,
                  (SELECT COUNT(*) FROM users u
                   JOIN partner_qr_codes q2 ON q2.token = u.referred_qr
                   WHERE q2.batch_id = ? AND u.email_verified = 1) AS registrations,
                  (SELECT COUNT(*) FROM users u
                   JOIN partner_qr_codes q2 ON q2.token = u.referred_qr
                   WHERE q2.batch_id = ? AND u.email_verified IS NOT 1) AS attempts
           FROM partner_qr_codes q WHERE q.batch_id = ?""",
        (batch_id, batch_id, batch_id),
    ).fetchone()
    return dict(row)
def _qr_list_batches(conn, where_sql: str, params: tuple) -> list:
    """List QR batches (newest first), each merged with its stats.

    ``where_sql`` is spliced verbatim into the query (empty string or a
    complete WHERE clause over aliases ``b``/``pc``); ``params`` supplies
    its bound values.
    """
    batch_rows = conn.execute(
        f"""SELECT b.id, b.label, b.quantity, b.created_at,
                   pc.code, pc.label AS code_label, pc.id AS partner_code_id
            FROM partner_qr_batches b
            JOIN partner_codes pc ON pc.id = b.partner_code_id
            {where_sql}
            ORDER BY b.created_at DESC""",
        params,
    ).fetchall()
    return [
        {**dict(batch), **_qr_batch_stats(conn, batch["id"])}
        for batch in batch_rows
    ]
def _qr_batch_pdf(conn, batch_id: int) -> bytes:
    """Render a print-ready A4 PDF for a QR batch.

    Each page holds a 3x4 grid of QR codes; under every code the short URL
    and the running sticker number are printed. The printed URL is derived
    from the same base URL that is encoded in the QR code (scheme removed),
    so the two always point to the same host.

    Raises:
        HTTPException 404: the batch does not exist.
    """
    import io as _io
    import segno
    from fpdf import FPDF

    batch = conn.execute(
        """SELECT b.id, b.label, b.quantity, pc.code, pc.label AS code_label
           FROM partner_qr_batches b
           JOIN partner_codes pc ON pc.id = b.partner_code_id
           WHERE b.id=?""",
        (batch_id,),
    ).fetchone()
    if not batch:
        raise HTTPException(404, "Kontingent nicht gefunden.")
    codes = conn.execute(
        "SELECT token, seq FROM partner_qr_codes WHERE batch_id=? ORDER BY seq",
        (batch_id,),
    ).fetchall()

    # A trailing slash in the configured base URL would yield ".../​/q/...".
    base_url = _QR_BASE_URL.rstrip("/")
    # Human-readable variant for the caption: same target without the scheme.
    display_base = base_url.split("://", 1)[-1]

    pdf = FPDF(format="A4")
    pdf.set_auto_page_break(False)
    pdf.set_title(f"Ban Yaro QR-Kontingent — {batch['label']}")
    COLS, ROWS = 3, 4
    CELL_W, CELL_H = 60, 64  # mm — cell including caption
    MARGIN_X = (210 - COLS * CELL_W) / 2
    MARGIN_Y = 18
    QR_SIZE = 42  # mm

    def _latin1(s: str) -> str:
        # The text is rendered with a core font (Helvetica); characters
        # outside latin-1 are replaced rather than passed through.
        return s.encode("latin-1", "replace").decode("latin-1")

    for i, c in enumerate(codes):
        pos = i % (COLS * ROWS)
        if pos == 0:
            # First cell of a grid: start a new page with its header line.
            pdf.add_page()
            pdf.set_font("Helvetica", "B", 11)
            pdf.set_text_color(60)
            pdf.cell(0, 6, _latin1(f"Ban Yaro — {batch['code_label']} · Kontingent: {batch['label']} ({batch['quantity']} Stk.)"),
                     align="C", new_x="LMARGIN", new_y="NEXT")
        col, row_ = pos % COLS, pos // COLS
        x = MARGIN_X + col * CELL_W
        y = MARGIN_Y + 10 + row_ * CELL_H
        url = f"{base_url}/q/{c['token']}"
        buf = _io.BytesIO()
        segno.make(url, error="m").save(buf, kind="png", scale=8, border=2)
        buf.seek(0)
        pdf.image(buf, x=x + (CELL_W - QR_SIZE) / 2, y=y, w=QR_SIZE, h=QR_SIZE)
        pdf.set_xy(x, y + QR_SIZE + 1)
        pdf.set_font("Helvetica", "", 8)
        pdf.set_text_color(90)
        pdf.cell(CELL_W, 4, _latin1(f"{display_base}/q/{c['token']}"), align="C", new_x="LEFT", new_y="NEXT")
        pdf.set_x(x)
        pdf.set_font("Helvetica", "B", 8)
        pdf.cell(CELL_W, 4, f"#{c['seq']}", align="C")
    return bytes(pdf.output())
class QrBatchCreate(BaseModel):
    """Request body for ordering a QR batch: a label and the number of codes."""

    label: str = Field(default=..., min_length=1, max_length=100)
    quantity: int = Field(default=..., ge=1, le=_QR_MAX_QUANTITY)
@router.post("/admin/partner/codes/{code_id}/qr-batches", status_code=201)
def create_qr_batch(code_id: int, data: QrBatchCreate, user=Depends(require_admin)):
    """Create a QR batch for a partner code and generate its tokens.

    Codes are numbered 1..quantity. Returns the new batch including its
    stats; 404 if the partner code does not exist.
    """
    with db() as conn:
        partner_code = conn.execute(
            "SELECT id FROM partner_codes WHERE id=?", (code_id,)
        ).fetchone()
        if not partner_code:
            raise HTTPException(404, "Partner-Code nicht gefunden.")
        conn.execute(
            "INSERT INTO partner_qr_batches (partner_code_id, label, quantity, created_by) VALUES (?,?,?,?)",
            (code_id, data.label.strip(), data.quantity, user["id"]),
        )
        new_batch_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
        sticker_no = 1
        while sticker_no <= data.quantity:
            # One token per insert, so each uniqueness check already sees
            # the tokens created earlier in this loop.
            conn.execute(
                "INSERT INTO partner_qr_codes (batch_id, token, seq) VALUES (?,?,?)",
                (new_batch_id, _qr_new_token(conn), sticker_no),
            )
            sticker_no += 1
        created = _qr_list_batches(conn, "WHERE b.id=?", (new_batch_id,))
    return created[0]
@router.get("/admin/partner/qr-batches")
def list_qr_batches(user=Depends(require_admin)):
    """All QR batches with their stats (admin only)."""
    with db() as conn:
        every_batch = _qr_list_batches(conn, "", ())
    return every_batch
@router.delete("/admin/partner/qr-batches/{batch_id}", status_code=204)
def delete_qr_batch(batch_id: int, user=Depends(require_admin)):
    """Delete a QR batch (e.g. a mistaken order); 404 if it does not exist.

    NOTE(review): the original comment states the batch's codes are removed
    via CASCADE — that depends on the table schema, which is not visible here.
    """
    with db() as conn:
        batch = conn.execute(
            "SELECT id FROM partner_qr_batches WHERE id=?", (batch_id,)
        ).fetchone()
        if not batch:
            raise HTTPException(404, "Kontingent nicht gefunden.")
        conn.execute("DELETE FROM partner_qr_batches WHERE id=?", (batch_id,))
    return None
@router.get("/admin/partner/qr-batches/{batch_id}/registrations")
def qr_batch_registrations(batch_id: int, user=Depends(require_admin)):
    """Accounts that registered through this batch, newest first.

    Includes accounts whose e-mail is not verified. Admin only, because the
    rows contain personal data (name, e-mail). 404 if the batch is unknown.
    """
    accounts_sql = """SELECT u.id, u.name, u.email, u.email_verified, u.created_at,
                             q.seq, q.token
                      FROM users u
                      JOIN partner_qr_codes q ON q.token = u.referred_qr
                      WHERE q.batch_id = ?
                      ORDER BY u.created_at DESC"""
    with db() as conn:
        batch = conn.execute(
            "SELECT id FROM partner_qr_batches WHERE id=?", (batch_id,)
        ).fetchone()
        if not batch:
            raise HTTPException(404, "Kontingent nicht gefunden.")
        accounts = conn.execute(accounts_sql, (batch_id,)).fetchall()
    return list(map(dict, accounts))
@router.get("/admin/partner/qr-batches/{batch_id}/pdf")
def qr_batch_pdf_admin(batch_id: int, user=Depends(require_admin)):
    """Download the printable PDF of any QR batch (admin only)."""
    from fastapi.responses import Response
    with db() as conn:
        document = _qr_batch_pdf(conn, batch_id)
    download_headers = {
        "Content-Disposition": f'attachment; filename="banyaro-qr-{batch_id}.pdf"'
    }
    return Response(
        content=document,
        media_type="application/pdf",
        headers=download_headers,
    )
@router.get("/partner/my-qr")
def my_qr_batches(user=Depends(require_partner)):
    """Self-service: QR batches of the partner codes owned by the caller, with stats."""
    owner_filter = "WHERE pc.owner_user_id = ?"
    with db() as conn:
        own_batches = _qr_list_batches(conn, owner_filter, (user["id"],))
    return own_batches
@router.get("/partner/my-qr/{batch_id}/pdf")
def qr_batch_pdf_partner(batch_id: int, user=Depends(require_partner)):
    """Download the printable PDF of one of the caller's own QR batches.

    Access is granted to the owner of the batch's partner code and to
    admins; everyone else receives a 403.
    """
    from fastapi.responses import Response
    with db() as conn:
        owned = conn.execute(
            """SELECT b.id FROM partner_qr_batches b
               JOIN partner_codes pc ON pc.id = b.partner_code_id
               WHERE b.id=? AND pc.owner_user_id=?""",
            (batch_id, user["id"]),
        ).fetchone()
        is_admin = user.get("rolle") == "admin"
        if not (owned or is_admin):
            raise HTTPException(403, "Kein Zugriff auf dieses Kontingent.")
        document = _qr_batch_pdf(conn, batch_id)
    download_headers = {
        "Content-Disposition": f'attachment; filename="banyaro-qr-{batch_id}.pdf"'
    }
    return Response(
        content=document,
        media_type="application/pdf",
        headers=download_headers,
    )
class CodeOwnerSet(BaseModel):
    """Request body naming the user who becomes owner of a partner code."""

    user_id: int
@router.post("/admin/partner/codes/{code_id}/owner")
def set_code_owner(code_id: int, data: CodeOwnerSet, user=Depends(require_admin)):
    """Assign a partner code to a user (enables self-service QR access).

    Raises a 404 if either the partner code or the user does not exist.
    """
    with db() as conn:
        code_row = conn.execute(
            "SELECT id FROM partner_codes WHERE id=?", (code_id,)
        ).fetchone()
        if not code_row:
            raise HTTPException(404, "Partner-Code nicht gefunden.")
        owner_row = conn.execute(
            "SELECT id FROM users WHERE id=?", (data.user_id,)
        ).fetchone()
        if not owner_row:
            raise HTTPException(404, "User nicht gefunden.")
        conn.execute(
            "UPDATE partner_codes SET owner_user_id=? WHERE id=?",
            (data.user_id, code_id),
        )
    return {"ok": True}