Source code for soma.mpfork

#!/usr/bin/env python
# -*- coding: utf-8 -*-

'''
Run worker functions in separate processes.

The use case is somewhat similar to what can be done using either :class:`queue.Queue <Queue.Queue>` and threads, or :mod:`multiprocessing`: run a number of independent job functions, dispatched over a number of worker threads/processes.

The mpfork module is useful in cases where the above methods cannot work:

* threading in Python is largely inefficient for parallel processing because of the GIL (global interpreter lock).
* :mod:`multiprocessing` makes heavy use of pickles to pass objects and parameters, and there are cases where the objects we use cannot be pickled, or where pickling them generates heavy IO traffic (large data objects)

The method here is based on the queue / thread schema, but uses fork() to actually execute the workers in a separate process. Only results are passed through pickles, so worker return values must be picklable, but input arguments and objects need not be.

Use:

* allocate a :class:`Queue <Queue.Queue>`
* allocate a results list with exactly as many elements as there are jobs
* allocate a set of worker threads (typically one per processor or core). The function :func:`allocate_workers` can do this for you.
* fill the queue with jobs, each being a tuple (job_index, function, args, kwargs, results_list)
* add a number of empty jobs (None) to the queue, one per allocated worker: these mark the end of processing for each worker thread. They must be added explicitly because an empty queue does not signal the end of processing: the queue can be re-filled at any time.
* jobs run in workers
* join the queue
* join the worker threads
* the result list gets the return data for each job

::

    njobs = 10
    q = queue.Queue()
    res = [None] * njobs
    workers = allocate_workers(q, 0) # one per core
    for i in range(njobs):
        job = (i, sum, ((i, i), ), {}, res)
        q.put(job)

    # add as many empty jobs as the workers number to end them
    for i in range(len(workers)):
        q.put(None)

    # wait for every job to complete
    q.join()
    # terminate all threads
    for w in workers:
        w.join()

    print('result:', res)

In case of error, the job result will be a 3-tuple carrying the exception and stack information: (exception type, exception instance, stack_info)
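
For instance, failed jobs can be detected afterwards by scanning the results list. A minimal sketch, following the tuple shape described above::

    for i, r in enumerate(res):
        if isinstance(r, tuple) and len(r) == 3 \
                and isinstance(r[1], Exception):
            print('job %d failed:' % i, r[1])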


Availability: Unix
'''

from __future__ import print_function
from __future__ import absolute_import
import multiprocessing
import threading
import queue
import os
import tempfile
import sys
from six.moves import range
try:
    import cPickle as pickle
except ImportError:
    import pickle

def run_job(f, *args, **kwargs):
    '''
    Internal function, runs the function in a remote process. Uses fork()
    to perform it.

    Availability: Unix
    '''
    out_file = tempfile.mkstemp()
    os.close(out_file[0])
    pid = os.fork()
    if pid != 0:
        # parent: wait for the child
        pid, status = os.waitpid(pid, 0)
        # read the output file written by the child
        if os.stat(out_file[1]).st_size == 0:
            # child did not write anything
            os.unlink(out_file[1])
            raise OSError('child did not output anything')
        if status != 0:
            os.unlink(out_file[1])
            raise RuntimeError('subprocess error: %d' % status)
        result = pickle.load(open(out_file[1], 'rb'))
        os.unlink(out_file[1])
        # traceback objects cannot be pickled, so only the exception
        # instance is transmitted: re-raise it without the child stack
        if isinstance(result, Exception):
            raise result
        return result
    # child process
    try:
        try:
            result = f(*args, **kwargs)
        except Exception as e:
            # traceback objects cannot be pickled...
            result = e
        try:
            pickle.dump(result, open(out_file[1], 'wb'), protocol=2)
        except Exception as e:
            print('pickle failed:', e, '\nfor object:', type(result))
    finally:
        # sys.exit() is not enough
        os._exit(0)
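
# Illustrative sketch, not part of the original module: because run_job()
# uses fork(), the function and its arguments are shared through process
# memory and are never pickled; only the return value is. Even a lambda,
# which pickle cannot handle, is therefore a valid argument.
def _example_run_job():
    unpicklable = lambda x: x * 2
    result = run_job(lambda g, v: g(v), unpicklable, 21)
    assert result == 42  # only the int 42 went through a pickle
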
def worker(q, *args, **kwargs):
    '''
    Internal function: worker thread loop:

    * pick a job in the queue q
    * execute it in a remote process (using run_job())
    * store the result in the results list
    * start again with another job

    The loop ends when a job in the queue is None.

    .. warning::
        Here we are making use of fork() (Unix only) inside a thread. Some
        systems do not behave well in this situation. See
        :func:`the os.fork() doc <os.fork>`
    '''
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        try:
            sys.stdout.flush()
            i, f, argsi, kwargsi, res = item
            # merge the job arguments with the common worker arguments;
            # build a fresh dict so one job's kwargs do not leak into the
            # next iteration
            argsi = argsi + args
            job_kwargs = dict(kwargs)
            job_kwargs.update(kwargsi)
            try:
                res[i] = run_job(f, *argsi, **job_kwargs)
            except Exception as e:
                res[i] = (type(e), e, sys.exc_info()[2])
                print(e)
        finally:
            # always mark the task done, even on failure, so that q.join()
            # does not block forever
            q.task_done()
            sys.stdout.flush()
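
# Illustrative sketch, not part of the original module: a single worker
# thread can also be started by hand instead of via allocate_workers().
def _example_manual_worker():
    q = queue.Queue()
    res = [None]
    t = threading.Thread(target=worker, args=(q,))
    t.start()
    q.put((0, max, ((3, 7),), {}, res))  # (index, f, args, kwargs, results)
    q.put(None)  # end marker for this single worker
    q.join()
    t.join()
    assert res[0] == 7
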
def allocate_workers(q, nworker=0, *args, **kwargs):
    '''
    Utility function to allocate worker threads.

    Parameters
    ----------
    q: :class:`Queue <Queue.Queue>` instance
        the jobs queue which will be fed with jobs for processing
    nworker: int
        number of worker threads (jobs which will run in parallel). A
        positive number (1, 2...) will be used as is, 0 means all available
        CPU cores (see :func:`multiprocessing.cpu_count`), and a negative
        number means all CPU cores except this given number.
    args, kwargs:
        additional arguments will be passed to the jobs function(s) after
        individual jobs arguments: they are args common to all jobs (if any)

    Returns
    -------
    workers: list
        workers list, each is a :class:`thread <threading.Thread>` instance
        running the worker loop function. Threads are already started (i.e.
        :meth:`~threading.Thread.start` has already been called on them).
    '''
    if nworker == 0:
        nworker = multiprocessing.cpu_count()
    elif nworker < 0:
        nworker = multiprocessing.cpu_count() + nworker
        if nworker < 1:
            nworker = 1
    workers = []
    for i in range(nworker):
        w = threading.Thread(target=worker, args=(q, ) + args, kwargs=kwargs)
        w.start()
        workers.append(w)
    return workers
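
# Illustrative sketch, not part of the original module: allocate workers on
# all CPU cores but one (nworker=-1), with a keyword argument common to
# every job; each job function must then accept that keyword.
def _example_allocate_workers():
    njobs = 4
    q = queue.Queue()
    res = [None] * njobs
    workers = allocate_workers(q, -1, scale=10)

    def job(value, scale=1):
        return value * scale

    for i in range(njobs):
        q.put((i, job, (i,), {}, res))
    for _ in workers:
        q.put(None)  # one end marker per worker
    q.join()
    for w in workers:
        w.join()
    assert res == [0, 10, 20, 30]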