# Source code for scheduler
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
import six
import os
import inspect
import importlib
import threading
class Scheduler(object):
    '''
    Allow to submit, kill and get the status of jobs.

    The Scheduler class is an abstract class which specifies the jobs
    management API. It has several implementations, located in
    ``soma_workflow.schedulers.*_scheduler``.

    A scheduler implementation class can be retrieved using the global
    function :func:`get_scheduler_implementation`, or instantiated using
    :func:`build_scheduler`.

    New schedulers can be written to support computing resources types that are
    currently not supported (typically a cluster with a DRMS which has no
    DRMAA implementation). The various methods of the Scheduler API have
    to be overloaded in this case.

    The abstract methods raise :class:`NotImplementedError` (a subclass of
    :class:`Exception`) when they are not overloaded.
    '''

    # Class-level defaults; __init__ overrides all of them except ``logger``
    # on each instance.
    parallel_job_submission_info = None
    logger = None
    is_sleeping = None
    jobs_finished_event = None

    def __init__(self):
        self.parallel_job_submission_info = None
        self.is_sleeping = False
        self.jobs_finished_event = threading.Event()

    def sleep(self):
        ''' Set the ``is_sleeping`` flag to True. '''
        self.is_sleeping = True

    def wake(self):
        ''' Reset the ``is_sleeping`` flag to False. '''
        self.is_sleeping = False

    def clean(self):
        ''' Do nothing in the base class; subclasses may overload it. '''
        pass

    def job_submission(self, jobs):
        '''
        Submit a Soma-Workflow job

        Parameters
        ----------
        jobs: EngineJob or list[EngineJob]
            Job to be submitted

        Returns
        -------
        job_id: list[str]
            Job id for the scheduling system (DRMAA for example, or native DRMS
            identifier).
            If some submissions failed, None is in the list instead of the job
            id.

        Raises
        ------
        NotImplementedError
            always in the base class: the method has to be overloaded.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def get_job_status(self, scheduler_job_id):
        '''
        Get the status of a submitted job.

        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Returns
        -------
        status: string
            Job status as defined in constants.JOB_STATUS

        Raises
        ------
        NotImplementedError
            always in the base class: the method has to be overloaded.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def get_job_exit_info(self, scheduler_job_id):
        '''
        The exit info consists of 4 values returned in a tuple:

        **exit_status**: string
            one of the constants.JOB_EXIT_STATUS values
        **exit_value**: int
            exit code of the command (normally 0 in case of success)
        **term_sig**: int
            termination signal, 0 if OK
        **resource_usage**: bytes
            bytes string in the shape
            ``b'cpupercent=60 mem=13530kb cput=00:00:12'`` etc. Items may
            include:

            * cpupercent
            * cput
            * mem
            * vmem
            * ncpus
            * walltime

        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Returns
        -------
        exit_info: tuple
            exit_status, exit_value, term_sig, resource_usage

        Raises
        ------
        NotImplementedError
            always in the base class: the method has to be overloaded.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def kill_job(self, scheduler_job_id):
        '''
        Kill a submitted job.

        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Raises
        ------
        NotImplementedError
            always in the base class: the method has to be overloaded.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    @classmethod
    def build_scheduler(cls, config):
        ''' Create a scheduler of the expected type, using configuration to
        parameterize it.

        Parameters
        ----------
        config: Configuration
            configuration object instance

        Raises
        ------
        NotImplementedError
            always in the base class: the method has to be overloaded.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")
def get_scheduler_implementation(scheduler_type):
    ''' Get the scheduler class implementation corresponding to the expected
    one.

    The implementation is looked for in the module
    ``soma_workflow.schedulers.<scheduler_type>_scheduler``, which is only
    imported if the corresponding ``.py`` file exists in the ``schedulers``
    package directory. Inside this module, the class is selected in this
    order:

    1. the module attribute ``__main_scheduler__``, if it is defined and
       not None;
    2. a :class:`Scheduler` subclass named ``<scheduler_type>Scheduler``
       (case-insensitive);
    3. the only :class:`Scheduler` subclass found in the module.

    Parameters
    ----------
    scheduler_type: str
        scheduler type: 'drmaa', 'drmaa2', 'local_basic', 'mpi', or other
        custom scheduler

    Returns
    -------
    scheduler_class: Scheduler subclass

    Raises
    ------
    NameError
        if the module file does not exist, or if it contains no scheduler
        class, or several ones with none of them matching the expected name.
        Exceptions raised while importing the module are not caught.
    '''
    from . import schedulers

    # the 'local_basic' type is implemented in the local_scheduler module
    if scheduler_type == 'local_basic':
        scheduler_type = 'local'
    sched_mod = '%s_scheduler' % scheduler_type
    sched_dir = os.path.dirname(schedulers.__file__)
    if os.path.exists(os.path.join(sched_dir, '%s.py' % sched_mod)):
        module = importlib.import_module('.%s' % sched_mod,
                                         'soma_workflow.schedulers')
        # if there is a __main_scheduler__, just use it
        scheduler = getattr(module, '__main_scheduler__', None)
        if scheduler is not None:
            return scheduler
        expected_name = ('%sscheduler' % scheduler_type).lower()
        sched_list = []
        for element in six.itervalues(module.__dict__):
            # Filter on classes first: testing "element in sched_list" on
            # arbitrary module attributes would call their __eq__, which may
            # not return a boolean (arrays for instance).
            if not inspect.isclass(element) or element is Scheduler \
                    or not issubclass(element, Scheduler):
                continue
            if element in sched_list:
                continue  # avoid duplicates (aliases of the same class)
            sched_list.append(element)
            if element.__name__.lower() == expected_name:
                # fully matching
                return element
        if len(sched_list) == 1:
            # unambiguous
            return sched_list[0]
        if not sched_list:
            print('Warning: module soma_workflow.schedulers.%s contains '
                  'no scheduler:' % sched_mod)
        else:
            print('Warning: module soma_workflow.schedulers.%s contains '
                  'several schedulers:' % sched_mod)
            print([s.__name__ for s in sched_list])
    raise NameError('scheduler type %s is not found' % scheduler_type)
def build_scheduler(scheduler_type, config):
    ''' Create a scheduler of the expected type, using configuration to
    parameterize it.

    The scheduler class is looked up using
    :func:`get_scheduler_implementation`, then its ``build_scheduler`` class
    method is called with the configuration.

    Parameters
    ----------
    scheduler_type: string
        type of scheduler to be built
    config: Configuration
        configuration object

    Returns
    -------
    scheduler: Scheduler
        Scheduler instance
    '''
    scheduler_class = get_scheduler_implementation(scheduler_type)
    return scheduler_class.build_scheduler(config)
def get_schedulers_list():
    '''
    List all available installed schedulers

    Every ``*_scheduler.py`` file found in the ``schedulers`` package
    directory is considered as a scheduler module, and an import of this
    module is attempted:

    * if the import raises NotImplementedError, the scheduler is skipped;
    * if it raises any other exception, the scheduler is listed as disabled;
    * otherwise it is listed as enabled.

    The ``local_scheduler`` module is reported under the name
    ``local_basic``.

    Returns
    -------
    schedulers: list
        schedulers list, in module file name order. Each item is a tuple
        (name, enabled)
    '''
    # aliased so that the package is not shadowed by a local variable
    from . import schedulers as schedulers_package

    suffix = '_scheduler'
    dirname = os.path.dirname(schedulers_package.__file__)
    found = []
    # os.listdir() returns entries in arbitrary order: sort them to get a
    # reproducible result
    for sched_file in sorted(os.listdir(dirname)):
        if not sched_file.endswith(suffix + '.py'):
            continue
        sched_mod = sched_file[:-3]  # strip the '.py' extension
        enabled = True
        try:
            importlib.import_module('.%s' % sched_mod,
                                    'soma_workflow.schedulers')
        except NotImplementedError:
            continue  # skip not implemented / unfinished ones
        except Exception:
            # deliberately broad: the module exists but cannot be imported
            # here, so it is reported as disabled.
            enabled = False
        if sched_mod == 'local_scheduler':
            sched_mod = 'local_basic_scheduler'
        found.append((sched_mod[:-len(suffix)], enabled))
    return found