#!/usr/bin/env python3 """DWD-RV-Regenvorhersage → PMTiles-Frames + Manifest. Läuft im Container (python3 + GDAL + numpy + pmtiles-CLI, s. Dockerfile). Alle 5 Min (Cron auf der DS): 1. neuestes DE1200_RV-Komposit von opendata.dwd.de laden (25 Frames, 0–120 min) 2. je Frame: decodieren → RGBA kolorieren → DE1200-GeoTIFF → Warp 3857 → MBTiles (z0–9) → PMTiles 3. manifest.json + atomarer Swap nach OUT_DIR (rename), alte Läufe aufräumen Georeferenzierung BEWIESEN (PoC tools/dwd-radar/poc/, 2026-06-08): Anker (9E,51N) = Pixel-Mitte (470/600), Ecken decken sich mit der DWD-DE1200-Spec. Format: 194-Byte-ASCII-Header bis ETX, 1200×1100 uint16 LE, Wert = (raw & 0x0FFF) × 10^-PR mm/5min, raw & 0x2000 = kein Daten. ENV: OUT_DIR (Default /out), FRAME_STEP (1 = alle 25 Frames, 2 = 10-Min-Schritte), KEEP_RUNS (Default 2). Zoom: Basis z7 (≈ native 1-km-Auflösung, ZOOM_LEVEL_STRATEGY=UPPER) + Overviews bis z4. Darüber overzoomt MapLibre die Raster-Source nativ (Radar ist ohnehin 1-km-blockig); unter z4 wird der Layer im Frontend ausgeblendet (minzoom). """ import json import os import re import shutil import subprocess import sys import tarfile import tempfile import urllib.request from pathlib import Path import numpy as np from osgeo import gdal, osr gdal.UseExceptions() BASE_URL = "https://opendata.dwd.de/weather/radar/composite/rv/" OUT_DIR = Path(os.environ.get("OUT_DIR", "/out")) FRAME_STEP = int(os.environ.get("FRAME_STEP", "1")) KEEP_RUNS = int(os.environ.get("KEEP_RUNS", "2")) MIN_ZOOM, MAX_ZOOM = 4, 7 # s. Docstring (Overzoom > z7 macht MapLibre) NCOLS, NROWS = 1100, 1200 # DE1200, WGS84-Variante (wradlib-Parameter, PoC-verifiziert). In GDALs Achsen- # Konvention belegt das Gitter x ∈ [0, 1100000], y ∈ [-1200000, 0] (Süden negativ). DE1200_WKT = ( 'PROJCS["Radolan Projection",' 'GEOGCS["Radolan Coordinate System",' 'DATUM["Radolan_Kugel",SPHEROID["WGS 84", 6378137, 298.25722356301]],' 'PRIMEM["Greenwich", 0],' 'UNIT["degree", 0.017453292519943295]],' 'PROJECTION["Polar_Stereographic"],' 'PARAMETER["latitude_of_origin", 60],' 'PARAMETER["central_meridian", 10],' 'PARAMETER["false_easting", 543196.83521776402],' 'PARAMETER["false_northing", 3622588.8619310018],' 'UNIT["m", 1]]' ) # Farbskala mm/5min → RGBA (an gängige Radar-Paletten angelehnt, RainViewer-ähnlich). # Unter 0,05 mm/5min transparent (Rauschen), darüber blau→grün→gelb→orange→rot→violett. SCALE = [ (0.05, (60, 130, 220, 110)), (0.15, (40, 160, 230, 150)), (0.40, (50, 200, 130, 170)), (0.80, (230, 210, 70, 190)), (1.50, (240, 150, 50, 210)), (3.00, (235, 70, 50, 230)), (6.00, (180, 40, 150, 240)), (99.0, (130, 20, 110, 250)), ] def latest_archive(): html = urllib.request.urlopen(BASE_URL, timeout=30).read().decode() names = sorted(set(re.findall(r'DE1200_RV\d{10}\.tar\.bz2', html))) if not names: raise RuntimeError("Kein RV-Komposit im DWD-Verzeichnis gefunden") return names[-1] def parse_frame(raw): etx = raw.index(b'\x03') header = raw[:etx].decode('ascii', 'replace') prec = 0.01 m = re.search(r'PR E-(\d{2})', header) if m: prec = 10 ** -int(m.group(1)) data = np.frombuffer(raw[etx + 1:], dtype=' 0 vals = (grid & 0x0FFF).astype(np.float32) * prec vals[nodata] = 0.0 # kein Daten = transparent wie kein Regen return vals def colorize(vals): """mm/5min → RGBA uint8 (4, NROWS, NCOLS), Zeile 0 = Norden (für GDAL geflippt).""" rgba = np.zeros((4, NROWS, NCOLS), dtype=np.uint8) lower = 0.0 for thresh, (r, g, b, a) in SCALE: m = (vals > lower) & (vals <= thresh) if lower > 0 else (vals >= 0.05) & (vals <= thresh) rgba[0][m], rgba[1][m], rgba[2][m], rgba[3][m] = r, g, b, a lower = thresh return rgba[:, ::-1, :] # Süd-zuerst → Nord-zuerst def frame_to_pmtiles(vals, out_pmtiles, tmp): rgba = colorize(vals) drv = gdal.GetDriverByName('GTiff') src = str(tmp / 'frame_de1200.tif') ds = drv.Create(src, NCOLS, NROWS, 4, gdal.GDT_Byte, options=['COMPRESS=DEFLATE']) ds.SetProjection(DE1200_WKT) ds.SetGeoTransform((0, 1000, 0, 0, 0, -1000)) # linke OBERE Ecke (0,0), y südwärts for i in range(4): ds.GetRasterBand(i + 1).WriteArray(rgba[i]) ds = None # Warp 3857 + MBTiles z0–MAX_ZOOM + Overviews warped = str(tmp / 'frame_3857.tif') gdal.Warp(warped, src, dstSRS='EPSG:3857', resampleAlg='near', creationOptions=['COMPRESS=DEFLATE']) mb = str(tmp / 'frame.mbtiles') gdal.Translate(mb, warped, format='MBTILES', creationOptions=['TILE_FORMAT=PNG', 'ZOOM_LEVEL_STRATEGY=UPPER']) mbds = gdal.Open(mb, gdal.GA_Update) mbds.BuildOverviews('AVERAGE', [2 ** i for i in range(1, MAX_ZOOM - MIN_ZOOM + 1)]) mbds = None subprocess.run(['pmtiles', 'convert', mb, str(out_pmtiles)], check=True, capture_output=True) for f in (src, warped, mb): Path(f).unlink(missing_ok=True) def main(): name = latest_archive() run_id = re.search(r'RV(\d{10})', name).group(1) # YYMMDDHHMM (UTC) run_dir = OUT_DIR / f'run-{run_id}' if run_dir.exists(): print(f"Lauf {run_id} existiert schon — nichts zu tun.") return print(f"Lade {name} …") with tempfile.TemporaryDirectory() as td: tmp = Path(td) arch = tmp / name urllib.request.urlretrieve(BASE_URL + name, arch) work = OUT_DIR / f'.tmp-{run_id}' if work.exists(): shutil.rmtree(work) work.mkdir(parents=True) frames = [] with tarfile.open(arch, 'r:bz2') as tf: members = sorted(tf.getnames()) for i, m in enumerate(members): lead = int(m.rsplit('_', 1)[1]) if (i % FRAME_STEP) != 0: continue vals = parse_frame(tf.extractfile(m).read()) out = work / f'rv_{lead:03d}.pmtiles' frame_to_pmtiles(vals, out, tmp) frames.append({'lead_min': lead, 'file': out.name}) print(f" Frame +{lead:03d} min → {out.name}") # Manifest + atomarer Swap: erst Verzeichnis, dann manifest.json auf den neuen Lauf ts = run_id # YYMMDDHHMM UTC iso = f"20{ts[0:2]}-{ts[2:4]}-{ts[4:6]}T{ts[6:8]}:{ts[8:10]}:00Z" manifest = {'run': run_id, 'run_time_utc': iso, 'interval_min': 5 * FRAME_STEP, 'min_zoom': MIN_ZOOM, 'max_zoom': MAX_ZOOM, 'path': run_dir.name, 'frames': frames} (work / 'manifest.json').write_text(json.dumps(manifest)) work.rename(run_dir) (OUT_DIR / 'manifest.json.tmp').write_text(json.dumps(manifest)) (OUT_DIR / 'manifest.json.tmp').rename(OUT_DIR / 'manifest.json') # Alte Läufe aufräumen (die letzten KEEP_RUNS behalten) runs = sorted([d for d in OUT_DIR.iterdir() if d.is_dir() and d.name.startswith('run-')]) for old in runs[:-KEEP_RUNS]: shutil.rmtree(old, ignore_errors=True) print(f"Fertig: Lauf {run_id}, {len(frames)} Frames → {run_dir}") if __name__ == '__main__': sys.exit(main())