Source code for pycaps.util.util


from progress import ProgressBar

from multiprocessing import Process, Queue
import sys
import copy
import traceback

from functools import wraps


[docs]def run_concurrently(target, placeholder_vals, args=[], kwargs={}, max_concurrent=-1, zip_result=False, progress=None): """ Runs several instances of a function at the same time and returns all their outputs as a list. Args: target (function) Function to run. For this version of the function, it must return something. placeholder_vals (list): Values of a placeholder parameter to run the function on. args (list): Arguments to pass to the target function. One or more may have the special value "__placeholder__", which will be replaced with a value from placeholder_vals for each instance of the function. kwargs (dict): Keyword arguments to pass to the target function. One or more may have the special value "__placeholder__", which will be replaced with a value from placeholder_vals for each instance of the function. max_concurrent (int): Maximum number of function instances to run at the same time. The default is to run an instance for each value in placeholder_vals at the same time. zip_result (bool): A boolean specifying whether to run the zip() function on the result, perhaps if the function returns a tuple of values. Default is False. progress (ProgressBar): An optional progress bar instance to use in displaying progress on the tasks. Returns: A list of the return values from each instance of the function, sorted by the corresponding placeholder value. """ if not progress: progress = ProgressBar("", len(placeholder_vals), width=40) progress.initialize() # Dummy function; sets up the pipe for parallelization so the user doesn't have to. def doRun(target, pipe, tag, args, kwargs): try: ret_val = target(*args, **kwargs) except Exception as e: ret_val = (e, traceback.format_exc()) except KeyboardInterrupt as e: ret_val = (e, traceback.format_exc()) pipe.put((tag, ret_val)) return pipe = Queue(len(list(placeholder_vals))) ph_vals = copy.copy(placeholder_vals) ret_vals = [] ph_done = 0 exc_caught = False if max_concurrent > 0: n_chunks = len(ph_vals) / max_concurrent + 1 else: n_chunks = 1 n_iters = 1 while len(ph_vals) > 0: # if n_chunks > 1: # print "Working on chunk %d of %d ..." % (n_iters, n_chunks) # We're going to end up popping off chunks of the ph_vals list, so loop until there's nothing left in that list. # Do the pop. if max_concurrent == -1: max_concurrent = len(ph_vals) ph_chunk = [ ph_vals.pop(0) for idx in range(min(max_concurrent, len(ph_vals))) ] procs = {} for ph_idx, ph_val in enumerate(ph_chunk): tag = ph_idx + ph_done # Run an instance of the function for every value in this chunk. # Replace all the instances of "__placeholder__" in the arguments. ph_args = tuple([ ph_val if item == "__placeholder__" else item for item in args ]) ph_kwargs = dict([ (key, ph_val) if val == "__placeholder__" else (key, val) for key, val in kwargs.iteritems() ]) # Instantiate the process and start it. This calls the dummy function doRun above, which calls the actual target function. if type(ph_val) in [ str, int, float ]: pname = str(ph_val) else: pname = str(tag) proc = Process(target=doRun, name=pname, args=(target, pipe, tag, ph_args, ph_kwargs)) proc.start() procs[tag] = proc while len(procs) > 0: # Wait for the processes to finish, deleting it from the procs list whenever it's finished. Loops until all the # processes have finished. pipe_out = pipe.get() tag, ret_val = pipe_out if isinstance(ret_val, tuple) and len(ret_val) == 2: if isinstance(ret_val[0], Exception): print ret_val[1] exc_caught = True elif isinstance(ret_val[0], KeyboardInterrupt): print "Keyboard Interrupt!" for proc in procs.values(): proc.terminate() sys.exit() # raise ret_val ret_vals.append(pipe_out) del procs[tag] progress.complete_step() if exc_caught: print "Caught exception(s), exiting." sys.exit() ph_done += len(ph_chunk) n_iters += 1 # Sort by placeholder value, keep only the return values themselves. ret_vals = zip(*sorted(ret_vals, key=lambda x: x[0]))[1] if zip_result: return zip(*ret_vals) else: return ret_vals
[docs]class abstract(object): """ Use as a decorator to declare a method as abstract. """ def __init__(self, func, custom_name=None): self._func = func self._name = custom_name if custom_name is not None else func.__name__ wraps(func)(self) return def __call__(self, *args, **kwargs): raise NotImplementedError("Function '%s' is abstract and must be implemented in a subclass." % self._name)
[docs]def format_arps_time(rawtime): """ Takes an integer number provided and converts it to the six-digit ARPS time format, which is then returned. Args: rawtime (str): The number to be converted into the ARPS time format Returns: rawtime converted into the ARPS time format """ arpstime = '{:0>6}'.format(int(rawtime)) return arpstime