"""BAN YARO — Auth Routes""" import os import secrets import string from typing import Optional from fastapi import APIRouter, HTTPException, Response, Depends from pydantic import BaseModel, EmailStr from database import db from auth import ( hash_password, verify_password, create_token, get_current_user ) from username_blocklist import is_username_blocked router = APIRouter() COOKIE_NAME = "by_token" class LoginRequest(BaseModel): email: EmailStr password: str class RegisterRequest(BaseModel): email: EmailStr password: str name: str ref_code: Optional[str] = None def _gen_referral_code() -> str: alphabet = string.ascii_uppercase + string.digits return ''.join(secrets.choice(alphabet) for _ in range(8)) def _set_cookie(response: Response, token: str): response.set_cookie( key=COOKIE_NAME, value=token, httponly=True, secure=True, samesite="lax", max_age=30 * 24 * 3600 ) @router.post("/register") async def register(data: RegisterRequest, response: Response): name = data.name.strip() if len(name) < 2: raise HTTPException(400, "Benutzername muss mindestens 2 Zeichen lang sein.") if len(name) > 40: raise HTTPException(400, "Benutzername darf maximal 40 Zeichen lang sein.") if ' ' in name: raise HTTPException(400, "Benutzername darf keine Leerzeichen enthalten.") if is_username_blocked(name): raise HTTPException(400, "Dieser Benutzername ist nicht erlaubt.") with db() as conn: if conn.execute("SELECT 1 FROM users WHERE email=?", (data.email,)).fetchone(): raise HTTPException(400, "E-Mail bereits registriert.") if conn.execute( "SELECT 1 FROM users WHERE name=? COLLATE NOCASE", (name,) ).fetchone(): raise HTTPException(400, "Dieser Name ist bereits vergeben. Bitte wähle einen anderen.") code = _gen_referral_code() try: conn.execute( "INSERT INTO users (email, pw_hash, name, referral_code) VALUES (?,?,?,?)", (data.email, hash_password(data.password), name, code) ) except Exception: # Fallback falls UNIQUE-Index greift (Race Condition) raise HTTPException(400, "Dieser Name ist bereits vergeben. Bitte wähle einen anderen.") user = conn.execute( "SELECT id, rolle FROM users WHERE email=?", (data.email,) ).fetchone() new_user_id = user["id"] if data.ref_code: referrer = conn.execute( "SELECT id FROM users WHERE referral_code=? AND id != ?", (data.ref_code.strip().upper(), new_user_id) ).fetchone() if referrer: conn.execute("UPDATE users SET referred_by=? WHERE id=?", (referrer['id'], new_user_id)) token = create_token(user["id"], user["rolle"]) _set_cookie(response, token) return {"token": token, "name": name} @router.post("/login") async def login(data: LoginRequest, response: Response): with db() as conn: user = conn.execute( "SELECT id, pw_hash, name, rolle, is_premium FROM users WHERE email=?", (data.email,) ).fetchone() if not user or not verify_password(data.password, user["pw_hash"]): raise HTTPException(401, "E-Mail oder Passwort falsch.") token = create_token(user["id"], user["rolle"]) _set_cookie(response, token) with db() as conn: conn.execute( "UPDATE users SET last_login=datetime('now') WHERE id=?", (user["id"],) ) return {"token": token, "name": user["name"], "is_premium": bool(user["is_premium"])} @router.post("/logout") async def logout(response: Response): response.delete_cookie(COOKIE_NAME) return {"ok": True} @router.get("/referral") async def get_referral_info(user=Depends(get_current_user)): with db() as conn: row = conn.execute( """SELECT referral_code, COALESCE((SELECT COUNT(*) FROM users WHERE referred_by=?), 0) AS count FROM users WHERE id=?""", (user['id'], user['id']) ).fetchone() code = row["referral_code"] if row else None if not code: code = _gen_referral_code() conn.execute("UPDATE users SET referral_code=? WHERE id=?", (code, user['id'])) base = os.getenv("APP_URL", "https://banyaro.app") return { "code": code, "count": row["count"] if row else 0, "link": f"{base}/?ref={code}", } @router.get("/me") async def me(user=Depends(get_current_user)): with db() as conn: row = conn.execute( """SELECT id, name, real_name, email, rolle, is_premium, email_verified, bio, wohnort, erfahrung, social_link, profil_sichtbarkeit, avatar_url, created_at FROM users WHERE id=?""", (user["id"],) ).fetchone() if not row: raise HTTPException(404, "User nicht gefunden.") data = dict(row) data["is_premium"] = bool(data["is_premium"]) return data