import numpy as np
import libinterp
from functools import wraps
dt = np.float32
[docs]class abstract(object):
def __init__(self, func, custom_name=None):
self._func = func
self._name = custom_name if custom_name is not None else func.__name__
wraps(func)(self)
return
def __call__(self, *args, **kwargs):
raise NotImplementedError("Function '%s' is abstract and must be implemented in a subclass." % self._name)
[docs]class Interpolator(object):
"""
Base class for the Interpolator objects. Needs to be re-implemented in a subclass.
Args:
wrap (bool): What to do with points that are out of the domain. `wrap` == True means to assign the nearest point
in the domain, `wrap` == False means to assign NaN to those points.
buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you
need a derivative across the plane of the data).
agl (bool): Wether or not the heights are above ground level (`agl` == True) or above mean sea level (`agl` ==
False).
"""
def __init__(self, wrap, buffer, agl):
"""
Constructor for the base class.
"""
self._wrap = wrap
self._buffer = buffer
self._agl = agl
self._axes = None
# self._bounds = None
return
@abstract
[docs] def get_bounds(self):
return
[docs] def set_axes(self, axes):
"""
Set the axes used by this interpolator.
Args:
axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x
and y must be one-dimensional, z must be three-dimensional.
"""
self._axes = axes
self._bounds = self.get_bounds()
return
[docs] def get_axes(self):
"""
Return the axes used by this interpolator.
"""
return self._axes
[docs] def is_buffered(self):
"""
Return whether or not the interpolator needs to keep a one-grid point buffer.
"""
return self._buffer is not None
[docs] def is_agl(self):
"""
Return whether or not the heights in this interpolator refer to AGL (True) or MSL (False)
"""
return self._agl
@abstract
def __call__(self, data):
return
def _axes_check(self):
if not hasattr(self, '_axes'):
raise AttributeError("Need to set the axes using setAxes() first!")
return
def _buffer_bounds(self, lower_bound, upper_bound, axis_len):
if self._buffer:
if lower_bound == 0 and upper_bound == axis_len:
pass
elif lower_bound == 0:
upper_bound = upper_bound + 1
elif upper_bound == axis_len:
lower_bound = lower_bound - 1
else:
lower_bound = lower_bound - 1
upper_bound = upper_bound + 1
return lower_bound, upper_bound
def _find_z_bounds(self, interp_lb, interp_ub, decreasing=False):
self._axes_check()
z_coords = self._axes['z']
argmin_dist_lb = np.argmin(np.abs(z_coords[:, 1:-1, 1:-1] - interp_lb), axis=0)
argmin_dist_ub = np.argmin(np.abs(z_coords[:, 1:-1, 1:-1] - interp_ub), axis=0)
min_dist_lb = np.empty(argmin_dist_lb.shape)
min_dist_ub = np.empty(argmin_dist_ub.shape)
for index in np.ndindex(*min_dist_lb.shape):
min_dist_lb[index] = z_coords[(argmin_dist_lb[index],) + index]
min_dist_ub[index] = z_coords[(argmin_dist_ub[index],) + index]
if decreasing:
lower_bound = np.where(min_dist_lb > interp_lb, argmin_dist_lb, argmin_dist_lb - 1).min()
upper_bound = np.where(min_dist_ub < interp_ub, argmin_dist_ub, argmin_dist_ub + 1).max() + 1
else:
lower_bound = np.where(min_dist_lb < interp_lb, argmin_dist_lb, argmin_dist_lb - 1).min()
upper_bound = np.where(min_dist_ub > interp_ub, argmin_dist_ub, argmin_dist_ub + 1).max() + 1
lower_bound = max(lower_bound, 1) - 1
upper_bound = min(upper_bound, z_coords.shape[0] - 1) + 1
return self._buffer_bounds(lower_bound, upper_bound, z_coords.shape[0])
def _find_x_bounds(self, interp_lb, interp_ub):
self._axes_check()
return self._find_xy_bounds(self._axes['x'], interp_lb, interp_ub)
def _find_y_bounds(self, interp_lb, interp_ub):
self._axes_check()
return self._find_xy_bounds(self._axes['y'], interp_lb, interp_ub)
def _find_xy_bounds(self, axis, interp_lb, interp_ub):
argmin_dist_lb = np.argmin(np.abs(axis - interp_lb))
argmin_dist_ub = np.argmin(np.abs(axis - interp_ub))
min_dist_lb = axis[argmin_dist_lb]
min_dist_ub = axis[argmin_dist_ub]
lower_bound = argmin_dist_lb if min_dist_lb <= interp_lb else argmin_dist_lb - 1
upper_bound = argmin_dist_ub + 1 if min_dist_ub <= interp_ub else argmin_dist_ub + 2
lower_bound = max(lower_bound, 1) - 1
upper_bound = min(upper_bound, axis.shape[0] - 1) + 1
return self._buffer_bounds(lower_bound, upper_bound, axis.shape[0])
[docs]class ZInterpolator(Interpolator):
"""
Interpolate to specific heights (e.g. 500 m AGL, 6 km MSL) or pressures (e.g. 500 hPa).
Args:
z_coord (float): The height or pressure at which to do the interpolation. Units must match the units of
`axes['z']`.
axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and
y must be one-dimensional, z must be three-dimensional. Optional.
coords (str): String specifying which coordinate system to do the interpolation in. Valid values are 'hght' for
height coordinates (default) or 'pres' for pressure coordinates. In the case of pressure coordinates,
`axes['z']` is interpreted as pressure instead of height.
wrap (bool): What to do with points that are out of the domain. `wrap` == True means to assign the nearest point
in the domain, `wrap` == False means to assign NaN to those points.
buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you
need a derivative across the plane of the data).
agl (bool): Wether or not the heights are above ground level (`agl` == True) or above mean sea level (`agl` ==
False).
"""
def __init__(self, z_coord, axes=None, coords='hght', wrap=False, buffer=False, agl=False):
"""
Constructor for the ZInterpolator class
"""
self._z_coord = z_coord
self._coords = coords
super(ZInterpolator, self).__init__(wrap, buffer, agl)
if axes is not None:
self.set_axes(axes)
return
[docs] def get_bounds(self):
"""
Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as
possible.
Returns:
tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X
order.
"""
if not hasattr(self, '_bounds'):
z_lb, z_ub = self._find_z_bounds(self._z_coord, self._z_coord, decreasing=(self._coords == 'pres'))
bounds = (slice(z_lb, z_ub), slice(None), slice(None))
else:
bounds = self._bounds
return bounds
[docs] def __call__(self, data):
"""
Do the interpolation.
Args:
data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to
the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or
more of those dimensions has length 1.
Returns:
Interpolated data as a Numpy array.
"""
bnds = self.get_bounds()
z_axis = self._axes['z'][bnds].astype(dt)
return libinterp.interp_height(data.astype(dt), z_axis, self._z_coord)
[docs]class SigmaInterpolator(Interpolator):
"""
"Interpolate" to sigma levels in the model (that is: just pull out the relevant level, perhaps with a buffer).
Args:
sigma (int): The model level to use.
axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and
y must be one-dimensional, z must be three-dimensional. Optional.
buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you
need a derivative across the plane of the data).
"""
def __init__(self, sigma, axes=None, buffer=False):
"""
Constructor for the SigmaInterpolator class
"""
self._sigma = sigma
super(SigmaInterpolator, self).__init__(False, buffer, False)
if axes is not None:
self.set_axes(axes)
return
[docs] def get_bounds(self):
"""
Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as
possible.
Returns:
tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X
order.
"""
if not hasattr(self, '_bounds'):
bounds = (slice(self._sigma, self._sigma + 1), slice(None), slice(None))
else:
bounds = self._bounds
return bounds
[docs] def __call__(self, data):
"""
Do the interpolation.
Args:
data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to
the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or
more of those dimensions has length 1.
Returns:
Interpolated data as a Numpy array.
"""
return data[0]
[docs]class PointInterpolator(Interpolator):
"""
Interpolate to points.
Args:
points (dict): A dictionary containing all the points to interpolate to. The dictionary must contain 'x', 'y',
and 'z' keys pointing to numpy arrays or single floats containing the respective coordinates.
axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and
y must be one-dimensional, z must be three-dimensional. Optional.
coords (str): String specifying which coordinate system to do the interpolation in. Valid values are 'hght' for
height coordinates (default) or 'pres' for pressure coordinates. In the case of pressure coordinates,
`axes['z']` is interpreted as pressure instead of height.
wrap (bool): What to do with points that are out of the domain. `wrap` == True means to assign the nearest point
in the domain, `wrap` == False means to assign NaN to those points.
buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you
need a derivative across the plane of the data).
agl (bool): Wether or not the heights are above ground level (`agl` == True) or above mean sea level (`agl` ==
False).
"""
def __init__(self, points, axes=None, coords='hght', wrap=False, buffer=False, agl=False):
"""
Constructor for the PointInterpolator class
"""
self._points = points
self._coords = coords
self._singleton = False
super(PointInterpolator, self).__init__(wrap, buffer, agl)
if type(self._points['x']) not in [ np.ndarray ]:
self._singleton = True
for axis in ['x', 'y', 'z']:
self._points[axis] = np.array([ self._points[axis] ])
if axes is not None:
self.set_axes(axes)
return
[docs] def get_bounds(self):
"""
Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as
possible.
Returns:
tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X
order.
"""
if not hasattr(self, '_bounds'):
decreasing = self._coords == 'pres'
z_lb, z_ub = self._find_z_bounds(self._points['z'].min(), self._points['z'].max(), decreasing=decreasing)
y_lb, y_ub = self._find_y_bounds(self._points['y'].min(), self._points['y'].max())
x_lb, x_ub = self._find_x_bounds(self._points['x'].min(), self._points['x'].max())
bounds = (slice(z_lb, z_ub), slice(y_lb, y_ub), slice(x_lb, x_ub))
else:
bounds = self._bounds
return bounds
[docs] def __call__(self, data):
"""
Do the interpolation.
Args:
data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to
the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or
more of those dimensions has length 1.
Returns:
Interpolated as a Numpy array.
"""
z_bnds, y_bnds, x_bnds = self.get_bounds()
points = libinterp.interp_points(data.astype(dt),
(self._axes['z'][z_bnds, y_bnds, x_bnds].astype(dt),
self._axes['y'][y_bnds].astype(dt),
self._axes['x'][x_bnds].astype(dt)),
tuple(self._points[ax].astype(dt) for ax in ['z', 'y', 'x'])
)
if self._singleton:
points = points[0]
return points
[docs]class SoundingInterpolator(Interpolator):
"""
Interpolate to single columns.
Args:
point (dict): A dictionary containing the point to interpolate to. The dictionary must contain 'x' and 'y' keys
pointing the respective coordinates in meters of the point.
axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and
y must be one-dimensional, z must be three-dimensional. Optional.
k_min (int): Minimum sigma level to use in the cross section. Defaults to None (the bottom of the domain).
k_max (int): Maximum sigma level to use in the cross section. Defaults to None (the top of the domain).
buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you
need a derivative across the plane of the data).
"""
def __init__(self, point, axes=None, k_min=None, k_max=None, buffer=False):
"""
Constructor for the SoundingInterpolator class.
"""
self._point = point
self._k_min = k_min
self._k_max = k_max
super(SoundingInterpolator, self).__init__(False, buffer, False)
if axes is not None:
self.set_axes(axes)
return
[docs] def get_bounds(self):
"""
Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as
possible.
Returns:
tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X
order.
"""
if not hasattr(self, '_bounds'):
y_lb, y_ub = self._find_y_bounds(self._point['y'], self._point['y'])
x_lb, x_ub = self._find_x_bounds(self._point['x'], self._point['x'])
bounds = (slice(self._k_min, self._k_max), slice(y_lb, y_ub), slice(x_lb, x_ub))
else:
bounds = self._bounds
return bounds
[docs] def __call__(self, data):
"""
Do the interpolation.
Args:
data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to
the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or
more of those dimensions has length 1.
Returns:
Interpolated data as a Numpy array.
"""
z_bnds, y_bnds, x_bnds = self.get_bounds()
return libinterp.interp_column(data.astype(dt),
(self._axes['y'][y_bnds].astype(dt), self._axes['x'][x_bnds].astype(dt)),
tuple(self._point[ax] for ax in ['y', 'x'])
)
[docs]class XSectInterpolator(Interpolator):
"""
Interpolate to arbitrary vertical cross sections.
Args:
start_point (dict): Dictionary containing the start point for the cross section. The dictionary should contain
'x' and 'y' keys pointing to the respective coordinates of the start point in meters.
end_point (dict): As in `start_point`, but for the end of the cross section. Because the cross section is given
the same grid spacing as the input grid, the cross section may not end exactly on `end_point`. In that case,
`end_point` will be between the last two columns on the cross section.
axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and
y must be one-dimensional, z must be three-dimensional. Optional.
k_min (int): Minimum sigma level to use in the cross section. Defaults to None (the bottom of the domain).
k_max (int): Maximum sigma level to use in the cross section. Defaults to None (the top of the domain).
buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you
need a derivative across the plane of the data).
"""
def __init__(self, start_point, end_point, axes=None, k_min=None, k_max=None, buffer=False):
"""
Constructor for the XSectInterpolator class
"""
self._start_pt = start_point
self._end_pt = end_point
self._k_min = k_min
self._k_max = k_max
super(XSectInterpolator, self).__init__(False, buffer, False)
if axes is not None:
self.set_axes(axes)
return
[docs] def get_bounds(self):
"""
Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as
possible.
Returns:
tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X
order.
"""
if not hasattr(self, '_bounds'):
y_pts = [pt['y'] for pt in [self._start_pt, self._end_pt]]
x_pts = [pt['x'] for pt in [self._start_pt, self._end_pt]]
y_lb, y_ub = self._find_y_bounds(min(y_pts), max(y_pts))
x_lb, x_ub = self._find_x_bounds(min(x_pts), max(x_pts))
bounds = (slice(self._k_min, self._k_max), slice(y_lb, y_ub), slice(x_lb, x_ub))
else:
bounds = self._bounds
return bounds
[docs] def get_xsect_coords(self):
"""
Get the x-y coordinates in meters of each column of the cross section.
Returns:
A tuple of 1D numpy arrays (xs, ys) giving the horizontal locations of the columns in the cross section in
meters.
"""
if hasattr(self, '_xsect_xs') and hasattr(self, '_xsect_ys'):
return self._xsect_xs, self._xsect_ys
else:
raise AttributeError("Do the interpolation in XSectInterpolator before trying to get the coordinates.")
[docs] def __call__(self, data):
"""
Do the interpolation.
Args:
data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to
the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or
more of those dimensions has length 1.
Returns:
Interpolated data as a Numpy array.
"""
z_bnds, y_bnds, x_bnds = self.get_bounds()
self._xsect_ys, self._xsect_xs, xsect = libinterp.interp_xsect(data.astype(dt),
(self._axes['y'][y_bnds].astype(dt), self._axes['x'][x_bnds].astype(dt)),
tuple(self._start_pt[ax] for ax in ['y', 'x']),
tuple(self._end_pt[ax] for ax in ['y', 'x'])
)
return xsect
[docs]class NullInterpolator(Interpolator):
"""
A null interpolator (that is: don't do any interpolation). This is useful for functions that expect an interpolator,
but the user doesn't want to do any interpolation.
"""
def __init__(self):
"""
Constructor for the NullInterpolator class
"""
super(NullInterpolator, self).__init__(False, False, False)
return
[docs] def get_bounds(self):
"""
Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as
possible.
Returns:
tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X
order.
"""
bounds = (Ellipsis, slice(None), slice(None))
return bounds
[docs] def __call__(self, data):
"""
Do the interpolation.
Args:
data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to
the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or
more of those dimensions has length 1.
Returns:
Interpolated data as a Numpy array.
"""
return data
[docs]def interp_height(data, z_coord, height):
"""
Interpolate 3-dimensional data to a given height
Args:
data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate
z_coord (np.array): A 3-dimensional array containing the height of each point in the data grid. Units are
assumed to be meters.
height (float): The height to interpolate to. Units are assumed to be meters.
Returns:
A 2-dimensional array of interpolated data.
"""
interp = ZInterpolator(z_coord)
bnds = interp.get_bounds()
return interp(data[bnds])
[docs]def interp_points(data, axes, points):
"""
Interpolate 3-dimensional data to one or more points in space
Args:
data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate
axes (dict): A dictionary containing the coordinate axes of the array. It must have 'x', 'y', and 'z' keys,
whose values must be the x, y, and z coordinates, respectively, of each point in the grid. The x and y
coordinate arrays must be 1-dimensional, and the z coordinate array must be 3-dimensional. Units are assumed
to be meters.
points (dict): A dictionary containing the points to interpolate to. It must have 'x', 'y', and 'z' keys, whose
values must be the x, y, and z coordinates, respectively, of the points to interpolate to. The coordinates
can be either single floats or 1-dimensional arrays. Units are assumed to be meters.
Returns:
If the coordinates were specified as 1-dimensional arrays, returns a 1-dimensional array of the interpolated
data. The order of this array is the same as the order of the points. Otherwise, returns a single float of
interpolated data.
"""
interp = PointInterpolator(points, axes=axes)
bnds = interp.get_bounds()
return interp(data[bnds])
[docs]def interp_column(data, axes, column, k_min=None, k_max=None):
"""
Interpolate 3-dimensional data to a column
Args:
data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate
axes (dict): A dictionary containing the coordinate axes of the array. It must have 'x', 'y', and 'z' keys,
whose values must be the x, y, and z coordinates, respectively, of each point in the grid. The x and y
coordinate arrays must be 1-dimensional, and the z coordinate array must be 3-dimensional. Units are assumed
to be meters.
column (dict): A dictionary containing the column to interpolate to. It must have 'x' and 'y' keys, whose values
must be the x and y coordinates, respectively, of the column to interpolate to. Units are assumed to be
meters.
k_min (int): Lowest sigma level to include in the column
k_max (int): Highest sigma level to include in the column
Returns:
A 1-dimensional array of interpolated data
"""
interp = SoundingInterpolator(column, axes=axes, k_min=k_min, k_max=k_max)
bnds = interp.get_bounds()
return interp(data[bnds])
[docs]def interp_xsect(data, axes, start_point, end_point, k_min=None, k_max=None):
"""
Interpolate 3-dimensional data to a vertical cross section in an arbitrary direction. The interpolation creates a
cross section grid that has the same horizontal grid interval as the source data. The user is not expected to
provide an endpoint that occurs exactly at a grid interval. Rather, the cross section extends no more than 1 grid
interval beyond the end point to provide an integer number of grid intervals in the interpolated data.
Args:
data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate
axes (dict): A dictionary containing the coordinate axes of the array. It must have 'x', 'y', and 'z' keys,
whose values must be the x, y, and z coordinates, respectively, of each point in the grid. The x and y
coordinate arrays must be 1-dimensional, and the z coordinate array must be 3-dimensional. Units are assumed
to be meters.
start_point (dict): A dictionary containing the coordinates of start column for the cross section. It must have
'x' and 'y' keys, whose values must be the x and y coordinates, respectively, of start column. Units are
assumed to be meters.
end_point (dict): As in the start_point argument, but for the ending point. The cross section will not
necessarily stop exactly at this column, but this column will be contained in the last grid interval.
k_min (int): Lowest sigma level to include in the cross section
k_max (int): Highest sigma level to include in the cross section
Returns:
A tuple containing the 2-dimensional grid of interpolated data, as well as the x and y coordinates of the cross
section in meters.
"""
interp = XSectInterpolator(start_point, end_point, axes=axes, k_min=k_min, k_max=k_max)
bnds = interp.get_bounds()
interp_data = interp(data[bnds])
xsect_xs, xsect_ys = interp.get_xsect_coords()
return interp_data, xsect_xs, xsect_ys