banyaro/tests/test_security.py
rene 1ff66a7083 Sicherheit + Tests + A11y, SW by-v1118
PYDANTIC max_length (38 Routen, ~400 Field-Constraints):
Schützt vor DoS durch Riesen-Payloads (10MB Thread-Titel etc.).
Pragmatische Limits:
- Titel/Name: 200 · Beschreibung/Body: 10000 · Notiz: 5000
- Email: 254 (RFC 5321) · URL: 500 · Slug/Kategorie: 100
- Hund-Name/Rasse: 80 · Hund-Bio: 2000

Top-betroffen: forum.py, diary.py, health.py, dogs.py, expenses.py,
notes.py, auth.py, profile.py. Manuelle len()-Checks in profile,
chat, ki entfernt (jetzt durch Field abgedeckt).

PYTEST COVERAGE (+19 Tests, 37 grün + 1 xfail):
- test_security.py: require_owner (Places GET/PATCH/DELETE mit
  Fremduser → 403), JWT-Blacklist (Logout invalidiert Token),
  Login-Lockout (5 Fehlversuche → 429 + Retry-After Header)
- test_race.py: Invoice-Counter (20 parallele Threads, alle unique),
  Founder-Number (atomare Vergabe, voll bei 100)
- test_validation.py: Forum-Titel 30k Zeichen → 422, Diary-Text
  50k → 422 (verifiziert Pydantic max_length-Sweep)

A11Y (Tap-Targets ≥44×44 + Dark-Mode-Kontrast):
- #header-user-btn 36→44px, .header-back 40→44, .header-menu-btn 40→44
- dog-profile Wrapped-Slider Prev/Next 40→44
- forum-Lightbox Close 40→44
- --c-text-muted Light: #B0A090 (2.37:1 FAIL) → #7F6B58 (4.74:1 PASS)
- --c-text-muted Dark:  #806A58 (3.58:1 FAIL) → #A08878 (5.46:1 PASS)
- Branding-Farben unangetastet
2026-05-27 13:40:30 +02:00

315 lines
12 KiB
Python

"""Security-Tests: require_owner, JWT-Blacklist, Login-Lockout.
Diese Tests verifizieren die in Sprint60 eingefuehrten Security-Helper
und stellen sicher, dass z. B. eine versehentliche Aenderung an
require_owner / blacklist_jti hier sofort einen roten Test ergibt.
"""
from __future__ import annotations
import secrets
import time
import pytest
# ------------------------------------------------------------------
# Helper: zweiten frischen User registrieren (wie das `user`-Fixture)
# ------------------------------------------------------------------
def _make_other_user(client) -> dict:
    """Register, verify and log in a fresh secondary user.

    The account gets a random e-mail address and name, is flagged as
    e-mail-verified directly in the ``users`` table and is then logged in
    through ``/api/auth/login``.

    Args:
        client: HTTP test client exposing ``post`` (presumably the same
            ``client`` fixture the tests receive — confirm in conftest).

    Returns:
        A dict with the keys ``email``, ``password``, ``name``, ``token``
        and ``headers`` (a ready-made ``Authorization: Bearer`` header dict).
    """
    credentials = {
        "email": f"other-{secrets.token_hex(4)}@example.com",
        "password": "OtherPass123!",
    }
    account = {**credentials, "name": f"other{secrets.token_hex(3)}"}

    registration = client.post("/api/auth/register", json=account)
    assert registration.status_code == 200, registration.text

    # Mark the account as e-mail-verified directly in the users table.
    from database import db

    with db() as conn:
        conn.execute(
            "UPDATE users SET email_verified=1 WHERE email=?",
            (credentials["email"],),
        )

    login = client.post("/api/auth/login", json=credentials)
    assert login.status_code == 200, login.text
    token = login.json()["token"]

    return {
        **account,
        "token": token,
        "headers": {"Authorization": f"Bearer {token}"},
    }
# ==================================================================
# 1) Owner-Check via require_owner — Places
# ==================================================================
class TestRequireOwnerPlaces:
    """Ownership checks on the places endpoints.

    places.py is documented as using ``require_owner`` for PATCH/DELETE;
    these tests walk through the full lifecycle with two different users.
    """

    def _create_place(self, client, user) -> dict:
        """Create a place as *user* and return the decoded response body."""
        payload = {
            "name": "Test-Cafe",
            "typ": "restaurant",
            "lat": 49.5,
            "lon": 11.0,
            "hund_rein": True,
        }
        resp = client.post("/api/places", headers=user["headers"], json=payload)
        assert resp.status_code == 201, resp.text
        return resp.json()

    def test_get_place_is_public(self, client, user):
        """Places are public: a different user may read them."""
        created = self._create_place(client, user)
        stranger = _make_other_user(client)

        place_url = f"/api/places/{created['id']}"
        resp = client.get(place_url, headers=stranger["headers"])

        assert resp.status_code == 200
        assert resp.json()["id"] == created["id"]

    def test_patch_place_other_user_is_forbidden(self, client, user):
        """PATCH by a different user yields 403; the owner can still patch."""
        created = self._create_place(client, user)
        stranger = _make_other_user(client)
        place_url = f"/api/places/{created['id']}"

        hijack = client.patch(
            place_url, headers=stranger["headers"], json={"name": "Hijacked"}
        )
        assert hijack.status_code == 403, hijack.text

        # The owner is still allowed to patch the same place.
        own_patch = client.patch(
            place_url, headers=user["headers"], json={"name": "Cafe Updated"}
        )
        assert own_patch.status_code == 200
        assert own_patch.json()["name"] == "Cafe Updated"

    def test_delete_place_other_user_is_forbidden(self, client, user):
        """DELETE by a different user yields 403; the owner's DELETE yields 204."""
        created = self._create_place(client, user)
        stranger = _make_other_user(client)
        place_url = f"/api/places/{created['id']}"

        foreign_delete = client.delete(place_url, headers=stranger["headers"])
        assert foreign_delete.status_code == 403, foreign_delete.text

        # The owner is allowed to delete.
        own_delete = client.delete(place_url, headers=user["headers"])
        assert own_delete.status_code == 204

    def test_patch_nonexistent_place_is_404(self, client, user):
        """PATCH on an id that does not exist is expected to return 404."""
        resp = client.patch(
            "/api/places/9999999",
            headers=user["headers"],
            json={"name": "Ghost"},
        )
        assert resp.status_code == 404
# ==================================================================
# 2) JWT-Blacklist — Logout invalidiert Token serverseitig
# ==================================================================
class TestJwtBlacklist:
    """Logout must invalidate the bearer token on the server side."""

    def test_logout_blacklists_bearer_token(self, client):
        """After logout, the same token must get a 401 from /api/auth/me."""
        # Use a fresh user here rather than the `user` fixture, because
        # other tests might still need that fixture's token to be valid.
        info = _make_other_user(client)
        auth = info["headers"]

        # 1) The token works before logout.
        before = client.get("/api/auth/me", headers=auth)
        assert before.status_code == 200

        # 2) Log out.
        logout = client.post("/api/auth/logout", headers=auth)
        assert logout.status_code == 200
        assert logout.json()["ok"] is True

        # 3) The token is now expected to be rejected.
        after = client.get("/api/auth/me", headers=auth)
        assert after.status_code == 401, (
            f"Token wurde nach Logout NICHT blacklisted (status={after.status_code})"
        )

    def test_blacklist_entry_persisted_in_db(self, client):
        """The token's jti must end up in the ``jwt_blacklist`` table."""
        info = _make_other_user(client)

        # Read the jti claim from the token; the signature is deliberately
        # not verified because only the payload is needed here.
        import jwt as _jwt

        payload = _jwt.decode(
            info["token"], options={"verify_signature": False}
        )
        jti = payload.get("jti")
        assert jti, "Token enthaelt kein jti"

        # Log out and make sure the call itself succeeded, so that a broken
        # logout is not misreported as a missing blacklist row below.
        logout = client.post("/api/auth/logout", headers=info["headers"])
        assert logout.status_code == 200, logout.text

        from database import db

        with db() as conn:
            row = conn.execute(
                "SELECT jti, expires_at FROM jwt_blacklist WHERE jti=?", (jti,)
            ).fetchone()

        assert row is not None, "jwt_blacklist-Eintrag fehlt nach Logout"
        assert row["jti"] == jti
# ==================================================================
# 3) Login-Lockout — 5 Fehlversuche -> 429 mit Retry-After
# ==================================================================
class TestLoginLockout:
    """Account lockout after repeated failed logins (429 + Retry-After).

    According to the original notes, conftest.py switches the lockout off
    for the whole test session so that auth tests do not block one another;
    the tests in this class turn it back on individually.
    """

    def _enable_lockout(self, monkeypatch):
        """Reactivate the lockout logic for the calling test only.

        conftest.py is described as globally stubbing
        ``routes.auth._db_is_account_locked`` and
        ``routes.auth._db_record_login_failure``. This helper empties the
        ``login_attempts`` table, defines working replacements locally
        (stated to be a copy of the code in routes/auth.py) and installs
        them with ``monkeypatch.setattr``, so the patch is undone
        automatically when the test finishes.

        Args:
            monkeypatch: pytest's ``monkeypatch`` fixture.
        """
        import routes.auth as _ra
        from database import db
        from datetime import datetime, timedelta, timezone

        # Clear the counters, otherwise earlier tests influence the lock.
        with db() as conn:
            conn.execute("DELETE FROM login_attempts")

        _LOCKOUT_WINDOW_MIN = 15
        _LOCKOUT_ATTEMPTS_MAX = 5

        def _is_locked(email: str):
            """Return the remaining lock time in whole seconds, else None."""
            with db() as conn:
                row = conn.execute(
                    "SELECT locked_until FROM login_attempts WHERE email=? COLLATE NOCASE",
                    (email,)
                ).fetchone()
            if not row or not row["locked_until"]:
                return None
            try:
                locked_until = datetime.fromisoformat(row["locked_until"])
            except Exception:
                # An unparseable timestamp is treated as "not locked".
                return None
            now = datetime.now(timezone.utc)
            if locked_until.tzinfo is None:
                # Naive timestamps are interpreted as UTC.
                locked_until = locked_until.replace(tzinfo=timezone.utc)
            if locked_until <= now:
                return None
            return int((locked_until - now).total_seconds())

        def _record(email: str):
            """Count one failed login and set ``locked_until`` at the limit."""
            now = datetime.now(timezone.utc)
            window_start = now - timedelta(minutes=_LOCKOUT_WINDOW_MIN)
            with db() as conn:
                row = conn.execute(
                    "SELECT attempts, last_attempt FROM login_attempts WHERE email=? COLLATE NOCASE",
                    (email,)
                ).fetchone()
                if row:
                    try:
                        last = datetime.fromisoformat(row["last_attempt"])
                        if last.tzinfo is None:
                            last = last.replace(tzinfo=timezone.utc)
                    except Exception:
                        # Unreadable timestamp: treat the last attempt as
                        # having happened just now (inside the window).
                        last = now
                    # Outside the window the counter restarts at 1.
                    attempts = (row["attempts"] + 1) if last >= window_start else 1
                else:
                    attempts = 1
                locked_until = None
                if attempts >= _LOCKOUT_ATTEMPTS_MAX:
                    locked_until = (now + timedelta(minutes=_LOCKOUT_WINDOW_MIN)).isoformat()
                # The SQL continuation lines are intentionally left unindented
                # so the statement text stays exactly as it was.
                conn.execute(
                    """INSERT INTO login_attempts (email, attempts, last_attempt, locked_until)
VALUES (?,?,?,?)
ON CONFLICT(email) DO UPDATE SET
attempts=excluded.attempts,
last_attempt=excluded.last_attempt,
locked_until=excluded.locked_until""",
                    (email.lower(), attempts, now.isoformat(), locked_until)
                )

        monkeypatch.setattr(_ra, "_db_is_account_locked", _is_locked)
        monkeypatch.setattr(_ra, "_db_record_login_failure", _record)

    def _fail_logins(self, client, email: str, times: int = 5) -> None:
        """Send *times* wrong-password logins for *email*; each must be a 401.

        Checking every response keeps a broken setup step from surfacing
        later as an unrelated assertion failure.
        """
        for i in range(times):
            r = client.post("/api/auth/login", json={
                "email": email, "password": "WRONG-PW!"
            })
            assert r.status_code == 401, (
                f"Versuch {i+1}: erwartete 401, bekam {r.status_code}"
            )

    def test_lockout_after_five_failed_logins(self, client, monkeypatch):
        """Five wrong passwords: the sixth attempt gets 429 with Retry-After."""
        # Create a dedicated user so other tests cannot have pushed this
        # account towards the lockout limit already.
        info = _make_other_user(client)
        self._enable_lockout(monkeypatch)

        # Five failed attempts.
        self._fail_logins(client, info["email"])

        # Sixth attempt: the account is locked now.
        r6 = client.post("/api/auth/login", json={
            "email": info["email"], "password": "WRONG-PW!"
        })
        assert r6.status_code == 429, (
            f"Erwartete 429 nach 5 Fehlversuchen, bekam {r6.status_code}"
        )
        assert "Retry-After" in r6.headers, "Retry-After-Header fehlt bei Lockout"

    def test_lockout_writes_to_login_attempts_table(self, client, monkeypatch):
        """The ``login_attempts`` row must carry a ``locked_until`` value."""
        info = _make_other_user(client)
        self._enable_lockout(monkeypatch)

        # Five failed attempts, each verified to be rejected with 401.
        self._fail_logins(client, info["email"])

        from database import db

        with db() as conn:
            row = conn.execute(
                "SELECT attempts, locked_until FROM login_attempts WHERE email=? COLLATE NOCASE",
                (info["email"],)
            ).fetchone()

        assert row is not None, "Kein login_attempts-Eintrag angelegt"
        assert row["attempts"] >= 5
        assert row["locked_until"] is not None, "locked_until nicht gesetzt"
# ==================================================================
# 4) Rate-Limit auf /auth/login (Brute-Force-Schutz auf IP-Ebene)
# ==================================================================
@pytest.mark.xfail(
    reason="rl_check ist in conftest fuer alle Tests gestubbt — Rate-Limit "
    "laesst sich pro Test nicht selektiv reaktivieren ohne andere "
    "parallele Tests zu beeintraechtigen."
)
def test_login_rate_limit_blocks_burst(client):
    """A burst of 25 failed logins should hit a 429 from the rate limiter.

    Marked xfail: per the reason above, the rate-limit check is stubbed for
    all tests in conftest and cannot be re-enabled for a single test.
    """
    info = _make_other_user(client)
    bad_login = {"email": info["email"], "password": "WRONG-PW!"}

    # Fire the whole burst and keep only the status codes.
    statuses = [
        client.post("/api/auth/login", json=bad_login).status_code
        for _ in range(25)
    ]

    assert 429 in statuses, f"Kein Rate-Limit-Treffer in {statuses}"