# banyaro/tests/test_media_registry.py
#
# 227 lines
# 8.9 KiB
# Python

"""Tests fuer die Media-Registry (iCloud-Hybrid, iOS-Voll-App M0).
Architektur: Originale liegen in der privaten CloudKit-DB des Nutzers, der Server
haelt nur Previews. diary = Phantom-URL + _preview.webp + Marker-404;
route = Preview wird als Datei selbst gespeichert. Siehe backend/routes/media.py.
"""
import io
import os
import secrets
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _jpeg(px: int = 900) -> bytes:
    """Return the encoded bytes of a single-colour square JPEG.

    Args:
        px: Edge length of the square image in pixels.

    Returns:
        The JPEG file content (quality 85) as ``bytes``.
    """
    # Imported here so that merely importing this module does not need Pillow.
    from PIL import Image

    size = (px, px)
    fill = (200, 120, 40)
    picture = Image.new("RGB", size, fill)
    with io.BytesIO() as sink:
        picture.save(sink, format="JPEG", quality=85)
        return sink.getvalue()
def _entry(client, user, dog, titel="Medien-Eintrag") -> int:
    """Create a diary entry for ``dog`` and return the new entry's id.

    Posts to the dog's diary endpoint using ``user``'s headers and fails the
    calling test (showing the response text) unless the answer is 201.
    """
    endpoint = f"/api/dogs/{dog['id']}/diary"
    auth = user["headers"]
    payload = {"titel": titel, "text": "."}

    response = client.post(endpoint, headers=auth, json=payload)
    assert response.status_code == 201, response.text

    created = response.json()
    return created["id"]
def _register(client, user, context, context_id, **extra):
    """POST a preview image to ``/api/media/register`` and return the response.

    The form carries ``context``, the stringified ``context_id``, kind
    ``image`` and the dimensions 4032 x 3024. Keyword arguments in ``extra``
    are merged into the form last, so they override those defaults.
    The response is returned unchecked; callers assert on it themselves.
    """
    form = dict(
        context=context,
        context_id=str(context_id),
        kind="image",
        img_width="4032",
        img_height="3024",
    )
    form.update(extra)

    auth = user["headers"]
    preview_part = ("preview.jpg", _jpeg(), "image/jpeg")
    return client.post(
        "/api/media/register",
        headers=auth,
        data=form,
        files={"preview": preview_part},
    )
def _media_dir() -> str:
    """Return the media root directory read from the ``MEDIA_DIR`` env variable.

    Raises:
        KeyError: If ``MEDIA_DIR`` is not set.
    """
    media_root = os.environ["MEDIA_DIR"]
    return media_root
def _disk_path(url: str) -> str:
    """Map a ``/media/...`` URL to the corresponding path below the media root."""
    root = _media_dir()
    relative = url.removeprefix("/media/")
    return os.path.join(root, relative)
# ------------------------------------------------------------------
# diary: Phantom-URL + Preview
# ------------------------------------------------------------------
def test_register_diary_creates_preview_only(client, user, dog):
    """register creates ``_preview.webp`` but NO original (that one goes to iCloud)."""
    entry_id = _entry(client, user, dog)

    response = _register(client, user, "diary", entry_id)
    assert response.status_code == 201, response.text
    body = response.json()

    url = body["url"]
    assert url.startswith("/media/diary/")
    preview_url = body["preview_url"]
    stem, _ext = os.path.splitext(url)
    assert preview_url == stem + "_preview.webp"
    assert body["ck_record_name"] == f"media-{body['media_id']}"
    assert body["is_cover"] == 1  # the first medium becomes the cover

    assert not os.path.exists(_disk_path(url))  # phantom: no file on disk
    assert os.path.exists(_disk_path(preview_url))  # real preview file

    # The entry lists the medium with its preview URL (web display unchanged).
    detail = client.get(
        f"/api/dogs/{dog['id']}/diary/{entry_id}",
        headers=user["headers"],
    )
    media_items = detail.json()["media_items"]
    assert len(media_items) == 1
    assert media_items[0]["url"] == url
def test_original_404_carries_icloud_marker(client, user, dog):
    """GET on the phantom URL -> 404 with storage:'icloud'; the preview returns 200."""
    entry_id = _entry(client, user, dog)
    registered = _register(client, user, "diary", entry_id)
    # Check the setup call so a failure shows the server's message instead of
    # a KeyError on the missing "url" key further down.
    assert registered.status_code == 201, registered.text
    media = registered.json()

    cookies = {"by_token": user["token"]}  # /media/diary/ requires the login cookie

    original = client.get(media["url"], cookies=cookies)
    assert original.status_code == 404
    assert original.json().get("storage") == "icloud"

    preview = client.get(media["preview_url"], cookies=cookies)
    assert preview.status_code == 200
    assert preview.headers["content-type"].startswith("image/webp")
def test_confirm_and_mine(client, user, dog):
    """PATCH confirms the CloudKit upload; /mine is the app's sync source of truth."""
    entry_id = _entry(client, user, dog)
    registered = _register(client, user, "diary", entry_id)
    # Check the setup call so a failure shows the server's message, not a KeyError.
    assert registered.status_code == 201, registered.text
    media_id = registered.json()["media_id"]

    confirmed = client.patch(
        f"/api/media/{media_id}",
        headers=user["headers"],
        json={},
    )
    assert confirmed.status_code == 200
    assert confirmed.json()["ck_state"] == "confirmed"

    mine = client.get("/api/media/mine", headers=user["headers"]).json()
    # Look the row up explicitly: a missing row should fail with a readable
    # message rather than an IndexError.
    row = next((m for m in mine if m["id"] == media_id), None)
    assert row is not None, f"media {media_id} missing from /api/media/mine"
    assert row["ck_state"] == "confirmed"
def test_quota_fallback_uploads_original(client, user, dog):
    """iCloud full -> the original goes to the server the classic way; then the URL returns 200."""
    entry_id = _entry(client, user, dog)
    registered = _register(client, user, "diary", entry_id)
    # Check the setup call so a failure shows the server's message, not a KeyError.
    assert registered.status_code == 201, registered.text
    media = registered.json()
    media_id = media["media_id"]

    upload = client.post(
        f"/api/media/{media_id}/original",
        headers=user["headers"],
        files={"file": ("original.jpg", _jpeg(2000), "image/jpeg")},
    )
    assert upload.status_code == 201, upload.text
    assert upload.json()["storage"] == "server"

    # The formerly phantom URL is now served (login cookie required).
    served = client.get(media["url"], cookies={"by_token": user["token"]})
    assert served.status_code == 200

    mine = client.get("/api/media/mine", headers=user["headers"]).json()
    # Explicit lookup: a missing row fails with a readable message instead of
    # an IndexError.
    row = next((m for m in mine if m["id"] == media_id), None)
    assert row is not None, f"media {media_id} missing from /api/media/mine"
    assert row["storage"] == "server"
# ------------------------------------------------------------------
# Ownership
# ------------------------------------------------------------------
def _second_user(client) -> dict:
    """Register, verify and log in a fresh user with a random e-mail address.

    Returns:
        A dict with the login ``token`` and ready-made bearer ``headers``.

    Fails the calling test (showing the response text) if registration does
    not answer 200 or the login response carries no token.
    """
    email = f"fremd-{secrets.token_hex(4)}@example.com"
    password = "TestPass123!"

    signup = client.post(
        "/api/auth/register",
        json={
            "email": email,
            "password": password,
            "name": f"fremd{secrets.token_hex(3)}",
        },
    )
    assert signup.status_code == 200, signup.text

    # Flag the address as verified directly in the database.
    # NOTE(review): presumably login needs a verified e-mail -- confirm.
    from database import db

    with db() as conn:
        conn.execute("UPDATE users SET email_verified=1 WHERE email=?", (email,))

    login = client.post(
        "/api/auth/login",
        json={"email": email, "password": password},
    )
    session = login.json()
    # Surface the server's answer when login failed instead of a bare KeyError.
    assert "token" in session, login.text
    token = session["token"]
    return {"token": token, "headers": {"Authorization": f"Bearer {token}"}}
def test_foreign_user_blocked(client, user, dog):
    """A foreign user can neither register, confirm, nor upload a fallback original."""
    entry_id = _entry(client, user, dog)
    owned = _register(client, user, "diary", entry_id)
    # Make sure the owner's medium really exists; otherwise the 404 checks
    # below would say nothing about ownership.
    assert owned.status_code == 201, owned.text
    media_id = owned.json()["media_id"]

    fremd = _second_user(client)

    # Registering on somebody else's diary entry.
    assert _register(client, fremd, "diary", entry_id).status_code == 404

    # Confirming somebody else's medium.
    confirm = client.patch(
        f"/api/media/{media_id}",
        headers=fremd["headers"],
        json={},
    )
    assert confirm.status_code == 404

    # Uploading a fallback original for somebody else's medium.
    fallback = client.post(
        f"/api/media/{media_id}/original",
        headers=fremd["headers"],
        files={"file": ("x.jpg", _jpeg(100), "image/jpeg")},
    )
    assert fallback.status_code == 404
# ------------------------------------------------------------------
# Loeschen raeumt Registry + Dateien
# ------------------------------------------------------------------
def test_delete_media_item_cleans_registry_and_preview(client, user, dog):
    """Deleting a diary media item removes its preview file and its /mine row."""
    entry_id = _entry(client, user, dog)
    registered = _register(client, user, "diary", entry_id)
    # Check the setup call so a failure shows the server's message, not a KeyError.
    assert registered.status_code == 201, registered.text
    media = registered.json()

    preview_path = _disk_path(media["preview_url"])
    assert os.path.exists(preview_path)

    # The diary route is addressed with the response's ``id`` field, while the
    # /mine listing below is matched against ``media_id``.
    deleted = client.delete(
        f"/api/dogs/{dog['id']}/diary/{entry_id}/media/{media['id']}",
        headers=user["headers"],
    )
    assert deleted.status_code == 204
    assert not os.path.exists(preview_path)

    mine = client.get("/api/media/mine", headers=user["headers"]).json()
    assert not any(m["id"] == media["media_id"] for m in mine)
def test_entry_delete_cleans_registry(client, user, dog):
    """Deleting the whole diary entry removes its medium from /mine."""
    entry_id = _entry(client, user, dog)
    registered = _register(client, user, "diary", entry_id)
    # Without a successful registration the final check would pass vacuously
    # (or die on a KeyError), so verify the setup explicitly.
    assert registered.status_code == 201, registered.text
    media_id = registered.json()["media_id"]

    deleted = client.delete(
        f"/api/dogs/{dog['id']}/diary/{entry_id}",
        headers=user["headers"],
    )
    assert deleted.status_code == 204

    mine = client.get("/api/media/mine", headers=user["headers"]).json()
    assert not any(m["id"] == media_id for m in mine)
# ------------------------------------------------------------------
# route: Preview wird die Datei selbst
# ------------------------------------------------------------------
def _route(client, user) -> int:
    """Create a two-point test route named "Testrunde" and return its id.

    Fails the calling test (showing the response text) unless the server
    answers 201.
    """
    track = [{"lat": 48.10, "lon": 11.50}, {"lat": 48.11, "lon": 11.51}]
    payload = {
        "name": "Testrunde",
        "gps_track": track,
        "distanz_km": 1.2,
        "dauer_min": 20,
    }
    response = client.post("/api/routes", headers=user["headers"], json=payload)
    assert response.status_code == 201, response.text
    created = response.json()
    return created["id"]
def test_register_route_stores_preview_as_file(client, user, dog):
    """For context ``route`` the uploaded preview is stored as the file itself."""
    route_id = _route(client, user)

    response = _register(client, user, "route", route_id)
    assert response.status_code == 201, response.text
    body = response.json()

    foto_url = body["foto_url"]
    assert foto_url.startswith("/media/routes/")
    assert body["foto_urls"] == [foto_url]
    assert os.path.exists(_disk_path(foto_url))  # file exists (= the preview)

    served = client.get(foto_url)  # routes/ does not require auth
    assert served.status_code == 200

    # Deleting the route removes the registry row (the app sync then deletes
    # the CKRecord).
    deleted = client.delete(f"/api/routes/{route_id}", headers=user["headers"])
    assert deleted.status_code == 204

    mine = client.get("/api/media/mine", headers=user["headers"]).json()
    assert not any(m["id"] == body["media_id"] for m in mine)
def test_register_validates_context_and_kind(client, user, dog):
    """An unknown context and an unknown kind are each rejected with 422."""
    entry_id = _entry(client, user, dog)

    def post_register(form):
        # Same endpoint, headers and small preview upload for both cases;
        # only the form fields differ.
        return client.post(
            "/api/media/register",
            headers=user["headers"],
            data=form,
            files={"preview": ("p.jpg", _jpeg(100), "image/jpeg")},
        )

    bad_context = post_register(
        {"context": "forum", "context_id": "1", "kind": "image"}
    )
    assert bad_context.status_code == 422

    bad_kind = post_register(
        {"context": "diary", "context_id": str(entry_id), "kind": "egal"}
    )
    assert bad_kind.status_code == 422