#!/usr/bin/env python3 """DWD-RV-PoC: Frame dekodieren + Georeferenzierung beweisen. Läuft im osgeo/gdal-Container (python3 + GDAL + numpy). Schritte: 1. RV-Frame parsen (Header bis ETX, 1200×1100 uint16 LE, Wert = (raw & 0x0FFF) * 10^PR mm/5min, raw & 0x2000 = kein Echo/kein Daten) 2. DE1200-CRS (polar-stereografisch, WGS84-Ellipsoid, wradlib-Parameter): ANKER-BEWEIS: (9°E, 51°N) muss auf ≈ (470000, 600000) m projizieren. 3. GeoTIFF (Float) im DE1200-CRS → gdalwarp nach EPSG:3857 4. Ecken in WGS84 ausgeben (Plausibilität: Deutschland-Umgriff) Doku: docs/DWD_RAIN_FORECAST_PLAN.md · Parameter: wradlib georef/projection.py """ import sys import numpy as np from osgeo import gdal, osr gdal.UseExceptions() NCOLS, NROWS = 1100, 1200 # DE1200, WGS84-Variante (wradlib _radolan_ref['wgs84']['de1200']): # False Easting/Northing so, dass die LINKE UNTERE Gitterecke bei (0,0) liegt. DE1200_WKT = ( 'PROJCS["Radolan Projection",' 'GEOGCS["Radolan Coordinate System",' 'DATUM["Radolan_Kugel",SPHEROID["WGS 84", 6378137, 298.25722356301]],' 'PRIMEM["Greenwich", 0],' 'UNIT["degree", 0.017453292519943295]],' 'PROJECTION["Polar_Stereographic"],' 'PARAMETER["latitude_of_origin", 60],' 'PARAMETER["central_meridian", 10],' 'PARAMETER["false_easting", 543196.83521776402],' 'PARAMETER["false_northing", 3622588.8619310018],' 'UNIT["m", 1]]' ) def parse_frame(path): raw = open(path, 'rb').read() etx = raw.index(b'\x03') header = raw[:etx].decode('ascii', 'replace') # PR-Feld: Genauigkeit, z.B. "PR E-02" → Faktor 0.01 prec = 0.01 if 'E-' in header: try: prec = 10 ** -int(header.split('E-')[1][:2]) except Exception: pass data = np.frombuffer(raw[etx + 1:], dtype=' 0 vals = (grid & 0x0FFF).astype(np.float32) * prec # mm / 5 min vals[nodata] = np.nan return header, vals def main(frame_path, out_prefix): header, vals = parse_frame(frame_path) print("Header:", header[:120]) print(f"Werte: min={np.nanmin(vals):.2f} max={np.nanmax(vals):.2f} mm/5min, " f"Regen-Pixel (>0): {(np.nan_to_num(vals) > 0).sum()}") srs = osr.SpatialReference(); srs.ImportFromWkt(DE1200_WKT) wgs = osr.SpatialReference(); wgs.ImportFromEPSG(4326) wgs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) # --- ANKER-BEWEIS: (9E, 51N) liegt auf der MITTE von Pixel (Spalte 470, Zeile 600 # von unten). In GDALs Achsen-Konvention (polar-stereografisch, Süden negativ) belegt # das Gitter x ∈ [0, 1100000], y ∈ [-1200000, 0] → Anker ≈ (469500, -599500). to_de = osr.CoordinateTransformation(wgs, srs) ax, ay, _ = to_de.TransformPoint(9.0, 51.0) print(f"Anker (9E,51N) → ({ax:.1f}, {ay:.1f}) [erwartet ≈ (469500, -599500)]") if abs(ax - 469500) > 600 or abs(ay + 599500) > 600: print("FEHLER: Anker-Abweichung > 600 m — Projektionsparameter falsch!") sys.exit(1) # --- Gitter-Ecken in WGS84 (Plausibilität: Deutschland-Umgriff) --- to_wgs = osr.CoordinateTransformation(srs, wgs) for name, (x, y) in [("LL", (0, -NROWS * 1000)), ("LR", (NCOLS * 1000, -NROWS * 1000)), ("UL", (0, 0)), ("UR", (NCOLS * 1000, 0))]: lon, lat, _ = to_wgs.TransformPoint(float(x), float(y)) print(f"Ecke {name}: {lon:.4f}E {lat:.4f}N") # --- GeoTIFF im DE1200-CRS (Zeile 0 der Datei = Süden → für GDAL flippen) --- drv = gdal.GetDriverByName('GTiff') ds = drv.Create(f"{out_prefix}_de1200.tif", NCOLS, NROWS, 1, gdal.GDT_Float32, options=['COMPRESS=DEFLATE']) ds.SetProjection(DE1200_WKT) # GeoTransform: linke OBERE Ecke (0, 0) — Gitter-y läuft in diesem CRS südwärts negativ ds.SetGeoTransform((0, 1000, 0, 0, 0, -1000)) band = ds.GetRasterBand(1) band.SetNoDataValue(-1) flipped = np.flipud(np.nan_to_num(vals, nan=-1)) band.WriteArray(flipped) ds = None print(f"OK: {out_prefix}_de1200.tif geschrieben") # --- Warp nach EPSG:3857 --- gdal.Warp(f"{out_prefix}_3857.tif", f"{out_prefix}_de1200.tif", dstSRS='EPSG:3857', xRes=1000, yRes=1000, srcNodata=-1, dstNodata=-1, resampleAlg='near', creationOptions=['COMPRESS=DEFLATE']) info = gdal.Info(f"{out_prefix}_3857.tif", format='json') cc = info['cornerCoordinates'] print(f"3857-Bounds: UL={cc['upperLeft']} LR={cc['lowerRight']}") print("OK: Warp nach EPSG:3857 fertig") if __name__ == '__main__': main(sys.argv[1], sys.argv[2])