"""Race-Condition-Tests fuer atomare Counter (invoice_number, founder_number). Diese Tests pruefen, dass die in Sprint60 eingefuehrten Race-Schutze (invoice_counters-Tabelle + atomare SQL-UPDATEs) tatsaechlich eindeutige Werte vergeben, wenn mehrere Threads gleichzeitig zugreifen. """ from __future__ import annotations import concurrent.futures import secrets import threading # ================================================================== # 1) Invoice-Counter — atomare Rechnungsnummern-Vergabe # ================================================================== class TestInvoiceCounterRace: """_next_invoice_number() in routes/invoices.py nutzt: - dedizierte invoice_counters-Tabelle - BEGIN IMMEDIATE + busy_timeout - SQLite serialisiert Writer Test: 20 parallele Aufrufe -> 20 EINDEUTIGE Nummern.""" def test_parallel_invoice_numbers_are_unique(self): from database import db from routes.invoices import _next_invoice_number N = 20 # Counter fuer Pruefung zuruecksetzen — sonst koennten andere Tests # die Sequence schon hochgezaehlt haben (Konflikt vermeiden wir # ueber einen frischen, einzigartigen Prefix pro Testlauf). prefix = "TEST-" + secrets.token_hex(3).upper() results: list[str] = [] errors: list[str] = [] lock = threading.Lock() def worker(): try: with db() as conn: n = _next_invoice_number(conn, prefix=prefix) with lock: results.append(n) except Exception as exc: with lock: errors.append(repr(exc)) with concurrent.futures.ThreadPoolExecutor(max_workers=N) as pool: futures = [pool.submit(worker) for _ in range(N)] concurrent.futures.wait(futures) assert not errors, f"Fehler in Threads: {errors}" assert len(results) == N, f"Erwartete {N} Ergebnisse, bekam {len(results)}" # Duplikate? assert len(set(results)) == N, ( f"DUPLIKATE in vergebenen Nummern! {len(results)-len(set(results))} " f"Kollisionen. Beispiele: {results}" ) def test_invoice_counter_increments_monotonically(self): """Ohne Parallelitaet muss next_num strikt um 1 steigen.""" from database import db from routes.invoices import _next_invoice_number prefix = "MONO-" + secrets.token_hex(3).upper() nums = [] for _ in range(5): with db() as conn: nums.append(_next_invoice_number(conn, prefix=prefix)) # letzte Komponente aus z. B. "MONO-XYZ-2026-0001" tails = [int(n.split("-")[-1]) for n in nums] assert tails == [1, 2, 3, 4, 5], ( f"Counter steigt nicht monoton: {tails}" ) # ================================================================== # 2) Founder-Number — atomare Gruender-Vergabe via partner.py # ================================================================== class TestFounderNumberAtomic: """partner.py setzt is_founder + founder_number in EINEM UPDATE, routes/dogs.py macht dasselbe via atomarem Sub-Query. Wir testen die SQL-Logik direkt (ohne HTTP), weil das partner- Endpunkt-Aufruf-Trace fuer Race-Tests zu komplex ist.""" def _make_pending_user(self, email_prefix: str = "fp") -> int: """Legt einen User direkt in der DB an, der bereits is_founder_pending=1 hat.""" from database import db email = f"{email_prefix}-{secrets.token_hex(4)}@example.com" name = f"founder{secrets.token_hex(3)}" with db() as conn: cur = conn.execute( """INSERT INTO users (email, pw_hash, name, referral_code, email_verified, is_founder_pending) VALUES (?, ?, ?, ?, 1, 1)""", (email, "x", name, secrets.token_hex(4)) ) return cur.lastrowid def _atomic_founder_update(self, user_id: int): """Reproduziert das atomare UPDATE aus routes/dogs.py.""" from database import db with db() as conn: conn.execute( """UPDATE users SET is_founder = 1, founder_number = ( SELECT IFNULL(MAX(founder_number), 0) + 1 FROM users WHERE is_founder = 1 ), is_founder_pending = 0 WHERE id = ? AND is_founder_pending = 1 AND (is_founder IS NULL OR is_founder = 0) AND (SELECT COUNT(*) FROM users WHERE is_founder = 1) < 100""", (user_id,) ) def test_parallel_founder_assignments_get_unique_numbers(self): """Zwei parallele Aufrufe fuer zwei verschiedene Pending-User muessen unterschiedliche founder_numbers bekommen.""" from database import db ids = [self._make_pending_user("race") for _ in range(2)] threads = [] errors: list[str] = [] lock = threading.Lock() def worker(uid): try: self._atomic_founder_update(uid) except Exception as exc: with lock: errors.append(repr(exc)) for uid in ids: t = threading.Thread(target=worker, args=(uid,)) threads.append(t) t.start() for t in threads: t.join() assert not errors, f"Fehler: {errors}" with db() as conn: rows = conn.execute( f"SELECT id, founder_number FROM users WHERE id IN ({','.join('?'*len(ids))})", ids ).fetchall() numbers = [r["founder_number"] for r in rows] assert all(n is not None for n in numbers), ( f"Mindestens eine founder_number ist NULL: {numbers}" ) assert len(set(numbers)) == len(numbers), ( f"DUPLIKATE bei founder_number: {numbers}" ) def test_founder_full_means_no_more_numbers(self, monkeypatch): """Wenn bereits 100 Founder existieren, vergibt die atomare Logik KEINE neue Nummer mehr (rowcount = 0).""" from database import db # Wir koennen nicht einfach 100 Founder anlegen — stattdessen # mocken wir die Limit-Logik durch einen kleinen Limit-Test: # statt < 100 -> < N, in dem wir einen eigenen Test-User # einfuegen und davor genau N existierende Founder anlegen. N_LIMIT = 3 # Test-Schwelle waehlen: vorhandene Founder im System zaehlen, # dann auf N_LIMIT auffuellen. with db() as conn: existing = conn.execute( "SELECT COUNT(*) FROM users WHERE is_founder=1" ).fetchone()[0] # Wir muessen auf den HARDCODED 100er-Wert in der SQL-Query # vertrauen — also auf 100 auffuellen. Das ist teuer, aber # eindeutig. Wir machen es in einem Insert mit GROUP BY trick. to_create = max(0, 100 - existing) if to_create > 0: with db() as conn: # Bulk-Insert ist am schnellsten conn.executemany( """INSERT INTO users (email, pw_hash, name, referral_code, email_verified, is_founder, founder_number) VALUES (?, 'x', ?, ?, 1, 1, ?)""", [ ( f"founder{i}-{secrets.token_hex(3)}@x.test", f"founder{i}-{secrets.token_hex(3)}", secrets.token_hex(4), existing + i + 1, ) for i in range(to_create) ] ) # Pruefen: 100 Founder vorhanden with db() as conn: count = conn.execute( "SELECT COUNT(*) FROM users WHERE is_founder=1" ).fetchone()[0] assert count >= 100, f"Setup falsch: nur {count} Founder" # Jetzt: ein neuer Pending-User darf KEINE Nummer mehr bekommen new_uid = self._make_pending_user("over") self._atomic_founder_update(new_uid) with db() as conn: row = conn.execute( "SELECT is_founder, founder_number, is_founder_pending FROM users WHERE id=?", (new_uid,) ).fetchone() assert row["is_founder"] in (0, None), ( "User wurde Founder, obwohl bereits 100 vergeben sind." ) assert row["founder_number"] is None, ( f"founder_number wurde vergeben trotz vollem Slot: {row['founder_number']}" ) # Pending bleibt erhalten — User kann spaeter bei Ausstieg eines # bestehenden Founders nachruecken. assert row["is_founder_pending"] == 1