Big Sweep: Security + Race-Conditions + Tests + DSGVO + A11y, SW by-v1095

SECURITY (auth.py, routes/auth.py, database.py, main.py)
- JWT bekommt jti; Logout trägt in neue jwt_blacklist-Tabelle ein,
  decode_token() prüft → server-side Invalidierung
- JWT-Expiry default 30 → 7 Tage (ENV JWT_EXPIRY_DAYS überschreibt)
- Sliding-Refresh-Middleware: erneuert Cookie wenn >50% verbraucht
  (Schwelle via JWT_REFRESH_FRACTION, Default 2)
- Login-Lockout in DB-Tabelle login_attempts (5 Versuche / 15 Min,
  überlebt Container-Restart) — alte In-Memory-Lockouts ersetzt
- SMTP-Versand: alle 'except: pass' durch logger.exception ersetzt;
  Fehlversuche landen in failed_emails-Tabelle für späteres Retry
- Referral-Counter Race gefixt: UPDATE partner_codes SET uses=uses+1
  ... WHERE uses<max_uses RETURNING — atomar statt SELECT+UPDATE

RACE CONDITIONS (routes/invoices.py, database.py)
- Neue invoice_counters-Tabelle für atomare Nummernvergabe
- _next_invoice_number nutzt BEGIN IMMEDIATE + atomares UPDATE
- Funktioniert für RG- und ST-Prefixe (Stornorechnungen)
- Race-Test verifiziert (5 Threads × 20 Calls = 100 eindeutige Nummern)

VERSION + TESTS + ERROR-DIGEST (VERSION, Makefile, tests/, scheduler.py)
- Neue VERSION-Datei (Single Source of Truth) — main.py liest beim
  Startup
- Makefile-Target 'make bump' propagiert in sw.js, app.js, index.html
- Makefile-Target 'make test' setzt venv auf, läuft pytest
- 19 Smoke-Tests in tests/ (health, auth, diary, invoice) — alle grün
- Scheduler: täglicher _job_error_digest um 06:30 → schickt Error-
  Zusammenfassung an ADMIN_EMAIL (still wenn keine Errors)

DSGVO + A11Y + ERSTE-HILFE
- landing.html: 'HTML und ODS' → 'JSON' (tatsächlich implementiert)
- datenschutz.js: Sektion Account-Löschung erweitert (sofort gelöscht /
  anonymisiert / 10 Jahre für Rechnungen)
- erste-hilfe.js: prominentes Warning-Banner oben (ersetzt keine
  Tierarzt-Beratung); Notfallnummern gruppiert nach Land, TODO-Platz-
  halter für AT-Uni-Klinik, CH Tox Info Suisse, CH Tierspital Zürich
- ui.js Modal: ESC schließt, Focus-Trap, Auto-Focus erstes Element,
  Restore Focus auf vorigen Caller
- impressum.js Kontaktformular: Labels mit for=cf-name etc.

NEUE DB-TABELLEN (idempotent via CREATE TABLE IF NOT EXISTS)
- jwt_blacklist, login_attempts, failed_emails, invoice_counters

NEUE ENV-VARS
- JWT_REFRESH_FRACTION (Default 2)
- JWT_EXPIRY_DAYS Default geändert (30 → 7)
This commit is contained in:
rene 2026-05-26 20:12:01 +02:00
parent 6224044654
commit 9394bab1fb
23 changed files with 1208 additions and 78 deletions

0
tests/__init__.py Normal file
View file

172
tests/conftest.py Normal file
View file

@ -0,0 +1,172 @@
"""
BAN YARO pytest Fixtures
Wichtig:
- ENV-Variablen werden VOR dem Import von `backend.main` gesetzt,
weil `database.DB_PATH = os.getenv("DB_PATH", ...)` beim Modul-Import gebunden wird.
- Wir mocken APScheduler weg, damit beim lifespan-Startup keine Hintergrund-Jobs starten.
- Pro Test-Session legen wir eine frische SQLite-Datei in tmp_path an.
"""
from __future__ import annotations
import os
import sys
import secrets
import tempfile
import shutil
from pathlib import Path
import pytest
ROOT = Path(__file__).resolve().parent.parent
BACKEND_DIR = ROOT / "backend"
# Backend-Pfad in sys.path, damit `from main import app` funktioniert
sys.path.insert(0, str(BACKEND_DIR))
# ------------------------------------------------------------------
# Session-weite App-Instanz
# ------------------------------------------------------------------
@pytest.fixture(scope="session")
def _tmp_db_dir():
"""Temp-Verzeichnis für DB + Media während der Test-Session."""
d = tempfile.mkdtemp(prefix="banyaro-tests-")
yield Path(d)
shutil.rmtree(d, ignore_errors=True)
@pytest.fixture(scope="session")
def app(_tmp_db_dir):
"""
Initialisiert die FastAPI-App einmal pro Test-Session mit:
- Test-DB in tmp_path
- Stubbed Scheduler (keine Background-Jobs)
- KI_MODE=off (keine Netzwerk-Aufrufe)
- SMTP-Versand wird gemockt
"""
db_file = _tmp_db_dir / "test.db"
media_dir = _tmp_db_dir / "media"
breeder_dir = _tmp_db_dir / "breeder_docs"
media_dir.mkdir(exist_ok=True)
breeder_dir.mkdir(exist_ok=True)
os.environ["DB_PATH"] = str(db_file)
os.environ["MEDIA_DIR"] = str(media_dir)
os.environ["BREEDER_DOCS_DIR"] = str(breeder_dir)
os.environ["KI_MODE"] = "off"
os.environ["ENV"] = "test"
os.environ["JWT_SECRET"] = "test-secret-" + secrets.token_hex(8)
os.environ["STAGING"] = "false"
os.environ["ADMIN_EMAIL"] = "test-admin@example.com"
# SMTP nicht konfigurieren → _SMTP_READY=False, Mails werden geskippt
os.environ.pop("SMTP_SUPPORT_USER", None)
os.environ.pop("SMTP_SUPPORT_PASS", None)
# Scheduler-Modul vor main-Import stubben — sonst startet APScheduler beim Lifespan
import scheduler as _sched
_sched.start = lambda: None # type: ignore[assignment]
_sched.stop = lambda: None # type: ignore[assignment]
# Mailer abklemmen — keine echten E-Mails
import mailer as _mailer
async def _noop_send(*args, **kwargs):
return True
_mailer.send_email = _noop_send # type: ignore[assignment]
# Rate-Limit + Login-Lockout fuer Tests deaktivieren — sonst
# blockiert /api/auth/register nach 5 Aufrufen die ganze Test-Session.
import ratelimit as _rl
_noop_rl = lambda *a, **kw: None
_rl.check = _noop_rl # type: ignore[assignment]
_rl.is_account_locked = lambda *a, **kw: False # type: ignore[assignment]
_rl.record_login_failure = lambda *a, **kw: 0 # type: ignore[assignment]
# App importieren (initialisiert DB beim lifespan)
from main import app as fastapi_app
# Module-level Aliase ersetzen (Imports der Form `from ratelimit import check as rl_check`)
import routes.auth as _routes_auth
_routes_auth.rl_check = _noop_rl # type: ignore[assignment]
if hasattr(_routes_auth, "_db_is_account_locked"):
_routes_auth._db_is_account_locked = lambda *a, **kw: None # type: ignore[assignment]
if hasattr(_routes_auth, "_db_record_login_failure"):
_routes_auth._db_record_login_failure = lambda *a, **kw: 0 # type: ignore[assignment]
return fastapi_app
@pytest.fixture(scope="session")
def client(app):
"""FastAPI TestClient — triggert lifespan (init_db, seed_movies, …)."""
from fastapi.testclient import TestClient
with TestClient(app) as c:
yield c
# ------------------------------------------------------------------
# Test-User
# ------------------------------------------------------------------
def _register_and_verify(client, email: str, password: str, name: str) -> dict:
"""Helper: Register User + setzt email_verified=1 direkt in der DB."""
r = client.post("/api/auth/register", json={
"email": email, "password": password, "name": name
})
assert r.status_code == 200, f"Register failed: {r.status_code} {r.text}"
# email_verified=1 setzen (umgeht E-Mail-Verifikation für Tests)
from database import db
with db() as conn:
conn.execute(
"UPDATE users SET email_verified=1 WHERE email=?", (email,)
)
return {"email": email, "password": password, "name": name}
def _login(client, email: str, password: str) -> str:
"""Helper: Login → JWT-Token zurueck."""
r = client.post("/api/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, f"Login failed: {r.status_code} {r.text}"
return r.json()["token"]
@pytest.fixture
def user(client):
"""Frisch registrierter und verifizierter Test-User mit JWT-Token."""
email = f"user-{secrets.token_hex(4)}@example.com"
pw = "TestPass123!"
name = f"tester{secrets.token_hex(3)}"
info = _register_and_verify(client, email, pw, name)
token = _login(client, email, pw)
info["token"] = token
info["headers"] = {"Authorization": f"Bearer {token}"}
return info
@pytest.fixture
def admin(client):
"""Test-Admin: registriert, verifiziert, rolle='admin' direkt in DB gesetzt."""
email = f"admin-{secrets.token_hex(4)}@example.com"
pw = "AdminPass123!"
# "admin"/"administrator" sind in der Blocklist → Bezeichner "ops..." verwenden
name = f"ops{secrets.token_hex(3)}"
info = _register_and_verify(client, email, pw, name)
from database import db
with db() as conn:
conn.execute("UPDATE users SET rolle='admin' WHERE email=?", (email,))
token = _login(client, email, pw)
info["token"] = token
info["headers"] = {"Authorization": f"Bearer {token}"}
return info
@pytest.fixture
def dog(client, user):
"""Erstellt einen Hund fuer den Test-User und gibt das Hund-Objekt zurueck."""
r = client.post(
"/api/dogs",
headers=user["headers"],
json={"name": "Buddy", "rasse": "Labrador", "is_public": False},
)
assert r.status_code in (200, 201), f"create_dog failed: {r.status_code} {r.text}"
return r.json()

56
tests/test_auth.py Normal file
View file

@ -0,0 +1,56 @@
"""Smoke-Tests fuer Auth-Flows: Register, Login, Logout, /me."""
import secrets
def test_register_creates_pending_user(client):
"""Frischer User -> pending_verification=True."""
email = f"new-{secrets.token_hex(4)}@example.com"
r = client.post("/api/auth/register", json={
"email": email,
"password": "TestPass123!",
"name": f"user{secrets.token_hex(3)}",
})
assert r.status_code == 200, r.text
assert r.json().get("pending_verification") is True
def test_login_with_wrong_password_returns_401(client, user):
"""Falsches Passwort -> 401."""
r = client.post("/api/auth/login", json={
"email": user["email"], "password": "WrongPass!!"
})
assert r.status_code == 401
def test_login_returns_token(client, user):
"""Korrekte Credentials -> JWT-Token."""
r = client.post("/api/auth/login", json={
"email": user["email"], "password": user["password"]
})
assert r.status_code == 200
assert "token" in r.json()
def test_me_requires_auth(client):
"""/api/auth/me ohne Token -> 401."""
r = client.get("/api/auth/me")
assert r.status_code == 401
def test_me_returns_user_info(client, user):
"""/api/auth/me mit gueltigem Token -> User-Objekt."""
r = client.get("/api/auth/me", headers=user["headers"])
assert r.status_code == 200
data = r.json()
assert data["email"] == user["email"]
assert data["name"] == user["name"]
# email_verified wurde im Fixture per DB-Update auf 1 gesetzt
assert data["email_verified"] == 1
def test_logout_clears_cookie(client, user):
"""/api/auth/logout -> ok."""
r = client.post("/api/auth/logout", headers=user["headers"])
assert r.status_code == 200
assert r.json()["ok"] is True

71
tests/test_diary.py Normal file
View file

@ -0,0 +1,71 @@
"""Smoke-Tests fuer Tagebuch-CRUD."""
def test_create_diary_entry(client, user, dog):
"""POST /api/dogs/{id}/diary -> 201 mit Entry."""
r = client.post(
f"/api/dogs/{dog['id']}/diary",
headers=user["headers"],
json={"titel": "Mein erster Eintrag", "text": "Heute war ein toller Tag."},
)
assert r.status_code == 201, r.text
entry = r.json()
assert entry["titel"] == "Mein erster Eintrag"
assert entry["dog_id"] == dog["id"]
def test_list_diary_entries(client, user, dog):
"""GET /api/dogs/{id}/diary listet erstellte Eintraege."""
client.post(
f"/api/dogs/{dog['id']}/diary",
headers=user["headers"],
json={"titel": "Eintrag A", "text": "Erstes Mal Gassi."},
)
client.post(
f"/api/dogs/{dog['id']}/diary",
headers=user["headers"],
json={"titel": "Eintrag B", "text": "Tierarzt."},
)
r = client.get(f"/api/dogs/{dog['id']}/diary", headers=user["headers"])
assert r.status_code == 200
entries = r.json()
assert isinstance(entries, list)
titles = [e["titel"] for e in entries]
assert "Eintrag A" in titles
assert "Eintrag B" in titles
def test_get_single_diary_entry(client, user, dog):
"""GET /api/dogs/{id}/diary/{entry_id} liefert genauen Eintrag."""
r = client.post(
f"/api/dogs/{dog['id']}/diary",
headers=user["headers"],
json={"titel": "Einzeleintrag", "text": "Inhalt."},
)
eid = r.json()["id"]
r2 = client.get(f"/api/dogs/{dog['id']}/diary/{eid}", headers=user["headers"])
assert r2.status_code == 200
assert r2.json()["titel"] == "Einzeleintrag"
def test_delete_diary_entry(client, user, dog):
"""DELETE entfernt Eintrag (204) — danach 404."""
r = client.post(
f"/api/dogs/{dog['id']}/diary",
headers=user["headers"],
json={"titel": "Loeschen mich", "text": "."},
)
eid = r.json()["id"]
r2 = client.delete(f"/api/dogs/{dog['id']}/diary/{eid}", headers=user["headers"])
assert r2.status_code == 204
r3 = client.get(f"/api/dogs/{dog['id']}/diary/{eid}", headers=user["headers"])
assert r3.status_code == 404
def test_diary_unauth(client, dog):
"""Ohne Token -> 401."""
r = client.get(f"/api/dogs/{dog['id']}/diary")
assert r.status_code == 401

32
tests/test_health.py Normal file
View file

@ -0,0 +1,32 @@
"""Smoke-Tests fuer trivialste Endpoints (kein Auth noetig)."""
def test_api_version(client):
"""GET /api/version sollte die zentrale VERSION zurueckgeben."""
r = client.get("/api/version")
assert r.status_code == 200
data = r.json()
assert "version" in data
# APP_VER wird aus VERSION-Datei gelesen → muss ein Zahlen-String sein
assert data["version"].isdigit(), f"version='{data['version']}' ist nicht numerisch"
def test_assetlinks(client):
"""TWA-Verifikation fuer Play Store."""
r = client.get("/.well-known/assetlinks.json")
assert r.status_code == 200
assert "package_name" in r.text
def test_robots(client):
"""robots.txt muss erreichbar sein."""
r = client.get("/robots.txt")
assert r.status_code == 200
def test_manifest(client):
"""PWA-Manifest."""
r = client.get("/manifest.json")
assert r.status_code == 200
data = r.json()
assert "name" in data

91
tests/test_invoice.py Normal file
View file

@ -0,0 +1,91 @@
"""Smoke-Tests fuer Rechnungs-CRUD (Admin)."""
import os
import secrets
def test_admin_create_invoice(client, admin):
"""POST /api/admin/invoices als Admin -> 201 mit Rechnungsnummer."""
r = client.post(
"/api/admin/invoices",
headers=admin["headers"],
json={
"recipient_name": "Max Mustermann",
"recipient_email": "max@example.com",
"items": [{"description": "Pro-Jahresabo", "quantity": 1, "unit_price": 29.0}],
"service_period": "2026-01-01 - 2026-12-31",
"notes": "Test-Rechnung",
},
)
assert r.status_code == 201, r.text
data = r.json()
assert data["invoice_number"].startswith("RG-")
assert data["amount_gross"] == 29.0
assert data["status"] == "draft"
assert len(data["items"]) == 1
def test_admin_list_invoices(client, admin):
"""GET /api/admin/invoices listet vorhandene Rechnungen."""
# Eine erstellen
client.post(
"/api/admin/invoices",
headers=admin["headers"],
json={
"recipient_name": "Tester Listing",
"recipient_email": f"list-{secrets.token_hex(3)}@example.com",
"items": [{"description": "Sub", "quantity": 1, "unit_price": 49.0}],
},
)
r = client.get("/api/admin/invoices", headers=admin["headers"])
assert r.status_code == 200
data = r.json()
assert isinstance(data, list)
assert len(data) >= 1
def test_non_admin_cannot_create_invoice(client, user):
"""Normaler User -> 403."""
r = client.post(
"/api/admin/invoices",
headers=user["headers"],
json={
"recipient_name": "x",
"recipient_email": "x@example.com",
"items": [{"description": "x", "quantity": 1, "unit_price": 1.0}],
},
)
assert r.status_code == 403
def test_admin_send_invoice(client, admin, tmp_path, monkeypatch):
"""Send-Endpoint generiert PDF, ruft (gemockten) Mailer auf, setzt Status auf 'sent'."""
# Paperless-Verzeichnis auf tmp_path setzen
scaninput = tmp_path / "scaninput"
scaninput.mkdir()
monkeypatch.setenv("SCANINPUT_DIR", str(scaninput))
monkeypatch.setenv("PAPERLESS_URL", "") # kein HTTP-Call
monkeypatch.setenv("KLEINUNTERNEHMER", "true")
# Rechnung anlegen
r = client.post(
"/api/admin/invoices",
headers=admin["headers"],
json={
"recipient_name": "Send-Test",
"recipient_email": f"send-{secrets.token_hex(3)}@example.com",
"items": [{"description": "Sub", "quantity": 1, "unit_price": 29.0}],
"service_period": "2026-01-01 - 2026-12-31",
},
)
assert r.status_code == 201, r.text
inv = r.json()
# Senden
r2 = client.post(f"/api/admin/invoices/{inv['id']}/send", headers=admin["headers"])
assert r2.status_code == 200, r2.text
# Status auf 'sent'?
r3 = client.get(f"/api/admin/invoices/{inv['id']}", headers=admin["headers"])
assert r3.status_code == 200
assert r3.json()["status"] == "sent"