Source code for windmap

"""
This is file generates a 3d windrose plot for a particular coordinate and timestamp.
The polar plot displays information on wind speed and direction  at
various altitudes in a visual format

"""

import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from termcolor import colored
from scipy.interpolate import CubicSpline
import sys

import EarthSHAB.config_earth as config_earth
import EarthSHAB.GFS as GFS
import EarthSHAB.ERA5 as ERA5
from EarthSHAB.GFS import _spline_uv

[docs] class Windmap:
[docs] def __init__(self): if config_earth.forecast['forecast_type'] == "GFS": self.gfs = GFS.GFS(config_earth.simulation['start_coord']) self.file = self.gfs.file else: self.era5 = ERA5.ERA5(config_earth.simulation['start_coord']) self.file = self.era5.file #FIX THE DEFINING OF THESE Variables #******************* #self.res = config_earth.netcdf_gfs['res'] #fix this self.nc_start = config_earth.netcdf_gfs["nc_start"] #fix this self.start_time = config_earth.simulation["start_time"] self.coord = config_earth.simulation['start_coord'] self.LAT = self.coord["lat"] self.LON = self.coord["lon"] # VERIFY TIMESTAMP INFO MAKES SENSE hours = None self.new_timestamp = None #EDIT TO REMOVE THESE #Should I move these to ERA5 and GFS for organization? if config_earth.forecast['forecast_type'] == "GFS": self.lat = self.file.variables['lat'][:] self.lon = self.file.variables['lon'][:] self.levels = self.file.variables['lev'][:] self.vgrdprs = self.file.variables['vgrdprs'] self.ugrdprs = self.file.variables['ugrdprs'] self.hgtprs = self.file.variables['hgtprs'] try: self.tmpprs = self.file.variables['tmpprs'] except: print(colored("Temperature not downloaded in this GFS forecast", "yellow")) self.tmpprs = None self.hour_index, self.new_timestamp = self.getHourIndex(self.start_time) if config_earth.forecast['forecast_type'] == "ERA5": self.lat = self.file.variables['latitude'][:] self.lon = self.file.variables['longitude'][:] self.levels = self.file.variables['level'][:] self.vgrdprs = self.file.variables['v'] self.ugrdprs = self.file.variables['u'] self.hgtprs = self.file.variables['z'] self.tmpprs = self.file.variables['time'] #are t and time the same? #THIS IS WRONG SHOULD BE TEMPERATURE self.hour_index, self.new_timestamp = self.getHourIndex(self.start_time)
def closest(self,arr, k): return min(range(len(arr)), key = lambda i: abs(arr[i]-k))
[docs] def time_in_range(self,start, end, x): """Return true if x is in the range [start, end]""" if start <= end: return start <= x <= end else: return start <= x or x <= end
#THIS IS ASSUMING THE CORRECT ERA5 TIME PERIOD HAS BEEN DOWNLOADED def getHourIndex(self,start_time): if config_earth.forecast['forecast_type'] == "GFS": times = self.gfs.time_convert else: times = self.era5.time_convert #Check is simulation start time is within netcdf file if not self.time_in_range(times[0], times[-1], self.start_time): print(colored("Simulation start time " + str(self.start_time) + " is not within netcdf timerange of " + str(times[0]) + " - " + str(times[-1]) , "red")) sys.exit() else: print(colored("Simulation start time " + str(self.start_time) + " is within netcdf timerange of " + str(times[0]) + " - " + str(times[-1]) , "green")) #Find closest time using lambda function: closest_time = min(times, key=lambda sub: abs(sub - start_time)) #Need to write some exception handling code #Find the corresponding index to a matching key (closest time) in the array (full list of timestamps in netcdf) hour_index = [i for i ,e in enumerate(times) if e == closest_time][0] return hour_index, closest_time
[docs] def windVectorToBearing(self, u, v, h): """ Converts U-V wind data at specific heights to angular and radial components for polar plotting. :param u: U-Vector Wind Component from Forecast :type u: float64 array :param v: V-Vector Wind Component from Forecast :type v: float64 array :param h: Corresponding Converted Altitudes (m) from Forecast :type h: float64 array :returns: Array of bearings, radius, colors, and color map for plotting :rtype: array """ # Calculate altitude bearing = np.arctan2(v,u) bearing = np.unwrap(bearing) r = np.power((np.power(u,2)+np.power(v,2)),.5) # Set up Color Bar colors = h cmap=mpl.colors.ListedColormap(colors) return [bearing, r , colors, cmap]
[docs] def plotWind2(self, hour_index, lat, lon, num_interpolations=100, interpolation='linear'): """ Plots a 3D windrose by interpolating wind speed and direction between pressure levels. :param hour_index: Time index from forecast file :type hour_index: int :param lat: Latitude coordinate [deg] :type lat: float :param lon: Longitude coordinate [deg] :type lon: float :param num_interpolations: Points to interpolate between adjacent pressure levels (linear) or total sample points across the full altitude range (spline) :type num_interpolations: int :param interpolation: ``'linear'`` interpolates speed and direction between adjacent pressure levels with angle-wrap correction (default); ``'spline'`` fits a CubicSpline across all pressure levels for smoother, continuous curves :type interpolation: str :returns: 3D windrose plot :rtype: matplotlib.plot """ if config_earth.forecast['forecast_type'] == "GFS": lat_i = self.gfs.getNearestLat(lat, -90, 90.01) lon_i = self.gfs.getNearestLon(lon, 0, 360) ugrd, vgrd, hgt = self.gfs.ugdrps0, self.gfs.vgdrps0, self.gfs.hgtprs elif config_earth.forecast['forecast_type'] == "ERA5": lat_i = self.era5.getNearestLatIdx(lat, self.era5.lat_min_idx, self.era5.lat_max_idx) lon_i = self.era5.getNearestLonIdx(lon, self.era5.lon_min_idx, self.era5.lon_max_idx) ugrd, vgrd, hgt = self.ugrdprs, self.vgrdprs, self.hgtprs g = 9.80665 u = ugrd[hour_index,:,lat_i,lon_i] v = vgrd[hour_index,:,lat_i,lon_i] h = hgt[hour_index,:,lat_i,lon_i] # Remove missing data u = u.filled(np.nan) v = v.filled(np.nan) nans = ~np.isnan(u) u = u[nans] v = v[nans] h = h[nans] if config_earth.forecast['forecast_type'] == "ERA5": u = np.flip(u) v = np.flip(v) h = np.flip(h) h = h / g bearing, r, _, _ = self.windVectorToBearing(u, v, h) if interpolation == 'spline': # Fit CubicSpline across all pressure levels on the unwrapped bearing and speed h_new = np.linspace(h[0], h[-1], num_interpolations * (len(h) - 1)) cs_r = CubicSpline(h, r) cs_bearing = CubicSpline(h, bearing) # bearing is already np.unwrap'd interpolated_speeds = cs_r(h_new) interpolated_directions_deg = np.degrees(cs_bearing(h_new)) % 360 interpolated_altitudes = h_new else: # Linear interpolation between adjacent pressure levels with angle-wrap correction interpolated_altitudes = [] interpolated_speeds = [] interpolated_directions_deg = [] for i in range(len(h) - 1): angle1 = np.degrees(bearing[i]) % 360 angle2 = np.degrees(bearing[i + 1]) % 360 if abs(angle2 - angle1) > 180: if angle2 > angle1: angle1 += 360 else: angle2 += 360 for j in range(num_interpolations + 1): alpha = j / num_interpolations interp_alt = h[i] + alpha * (h[i + 1] - h[i]) interpolated_altitudes.append(interp_alt) interpolated_speeds.append(np.interp(interp_alt, [h[i], h[i + 1]], [r[i], r[i + 1]])) interpolated_directions_deg.append( np.interp(interp_alt, [h[i], h[i + 1]], [angle1, angle2]) % 360 ) forecast_type = config_earth.forecast['forecast_type'] interp_label = "Spline" if interpolation == 'spline' else "Linear" fig = plt.figure(figsize=(10, 8)) ax1 = fig.add_subplot(111, projection='polar') sc = ax1.scatter(np.radians(interpolated_directions_deg), interpolated_altitudes, c=interpolated_speeds, cmap='winter', s=2) ax1.title.set_text(f"{forecast_type} 3D Windrose for ({self.LAT}, {self.LON}) on {self.new_timestamp}") plt.colorbar(sc, label='Wind Speed (m/s)') fig.suptitle(f"Wind Interpolation using Speed/Direction {interp_label} Interpolation")
def _profile_at(self, hour_index, lat, lon): """Return (u, v, h) pressure-level wind profile at a (time, lat, lon), with NaNs removed and ERA5 reversed/geopotential-converted to match the conventions used by GFS/ERA5.wind_alt_Interpolate2. Looks up lat/lon directly against the FULL netCDF lat/lon arrays attached to self (self.lat / self.lon), since self.ugrdprs etc. are also the full netCDF variables. """ g = 9.80665 if config_earth.forecast['forecast_type'] == "GFS": lon_q = float(lon) % 360.0 else: lon_q = float(lon) lat_i = self.closest(self.lat, float(lat)) lon_i = self.closest(self.lon, lon_q) u = self.ugrdprs[hour_index, :, lat_i, lon_i].filled(np.nan) v = self.vgrdprs[hour_index, :, lat_i, lon_i].filled(np.nan) h = np.asarray(self.hgtprs[hour_index, :, lat_i, lon_i]) nans = ~np.isnan(u) u, v, h = u[nans], v[nans], h[nans] if config_earth.forecast['forecast_type'] == "ERA5": u = np.flip(u); v = np.flip(v); h = np.flip(h) / g return u, v, h @staticmethod def _interp_linear_neighbors(h_query, h, u, v): """Reproduces GFS.interpolateBearing semantics across a query grid: for each h_query, find enclosing pressure levels, linearly interp bearing+speed (with 0/360° wrap correction), convert back to u,v. """ u_out = np.empty_like(h_query, dtype=float) v_out = np.empty_like(h_query, dtype=float) bearing = np.degrees(np.arctan2(v, u)) % 360.0 speed = np.hypot(u, v) for q_idx, hq in enumerate(h_query): # find enclosing indices if hq <= h[0]: i0, i1 = 0, 1 elif hq >= h[-1]: i0, i1 = len(h) - 2, len(h) - 1 else: i1 = int(np.searchsorted(h, hq)) i0 = i1 - 1 a1, a2 = bearing[i0], bearing[i1] if abs(a2 - a1) > 180: if a2 > a1: a1 += 360 else: a2 += 360 sp = np.interp(hq, [h[i0], h[i1]], [speed[i0], speed[i1]]) dr = np.interp(hq, [h[i0], h[i1]], [a1, a2]) % 360 u_out[q_idx] = sp * np.cos(np.radians(dr)) v_out[q_idx] = sp * np.sin(np.radians(dr)) return u_out, v_out def _interp_method(self, method, h_query, h, u, v): """Compute (u, v) on h_query using the named production interpolation. :param method: one of ``'linear_neighbors'``, ``'linear_full'``, ``'spline_full'`` """ if method == 'linear_neighbors': return self._interp_linear_neighbors(h_query, h, u, v) if method == 'linear_full': return np.interp(h_query, h, u), np.interp(h_query, h, v) if method == 'spline_full': u_out = np.empty_like(h_query, dtype=float) v_out = np.empty_like(h_query, dtype=float) for i, hq in enumerate(h_query): u_out[i], v_out[i] = _spline_uv(hq, h, u, v) return u_out, v_out raise ValueError( f"Unknown wind_interpolation method {method!r}. " "Expected 'linear_neighbors', 'linear_full', or 'spline_full'." )
[docs] def plotWindMethod(self, hour_index, lat, lon, method='linear_full', alt_step=100.0, ax=None, vmin=None, vmax=None, alt_max=None, show_title=True): """3D polar windrose for a single production interpolation method. Same visual conventions as :func:`plotWind2` (polar projection, radius = altitude, angle = bearing, color = wind speed), but the wind profile is interpolated using the named production method from ``config_earth.forecast['wind_interpolation']`` instead of the ad-hoc visualization-only schemes in ``plotWind2``. This makes the panel a faithful picture of what the simulator actually sees with that method. :param method: ``'linear_neighbors'``, ``'linear_full'``, or ``'spline_full'`` :type method: str :param alt_step: Altitude sample spacing [m] for the dense query grid :type alt_step: float :param ax: Optional pre-existing polar matplotlib Axes :param vmin: Color-scale lower bound for wind speed [m/s] (shared bounds across panels) :param vmax: Color-scale upper bound for wind speed [m/s] :param alt_max: Outer radial limit [m] (shared across panels) :returns: (Figure, PathCollection) — the scatter handle lets the caller attach a colorbar """ u_lvl, v_lvl, h_lvl = self._profile_at(hour_index, lat, lon) if len(h_lvl) < 2: raise RuntimeError(f"Not enough valid pressure levels at ({lat}, {lon}).") h_dense = np.arange(h_lvl[0], h_lvl[-1] + alt_step / 2.0, alt_step) u, v = self._interp_method(method, h_dense, h_lvl, u_lvl, v_lvl) speed = np.hypot(u, v) bearing_deg = np.degrees(np.arctan2(v, u)) % 360.0 if ax is None: fig = plt.figure(figsize=(7, 7)) ax = fig.add_subplot(111, projection='polar') else: fig = ax.figure sc = ax.scatter(np.radians(bearing_deg), h_dense, c=speed, cmap='winter', s=4, vmin=vmin, vmax=vmax) # Overlay the raw pressure-level samples (larger black-edged dots). lvl_speed = np.hypot(u_lvl, v_lvl) lvl_bearing = np.degrees(np.arctan2(v_lvl, u_lvl)) % 360.0 ax.scatter(np.radians(lvl_bearing), h_lvl, c=lvl_speed, cmap='winter', s=45, edgecolor='black', linewidth=0.6, vmin=vmin, vmax=vmax, zorder=10) ax.set_xticks(ax.get_xticks()) ax.set_xticklabels(['E', '', 'N', '', 'W', '', 'S', '']) if alt_max is not None: ax.set_ylim(0, alt_max) if show_title: ax.set_title(method, fontsize=12, pad=14) return fig, sc
[docs] def plotWindMethodsComparison(self, hour_index, lat, lon, alt_step=100.0): """Three side-by-side polar windrose panels — one per production interpolation method — with shared color and radial scales. """ u_lvl, v_lvl, h_lvl = self._profile_at(hour_index, lat, lon) if len(h_lvl) < 2: raise RuntimeError(f"Not enough valid pressure levels at ({lat}, {lon}).") h_dense = np.arange(h_lvl[0], h_lvl[-1] + alt_step / 2.0, alt_step) # Pre-compute speeds across all methods so panels share color bounds. all_speeds = [np.hypot(u_lvl, v_lvl)] for m in ('linear_neighbors', 'linear_full', 'spline_full'): uu, vv = self._interp_method(m, h_dense, h_lvl, u_lvl, v_lvl) all_speeds.append(np.hypot(uu, vv)) vmin = 0.0 vmax = float(np.max(np.concatenate(all_speeds))) alt_max = float(h_dense[-1]) forecast_type = config_earth.forecast['forecast_type'] fig = plt.figure(figsize=(18, 7)) axes = [fig.add_subplot(1, 3, i + 1, projection='polar') for i in range(3)] sc_last = None for ax, method in zip(axes, ('linear_neighbors', 'linear_full', 'spline_full')): _, sc_last = self.plotWindMethod( hour_index, lat, lon, method=method, alt_step=alt_step, ax=ax, vmin=vmin, vmax=vmax, alt_max=alt_max, ) fig.suptitle( f"{forecast_type} wind interpolation comparison — " f"({lat:.3f}, {lon:.3f}) @ {self.new_timestamp}\n" "radius = altitude (m), angle = bearing, color = speed (m/s)", fontsize=12, ) # Single shared colorbar on the right. fig.colorbar(sc_last, ax=axes, orientation='vertical', shrink=0.85, pad=0.05, label='Wind Speed (m/s)') return fig