from progress import ProgressBar
from multiprocessing import Process, Queue
import sys
import copy
import traceback
from functools import wraps
# Marker string that callers put in args/kwargs to be replaced per instance.
_PLACEHOLDER = "__placeholder__"


def _is_placeholder(item):
    """Return True if item is the "__placeholder__" marker string.

    The isinstance check avoids comparing arbitrary objects (e.g. arrays with
    elementwise ``==``) against the marker, which could yield a non-boolean.
    """
    return isinstance(item, (str, type(u""))) and item == _PLACEHOLDER


def _do_run(target, pipe, tag, args, kwargs):
    """Child-process entry point: call target and report the outcome on pipe.

    Puts one ``(tag, status, payload)`` tuple on the queue:
      - ("ok", return value) on success,
      - ("error", formatted traceback) if target raised an Exception,
      - ("interrupt", formatted traceback) on KeyboardInterrupt.
    Kept at module level (not nested) so it can be pickled when multiprocessing
    uses the "spawn"/"forkserver" start methods.
    """
    try:
        result = (tag, "ok", target(*args, **kwargs))
    except Exception:
        result = (tag, "error", traceback.format_exc())
    except KeyboardInterrupt:
        result = (tag, "interrupt", traceback.format_exc())
    pipe.put(result)


def run_concurrently(target, placeholder_vals, args=None, kwargs=None, max_concurrent=-1, zip_result=False,
                     progress=None):
    """
    Runs several instances of a function at the same time and returns all their outputs.

    Args:
        target (function): Function to run. For this version of the function, it must return something.
            Its return value is sent back through a multiprocessing Queue, so it must be picklable (and,
            under the "spawn"/"forkserver" start methods, so must target itself).
        placeholder_vals (iterable): Values of a placeholder parameter to run the function on. Any iterable
            is accepted; it is materialized into a list once.
        args (list): Arguments to pass to the target function. One or more may have the special value
            "__placeholder__", which will be replaced with a value from placeholder_vals for each instance of
            the function. Default is no positional arguments.
        kwargs (dict): Keyword arguments to pass to the target function. One or more may have the special value
            "__placeholder__", which will be replaced with a value from placeholder_vals for each instance of
            the function. Default is no keyword arguments.
        max_concurrent (int): Maximum number of function instances to run at the same time. Any value <= 0
            (the default is -1) runs an instance for each value in placeholder_vals at the same time.
        zip_result (bool): Whether to apply zip() to the result, e.g. if the function returns a tuple of
            values. Default is False.
        progress (ProgressBar): An optional progress bar instance used to display progress on the tasks.
            If not given, a new ProgressBar is created.

    Returns:
        A tuple of the return values of each instance, in the same order as placeholder_vals (an empty tuple
        for no values). If zip_result is True, the list produced by zip(*results) instead.

    Raises:
        SystemExit: with status 1 if any instance raised an exception (after the current chunk finishes,
            with tracebacks printed) or received a KeyboardInterrupt (remaining instances are terminated).
    """
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs

    # Materialize once so tuples, generators, etc. all work and len() is available.
    ph_vals = list(placeholder_vals)
    n_vals = len(ph_vals)

    if not progress:
        progress = ProgressBar("", n_vals, width=40)
    progress.initialize()

    # Treat every non-positive limit as "no limit"; a chunk size of 0 would never
    # make progress. max(..., 1) keeps range() valid when there are no values.
    if max_concurrent <= 0:
        max_concurrent = max(n_vals, 1)

    # Large enough to hold every result, so children never block on put().
    pipe = Queue(n_vals)
    results = {}

    for start in range(0, n_vals, max_concurrent):
        chunk = ph_vals[start:start + max_concurrent]
        procs = {}
        for offset, ph_val in enumerate(chunk):
            # The tag is the value's index in placeholder_vals; it restores input order at the end.
            tag = start + offset
            ph_args = tuple(ph_val if _is_placeholder(item) else item for item in args)
            ph_kwargs = dict((key, ph_val if _is_placeholder(val) else val) for key, val in kwargs.items())
            pname = str(ph_val) if isinstance(ph_val, (str, int, float)) else str(tag)
            proc = Process(target=_do_run, name=pname, args=(target, pipe, tag, ph_args, ph_kwargs))
            proc.start()
            procs[tag] = proc

        exc_caught = False
        # Collect one message per process of this chunk, in completion order.
        while procs:
            tag, status, payload = pipe.get()
            if status == "error":
                print(payload)
                exc_caught = True
            elif status == "interrupt":
                print("Keyboard Interrupt!")
                for proc in procs.values():
                    proc.terminate()
                    proc.join()
                sys.exit(1)
            else:
                results[tag] = payload
            # The child's message has been received, so it can exit; reap it.
            procs.pop(tag).join()
            progress.complete_step()

        if exc_caught:
            print("Caught exception(s), exiting.")
            sys.exit(1)

    ordered = tuple(results[tag] for tag in range(n_vals))
    if zip_result:
        return list(zip(*ordered))
    return ordered
class abstract(object):
    """
    Decorator that declares a method abstract.

    Wrapping a function with ``abstract`` copies the function's metadata
    (``__name__``, ``__doc__``, ``__module__``, ...) onto the decorator
    instance via ``functools.wraps``. Calling the result with any arguments
    raises ``NotImplementedError``, so a subclass must provide a real
    implementation.

    Args:
        func (function): The function being marked as abstract.
        custom_name (str): Name to show in the error message. Defaults to
            ``func.__name__``.
    """

    def __init__(self, func, custom_name=None):
        self._func = func
        if custom_name is None:
            self._name = func.__name__
        else:
            self._name = custom_name
        # Make this instance look like the wrapped function (name, docstring, ...).
        wraps(func)(self)

    def __call__(self, *args, **kwargs):
        # Any call, whatever its arguments, means the override is missing.
        template = "Function '%s' is abstract and must be implemented in a subclass."
        raise NotImplementedError(template % self._name)