""" BAN YARO — OSM/Overpass POI-Cache + Community-Pins Cacht OSM-Daten lokal, erlaubt Nutzern eigene Marker und Meldungen. """ import math import asyncio import httpx import logging from typing import Optional from fastapi import APIRouter, Query, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel from database import db from auth import get_current_user, get_current_user_optional as get_optional_user logger = logging.getLogger(__name__) router = APIRouter() CACHE_ZOOM = 12 CACHE_DAYS = 90 OVERPASS_URLS = [ 'https://lz4.overpass-api.de/api/interpreter', 'https://overpass-api.de/api/interpreter', 'https://overpass.kumi.systems/api/interpreter', ] # Max 1 gleichzeitige Overpass-Anfrage + 2s Mindestabstand (Fair Use) _overpass_sem = asyncio.Semaphore(1) _overpass_last_req = 0.0 _OVERPASS_MIN_DELAY = 2.0 # Sekunden zwischen Anfragen _OVERPASS_UA = 'BanYaro/1.0 (https://banyaro.app; dog-walking PWA; contact: admin@banyaro.app)' _OVERPASS_HEADERS = { 'User-Agent': _OVERPASS_UA, 'Referer': 'https://banyaro.app/', # von overpass-api.de verlangt gegen 406 } # Referenzen auf laufende Hintergrund-Tasks — verhindert GC vor Abschluss _bg_tasks: set = set() # Tiles die gerade gefetcht werden — verhindert Doppel-Requests _fetching: set = set() OSM_QUERIES = { 'waste_basket': '[out:json][timeout:20];node["amenity"="waste_basket"]({bbox});out;', 'dog_park': '[out:json][timeout:25];(way["leisure"="dog_park"]({bbox});node["leisure"="dog_park"]({bbox});way["leisure"="park"]["dog"="yes"]({bbox});node["leisure"="park"]["dog"="yes"]({bbox}););out center;', 'drinking_water': '[out:json][timeout:20];node["amenity"="drinking_water"]({bbox});out;', 'tierarzt': '[out:json][timeout:25];(node["amenity"="veterinary"]({bbox});way["amenity"="veterinary"]({bbox}););out center;', 'shop': '[out:json][timeout:25];(node["shop"="pet"]({bbox});way["shop"="pet"]({bbox}););out center;', 'restaurant': '[out:json][timeout:35];(node["amenity"="restaurant"]({bbox});way["amenity"="restaurant"]({bbox});node["amenity"="cafe"]({bbox});way["amenity"="cafe"]({bbox});node["amenity"="biergarten"]({bbox});way["amenity"="biergarten"]({bbox}););out center;', 'bank': '[out:json][timeout:20];node["amenity"="bench"]({bbox});out;', } # Ab dieser Anzahl Meldungen wird ein Marker ausgeblendet REPORT_THRESHOLD = 3 # ------------------------------------------------------------------ # Tile-Mathematik # ------------------------------------------------------------------ def _lat_lon_to_tile(lat, lon, zoom): n = 2 ** zoom x = int((lon + 180) / 360 * n) y = int((1 - math.asinh(math.tan(math.radians(lat))) / math.pi) / 2 * n) return x, y def _tile_to_bbox(x, y, zoom): n = 2 ** zoom west = x / n * 360 - 180 east = (x + 1) / n * 360 - 180 north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n)))) south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n)))) return south, west, north, east def _covering_tiles(south, west, north, east, zoom): x0, y0 = _lat_lon_to_tile(north, west, zoom) x1, y1 = _lat_lon_to_tile(south, east, zoom) return [(x, y) for x in range(x0, x1 + 1) for y in range(y0, y1 + 1)] # ------------------------------------------------------------------ # Overpass-Fetch + Cache # ------------------------------------------------------------------ async def _fetch_overpass(query): global _overpass_last_req for url in OVERPASS_URLS: for attempt in range(2): try: async with _overpass_sem: # Fair-Use: Mindestabstand zwischen Anfragen einhalten import time wait = _OVERPASS_MIN_DELAY - (time.monotonic() - _overpass_last_req) if wait > 0: await asyncio.sleep(wait) async with httpx.AsyncClient( timeout=40, headers=_OVERPASS_HEADERS, ) as client: r = await client.post(url, data={'data': query}) _overpass_last_req = time.monotonic() if r.status_code == 200: return r.json().get('elements', []) if r.status_code == 429: logger.warning(f"Overpass 429 {url} (Versuch {attempt + 1}/2)") await asyncio.sleep(30 * (attempt + 1)) continue # gleiche URL nochmal logger.warning(f"Overpass {r.status_code} {url} — nächste Instanz") break # nächste URL except Exception as exc: logger.warning(f"Overpass Verbindungsfehler {url}: {exc}") break # nächste URL raise Exception("Alle Overpass-Instanzen fehlgeschlagen") def _stale_tiles(poi_type, tiles): stale = [] with db() as conn: for (x, y) in tiles: key = f"{CACHE_ZOOM}_{x}_{y}" row = conn.execute( """SELECT 1 FROM osm_tiles WHERE type=? AND tile_key=? AND cached_at > datetime('now', ?)""", (poi_type, key, f'-{CACHE_DAYS} days') ).fetchone() if not row: stale.append((x, y)) return stale async def _fetch_and_store_tile(poi_type, x, y): fkey = f"{poi_type}_{CACHE_ZOOM}_{x}_{y}" if fkey in _fetching: return # bereits in Arbeit _fetching.add(fkey) key = f"{CACHE_ZOOM}_{x}_{y}" s, w, n, e = _tile_to_bbox(x, y, CACHE_ZOOM) query = OSM_QUERIES[poi_type].format(bbox=f"{s},{w},{n},{e}") try: elements = await _fetch_overpass(query) except Exception as exc: logger.warning(f"Overpass Fehler {poi_type} Tile {key}: {exc}") _fetching.discard(fkey) return with db() as conn: for el in elements: osm_id = el.get('id') lat = el.get('lat') or (el.get('center') or {}).get('lat') lon = el.get('lon') or (el.get('center') or {}).get('lon') if not (osm_id and lat and lon): continue tags = el.get('tags') or {} name = tags.get('name') or tags.get('description') opening_hours = tags.get('opening_hours') phone = tags.get('phone') or tags.get('contact:phone') or tags.get('telephone') website = tags.get('website') or tags.get('contact:website') or tags.get('url') conn.execute(""" INSERT INTO osm_pois (osm_id, type, lat, lon, name, opening_hours, phone, website, cached_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ON CONFLICT(osm_id, type) DO UPDATE SET lat=excluded.lat, lon=excluded.lon, name=excluded.name, opening_hours=CASE WHEN user_edited=1 THEN opening_hours ELSE excluded.opening_hours END, phone=excluded.phone, website=excluded.website, cached_at=excluded.cached_at """, (osm_id, poi_type, lat, lon, name, opening_hours, phone, website)) conn.execute(""" INSERT INTO osm_tiles (type, tile_key, cached_at) VALUES (?, ?, datetime('now')) ON CONFLICT(type, tile_key) DO UPDATE SET cached_at=excluded.cached_at """, (poi_type, key)) logger.info(f"OSM Tile {key} ({poi_type}): {len(elements)} POIs gecacht.") _fetching.discard(fkey) # ------------------------------------------------------------------ # GET /pois — OSM + Community-Pins # Gibt sofort DB-Daten zurück. Stale Tiles werden im Hintergrund # nachgeladen — beim nächsten Scan des gleichen Bereichs sind sie frisch. # ------------------------------------------------------------------ @router.get('/pois') async def get_pois( type: str = Query(...), south: float = Query(...), west: float = Query(...), north: float = Query(...), east: float = Query(...), fast: bool = Query(False), user = Depends(get_optional_user), ): result = [] fetched_fresh = False if type in OSM_QUERIES: tiles = _covering_tiles(south, west, north, east, CACHE_ZOOM) stale = _stale_tiles(type, tiles) if stale and not fast: async def _bg_fetch(poi_type, stale_tiles): for (x, y) in stale_tiles: await _fetch_and_store_tile(poi_type, x, y) task = asyncio.create_task(_bg_fetch(type, stale)) _bg_tasks.add(task) task.add_done_callback(_bg_tasks.discard) with db() as conn: reported = { row[0] for row in conn.execute( """SELECT osm_id FROM osm_reports WHERE type=? AND osm_id IS NOT NULL GROUP BY osm_id HAVING COUNT(*) >= ?""", (type, REPORT_THRESHOLD) ).fetchall() } rows = conn.execute(""" SELECT osm_id, lat, lon, name, opening_hours, phone, website FROM osm_pois WHERE type=? AND lat BETWEEN ? AND ? AND lon BETWEEN ? AND ? """, (type, south, north, west, east)).fetchall() for r in rows: if r['osm_id'] not in reported: result.append({ 'id': r['osm_id'], 'lat': r['lat'], 'lon': r['lon'], 'name': r['name'], 'opening_hours': r['opening_hours'], 'phone': r['phone'], 'website': r['website'], 'source': 'osm', 'fresh': fetched_fresh, }) # Community-Pins: user_map_pois passend zum Typ # type='sonstiges' → zeigt alle 'sonstiges'-Pins # type='waste_basket' etc. → zeigt user-submitted POIs dieses Typs user_poi_type = type # direkte Übereinstimmung with db() as conn: reported_user = { row[0] for row in conn.execute( """SELECT user_poi_id FROM osm_reports WHERE user_poi_id IS NOT NULL GROUP BY user_poi_id HAVING COUNT(*) >= ?""", (REPORT_THRESHOLD,) ).fetchall() } user_pois = conn.execute(""" SELECT p.id, p.lat, p.lon, p.name, p.notiz, p.user_id, p.type, u.name AS username FROM user_map_pois p LEFT JOIN users u ON u.id = p.user_id WHERE p.type=? AND p.lat BETWEEN ? AND ? AND p.lon BETWEEN ? AND ? """, (user_poi_type, south, north, west, east)).fetchall() user_id = user['id'] if user else None for p in user_pois: if p['id'] not in reported_user: result.append({ 'id': f"u{p['id']}", 'user_poi_id': p['id'], 'lat': p['lat'], 'lon': p['lon'], 'name': p['name'], 'notiz': p['notiz'], 'username': p['username'], 'source': 'user', 'own': p['user_id'] == user_id, }) return result # ------------------------------------------------------------------ # POST /user-poi — Community-Marker setzen # ------------------------------------------------------------------ class UserPoiIn(BaseModel): type: str lat: float lon: float name: Optional[str] = None notiz: Optional[str] = None ALLOWED_TYPES = { 'waste_basket', 'drinking_water', 'dog_park', 'giftkoeder', # Giftköder (exklusiv, kein Kombi) 'kotbeutel', # Kotbeutelspender 'bank', # Sitzbank 'gefahr', # Allgemeine Gefahr / Hinweis 'parkplatz', # Hundefreundlicher Parkplatz 'treffpunkt', # Treffpunkt für Hundehalter 'sonstiges', } @router.post('/user-poi') async def add_user_poi(body: UserPoiIn, user = Depends(get_current_user)): types = [t.strip() for t in body.type.split(',') if t.strip()] if not types or any(t not in ALLOWED_TYPES for t in types): raise HTTPException(400, 'Ungültiger Typ') with db() as conn: row = conn.execute(""" INSERT INTO user_map_pois (user_id, type, lat, lon, name, notiz) VALUES (?, ?, ?, ?, ?, ?) """, (user['id'], body.type, body.lat, body.lon, body.name, body.notiz)) new_id = row.lastrowid return {'id': new_id, 'status': 'ok'} # ------------------------------------------------------------------ # DELETE /user-poi/{id} — eigenen Marker löschen # ------------------------------------------------------------------ @router.delete('/user-poi/{poi_id}') async def delete_user_poi(poi_id: int, user = Depends(get_current_user)): with db() as conn: row = conn.execute( "SELECT user_id FROM user_map_pois WHERE id=?", (poi_id,) ).fetchone() if not row: raise HTTPException(404, 'Nicht gefunden') if row['user_id'] != user['id']: raise HTTPException(403, 'Nicht berechtigt') conn.execute("DELETE FROM user_map_pois WHERE id=?", (poi_id,)) return {'status': 'ok'} # ------------------------------------------------------------------ # POST /report — Marker als ungültig melden # ------------------------------------------------------------------ class ReportIn(BaseModel): type: str grund: str osm_id: Optional[int] = None user_poi_id: Optional[int] = None ALLOWED_GRUENDE = {'existiert_nicht', 'falsche_position', 'spam', 'sonstiges'} @router.post('/report') async def report_poi(body: ReportIn, user = Depends(get_current_user)): if not body.osm_id and not body.user_poi_id: raise HTTPException(400, 'osm_id oder user_poi_id erforderlich') if body.grund not in ALLOWED_GRUENDE: raise HTTPException(400, 'Ungültiger Grund') with db() as conn: # Doppelmeldung vom selben User verhindern existing = conn.execute(""" SELECT 1 FROM osm_reports WHERE user_id=? AND osm_id IS ? AND user_poi_id IS ? """, (user['id'], body.osm_id, body.user_poi_id)).fetchone() if existing: return {'status': 'bereits_gemeldet'} conn.execute(""" INSERT INTO osm_reports (user_id, osm_id, user_poi_id, type, grund) VALUES (?, ?, ?, ?, ?) """, (user['id'], body.osm_id, body.user_poi_id, body.type, body.grund)) return {'status': 'ok'} # ------------------------------------------------------------------ # POST /analyze — Cache-Warmup für alle Typen # ------------------------------------------------------------------ @router.post('/analyze') async def analyze_region( background_tasks: BackgroundTasks, south: float = Query(...), west: float = Query(...), north: float = Query(...), east: float = Query(...), ): tiles = _covering_tiles(south, west, north, east, CACHE_ZOOM) async def _warmup(): tasks = [ _fetch_and_store_tile(pt, x, y) for pt in OSM_QUERIES for (x, y) in _stale_tiles(pt, tiles) ] await asyncio.gather(*tasks) background_tasks.add_task(_warmup) return {'status': 'gestartet', 'tiles': len(tiles), 'types': list(OSM_QUERIES.keys())} # ------------------------------------------------------------------ # POST /pois/{osm_id}/edit — Nutzer schlägt Korrektur vor # ------------------------------------------------------------------ class PoiEditCreate(BaseModel): poi_name: str field: str = 'opening_hours' new_value: str @router.post('/pois/{osm_id}/edit', status_code=201) async def submit_poi_edit(osm_id: str, data: PoiEditCreate, user=Depends(get_current_user)): if data.field not in ('opening_hours',): raise HTTPException(400, "Nur 'opening_hours' kann korrigiert werden.") if not data.new_value.strip(): raise HTTPException(400, "Neuer Wert darf nicht leer sein.") with db() as conn: poi = conn.execute( "SELECT name, opening_hours FROM osm_pois WHERE osm_id=?", (osm_id,) ).fetchone() if not poi: raise HTTPException(404, "POI nicht gefunden.") existing = conn.execute( """SELECT id FROM osm_poi_edits WHERE osm_id=? AND field=? AND status='pending' AND user_id=?""", (osm_id, data.field, user["id"]) ).fetchone() if existing: raise HTTPException(409, "Du hast bereits eine ausstehende Korrektur für diesen POI.") conn.execute( """INSERT INTO osm_poi_edits (osm_id, poi_name, field, old_value, new_value, user_id) VALUES (?, ?, ?, ?, ?, ?)""", (osm_id, data.poi_name or poi["name"], data.field, poi[data.field], data.new_value.strip(), user["id"]) ) return {"status": "pending", "message": "Korrektur wurde zur Prüfung eingereicht."}