
agutil.parallel (module)

Note: Due to CPython's Global Interpreter Lock, the thread-based parallelization system is not suited for CPU-bound tasks. I/O-bound tasks, however, can benefit from thread-based parallelization. Process-based parallelization is better suited to CPU-bound tasks.

Warning: Process-based parallelization has several limitations that thread-based parallelization does not. See the Multiprocessing note below for details.

Warning: agutil.parallel.parallelize2 is not suitable for recursive functions. If a function recursively calls itself and then uses the callback to wait for the value, the recursive call may block forever. See the explanation below for details.

API
  • parallelize(maximum=15, workertype=WORKERTYPE_THREAD)

    Decorator factory. Returns a function which can be used to decorate another function. The decorated function has an identical signature, except that in place of each argument or keyword argument, you should provide an iterable of arguments instead. The iterables define the different calls made to the un-decorated function; calls are dispatched until one of the argument iterables is exhausted. The decorated function returns a generator which yields values in order (see the dispatcher return guarantee, below) from the background calls. The maximum argument sets the maximum number of workers which can run at one time. You may set workertype to agutil.parallel.WORKERTYPE_PROCESS to use process-based parallelization, which is useful if your function is CPU-intensive, but there are limitations which affect the syntax of parallelize (see the Multiprocessing note below). A sketch of the iterable-per-argument calling convention follows this list.

  • parallelize2(maximum=15, workertype=WORKERTYPE_THREAD)

    Decorator factory. Returns a function which can be used to decorate another function. The decorated function has an identical signature, but calls return callback objects instead of function results. When invoked, a callback waits for the associated call to finish, then returns its value (or raises an exception if the call failed). The maximum argument sets the maximum number of workers which can run at one time. You may set workertype to agutil.parallel.WORKERTYPE_PROCESS to use process-based parallelization, which is useful if your function is CPU-intensive, but there are limitations which affect the syntax of parallelize2 (see the Multiprocessing note below). Note: parallelize2 may incur substantially more system overhead than parallelize or dispatcher when the decorated function is called many (>100) times.
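
A minimal sketch of the iterable-per-argument convention described above. The two-argument function scale is a hypothetical example, not part of agutil:

from agutil.parallel import parallelize

@parallelize()
def scale(value, factor):
    return value * factor

# Provide one iterable per parameter; dispatch stops when the shorter
# iterable is exhausted: scale(1, 10), scale(2, 20), scale(3, 30)
for result in scale([1, 2, 3], [10, 20, 30]):
    print(result)  # 10, 40, 90 (yielded in order)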

Example: parallelize vs parallelize2

These two decorators provide essentially the same system of parallelization, with slightly different flavors. Given two functions foo and bar:

from agutil.parallel import parallelize, parallelize2

@parallelize()
def foo(n):
    # Do some work, for example:
    return n * n

@parallelize2()
def bar(n):
    # Do the same work
    return n * n

Both foo and bar will perform the same tasks in roughly the same amount of time, the only difference being how the functions are invoked.

# Get a list of results from foo, using parallelize
results_1 = [x for x in foo(range(100))]

# Start all the workers for bar, using parallelize2
tmp = [bar(x) for x in range(100)]
# Now wait for and retrieve the results
results_2 = [callback() for callback in tmp]
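
Because a callback re-raises any exception from a failed call (per the API description above), errors can be handled at the point where results are collected. A minimal sketch, using a hypothetical function risky:

from agutil.parallel import parallelize2

@parallelize2()
def risky(n):
    if n % 2:
        raise ValueError(n)
    return n

callbacks = [risky(x) for x in range(4)]
for callback in callbacks:
    try:
        print(callback())
    except ValueError as error:
        # The exception raised inside the worker propagates here
        print("call failed:", error)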

When to use parallelize vs parallelize2

Both functions have advantages and disadvantages.

@parallelize is useful in the following conditions:

  • You know exactly how many times you will call the function, and what the arguments to each call will be
  • You are going to iterate over the results

@parallelize2 is useful in the following conditions:

  • You cannot determine in advance how many times the function will be called (see the sketch after this list)
  • You may not need to iterate over the results, but simply need the results at some later point in time
  • The function is not recursive
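
A minimal sketch of the dispatch-as-you-go pattern parallelize2 enables. The function process and the input file are hypothetical; the point is that the number of calls is not known in advance:

from agutil.parallel import parallelize2

@parallelize2()
def process(line):
    # Do some work on one item
    return line.strip().upper()

callbacks = []
with open('input.txt') as stream:  # hypothetical input file
    for line in stream:  # number of lines unknown in advance
        callbacks.append(process(line))

# Retrieve the results whenever they are needed
results = [callback() for callback in callbacks]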

Multiprocessing note

Process-based parallelization has several limitations due to the implementation of the built-in pickle module, which is used to send functions to worker processes:

  • You CANNOT use parallelize or parallelize2 with decorator syntax. You must call them on a previously defined function and assign the result to a different name, so that the original name still refers to the un-decorated function (pickle locates functions by name).
  • You CANNOT use parallelize or parallelize2 on a function defined within another function's closure, since pickle cannot serialize function-local objects. You must call them on globally accessible functions.

Example usage:

from agutil.parallel import parallelize, WORKERTYPE_PROCESS

def foo(arg):
    return arg + 1

# Call the decorator factory directly and bind the result to a new name
bar = parallelize(workertype=WORKERTYPE_PROCESS)(foo)

Thread-based parallelization does not suffer from these limitations: both parallelize and parallelize2 can be used as decorators and on function-local objects when using WORKERTYPE_THREAD (the default). A sketch of this follows.
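
For contrast, a minimal sketch of what the thread-based default permits; the function names here are illustrative:

from agutil.parallel import parallelize

def make_worker(offset):
    # Allowed with WORKERTYPE_THREAD: decorator syntax on a function
    # defined inside another function's closure
    @parallelize()
    def worker(n):
        return n + offset
    return worker

add_ten = make_worker(10)
results = list(add_ten(range(5)))  # [10, 11, 12, 13, 14]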

parallelize2 recursion issues

agutil.parallel.parallelize2 enforces a maximum worker count (per function) by putting new workers to sleep until one of the running workers finishes. If a recursive function calls itself, the recursive call immediately returns a callback (the standard behavior of parallelize2), but if the maximum worker count has already been reached, the worker for the recursive call is put to sleep. If the calling function then uses the callback to wait for the value, it may block forever unless another worker finishes:

@agutil.parallel.parallelize2()
def func(arg):
    # While this function is working, other calls to func()
    # fill up the remaining worker slots, so future calls to func()
    # will be put to sleep until workers start finishing
    callback = func(arg)  # recursively call func()
    # The recursive call is immediately put to sleep since the maximum
    # worker count has been reached.
    callback()  # wait for the result
    # Unless one of the other workers finishes, the call above will block
    # forever

# For this specific example, a single call to func() will block forever,
# as the recursive calls take up worker slots until they start being put
# to sleep

func(0)()  # blocks forever

agutil.parallel.parallelize can be used in recursive functions because the worker count is specific to each invocation, so a recursive call uses its own worker pool. A sketch of this follows.
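
A minimal sketch of safe recursion with parallelize, assuming the generator semantics described above (fib is a hypothetical example; with the thread-based default, each invocation creates additional workers, so keep the recursion depth modest):

from agutil.parallel import parallelize

@parallelize()
def fib(n):
    if n < 2:
        return n
    # Each invocation of fib() gets its own worker pool, so waiting
    # on the recursive results cannot exhaust a shared worker limit
    return sum(fib([n - 1, n - 2]))

print(list(fib([10])))  # [55]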