Security Nice-to-Have: Dockerfile, Magic-Bytes, Path-Traversal, TABLE_MAP, Deps
- Dockerfile: non-root user appuser, chown /data + /app - media_utils: validate_upload() Magic-Byte-Check (JPEG/PNG/GIF/WebP/MP4/WebM) - media_utils: safe_media_path() Path-Traversal-Schutz beim Löschen - diary/health/dogs: safe_media_path() statt os.path.join + lstrip - diary: validate_upload() vor jedem Medien-Upload - forum: _LIKE_TABLE dict statt dynamischer String-Interpolation - requirements: uvicorn 0.34, PyJWT 2.10.1, pydantic 2.10.6, bcrypt 4.3, httpx 0.28.1, anthropic 0.49 - SW by-v319, APP_VER 307
This commit is contained in:
parent
15f854d96c
commit
71e588a240
9 changed files with 100 additions and 29 deletions
|
|
@ -8,7 +8,7 @@ from database import db
|
|||
from auth import get_current_user
|
||||
import ki as KI
|
||||
import httpx
|
||||
from media_utils import convert_media, extract_video_thumb
|
||||
from media_utils import convert_media, extract_video_thumb, safe_media_path, validate_upload
|
||||
|
||||
router = APIRouter()
|
||||
MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media")
|
||||
|
|
@ -488,6 +488,10 @@ async def upload_media(dog_id: int, entry_id: int,
|
|||
raise HTTPException(415, "Nur Bilder, Videos und PDFs erlaubt.")
|
||||
|
||||
raw_data = await file.read()
|
||||
try:
|
||||
validate_upload(raw_data, file.filename or "")
|
||||
except ValueError as e:
|
||||
raise HTTPException(415, str(e))
|
||||
raw_data, ext = convert_media(raw_data, file.filename or "")
|
||||
if not ext:
|
||||
ext = ".jpg"
|
||||
|
|
@ -539,9 +543,10 @@ async def delete_media_item(dog_id: int, entry_id: int, media_id: int,
|
|||
).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(404, "Medium nicht gefunden.")
|
||||
file_path = os.path.join(MEDIA_DIR, row["url"].lstrip("/media/"))
|
||||
try: os.remove(file_path)
|
||||
except OSError: pass
|
||||
file_path = safe_media_path(MEDIA_DIR, row["url"])
|
||||
if file_path:
|
||||
try: os.remove(file_path)
|
||||
except OSError: pass
|
||||
conn.execute("DELETE FROM diary_media WHERE id=?", (media_id,))
|
||||
|
||||
|
||||
|
|
@ -556,9 +561,10 @@ async def delete_media_legacy(dog_id: int, entry_id: int, user=Depends(get_curre
|
|||
if not row:
|
||||
raise HTTPException(404, "Eintrag nicht gefunden.")
|
||||
if row["media_url"]:
|
||||
path = os.path.join(MEDIA_DIR, row["media_url"].lstrip("/media/"))
|
||||
try: os.remove(path)
|
||||
except OSError: pass
|
||||
path = safe_media_path(MEDIA_DIR, row["media_url"])
|
||||
if path:
|
||||
try: os.remove(path)
|
||||
except OSError: pass
|
||||
conn.execute("UPDATE diary SET media_url=NULL WHERE id=?", (entry_id,))
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from typing import Optional
|
|||
from database import db
|
||||
from auth import get_current_user
|
||||
from routes.push import send_push_to_user
|
||||
from media_utils import safe_media_path
|
||||
|
||||
router = APIRouter()
|
||||
MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media")
|
||||
|
|
@ -208,8 +209,8 @@ async def delete_photo(dog_id: int, user=Depends(get_current_user)):
|
|||
if not row:
|
||||
raise HTTPException(404, "Hund nicht gefunden.")
|
||||
if row["foto_url"]:
|
||||
path = os.path.join(MEDIA_DIR, row["foto_url"].lstrip("/media/"))
|
||||
if os.path.exists(path):
|
||||
path = safe_media_path(MEDIA_DIR, row["foto_url"])
|
||||
if path and os.path.exists(path):
|
||||
os.remove(path)
|
||||
with db() as conn:
|
||||
conn.execute(
|
||||
|
|
|
|||
|
|
@ -476,12 +476,14 @@ async def upload_post_foto(
|
|||
# ------------------------------------------------------------------
|
||||
# POST /api/forum/like — Toggle
|
||||
# ------------------------------------------------------------------
|
||||
_LIKE_TABLE = {'thread': 'forum_threads', 'post': 'forum_posts'}
|
||||
|
||||
@router.post("/like")
|
||||
async def toggle_like(data: LikeBody, user=Depends(get_current_user)):
|
||||
if data.target_type not in ('thread', 'post'):
|
||||
if data.target_type not in _LIKE_TABLE:
|
||||
raise HTTPException(400, "Ungültiger Typ.")
|
||||
|
||||
table = f"forum_{data.target_type}s"
|
||||
table = _LIKE_TABLE[data.target_type]
|
||||
with db() as conn:
|
||||
existing = conn.execute(
|
||||
"SELECT 1 FROM forum_likes WHERE user_id=? AND target_type=? AND target_id=?",
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ from pydantic import BaseModel
|
|||
from typing import Optional
|
||||
from database import db
|
||||
from auth import get_current_user
|
||||
from media_utils import safe_media_path
|
||||
|
||||
router = APIRouter()
|
||||
MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media")
|
||||
|
|
@ -219,10 +220,8 @@ async def delete_dokument(dog_id: int, entry_id: int, user=Depends(get_current_u
|
|||
|
||||
datei_url = entry["datei_url"]
|
||||
if datei_url:
|
||||
# datei_url z.B. "/media/health/health_42_abc12345.pdf"
|
||||
filename = datei_url.lstrip("/media/")
|
||||
path = os.path.join(MEDIA_DIR, filename)
|
||||
if os.path.isfile(path):
|
||||
path = safe_media_path(MEDIA_DIR, datei_url)
|
||||
if path and os.path.isfile(path):
|
||||
os.remove(path)
|
||||
|
||||
conn.execute(
|
||||
|
|
@ -338,9 +337,10 @@ async def delete_media_item(dog_id: int, entry_id: int, media_id: int,
|
|||
).fetchone()
|
||||
if not row:
|
||||
raise HTTPException(404, "Medium nicht gefunden.")
|
||||
file_path = os.path.join(MEDIA_DIR, row["url"].lstrip("/media/"))
|
||||
try: os.remove(file_path)
|
||||
except OSError: pass
|
||||
file_path = safe_media_path(MEDIA_DIR, row["url"])
|
||||
if file_path:
|
||||
try: os.remove(file_path)
|
||||
except OSError: pass
|
||||
conn.execute("DELETE FROM health_media WHERE id=?", (media_id,))
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue