import io import os import subprocess import tempfile from typing import Tuple _HEIC_EXTS = {".heic", ".heif"} _VIDEO_EXTS = {".mov", ".avi", ".m4v"} # Audio-Endungen, die bereits AAC in einem MP4-Container sind (iOS-Recorder # liefert audio/mp4) — keine Transkodierung nötig. _AUDIO_AAC_EXTS = {".m4a", ".aac", ".mp4"} # Magic-Byte-Signaturen erlaubter Medientypen _IMAGE_MAGIC = [ b'\xff\xd8\xff', # JPEG b'\x89PNG\r\n', # PNG b'GIF87a', # GIF87 b'GIF89a', # GIF89 ] _VIDEO_MAGIC = [ b'\x1a\x45\xdf\xa3', # WebM / MKV ] def validate_upload(data: bytes, filename: str) -> None: """ Prüft Magic Bytes des Upload-Inhalts gegen die erwartete Dateitype. Wirft ValueError bei Mismatch. Bilder (JPEG/PNG/GIF) und Videos (MP4/WebM/MOV) werden geprüft. HEIC/HEIF und reine Text-/JSON-Dateien werden übersprungen (Pillow/FFmpeg prüfen selbst). """ ext = os.path.splitext(filename or "")[1].lower() if not data: raise ValueError("Leere Datei.") if ext in (".jpg", ".jpeg"): if not data[:3] == b'\xff\xd8\xff': raise ValueError("Datei ist kein gültiges JPEG.") elif ext == ".png": if not data[:6] == b'\x89PNG\r\n': raise ValueError("Datei ist kein gültiges PNG.") elif ext in (".gif",): if not (data[:6] in (b'GIF87a', b'GIF89a')): raise ValueError("Datei ist kein gültiges GIF.") elif ext in (".webp",): if not (data[:4] == b'RIFF' and data[8:12] == b'WEBP'): raise ValueError("Datei ist kein gültiges WebP.") elif ext in (".mp4",): # MP4: 4-Byte-Größe gefolgt von 'ftyp' oder 'mdat' if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')): raise ValueError("Datei ist kein gültiges MP4.") elif ext in (".webm",): if not data[:4] == b'\x1a\x45\xdf\xa3': raise ValueError("Datei ist kein gültiges WebM.") # HEIC, MOV, AVI, M4V: Pillow/FFmpeg prüfen beim Konvertieren def validate_audio(data: bytes, content_type: str) -> None: """ Prüft Magic Bytes einer Audio-Datei gegen den vom Client gemeldeten content_type. Wirft ValueError bei Mismatch. WICHTIG: Audio-WebM und Video-WebM teilen sich dieselbe Magic-Byte-Signatur (Matroska) — die Unterscheidung ist NUR über den content_type möglich, deshalb diese eigene Funktion (validate_upload hat keinen). """ if not data: raise ValueError("Leere Audiodatei.") ct = (content_type or "").lower().split(";")[0].strip() if ct in ("audio/mp4", "audio/aac", "audio/x-m4a", "audio/m4a"): if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')): raise ValueError("Datei ist kein gültiges MP4/AAC-Audio.") elif ct == "audio/webm": if not data[:4] == b'\x1a\x45\xdf\xa3': raise ValueError("Datei ist kein gültiges WebM-Audio.") elif ct in ("audio/ogg", "audio/opus"): if not data[:4] == b'OggS': raise ValueError("Datei ist kein gültiges Ogg-Audio.") elif ct == "audio/mpeg": if not (data[:3] == b'ID3' or (len(data) >= 2 and data[0] == 0xFF and (data[1] & 0xE0) == 0xE0)): raise ValueError("Datei ist kein gültiges MP3.") elif ct in ("audio/wav", "audio/x-wav", "audio/wave"): if not (data[:4] == b'RIFF' and data[8:12] == b'WAVE'): raise ValueError("Datei ist kein gültiges WAV.") # Andere/unbekannte Audio-Typen: ffmpeg prüft beim Transkodieren. def safe_media_path(media_dir: str, url: str) -> str | None: """ Konstruiert einen sicheren Dateipfad aus einer gespeicherten URL. Gibt None zurück wenn der Pfad außerhalb von media_dir liegt (Path-Traversal-Schutz). """ if url.startswith("/media/"): relative = url[len("/media/"):] elif url.startswith("/"): relative = url[1:] else: relative = url candidate = os.path.realpath(os.path.join(media_dir, relative)) real_base = os.path.realpath(media_dir) if not candidate.startswith(real_base + os.sep) and candidate != real_base: return None return candidate def delete_media_files(media_dir: str, urls) -> None: """Löscht mehrere Mediendateien samt _preview.webp/_thumb.jpg-Leichen von Disk (best-effort, Path-Traversal-sicher). Für Cascade-Cleanup bei Lösch-Operationen.""" for url in urls or []: fp = safe_media_path(media_dir, url) if not fp: continue base = os.path.splitext(fp)[0] for path in (fp, base + "_preview.webp", base + "_thumb.jpg"): try: os.remove(path) except OSError: pass def to_jpeg_if_heic(data: bytes, filename: str) -> Tuple[bytes, str]: """Convert HEIC/HEIF to JPEG; return (data, ext) unchanged for all other types.""" ext = os.path.splitext(filename or "")[1].lower() if ext not in _HEIC_EXTS: return data, ext or ".jpg" try: import pillow_heif pillow_heif.register_heif_opener() from PIL import Image, ImageOps img = Image.open(io.BytesIO(data)) img = ImageOps.exif_transpose(img) img = img.convert("RGB") buf = io.BytesIO() img.save(buf, format="JPEG", quality=90) return buf.getvalue(), ".jpg" except Exception: return data, ext def to_mp4_if_needed(data: bytes, filename: str) -> Tuple[bytes, str]: """Convert MOV/AVI/M4V to H.264 MP4; return unchanged for MP4/WebM.""" ext = os.path.splitext(filename or "")[1].lower() if ext not in _VIDEO_EXTS: return data, ext or ".mp4" src_path = dst_path = None try: with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src: src.write(data) src_path = src.name dst_path = src_path[: -len(ext)] + ".mp4" result = subprocess.run( ["ffmpeg", "-i", src_path, "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-movflags", "+faststart", "-y", dst_path], capture_output=True, timeout=300, ) if result.returncode == 0: with open(dst_path, "rb") as f: return f.read(), ".mp4" return data, ext except Exception: return data, ext finally: for p in [src_path, dst_path]: if p: try: os.unlink(p) except OSError: pass def to_m4a(data: bytes, src_ext: str) -> Tuple[bytes, str]: """Transkodiert Audio nach m4a/AAC — universell abspielbar, auch auf iOS. Nötig, weil Chrome/Firefox Opus-in-WebM/Ogg aufnehmen, das iOS Safari NICHT abspielen kann. Bereits-AAC-Container (.m4a/.aac/.mp4, u.a. iOS-Recorder) werden ohne Transkodierung durchgereicht. Bei ffmpeg-Fehler: Original zurück.""" ext = (src_ext or "").lower() if not ext.startswith("."): ext = ("." + ext) if ext else ".webm" if ext in _AUDIO_AAC_EXTS: return data, ".m4a" src_path = dst_path = None try: with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src: src.write(data) src_path = src.name dst_path = src_path[: -len(ext)] + ".m4a" result = subprocess.run( ["ffmpeg", "-i", src_path, "-vn", "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", "-y", dst_path], capture_output=True, timeout=120, ) if result.returncode == 0: with open(dst_path, "rb") as f: return f.read(), ".m4a" return data, ext except Exception: return data, ext finally: for p in [src_path, dst_path]: if p: try: os.unlink(p) except OSError: pass def extract_gps_from_exif(data: bytes) -> tuple | None: """EXIF-GPS aus Bilddaten lesen. Gibt (lat, lon) zurück oder None.""" try: from PIL import Image img = Image.open(io.BytesIO(data)) exif = img._getexif() if not exif: return None gps = exif.get(34853) # GPSInfo tag if not gps: return None lat_dms = gps.get(2) lon_dms = gps.get(4) lat_ref = gps.get(1, 'N') lon_ref = gps.get(3, 'E') if not lat_dms or not lon_dms: return None def dms(v): return float(v[0]) + float(v[1]) / 60 + float(v[2]) / 3600 lat = dms(lat_dms) * (-1 if lat_ref == 'S' else 1) lon = dms(lon_dms) * (-1 if lon_ref == 'W' else 1) if not (-90 <= lat <= 90 and -180 <= lon <= 180): return None return round(lat, 6), round(lon, 6) except Exception: return None def convert_media(data: bytes, filename: str) -> Tuple[bytes, str]: """Convert HEIC→JPEG and MOV/AVI/M4V→MP4; pass everything else through.""" ext = os.path.splitext(filename or "")[1].lower() if ext in _HEIC_EXTS: return to_jpeg_if_heic(data, filename) if ext in _VIDEO_EXTS: return to_mp4_if_needed(data, filename) return data, ext or ".bin" _PREVIEW_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".heic", ".heif"} _PREVIEW_MAX = 800 # längste Seite in Pixeln def generate_preview(data: bytes, ext: str) -> bytes | None: """Erzeugt ein WebP-Preview (max _PREVIEW_MAX px). Gibt None zurück bei Videos/PDFs/Fehler.""" if ext.lower() not in _PREVIEW_EXTS: return None try: from PIL import Image, ImageOps img = Image.open(io.BytesIO(data)) img = ImageOps.exif_transpose(img) img = img.convert("RGB") img.thumbnail((_PREVIEW_MAX, _PREVIEW_MAX), Image.LANCZOS) buf = io.BytesIO() img.save(buf, format="WEBP", quality=80) return buf.getvalue() except Exception: return None def get_image_size(data: bytes) -> tuple[int, int] | None: """Gibt (width, height) eines Bildes zurück, oder None bei Fehler.""" try: from PIL import Image, ImageOps img = Image.open(io.BytesIO(data)) img = ImageOps.exif_transpose(img) return img.size # (width, height) except Exception: return None def preview_url_from(url: str | None) -> str | None: """Leitet die Preview-URL aus der Original-URL ab (fügt _preview vor Extension ein). Gibt None für Videos, PDFs, bereits generierte Previews und leere URLs zurück.""" if not url: return None low = url.lower() if any(low.endswith(s) for s in ( ".mp4", ".webm", ".mov", ".avi", ".pdf", "_thumb.jpg", "_preview.jpg", "_preview.webp" )): return None base, _ = os.path.splitext(url) return base + "_preview.webp" def extract_video_thumb(video_path: str) -> str | None: """Extract a frame at ~1s from video_path and save as _thumb.jpg. Returns the thumb path on success, None on failure.""" base, _ = os.path.splitext(video_path) thumb_path = base + "_thumb.jpg" try: result = subprocess.run( ["ffmpeg", "-i", video_path, "-ss", "1", "-vframes", "1", "-q:v", "3", "-vf", "scale=480:-1", "-y", thumb_path], capture_output=True, timeout=30, ) if result.returncode == 0 and os.path.exists(thumb_path): return thumb_path # Fallback: first frame (for videos shorter than 1s) result2 = subprocess.run( ["ffmpeg", "-i", video_path, "-vframes", "1", "-q:v", "3", "-vf", "scale=480:-1", "-y", thumb_path], capture_output=True, timeout=30, ) return thumb_path if result2.returncode == 0 and os.path.exists(thumb_path) else None except Exception: return None