"""Security-Tests: require_owner, JWT-Blacklist, Login-Lockout. Diese Tests verifizieren die in Sprint60 eingefuehrten Security-Helper und stellen sicher, dass z. B. eine versehentliche Aenderung an require_owner / blacklist_jti hier sofort einen roten Test ergibt. """ from __future__ import annotations import secrets import time import pytest # ------------------------------------------------------------------ # Helper: zweiten frischen User registrieren (wie das `user`-Fixture) # ------------------------------------------------------------------ def _make_other_user(client) -> dict: """Registriert einen zweiten verifizierten User mit JWT-Token.""" email = f"other-{secrets.token_hex(4)}@example.com" pw = "OtherPass123!" name = f"other{secrets.token_hex(3)}" r = client.post("/api/auth/register", json={ "email": email, "password": pw, "name": name }) assert r.status_code == 200, r.text from database import db with db() as conn: conn.execute("UPDATE users SET email_verified=1 WHERE email=?", (email,)) r2 = client.post("/api/auth/login", json={"email": email, "password": pw}) assert r2.status_code == 200, r2.text token = r2.json()["token"] return { "email": email, "password": pw, "name": name, "token": token, "headers": {"Authorization": f"Bearer {token}"}, } # ================================================================== # 1) Owner-Check via require_owner — Places # ================================================================== class TestRequireOwnerPlaces: """places.py nutzt `require_owner` fuer PATCH/DELETE — wir testen den vollen Lebenszyklus mit zwei verschiedenen Usern.""" def _create_place(self, client, user) -> dict: r = client.post( "/api/places", headers=user["headers"], json={ "name": "Test-Cafe", "typ": "restaurant", "lat": 49.5, "lon": 11.0, "hund_rein": True, }, ) assert r.status_code == 201, r.text return r.json() def test_get_place_is_public(self, client, user): """places sind public — fremder User darf lesen.""" place = self._create_place(client, user) other = _make_other_user(client) r = client.get(f"/api/places/{place['id']}", headers=other["headers"]) assert r.status_code == 200 assert r.json()["id"] == place["id"] def test_patch_place_other_user_is_forbidden(self, client, user): """PATCH mit fremdem User -> 403 (require_owner greift).""" place = self._create_place(client, user) other = _make_other_user(client) r = client.patch( f"/api/places/{place['id']}", headers=other["headers"], json={"name": "Hijacked"}, ) assert r.status_code == 403, r.text # Owner kann immer noch patchen r2 = client.patch( f"/api/places/{place['id']}", headers=user["headers"], json={"name": "Cafe Updated"}, ) assert r2.status_code == 200 assert r2.json()["name"] == "Cafe Updated" def test_delete_place_other_user_is_forbidden(self, client, user): """DELETE mit fremdem User -> 403.""" place = self._create_place(client, user) other = _make_other_user(client) r = client.delete(f"/api/places/{place['id']}", headers=other["headers"]) assert r.status_code == 403, r.text # Owner kann loeschen r2 = client.delete(f"/api/places/{place['id']}", headers=user["headers"]) assert r2.status_code == 204 def test_patch_nonexistent_place_is_404(self, client, user): """require_owner wirft 404 wenn row None ist.""" r = client.patch( "/api/places/9999999", headers=user["headers"], json={"name": "Ghost"}, ) assert r.status_code == 404 # ================================================================== # 2) JWT-Blacklist — Logout invalidiert Token serverseitig # ================================================================== class TestJwtBlacklist: def test_logout_blacklists_bearer_token(self, client): """Nach Logout muss das gleiche Token bei /auth/me 401 ergeben.""" # Frischer User in diesem Test — nicht das `user`-Fixture verwenden, # weil andere Tests es weiter brauchen koennten. info = _make_other_user(client) # 1) Token funktioniert r = client.get("/api/auth/me", headers=info["headers"]) assert r.status_code == 200 # 2) Logout r2 = client.post("/api/auth/logout", headers=info["headers"]) assert r2.status_code == 200 assert r2.json()["ok"] is True # 3) Token nun blacklisted -> 401 r3 = client.get("/api/auth/me", headers=info["headers"]) assert r3.status_code == 401, ( f"Token wurde nach Logout NICHT blacklisted (status={r3.status_code})" ) def test_blacklist_entry_persisted_in_db(self, client): """Pruefen, dass das jti tatsaechlich in jwt_blacklist landet.""" info = _make_other_user(client) # jti aus Token extrahieren import jwt as _jwt payload = _jwt.decode( info["token"], options={"verify_signature": False} ) jti = payload.get("jti") assert jti, "Token enthaelt kein jti" # Logout client.post("/api/auth/logout", headers=info["headers"]) from database import db with db() as conn: row = conn.execute( "SELECT jti, expires_at FROM jwt_blacklist WHERE jti=?", (jti,) ).fetchone() assert row is not None, "jwt_blacklist-Eintrag fehlt nach Logout" assert row["jti"] == jti # ================================================================== # 3) Login-Lockout — 5 Fehlversuche -> 429 mit Retry-After # ================================================================== class TestLoginLockout: """In conftest.py wird der Lockout fuer die Test-Session global deaktiviert (sonst wuerden Auth-Tests sich gegenseitig blocken). Hier aktivieren wir ihn fuer einzelne Tests gezielt zurueck.""" def _enable_lockout(self, monkeypatch): """Aktiviert die Lockout-Logik fuer einen einzelnen Test wieder. In conftest.py werden routes.auth._db_is_account_locked und _db_record_login_failure global gestubbt (damit Register-Spam durch andere Tests nicht zur Session-weiten Sperre fuehrt). Hier definieren wir die echten Implementierungen lokal neu (Kopie aus routes/auth.py) und setzen sie per monkeypatch zurueck — das wird nach dem Test automatisch revertiert. """ import routes.auth as _ra from database import db from datetime import datetime, timedelta, timezone # Counter leeren — sonst beeinflussen vorherige Tests die Sperre. with db() as conn: conn.execute("DELETE FROM login_attempts") _LOCKOUT_WINDOW_MIN = 15 _LOCKOUT_ATTEMPTS_MAX = 5 def _is_locked(email: str): with db() as conn: row = conn.execute( "SELECT locked_until FROM login_attempts WHERE email=? COLLATE NOCASE", (email,) ).fetchone() if not row or not row["locked_until"]: return None try: locked_until = datetime.fromisoformat(row["locked_until"]) except Exception: return None now = datetime.now(timezone.utc) if locked_until.tzinfo is None: locked_until = locked_until.replace(tzinfo=timezone.utc) if locked_until <= now: return None return int((locked_until - now).total_seconds()) def _record(email: str): now = datetime.now(timezone.utc) window_start = now - timedelta(minutes=_LOCKOUT_WINDOW_MIN) with db() as conn: row = conn.execute( "SELECT attempts, last_attempt FROM login_attempts WHERE email=? COLLATE NOCASE", (email,) ).fetchone() if row: try: last = datetime.fromisoformat(row["last_attempt"]) if last.tzinfo is None: last = last.replace(tzinfo=timezone.utc) except Exception: last = now attempts = (row["attempts"] + 1) if last >= window_start else 1 else: attempts = 1 locked_until = None if attempts >= _LOCKOUT_ATTEMPTS_MAX: locked_until = (now + timedelta(minutes=_LOCKOUT_WINDOW_MIN)).isoformat() conn.execute( """INSERT INTO login_attempts (email, attempts, last_attempt, locked_until) VALUES (?,?,?,?) ON CONFLICT(email) DO UPDATE SET attempts=excluded.attempts, last_attempt=excluded.last_attempt, locked_until=excluded.locked_until""", (email.lower(), attempts, now.isoformat(), locked_until) ) monkeypatch.setattr(_ra, "_db_is_account_locked", _is_locked) monkeypatch.setattr(_ra, "_db_record_login_failure", _record) def test_lockout_after_five_failed_logins(self, client, monkeypatch): """5x falsches PW -> 6. Versuch ergibt 429 mit Retry-After.""" # Eigenen User anlegen, damit andere Tests das Lockout-Limit nicht # zufaellig schon erreicht haben. info = _make_other_user(client) self._enable_lockout(monkeypatch) # 5 Fehlversuche for i in range(5): r = client.post("/api/auth/login", json={ "email": info["email"], "password": "WRONG-PW!" }) assert r.status_code == 401, ( f"Versuch {i+1}: erwartete 401, bekam {r.status_code}" ) # 6. Versuch: jetzt gelockt r6 = client.post("/api/auth/login", json={ "email": info["email"], "password": "WRONG-PW!" }) assert r6.status_code == 429, ( f"Erwartete 429 nach 5 Fehlversuchen, bekam {r6.status_code}" ) assert "Retry-After" in r6.headers, "Retry-After-Header fehlt bei Lockout" def test_lockout_writes_to_login_attempts_table(self, client, monkeypatch): """login_attempts-Eintrag muss locked_until enthalten.""" info = _make_other_user(client) self._enable_lockout(monkeypatch) for _ in range(5): client.post("/api/auth/login", json={ "email": info["email"], "password": "WRONG-PW!" }) from database import db with db() as conn: row = conn.execute( "SELECT attempts, locked_until FROM login_attempts WHERE email=? COLLATE NOCASE", (info["email"],) ).fetchone() assert row is not None, "Kein login_attempts-Eintrag angelegt" assert row["attempts"] >= 5 assert row["locked_until"] is not None, "locked_until nicht gesetzt" # ================================================================== # 4) Rate-Limit auf /auth/login (Brute-Force-Schutz auf IP-Ebene) # ================================================================== @pytest.mark.xfail( reason="rl_check ist in conftest fuer alle Tests gestubbt — Rate-Limit " "laesst sich pro Test nicht selektiv reaktivieren ohne andere " "parallele Tests zu beeintraechtigen." ) def test_login_rate_limit_blocks_burst(client): """20+ schnelle Logins -> 429 vom Rate-Limiter.""" info = _make_other_user(client) statuses = [] for _ in range(25): r = client.post("/api/auth/login", json={ "email": info["email"], "password": "WRONG-PW!" }) statuses.append(r.status_code) assert 429 in statuses, f"Kein Rate-Limit-Treffer in {statuses}"