Source code for pycaps.interp.interp


import numpy as np

import libinterp

from functools import wraps

dt = np.float32


[docs]class abstract(object): def __init__(self, func, custom_name=None): self._func = func self._name = custom_name if custom_name is not None else func.__name__ wraps(func)(self) return def __call__(self, *args, **kwargs): raise NotImplementedError("Function '%s' is abstract and must be implemented in a subclass." % self._name)
[docs]class Interpolator(object): """ Base class for the Interpolator objects. Needs to be re-implemented in a subclass. Args: wrap (bool): What to do with points that are out of the domain. `wrap` == True means to assign the nearest point in the domain, `wrap` == False means to assign NaN to those points. buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you need a derivative across the plane of the data). agl (bool): Wether or not the heights are above ground level (`agl` == True) or above mean sea level (`agl` == False). """ def __init__(self, wrap, buffer, agl): """ Constructor for the base class. """ self._wrap = wrap self._buffer = buffer self._agl = agl self._axes = None # self._bounds = None return @abstract
[docs] def get_bounds(self): return
[docs] def set_axes(self, axes): """ Set the axes used by this interpolator. Args: axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and y must be one-dimensional, z must be three-dimensional. """ self._axes = axes self._bounds = self.get_bounds() return
[docs] def get_axes(self): """ Return the axes used by this interpolator. """ return self._axes
[docs] def is_buffered(self): """ Return whether or not the interpolator needs to keep a one-grid point buffer. """ return self._buffer is not None
[docs] def is_agl(self): """ Return whether or not the heights in this interpolator refer to AGL (True) or MSL (False) """ return self._agl
@abstract def __call__(self, data): return def _axes_check(self): if not hasattr(self, '_axes'): raise AttributeError("Need to set the axes using setAxes() first!") return def _buffer_bounds(self, lower_bound, upper_bound, axis_len): if self._buffer: if lower_bound == 0 and upper_bound == axis_len: pass elif lower_bound == 0: upper_bound = upper_bound + 1 elif upper_bound == axis_len: lower_bound = lower_bound - 1 else: lower_bound = lower_bound - 1 upper_bound = upper_bound + 1 return lower_bound, upper_bound def _find_z_bounds(self, interp_lb, interp_ub, decreasing=False): self._axes_check() z_coords = self._axes['z'] argmin_dist_lb = np.argmin(np.abs(z_coords[:, 1:-1, 1:-1] - interp_lb), axis=0) argmin_dist_ub = np.argmin(np.abs(z_coords[:, 1:-1, 1:-1] - interp_ub), axis=0) min_dist_lb = np.empty(argmin_dist_lb.shape) min_dist_ub = np.empty(argmin_dist_ub.shape) for index in np.ndindex(*min_dist_lb.shape): min_dist_lb[index] = z_coords[(argmin_dist_lb[index],) + index] min_dist_ub[index] = z_coords[(argmin_dist_ub[index],) + index] if decreasing: lower_bound = np.where(min_dist_lb > interp_lb, argmin_dist_lb, argmin_dist_lb - 1).min() upper_bound = np.where(min_dist_ub < interp_ub, argmin_dist_ub, argmin_dist_ub + 1).max() + 1 else: lower_bound = np.where(min_dist_lb < interp_lb, argmin_dist_lb, argmin_dist_lb - 1).min() upper_bound = np.where(min_dist_ub > interp_ub, argmin_dist_ub, argmin_dist_ub + 1).max() + 1 lower_bound = max(lower_bound, 1) - 1 upper_bound = min(upper_bound, z_coords.shape[0] - 1) + 1 return self._buffer_bounds(lower_bound, upper_bound, z_coords.shape[0]) def _find_x_bounds(self, interp_lb, interp_ub): self._axes_check() return self._find_xy_bounds(self._axes['x'], interp_lb, interp_ub) def _find_y_bounds(self, interp_lb, interp_ub): self._axes_check() return self._find_xy_bounds(self._axes['y'], interp_lb, interp_ub) def _find_xy_bounds(self, axis, interp_lb, interp_ub): argmin_dist_lb = np.argmin(np.abs(axis - interp_lb)) argmin_dist_ub = np.argmin(np.abs(axis - interp_ub)) min_dist_lb = axis[argmin_dist_lb] min_dist_ub = axis[argmin_dist_ub] lower_bound = argmin_dist_lb if min_dist_lb <= interp_lb else argmin_dist_lb - 1 upper_bound = argmin_dist_ub + 1 if min_dist_ub <= interp_ub else argmin_dist_ub + 2 lower_bound = max(lower_bound, 1) - 1 upper_bound = min(upper_bound, axis.shape[0] - 1) + 1 return self._buffer_bounds(lower_bound, upper_bound, axis.shape[0])
[docs]class ZInterpolator(Interpolator): """ Interpolate to specific heights (e.g. 500 m AGL, 6 km MSL) or pressures (e.g. 500 hPa). Args: z_coord (float): The height or pressure at which to do the interpolation. Units must match the units of `axes['z']`. axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and y must be one-dimensional, z must be three-dimensional. Optional. coords (str): String specifying which coordinate system to do the interpolation in. Valid values are 'hght' for height coordinates (default) or 'pres' for pressure coordinates. In the case of pressure coordinates, `axes['z']` is interpreted as pressure instead of height. wrap (bool): What to do with points that are out of the domain. `wrap` == True means to assign the nearest point in the domain, `wrap` == False means to assign NaN to those points. buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you need a derivative across the plane of the data). agl (bool): Wether or not the heights are above ground level (`agl` == True) or above mean sea level (`agl` == False). """ def __init__(self, z_coord, axes=None, coords='hght', wrap=False, buffer=False, agl=False): """ Constructor for the ZInterpolator class """ self._z_coord = z_coord self._coords = coords super(ZInterpolator, self).__init__(wrap, buffer, agl) if axes is not None: self.set_axes(axes) return
[docs] def get_bounds(self): """ Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as possible. Returns: tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X order. """ if not hasattr(self, '_bounds'): z_lb, z_ub = self._find_z_bounds(self._z_coord, self._z_coord, decreasing=(self._coords == 'pres')) bounds = (slice(z_lb, z_ub), slice(None), slice(None)) else: bounds = self._bounds return bounds
[docs] def __call__(self, data): """ Do the interpolation. Args: data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or more of those dimensions has length 1. Returns: Interpolated data as a Numpy array. """ bnds = self.get_bounds() z_axis = self._axes['z'][bnds].astype(dt) return libinterp.interp_height(data.astype(dt), z_axis, self._z_coord)
[docs]class SigmaInterpolator(Interpolator): """ "Interpolate" to sigma levels in the model (that is: just pull out the relevant level, perhaps with a buffer). Args: sigma (int): The model level to use. axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and y must be one-dimensional, z must be three-dimensional. Optional. buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you need a derivative across the plane of the data). """ def __init__(self, sigma, axes=None, buffer=False): """ Constructor for the SigmaInterpolator class """ self._sigma = sigma super(SigmaInterpolator, self).__init__(False, buffer, False) if axes is not None: self.set_axes(axes) return
[docs] def get_bounds(self): """ Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as possible. Returns: tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X order. """ if not hasattr(self, '_bounds'): bounds = (slice(self._sigma, self._sigma + 1), slice(None), slice(None)) else: bounds = self._bounds return bounds
[docs] def __call__(self, data): """ Do the interpolation. Args: data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or more of those dimensions has length 1. Returns: Interpolated data as a Numpy array. """ return data[0]
[docs]class PointInterpolator(Interpolator): """ Interpolate to points. Args: points (dict): A dictionary containing all the points to interpolate to. The dictionary must contain 'x', 'y', and 'z' keys pointing to numpy arrays or single floats containing the respective coordinates. axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and y must be one-dimensional, z must be three-dimensional. Optional. coords (str): String specifying which coordinate system to do the interpolation in. Valid values are 'hght' for height coordinates (default) or 'pres' for pressure coordinates. In the case of pressure coordinates, `axes['z']` is interpreted as pressure instead of height. wrap (bool): What to do with points that are out of the domain. `wrap` == True means to assign the nearest point in the domain, `wrap` == False means to assign NaN to those points. buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you need a derivative across the plane of the data). agl (bool): Wether or not the heights are above ground level (`agl` == True) or above mean sea level (`agl` == False). """ def __init__(self, points, axes=None, coords='hght', wrap=False, buffer=False, agl=False): """ Constructor for the PointInterpolator class """ self._points = points self._coords = coords self._singleton = False super(PointInterpolator, self).__init__(wrap, buffer, agl) if type(self._points['x']) not in [ np.ndarray ]: self._singleton = True for axis in ['x', 'y', 'z']: self._points[axis] = np.array([ self._points[axis] ]) if axes is not None: self.set_axes(axes) return
[docs] def get_bounds(self): """ Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as possible. Returns: tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X order. """ if not hasattr(self, '_bounds'): decreasing = self._coords == 'pres' z_lb, z_ub = self._find_z_bounds(self._points['z'].min(), self._points['z'].max(), decreasing=decreasing) y_lb, y_ub = self._find_y_bounds(self._points['y'].min(), self._points['y'].max()) x_lb, x_ub = self._find_x_bounds(self._points['x'].min(), self._points['x'].max()) bounds = (slice(z_lb, z_ub), slice(y_lb, y_ub), slice(x_lb, x_ub)) else: bounds = self._bounds return bounds
[docs] def __call__(self, data): """ Do the interpolation. Args: data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or more of those dimensions has length 1. Returns: Interpolated as a Numpy array. """ z_bnds, y_bnds, x_bnds = self.get_bounds() points = libinterp.interp_points(data.astype(dt), (self._axes['z'][z_bnds, y_bnds, x_bnds].astype(dt), self._axes['y'][y_bnds].astype(dt), self._axes['x'][x_bnds].astype(dt)), tuple(self._points[ax].astype(dt) for ax in ['z', 'y', 'x']) ) if self._singleton: points = points[0] return points
[docs]class SoundingInterpolator(Interpolator): """ Interpolate to single columns. Args: point (dict): A dictionary containing the point to interpolate to. The dictionary must contain 'x' and 'y' keys pointing the respective coordinates in meters of the point. axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and y must be one-dimensional, z must be three-dimensional. Optional. k_min (int): Minimum sigma level to use in the cross section. Defaults to None (the bottom of the domain). k_max (int): Maximum sigma level to use in the cross section. Defaults to None (the top of the domain). buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you need a derivative across the plane of the data). """ def __init__(self, point, axes=None, k_min=None, k_max=None, buffer=False): """ Constructor for the SoundingInterpolator class. """ self._point = point self._k_min = k_min self._k_max = k_max super(SoundingInterpolator, self).__init__(False, buffer, False) if axes is not None: self.set_axes(axes) return
[docs] def get_bounds(self): """ Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as possible. Returns: tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X order. """ if not hasattr(self, '_bounds'): y_lb, y_ub = self._find_y_bounds(self._point['y'], self._point['y']) x_lb, x_ub = self._find_x_bounds(self._point['x'], self._point['x']) bounds = (slice(self._k_min, self._k_max), slice(y_lb, y_ub), slice(x_lb, x_ub)) else: bounds = self._bounds return bounds
[docs] def __call__(self, data): """ Do the interpolation. Args: data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or more of those dimensions has length 1. Returns: Interpolated data as a Numpy array. """ z_bnds, y_bnds, x_bnds = self.get_bounds() return libinterp.interp_column(data.astype(dt), (self._axes['y'][y_bnds].astype(dt), self._axes['x'][x_bnds].astype(dt)), tuple(self._point[ax] for ax in ['y', 'x']) )
[docs]class XSectInterpolator(Interpolator): """ Interpolate to arbitrary vertical cross sections. Args: start_point (dict): Dictionary containing the start point for the cross section. The dictionary should contain 'x' and 'y' keys pointing to the respective coordinates of the start point in meters. end_point (dict): As in `start_point`, but for the end of the cross section. Because the cross section is given the same grid spacing as the input grid, the cross section may not end exactly on `end_point`. In that case, `end_point` will be between the last two columns on the cross section. axes (dict): A dictionary that must contain 'x', 'y', and 'z' keys corresponding to the x, y, and z axes. x and y must be one-dimensional, z must be three-dimensional. Optional. k_min (int): Minimum sigma level to use in the cross section. Defaults to None (the bottom of the domain). k_max (int): Maximum sigma level to use in the cross section. Defaults to None (the top of the domain). buffer (bool): Wether to add a one grid point buffer around the data when it's loaded (usually used when you need a derivative across the plane of the data). """ def __init__(self, start_point, end_point, axes=None, k_min=None, k_max=None, buffer=False): """ Constructor for the XSectInterpolator class """ self._start_pt = start_point self._end_pt = end_point self._k_min = k_min self._k_max = k_max super(XSectInterpolator, self).__init__(False, buffer, False) if axes is not None: self.set_axes(axes) return
[docs] def get_bounds(self): """ Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as possible. Returns: tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X order. """ if not hasattr(self, '_bounds'): y_pts = [pt['y'] for pt in [self._start_pt, self._end_pt]] x_pts = [pt['x'] for pt in [self._start_pt, self._end_pt]] y_lb, y_ub = self._find_y_bounds(min(y_pts), max(y_pts)) x_lb, x_ub = self._find_x_bounds(min(x_pts), max(x_pts)) bounds = (slice(self._k_min, self._k_max), slice(y_lb, y_ub), slice(x_lb, x_ub)) else: bounds = self._bounds return bounds
[docs] def get_xsect_coords(self): """ Get the x-y coordinates in meters of each column of the cross section. Returns: A tuple of 1D numpy arrays (xs, ys) giving the horizontal locations of the columns in the cross section in meters. """ if hasattr(self, '_xsect_xs') and hasattr(self, '_xsect_ys'): return self._xsect_xs, self._xsect_ys else: raise AttributeError("Do the interpolation in XSectInterpolator before trying to get the coordinates.")
[docs] def __call__(self, data): """ Do the interpolation. Args: data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or more of those dimensions has length 1. Returns: Interpolated data as a Numpy array. """ z_bnds, y_bnds, x_bnds = self.get_bounds() self._xsect_ys, self._xsect_xs, xsect = libinterp.interp_xsect(data.astype(dt), (self._axes['y'][y_bnds].astype(dt), self._axes['x'][x_bnds].astype(dt)), tuple(self._start_pt[ax] for ax in ['y', 'x']), tuple(self._end_pt[ax] for ax in ['y', 'x']) ) return xsect
[docs]class NullInterpolator(Interpolator): """ A null interpolator (that is: don't do any interpolation). This is useful for functions that expect an interpolator, but the user doesn't want to do any interpolation. """ def __init__(self): """ Constructor for the NullInterpolator class """ super(NullInterpolator, self).__init__(False, False, False) return
[docs] def get_bounds(self): """ Get the grid bounds of the data that needs to be loaded in from the data file. This tries to be as small as possible. Returns: tuple: tuple of slice objects (directly passable to numpy arrays or PyNIO variable objects) in Z, Y, X order. """ bounds = (Ellipsis, slice(None), slice(None)) return bounds
[docs] def __call__(self, data): """ Do the interpolation. Args: data: The grid to interpolate. Note: this method assumes that you've already subsetted the grid according to the bounds returned by `getBounds()`. It also assumes that `data` is three-dimensional, even if one or more of those dimensions has length 1. Returns: Interpolated data as a Numpy array. """ return data
[docs]def interp_height(data, z_coord, height): """ Interpolate 3-dimensional data to a given height Args: data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate z_coord (np.array): A 3-dimensional array containing the height of each point in the data grid. Units are assumed to be meters. height (float): The height to interpolate to. Units are assumed to be meters. Returns: A 2-dimensional array of interpolated data. """ interp = ZInterpolator(z_coord) bnds = interp.get_bounds() return interp(data[bnds])
[docs]def interp_points(data, axes, points): """ Interpolate 3-dimensional data to one or more points in space Args: data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate axes (dict): A dictionary containing the coordinate axes of the array. It must have 'x', 'y', and 'z' keys, whose values must be the x, y, and z coordinates, respectively, of each point in the grid. The x and y coordinate arrays must be 1-dimensional, and the z coordinate array must be 3-dimensional. Units are assumed to be meters. points (dict): A dictionary containing the points to interpolate to. It must have 'x', 'y', and 'z' keys, whose values must be the x, y, and z coordinates, respectively, of the points to interpolate to. The coordinates can be either single floats or 1-dimensional arrays. Units are assumed to be meters. Returns: If the coordinates were specified as 1-dimensional arrays, returns a 1-dimensional array of the interpolated data. The order of this array is the same as the order of the points. Otherwise, returns a single float of interpolated data. """ interp = PointInterpolator(points, axes=axes) bnds = interp.get_bounds() return interp(data[bnds])
[docs]def interp_column(data, axes, column, k_min=None, k_max=None): """ Interpolate 3-dimensional data to a column Args: data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate axes (dict): A dictionary containing the coordinate axes of the array. It must have 'x', 'y', and 'z' keys, whose values must be the x, y, and z coordinates, respectively, of each point in the grid. The x and y coordinate arrays must be 1-dimensional, and the z coordinate array must be 3-dimensional. Units are assumed to be meters. column (dict): A dictionary containing the column to interpolate to. It must have 'x' and 'y' keys, whose values must be the x and y coordinates, respectively, of the column to interpolate to. Units are assumed to be meters. k_min (int): Lowest sigma level to include in the column k_max (int): Highest sigma level to include in the column Returns: A 1-dimensional array of interpolated data """ interp = SoundingInterpolator(column, axes=axes, k_min=k_min, k_max=k_max) bnds = interp.get_bounds() return interp(data[bnds])
[docs]def interp_xsect(data, axes, start_point, end_point, k_min=None, k_max=None): """ Interpolate 3-dimensional data to a vertical cross section in an arbitrary direction. The interpolation creates a cross section grid that has the same horizontal grid interval as the source data. The user is not expected to provide an endpoint that occurs exactly at a grid interval. Rather, the cross section extends no more than 1 grid interval beyond the end point to provide an integer number of grid intervals in the interpolated data. Args: data (np.array): A 3-dimensional array (NZ x NY x NX) of data to interpolate axes (dict): A dictionary containing the coordinate axes of the array. It must have 'x', 'y', and 'z' keys, whose values must be the x, y, and z coordinates, respectively, of each point in the grid. The x and y coordinate arrays must be 1-dimensional, and the z coordinate array must be 3-dimensional. Units are assumed to be meters. start_point (dict): A dictionary containing the coordinates of start column for the cross section. It must have 'x' and 'y' keys, whose values must be the x and y coordinates, respectively, of start column. Units are assumed to be meters. end_point (dict): As in the start_point argument, but for the ending point. The cross section will not necessarily stop exactly at this column, but this column will be contained in the last grid interval. k_min (int): Lowest sigma level to include in the cross section k_max (int): Highest sigma level to include in the cross section Returns: A tuple containing the 2-dimensional grid of interpolated data, as well as the x and y coordinates of the cross section in meters. """ interp = XSectInterpolator(start_point, end_point, axes=axes, k_min=k_min, k_max=k_max) bnds = interp.get_bounds() interp_data = interp(data[bnds]) xsect_xs, xsect_ys = interp.get_xsect_coords() return interp_data, xsect_xs, xsect_ys