DWD-Regenvorhersage: Pipeline + /radar-Route + Timeline-Integration + Settings-Toggle

PoC BESTANDEN (tools/dwd-radar/poc): Anker (9E,51N) = Pixel-Mitte (470/600),
Ecken decken sich mit der DWD-DE1200-Spec — Georeferenzierung bewiesen.
- tools/dwd-radar: RV-Komposit (25 Frames, 0-120min) -> kolorierte RGBA-
  PMTiles z4-7 je Frame (MapLibre overzoomt darueber) + manifest.json,
  atomarer Swap, KEEP_RUNS-Aufraeumen; 25 Frames in ~14s lokal
- docker-compose.dwd.yml (DSM-Cron alle 5 min, NIE --remove-orphans)
- main.py: /radar/manifest.json (no-store) + /radar/{run}/{file} (Range/206,
  immutable — Run-Id im Pfad); sw.js: /radar/ pass-through
- map.js: Radar-Frames heterogen ({url,time,dwd}) — DWD ersetzt RainViewer-
  Nowcast (0-120min, 5-min-Schritte) wenn Toggle an + GL + Karte in DE +
  Manifest frisch (<30min); sonst RainViewer-Fallback; Label '+X Min - DWD'
- settings.js: Toggle 'DWD-Regenvorhersage' (by_dwd_radar, Default AN)
- pytest 39 passed
Bump v1240
This commit is contained in:
rene 2026-06-06 18:08:57 +02:00
parent 6a06c9be7e
commit 5330681059
17 changed files with 4685 additions and 23 deletions

View file

@ -0,0 +1,15 @@
# DWD-Regenradar-Pipeline: GDAL (decode/warp/tile) + go-pmtiles (MBTiles → PMTiles)
FROM ghcr.io/osgeo/gdal:alpine-normal-latest
# go-pmtiles-CLI (statisches Binary)
ARG PMTILES_VERSION=1.22.1
ARG TARGETARCH
RUN ARCH=$([ "$TARGETARCH" = "arm64" ] && echo arm64 || echo x86_64) && \
wget -qO /tmp/pmtiles.tar.gz \
"https://github.com/protomaps/go-pmtiles/releases/download/v${PMTILES_VERSION}/go-pmtiles_${PMTILES_VERSION}_Linux_${ARCH}.tar.gz" && \
tar xzf /tmp/pmtiles.tar.gz -C /usr/local/bin pmtiles && \
rm /tmp/pmtiles.tar.gz && pmtiles version || true
COPY make_radar_tiles.py /app/make_radar_tiles.py
WORKDIR /app
CMD ["python3", "/app/make_radar_tiles.py"]

View file

@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""DWD-RV-Regenvorhersage → PMTiles-Frames + Manifest.
Läuft im Container (python3 + GDAL + numpy + pmtiles-CLI, s. Dockerfile).
Alle 5 Min (Cron auf der DS):
1. neuestes DE1200_RV-Komposit von opendata.dwd.de laden (25 Frames, 0120 min)
2. je Frame: decodieren RGBA kolorieren DE1200-GeoTIFF Warp 3857
MBTiles (z09) PMTiles
3. manifest.json + atomarer Swap nach OUT_DIR (rename), alte Läufe aufräumen
Georeferenzierung BEWIESEN (PoC tools/dwd-radar/poc/, 2026-06-08): Anker (9E,51N)
= Pixel-Mitte (470/600), Ecken decken sich mit der DWD-DE1200-Spec.
Format: 194-Byte-ASCII-Header bis ETX, 1200×1100 uint16 LE,
Wert = (raw & 0x0FFF) × 10^-PR mm/5min, raw & 0x2000 = kein Daten.
ENV: OUT_DIR (Default /out), FRAME_STEP (1 = alle 25 Frames, 2 = 10-Min-Schritte),
KEEP_RUNS (Default 2).
Zoom: Basis z7 ( native 1-km-Auflösung, ZOOM_LEVEL_STRATEGY=UPPER) + Overviews bis z4.
Darüber overzoomt MapLibre die Raster-Source nativ (Radar ist ohnehin 1-km-blockig);
unter z4 wird der Layer im Frontend ausgeblendet (minzoom).
"""
import json
import os
import re
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib.request
from pathlib import Path
import numpy as np
from osgeo import gdal, osr
gdal.UseExceptions()
BASE_URL = "https://opendata.dwd.de/weather/radar/composite/rv/"
OUT_DIR = Path(os.environ.get("OUT_DIR", "/out"))
FRAME_STEP = int(os.environ.get("FRAME_STEP", "1"))
KEEP_RUNS = int(os.environ.get("KEEP_RUNS", "2"))
MIN_ZOOM, MAX_ZOOM = 4, 7 # s. Docstring (Overzoom > z7 macht MapLibre)
NCOLS, NROWS = 1100, 1200
# DE1200, WGS84-Variante (wradlib-Parameter, PoC-verifiziert). In GDALs Achsen-
# Konvention belegt das Gitter x ∈ [0, 1100000], y ∈ [-1200000, 0] (Süden negativ).
DE1200_WKT = (
'PROJCS["Radolan Projection",'
'GEOGCS["Radolan Coordinate System",'
'DATUM["Radolan_Kugel",SPHEROID["WGS 84", 6378137, 298.25722356301]],'
'PRIMEM["Greenwich", 0],'
'UNIT["degree", 0.017453292519943295]],'
'PROJECTION["Polar_Stereographic"],'
'PARAMETER["latitude_of_origin", 60],'
'PARAMETER["central_meridian", 10],'
'PARAMETER["false_easting", 543196.83521776402],'
'PARAMETER["false_northing", 3622588.8619310018],'
'UNIT["m", 1]]'
)
# Farbskala mm/5min → RGBA (an gängige Radar-Paletten angelehnt, RainViewer-ähnlich).
# Unter 0,05 mm/5min transparent (Rauschen), darüber blau→grün→gelb→orange→rot→violett.
SCALE = [
(0.05, (60, 130, 220, 110)),
(0.15, (40, 160, 230, 150)),
(0.40, (50, 200, 130, 170)),
(0.80, (230, 210, 70, 190)),
(1.50, (240, 150, 50, 210)),
(3.00, (235, 70, 50, 230)),
(6.00, (180, 40, 150, 240)),
(99.0, (130, 20, 110, 250)),
]
def latest_archive():
html = urllib.request.urlopen(BASE_URL, timeout=30).read().decode()
names = sorted(set(re.findall(r'DE1200_RV\d{10}\.tar\.bz2', html)))
if not names:
raise RuntimeError("Kein RV-Komposit im DWD-Verzeichnis gefunden")
return names[-1]
def parse_frame(raw):
etx = raw.index(b'\x03')
header = raw[:etx].decode('ascii', 'replace')
prec = 0.01
m = re.search(r'PR E-(\d{2})', header)
if m:
prec = 10 ** -int(m.group(1))
data = np.frombuffer(raw[etx + 1:], dtype='<u2')
if data.size != NCOLS * NROWS:
raise ValueError(f"Datenlänge {data.size} != {NCOLS * NROWS}")
grid = data.reshape(NROWS, NCOLS) # Zeile 0 = Süden
nodata = (grid & 0x2000) > 0
vals = (grid & 0x0FFF).astype(np.float32) * prec
vals[nodata] = 0.0 # kein Daten = transparent wie kein Regen
return vals
def colorize(vals):
"""mm/5min → RGBA uint8 (4, NROWS, NCOLS), Zeile 0 = Norden (für GDAL geflippt)."""
rgba = np.zeros((4, NROWS, NCOLS), dtype=np.uint8)
lower = 0.0
for thresh, (r, g, b, a) in SCALE:
m = (vals > lower) & (vals <= thresh) if lower > 0 else (vals >= 0.05) & (vals <= thresh)
rgba[0][m], rgba[1][m], rgba[2][m], rgba[3][m] = r, g, b, a
lower = thresh
return rgba[:, ::-1, :] # Süd-zuerst → Nord-zuerst
def frame_to_pmtiles(vals, out_pmtiles, tmp):
rgba = colorize(vals)
drv = gdal.GetDriverByName('GTiff')
src = str(tmp / 'frame_de1200.tif')
ds = drv.Create(src, NCOLS, NROWS, 4, gdal.GDT_Byte, options=['COMPRESS=DEFLATE'])
ds.SetProjection(DE1200_WKT)
ds.SetGeoTransform((0, 1000, 0, 0, 0, -1000)) # linke OBERE Ecke (0,0), y südwärts
for i in range(4):
ds.GetRasterBand(i + 1).WriteArray(rgba[i])
ds = None
# Warp 3857 + MBTiles z0MAX_ZOOM + Overviews
warped = str(tmp / 'frame_3857.tif')
gdal.Warp(warped, src, dstSRS='EPSG:3857', resampleAlg='near',
creationOptions=['COMPRESS=DEFLATE'])
mb = str(tmp / 'frame.mbtiles')
gdal.Translate(mb, warped, format='MBTILES',
creationOptions=['TILE_FORMAT=PNG', 'ZOOM_LEVEL_STRATEGY=UPPER'])
mbds = gdal.Open(mb, gdal.GA_Update)
mbds.BuildOverviews('AVERAGE', [2 ** i for i in range(1, MAX_ZOOM - MIN_ZOOM + 1)])
mbds = None
subprocess.run(['pmtiles', 'convert', mb, str(out_pmtiles)],
check=True, capture_output=True)
for f in (src, warped, mb):
Path(f).unlink(missing_ok=True)
def main():
name = latest_archive()
run_id = re.search(r'RV(\d{10})', name).group(1) # YYMMDDHHMM (UTC)
run_dir = OUT_DIR / f'run-{run_id}'
if run_dir.exists():
print(f"Lauf {run_id} existiert schon — nichts zu tun.")
return
print(f"Lade {name}")
with tempfile.TemporaryDirectory() as td:
tmp = Path(td)
arch = tmp / name
urllib.request.urlretrieve(BASE_URL + name, arch)
work = OUT_DIR / f'.tmp-{run_id}'
if work.exists():
shutil.rmtree(work)
work.mkdir(parents=True)
frames = []
with tarfile.open(arch, 'r:bz2') as tf:
members = sorted(tf.getnames())
for i, m in enumerate(members):
lead = int(m.rsplit('_', 1)[1])
if (i % FRAME_STEP) != 0:
continue
vals = parse_frame(tf.extractfile(m).read())
out = work / f'rv_{lead:03d}.pmtiles'
frame_to_pmtiles(vals, out, tmp)
frames.append({'lead_min': lead, 'file': out.name})
print(f" Frame +{lead:03d} min → {out.name}")
# Manifest + atomarer Swap: erst Verzeichnis, dann manifest.json auf den neuen Lauf
ts = run_id # YYMMDDHHMM UTC
iso = f"20{ts[0:2]}-{ts[2:4]}-{ts[4:6]}T{ts[6:8]}:{ts[8:10]}:00Z"
manifest = {'run': run_id, 'run_time_utc': iso, 'interval_min': 5 * FRAME_STEP,
'min_zoom': MIN_ZOOM, 'max_zoom': MAX_ZOOM,
'path': run_dir.name, 'frames': frames}
(work / 'manifest.json').write_text(json.dumps(manifest))
work.rename(run_dir)
(OUT_DIR / 'manifest.json.tmp').write_text(json.dumps(manifest))
(OUT_DIR / 'manifest.json.tmp').rename(OUT_DIR / 'manifest.json')
# Alte Läufe aufräumen (die letzten KEEP_RUNS behalten)
runs = sorted([d for d in OUT_DIR.iterdir() if d.is_dir() and d.name.startswith('run-')])
for old in runs[:-KEEP_RUNS]:
shutil.rmtree(old, ignore_errors=True)
print(f"Fertig: Lauf {run_id}, {len(frames)} Frames → {run_dir}")
if __name__ == '__main__':
sys.exit(main())

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""DWD-RV-PoC: Frame dekodieren + Georeferenzierung beweisen.
Läuft im osgeo/gdal-Container (python3 + GDAL + numpy).
Schritte:
1. RV-Frame parsen (Header bis ETX, 1200×1100 uint16 LE,
Wert = (raw & 0x0FFF) * 10^PR mm/5min, raw & 0x2000 = kein Echo/kein Daten)
2. DE1200-CRS (polar-stereografisch, WGS84-Ellipsoid, wradlib-Parameter):
ANKER-BEWEIS: (9°E, 51°N) muss auf (470000, 600000) m projizieren.
3. GeoTIFF (Float) im DE1200-CRS gdalwarp nach EPSG:3857
4. Ecken in WGS84 ausgeben (Plausibilität: Deutschland-Umgriff)
Doku: docs/DWD_RAIN_FORECAST_PLAN.md · Parameter: wradlib georef/projection.py
"""
import sys
import numpy as np
from osgeo import gdal, osr
gdal.UseExceptions()
NCOLS, NROWS = 1100, 1200
# DE1200, WGS84-Variante (wradlib _radolan_ref['wgs84']['de1200']):
# False Easting/Northing so, dass die LINKE UNTERE Gitterecke bei (0,0) liegt.
DE1200_WKT = (
'PROJCS["Radolan Projection",'
'GEOGCS["Radolan Coordinate System",'
'DATUM["Radolan_Kugel",SPHEROID["WGS 84", 6378137, 298.25722356301]],'
'PRIMEM["Greenwich", 0],'
'UNIT["degree", 0.017453292519943295]],'
'PROJECTION["Polar_Stereographic"],'
'PARAMETER["latitude_of_origin", 60],'
'PARAMETER["central_meridian", 10],'
'PARAMETER["false_easting", 543196.83521776402],'
'PARAMETER["false_northing", 3622588.8619310018],'
'UNIT["m", 1]]'
)
def parse_frame(path):
raw = open(path, 'rb').read()
etx = raw.index(b'\x03')
header = raw[:etx].decode('ascii', 'replace')
# PR-Feld: Genauigkeit, z.B. "PR E-02" → Faktor 0.01
prec = 0.01
if 'E-' in header:
try:
prec = 10 ** -int(header.split('E-')[1][:2])
except Exception:
pass
data = np.frombuffer(raw[etx + 1:], dtype='<u2')
assert data.size == NCOLS * NROWS, f"Datenlänge {data.size} != {NCOLS*NROWS}"
grid = data.reshape(NROWS, NCOLS) # Zeile 0 = SÜDLICHSTE Zeile (RADOLAN: Start links unten)
nodata = (grid & 0x2000) > 0
vals = (grid & 0x0FFF).astype(np.float32) * prec # mm / 5 min
vals[nodata] = np.nan
return header, vals
def main(frame_path, out_prefix):
header, vals = parse_frame(frame_path)
print("Header:", header[:120])
print(f"Werte: min={np.nanmin(vals):.2f} max={np.nanmax(vals):.2f} mm/5min, "
f"Regen-Pixel (>0): {(np.nan_to_num(vals) > 0).sum()}")
srs = osr.SpatialReference(); srs.ImportFromWkt(DE1200_WKT)
wgs = osr.SpatialReference(); wgs.ImportFromEPSG(4326)
wgs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
# --- ANKER-BEWEIS: (9E, 51N) liegt auf der MITTE von Pixel (Spalte 470, Zeile 600
# von unten). In GDALs Achsen-Konvention (polar-stereografisch, Süden negativ) belegt
# das Gitter x ∈ [0, 1100000], y ∈ [-1200000, 0] → Anker ≈ (469500, -599500).
to_de = osr.CoordinateTransformation(wgs, srs)
ax, ay, _ = to_de.TransformPoint(9.0, 51.0)
print(f"Anker (9E,51N) → ({ax:.1f}, {ay:.1f}) [erwartet ≈ (469500, -599500)]")
if abs(ax - 469500) > 600 or abs(ay + 599500) > 600:
print("FEHLER: Anker-Abweichung > 600 m — Projektionsparameter falsch!")
sys.exit(1)
# --- Gitter-Ecken in WGS84 (Plausibilität: Deutschland-Umgriff) ---
to_wgs = osr.CoordinateTransformation(srs, wgs)
for name, (x, y) in [("LL", (0, -NROWS * 1000)), ("LR", (NCOLS * 1000, -NROWS * 1000)),
("UL", (0, 0)), ("UR", (NCOLS * 1000, 0))]:
lon, lat, _ = to_wgs.TransformPoint(float(x), float(y))
print(f"Ecke {name}: {lon:.4f}E {lat:.4f}N")
# --- GeoTIFF im DE1200-CRS (Zeile 0 der Datei = Süden → für GDAL flippen) ---
drv = gdal.GetDriverByName('GTiff')
ds = drv.Create(f"{out_prefix}_de1200.tif", NCOLS, NROWS, 1, gdal.GDT_Float32,
options=['COMPRESS=DEFLATE'])
ds.SetProjection(DE1200_WKT)
# GeoTransform: linke OBERE Ecke (0, 0) — Gitter-y läuft in diesem CRS südwärts negativ
ds.SetGeoTransform((0, 1000, 0, 0, 0, -1000))
band = ds.GetRasterBand(1)
band.SetNoDataValue(-1)
flipped = np.flipud(np.nan_to_num(vals, nan=-1))
band.WriteArray(flipped)
ds = None
print(f"OK: {out_prefix}_de1200.tif geschrieben")
# --- Warp nach EPSG:3857 ---
gdal.Warp(f"{out_prefix}_3857.tif", f"{out_prefix}_de1200.tif",
dstSRS='EPSG:3857', xRes=1000, yRes=1000,
srcNodata=-1, dstNodata=-1, resampleAlg='near',
creationOptions=['COMPRESS=DEFLATE'])
info = gdal.Info(f"{out_prefix}_3857.tif", format='json')
cc = info['cornerCoordinates']
print(f"3857-Bounds: UL={cc['upperLeft']} LR={cc['lowerRight']}")
print("OK: Warp nach EPSG:3857 fertig")
if __name__ == '__main__':
main(sys.argv[1], sys.argv[2])

Binary file not shown.

Binary file not shown.

Binary file not shown.