PoC BESTANDEN (tools/dwd-radar/poc): Anker (9E,51N) = Pixel-Mitte (470/600),
Ecken decken sich mit der DWD-DE1200-Spec — Georeferenzierung bewiesen.
- tools/dwd-radar: RV-Komposit (25 Frames, 0-120min) -> kolorierte RGBA-
PMTiles z4-7 je Frame (MapLibre overzoomt darueber) + manifest.json,
atomarer Swap, KEEP_RUNS-Aufraeumen; 25 Frames in ~14s lokal
- docker-compose.dwd.yml (DSM-Cron alle 5 min, NIE --remove-orphans)
- main.py: /radar/manifest.json (no-store) + /radar/{run}/{file} (Range/206,
immutable — Run-Id im Pfad); sw.js: /radar/ pass-through
- map.js: Radar-Frames heterogen ({url,time,dwd}) — DWD ersetzt RainViewer-
Nowcast (0-120min, 5-min-Schritte) wenn Toggle an + GL + Karte in DE +
Manifest frisch (<30min); sonst RainViewer-Fallback; Label '+X Min - DWD'
- settings.js: Toggle 'DWD-Regenvorhersage' (by_dwd_radar, Default AN)
- pytest 39 passed
Bump v1240
188 lines
7.4 KiB
Python
188 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
||
"""DWD-RV-Regenvorhersage → PMTiles-Frames + Manifest.
|
||
|
||
Läuft im Container (python3 + GDAL + numpy + pmtiles-CLI, s. Dockerfile).
|
||
Alle 5 Min (Cron auf der DS):
|
||
1. neuestes DE1200_RV-Komposit von opendata.dwd.de laden (25 Frames, 0–120 min)
|
||
2. je Frame: decodieren → RGBA kolorieren → DE1200-GeoTIFF → Warp 3857
|
||
→ MBTiles (z0–9) → PMTiles
|
||
3. manifest.json + atomarer Swap nach OUT_DIR (rename), alte Läufe aufräumen
|
||
|
||
Georeferenzierung BEWIESEN (PoC tools/dwd-radar/poc/, 2026-06-08): Anker (9E,51N)
|
||
= Pixel-Mitte (470/600), Ecken decken sich mit der DWD-DE1200-Spec.
|
||
Format: 194-Byte-ASCII-Header bis ETX, 1200×1100 uint16 LE,
|
||
Wert = (raw & 0x0FFF) × 10^-PR mm/5min, raw & 0x2000 = kein Daten.
|
||
|
||
ENV: OUT_DIR (Default /out), FRAME_STEP (1 = alle 25 Frames, 2 = 10-Min-Schritte),
|
||
KEEP_RUNS (Default 2).
|
||
Zoom: Basis z7 (≈ native 1-km-Auflösung, ZOOM_LEVEL_STRATEGY=UPPER) + Overviews bis z4.
|
||
Darüber overzoomt MapLibre die Raster-Source nativ (Radar ist ohnehin 1-km-blockig);
|
||
unter z4 wird der Layer im Frontend ausgeblendet (minzoom).
|
||
"""
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import tarfile
|
||
import tempfile
|
||
import urllib.request
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
from osgeo import gdal, osr
|
||
|
||
gdal.UseExceptions()
|
||
|
||
BASE_URL = "https://opendata.dwd.de/weather/radar/composite/rv/"
|
||
OUT_DIR = Path(os.environ.get("OUT_DIR", "/out"))
|
||
FRAME_STEP = int(os.environ.get("FRAME_STEP", "1"))
|
||
KEEP_RUNS = int(os.environ.get("KEEP_RUNS", "2"))
|
||
MIN_ZOOM, MAX_ZOOM = 4, 7 # s. Docstring (Overzoom > z7 macht MapLibre)
|
||
NCOLS, NROWS = 1100, 1200
|
||
|
||
# DE1200, WGS84-Variante (wradlib-Parameter, PoC-verifiziert). In GDALs Achsen-
|
||
# Konvention belegt das Gitter x ∈ [0, 1100000], y ∈ [-1200000, 0] (Süden negativ).
|
||
DE1200_WKT = (
|
||
'PROJCS["Radolan Projection",'
|
||
'GEOGCS["Radolan Coordinate System",'
|
||
'DATUM["Radolan_Kugel",SPHEROID["WGS 84", 6378137, 298.25722356301]],'
|
||
'PRIMEM["Greenwich", 0],'
|
||
'UNIT["degree", 0.017453292519943295]],'
|
||
'PROJECTION["Polar_Stereographic"],'
|
||
'PARAMETER["latitude_of_origin", 60],'
|
||
'PARAMETER["central_meridian", 10],'
|
||
'PARAMETER["false_easting", 543196.83521776402],'
|
||
'PARAMETER["false_northing", 3622588.8619310018],'
|
||
'UNIT["m", 1]]'
|
||
)
|
||
|
||
# Farbskala mm/5min → RGBA (an gängige Radar-Paletten angelehnt, RainViewer-ähnlich).
|
||
# Unter 0,05 mm/5min transparent (Rauschen), darüber blau→grün→gelb→orange→rot→violett.
|
||
SCALE = [
|
||
(0.05, (60, 130, 220, 110)),
|
||
(0.15, (40, 160, 230, 150)),
|
||
(0.40, (50, 200, 130, 170)),
|
||
(0.80, (230, 210, 70, 190)),
|
||
(1.50, (240, 150, 50, 210)),
|
||
(3.00, (235, 70, 50, 230)),
|
||
(6.00, (180, 40, 150, 240)),
|
||
(99.0, (130, 20, 110, 250)),
|
||
]
|
||
|
||
|
||
def latest_archive():
|
||
html = urllib.request.urlopen(BASE_URL, timeout=30).read().decode()
|
||
names = sorted(set(re.findall(r'DE1200_RV\d{10}\.tar\.bz2', html)))
|
||
if not names:
|
||
raise RuntimeError("Kein RV-Komposit im DWD-Verzeichnis gefunden")
|
||
return names[-1]
|
||
|
||
|
||
def parse_frame(raw):
|
||
etx = raw.index(b'\x03')
|
||
header = raw[:etx].decode('ascii', 'replace')
|
||
prec = 0.01
|
||
m = re.search(r'PR E-(\d{2})', header)
|
||
if m:
|
||
prec = 10 ** -int(m.group(1))
|
||
data = np.frombuffer(raw[etx + 1:], dtype='<u2')
|
||
if data.size != NCOLS * NROWS:
|
||
raise ValueError(f"Datenlänge {data.size} != {NCOLS * NROWS}")
|
||
grid = data.reshape(NROWS, NCOLS) # Zeile 0 = Süden
|
||
nodata = (grid & 0x2000) > 0
|
||
vals = (grid & 0x0FFF).astype(np.float32) * prec
|
||
vals[nodata] = 0.0 # kein Daten = transparent wie kein Regen
|
||
return vals
|
||
|
||
|
||
def colorize(vals):
|
||
"""mm/5min → RGBA uint8 (4, NROWS, NCOLS), Zeile 0 = Norden (für GDAL geflippt)."""
|
||
rgba = np.zeros((4, NROWS, NCOLS), dtype=np.uint8)
|
||
lower = 0.0
|
||
for thresh, (r, g, b, a) in SCALE:
|
||
m = (vals > lower) & (vals <= thresh) if lower > 0 else (vals >= 0.05) & (vals <= thresh)
|
||
rgba[0][m], rgba[1][m], rgba[2][m], rgba[3][m] = r, g, b, a
|
||
lower = thresh
|
||
return rgba[:, ::-1, :] # Süd-zuerst → Nord-zuerst
|
||
|
||
|
||
def frame_to_pmtiles(vals, out_pmtiles, tmp):
|
||
rgba = colorize(vals)
|
||
drv = gdal.GetDriverByName('GTiff')
|
||
src = str(tmp / 'frame_de1200.tif')
|
||
ds = drv.Create(src, NCOLS, NROWS, 4, gdal.GDT_Byte, options=['COMPRESS=DEFLATE'])
|
||
ds.SetProjection(DE1200_WKT)
|
||
ds.SetGeoTransform((0, 1000, 0, 0, 0, -1000)) # linke OBERE Ecke (0,0), y südwärts
|
||
for i in range(4):
|
||
ds.GetRasterBand(i + 1).WriteArray(rgba[i])
|
||
ds = None
|
||
|
||
# Warp 3857 + MBTiles z0–MAX_ZOOM + Overviews
|
||
warped = str(tmp / 'frame_3857.tif')
|
||
gdal.Warp(warped, src, dstSRS='EPSG:3857', resampleAlg='near',
|
||
creationOptions=['COMPRESS=DEFLATE'])
|
||
mb = str(tmp / 'frame.mbtiles')
|
||
gdal.Translate(mb, warped, format='MBTILES',
|
||
creationOptions=['TILE_FORMAT=PNG', 'ZOOM_LEVEL_STRATEGY=UPPER'])
|
||
mbds = gdal.Open(mb, gdal.GA_Update)
|
||
mbds.BuildOverviews('AVERAGE', [2 ** i for i in range(1, MAX_ZOOM - MIN_ZOOM + 1)])
|
||
mbds = None
|
||
subprocess.run(['pmtiles', 'convert', mb, str(out_pmtiles)],
|
||
check=True, capture_output=True)
|
||
for f in (src, warped, mb):
|
||
Path(f).unlink(missing_ok=True)
|
||
|
||
|
||
def main():
|
||
name = latest_archive()
|
||
run_id = re.search(r'RV(\d{10})', name).group(1) # YYMMDDHHMM (UTC)
|
||
run_dir = OUT_DIR / f'run-{run_id}'
|
||
if run_dir.exists():
|
||
print(f"Lauf {run_id} existiert schon — nichts zu tun.")
|
||
return
|
||
|
||
print(f"Lade {name} …")
|
||
with tempfile.TemporaryDirectory() as td:
|
||
tmp = Path(td)
|
||
arch = tmp / name
|
||
urllib.request.urlretrieve(BASE_URL + name, arch)
|
||
work = OUT_DIR / f'.tmp-{run_id}'
|
||
if work.exists():
|
||
shutil.rmtree(work)
|
||
work.mkdir(parents=True)
|
||
|
||
frames = []
|
||
with tarfile.open(arch, 'r:bz2') as tf:
|
||
members = sorted(tf.getnames())
|
||
for i, m in enumerate(members):
|
||
lead = int(m.rsplit('_', 1)[1])
|
||
if (i % FRAME_STEP) != 0:
|
||
continue
|
||
vals = parse_frame(tf.extractfile(m).read())
|
||
out = work / f'rv_{lead:03d}.pmtiles'
|
||
frame_to_pmtiles(vals, out, tmp)
|
||
frames.append({'lead_min': lead, 'file': out.name})
|
||
print(f" Frame +{lead:03d} min → {out.name}")
|
||
|
||
# Manifest + atomarer Swap: erst Verzeichnis, dann manifest.json auf den neuen Lauf
|
||
ts = run_id # YYMMDDHHMM UTC
|
||
iso = f"20{ts[0:2]}-{ts[2:4]}-{ts[4:6]}T{ts[6:8]}:{ts[8:10]}:00Z"
|
||
manifest = {'run': run_id, 'run_time_utc': iso, 'interval_min': 5 * FRAME_STEP,
|
||
'min_zoom': MIN_ZOOM, 'max_zoom': MAX_ZOOM,
|
||
'path': run_dir.name, 'frames': frames}
|
||
(work / 'manifest.json').write_text(json.dumps(manifest))
|
||
work.rename(run_dir)
|
||
(OUT_DIR / 'manifest.json.tmp').write_text(json.dumps(manifest))
|
||
(OUT_DIR / 'manifest.json.tmp').rename(OUT_DIR / 'manifest.json')
|
||
|
||
# Alte Läufe aufräumen (die letzten KEEP_RUNS behalten)
|
||
runs = sorted([d for d in OUT_DIR.iterdir() if d.is_dir() and d.name.startswith('run-')])
|
||
for old in runs[:-KEEP_RUNS]:
|
||
shutil.rmtree(old, ignore_errors=True)
|
||
print(f"Fertig: Lauf {run_id}, {len(frames)} Frames → {run_dir}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
sys.exit(main())
|