banyaro/backend/routes/walks.py
rene 1ff66a7083 Sicherheit + Tests + A11y, SW by-v1118
PYDANTIC max_length (38 Routen, ~400 Field-Constraints):
Schützt vor DoS durch Riesen-Payloads (10MB Thread-Titel etc.).
Pragmatische Limits:
- Titel/Name: 200 · Beschreibung/Body: 10000 · Notiz: 5000
- Email: 254 (RFC 5321) · URL: 500 · Slug/Kategorie: 100
- Hund-Name/Rasse: 80 · Hund-Bio: 2000

Top-betroffen: forum.py, diary.py, health.py, dogs.py, expenses.py,
notes.py, auth.py, profile.py. Manuelle len()-Checks in profile,
chat, ki entfernt (jetzt durch Field abgedeckt).

PYTEST COVERAGE (+19 Tests, 37 grün + 1 xfail):
- test_security.py: require_owner (Places GET/PATCH/DELETE mit
  Fremduser → 403), JWT-Blacklist (Logout invalidiert Token),
  Login-Lockout (5 Fehlversuche → 429 + Retry-After Header)
- test_race.py: Invoice-Counter (20 parallele Threads, alle unique),
  Founder-Number (atomare Vergabe, voll bei 100)
- test_validation.py: Forum-Titel 30k Zeichen → 422, Diary-Text
  50k → 422 (verifiziert Pydantic max_length-Sweep)

A11Y (Tap-Targets ≥44×44 + Dark-Mode-Kontrast):
- #header-user-btn 36→44px, .header-back 40→44, .header-menu-btn 40→44
- dog-profile Wrapped-Slider Prev/Next 40→44
- forum-Lightbox Close 40→44
- --c-text-muted Light: #B0A090 (2.37:1 FAIL) → #7F6B58 (4.74:1 PASS)
- --c-text-muted Dark:  #806A58 (3.58:1 FAIL) → #A08878 (5.46:1 PASS)
- Branding-Farben unangetastet
2026-05-27 13:40:30 +02:00

613 lines
25 KiB
Python

"""BAN YARO — Gassi-Treffen"""
import os, uuid
import httpx
from datetime import date
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from pydantic import BaseModel, Field
from typing import Optional, List
from database import db
from auth import get_current_user
from routes.push import send_push_to_user
from math_utils import haversine_km, haversine_m
MEDIA_DIR = os.getenv("MEDIA_DIR", "/data/media")
router = APIRouter()
# ------------------------------------------------------------------
# Schemas
# ------------------------------------------------------------------
class WalkCreate(BaseModel):
    """Request body for creating a walk meetup (used by ``create_walk``)."""

    titel: str = Field(..., min_length=1, max_length=200)
    datum: str = Field(..., max_length=32)    # expected as YYYY-MM-DD
    uhrzeit: str = Field(..., max_length=20)  # expected as HH:MM
    # Reject coordinates outside the geographic range (±90 / ±180 degrees)
    # so they never reach the distance calculations.
    lat: float = Field(..., ge=-90, le=90)
    lon: float = Field(..., ge=-180, le=180)
    ort_name: Optional[str] = Field(None, max_length=300)
    # At least one slot: join_walk treats "count >= max_teilnehmer" as full,
    # so zero or a negative number would create a walk nobody can join.
    max_teilnehmer: int = Field(10, ge=1)
    beschreibung: Optional[str] = Field(None, max_length=5000)
class WalkUpdate(BaseModel):
    """Request body for a partial walk update (used by ``update_walk``).

    Every field is optional; ``update_walk`` dumps the model with
    ``exclude_none=True``, so fields left as ``None`` are not written.
    """

    titel: Optional[str] = Field(None, max_length=200)
    datum: Optional[str] = Field(None, max_length=32)
    uhrzeit: Optional[str] = Field(None, max_length=20)
    # Reject coordinates outside the geographic range (±90 / ±180 degrees).
    lat: Optional[float] = Field(None, ge=-90, le=90)
    lon: Optional[float] = Field(None, ge=-180, le=180)
    ort_name: Optional[str] = Field(None, max_length=300)
    # At least one slot: join_walk treats "count >= max_teilnehmer" as full.
    max_teilnehmer: Optional[int] = Field(None, ge=1)
    beschreibung: Optional[str] = Field(None, max_length=5000)
class JoinRequest(BaseModel):
    """Request body for joining a walk (used by ``join_walk``)."""

    # An empty list means joining without a dog (rare).
    # The list is capped because join_walk issues one INSERT per entry;
    # an unbounded list would let a single request trigger arbitrarily
    # many writes.
    dog_ids: List[int] = Field(default_factory=list, max_length=50)
class InviteRequest(BaseModel):
    """Request body for inviting a friend to a walk (used by ``invite_friend``)."""

    # User id of the friend to invite (required).
    friend_id: int = Field(...)
class RsvpRequest(BaseModel):
    """Request body for answering a walk invitation (used by ``rsvp_walk``)."""

    # One of 'yes' | 'maybe' | 'no'; the allowed values are checked in
    # rsvp_walk, the model itself only bounds the length.
    status: str = Field(
        ...,
        max_length=20,
    )
# ------------------------------------------------------------------
# GET /api/walks — alle offenen Treffen (ab heute, optional Umkreis)
# ------------------------------------------------------------------
@router.get("")
async def list_walks(
    lat: Optional[float] = None,
    lon: Optional[float] = None,
    radius: int = 20000,
    alle: bool = False,
):
    """List walk meetups ordered by date and time.

    Args:
        lat, lon: Optional reference point. The radius filter is applied
            only when both are given.
        radius: Maximum distance from the reference point, compared against
            the result of ``haversine_m`` (presumably metres).
        alle: When false (default) only walks dated today or later are
            returned; when true, past walks are included as well.

    Returns:
        A list of walk dicts, each with ``veranstalter_name`` and
        ``teilnehmer_count`` added.

    Walks with status 'storniert' are always excluded.
    NOTE(review): the original parameter comment said ``alle`` should also
    include cancelled walks, but the query never did. Behaviour is kept as
    is; confirm which one is intended.
    """
    sql = """
        SELECT w.*,
               u.name AS veranstalter_name,
               COUNT(DISTINCT wp.user_id) AS teilnehmer_count
        FROM walks w
        LEFT JOIN users u ON u.id = w.user_id
        LEFT JOIN walk_participants wp ON wp.walk_id = w.id
        WHERE w.status != 'storniert'
    """
    params = []
    if not alle:
        # Bind the date as a parameter instead of formatting it into the SQL.
        sql += " AND w.datum >= ?"
        params.append(date.today().isoformat())
    sql += " GROUP BY w.id ORDER BY w.datum ASC, w.uhrzeit ASC"

    with db() as conn:
        walks = [dict(row) for row in conn.execute(sql, params).fetchall()]

    if lat is None or lon is None:
        return walks

    # Radius filter; rows without coordinates cannot be measured and are
    # dropped rather than crashing the distance calculation.
    return [
        w for w in walks
        if w['lat'] is not None and w['lon'] is not None
        and haversine_m(lat, lon, w['lat'], w['lon']) <= radius
    ]
# ------------------------------------------------------------------
# POST /api/walks — Treffen erstellen
# ------------------------------------------------------------------
@router.post("", status_code=201)
async def create_walk(data: WalkCreate, user=Depends(get_current_user)):
    """Create a walk meetup owned by the authenticated user.

    Returns the freshly inserted row (including ``veranstalter_name``)
    with ``teilnehmer_count`` set to 0.
    """
    values = (
        user['id'], data.titel, data.datum, data.uhrzeit,
        data.lat, data.lon, data.ort_name,
        data.max_teilnehmer, data.beschreibung,
    )
    with db() as conn:
        inserted = conn.execute("""
            INSERT INTO walks (user_id, titel, datum, uhrzeit, lat, lon,
                               ort_name, max_teilnehmer, beschreibung)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        # Read the row back so the response carries DB defaults and the
        # organizer's name.
        created = conn.execute(
            "SELECT w.*, u.name AS veranstalter_name FROM walks w "
            "LEFT JOIN users u ON u.id = w.user_id WHERE w.id = ?",
            (inserted.lastrowid,)
        ).fetchone()

    response = dict(created)
    response['teilnehmer_count'] = 0
    return response
# ------------------------------------------------------------------
# GET /api/walks/nearby — POI-Suche für Treffpunkt-Autocomplete
# WICHTIG: Muss VOR /{walk_id} stehen (FastAPI Route-Reihenfolge)
# ------------------------------------------------------------------
@router.get("/nearby")
async def nearby_places(lat: float, lon: float, user=Depends(get_current_user)):
    """Suggest named meeting points near a coordinate (autocomplete).

    Three sources are merged:
      1. rows from ``places`` within 5 km,
      2. cached rows from ``osm_pois`` within 2 km,
      3. a live Overpass query for named leisure/amenity/tourism features
         (1000 m search radius, kept when within 1 km).

    Results are sorted by distance, de-duplicated by lower-cased name
    (the nearest entry wins) and capped at 20 entries.

    The Overpass lookup is best effort: any failure is logged and the
    results of the two database sources are still returned.

    NOTE(review): the original comment called source 1 the user's own
    places, but the query is not filtered by user — confirm intent.
    """
    import logging

    results = []

    def _collect(name, kind, plat, plon, max_km, source):
        # Entries without a name or without complete coordinates can be
        # neither measured nor de-duplicated, so they are skipped.
        if not name or plat is None or plon is None:
            return
        km = haversine_km(lat, lon, plat, plon)
        if km <= max_km:
            results.append({"name": name, "type": kind,
                            "lat": plat, "lon": plon,
                            "distance_m": int(km * 1000), "source": source})

    with db() as conn:
        places = conn.execute(
            "SELECT name, typ, lat, lon FROM places WHERE lat IS NOT NULL",
        ).fetchall()
        osm = conn.execute(
            "SELECT name, type, lat, lon FROM osm_pois WHERE name IS NOT NULL AND name != ''"
        ).fetchall()

    # 1. Places table
    for p in places:
        _collect(p["name"], p["typ"] or "place", p["lat"], p["lon"], 5, "places")
    # 2. Cached OSM POIs
    for p in osm:
        _collect(p["name"], p["type"], p["lat"], p["lon"], 2, "osm")

    # 3. Overpass: named POIs within 1000 m
    try:
        async with httpx.AsyncClient(timeout=6) as client:
            q = (
                f'[out:json][timeout:6];'
                f'(node["name"]["leisure"](around:1000,{lat},{lon});'
                f' node["name"]["amenity"](around:1000,{lat},{lon});'
                f' node["name"]["tourism"](around:1000,{lat},{lon});'
                f' way["name"]["leisure"](around:1000,{lat},{lon});'
                f');out center;'
            )
            resp = await client.post("https://overpass-api.de/api/interpreter",
                                     data={"data": q})
            if resp.status_code == 200:
                for el in resp.json().get("elements", []):
                    tags = el.get("tags") or {}
                    center = el.get("center") or {}
                    # Explicit None checks: a coordinate of 0.0 is valid and
                    # must not fall through to the "center" fallback.
                    elat = el.get("lat")
                    if elat is None:
                        elat = center.get("lat")
                    elon = el.get("lon")
                    if elon is None:
                        elon = center.get("lon")
                    _collect(tags.get("name"), "osm", elat, elon, 1, "osm")
    except Exception:
        # Deliberately broad: the external service must never break the
        # endpoint, but the failure should at least be visible in the logs.
        logging.getLogger(__name__).warning("Overpass lookup failed", exc_info=True)

    # De-duplicate by name, nearest first
    seen = set()
    unique = []
    for entry in sorted(results, key=lambda x: x["distance_m"]):
        key = entry["name"].lower()
        if key not in seen:
            seen.add(key)
            unique.append(entry)
    return unique[:20]
# ------------------------------------------------------------------
# GET /api/walks/{id}/invite-candidates — einladbare Freunde
# WICHTIG: Muss VOR /{walk_id} stehen
# ------------------------------------------------------------------
@router.get("/{walk_id}/invite-candidates")
async def invite_candidates(walk_id: int, user=Depends(get_current_user)):
    """Return the organizer's accepted friends who have no invitation yet.

    Raises:
        HTTPException 404: the walk does not exist.
        HTTPException 403: the caller is not the organizer of the walk.
    """
    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id=?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")
        if walk['user_id'] != user['id']:
            raise HTTPException(403, "Nur der Veranstalter kann diese Liste abrufen.")
        invited_rows = conn.execute(
            "SELECT user_id FROM walk_invitations WHERE walk_id=?", (walk_id,)
        ).fetchall()

    # Ids of users that already hold an invitation (any status).
    invited_ids = set()
    for row in invited_rows:
        invited_ids.add(row[0])

    candidates = []
    for friend in _get_accepted_friends(user['id']):
        if friend['friend_id'] in invited_ids:
            continue
        candidates.append(friend)
    return candidates
# ------------------------------------------------------------------
# POST /api/walks/{id}/invite — Freund einladen (nur Veranstalter)
# WICHTIG: Muss VOR /{walk_id} stehen
# ------------------------------------------------------------------
@router.post("/{walk_id}/invite", status_code=201)
async def invite_friend(walk_id: int, data: InviteRequest, user=Depends(get_current_user)):
    """Invite one friend to a walk (organizer only) and send a push message.

    Raises:
        HTTPException 404: the walk does not exist.
        HTTPException 403: the caller is not the organizer.
        HTTPException 400: the target user is not an accepted friend.
        HTTPException 409: the target user already has an invitation.

    Returns:
        ``{"status": "invited", "friend_name": <name or "">}``.
    """
    organizer_id = user['id']
    friend_id = data.friend_id

    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id = ?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")
        if walk['user_id'] != organizer_id:
            raise HTTPException(403, "Nur der Veranstalter kann einladen.")

        # The friendship may have been requested from either side.
        is_friend = conn.execute("""
            SELECT 1 FROM friendships
            WHERE status='accepted'
              AND ((requester_id=? AND addressee_id=?) OR (requester_id=? AND addressee_id=?))
        """, (organizer_id, friend_id, friend_id, organizer_id)).fetchone()
        if not is_friend:
            raise HTTPException(400, "Dieser Nutzer ist nicht in deiner Freundesliste.")

        # Refuse a second invitation for the same user.
        previous = conn.execute(
            "SELECT status FROM walk_invitations WHERE walk_id=? AND user_id=?",
            (walk_id, friend_id)
        ).fetchone()
        if previous:
            raise HTTPException(409, f"Nutzer wurde bereits eingeladen (Status: {previous['status']}).")

        conn.execute(
            "INSERT INTO walk_invitations (walk_id, user_id, status) VALUES (?, ?, 'invited')",
            (walk_id, friend_id)
        )
        friend_row = conn.execute("SELECT name FROM users WHERE id=?", (friend_id,)).fetchone()

    # Push notification for the invited friend
    notification = {
        "type": "walk_invite",
        "title": f"Einladung: {walk['titel']}",
        "body": f"{user['name']} lädt dich zu einem Gassi-Treffen ein ({walk['datum']} {walk['uhrzeit']})",
        "page": "walks",
        "walk_id": walk_id,
    }
    send_push_to_user(friend_id, notification)

    friend_name = friend_row['name'] if friend_row else ""
    return {"status": "invited", "friend_name": friend_name}
# ------------------------------------------------------------------
# POST /api/walks/{id}/rsvp — RSVP setzen (yes / maybe / no)
# WICHTIG: Muss VOR /{walk_id} stehen
# ------------------------------------------------------------------
@router.post("/{walk_id}/rsvp")
async def rsvp_walk(walk_id: int, data: RsvpRequest, user=Depends(get_current_user)):
    """Store the caller's answer (yes / maybe / no) to a walk invitation.

    Raises:
        HTTPException 400: the status is not one of the allowed values.
        HTTPException 404: the walk does not exist.
        HTTPException 403: the caller holds no invitation for this walk.

    Returns:
        ``{"status": <answer>}``.
    """
    answer = data.status
    if answer not in {'yes', 'maybe', 'no'}:
        raise HTTPException(400, "Ungültiger RSVP-Status. Erlaubt: yes, maybe, no")

    me = user['id']
    with db() as conn:
        if not conn.execute("SELECT * FROM walks WHERE id=?", (walk_id,)).fetchone():
            raise HTTPException(404, "Treffen nicht gefunden.")

        invitation = conn.execute(
            "SELECT * FROM walk_invitations WHERE walk_id=? AND user_id=?",
            (walk_id, me)
        ).fetchone()
        if not invitation:
            raise HTTPException(403, "Du wurdest nicht zu diesem Treffen eingeladen.")

        # Record the answer together with the response timestamp.
        conn.execute(
            """UPDATE walk_invitations
               SET status=?, responded_at=datetime('now')
               WHERE walk_id=? AND user_id=?""",
            (answer, walk_id, me)
        )
    return {"status": answer}
# ------------------------------------------------------------------
# GET /api/walks/{id}/participants — Teilnehmerliste mit RSVP
# WICHTIG: Muss VOR /{walk_id} stehen
# ------------------------------------------------------------------
@router.get("/{walk_id}/participants")
async def get_participants(walk_id: int, user=Depends(get_current_user)):
    """Return the invitation list of a walk together with RSVP states.

    The organizer gets every invitation; any other caller gets only the
    invitations answered with 'yes' plus their own.

    Raises:
        HTTPException 404: the walk does not exist.

    Returns:
        A dict with ``invitations`` (list of row dicts), ``my_rsvp`` (the
        caller's own invitation status or ``None``) and ``is_organizer``.
    """
    # Shared part of the invitation query; the branches below only differ
    # in an additional WHERE condition.
    select_invitations = """
        SELECT wi.user_id, wi.status, wi.invited_at, wi.responded_at,
               u.name AS user_name,
               GROUP_CONCAT(d.name, ', ') AS hunde
        FROM walk_invitations wi
        JOIN users u ON u.id = wi.user_id
        LEFT JOIN walk_participant_dogs wpd
               ON wpd.walk_id = wi.walk_id AND wpd.user_id = wi.user_id
        LEFT JOIN dogs d ON d.id = wpd.dog_id
        WHERE wi.walk_id = ?"""
    group_by = " GROUP BY wi.user_id"
    me = user['id']

    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id=?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")

        is_organizer = walk['user_id'] == me
        if is_organizer:
            # Organizer sees all invitations
            sql = select_invitations + group_by
            params = (walk_id,)
        else:
            # Everyone else sees confirmed invitations plus their own
            sql = (select_invitations
                   + " AND (wi.status = 'yes' OR wi.user_id = ?)"
                   + group_by)
            params = (walk_id, me)
        invitations = conn.execute(sql, params).fetchall()

        own = conn.execute(
            "SELECT status FROM walk_invitations WHERE walk_id=? AND user_id=?",
            (walk_id, me)
        ).fetchone()

    my_rsvp = own['status'] if own else None
    return {
        "invitations": [dict(row) for row in invitations],
        "my_rsvp": my_rsvp,
        "is_organizer": is_organizer,
    }
# ------------------------------------------------------------------
# GET /api/walks/{id} — Detail mit Teilnehmerliste
# ------------------------------------------------------------------
@router.get("/{walk_id}")
async def get_walk(walk_id: int):
    """Return one walk with its participants, their dogs and its photos.

    Raises:
        HTTPException 404: the walk does not exist.

    Returns:
        The walk row as a dict, extended with ``teilnehmer`` (participants,
        each carrying a ``hunde_liste`` of dog details),
        ``teilnehmer_count`` and ``photos``.
    """
    with db() as conn:
        walk = conn.execute(
            "SELECT w.*, u.name AS veranstalter_name FROM walks w "
            "LEFT JOIN users u ON u.id = w.user_id WHERE w.id = ?",
            (walk_id,)
        ).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")

        # Participants with a comma-separated list of their dogs' names
        participants = conn.execute("""
            SELECT wp.user_id, u.name AS user_name,
                   GROUP_CONCAT(d.name, ', ') AS hunde
            FROM walk_participants wp
            JOIN users u ON u.id = wp.user_id
            LEFT JOIN walk_participant_dogs wpd
                   ON wpd.walk_id = wp.walk_id AND wpd.user_id = wp.user_id
            LEFT JOIN dogs d ON d.id = wpd.dog_id
            WHERE wp.walk_id = ?
            GROUP BY wp.user_id
        """, (walk_id,)).fetchall()

        # Dog details (photo + breed) per participant
        dog_rows = conn.execute("""
            SELECT wpd.user_id, d.name AS dog_name, d.foto_url, d.rasse
            FROM walk_participant_dogs wpd
            JOIN dogs d ON d.id = wpd.dog_id
            WHERE wpd.walk_id = ?
        """, (walk_id,)).fetchall()

        # Photos uploaded for this walk
        photos = conn.execute(
            "SELECT id, user_id, url, created_at FROM walk_photos WHERE walk_id=? ORDER BY created_at",
            (walk_id,)
        ).fetchall()

    # Group the dog details by owner.
    dogs_by_user = {}
    for dog in dog_rows:
        dogs_by_user.setdefault(dog['user_id'], []).append({
            'name': dog['dog_name'], 'foto_url': dog['foto_url'], 'rasse': dog['rasse']
        })

    attendees = []
    for participant in participants:
        entry = dict(participant)
        entry['hunde_liste'] = dogs_by_user.get(participant['user_id'], [])
        attendees.append(entry)

    detail = dict(walk)
    detail['teilnehmer'] = attendees
    detail['teilnehmer_count'] = len(attendees)
    detail['photos'] = [dict(photo) for photo in photos]
    return detail
# Helper: accepted friends list for invite-modal (reused from friends route)
def _get_accepted_friends(user_id: int):
    """Return the accepted friends of ``user_id`` ordered by name.

    A friendship row may list the user as requester or as addressee; the
    CASE expressions pick whichever side is the other person.

    Returns:
        A list of dicts with the keys ``friend_id`` and ``friend_name``.
    """
    query = """
        SELECT CASE WHEN f.requester_id=? THEN f.addressee_id ELSE f.requester_id END AS friend_id,
               u.name AS friend_name
        FROM friendships f
        JOIN users u ON u.id = CASE WHEN f.requester_id=? THEN f.addressee_id ELSE f.requester_id END
        WHERE (f.requester_id=? OR f.addressee_id=?) AND f.status='accepted'
        ORDER BY u.name
    """
    with db() as conn:
        # The same id is bound to all four placeholders.
        rows = conn.execute(query, (user_id,) * 4).fetchall()
    return list(map(dict, rows))
# ------------------------------------------------------------------
# PATCH /api/walks/{id}
# ------------------------------------------------------------------
@router.patch("/{walk_id}")
async def update_walk(walk_id: int, data: WalkUpdate, user=Depends(get_current_user)):
    """Apply a partial update to a walk (organizer only).

    Only fields that are not ``None`` in the request are written. When the
    capacity changes, the 'offen' / 'voll' status is re-evaluated against
    the current participant count; previously a full walk stayed 'voll'
    (and therefore unjoinable) even after its capacity had been raised.

    Raises:
        HTTPException 404: the walk does not exist.
        HTTPException 403: the caller is not the organizer.

    Returns:
        The updated walk row with ``veranstalter_name`` and
        ``teilnehmer_count``.
    """
    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id = ?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")
        if walk['user_id'] != user['id']:
            raise HTTPException(403, "Nur der Veranstalter kann das Treffen bearbeiten.")

        updates = data.model_dump(exclude_none=True)
        if updates:
            # Column names are the declared field names of WalkUpdate,
            # never raw client input; the values are bound as parameters.
            cols = ', '.join(f"{k} = ?" for k in updates)
            conn.execute(f"UPDATE walks SET {cols} WHERE id = ?", [*updates.values(), walk_id])

        count = conn.execute(
            "SELECT COUNT(*) FROM walk_participants WHERE walk_id = ?", (walk_id,)
        ).fetchone()[0]

        # Keep the status consistent with the new capacity. Other states
        # (e.g. 'storniert') are left untouched.
        if 'max_teilnehmer' in updates and walk['status'] in ('offen', 'voll'):
            new_status = 'voll' if count >= updates['max_teilnehmer'] else 'offen'
            if new_status != walk['status']:
                conn.execute("UPDATE walks SET status = ? WHERE id = ?", (new_status, walk_id))

        row = conn.execute(
            "SELECT w.*, u.name AS veranstalter_name FROM walks w "
            "LEFT JOIN users u ON u.id = w.user_id WHERE w.id = ?",
            (walk_id,)
        ).fetchone()
    return {**dict(row), 'teilnehmer_count': count}
# ------------------------------------------------------------------
# DELETE /api/walks/{id} — stornieren
# ------------------------------------------------------------------
@router.delete("/{walk_id}", status_code=204)
async def cancel_walk(walk_id: int, user=Depends(get_current_user)):
    """Cancel a walk (organizer only).

    The row is kept and only its status is set to 'storniert'.

    Raises:
        HTTPException 404: the walk does not exist.
        HTTPException 403: the caller is not the organizer.
    """
    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id = ?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")

        is_organizer = walk['user_id'] == user['id']
        if not is_organizer:
            raise HTTPException(403, "Nur der Veranstalter kann das Treffen stornieren.")

        conn.execute("UPDATE walks SET status = 'storniert' WHERE id = ?", (walk_id,))
# ------------------------------------------------------------------
# POST /api/walks/{id}/join — beitreten
# ------------------------------------------------------------------
@router.post("/{walk_id}/join")
async def join_walk(walk_id: int, data: JoinRequest, user=Depends(get_current_user)):
    """Add the caller (and optionally their dogs) to a walk.

    Raises:
        HTTPException 404: the walk does not exist.
        HTTPException 400: the walk is not 'offen', or it is already full.
        HTTPException 409: the caller already participates.

    Returns:
        ``{"status": "joined", "teilnehmer_count": <new count>}``.

    NOTE(review): the submitted dog ids are stored without checking that
    the dogs belong to the caller — confirm whether that is validated
    elsewhere.
    """
    me = user['id']
    dog_ids = data.dog_ids

    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id = ?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")
        if walk['status'] != 'offen':
            raise HTTPException(400, "Dieses Treffen ist nicht mehr offen.")

        # Already a participant?
        already_in = conn.execute(
            "SELECT 1 FROM walk_participants WHERE walk_id = ? AND user_id = ?",
            (walk_id, me)
        ).fetchone()
        if already_in:
            raise HTTPException(409, "Du nimmst bereits teil.")

        # Any slot left?
        capacity = walk['max_teilnehmer']
        count = conn.execute(
            "SELECT COUNT(*) FROM walk_participants WHERE walk_id = ?", (walk_id,)
        ).fetchone()[0]
        if count >= capacity:
            raise HTTPException(400, "Das Treffen ist bereits voll.")

        # The first dog (if any) is stored on the participant row itself.
        conn.execute(
            "INSERT INTO walk_participants (walk_id, user_id, dog_id) VALUES (?, ?, ?)",
            (walk_id, me, dog_ids[0] if dog_ids else None)
        )
        # Every dog gets its own link row; duplicates are ignored.
        for dog_id in dog_ids:
            conn.execute(
                "INSERT OR IGNORE INTO walk_participant_dogs (walk_id, user_id, dog_id) VALUES (?, ?, ?)",
                (walk_id, me, dog_id)
            )

        # Close the walk once the last slot has been taken.
        new_count = count + 1
        if new_count >= capacity:
            conn.execute("UPDATE walks SET status = 'voll' WHERE id = ?", (walk_id,))

    return {"status": "joined", "teilnehmer_count": new_count}
# ------------------------------------------------------------------
# DELETE /api/walks/{id}/join — verlassen
# ------------------------------------------------------------------
@router.delete("/{walk_id}/join", status_code=200)
async def leave_walk(walk_id: int, user=Depends(get_current_user)):
    """Remove the caller and their dogs from a walk.

    A walk marked 'voll' is reopened only if a slot is actually free
    afterwards. Previously any call reopened a full walk, even when the
    caller had never been a participant and the walk was still full.

    Raises:
        HTTPException 404: the walk does not exist.
        HTTPException 400: the caller is the organizer (must cancel instead).

    Returns:
        ``{"status": "left", "teilnehmer_count": <remaining count>}``.
    """
    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id = ?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")
        if walk['user_id'] == user['id']:
            raise HTTPException(400, "Als Veranstalter kannst du nicht austreten — storniere das Treffen stattdessen.")

        conn.execute(
            "DELETE FROM walk_participants WHERE walk_id = ? AND user_id = ?",
            (walk_id, user['id'])
        )
        conn.execute(
            "DELETE FROM walk_participant_dogs WHERE walk_id = ? AND user_id = ?",
            (walk_id, user['id'])
        )

        count = conn.execute(
            "SELECT COUNT(*) FROM walk_participants WHERE walk_id = ?", (walk_id,)
        ).fetchone()[0]

        # Reopen only when the walk was full and now has a free slot.
        if walk['status'] == 'voll' and count < walk['max_teilnehmer']:
            conn.execute("UPDATE walks SET status = 'offen' WHERE id = ?", (walk_id,))

    return {"status": "left", "teilnehmer_count": count}
# ------------------------------------------------------------------
# POST   /api/walks/{id}/photos            — Foto nach dem Treffen hochladen
# GET    /api/walks/{id}/photos            — Fotos eines Treffens abrufen
# DELETE /api/walks/{id}/photos/{photo_id} — Foto löschen
# ------------------------------------------------------------------
@router.post("/{walk_id}/photos", status_code=201)
async def upload_walk_photo(
    walk_id: int,
    file: UploadFile = File(...),
    user=Depends(get_current_user)
):
    """Store a photo for a walk (participants and the organizer only).

    The upload is decoded, rotated according to its EXIF orientation,
    converted to RGB and re-encoded as JPEG before it is written below
    ``MEDIA_DIR/walks``.

    Changes compared with the previous implementation:
      * The walk date is sanitised before it becomes part of the file
        name. It is a free-form string supplied by the walk's creator, so
        a value containing ``/`` or ``..`` could previously direct the
        write outside the media directory.
      * The file name now contains the walk id and a random suffix. The
        old "<date>-<user>-<count>" scheme produced identical names for
        different walks on the same day, for users with the same
        sanitised name, and after a photo had been deleted — silently
        overwriting existing files.
      * Uploads that cannot be decoded as an image are rejected with 400
        instead of being stored unmodified under a ``.jpg`` name.

    Raises:
        HTTPException 404: the walk does not exist.
        HTTPException 403: the caller is neither participant nor organizer.
        HTTPException 400: the upload is not a decodable image.

    Returns:
        The inserted ``walk_photos`` row as a dict.
    """
    import io
    import re as _re
    from PIL import Image, ImageOps
    try:
        # HEIF/HEIC support is optional.
        import pillow_heif
        pillow_heif.register_heif_opener()
    except ImportError:
        pass

    with db() as conn:
        walk = conn.execute("SELECT * FROM walks WHERE id=?", (walk_id,)).fetchone()
        if not walk:
            raise HTTPException(404, "Treffen nicht gefunden.")
        # Only participants or the organizer may upload photos
        is_participant = conn.execute(
            "SELECT 1 FROM walk_participants WHERE walk_id=? AND user_id=?",
            (walk_id, user['id'])
        ).fetchone()
        if not is_participant and walk['user_id'] != user['id']:
            raise HTTPException(403, "Nur Teilnehmer können Fotos hochladen.")

    raw = await file.read()
    try:
        img = Image.open(io.BytesIO(raw))
        img = ImageOps.exif_transpose(img).convert("RGB")
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=88)
    except Exception as exc:
        raise HTTPException(400, "Die Datei ist kein gültiges Bild.") from exc
    jpeg = buf.getvalue()

    # Build a file name from safe characters only. For a regular
    # YYYY-MM-DD date the sanitised value equals the stored one.
    datum_safe = _re.sub(r'[^0-9A-Za-z-]', '-', str(walk['datum'] or ""))[:32].strip('-')
    if not datum_safe:
        datum_safe = "0000-00-00"
    uname_raw = (user.get('name') or 'user').lower()
    uname_safe = _re.sub(r'[^a-z0-9]', '-', uname_raw)[:20].strip('-') or 'user'
    filename = f"{datum_safe}-{uname_safe}-w{walk_id}-{uuid.uuid4().hex[:12]}.jpg"

    path = os.path.join(MEDIA_DIR, "walks", filename)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        f.write(jpeg)

    url = f"/media/walks/{filename}"
    with db() as conn:
        cur = conn.execute(
            "INSERT INTO walk_photos (walk_id, user_id, url) VALUES (?,?,?)",
            (walk_id, user['id'], url)
        )
        row = conn.execute("SELECT * FROM walk_photos WHERE id=?", (cur.lastrowid,)).fetchone()
    return dict(row)
@router.get("/{walk_id}/photos")
async def get_walk_photos(walk_id: int):
    """Return all photo rows of a walk, ordered by ``created_at``.

    A walk without photos (or an unknown walk id) yields an empty list.
    """
    query = "SELECT * FROM walk_photos WHERE walk_id=? ORDER BY created_at"
    with db() as conn:
        photo_rows = conn.execute(query, (walk_id,)).fetchall()
    return list(map(dict, photo_rows))
@router.delete("/{walk_id}/photos/{photo_id}", status_code=204)
async def delete_walk_photo(walk_id: int, photo_id: int, user=Depends(get_current_user)):
    """Delete a walk photo (uploader or walk organizer only).

    Besides the database row, the image file is now removed from disk
    once no other photo row references the same URL; previously the file
    was left behind. A missing walk row no longer raises a ``TypeError``
    during the permission check.

    Raises:
        HTTPException 404: no such photo for this walk.
        HTTPException 403: the caller is neither uploader nor organizer.
    """
    with db() as conn:
        photo = conn.execute(
            "SELECT * FROM walk_photos WHERE id=? AND walk_id=?", (photo_id, walk_id)
        ).fetchone()
        if not photo:
            raise HTTPException(404)

        walk = conn.execute("SELECT user_id FROM walks WHERE id=?", (walk_id,)).fetchone()
        is_uploader = photo['user_id'] == user['id']
        is_organizer = walk is not None and walk['user_id'] == user['id']
        if not (is_uploader or is_organizer):
            raise HTTPException(403)

        conn.execute("DELETE FROM walk_photos WHERE id=?", (photo_id,))

        # Older uploads could share one file between several rows, so the
        # file is only removed when this was the last reference to it.
        url = photo['url'] or ""
        still_referenced = conn.execute(
            "SELECT 1 FROM walk_photos WHERE url=? LIMIT 1", (url,)
        ).fetchone()

    # Only touch files inside the walk media folder; basename() guards
    # against path components stored in the URL.
    filename = os.path.basename(url)
    if url.startswith("/media/walks/") and filename and not still_referenced:
        try:
            os.remove(os.path.join(MEDIA_DIR, "walks", filename))
        except OSError:
            # Best effort: the row is gone; a missing or locked file must
            # not turn a successful delete into an error.
            pass