Source code for pycaps.io.dataload


import Nio as nio

import numpy as np

from pycaps.interp import NullInterpolator
from pycaps.derive import recarray_fm_dict, dict_fm_recarray
from pycaps.util import arps_decompress
from pycaps.util import ProgressBar
from pycaps.util import run_concurrently

import gc


def _make_z_coords_agl(z_coords):
    z_coords_agl = z_coords - z_coords[1][np.newaxis]
    return z_coords_agl, z_coords
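
# A minimal doctest-style sketch of the conversion above. It assumes (as in
# ARPS hdfgrdbas files) that zp[1] holds the terrain height, so subtracting
# z_coords[1] makes the column heights ground-relative:
#
#   >>> z_msl = np.array([[[90.]], [[100.]], [[350.]], [[600.]]])
#   >>> z_agl, z_msl = _make_z_coords_agl(z_msl)
#   >>> z_agl[:, 0, 0]
#   array([-10.,   0., 250., 500.])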


def _build_ens_name(n_ens, t_ens, state='ena', single_var="", split=False):
    if split:
        dir_state = (state[:-1] if state == 'ena' else state).upper()
        dir = "%s%03d/" % (dir_state, n_ens + 1)
    else:
        dir = ""
    return "%s%s%03d.hdf%s%06d" % (dir, state, n_ens + 1, single_var, t_ens)


def _build_ens_grdbas(n_ens, state='ena', split=False):
    if split:
        dir_state = (state[:-1] if state == 'ena' else state).upper()
        dir = "%s%03d/" % (dir_state, n_ens + 1)
    else:
        dir = ""
    return "%s%s%03d.hdfgrdbas" % (dir, state, n_ens + 1)


def _build_run_name(base_name, t_ens, single_var=""):
    return "%s.hdf%s%06d" % (base_name, single_var, t_ens)


def _build_run_grdbas(base_name):
    return "%s.hdfgrdbas" % base_name


def get_axes(base_path, base_name, agl=True, z_coord_type="", split=None, fcst=False):
    """
    Get the axes from a grdbas file.

    Args:
        base_path (str): Path to the grdbas file.
        base_name (str or int): Base name of the grdbas file (e.g. 'enf001'), or the zero-based index
            of an ensemble member.
        agl (bool): Whether to return the z coordinates relative to ground level (True) or MSL (False).
        z_coord_type (str): Which set of z coordinates to use. "" refers to the atmospheric
            coordinates, and "soil" refers to the soil z coordinates. Default is "" (atmospheric
            coordinates).
        split (list): If the domain is split, a list of (icore, jcore) subdomain indices covering the
            decomposition. Default is None (an already-joined domain).
        fcst (bool): If `base_name` is a member index, whether to build the grdbas name for the
            forecast ('enf', True) or analysis ('ena', False) ensemble. Default is False.

    Returns:
        A dictionary containing the x and y coordinates in the 'x' and 'y' keys, and either 'z' and
        'z_MSL' (if agl=True) or 'z' and 'z_AGL' (if agl=False).
    """
    if type(base_name) == int:
        state = {True: 'enf', False: 'ena'}[fcst]
        grdbas_file = _build_ens_grdbas(base_name, split=(split is not None), state=state)
    elif type(base_name) == str:
        grdbas_file = _build_run_grdbas(base_name)

    if split is None:
        hdf_grdbas = nio.open_file("%s/%s" % (base_path, grdbas_file), mode='r', format='hdf')
        axes = dict((ax[:1], arps_decompress(hdf_grdbas.variables[ax])) for ax in ['x', 'y', "zp%s" % z_coord_type])
        hdf_grdbas.close()
    else:
        vars = {}
        for (icore, jcore) in split:
            hdf_grdbas = nio.open_file("%s/%s_%03d%03d" % (base_path, grdbas_file, icore + 1, jcore + 1),
                mode='r', format='hdf')
            axes = dict((ax[:1], arps_decompress(hdf_grdbas.variables[ax])) for ax in ['x', 'y', "zp%s" % z_coord_type])
            axes['x'] = axes['x'][np.newaxis, np.newaxis, :]
            axes['y'] = axes['y'][np.newaxis, :, np.newaxis]
            vars[(icore, jcore)] = axes
            hdf_grdbas.close()

        axes = _merge_cores(vars)
        axes['x'] = axes['x'][0, 0, :]
        axes['y'] = axes['y'][0, :, 0]

    for ax_dir, ax in axes.iteritems():
        if ax_dir in ['x', 'y']:
            axes[ax_dir] = ax - ax[0]

    if agl:
        axes['z'], axes['z_MSL'] = _make_z_coords_agl(axes['z'])
    else:
        axes['z_AGL'], axes['z'] = _make_z_coords_agl(axes['z'])

    return axes
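
# A usage sketch for get_axes; the path and run name are hypothetical. With
# agl=True, 'z' is ground-relative and 'z_MSL' keeps the original heights,
# and 'x' and 'y' are shifted to start at zero:
#
#   >>> axes = get_axes("/path/to/run", "myrun", agl=True)
#   >>> sorted(axes.keys())
#   ['x', 'y', 'z', 'z_MSL']
#   >>> axes['x'][0], axes['y'][0]
#   (0.0, 0.0)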


def _merge_cores(vars):
    meshed_cores = {}
    cores = vars.keys()
    xcores, ycores = [np.unique(c) for c in zip(*cores)]
    var_list = vars[cores[0]].keys()

    for var in var_list:
        nz_core, ny_core, nx_core = vars[cores[0]][var].shape

        # Neighboring subdomains share a 3-point overlap, so each patch
        # contributes (n_core - 3) unique points plus the shared 3.
        full_ny = 1 if ny_core == 1 else (ny_core - 3) * len(ycores) + 3
        full_nx = 1 if nx_core == 1 else (nx_core - 3) * len(xcores) + 3

        meshed_cores[var] = np.empty((nz_core, full_ny, full_nx), dtype=np.float32)

        if full_ny == 1:
            for icore, xcore in enumerate(xcores):
                lower_bound_x = (nx_core - 3) * icore
                upper_bound_x = (nx_core - 3) * (icore + 1) + 3
                meshed_cores[var][:, :, lower_bound_x:upper_bound_x] = vars[(xcore, ycores[0])][var]
        elif full_nx == 1:
            for jcore, ycore in enumerate(ycores):
                lower_bound_y = (ny_core - 3) * jcore
                upper_bound_y = (ny_core - 3) * (jcore + 1) + 3
                meshed_cores[var][:, lower_bound_y:upper_bound_y, :] = vars[(xcores[0], ycore)][var]
        else:
            for icore, xcore in enumerate(xcores):
                for jcore, ycore in enumerate(ycores):
                    lower_bound_x = (nx_core - 3) * icore
                    upper_bound_x = (nx_core - 3) * (icore + 1) + 3
                    lower_bound_y = (ny_core - 3) * jcore
                    upper_bound_y = (ny_core - 3) * (jcore + 1) + 3
                    meshed_cores[var][:, lower_bound_y:upper_bound_y, lower_bound_x:upper_bound_x] = \
                        vars[(xcore, ycore)][var]

    return meshed_cores


def _load_data(hdf_var, bounds):
    if len(hdf_var.shape) == 2:
        bounds = bounds[1:]

    if hdf_var[:].max() == np.iinfo(np.int16).max:
        data = arps_decompress(hdf_var, dindex=bounds)
    else:
        data = hdf_var[bounds]
    return data


def _load_vars(hdf, var_list, axes, interpolator, coords='hght'):
    """ Load a single hdf file """
    vars = {}
    if coords == 'hght':
        bounds = interpolator.get_bounds()

    for var in var_list:
        dointerp = False
        if var in hdf.variables and var not in ['x', 'y', 'z']:
            if coords == 'pres' and var == 'p':
                vars[var] = _load_data(hdf.variables[var], slice(None))
                axes['z'] = vars[var]
                interpolator.set_axes(axes)
                bounds = interpolator.get_bounds()
                vars[var] = vars[var][bounds]
            else:
                vars[var] = _load_data(hdf.variables[var], bounds)
            dointerp = True

        if var == 'z':
            if 'z_AGL' in axes:
                vars[var] = axes['z_AGL'][bounds]
            elif 'z_MSL' in axes:
                vars[var] = axes['z_MSL'][bounds]
            dointerp = True
        elif var == 'y':
            vars[var] = axes['y'][:]  # (axes['y'][1:] + axes['y'][:-1]) / 2
        elif var == 'x':
            vars[var] = axes['x'][:]  # (axes['x'][1:] + axes['x'][:-1]) / 2
        elif var == 'dx':
            vars[var] = axes['x'][1] - axes['x'][0]
        elif var == 'dy':
            vars[var] = axes['y'][1] - axes['y'][0]
        elif var == 'dz':
            zp = axes['z']
            vars[var] = np.vstack((zp[1:] - zp[:-1], zp[-1:]))

        if dointerp and not interpolator.is_buffered():
            vars[var] = interpolator(vars[var])

    return vars
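
# A synthetic sketch of _merge_cores: two patches side by side in x, each
# with (nz, ny, nx) = (1, 1, 7) and the 3-point overlap between neighbors,
# merge to a full nx of (7 - 3) * 2 + 3 = 11:
#
#   >>> patches = {(0, 0): {'u': np.arange(7, dtype=np.float32).reshape(1, 1, 7)},
#   ...            (1, 0): {'u': np.arange(4, 11, dtype=np.float32).reshape(1, 1, 7)}}
#   >>> _merge_cores(patches)['u'][0, 0]
#   array([  0.,   1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.], dtype=float32)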


def load_domain(base_path, data_file, derived, interpolator, coords='hght', aggregator=None, split=None):
    """ Load one timestep of one ensemble member. """
    var_list, func = derived
    axes = interpolator.get_axes()

    if coords == 'pres':
        try:
            var_list.remove('p')
        except ValueError:
            pass
        var_list.insert(0, 'p')

    if split is not None:
        vars = {}
        for (icore, jcore) in split:
            hdf = nio.open_file("%s/%s_%03d%03d" % (base_path, data_file, icore + 1, jcore + 1), mode='r', format='hdf')
            vars[(icore, jcore)] = _load_vars(hdf, var_list, axes, interpolator)
            hdf.close()
        vars = _merge_cores(vars)
    else:
        hdf = nio.open_file("%s/%s" % (base_path, data_file), mode='r', format='hdf')
        vars = _load_vars(hdf, var_list, axes, interpolator, coords=coords)
        hdf.close()

    if aggregator is None:
        result = func(**vars)
    else:
        result = recarray_fm_dict(**vars)

    if interpolator.is_buffered():
        if result.dtype.fields:
            result = dict_fm_recarray(result)
            for var in result.iterkeys():
                result[var] = interpolator(result[var])
            result = recarray_fm_dict(**result)
        else:
            result = interpolator(result)

    gc.collect()
    return result
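
# A usage sketch for load_domain; the path and file name are hypothetical.
# The derived argument is a (var_list, function) pair, here packaged with
# the default record-array builder:
#
#   >>> interp = NullInterpolator()
#   >>> interp.set_axes(get_axes("/path/to/run", "myrun", agl=interp.is_agl()))
#   >>> data = load_domain("/path/to/run", "myrun.hdf003600",
#   ...                    (['u', 'v'], recarray_fm_dict), interp)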


def _load_ensemble_timestep(base_path, members, times, derived, interpolator, aggregator=None, max_concurrent=-1,
        single_var="", state='ena', coords='hght', split=None):
    if type(members) in [int]:
        members = range(members)
    else:
        members = [m - 1 for m in members]

    ensemble = None
    state_str = {'ena': "analyses", 'enf': "forecasts"}[state]

    progress = ProgressBar("Loading %s: " % state_str, len(members) * len(times), width=40)
    progress.initialize()

    for wdt, t_ens in enumerate(times):
        data_files = [_build_ens_name(n_ens, t_ens, state=state, single_var=single_var, split=(split is not None))
            for n_ens in members]

#       print "Loading %s time %d ..." % (state_str, t_ens)
        timestep = run_concurrently(load_domain, data_files,
            args=(base_path, "__placeholder__", derived, interpolator),
            kwargs={'coords': coords, 'aggregator': aggregator, 'split': split},
            max_concurrent=max_concurrent, progress=progress
        )

        if aggregator is not None:
            var_list, func = derived
            timestep = dict_fm_recarray(np.array(list(timestep), dtype=timestep[0].dtype))
            for var, val in timestep.iteritems():
                timestep[var] = aggregator(val)
            timestep = func(**timestep)

            if ensemble is None:
                ensemble = np.empty((len(times),) + timestep.shape, dtype=timestep.dtype)
            ensemble[wdt] = timestep
        else:
            if ensemble is None:
                base = timestep[0]
                ensemble = np.empty((len(members), len(times)) + base.shape, dtype=base.dtype)
            for lde, ens_data in enumerate(timestep):
                ensemble[lde, wdt] = ens_data

    return ensemble


def _load_ensemble_run(base_path, members, times, derived, interpolator, max_concurrent=-1, single_var="",
        state='ena', coords='hght', split=None):
    if type(members) in [int]:
        members = range(members)
    else:
        members = [m - 1 for m in members]

    ensemble = None
    for lde, n_ens in enumerate(members):
        data_files = [_build_ens_name(n_ens, t_ens, single_var=single_var, state=state, split=(split is not None))
            for t_ens in times]

        print "Loading member %d ..." % (n_ens + 1)
        run = run_concurrently(load_domain, data_files,
            args=(base_path, "__placeholder__", derived, interpolator),
            kwargs={'coords': coords, 'split': split},
            max_concurrent=max_concurrent
        )

        if ensemble is None:
            base = run[0]
            ensemble = np.empty((len(members), len(times)) + base.shape, dtype=base.dtype)

        for wdt, ens_data in enumerate(run):
            ensemble[lde, wdt] = ens_data

    return ensemble


def load_ensemble(base_path, members, times, var_names, derived=recarray_fm_dict, interpolator=NullInterpolator(),
        aggregator=None, max_concurrent=-1, single_var=False, z_coord_type="atmos", coords='hght', fcst=False,
        split=None):
    """
    Load an entire ensemble into memory. Optionally, do interpolation, compute derived variables, and
    aggregate the ensemble.

    Args:
        base_path (str): Path to the data to load.
        members (int or list): If an integer, determines the number of members to load (e.g. load
            members 1 through `members`). If a list, determines which members to load (e.g. load
            members 1, 4, 10, and 19).
        times (list): A list of times (in seconds since initialization) at which to load data.
        var_names (list): A list of variable names to load from the file.
        derived (function): A function to compute derived variables. The function must take a
            collection of keyword arguments (e.g. **kwargs) and return a single numpy array.
            Optional, default is to return all the variables in a numpy record array.
        interpolator (Interpolator): An interpolator object (as defined in pycaps.interp.interp)
            specifying how to interpolate each member at each time. Optional, default is to do no
            interpolation (return the full three-dimensional domain).
        aggregator (function): A function describing how to aggregate the ensemble members *prior*
            to computing derived variables. The function must take a single numpy array, the first
            dimension of which is the ensemble, and return another numpy array. Could be used for
            computing an ensemble mean prior to computing reflectivity. Optional, default is to do
            no aggregation.
        max_concurrent (int): The number of processes to run concurrently. Optional, default is to
            load all ensemble members at the same time (or in some configurations, all time steps
            from an individual member at the same time).
        single_var (bool): Whether or not to load data from a single-variable file. If True, then
            var_names must be of length 1. Optional, default is False.
        z_coord_type (str): Specifies which z coordinates to use. Acceptable values are "atmos" for
            atmospheric z coordinates or "soil" for soil z coordinates. Optional, default is "atmos".
        coords (str): Specifies which vertical coordinate to use in the atmosphere. Acceptable values
            are "hght" for height coordinates and "pres" for pressure coordinates. Optional, default
            is "hght".
        fcst (bool): Specifies whether to load the forecast (True) or analysis (False) ensemble.
            Optional, default is False (loads the analysis ensemble).
        split (list): Specifies the domain decomposition for split ensembles. Must be a list of
            (icore, jcore) subdomain indices, one per patch (e.g. [(i, j) for i in range(NPX) for j
            in range(NPY)], where NPX and NPY are the number of subdomains in the x and y
            directions). Optional, default is an already-joined ensemble.

    Returns:
        A numpy array containing the data in the ensemble. For the full ensemble with no
        interpolation or aggregation, the order of dimensions will be (NE, NT, NZ, NY, NX).
        Aggregation will remove the NE dimension, and interpolation will change the last three
        according to which interpolation is being done (for example, interpolation to a height will
        remove the NZ dimension, while interpolating to a set of points will replace the NZ, NY, and
        NX dimensions with NP).
    """
    load_by_run = False
    load_by_run |= type(members) in [list, tuple] and len(members) == 1
    load_by_run &= aggregator is None

    states = ['ena', 'enf']

    if z_coord_type == 'atmos':
        z_coord_type = ""
    elif z_coord_type != "soil":
        raise ValueError("Acceptable values for z_coord_type are 'atmos' and 'soil'!")

    axes = get_axes(base_path, 0, agl=interpolator.is_agl(), z_coord_type=z_coord_type, split=split, fcst=fcst)
    interpolator.set_axes(axes)

    if single_var and len(var_names) != 1:
        raise ValueError("If loading a single variable, the length of var_names must be 1!")
    elif single_var:
        single_var = var_names[0]
    else:
        single_var = ""

    if load_by_run:
        return _load_ensemble_run(base_path, members, times, (var_names, derived), interpolator,
            max_concurrent=max_concurrent, single_var=single_var, state=states[int(fcst)], split=split)
    else:
        return _load_ensemble_timestep(base_path, members, times, (var_names, derived), interpolator,
            aggregator=aggregator, max_concurrent=max_concurrent, single_var=single_var, coords=coords,
            state=states[int(fcst)], split=split)


def load_run(base_path, base_name, times, var_names, derived=recarray_fm_dict, interpolator=NullInterpolator(),
        max_concurrent=-1, single_var=False, z_coord_type="atmos", coords='hght', split=None):
    """
    Load a single run into memory. Optionally, do interpolation and compute derived variables.

    Args:
        base_path (str): Path to the data to load.
        base_name (str): The base name of the run (the prefix of its history file names, e.g.
            'myrun' for files like 'myrun.hdf003600').
        times (list): A list of times (in seconds since initialization) at which to load data.
        var_names (list): A list of variable names to load from the file.
        derived (function): A function to compute derived variables. The function must take a
            collection of keyword arguments (e.g. **kwargs) and return a single numpy array.
            Optional, default is to return all the variables in a numpy record array.
        interpolator (Interpolator): An interpolator object (as defined in pycaps.interp.interp)
            specifying how to interpolate each member at each time. Optional, default is to do no
            interpolation (return the full three-dimensional domain).
        max_concurrent (int): The number of processes to run concurrently. Optional, default is to
            load all time steps at the same time.
        single_var (bool): Whether or not to load data from a single-variable file. If True, then
            var_names must be of length 1. Optional, default is False.
        z_coord_type (str): Specifies which z coordinates to use. Acceptable values are "atmos" for
            atmospheric z coordinates or "soil" for soil z coordinates. Optional, default is "atmos".
        coords (str): Specifies which vertical coordinate to use in the atmosphere. Acceptable values
            are "hght" for height coordinates and "pres" for pressure coordinates. Optional, default
            is "hght".
        split (list): Specifies the domain decomposition for split runs. Must be a list of
            (icore, jcore) subdomain indices, one per patch. Optional, default is an already-joined
            run.

    Returns:
        A numpy array containing the data for the run. For the full run with no interpolation, the
        order of dimensions will be (NT, NZ, NY, NX). Interpolation will change the last three
        according to which interpolation is being done (for example, interpolation to a height will
        remove the NZ dimension, while interpolating to a set of points will replace the NZ, NY, and
        NX dimensions with NP).
    """
    if single_var and len(var_names) != 1:
        raise ValueError("If loading a single variable, the length of var_names must be 1!")
    elif single_var:
        single_var = var_names[0]
    else:
        single_var = ""

    data_files = [_build_run_name(base_name, t_ens, single_var=single_var) for t_ens in times]

    if z_coord_type == 'atmos':
        z_coord_type = ""
    elif z_coord_type != "soil":
        raise ValueError("Acceptable values for z_coord_type are 'atmos' and 'soil'!")

    axes = get_axes(base_path, base_name, agl=interpolator.is_agl(), z_coord_type=z_coord_type, split=split)
    interpolator.set_axes(axes)

    progress = ProgressBar("Loading run '%s': " % base_name, len(times), width=40)
    progress.initialize()

    run = run_concurrently(load_domain, data_files,
        args=(base_path, "__placeholder__", (var_names, derived), interpolator),
        kwargs={'coords': coords, 'split': split},
        max_concurrent=max_concurrent, progress=progress
    )

    base = run[0]
    full_run = np.empty((len(times),) + base.shape, dtype=base.dtype)
    for wdt in xrange(len(times)):
        full_run[wdt] = run[wdt]

    return full_run
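
# A usage sketch for load_run; the path, run name, and height level are
# hypothetical. Interpolating to a single height removes the NZ dimension:
#
#   >>> from pycaps.interp import ZInterpolator
#   >>> pt = load_run("/path/to/run", "myrun", [0, 300, 600], ['pt'],
#   ...               interpolator=ZInterpolator(500., agl=True))
#   >>> pt.shape  # (NT, NY, NX)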


if __name__ == "__main__":
    from pycaps.derive import reflectivity_lin
    from pycaps.interp import ZInterpolator
    from datetime import datetime

    var_names = ['p', 'pt', 'qr', 'qs', 'qh']

    start = datetime.now()
    ens = load_ensemble("/caps2/tsupinie/05June2009/1km-i-no-mm/", 40, [14400], var_names,
        derived=reflectivity_lin, interpolator=ZInterpolator(1000., agl=True))
    print "Time to read ensemble (parallel):", datetime.now() - start
    print np.nanmax(ens)

#   cores = [ (i, j) for i in xrange(2) for j in xrange(15) ]
#   get_axes("/caps2/tsupinie/24May2011/3km-MPAR/", "EN001/ena001", split=cores)