import io import os import subprocess import tempfile from typing import Tuple _HEIC_EXTS = {".heic", ".heif"} _VIDEO_EXTS = {".mov", ".avi", ".m4v"} # Magic-Byte-Signaturen erlaubter Medientypen _IMAGE_MAGIC = [ b'\xff\xd8\xff', # JPEG b'\x89PNG\r\n', # PNG b'GIF87a', # GIF87 b'GIF89a', # GIF89 ] _VIDEO_MAGIC = [ b'\x1a\x45\xdf\xa3', # WebM / MKV ] def validate_upload(data: bytes, filename: str) -> None: """ Prüft Magic Bytes des Upload-Inhalts gegen die erwartete Dateitype. Wirft ValueError bei Mismatch. Bilder (JPEG/PNG/GIF) und Videos (MP4/WebM/MOV) werden geprüft. HEIC/HEIF und reine Text-/JSON-Dateien werden übersprungen (Pillow/FFmpeg prüfen selbst). """ ext = os.path.splitext(filename or "")[1].lower() if not data: raise ValueError("Leere Datei.") if ext in (".jpg", ".jpeg"): if not data[:3] == b'\xff\xd8\xff': raise ValueError("Datei ist kein gültiges JPEG.") elif ext == ".png": if not data[:6] == b'\x89PNG\r\n': raise ValueError("Datei ist kein gültiges PNG.") elif ext in (".gif",): if not (data[:6] in (b'GIF87a', b'GIF89a')): raise ValueError("Datei ist kein gültiges GIF.") elif ext in (".webp",): if not (data[:4] == b'RIFF' and data[8:12] == b'WEBP'): raise ValueError("Datei ist kein gültiges WebP.") elif ext in (".mp4",): # MP4: 4-Byte-Größe gefolgt von 'ftyp' oder 'mdat' if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')): raise ValueError("Datei ist kein gültiges MP4.") elif ext in (".webm",): if not data[:4] == b'\x1a\x45\xdf\xa3': raise ValueError("Datei ist kein gültiges WebM.") # HEIC, MOV, AVI, M4V: Pillow/FFmpeg prüfen beim Konvertieren def safe_media_path(media_dir: str, url: str) -> str | None: """ Konstruiert einen sicheren Dateipfad aus einer gespeicherten URL. Gibt None zurück wenn der Pfad außerhalb von media_dir liegt (Path-Traversal-Schutz). """ relative = url.lstrip("/media/").lstrip("/") candidate = os.path.realpath(os.path.join(media_dir, relative)) real_base = os.path.realpath(media_dir) if not candidate.startswith(real_base + os.sep) and candidate != real_base: return None return candidate def to_jpeg_if_heic(data: bytes, filename: str) -> Tuple[bytes, str]: """Convert HEIC/HEIF to JPEG; return (data, ext) unchanged for all other types.""" ext = os.path.splitext(filename or "")[1].lower() if ext not in _HEIC_EXTS: return data, ext or ".jpg" try: import pillow_heif pillow_heif.register_heif_opener() from PIL import Image, ImageOps img = Image.open(io.BytesIO(data)) img = ImageOps.exif_transpose(img) img = img.convert("RGB") buf = io.BytesIO() img.save(buf, format="JPEG", quality=90) return buf.getvalue(), ".jpg" except Exception: return data, ext def to_mp4_if_needed(data: bytes, filename: str) -> Tuple[bytes, str]: """Convert MOV/AVI/M4V to H.264 MP4; return unchanged for MP4/WebM.""" ext = os.path.splitext(filename or "")[1].lower() if ext not in _VIDEO_EXTS: return data, ext or ".mp4" src_path = dst_path = None try: with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src: src.write(data) src_path = src.name dst_path = src_path[: -len(ext)] + ".mp4" result = subprocess.run( ["ffmpeg", "-i", src_path, "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-movflags", "+faststart", "-y", dst_path], capture_output=True, timeout=300, ) if result.returncode == 0: with open(dst_path, "rb") as f: return f.read(), ".mp4" return data, ext except Exception: return data, ext finally: for p in [src_path, dst_path]: if p: try: os.unlink(p) except OSError: pass def convert_media(data: bytes, filename: str) -> Tuple[bytes, str]: """Convert HEIC→JPEG and MOV/AVI/M4V→MP4; pass everything else through.""" ext = os.path.splitext(filename or "")[1].lower() if ext in _HEIC_EXTS: return to_jpeg_if_heic(data, filename) if ext in _VIDEO_EXTS: return to_mp4_if_needed(data, filename) return data, ext or ".bin" def extract_video_thumb(video_path: str) -> str | None: """Extract a frame at ~1s from video_path and save as _thumb.jpg. Returns the thumb path on success, None on failure.""" base, _ = os.path.splitext(video_path) thumb_path = base + "_thumb.jpg" try: result = subprocess.run( ["ffmpeg", "-i", video_path, "-ss", "1", "-vframes", "1", "-q:v", "3", "-vf", "scale=480:-1", "-y", thumb_path], capture_output=True, timeout=30, ) if result.returncode == 0 and os.path.exists(thumb_path): return thumb_path # Fallback: first frame (for videos shorter than 1s) result2 = subprocess.run( ["ffmpeg", "-i", video_path, "-vframes", "1", "-q:v", "3", "-vf", "scale=480:-1", "-y", thumb_path], capture_output=True, timeout=30, ) return thumb_path if result2.returncode == 0 and os.path.exists(thumb_path) else None except Exception: return None