Compare commits

...

3 commits

8 changed files with 568 additions and 4 deletions

View file

@ -296,7 +296,9 @@ dev:
KI_MODE=off ENV=development \
JWT_SECRET=dev-secret \
DB_PATH=./dev.db \
uvicorn main:app --reload --port 8001
MEDIA_DIR=$${MEDIA_DIR:-/tmp/banyaro-media} \
BREEDER_DOCS_DIR=$${BREEDER_DOCS_DIR:-/tmp/banyaro-breeder} \
uvicorn main:app --reload --port 8001 --host $${HOST:-127.0.0.1}
# ----------------------------------------------------------
# REPORTS — Quartalsberichte generieren und committen

View file

@ -2673,6 +2673,44 @@ def _migrate(conn_factory):
""")
logger.info("Migration: failed_emails Tabelle bereit.")
# iOS-Voll-App M0: Media-Registry — Originale liegen in der privaten
# CloudKit-DB des Nutzers, der Server hält nur Previews. Die Registry ist
# die Quelle der Wahrheit über Existenz + Speicherort (storage: server|icloud).
# ON DELETE CASCADE → Self-Delete (FK-Introspektion in profile.py) und
# Admin-Delete (finaler users-DELETE) räumen automatisch mit ab.
conn.executescript("""
CREATE TABLE IF NOT EXISTS media_registry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
url TEXT NOT NULL UNIQUE,
storage TEXT NOT NULL DEFAULT 'icloud',
ck_record_name TEXT,
ck_state TEXT NOT NULL DEFAULT 'pending',
kind TEXT NOT NULL DEFAULT 'image',
context TEXT,
context_id INTEGER,
bytes_original INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_media_registry_user ON media_registry(user_id);
CREATE INDEX IF NOT EXISTS idx_media_registry_ctx ON media_registry(context, context_id);
""")
logger.info("Migration: media_registry Tabelle bereit (iCloud-Hybrid).")
# daily_photo_cache wurde bisher NUR lazy in routes/dogs.py (Tagesfoto)
# angelegt — delete_diary/delete_media_item referenzieren sie aber auch.
# Auf einer frischen DB ohne Tagesfoto-Abruf warf Löschen daher 500.
# Struktur identisch zum Lazy-CREATE in dogs.py:241.
conn.executescript("""
CREATE TABLE IF NOT EXISTS daily_photo_cache (
dog_id INTEGER NOT NULL,
datum TEXT NOT NULL,
photo_url TEXT NOT NULL,
PRIMARY KEY (dog_id, datum)
);
""")
logger.info("Migration: daily_photo_cache Tabelle bereit.")
# Second-Pass der ALTER-TABLE-Migrations:
# Manche Migrations (z.B. diary_media.img_width) referenzieren Tabellen,
# die erst weiter unten in dieser Funktion per CREATE IF NOT EXISTS angelegt

View file

@ -237,6 +237,7 @@ from routes.wiki import router as wiki_router
from routes.movies import router as movies_router
from routes.friends import router as friends_router
from routes.chat import router as chat_router
from routes.media import router as media_router
from routes.admin import router as admin_router
from routes.webcal import router as webcal_router
from routes.profile import router as profile_router
@ -306,6 +307,7 @@ app.include_router(wiki_router, prefix="/api/wiki", tags=["Wiki"])
app.include_router(movies_router, prefix="/api/movies", tags=["Filme"])
app.include_router(friends_router, prefix="/api/friends", tags=["Freunde"])
app.include_router(chat_router, prefix="/api/chat", tags=["Chat"])
app.include_router(media_router, prefix="/api/media", tags=["Medien"])
app.include_router(admin_router, prefix="/api/admin", tags=["Admin"])
app.include_router(breeder_router, prefix="/api", tags=["Züchter"])
app.include_router(litters_router, prefix="/api", tags=["Würfe"])
@ -583,6 +585,22 @@ async def serve_media(path: str, request: _Request):
filepath = _resolve_media_path(path)
if not filepath:
# iCloud-Hybrid (M0): Original liegt in der privaten iCloud des Besitzers,
# auf dem Server existiert nur das Preview. Marker-404, damit Clients
# unterscheiden können — Web fällt via onerror aufs Preview zurück.
# Lookup nur im Miss-Pfad → keine Kosten für normale Medien-Requests.
from database import db as _db
from fastapi.responses import JSONResponse as _JSONResponse
with _db() as conn:
reg = conn.execute(
"SELECT storage FROM media_registry WHERE url=?", ("/media/" + path,)
).fetchone()
if reg and reg["storage"] == "icloud":
return _JSONResponse(
{"detail": "Original liegt in der iCloud des Besitzers.",
"storage": "icloud"},
status_code=404,
)
raise _HE(404, "Nicht gefunden.")
return _media_response(filepath)

View file

@ -122,9 +122,14 @@ def _fetch_media_items(conn, entry_ids: list[int]) -> dict:
if not entry_ids:
return {}
ph = ",".join("?" * len(entry_ids))
# LEFT JOIN media_registry: iCloud-Hybrid-Medien tragen storage='icloud' +
# ck_record_name — die iOS-App holt das Original dann aus CloudKit statt
# von der (Phantom-)URL. Server-Medien: storage NULL/'server'.
rows = conn.execute(
f"SELECT id, diary_id, url, media_type, sort_order, is_cover FROM diary_media "
f"WHERE diary_id IN ({ph}) ORDER BY diary_id, sort_order",
f"SELECT dm.id, dm.diary_id, dm.url, dm.media_type, dm.sort_order, dm.is_cover, "
f" mr.storage AS storage, mr.ck_record_name AS ck_record_name "
f"FROM diary_media dm LEFT JOIN media_registry mr ON mr.url = dm.url "
f"WHERE dm.diary_id IN ({ph}) ORDER BY dm.diary_id, dm.sort_order",
entry_ids
).fetchall()
result = {}
@ -135,6 +140,8 @@ def _fetch_media_items(conn, entry_ids: list[int]) -> dict:
"preview_url": preview_url_from(url),
"media_type": r["media_type"], "sort_order": r["sort_order"],
"is_cover": r["is_cover"],
"storage": r["storage"] or "server",
"ck_record_name": r["ck_record_name"],
})
return result
@ -634,6 +641,9 @@ async def delete_diary(dog_id: int, entry_id: int, user=Depends(get_current_user
).fetchall()]
for u in media_urls:
conn.execute("DELETE FROM daily_photo_cache WHERE photo_url=?", (u,))
# iCloud-Hybrid: Registry-Row mitlöschen — der nächste App-Sync
# (GET /api/media/mine) räumt dann den verwaisten CKRecord ab.
conn.execute("DELETE FROM media_registry WHERE url=?", (u,))
conn.execute(
"DELETE FROM diary WHERE id=? AND dog_id=?", (entry_id, dog_id)
)
@ -792,9 +802,16 @@ async def delete_media_item(dog_id: int, entry_id: int, media_id: int,
if file_path:
try: os.remove(file_path)
except OSError: pass
# Preview/Thumb mit-entfernen — bei iCloud-Medien die einzige
# Server-Datei, bei Server-Medien lagen sie bisher als Leichen herum.
base = os.path.splitext(file_path)[0]
for leftover in (base + "_preview.webp", base + "_thumb.jpg"):
try: os.remove(leftover)
except OSError: pass
# daily_photo_cache mit-bereinigen falls das Bild als Tagesfoto
# gewählt war (sonst lädt der Client 404).
conn.execute("DELETE FROM daily_photo_cache WHERE photo_url=?", (row["url"],))
conn.execute("DELETE FROM media_registry WHERE url=?", (row["url"],))
conn.execute("DELETE FROM diary_media WHERE id=?", (media_id,))

243
backend/routes/media.py Normal file
View file

@ -0,0 +1,243 @@
"""
Media-Registry iOS-Voll-App M0 (iCloud-Hybrid, Plan: banyaro-ios/PLAN_VOLLAPP.md).
Originale liegen in der privaten CloudKit-DB des Nutzers (die App lädt sie dorthin),
der Server speichert nur Previews. Die Registry ist die Quelle der Wahrheit über
Existenz und Speicherort jedes Mediums; bei storage='icloud' sieht der Server die
Original-Bytes nie (Apple erlaubt Server-to-Server-Zugriff nur auf Public-DBs).
Zwei Kontext-Modi:
- diary: Phantom-Original-URL (Datei existiert nicht) + echtes _preview.webp bzw.
_thumb.jpg daneben. Original-Request 404 + {"storage":"icloud"}
(serve_media in main.py), Web fällt via onerror aufs Preview zurück.
- route: Das 800px-Preview wird ALS Datei unter der foto_url gespeichert
Routen-Fotos haben keine Preview-Konvention, Web bleibt unverändert.
Geteilte Inhalte (GassiTreffen-Fotos, Forum, Challenges) bleiben bewusst komplett
auf dem Server fremde Nutzer können nicht in eine private iCloud schauen.
Quota-Fallback: Ist die iCloud des Nutzers voll (CKError.quotaExceeded), lädt die
App das Original klassisch über POST /api/media/{id}/original nach storage='server'.
"""
import asyncio
import json
import os
import uuid
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from pydantic import BaseModel
from auth import get_current_user
from database import db
from media_utils import (
convert_media, generate_preview, preview_url_from, safe_media_path, validate_upload,
)
router = APIRouter()
MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media")
_PREVIEW_MAX_BYTES = 2 * 1024 * 1024 # Previews/Poster sind ≤800px-JPEGs
_CONTEXTS = {"diary", "route"}
_SUBDIR = {"diary": "diary", "route": "routes"}
def _own_diary_entry(conn, entry_id: int, user_id: int):
"""Eintrag gehört dem User (direkt über dog_id oder via diary_dogs)."""
return conn.execute(
"""SELECT d.id FROM diary d
WHERE d.id=? AND (
d.dog_id IN (SELECT id FROM dogs WHERE user_id=?)
OR EXISTS (SELECT 1 FROM diary_dogs dd
JOIN dogs g ON g.id = dd.dog_id
WHERE dd.diary_id = d.id AND g.user_id=?))""",
(entry_id, user_id, user_id)
).fetchone()
def _own_registry_row(conn, media_id: int, user_id: int):
row = conn.execute(
"SELECT * FROM media_registry WHERE id=? AND user_id=?", (media_id, user_id)
).fetchone()
if not row:
raise HTTPException(404, "Medium nicht gefunden.")
return row
def _write_bytes(path: str, data: bytes) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
f.write(data)
@router.post("/register", status_code=201)
async def register_media(
context: str = Form(...),
context_id: int = Form(...),
kind: str = Form("image"),
img_width: int | None = Form(None),
img_height: int | None = Form(None),
gps_lat: float | None = Form(None),
gps_lon: float | None = Form(None),
bytes_original: int | None = Form(None),
preview: UploadFile = File(...),
user=Depends(get_current_user),
):
"""Registriert ein iCloud-Medium: Metadaten + Preview zum Server, Original → CloudKit."""
if context not in _CONTEXTS:
raise HTTPException(422, "Unbekannter Kontext.")
if kind not in ("image", "video"):
raise HTTPException(422, "kind muss image oder video sein.")
if context == "route" and kind != "image":
raise HTTPException(422, "Routen-Fotos sind Bilder.")
raw = await preview.read()
if len(raw) > _PREVIEW_MAX_BYTES:
raise HTTPException(413, "Preview zu groß (max 2 MB).")
try:
validate_upload(raw, preview.filename or "preview.jpg")
except ValueError as e:
raise HTTPException(415, str(e))
prev_ext = os.path.splitext(preview.filename or "")[1].lower() or ".jpg"
with db() as conn:
if context == "diary":
if not _own_diary_entry(conn, context_id, user["id"]):
raise HTTPException(404, "Eintrag nicht gefunden.")
else:
row = conn.execute(
"SELECT user_id FROM routes WHERE id=?", (context_id,)
).fetchone()
if not row:
raise HTTPException(404, "Route nicht gefunden.")
if row["user_id"] != user["id"]:
raise HTTPException(403, "Nicht berechtigt.")
orig_ext = ".jpg" if kind == "image" else ".mp4"
filename = f"{context}_{context_id}_{uuid.uuid4().hex[:8]}{orig_ext}"
subdir = _SUBDIR[context]
url = f"/media/{subdir}/{filename}"
disk = os.path.join(MEDIA_DIR, subdir, filename)
base = os.path.splitext(disk)[0]
loop = asyncio.get_event_loop()
if context == "diary":
if kind == "image":
webp = await loop.run_in_executor(None, lambda: generate_preview(raw, prev_ext))
if not webp:
raise HTTPException(415, "Preview konnte nicht verarbeitet werden.")
await loop.run_in_executor(None, lambda: _write_bytes(base + "_preview.webp", webp))
else:
# Poster-Frame des Videos — gleiche Konvention wie extract_video_thumb()
await loop.run_in_executor(None, lambda: _write_bytes(base + "_thumb.jpg", raw))
else:
# route: das Preview wird die Datei selbst (Web kennt dort keine Previews)
await loop.run_in_executor(None, lambda: _write_bytes(disk, raw))
with db() as conn:
cur = conn.execute(
"INSERT INTO media_registry (user_id, url, storage, ck_state, kind, context, context_id, bytes_original) "
"VALUES (?,?,?,?,?,?,?,?)",
(user["id"], url, "icloud", "pending", kind, context, context_id, bytes_original)
)
rid = cur.lastrowid
ck_name = f"media-{rid}"
conn.execute("UPDATE media_registry SET ck_record_name=? WHERE id=?", (ck_name, rid))
if context == "diary":
max_order = conn.execute(
"SELECT COALESCE(MAX(sort_order), -1) FROM diary_media WHERE diary_id=?",
(context_id,)
).fetchone()[0]
# Erstes Item eines Eintrags wird automatisch Cover (wie diary.upload_media)
is_cover = 1 if max_order == -1 else 0
cur2 = conn.execute(
"INSERT INTO diary_media (diary_id, url, media_type, sort_order, is_cover, img_width, img_height) "
"VALUES (?,?,?,?,?,?,?)",
(context_id, url, kind, max_order + 1, is_cover, img_width, img_height)
)
dm_id = cur2.lastrowid
# GPS vom Gerät — wie der EXIF-Pfad in diary.upload_media, aber ohne
# Wetter/POI-Nachladen (iOS-Einträge bringen i.d.R. Track-GPS mit).
if gps_lat is not None and gps_lon is not None:
existing = conn.execute(
"SELECT gps_lat FROM diary WHERE id=?", (context_id,)
).fetchone()
if existing and existing["gps_lat"] is None:
conn.execute(
"UPDATE diary SET gps_lat=?, gps_lon=? WHERE id=?",
(gps_lat, gps_lon, context_id)
)
return {"id": dm_id, "url": url, "preview_url": preview_url_from(url),
"media_type": kind, "sort_order": max_order + 1,
"is_cover": is_cover, "media_id": rid, "ck_record_name": ck_name}
# route: foto_urls-Append wie routen.add_route_photo
row = conn.execute("SELECT foto_urls FROM routes WHERE id=?", (context_id,)).fetchone()
urls = json.loads(dict(row)["foto_urls"] or "[]")
urls.append(url)
conn.execute("UPDATE routes SET foto_urls=? WHERE id=?", (json.dumps(urls), context_id))
return {"foto_url": url, "foto_urls": urls, "media_id": rid, "ck_record_name": ck_name}
@router.get("/mine")
async def my_media(user=Depends(get_current_user)):
"""Sync-Grundlage der App: sie gleicht ihre CloudKit-Zone gegen diese Liste ab
und löscht verwaiste CKRecords (Web-Deletes kennt CloudKit sonst nicht)."""
with db() as conn:
rows = conn.execute(
"SELECT id, url, storage, ck_record_name, ck_state, kind, context, context_id, created_at "
"FROM media_registry WHERE user_id=? ORDER BY id", (user["id"],)
).fetchall()
return [dict(r) for r in rows]
class ConfirmBody(BaseModel):
ck_record_name: str | None = None
@router.patch("/{media_id}")
async def confirm_media(media_id: int, body: ConfirmBody, user=Depends(get_current_user)):
"""Bestätigt den erfolgreichen CloudKit-Upload des Originals."""
with db() as conn:
row = _own_registry_row(conn, media_id, user["id"])
ck_name = body.ck_record_name or row["ck_record_name"]
conn.execute(
"UPDATE media_registry SET ck_state='confirmed', ck_record_name=? WHERE id=?",
(ck_name, media_id)
)
return {"id": media_id, "storage": "icloud", "ck_state": "confirmed",
"ck_record_name": ck_name}
@router.post("/{media_id}/original", status_code=201)
async def upload_original(media_id: int, file: UploadFile = File(...),
user=Depends(get_current_user)):
"""Quota-Fallback: iCloud des Nutzers voll → Original klassisch zum Server.
Schreibt die Datei an die registrierte URL (diary: füllt die Phantom-URL,
route: ersetzt das Preview durch das Original)."""
with db() as conn:
row = _own_registry_row(conn, media_id, user["id"])
raw = await file.read()
try:
validate_upload(raw, file.filename or "")
except ValueError as e:
raise HTTPException(415, str(e))
loop = asyncio.get_event_loop()
raw, _ext = await loop.run_in_executor(
None, lambda: convert_media(raw, file.filename or "")
)
disk = safe_media_path(MEDIA_DIR, row["url"])
if not disk:
raise HTTPException(400, "Ungültiger Medienpfad.")
await loop.run_in_executor(None, lambda: _write_bytes(disk, raw))
with db() as conn:
conn.execute(
"UPDATE media_registry SET storage='server', ck_state='none' WHERE id=?",
(media_id,)
)
return {"id": media_id, "storage": "server", "url": row["url"]}

View file

@ -480,6 +480,12 @@ async def delete_route(route_id: int, user=Depends(get_current_user)):
raise HTTPException(404, "Route nicht gefunden.")
if row['user_id'] != user['id']:
raise HTTPException(403, "Nicht berechtigt.")
# iCloud-Hybrid: Registry-Rows der Routen-Fotos mitlöschen — der nächste
# App-Sync (GET /api/media/mine) räumt dann die verwaisten CKRecords ab.
conn.execute(
"DELETE FROM media_registry WHERE context='route' AND context_id=?",
(route_id,)
)
conn.execute("DELETE FROM routes WHERE id = ?", (route_id,))

View file

@ -312,6 +312,19 @@
schreibt die App abgeschlossene Touren als „Walking"-Workout inkl. Route in Apple
Health. Es werden <strong>keine</strong> Gesundheitsdaten aus Apple Health gelesen.
Diese Daten verbleiben auf deinem Gerät bzw. in deiner iCloud.</li>
<li><strong>iCloud-Fotospeicher (CloudKit):</strong> Fotos, die du in der App zu
Tagebuch oder Touren hinzufügst, speichert die App in voller Auflösung in
<strong>deiner privaten iCloud</strong> (Apple CloudKit); sie zählen zu deinem
iCloud-Speicherplatz. Unser Server erhält und speichert nur eine verkleinerte
Vorschau (max. 800&nbsp;px), damit deine Inhalte auch im Web sichtbar sind — auf
die Originale in deiner iCloud können wir <strong>nicht</strong> zugreifen.
Ist kein iCloud-Konto angemeldet oder dein iCloud-Speicher voll, wird stattdessen
wie bisher eine komprimierte Fassung auf unserem Server gespeichert. Löschst du
ein Medium in der App, wird auch der iCloud-Eintrag entfernt; löschst du es im
Web, räumt die App den iCloud-Eintrag bei der nächsten Synchronisierung auf.
Nach einer Account-Löschung entfernen wir Vorschauen und Verweise auf unserem
Server — die Originale in deiner iCloud bleiben unter deiner alleinigen Kontrolle
(Einstellungen&nbsp;→ iCloud&nbsp;→ Speicher verwalten&nbsp;→ Ban&nbsp;Yaro).</li>
<li><strong>Apple Maps:</strong> Zur Navigation (z. B. zu Gassi-Treffen) kann auf deinen
Wunsch Apple Maps geöffnet werden.</li>
<li><strong>GPX-Import:</strong> Aus anderen Apps geteilte GPX-Dateien werden lokal auf
@ -495,7 +508,7 @@
</section>
<p style="font-size:var(--text-xs);color:var(--c-text-muted);margin:0">
Stand: Juni 2026 · Version 4
Stand: Juni 2026 · Version 5
</p>
</div>

View file

@ -0,0 +1,227 @@
"""Tests fuer die Media-Registry (iCloud-Hybrid, iOS-Voll-App M0).
Architektur: Originale liegen in der privaten CloudKit-DB des Nutzers, der Server
haelt nur Previews. diary = Phantom-URL + _preview.webp + Marker-404;
route = Preview wird als Datei selbst gespeichert. Siehe backend/routes/media.py.
"""
import io
import os
import secrets
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _jpeg(px: int = 900) -> bytes:
from PIL import Image
buf = io.BytesIO()
Image.new("RGB", (px, px), (200, 120, 40)).save(buf, format="JPEG", quality=85)
return buf.getvalue()
def _entry(client, user, dog, titel="Medien-Eintrag") -> int:
r = client.post(
f"/api/dogs/{dog['id']}/diary",
headers=user["headers"],
json={"titel": titel, "text": "."},
)
assert r.status_code == 201, r.text
return r.json()["id"]
def _register(client, user, context, context_id, **extra):
data = {"context": context, "context_id": str(context_id), "kind": "image",
"img_width": "4032", "img_height": "3024", **extra}
return client.post(
"/api/media/register",
headers=user["headers"],
data=data,
files={"preview": ("preview.jpg", _jpeg(), "image/jpeg")},
)
def _media_dir() -> str:
return os.environ["MEDIA_DIR"]
def _disk_path(url: str) -> str:
return os.path.join(_media_dir(), url.removeprefix("/media/"))
# ------------------------------------------------------------------
# diary: Phantom-URL + Preview
# ------------------------------------------------------------------
def test_register_diary_creates_preview_only(client, user, dog):
"""register legt _preview.webp an, aber KEIN Original (das geht nach iCloud)."""
eid = _entry(client, user, dog)
r = _register(client, user, "diary", eid)
assert r.status_code == 201, r.text
j = r.json()
assert j["url"].startswith("/media/diary/")
assert j["preview_url"] == os.path.splitext(j["url"])[0] + "_preview.webp"
assert j["ck_record_name"] == f"media-{j['media_id']}"
assert j["is_cover"] == 1 # erstes Medium wird Cover
assert not os.path.exists(_disk_path(j["url"])) # Phantom
assert os.path.exists(_disk_path(j["preview_url"])) # echtes Preview
# Eintrag listet das Medium mit Preview-URL (Web-Anzeige unveraendert)
r2 = client.get(f"/api/dogs/{dog['id']}/diary/{eid}", headers=user["headers"])
items = r2.json()["media_items"]
assert len(items) == 1 and items[0]["url"] == j["url"]
def test_original_404_carries_icloud_marker(client, user, dog):
"""GET auf die Phantom-URL -> 404 mit storage:'icloud'; Preview liefert 200."""
eid = _entry(client, user, dog)
j = _register(client, user, "diary", eid).json()
cookies = {"by_token": user["token"]} # /media/diary/ verlangt Login-Cookie
r = client.get(j["url"], cookies=cookies)
assert r.status_code == 404
assert r.json().get("storage") == "icloud"
r2 = client.get(j["preview_url"], cookies=cookies)
assert r2.status_code == 200
assert r2.headers["content-type"].startswith("image/webp")
def test_confirm_and_mine(client, user, dog):
"""PATCH bestaetigt den CloudKit-Upload; /mine ist die Sync-Wahrheit der App."""
eid = _entry(client, user, dog)
j = _register(client, user, "diary", eid).json()
r = client.patch(f"/api/media/{j['media_id']}", headers=user["headers"], json={})
assert r.status_code == 200
assert r.json()["ck_state"] == "confirmed"
mine = client.get("/api/media/mine", headers=user["headers"]).json()
assert [m for m in mine if m["id"] == j["media_id"]][0]["ck_state"] == "confirmed"
def test_quota_fallback_uploads_original(client, user, dog):
"""iCloud voll -> Original klassisch zum Server, danach liefert die URL 200."""
eid = _entry(client, user, dog)
j = _register(client, user, "diary", eid).json()
r = client.post(
f"/api/media/{j['media_id']}/original",
headers=user["headers"],
files={"file": ("original.jpg", _jpeg(2000), "image/jpeg")},
)
assert r.status_code == 201, r.text
assert r.json()["storage"] == "server"
r2 = client.get(j["url"], cookies={"by_token": user["token"]})
assert r2.status_code == 200
mine = client.get("/api/media/mine", headers=user["headers"]).json()
assert [m for m in mine if m["id"] == j["media_id"]][0]["storage"] == "server"
# ------------------------------------------------------------------
# Ownership
# ------------------------------------------------------------------
def _second_user(client) -> dict:
email = f"fremd-{secrets.token_hex(4)}@example.com"
pw = "TestPass123!"
r = client.post("/api/auth/register",
json={"email": email, "password": pw,
"name": f"fremd{secrets.token_hex(3)}"})
assert r.status_code == 200, r.text
from database import db
with db() as conn:
conn.execute("UPDATE users SET email_verified=1 WHERE email=?", (email,))
token = client.post("/api/auth/login",
json={"email": email, "password": pw}).json()["token"]
return {"token": token, "headers": {"Authorization": f"Bearer {token}"}}
def test_foreign_user_blocked(client, user, dog):
"""Fremder User kann weder registrieren noch bestaetigen noch fallbacken."""
eid = _entry(client, user, dog)
j = _register(client, user, "diary", eid).json()
fremd = _second_user(client)
assert _register(client, fremd, "diary", eid).status_code == 404
assert client.patch(f"/api/media/{j['media_id']}",
headers=fremd["headers"], json={}).status_code == 404
r = client.post(f"/api/media/{j['media_id']}/original",
headers=fremd["headers"],
files={"file": ("x.jpg", _jpeg(100), "image/jpeg")})
assert r.status_code == 404
# ------------------------------------------------------------------
# Loeschen raeumt Registry + Dateien
# ------------------------------------------------------------------
def test_delete_media_item_cleans_registry_and_preview(client, user, dog):
eid = _entry(client, user, dog)
j = _register(client, user, "diary", eid).json()
prev_path = _disk_path(j["preview_url"])
assert os.path.exists(prev_path)
r = client.delete(f"/api/dogs/{dog['id']}/diary/{eid}/media/{j['id']}",
headers=user["headers"])
assert r.status_code == 204
assert not os.path.exists(prev_path)
mine = client.get("/api/media/mine", headers=user["headers"]).json()
assert not [m for m in mine if m["id"] == j["media_id"]]
def test_entry_delete_cleans_registry(client, user, dog):
eid = _entry(client, user, dog)
j = _register(client, user, "diary", eid).json()
r = client.delete(f"/api/dogs/{dog['id']}/diary/{eid}", headers=user["headers"])
assert r.status_code == 204
mine = client.get("/api/media/mine", headers=user["headers"]).json()
assert not [m for m in mine if m["id"] == j["media_id"]]
# ------------------------------------------------------------------
# route: Preview wird die Datei selbst
# ------------------------------------------------------------------
def _route(client, user) -> int:
r = client.post("/api/routes", headers=user["headers"], json={
"name": "Testrunde",
"gps_track": [{"lat": 48.10, "lon": 11.50}, {"lat": 48.11, "lon": 11.51}],
"distanz_km": 1.2, "dauer_min": 20,
})
assert r.status_code == 201, r.text
return r.json()["id"]
def test_register_route_stores_preview_as_file(client, user, dog):
rid = _route(client, user)
r = _register(client, user, "route", rid)
assert r.status_code == 201, r.text
j = r.json()
assert j["foto_url"].startswith("/media/routes/")
assert j["foto_urls"] == [j["foto_url"]]
assert os.path.exists(_disk_path(j["foto_url"])) # Datei existiert (= Preview)
r2 = client.get(j["foto_url"]) # routes/ ist nicht auth-pflichtig
assert r2.status_code == 200
# Route-Delete raeumt die Registry-Row ab (App-Sync loescht dann den CKRecord)
assert client.delete(f"/api/routes/{rid}",
headers=user["headers"]).status_code == 204
mine = client.get("/api/media/mine", headers=user["headers"]).json()
assert not [m for m in mine if m["id"] == j["media_id"]]
def test_register_validates_context_and_kind(client, user, dog):
eid = _entry(client, user, dog)
r = client.post("/api/media/register", headers=user["headers"],
data={"context": "forum", "context_id": "1", "kind": "image"},
files={"preview": ("p.jpg", _jpeg(100), "image/jpeg")})
assert r.status_code == 422
r2 = client.post("/api/media/register", headers=user["headers"],
data={"context": "diary", "context_id": str(eid), "kind": "egal"},
files={"preview": ("p.jpg", _jpeg(100), "image/jpeg")})
assert r2.status_code == 422