# Source code for scheduler
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
import six
import os
import inspect
import importlib
class Scheduler(object):
    '''
    Allow to submit, kill and get the status of jobs.

    The Scheduler class is an abstract class which specifies the jobs
    management API. It has several implementations, located in
    ``soma_workflow.schedulers.*_scheduler``.

    A scheduler implementation class can be retrieved using the global
    function :func:`get_scheduler_implementation`, or instantiated using
    :func:`build_scheduler`.

    New schedulers can be written to support computing resources types that
    are currently not supported (typically a cluster with a DRMS which has no
    DRMAA implementation). The various methods of the Scheduler API have
    to be overloaded in this case.
    '''

    # Class-level defaults. __init__ re-initializes
    # parallel_job_submission_info and is_sleeping on each instance; logger
    # is left to be set by whoever uses the scheduler.
    parallel_job_submission_info = None
    logger = None
    is_sleeping = None

    def __init__(self):
        self.parallel_job_submission_info = None
        self.is_sleeping = False

    def sleep(self):
        ''' Set the ``is_sleeping`` flag to True. '''
        self.is_sleeping = True

    def wake(self):
        ''' Set the ``is_sleeping`` flag back to False. '''
        self.is_sleeping = False

    def clean(self):
        ''' Do nothing in the base class (presumably a hook for
        implementations to override).
        '''
        pass

    def job_submission(self, job):
        '''
        Submit a Soma-Workflow job

        Parameters
        ----------
        job: EngineJob
            Job to be submitted

        Returns
        -------
        job_id: string
            Job id for the scheduling system (DRMAA for example, or native
            DRMS identifier)

        Raises
        ------
        NotImplementedError
            always in the base class: implementations have to overload it.
        '''
        # NotImplementedError is an Exception subclass, so callers catching
        # Exception keep working.
        raise NotImplementedError("Scheduler is an abstract class!")

    def get_job_status(self, scheduler_job_id):
        '''
        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Returns
        -------
        status: string
            Job status as defined in constants.JOB_STATUS

        Raises
        ------
        NotImplementedError
            always in the base class: implementations have to overload it.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def get_job_exit_info(self, scheduler_job_id):
        '''
        The exit info consists of 4 values returned in a tuple:

        **exit_status**: string
            one of the constants.JOB_EXIT_STATUS values
        **exit_value**: int
            exit code of the command (normally 0 in case of success)
        **term_sig**: int
            termination signal, 0 if ok
        **resource_usage**: bytes
            bytes string in the shape
            ``b'cpupercent=60 mem=13530kb cput=00:00:12'`` etc. Items may
            include:

            * cpupercent
            * cput
            * mem
            * vmem
            * ncpus
            * walltime

        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Returns
        -------
        exit_info: tuple
            exit_status, exit_value, term_sig, resource_usage

        Raises
        ------
        NotImplementedError
            always in the base class: implementations have to overload it.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def kill_job(self, scheduler_job_id):
        '''
        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Raises
        ------
        NotImplementedError
            always in the base class: implementations have to overload it.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    @classmethod
    def build_scheduler(cls, config):
        ''' Create a scheduler of the expected type, using configuration to
        parameterize it.

        Parameters
        ----------
        config: Configuration
            configuration object instance

        Raises
        ------
        NotImplementedError
            always in the base class: implementations have to overload it.
        '''
        raise NotImplementedError("Scheduler is an abstract class!")
def get_scheduler_implementation(scheduler_type):
    ''' Get the scheduler class implementation corresponding to the expected
    one.

    The implementation is looked for in the module
    ``soma_workflow.schedulers.<scheduler_type>_scheduler``. Inside this
    module, the class is chosen in this order:

    1. the module attribute ``__main_scheduler__`` if it is defined and not
       None;
    2. a :class:`Scheduler` subclass named ``<scheduler_type>Scheduler``
       (case-insensitive);
    3. the only :class:`Scheduler` subclass found in the module, if there is
       exactly one.

    Parameters
    ----------
    scheduler_type: str
        scheduler type: 'drmaa', 'drmaa2', 'local_basic', 'mpi', or other
        custom scheduler

    Returns
    -------
    scheduler_class: Scheduler subclass

    Raises
    ------
    NameError
        if the module file does not exist, or if no scheduler class could be
        selected in it (a warning is printed in the latter case).
    '''
    from . import schedulers

    # the 'local_basic' type is implemented in the local_scheduler module
    if scheduler_type == 'local_basic':
        scheduler_type = 'local'

    sched_dir = os.path.dirname(schedulers.__file__)
    sched_mod = '%s_scheduler' % scheduler_type
    if not os.path.exists(os.path.join(sched_dir, '%s.py' % sched_mod)):
        raise NameError('scheduler type %s is not found' % scheduler_type)

    # Import errors are deliberately not caught here: they propagate to the
    # caller.
    module = importlib.import_module('.%s' % sched_mod,
                                     'soma_workflow.schedulers')

    # if there is a __main_scheduler__, just use it
    scheduler = getattr(module, '__main_scheduler__', None)
    if scheduler is not None:
        return scheduler

    expected_name = ('%sscheduler' % scheduler_type).lower()
    sched_list = []
    for element in six.itervalues(module.__dict__):
        # Filter on type first: comparing arbitrary module globals with
        # "==" (as a list membership test does) is not safe for objects
        # which overload __eq__.
        if not inspect.isclass(element) or element is Scheduler \
                or not issubclass(element, Scheduler):
            continue
        if any(element is known for known in sched_list):
            continue  # avoid duplicates (same class bound to several names)
        sched_list.append(element)
        if element.__name__.lower() == expected_name:
            # fully matching
            return element

    if len(sched_list) == 1:
        # unambiguous
        return sched_list[0]

    if not sched_list:
        print('Warning: module soma_workflow.schedulers.%s contains '
              'no scheduler:' % sched_mod)
    else:
        print('Warning: module soma_workflow.schedulers.%s contains '
              'several schedulers:' % sched_mod)
        print([s.__name__ for s in sched_list])

    raise NameError('scheduler type %s is not found' % scheduler_type)
def build_scheduler(scheduler_type, config):
    ''' Create a scheduler of the expected type, using configuration to
    parameterize it.

    The scheduler class is obtained from
    :func:`get_scheduler_implementation`, then its ``build_scheduler`` class
    method is called with ``config``.

    Parameters
    ----------
    scheduler_type: string
        type of scheduler to be built
    config: Configuration
        configuration object

    Returns
    -------
    scheduler: Scheduler
        Scheduler instance

    Raises
    ------
    NameError
        raised by :func:`get_scheduler_implementation` when no
        implementation is found for ``scheduler_type``.
    '''
    scheduler_class = get_scheduler_implementation(scheduler_type)
    return scheduler_class.build_scheduler(config)
def get_schedulers_list():
    '''
    List all available installed schedulers

    Every ``*_scheduler.py`` file found in the directory of the
    ``schedulers`` sub-package is imported as a module of
    ``soma_workflow.schedulers``:

    * if the import raises ``NotImplementedError``, the scheduler is skipped;
    * if it raises any other exception, the scheduler is listed as disabled;
    * otherwise it is listed as enabled.

    The ``local_scheduler`` module is reported under the name
    ``local_basic``.

    Returns
    -------
    schedulers: list
        schedulers list. Each item is a tuple (name, enabled). Items are
        sorted by module file name.
    '''
    # aliased so that the result list does not shadow the package
    from . import schedulers as schedulers_package

    suffix = '_scheduler'
    dirname = os.path.dirname(schedulers_package.__file__)
    schedulers = []
    # os.listdir() order is arbitrary: sort to get a reproducible result
    for sched_file in sorted(os.listdir(dirname)):
        if not sched_file.endswith(suffix + '.py'):
            continue
        sched_mod = sched_file[:-3]  # strip the '.py' extension
        enabled = True
        try:
            importlib.import_module('.%s' % sched_mod,
                                    'soma_workflow.schedulers')
        except NotImplementedError:
            continue  # skip not implemented / unfinished ones
        except Exception:
            # best effort: the scheduler exists but cannot be loaded here
            enabled = False
        if sched_mod == 'local_scheduler':
            sched_mod = 'local_basic_scheduler'
        schedulers.append((sched_mod[:-len(suffix)], enabled))
    return schedulers