import Nio as nio
import numpy as np
from pycaps.interp import NullInterpolator
from pycaps.derive import recarray_fm_dict, dict_fm_recarray
from pycaps.util import arps_decompress
from pycaps.util import ProgressBar
from pycaps.util import run_concurrently
import gc
def _make_z_coords_agl(z_coords):
z_coords_agl = z_coords - z_coords[1][np.newaxis]
return z_coords_agl, z_coords
def _build_ens_name(n_ens, t_ens, state='ena', single_var="", split=False):
if split:
dir_state = (state[:-1] if state == 'ena' else state).upper()
dir = "%s%03d/" % (dir_state, n_ens + 1)
else:
dir = ""
return "%s%s%03d.hdf%s%06d" % (dir, state, n_ens + 1, single_var, t_ens)
def _build_ens_grdbas(n_ens, state='ena', split=False):
if split:
dir_state = (state[:-1] if state == 'ena' else state).upper()
dir = "%s%03d/" % (dir_state, n_ens + 1)
else:
dir = ""
return "%s%s%03d.hdfgrdbas" % (dir, state, n_ens + 1)
def _build_run_name(base_name, t_ens, single_var=""):
return "%s.hdf%s%06d" % (base_name, single_var, t_ens)
def _build_run_grdbas(base_name):
return "%s.hdfgrdbas" % base_name
[docs]def get_axes(base_path, base_name, agl=True, z_coord_type="", split=None, fcst=False):
"""
Get the axes from a grdbas file.
Args:
base_path (str): Path to the grdbas file.
base_name (str): Base name of the grdbas file (e.g. 'enf001').
agl (bool): Whether to return the z coordinates relative to ground level (True) or MSL (False)
z_coord_type (str): Which set of z coordinates to use. "" refers to the atmospheric coordinates, and "soil"
refers to the soil z coordinates. Default is "" (atmospheric coordinates).
split (tuple): If the domain is split, then this is tuple contains the domain decomposition in (NX, NY).
Returns:
A dictionary containing the x and y coordinates in the 'x' and 'y' keys, and either 'z' and 'z_MSL' (if
agl=True) or 'z' and 'z_AGL' (if agl=False)
"""
if type(base_name) == int:
state = {True:'enf', False:'ena'}[fcst]
grdbas_file = _build_ens_grdbas(base_name, split=(split is not None), state=state)
elif type(base_name) == str:
grdbas_file = _build_run_grdbas(base_name)
if split is None:
hdf_grdbas = nio.open_file("%s/%s" % (base_path, grdbas_file), mode='r', format='hdf')
axes = dict((ax[:1], arps_decompress(hdf_grdbas.variables[ax])) for ax in ['x', 'y', "zp%s" % z_coord_type])
hdf_grdbas.close()
else:
vars = {}
for (icore, jcore) in split:
hdf_grdbas = nio.open_file("%s/%s_%03d%03d" % (base_path, grdbas_file, icore + 1, jcore + 1),
mode='r', format='hdf')
axes = dict((ax[:1], arps_decompress(hdf_grdbas.variables[ax])) for ax in ['x', 'y', "zp%s" % z_coord_type])
axes['x'] = axes['x'][np.newaxis, np.newaxis, :]
axes['y'] = axes['y'][np.newaxis, :, np.newaxis]
vars[(icore, jcore)] = axes
hdf_grdbas.close()
axes = _merge_cores(vars)
axes['x'] = axes['x'][0, 0, :]
axes['y'] = axes['y'][0, :, 0]
for ax_dir, ax in axes.iteritems():
if ax_dir in ['x', 'y']:
axes[ax_dir] = ax - ax[0]
if agl:
axes['z'], axes['z_MSL'] = _make_z_coords_agl(axes['z'])
else:
axes['z_AGL'], axes['z'] = _make_z_coords_agl(axes['z'])
return axes
def _merge_cores(vars):
meshed_cores = {}
cores = vars.keys()
xcores, ycores = [ np.unique(c) for c in zip(*cores) ]
var_list = vars[cores[0]].keys()
for var in var_list:
nz_core, ny_core, nx_core = vars[cores[0]][var].shape
full_ny = 1 if ny_core == 1 else (ny_core - 3) * len(ycores) + 3
full_nx = 1 if nx_core == 1 else (nx_core - 3) * len(xcores) + 3
meshed_cores[var] = np.empty((nz_core, full_ny, full_nx), dtype=np.float32)
if full_ny == 1:
for icore, xcore in enumerate(xcores):
lower_bound_x = (nx_core - 3) * icore
upper_bound_x = (nx_core - 3) * (icore + 1) + 3
meshed_cores[var][:, :, lower_bound_x:upper_bound_x] = vars[(xcore, ycores[0])][var]
elif full_nx == 1:
for jcore, ycore in enumerate(ycores):
lower_bound_y = (ny_core - 3) * jcore
upper_bound_y = (ny_core - 3) * (jcore + 1) + 3
meshed_cores[var][:, lower_bound_y:upper_bound_y, :] = vars[(xcores[0], ycore)][var]
else:
for icore, xcore in enumerate(xcores):
for jcore, ycore in enumerate(ycores):
lower_bound_x = (nx_core - 3) * icore
upper_bound_x = (nx_core - 3) * (icore + 1) + 3
lower_bound_y = (ny_core - 3) * jcore
upper_bound_y = (ny_core - 3) * (jcore + 1) + 3
meshed_cores[var][:, lower_bound_y:upper_bound_y, lower_bound_x:upper_bound_x] = \
vars[(xcore, ycore)][var]
return meshed_cores
def _load_data(hdf_var, bounds):
if len(hdf_var.shape) == 2:
bounds = bounds[1:]
if hdf_var[:].max() == np.iinfo(np.int16).max:
data = arps_decompress(hdf_var, dindex=bounds)
else:
data = hdf_var[bounds]
return data
def _load_vars(hdf, var_list, axes, interpolator, coords='hght'):
"""
Load a single hdf file
"""
vars = {}
if coords == 'hght':
bounds = interpolator.get_bounds()
for var in var_list:
dointerp = False
if var in hdf.variables and var not in ['x', 'y', 'z']:
if coords == 'pres' and var == 'p':
vars[var] = _load_data(hdf.variables[var], slice(None))
axes['z'] = vars[var]
interpolator.set_axes(axes)
bounds = interpolator.get_bounds()
vars[var] = vars[var][bounds]
else:
vars[var] = _load_data(hdf.variables[var], bounds)
dointerp = True
if var == 'z':
if 'z_AGL' in axes:
vars[var] = axes['z_AGL'][bounds]
elif 'z_MSL' in axes:
vars[var] = axes['z_MSL'][bounds]
dointerp = True
elif var == 'y':
vars[var] = axes['y'][:] #(axes['y'][1:] + axes['y'][:-1]) / 2
elif var == 'x':
vars[var] = axes['x'][:] #(axes['x'][1:] + axes['x'][:-1]) / 2
elif var == 'dx':
vars[var] = axes['x'][1] - axes['x'][0]
elif var == 'dy':
vars[var] = axes['y'][1] - axes['y'][0]
elif var == 'dz':
zp = axes['z']
vars[var] = np.vstack((zp[1:] - zp[:-1], zp[-1:]))
if dointerp and not interpolator.is_buffered():
vars[var] = interpolator(vars[var])
return vars
[docs]def load_domain(base_path, data_file, derived, interpolator, coords='hght', aggregator=None, split=None):
"""
Load one timestep of one ensemble member.
"""
var_list, func = derived
axes = interpolator.get_axes()
if coords == 'pres':
try:
var_list.remove('p')
except ValueError:
pass
var_list.insert(0, 'p')
if split is not None:
vars = {}
for (icore, jcore) in split:
hdf = nio.open_file("%s/%s_%03d%03d" % (base_path, data_file, icore + 1, jcore + 1), mode='r', format='hdf')
vars[(icore, jcore)] = _load_vars(hdf, var_list, axes, interpolator)
hdf.close()
vars = _merge_cores(vars)
else:
hdf = nio.open_file("%s/%s" % (base_path, data_file), mode='r', format='hdf')
vars = _load_vars(hdf, var_list, axes, interpolator, coords=coords)
hdf.close()
if aggregator is None:
result = func(**vars)
else:
result = recarray_fm_dict(**vars)
if interpolator.is_buffered():
if result.dtype.fields:
result = dict_fm_recarray(result)
for var in result.iterkeys():
result[var] = interpolator(result[var])
result = recarray_fm_dict(**result)
else:
result = interpolator(result)
gc.collect()
return result
def _load_ensemble_timestep(base_path, members, times, derived, interpolator,
aggregator=None, max_concurrent=-1, single_var="", state='ena', coords='hght', split=None):
if type(members) in [int]:
members = range(members)
else:
members = [m - 1 for m in members]
ensemble = None
state_str = {'ena':"analyses", 'enf':"forecasts"}[state]
progress = ProgressBar("Loading %s: " % state_str, len(members) * len(times), width=40)
progress.initialize()
for wdt, t_ens in enumerate(times):
data_files = [_build_ens_name(n_ens, t_ens, state=state, single_var=single_var, split=(split is not None))
for n_ens in members]
# print "Loading %s time %d ..." % (state_str, t_ens)
timestep = run_concurrently(load_domain, data_files,
args=(base_path, "__placeholder__", derived, interpolator),
kwargs={'coords':coords, 'aggregator':aggregator, 'split':split},
max_concurrent=max_concurrent,
progress=progress
)
if aggregator is not None:
var_list, func = derived
timestep = dict_fm_recarray(np.array(list(timestep), dtype=timestep[0].dtype))
for var, val in timestep.iteritems():
timestep[var] = aggregator(val)
timestep = func(**timestep)
if ensemble is None:
ensemble = np.empty((len(times),) + timestep.shape, dtype=timestep.dtype)
ensemble[wdt] = timestep
else:
if ensemble is None:
base = timestep[0]
ensemble = np.empty((len(members), len(times)) + base.shape, dtype=base.dtype)
for lde, ens_data in enumerate(timestep):
ensemble[lde, wdt] = ens_data
return ensemble
def _load_ensemble_run(base_path, members, times, derived, interpolator, max_concurrent=-1, single_var="", state='ena',
coords='hght', split=None):
if type(members) in [int]:
members = range(members)
else:
members = [m - 1 for m in members]
ensemble = None
for lde, n_ens in enumerate(members):
data_files = [_build_ens_name(n_ens, t_ens, single_var=single_var, state=state) for t_ens in times]
print "Loading member %d ..." % (n_ens + 1)
run = run_concurrently(load_domain, data_files,
args=(base_path, "__placeholder__", derived, interpolator),
kwargs={'coords': coords, 'split': split},
max_concurrent=max_concurrent
)
if ensemble is None:
base = run[0]
ensemble = np.empty((len(members), len(times)) + base.shape, dtype=base.dtype)
for wdt, ens_data in enumerate(run):
ensemble[lde, wdt] = ens_data
return ensemble
[docs]def load_ensemble(base_path, members, times, var_names,
derived=recarray_fm_dict, interpolator=NullInterpolator(), aggregator=None, max_concurrent=-1,
single_var=False, z_coord_type="atmos", coords='hght', fcst=False, split=None):
"""
Load an entire ensemble into memory. Optionally, do interpolation, compute derived variables, and aggregate the
ensemble.
Args:
base_path (str): Path to the data to load.
members (int or list): If an integer, determines the number of members to load (e.g. load members 1 through
`members`). If a list, determines which members to load (e.g. load members 1, 4, 10, and 19).
times (list): A list of times (in seconds since initialization) at which to load data.
var_names (list): A list of variable names to load from the file.
derived (function): A function to compute derived variables. The function must take a collection of keyword
arguments (e.g. **kwargs) and return a single numpy array. Optional, default is to return all the variables
in a numpy record array.
interpolator (Interpolator): An interpolator object (as defined in pycaps.interp.interp) specifying how to
interpolate each member at each time. Optional, default is to do no interpolation (return the full three-
dimensional domain)
aggregator (function): A function describing how to aggregate the ensemble members *prior* to computing derived
variables. The function must take a single numpy array, the first dimension of which is the ensemble, and
return another numpy array. Could be used for computing an ensemble mean prior to computing reflectivity.
Optional, default is to do no aggreggation.
max_concurrent (int): The number of processes to run concurrently. Optional, default is to load all ensemble
members at the same time (or in some configurations, all time steps from an individual member at the same
time).
single_var (bool): Whether or not to load data from a single variable file. If true, then var_names must be of
length 1. Optional, default is False.
z_coord_type (str): Specifies which z coordinates to use. Acceptable values are "atmos" for atmospheric z
coordinates or "soil" for soil z coordinates. Optional, default is "atmos".
coords (str): Specifies which vertical coordinate to use in the atmosphere. Acceptable values are "hght" for for
height coordinates, and "pres" for pressure coordinates. Optional, default is "hght".
fcst (bool): Specifies whether to load the forecast (True) or analysis (False) ensemble. Optional, default is
False (loads analysis ensemble).
split (tuple): Specifies the domain configuration for split ensembles. Must be a tuple (NPX, NPY), where NPX is
the number of subdomains in the x direction, and NPY is the same for the y direction. Optional, default is
an already-joined ensemble.
Returns:
A numpy array containing the data in the ensemble. For the full ensemble with no interpolation or aggregation,
the order of dimensions will be (NE, NT, NZ, NY, NX). Aggregation will remove the NE dimension, and
interpolation will change the last three according to which interpolation is being done (for example,
interpolation to a height will remove the NZ dimension, while interpolating to a set of points will replace the
NZ, NY, and NX dimensions with NP).
"""
load_by_run = False
load_by_run |= type(members) in [list, tuple] and len(members) == 1
load_by_run &= aggregator is None
states = ['ena', 'enf']
if z_coord_type == 'atmos':
z_coord_type = ""
elif z_coord_type != "soil":
raise ValueError("Acceptable values for z_coord_type are 'atmos' and 'soil'!")
axes = get_axes(base_path, 0, agl=interpolator.is_agl(), z_coord_type=z_coord_type, split=split, fcst=fcst)
interpolator.set_axes(axes)
if single_var and len(var_names) != 1:
raise ValueError("If loading a single variable, the length of var_names must be 1!")
elif single_var:
single_var = var_names[0]
else:
single_var = ""
if load_by_run:
return _load_ensemble_run(base_path, members, times, (var_names, derived), interpolator,
max_concurrent=max_concurrent, single_var=single_var, state=states[int(fcst)],
split=split)
else:
return _load_ensemble_timestep(base_path, members, times, (var_names, derived), interpolator,
aggregator=aggregator, max_concurrent=max_concurrent, single_var=single_var,
coords=coords, state=states[int(fcst)], split=split)
[docs]def load_run(base_path, base_name, times, var_names,
derived=recarray_fm_dict, interpolator=NullInterpolator(), max_concurrent=-1, single_var=False,
z_coord_type="atmos", coords='hght', split=None):
"""
Load a single run into memory. Optionally, do interpolation and compute derived variables.
Args:
base_path (str): Path to the data to load.
base_name (str): The
times (list): A list of times (in seconds since initialization) at which to load data.
var_names (list): A list of variable names to load from the file.
derived (function): A function to compute derived variables. The function must take a collection of keyword
arguments (e.g. **kwargs) and return a single numpy array. Optional, default is to return all the variables
in a numpy record array.
interpolator (Interpolator): An interpolator object (as defined in pycaps.interp.interp) specifying how to
interpolate each member at each time. Optional, default is to do no interpolation (return the full three-
dimensional domain)
max_concurrent (int): The number of processes to run concurrently. Optional, default is to load all time steps
at the same time.
single_var (bool): Whether or not to load data from a single variable file. If true, then var_names must be of
length 1. Optional, default is False.
z_coord_type (str): Specifies which z coordinates to use. Acceptable values are "atmos" for atmospheric z
coordinates or "soil" for soil z coordinates. Optional, default is "atmos".
coords (str): Specifies which vertical coordinate to use in the atmosphere. Acceptable values are "hght" for for
height coordinates, and "pres" for pressure coordinates. Optional, default is "hght".
split (tuple): Specifies the domain configuration for split ensembles. Must be a tuple (NPX, NPY), where NPX is
the number of subdomains in the x direction, and NPY is the same for the y direction. Optional, default is
an already-joined ensemble.
Returns:
A numpy array containing the data for the run. For the full run with no interpolation, the order of dimensions
will be (NT, NZ, NY, NX). Iinterpolation will change the last three according to which interpolation is being
done (for example, interpolation to a height will remove the NZ dimension, while interpolating to a set of
points will replace the NZ, NY, and NX dimensions with NP).
"""
if single_var and len(var_names) != 1:
raise ValueError("If loading a single variable, the length of var_names must be 1!")
elif single_var:
single_var = var_names[0]
else:
single_var = ""
data_files = [_build_run_name(base_name, t_ens, single_var=single_var) for t_ens in times]
if z_coord_type == 'atmos':
z_coord_type = ""
elif z_coord_type != "soil":
raise ValueError("Acceptable values for z_coord_type are 'atmos' and 'soil'!")
axes = get_axes(base_path, base_name, agl=interpolator.is_agl(), z_coord_type=z_coord_type, split=split)
interpolator.set_axes(axes)
progress = ProgressBar("Loading run '%s': " % base_name, len(times), width=40)
progress.initialize()
run = run_concurrently(load_domain, data_files,
args=(base_path, "__placeholder__", (var_names, derived), interpolator),
kwargs={'coords': coords, 'split': split},
max_concurrent=max_concurrent,
progress=progress
)
base = run[0]
full_run = np.empty((len(times), ) + base.shape, dtype=base.dtype)
for wdt in xrange(len(times)):
full_run[wdt] = run[wdt]
return full_run
if __name__ == "__main__":
from pycaps.derive import reflectivity_lin
from pycaps.interp import ZInterpolator
from datetime import datetime
var_names = ['p', 'pt', 'qr', 'qs', 'qh']
start = datetime.now()
ens = load_ensemble("/caps2/tsupinie/05June2009/1km-i-no-mm/", 40, [14400], var_names,
derived=reflectivity_lin, interpolator=ZInterpolator(1000., agl=True))
print "Time to read ensemble (parallel):", datetime.now() - start
print np.nanmax(ens)
# cores = [ (i, j) for i in xrange(2) for j in xrange(15) ]
# getAxes("/caps2/tsupinie/24May2011/3km-MPAR/", "EN001/ena001", split=cores)