Wiederverwendbarer UI.noteMediaAttacher für beide Notiz-Stellen (UI.noteModal
+ Notizblock-Seite). note_media-Tabelle + POST/DELETE /api/notes/{id}/media
(vor der gierigen /{parent_type}/{parent_id}-Route). Audio per MediaRecorder,
serverseitig nach m4a/AAC transkodiert (ffmpeg) — iOS spielt Chrome-Opus-webm
nicht ab. UI.lightbox global eingeführt. Mikrofon-Policy microphone=(self) +
CSP media-src 'self' blob:, Datenschutz v6. Disk-Cleanup für note_media bei
Notiz-, Account- und Admin-User-Delete. Reine Medien-Notiz ohne Text erlaubt.
noteModal-Bug gefixt: notes.get() liefert Array -> existing[0] statt
existing?.id (verhinderte Bearbeiten, erzeugte Duplikate). 12 neue Tests.
admin.py enthält außerdem KI-Vision-Statusfelder aus paralleler Arbeit
(nicht sauber trennbar ohne interaktives Staging).
320 lines
12 KiB
Python
320 lines
12 KiB
Python
import io
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from typing import Tuple
|
|
|
|
_HEIC_EXTS = {".heic", ".heif"}
|
|
_VIDEO_EXTS = {".mov", ".avi", ".m4v"}
|
|
# Audio-Endungen, die bereits AAC in einem MP4-Container sind (iOS-Recorder
|
|
# liefert audio/mp4) — keine Transkodierung nötig.
|
|
_AUDIO_AAC_EXTS = {".m4a", ".aac", ".mp4"}
|
|
|
|
# Magic-Byte-Signaturen erlaubter Medientypen
|
|
_IMAGE_MAGIC = [
|
|
b'\xff\xd8\xff', # JPEG
|
|
b'\x89PNG\r\n', # PNG
|
|
b'GIF87a', # GIF87
|
|
b'GIF89a', # GIF89
|
|
]
|
|
_VIDEO_MAGIC = [
|
|
b'\x1a\x45\xdf\xa3', # WebM / MKV
|
|
]
|
|
|
|
def validate_upload(data: bytes, filename: str) -> None:
|
|
"""
|
|
Prüft Magic Bytes des Upload-Inhalts gegen die erwartete Dateitype.
|
|
Wirft ValueError bei Mismatch.
|
|
Bilder (JPEG/PNG/GIF) und Videos (MP4/WebM/MOV) werden geprüft.
|
|
HEIC/HEIF und reine Text-/JSON-Dateien werden übersprungen (Pillow/FFmpeg prüfen selbst).
|
|
"""
|
|
ext = os.path.splitext(filename or "")[1].lower()
|
|
if not data:
|
|
raise ValueError("Leere Datei.")
|
|
|
|
if ext in (".jpg", ".jpeg"):
|
|
if not data[:3] == b'\xff\xd8\xff':
|
|
raise ValueError("Datei ist kein gültiges JPEG.")
|
|
elif ext == ".png":
|
|
if not data[:6] == b'\x89PNG\r\n':
|
|
raise ValueError("Datei ist kein gültiges PNG.")
|
|
elif ext in (".gif",):
|
|
if not (data[:6] in (b'GIF87a', b'GIF89a')):
|
|
raise ValueError("Datei ist kein gültiges GIF.")
|
|
elif ext in (".webp",):
|
|
if not (data[:4] == b'RIFF' and data[8:12] == b'WEBP'):
|
|
raise ValueError("Datei ist kein gültiges WebP.")
|
|
elif ext in (".mp4",):
|
|
# MP4: 4-Byte-Größe gefolgt von 'ftyp' oder 'mdat'
|
|
if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')):
|
|
raise ValueError("Datei ist kein gültiges MP4.")
|
|
elif ext in (".webm",):
|
|
if not data[:4] == b'\x1a\x45\xdf\xa3':
|
|
raise ValueError("Datei ist kein gültiges WebM.")
|
|
# HEIC, MOV, AVI, M4V: Pillow/FFmpeg prüfen beim Konvertieren
|
|
|
|
|
|
def validate_audio(data: bytes, content_type: str) -> None:
|
|
"""
|
|
Prüft Magic Bytes einer Audio-Datei gegen den vom Client gemeldeten content_type.
|
|
Wirft ValueError bei Mismatch. WICHTIG: Audio-WebM und Video-WebM teilen sich
|
|
dieselbe Magic-Byte-Signatur (Matroska) — die Unterscheidung ist NUR über den
|
|
content_type möglich, deshalb diese eigene Funktion (validate_upload hat keinen).
|
|
"""
|
|
if not data:
|
|
raise ValueError("Leere Audiodatei.")
|
|
ct = (content_type or "").lower().split(";")[0].strip()
|
|
if ct in ("audio/mp4", "audio/aac", "audio/x-m4a", "audio/m4a"):
|
|
if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')):
|
|
raise ValueError("Datei ist kein gültiges MP4/AAC-Audio.")
|
|
elif ct == "audio/webm":
|
|
if not data[:4] == b'\x1a\x45\xdf\xa3':
|
|
raise ValueError("Datei ist kein gültiges WebM-Audio.")
|
|
elif ct in ("audio/ogg", "audio/opus"):
|
|
if not data[:4] == b'OggS':
|
|
raise ValueError("Datei ist kein gültiges Ogg-Audio.")
|
|
elif ct == "audio/mpeg":
|
|
if not (data[:3] == b'ID3' or (len(data) >= 2 and data[0] == 0xFF and (data[1] & 0xE0) == 0xE0)):
|
|
raise ValueError("Datei ist kein gültiges MP3.")
|
|
elif ct in ("audio/wav", "audio/x-wav", "audio/wave"):
|
|
if not (data[:4] == b'RIFF' and data[8:12] == b'WAVE'):
|
|
raise ValueError("Datei ist kein gültiges WAV.")
|
|
# Andere/unbekannte Audio-Typen: ffmpeg prüft beim Transkodieren.
|
|
|
|
|
|
def safe_media_path(media_dir: str, url: str) -> str | None:
|
|
"""
|
|
Konstruiert einen sicheren Dateipfad aus einer gespeicherten URL.
|
|
Gibt None zurück wenn der Pfad außerhalb von media_dir liegt (Path-Traversal-Schutz).
|
|
"""
|
|
if url.startswith("/media/"):
|
|
relative = url[len("/media/"):]
|
|
elif url.startswith("/"):
|
|
relative = url[1:]
|
|
else:
|
|
relative = url
|
|
candidate = os.path.realpath(os.path.join(media_dir, relative))
|
|
real_base = os.path.realpath(media_dir)
|
|
if not candidate.startswith(real_base + os.sep) and candidate != real_base:
|
|
return None
|
|
return candidate
|
|
|
|
|
|
def delete_media_files(media_dir: str, urls) -> None:
|
|
"""Löscht mehrere Mediendateien samt _preview.webp/_thumb.jpg-Leichen von Disk
|
|
(best-effort, Path-Traversal-sicher). Für Cascade-Cleanup bei Lösch-Operationen."""
|
|
for url in urls or []:
|
|
fp = safe_media_path(media_dir, url)
|
|
if not fp:
|
|
continue
|
|
base = os.path.splitext(fp)[0]
|
|
for path in (fp, base + "_preview.webp", base + "_thumb.jpg"):
|
|
try:
|
|
os.remove(path)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def to_jpeg_if_heic(data: bytes, filename: str) -> Tuple[bytes, str]:
|
|
"""Convert HEIC/HEIF to JPEG; return (data, ext) unchanged for all other types."""
|
|
ext = os.path.splitext(filename or "")[1].lower()
|
|
if ext not in _HEIC_EXTS:
|
|
return data, ext or ".jpg"
|
|
try:
|
|
import pillow_heif
|
|
pillow_heif.register_heif_opener()
|
|
from PIL import Image, ImageOps
|
|
img = Image.open(io.BytesIO(data))
|
|
img = ImageOps.exif_transpose(img)
|
|
img = img.convert("RGB")
|
|
buf = io.BytesIO()
|
|
img.save(buf, format="JPEG", quality=90)
|
|
return buf.getvalue(), ".jpg"
|
|
except Exception:
|
|
return data, ext
|
|
|
|
|
|
def to_mp4_if_needed(data: bytes, filename: str) -> Tuple[bytes, str]:
|
|
"""Convert MOV/AVI/M4V to H.264 MP4; return unchanged for MP4/WebM."""
|
|
ext = os.path.splitext(filename or "")[1].lower()
|
|
if ext not in _VIDEO_EXTS:
|
|
return data, ext or ".mp4"
|
|
src_path = dst_path = None
|
|
try:
|
|
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src:
|
|
src.write(data)
|
|
src_path = src.name
|
|
dst_path = src_path[: -len(ext)] + ".mp4"
|
|
result = subprocess.run(
|
|
["ffmpeg", "-i", src_path,
|
|
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
|
|
"-c:a", "aac",
|
|
"-movflags", "+faststart",
|
|
"-y", dst_path],
|
|
capture_output=True, timeout=300,
|
|
)
|
|
if result.returncode == 0:
|
|
with open(dst_path, "rb") as f:
|
|
return f.read(), ".mp4"
|
|
return data, ext
|
|
except Exception:
|
|
return data, ext
|
|
finally:
|
|
for p in [src_path, dst_path]:
|
|
if p:
|
|
try:
|
|
os.unlink(p)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def to_m4a(data: bytes, src_ext: str) -> Tuple[bytes, str]:
|
|
"""Transkodiert Audio nach m4a/AAC — universell abspielbar, auch auf iOS.
|
|
Nötig, weil Chrome/Firefox Opus-in-WebM/Ogg aufnehmen, das iOS Safari NICHT
|
|
abspielen kann. Bereits-AAC-Container (.m4a/.aac/.mp4, u.a. iOS-Recorder)
|
|
werden ohne Transkodierung durchgereicht. Bei ffmpeg-Fehler: Original zurück."""
|
|
ext = (src_ext or "").lower()
|
|
if not ext.startswith("."):
|
|
ext = ("." + ext) if ext else ".webm"
|
|
if ext in _AUDIO_AAC_EXTS:
|
|
return data, ".m4a"
|
|
src_path = dst_path = None
|
|
try:
|
|
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src:
|
|
src.write(data)
|
|
src_path = src.name
|
|
dst_path = src_path[: -len(ext)] + ".m4a"
|
|
result = subprocess.run(
|
|
["ffmpeg", "-i", src_path,
|
|
"-vn", "-c:a", "aac", "-b:a", "128k",
|
|
"-movflags", "+faststart",
|
|
"-y", dst_path],
|
|
capture_output=True, timeout=120,
|
|
)
|
|
if result.returncode == 0:
|
|
with open(dst_path, "rb") as f:
|
|
return f.read(), ".m4a"
|
|
return data, ext
|
|
except Exception:
|
|
return data, ext
|
|
finally:
|
|
for p in [src_path, dst_path]:
|
|
if p:
|
|
try:
|
|
os.unlink(p)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def extract_gps_from_exif(data: bytes) -> tuple | None:
|
|
"""EXIF-GPS aus Bilddaten lesen. Gibt (lat, lon) zurück oder None."""
|
|
try:
|
|
from PIL import Image
|
|
img = Image.open(io.BytesIO(data))
|
|
exif = img._getexif()
|
|
if not exif:
|
|
return None
|
|
gps = exif.get(34853) # GPSInfo tag
|
|
if not gps:
|
|
return None
|
|
lat_dms = gps.get(2)
|
|
lon_dms = gps.get(4)
|
|
lat_ref = gps.get(1, 'N')
|
|
lon_ref = gps.get(3, 'E')
|
|
if not lat_dms or not lon_dms:
|
|
return None
|
|
|
|
def dms(v):
|
|
return float(v[0]) + float(v[1]) / 60 + float(v[2]) / 3600
|
|
|
|
lat = dms(lat_dms) * (-1 if lat_ref == 'S' else 1)
|
|
lon = dms(lon_dms) * (-1 if lon_ref == 'W' else 1)
|
|
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
|
|
return None
|
|
return round(lat, 6), round(lon, 6)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def convert_media(data: bytes, filename: str) -> Tuple[bytes, str]:
|
|
"""Convert HEIC→JPEG and MOV/AVI/M4V→MP4; pass everything else through."""
|
|
ext = os.path.splitext(filename or "")[1].lower()
|
|
if ext in _HEIC_EXTS:
|
|
return to_jpeg_if_heic(data, filename)
|
|
if ext in _VIDEO_EXTS:
|
|
return to_mp4_if_needed(data, filename)
|
|
return data, ext or ".bin"
|
|
|
|
|
|
_PREVIEW_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".heic", ".heif"}
|
|
_PREVIEW_MAX = 800 # längste Seite in Pixeln
|
|
|
|
|
|
def generate_preview(data: bytes, ext: str) -> bytes | None:
|
|
"""Erzeugt ein WebP-Preview (max _PREVIEW_MAX px). Gibt None zurück bei Videos/PDFs/Fehler."""
|
|
if ext.lower() not in _PREVIEW_EXTS:
|
|
return None
|
|
try:
|
|
from PIL import Image, ImageOps
|
|
img = Image.open(io.BytesIO(data))
|
|
img = ImageOps.exif_transpose(img)
|
|
img = img.convert("RGB")
|
|
img.thumbnail((_PREVIEW_MAX, _PREVIEW_MAX), Image.LANCZOS)
|
|
buf = io.BytesIO()
|
|
img.save(buf, format="WEBP", quality=80)
|
|
return buf.getvalue()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def get_image_size(data: bytes) -> tuple[int, int] | None:
|
|
"""Gibt (width, height) eines Bildes zurück, oder None bei Fehler."""
|
|
try:
|
|
from PIL import Image, ImageOps
|
|
img = Image.open(io.BytesIO(data))
|
|
img = ImageOps.exif_transpose(img)
|
|
return img.size # (width, height)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def preview_url_from(url: str | None) -> str | None:
|
|
"""Leitet die Preview-URL aus der Original-URL ab (fügt _preview vor Extension ein).
|
|
Gibt None für Videos, PDFs, bereits generierte Previews und leere URLs zurück."""
|
|
if not url:
|
|
return None
|
|
low = url.lower()
|
|
if any(low.endswith(s) for s in (
|
|
".mp4", ".webm", ".mov", ".avi", ".pdf", "_thumb.jpg", "_preview.jpg", "_preview.webp"
|
|
)):
|
|
return None
|
|
base, _ = os.path.splitext(url)
|
|
return base + "_preview.webp"
|
|
|
|
|
|
def extract_video_thumb(video_path: str) -> str | None:
|
|
"""Extract a frame at ~1s from video_path and save as <basename>_thumb.jpg.
|
|
Returns the thumb path on success, None on failure."""
|
|
base, _ = os.path.splitext(video_path)
|
|
thumb_path = base + "_thumb.jpg"
|
|
try:
|
|
result = subprocess.run(
|
|
["ffmpeg", "-i", video_path,
|
|
"-ss", "1", "-vframes", "1",
|
|
"-q:v", "3", "-vf", "scale=480:-1",
|
|
"-y", thumb_path],
|
|
capture_output=True, timeout=30,
|
|
)
|
|
if result.returncode == 0 and os.path.exists(thumb_path):
|
|
return thumb_path
|
|
# Fallback: first frame (for videos shorter than 1s)
|
|
result2 = subprocess.run(
|
|
["ffmpeg", "-i", video_path,
|
|
"-vframes", "1",
|
|
"-q:v", "3", "-vf", "scale=480:-1",
|
|
"-y", thumb_path],
|
|
capture_output=True, timeout=30,
|
|
)
|
|
return thumb_path if result2.returncode == 0 and os.path.exists(thumb_path) else None
|
|
except Exception:
|
|
return None
|