"""Tests fuer die Media-Registry (iCloud-Hybrid, iOS-Voll-App M0). Architektur: Originale liegen in der privaten CloudKit-DB des Nutzers, der Server haelt nur Previews. diary = Phantom-URL + _preview.webp + Marker-404; route = Preview wird als Datei selbst gespeichert. Siehe backend/routes/media.py. """ import io import os import secrets # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _jpeg(px: int = 900) -> bytes: from PIL import Image buf = io.BytesIO() Image.new("RGB", (px, px), (200, 120, 40)).save(buf, format="JPEG", quality=85) return buf.getvalue() def _entry(client, user, dog, titel="Medien-Eintrag") -> int: r = client.post( f"/api/dogs/{dog['id']}/diary", headers=user["headers"], json={"titel": titel, "text": "."}, ) assert r.status_code == 201, r.text return r.json()["id"] def _register(client, user, context, context_id, **extra): data = {"context": context, "context_id": str(context_id), "kind": "image", "img_width": "4032", "img_height": "3024", **extra} return client.post( "/api/media/register", headers=user["headers"], data=data, files={"preview": ("preview.jpg", _jpeg(), "image/jpeg")}, ) def _media_dir() -> str: return os.environ["MEDIA_DIR"] def _disk_path(url: str) -> str: return os.path.join(_media_dir(), url.removeprefix("/media/")) # ------------------------------------------------------------------ # diary: Phantom-URL + Preview # ------------------------------------------------------------------ def test_register_diary_creates_preview_only(client, user, dog): """register legt _preview.webp an, aber KEIN Original (das geht nach iCloud).""" eid = _entry(client, user, dog) r = _register(client, user, "diary", eid) assert r.status_code == 201, r.text j = r.json() assert j["url"].startswith("/media/diary/") assert j["preview_url"] == os.path.splitext(j["url"])[0] + "_preview.webp" assert j["ck_record_name"] == f"media-{j['media_id']}" assert j["is_cover"] == 1 # erstes Medium wird Cover assert not os.path.exists(_disk_path(j["url"])) # Phantom assert os.path.exists(_disk_path(j["preview_url"])) # echtes Preview # Eintrag listet das Medium mit Preview-URL (Web-Anzeige unveraendert) r2 = client.get(f"/api/dogs/{dog['id']}/diary/{eid}", headers=user["headers"]) items = r2.json()["media_items"] assert len(items) == 1 and items[0]["url"] == j["url"] def test_original_404_carries_icloud_marker(client, user, dog): """GET auf die Phantom-URL -> 404 mit storage:'icloud'; Preview liefert 200.""" eid = _entry(client, user, dog) j = _register(client, user, "diary", eid).json() cookies = {"by_token": user["token"]} # /media/diary/ verlangt Login-Cookie r = client.get(j["url"], cookies=cookies) assert r.status_code == 404 assert r.json().get("storage") == "icloud" r2 = client.get(j["preview_url"], cookies=cookies) assert r2.status_code == 200 assert r2.headers["content-type"].startswith("image/webp") def test_confirm_and_mine(client, user, dog): """PATCH bestaetigt den CloudKit-Upload; /mine ist die Sync-Wahrheit der App.""" eid = _entry(client, user, dog) j = _register(client, user, "diary", eid).json() r = client.patch(f"/api/media/{j['media_id']}", headers=user["headers"], json={}) assert r.status_code == 200 assert r.json()["ck_state"] == "confirmed" mine = client.get("/api/media/mine", headers=user["headers"]).json() assert [m for m in mine if m["id"] == j["media_id"]][0]["ck_state"] == "confirmed" def test_quota_fallback_uploads_original(client, user, dog): """iCloud voll -> Original klassisch zum Server, danach liefert die URL 200.""" eid = _entry(client, user, dog) j = _register(client, user, "diary", eid).json() r = client.post( f"/api/media/{j['media_id']}/original", headers=user["headers"], files={"file": ("original.jpg", _jpeg(2000), "image/jpeg")}, ) assert r.status_code == 201, r.text assert r.json()["storage"] == "server" r2 = client.get(j["url"], cookies={"by_token": user["token"]}) assert r2.status_code == 200 mine = client.get("/api/media/mine", headers=user["headers"]).json() assert [m for m in mine if m["id"] == j["media_id"]][0]["storage"] == "server" # ------------------------------------------------------------------ # Ownership # ------------------------------------------------------------------ def _second_user(client) -> dict: email = f"fremd-{secrets.token_hex(4)}@example.com" pw = "TestPass123!" r = client.post("/api/auth/register", json={"email": email, "password": pw, "name": f"fremd{secrets.token_hex(3)}"}) assert r.status_code == 200, r.text from database import db with db() as conn: conn.execute("UPDATE users SET email_verified=1 WHERE email=?", (email,)) token = client.post("/api/auth/login", json={"email": email, "password": pw}).json()["token"] return {"token": token, "headers": {"Authorization": f"Bearer {token}"}} def test_foreign_user_blocked(client, user, dog): """Fremder User kann weder registrieren noch bestaetigen noch fallbacken.""" eid = _entry(client, user, dog) j = _register(client, user, "diary", eid).json() fremd = _second_user(client) assert _register(client, fremd, "diary", eid).status_code == 404 assert client.patch(f"/api/media/{j['media_id']}", headers=fremd["headers"], json={}).status_code == 404 r = client.post(f"/api/media/{j['media_id']}/original", headers=fremd["headers"], files={"file": ("x.jpg", _jpeg(100), "image/jpeg")}) assert r.status_code == 404 # ------------------------------------------------------------------ # Loeschen raeumt Registry + Dateien # ------------------------------------------------------------------ def test_delete_media_item_cleans_registry_and_preview(client, user, dog): eid = _entry(client, user, dog) j = _register(client, user, "diary", eid).json() prev_path = _disk_path(j["preview_url"]) assert os.path.exists(prev_path) r = client.delete(f"/api/dogs/{dog['id']}/diary/{eid}/media/{j['id']}", headers=user["headers"]) assert r.status_code == 204 assert not os.path.exists(prev_path) mine = client.get("/api/media/mine", headers=user["headers"]).json() assert not [m for m in mine if m["id"] == j["media_id"]] def test_entry_delete_cleans_registry(client, user, dog): eid = _entry(client, user, dog) j = _register(client, user, "diary", eid).json() r = client.delete(f"/api/dogs/{dog['id']}/diary/{eid}", headers=user["headers"]) assert r.status_code == 204 mine = client.get("/api/media/mine", headers=user["headers"]).json() assert not [m for m in mine if m["id"] == j["media_id"]] # ------------------------------------------------------------------ # route: Preview wird die Datei selbst # ------------------------------------------------------------------ def _route(client, user) -> int: r = client.post("/api/routes", headers=user["headers"], json={ "name": "Testrunde", "gps_track": [{"lat": 48.10, "lon": 11.50}, {"lat": 48.11, "lon": 11.51}], "distanz_km": 1.2, "dauer_min": 20, }) assert r.status_code == 201, r.text return r.json()["id"] def test_register_route_stores_preview_as_file(client, user, dog): rid = _route(client, user) r = _register(client, user, "route", rid) assert r.status_code == 201, r.text j = r.json() assert j["foto_url"].startswith("/media/routes/") assert j["foto_urls"] == [j["foto_url"]] assert os.path.exists(_disk_path(j["foto_url"])) # Datei existiert (= Preview) r2 = client.get(j["foto_url"]) # routes/ ist nicht auth-pflichtig assert r2.status_code == 200 # Route-Delete raeumt die Registry-Row ab (App-Sync loescht dann den CKRecord) assert client.delete(f"/api/routes/{rid}", headers=user["headers"]).status_code == 204 mine = client.get("/api/media/mine", headers=user["headers"]).json() assert not [m for m in mine if m["id"] == j["media_id"]] def test_register_validates_context_and_kind(client, user, dog): eid = _entry(client, user, dog) r = client.post("/api/media/register", headers=user["headers"], data={"context": "forum", "context_id": "1", "kind": "image"}, files={"preview": ("p.jpg", _jpeg(100), "image/jpeg")}) assert r.status_code == 422 r2 = client.post("/api/media/register", headers=user["headers"], data={"context": "diary", "context_id": str(eid), "kind": "egal"}, files={"preview": ("p.jpg", _jpeg(100), "image/jpeg")}) assert r2.status_code == 422