Source code for soma.mpfork

# -*- coding: utf-8 -*-

'''
Run worker functions in separate processes.

The use case is somewhat similar to what can be done using either :class:`queue.Queue <Queue.Queue>` and threads, or using :mod:`multiprocessing`. That is: run a number of independent job functions, dispatched over a number of worker threads/processes.

The mpfork and mpfork2 modules are useful in cases the above methods cannot work:

* threading in python is globally mainly inefficient because of the GIL.
* :mod:`multiprocessing` makes heavy use of pickles to pass objects and parameters, and there are cases we are using objects which cannot be pickled, or which pickling generates heavy IO traffic (large data objects)

mpfork and mpfork2

The method here is based on the queue / thread schema, but uses fork() to actually execute the workers in a separate process. Only results are passed through pickles, so the worker return results must be picklable, but not the input arguments and objects.

* mpfork forks from a worker thread. This is the most efficient, but fork and thread are not easily mixable: only the current thread is left running in the forked process, all other ones are stopped. However locks left in other threads may block the entire process. In such cases, or if the main thread is running graphical objects and an event loop (like Qt), expect mpfork to lock.
In such a case, please consider using mpfork2 instead.

* mpfork2 forks from the main thread, earlier than mpfork (basically then workers are instantiated), which make it more stable. Compared to mpfork, workers need to be manually started using Worker.start(), and jobs input data need to be transfered via pickles. Hoxever large data may be stored in the calling object before forking.

* in case workers need to perform graphical or GPU renderings, the fork mode of mpfork2 will also not be sufficient, because event loops often need threads and locks, and because graphical resources will not be duplicated and allocated in the forked process. In this situation, mpfork2 offers a "spawn" mode, which doesn't use fork() (and thus might workk on non-unix systems). Here, a new process has to be started, and initialized in the worker process before jobs can be run. This is done by passing `fork_mode="spawn"` to allocate_workers(), and also passing an init function and parameters, using `init_child_function=(function, args, kwargs)`. The function and args need to be picklable.

Use:

* allocate a :class:`Queue <Queue.Queue>`
* allocate a results list with the exact size of the jobs number
* allocate a set of worker threads (typically one per processor or core). The function :func:`allocate_workers` can do this for you.
* fill the queue with jobs, each being a tuple (job_index, function, args, kwargs, results_list)
* add a number of empty jobs (None) to the queue, one per allocated worker: this will be the marker for the end of processing in each worker thread. It is necessary to explicitly add these empty jobs since an empty queue is not the signal for the end of processing: it can be re-filled at any time.
* jobs run in workers
* join the queue
* join the worker threads
* the result list gets the return data for each job

::

    njobs = 10
    q = queue.Queue()
    res = [None] * njobs
    workers = allocate_workers(q, 0) # one per core
    for i in range(njobs):
        job = (i, sum, ((i, i), ), {}, res)
        q.put(job)

    # add as many empty jobs as the workers number to end them
    for i in range(len(workers)):
        q.put(None)

    # wait for every job to complete
    q.join()
    # terminate all threads
    for w in workers:
        w.join()

    print('result:', res)

In case of error, the job result will be an exception with stack information: (exception type, exception instance, stack_info)


Availability: Unix
'''

import multiprocessing
import threading
import os
import tempfile
import sys
import glob
import re
from six.moves import range
try:
    import cpickle as pickle
except ImportError:
    import pickle


_available_cpu = None


[docs] def available_cpu_count(): ''' Available CPU cores for user. Based on func:`multiprocessing.cpu_count`, but tries to also read CPU quotas from /etc/systemd/system/user-*.d ''' global _available_cpu if _available_cpu is not None: return _available_cpu count = multiprocessing.cpu_count() # find CPU quotas sysd_files = glob.glob('/etc/systemd/system/user-*.d/*.conf') found = False for cfile in sysd_files: with open(cfile) as f: for line in f.readlines(): if 'CPUQuota' in line: r = re.match('CPUQuota=(.*)%', line) if r: cpuq = int(int(r.group(1)) / 100) if cpuq < count: count = cpuq found = True break if found: break _available_cpu = count return _available_cpu
[docs] def run_job(f, *args, **kwargs): ''' Internal function, runs the function in a remote process. Uses fork() to perform it. Availability: Unix ''' out_file = tempfile.mkstemp() os.close(out_file[0]) pid = os.fork() if pid != 0: # parent: wait for the child pid, status = os.waitpid(pid, 0) # read output file #print('read from', os.getpid(), ':', out_file[1]) if os.stat(out_file[1]).st_size == 0: # child did not write anything os.unlink(out_file[1]) raise OSError('child did not output anything') if status != 0: os.unlink(out_file[1]) raise RuntimeError('subprocess error: %d' % status) result = pickle.load(open(out_file[1], 'rb')) os.unlink(out_file[1]) # traceback objects cannot be pickled... #if isinstance(result, tuple) and len(result) == 3 \ #and isinstance(result[1], Exception): ## result is an axception with call stack: reraise it #raise result[0], result[1], result[2] if isinstance(result, Exception): raise result return result # child process try: try: #print('exec in', os.getpid(), ':', f, args, kwargs) result = f(*args, **kwargs) #print('OK') except Exception as e: # traceback objects cannot be pickled... #result = (type(e), e, sys.exc_info()[2]) result = e #print('write:', out_file[1], ':', result) try: pickle.dump(result, open(out_file[1], 'wb'), protocol=2) except Exception as e: print('pickle failed:', e, '\nfor object:', type(result)) finally: # sys.exit() is not enough os._exit(0)
[docs] def worker(q, thread_only, *args, **kwargs): ''' Internal function: worker thread loop: * pick a job in the queue q * execute it, either in the current thread, or in a remote process (using run_job()), depending on the thread_only parameter value * store the result in the result list * start again with another job The loop ends when a job in the queue is None. .. warning:: Here we are making use of fork() (Unix only) inside a thread. Some systems do not behave well in this situation. See :func:`the os.fork() doc <os.fork>` ''' while True: item = q.get() if item is None: q.task_done() break try: #print('run item in', os.getpid(), ':', item[:-1]) sys.stdout.flush() i, f, argsi, kwargsi, res = item argsi = argsi + args kwargs = dict(kwargs) kwargs.update(kwargsi) try: if thread_only: try: result = f(*argsi, **kwargs) except Exception as e: result = e else: result = run_job(f, *argsi, **kwargs) #print('result:', i, result) res[i] = result except Exception as e: res[i] = (type(e), e, sys.exc_info()[2]) print(e) finally: #print('task done in', os.getpid(), ':', i, f) q.task_done() sys.stdout.flush()
[docs] def allocate_workers(q, nworker=0, thread_only=False, max_workers=0, *args, **kwargs): ''' Utility function to allocate worker threads. Parameters ---------- q: :class:`Queue <Queue.Queue>` instance the jobs queue which will be fed with jobs for processing thread_only: bool if True, workers will run jobs in the worker thread, not using a separate process. This flag thus allows to choose a threaded or multiprocessing implementation. nworker: int number of worker threads (jobs which will run in parallel). A positive number (1, 2...) will be used as is, 0 means all available CPU cores (see :func:`available_cpu_count`), and a negative number means all CPU cores except this given number. max_workers: int max number of workers: if nworker is 0, the number of CPU cores is used, but might exceed the number of actual jobs to be done. To limit this, you can use the number of jobs (if known) here. 0 means no limit. args, kwargs: additional arguments will be passed to the job function(s) after individual jobs arguments: they are args common to all jobs (if any) Returns ------- workers: list workers list, each is a :class:`thread <threading.Thread>` instance running the worker loop function. Threads are already started (ie. ''' if nworker == 0: nworker = available_cpu_count() elif nworker < 0: nworker = available_cpu_count() + nworker if nworker < 1: nworker = 1 if max_workers > 0 and nworker > max_workers: nworker = max_workers workers = [] for i in range(nworker): w = threading.Thread(target=worker, args=(q, thread_only) + args, kwargs=kwargs) w.start() workers.append(w) return workers