banyaro/backend/media_utils.py
rene 71e588a240 Security Nice-to-Have: Dockerfile, Magic-Bytes, Path-Traversal, TABLE_MAP, Deps
- Dockerfile: non-root user appuser, chown /data + /app
- media_utils: validate_upload() Magic-Byte-Check (JPEG/PNG/GIF/WebP/MP4/WebM)
- media_utils: safe_media_path() Path-Traversal-Schutz beim Löschen
- diary/health/dogs: safe_media_path() statt os.path.join + lstrip
- diary: validate_upload() vor jedem Medien-Upload
- forum: _LIKE_TABLE dict statt dynamischer String-Interpolation
- requirements: uvicorn 0.34, PyJWT 2.10.1, pydantic 2.10.6, bcrypt 4.3, httpx 0.28.1, anthropic 0.49
- SW by-v319, APP_VER 307
2026-04-23 18:42:05 +02:00

155 lines
5.6 KiB
Python

import io
import os
import subprocess
import tempfile
from typing import Tuple
_HEIC_EXTS = {".heic", ".heif"}
_VIDEO_EXTS = {".mov", ".avi", ".m4v"}
# Magic-Byte-Signaturen erlaubter Medientypen
_IMAGE_MAGIC = [
b'\xff\xd8\xff', # JPEG
b'\x89PNG\r\n', # PNG
b'GIF87a', # GIF87
b'GIF89a', # GIF89
]
_VIDEO_MAGIC = [
b'\x1a\x45\xdf\xa3', # WebM / MKV
]
def validate_upload(data: bytes, filename: str) -> None:
"""
Prüft Magic Bytes des Upload-Inhalts gegen die erwartete Dateitype.
Wirft ValueError bei Mismatch.
Bilder (JPEG/PNG/GIF) und Videos (MP4/WebM/MOV) werden geprüft.
HEIC/HEIF und reine Text-/JSON-Dateien werden übersprungen (Pillow/FFmpeg prüfen selbst).
"""
ext = os.path.splitext(filename or "")[1].lower()
if not data:
raise ValueError("Leere Datei.")
if ext in (".jpg", ".jpeg"):
if not data[:3] == b'\xff\xd8\xff':
raise ValueError("Datei ist kein gültiges JPEG.")
elif ext == ".png":
if not data[:6] == b'\x89PNG\r\n':
raise ValueError("Datei ist kein gültiges PNG.")
elif ext in (".gif",):
if not (data[:6] in (b'GIF87a', b'GIF89a')):
raise ValueError("Datei ist kein gültiges GIF.")
elif ext in (".webp",):
if not (data[:4] == b'RIFF' and data[8:12] == b'WEBP'):
raise ValueError("Datei ist kein gültiges WebP.")
elif ext in (".mp4",):
# MP4: 4-Byte-Größe gefolgt von 'ftyp' oder 'mdat'
if not (len(data) >= 8 and data[4:8] in (b'ftyp', b'mdat', b'moov', b'free')):
raise ValueError("Datei ist kein gültiges MP4.")
elif ext in (".webm",):
if not data[:4] == b'\x1a\x45\xdf\xa3':
raise ValueError("Datei ist kein gültiges WebM.")
# HEIC, MOV, AVI, M4V: Pillow/FFmpeg prüfen beim Konvertieren
def safe_media_path(media_dir: str, url: str) -> str | None:
"""
Konstruiert einen sicheren Dateipfad aus einer gespeicherten URL.
Gibt None zurück wenn der Pfad außerhalb von media_dir liegt (Path-Traversal-Schutz).
"""
relative = url.lstrip("/media/").lstrip("/")
candidate = os.path.realpath(os.path.join(media_dir, relative))
real_base = os.path.realpath(media_dir)
if not candidate.startswith(real_base + os.sep) and candidate != real_base:
return None
return candidate
def to_jpeg_if_heic(data: bytes, filename: str) -> Tuple[bytes, str]:
"""Convert HEIC/HEIF to JPEG; return (data, ext) unchanged for all other types."""
ext = os.path.splitext(filename or "")[1].lower()
if ext not in _HEIC_EXTS:
return data, ext or ".jpg"
try:
import pillow_heif
pillow_heif.register_heif_opener()
from PIL import Image, ImageOps
img = Image.open(io.BytesIO(data))
img = ImageOps.exif_transpose(img)
img = img.convert("RGB")
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=90)
return buf.getvalue(), ".jpg"
except Exception:
return data, ext
def to_mp4_if_needed(data: bytes, filename: str) -> Tuple[bytes, str]:
"""Convert MOV/AVI/M4V to H.264 MP4; return unchanged for MP4/WebM."""
ext = os.path.splitext(filename or "")[1].lower()
if ext not in _VIDEO_EXTS:
return data, ext or ".mp4"
src_path = dst_path = None
try:
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as src:
src.write(data)
src_path = src.name
dst_path = src_path[: -len(ext)] + ".mp4"
result = subprocess.run(
["ffmpeg", "-i", src_path,
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac",
"-movflags", "+faststart",
"-y", dst_path],
capture_output=True, timeout=300,
)
if result.returncode == 0:
with open(dst_path, "rb") as f:
return f.read(), ".mp4"
return data, ext
except Exception:
return data, ext
finally:
for p in [src_path, dst_path]:
if p:
try:
os.unlink(p)
except OSError:
pass
def convert_media(data: bytes, filename: str) -> Tuple[bytes, str]:
"""Convert HEIC→JPEG and MOV/AVI/M4V→MP4; pass everything else through."""
ext = os.path.splitext(filename or "")[1].lower()
if ext in _HEIC_EXTS:
return to_jpeg_if_heic(data, filename)
if ext in _VIDEO_EXTS:
return to_mp4_if_needed(data, filename)
return data, ext or ".bin"
def extract_video_thumb(video_path: str) -> str | None:
"""Extract a frame at ~1s from video_path and save as <basename>_thumb.jpg.
Returns the thumb path on success, None on failure."""
base, _ = os.path.splitext(video_path)
thumb_path = base + "_thumb.jpg"
try:
result = subprocess.run(
["ffmpeg", "-i", video_path,
"-ss", "1", "-vframes", "1",
"-q:v", "3", "-vf", "scale=480:-1",
"-y", thumb_path],
capture_output=True, timeout=30,
)
if result.returncode == 0 and os.path.exists(thumb_path):
return thumb_path
# Fallback: first frame (for videos shorter than 1s)
result2 = subprocess.run(
["ffmpeg", "-i", video_path,
"-vframes", "1",
"-q:v", "3", "-vf", "scale=480:-1",
"-y", thumb_path],
capture_output=True, timeout=30,
)
return thumb_path if result2.returncode == 0 and os.path.exists(thumb_path) else None
except Exception:
return None