banyaro/backend/media_utils.py
rene e86d89f3d9 Notiz-Medien & Sprachnachrichten: Fotos/Videos/Dateien + Audio an Notizen
Wiederverwendbarer UI.noteMediaAttacher für beide Notiz-Stellen (UI.noteModal
+ Notizblock-Seite). note_media-Tabelle + POST/DELETE /api/notes/{id}/media
(vor der gierigen /{parent_type}/{parent_id}-Route). Audio per MediaRecorder,
serverseitig nach m4a/AAC transkodiert (ffmpeg) — iOS spielt Chrome-Opus-webm
nicht ab. UI.lightbox global eingeführt. Mikrofon-Policy microphone=(self) +
CSP media-src 'self' blob:, Datenschutz v6. Disk-Cleanup für note_media bei
Notiz-, Account- und Admin-User-Delete. Reine Medien-Notiz ohne Text erlaubt.
noteModal-Bug gefixt: notes.get() liefert Array -> existing[0] statt
existing?.id (verhinderte Bearbeiten, erzeugte Duplikate). 12 neue Tests.

admin.py enthält außerdem KI-Vision-Statusfelder aus paralleler Arbeit
(nicht sauber trennbar ohne interaktives Staging).
2026-06-14 20:22:35 +02:00

320 lines
12 KiB
Python

import io
import os
import subprocess
import tempfile
from typing import Tuple
_HEIC_EXTS = {".heic", ".heif"}
_VIDEO_EXTS = {".mov", ".avi", ".m4v"}
# Audio-Endungen, die bereits AAC in einem MP4-Container sind (iOS-Recorder
# liefert audio/mp4) — keine Transkodierung nötig.
_AUDIO_AAC_EXTS = {".m4a", ".aac", ".mp4"}
# Magic-Byte-Signaturen erlaubter Medientypen
_IMAGE_MAGIC = [
b'\xff\xd8\xff', # JPEG
b'\x89PNG\r\n', # PNG
b'GIF87a', # GIF87
b'GIF89a', # GIF89
]
_VIDEO_MAGIC = [
b'\x1a\x45\xdf\xa3', # WebM / MKV
]
def validate_upload(data: bytes, filename: str) -> None:
"""
Prüft Magic Bytes des Upload-Inhalts gegen die erwartete Dateitype.
Wirft ValueError bei Mismatch.
Bilder (JPEG/PNG/GIF) und Videos (MP4/WebM/MOV) werden geprüft.
HEIC/HEIF und reine Text-/JSON-Dateien werden übersprungen (Pillow/FFmpeg prüfen selbst).
"""
ext = os.path.splitext(filename or "")[1].lower()
if not data:
raise ValueError("Leere Datei.")
if ext in (".jpg", ".jpeg"):
if not data[:3] == b'\xff\xd8\xff':
raise ValueError("Datei ist kein gültiges JPEG.")
elif ext == ".png":
if not data[:6] == b'\x89PNG\r\n':
raise ValueError("Datei ist kein gültiges PNG.")
elif ext in (".gif",):
if not (data[:6] in (b'GIF87a', b'GIF89a')):
raise ValueError("Datei ist kein gültiges GIF.")
elif ext in (".webp",):
if not (data[:4] == b'RIFF' and data[8:12] == b'WEBP'):
raise ValueError("Datei ist kein gültiges WebP.")
elif ext in (".mp4",):
# MP4: 4-Byte-Größe gefolgt von 'ftyp' oder 'mdat'
if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')):
raise ValueError("Datei ist kein gültiges MP4.")
elif ext in (".webm",):
if not data[:4] == b'\x1a\x45\xdf\xa3':
raise ValueError("Datei ist kein gültiges WebM.")
# HEIC, MOV, AVI, M4V: Pillow/FFmpeg prüfen beim Konvertieren
def validate_audio(data: bytes, content_type: str) -> None:
"""
Prüft Magic Bytes einer Audio-Datei gegen den vom Client gemeldeten content_type.
Wirft ValueError bei Mismatch. WICHTIG: Audio-WebM und Video-WebM teilen sich
dieselbe Magic-Byte-Signatur (Matroska) — die Unterscheidung ist NUR über den
content_type möglich, deshalb diese eigene Funktion (validate_upload hat keinen).
"""
if not data:
raise ValueError("Leere Audiodatei.")
ct = (content_type or "").lower().split(";")[0].strip()
if ct in ("audio/mp4", "audio/aac", "audio/x-m4a", "audio/m4a"):
if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')):
raise ValueError("Datei ist kein gültiges MP4/AAC-Audio.")
elif ct == "audio/webm":
if not data[:4] == b'\x1a\x45\xdf\xa3':
raise ValueError("Datei ist kein gültiges WebM-Audio.")
elif ct in ("audio/ogg", "audio/opus"):
if not data[:4] == b'OggS':
raise ValueError("Datei ist kein gültiges Ogg-Audio.")
elif ct == "audio/mpeg":
if not (data[:3] == b'ID3' or (len(data) >= 2 and data[0] == 0xFF and (data[1] & 0xE0) == 0xE0)):
raise ValueError("Datei ist kein gültiges MP3.")
elif ct in ("audio/wav", "audio/x-wav", "audio/wave"):
if not (data[:4] == b'RIFF' and data[8:12] == b'WAVE'):
raise ValueError("Datei ist kein gültiges WAV.")
# Andere/unbekannte Audio-Typen: ffmpeg prüft beim Transkodieren.
def safe_media_path(media_dir: str, url: str) -> str | None:
"""
Konstruiert einen sicheren Dateipfad aus einer gespeicherten URL.
Gibt None zurück wenn der Pfad außerhalb von media_dir liegt (Path-Traversal-Schutz).
"""
if url.startswith("/media/"):
relative = url[len("/media/"):]
elif url.startswith("/"):
relative = url[1:]
else:
relative = url
candidate = os.path.realpath(os.path.join(media_dir, relative))
real_base = os.path.realpath(media_dir)
if not candidate.startswith(real_base + os.sep) and candidate != real_base:
return None
return candidate
def delete_media_files(media_dir: str, urls) -> None:
"""Löscht mehrere Mediendateien samt _preview.webp/_thumb.jpg-Leichen von Disk
(best-effort, Path-Traversal-sicher). Für Cascade-Cleanup bei Lösch-Operationen."""
for url in urls or []:
fp = safe_media_path(media_dir, url)
if not fp:
continue
base = os.path.splitext(fp)[0]
for path in (fp, base + "_preview.webp", base + "_thumb.jpg"):
try:
os.remove(path)
except OSError:
pass
def to_jpeg_if_heic(data: bytes, filename: str) -> Tuple[bytes, str]:
"""Convert HEIC/HEIF to JPEG; return (data, ext) unchanged for all other types."""
ext = os.path.splitext(filename or "")[1].lower()
if ext not in _HEIC_EXTS:
return data, ext or ".jpg"
try:
import pillow_heif
pillow_heif.register_heif_opener()
from PIL import Image, ImageOps
img = Image.open(io.BytesIO(data))
img = ImageOps.exif_transpose(img)
img = img.convert("RGB")
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=90)
return buf.getvalue(), ".jpg"
except Exception:
return data, ext
def to_mp4_if_needed(data: bytes, filename: str) -> Tuple[bytes, str]:
"""Convert MOV/AVI/M4V to H.264 MP4; return unchanged for MP4/WebM."""
ext = os.path.splitext(filename or "")[1].lower()
if ext not in _VIDEO_EXTS:
return data, ext or ".mp4"
src_path = dst_path = None
try:
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src:
src.write(data)
src_path = src.name
dst_path = src_path[: -len(ext)] + ".mp4"
result = subprocess.run(
["ffmpeg", "-i", src_path,
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac",
"-movflags", "+faststart",
"-y", dst_path],
capture_output=True, timeout=300,
)
if result.returncode == 0:
with open(dst_path, "rb") as f:
return f.read(), ".mp4"
return data, ext
except Exception:
return data, ext
finally:
for p in [src_path, dst_path]:
if p:
try:
os.unlink(p)
except OSError:
pass
def to_m4a(data: bytes, src_ext: str) -> Tuple[bytes, str]:
"""Transkodiert Audio nach m4a/AAC — universell abspielbar, auch auf iOS.
Nötig, weil Chrome/Firefox Opus-in-WebM/Ogg aufnehmen, das iOS Safari NICHT
abspielen kann. Bereits-AAC-Container (.m4a/.aac/.mp4, u.a. iOS-Recorder)
werden ohne Transkodierung durchgereicht. Bei ffmpeg-Fehler: Original zurück."""
ext = (src_ext or "").lower()
if not ext.startswith("."):
ext = ("." + ext) if ext else ".webm"
if ext in _AUDIO_AAC_EXTS:
return data, ".m4a"
src_path = dst_path = None
try:
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src:
src.write(data)
src_path = src.name
dst_path = src_path[: -len(ext)] + ".m4a"
result = subprocess.run(
["ffmpeg", "-i", src_path,
"-vn", "-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart",
"-y", dst_path],
capture_output=True, timeout=120,
)
if result.returncode == 0:
with open(dst_path, "rb") as f:
return f.read(), ".m4a"
return data, ext
except Exception:
return data, ext
finally:
for p in [src_path, dst_path]:
if p:
try:
os.unlink(p)
except OSError:
pass
def extract_gps_from_exif(data: bytes) -> tuple | None:
"""EXIF-GPS aus Bilddaten lesen. Gibt (lat, lon) zurück oder None."""
try:
from PIL import Image
img = Image.open(io.BytesIO(data))
exif = img._getexif()
if not exif:
return None
gps = exif.get(34853) # GPSInfo tag
if not gps:
return None
lat_dms = gps.get(2)
lon_dms = gps.get(4)
lat_ref = gps.get(1, 'N')
lon_ref = gps.get(3, 'E')
if not lat_dms or not lon_dms:
return None
def dms(v):
return float(v[0]) + float(v[1]) / 60 + float(v[2]) / 3600
lat = dms(lat_dms) * (-1 if lat_ref == 'S' else 1)
lon = dms(lon_dms) * (-1 if lon_ref == 'W' else 1)
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
return None
return round(lat, 6), round(lon, 6)
except Exception:
return None
def convert_media(data: bytes, filename: str) -> Tuple[bytes, str]:
"""Convert HEIC→JPEG and MOV/AVI/M4V→MP4; pass everything else through."""
ext = os.path.splitext(filename or "")[1].lower()
if ext in _HEIC_EXTS:
return to_jpeg_if_heic(data, filename)
if ext in _VIDEO_EXTS:
return to_mp4_if_needed(data, filename)
return data, ext or ".bin"
_PREVIEW_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".heic", ".heif"}
_PREVIEW_MAX = 800 # längste Seite in Pixeln
def generate_preview(data: bytes, ext: str) -> bytes | None:
"""Erzeugt ein WebP-Preview (max _PREVIEW_MAX px). Gibt None zurück bei Videos/PDFs/Fehler."""
if ext.lower() not in _PREVIEW_EXTS:
return None
try:
from PIL import Image, ImageOps
img = Image.open(io.BytesIO(data))
img = ImageOps.exif_transpose(img)
img = img.convert("RGB")
img.thumbnail((_PREVIEW_MAX, _PREVIEW_MAX), Image.LANCZOS)
buf = io.BytesIO()
img.save(buf, format="WEBP", quality=80)
return buf.getvalue()
except Exception:
return None
def get_image_size(data: bytes) -> tuple[int, int] | None:
"""Gibt (width, height) eines Bildes zurück, oder None bei Fehler."""
try:
from PIL import Image, ImageOps
img = Image.open(io.BytesIO(data))
img = ImageOps.exif_transpose(img)
return img.size # (width, height)
except Exception:
return None
def preview_url_from(url: str | None) -> str | None:
"""Leitet die Preview-URL aus der Original-URL ab (fügt _preview vor Extension ein).
Gibt None für Videos, PDFs, bereits generierte Previews und leere URLs zurück."""
if not url:
return None
low = url.lower()
if any(low.endswith(s) for s in (
".mp4", ".webm", ".mov", ".avi", ".pdf", "_thumb.jpg", "_preview.jpg", "_preview.webp"
)):
return None
base, _ = os.path.splitext(url)
return base + "_preview.webp"
def extract_video_thumb(video_path: str) -> str | None:
"""Extract a frame at ~1s from video_path and save as <basename>_thumb.jpg.
Returns the thumb path on success, None on failure."""
base, _ = os.path.splitext(video_path)
thumb_path = base + "_thumb.jpg"
try:
result = subprocess.run(
["ffmpeg", "-i", video_path,
"-ss", "1", "-vframes", "1",
"-q:v", "3", "-vf", "scale=480:-1",
"-y", thumb_path],
capture_output=True, timeout=30,
)
if result.returncode == 0 and os.path.exists(thumb_path):
return thumb_path
# Fallback: first frame (for videos shorter than 1s)
result2 = subprocess.run(
["ffmpeg", "-i", video_path,
"-vframes", "1",
"-q:v", "3", "-vf", "scale=480:-1",
"-y", thumb_path],
capture_output=True, timeout=30,
)
return thumb_path if result2.returncode == 0 and os.path.exists(thumb_path) else None
except Exception:
return None