Supporting new computing resources: writing a Scheduler implementation¶
In Soma-Workflow, the link between the workflow and jobs API and the jobs manager of the computing resource is handled through Scheduler subclass implementations. Several are already provided: a local scheduler to run on a standard computer (LocalScheduler), a DRMAA implementation which can drive several types of DRMS (cluster managers) (DrmaaScheduler), the MPI scheduler (MPIScheduler), and now also a PBS Pro scheduler, since DRMAA no longer works with PBS Pro (PBSProScheduler).
In case the computing resource you are using is not supported by the existing implementations, it is possible and relatively easy to write a new Scheduler implementation, typically to handle a new type of cluster. It is a matter of writing a Scheduler subclass which implements the different methods: mainly 4 methods have to be defined (job_submission, get_job_status, get_job_exit_info, kill_job), plus the build_scheduler class method, which simply instantiates the derived class with appropriate parameters.
A good and easy example to look at is the PBSProScheduler implementation, which merely runs DRMS commands (qsub, qstat) and does not use complex C/Python bindings. This implementation was written in a few hours, so it is not a huge amount of work.
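For illustration only, here is a minimal sketch of what such a subclass could look like. It assumes a hypothetical DRMS driven by made-up myqsub / myqstat / myqdel command-line tools; the status and exit-status constants come from soma_workflow.constants, everything else is invented for the example:

    import subprocess

    from soma_workflow import constants, scheduler


    class MyClusterScheduler(scheduler.Scheduler):
        # Hypothetical scheduler wrapping the made-up "myqsub" / "myqstat" /
        # "myqdel" commands of an unsupported DRMS.

        def __init__(self, config):
            super().__init__()
            self.config = config

        @classmethod
        def build_scheduler(cls, config):
            # just instantiate the derived class with appropriate parameters
            return cls(config)

        def job_submission(self, jobs, signal_end=True):
            # "jobs" is a single soma_workflow job here (see the API below);
            # job.command is the command line as a list of strings
            # (file transfers / path handling are omitted in this sketch).
            out = subprocess.check_output(['myqsub'] + list(jobs.command))
            return out.decode().strip()  # DRMS job id

        def get_job_status(self, scheduler_job_id):
            # translate the DRMS status into a constants.JOB_STATUS value
            out = subprocess.check_output(
                ['myqstat', scheduler_job_id]).decode()
            if 'running' in out:
                return constants.RUNNING
            if 'queued' in out:
                return constants.QUEUED_ACTIVE
            return constants.DONE

        def get_job_exit_info(self, scheduler_job_id):
            # (exit_status, exit_value, term_sig, resource_usage)
            return (constants.FINISHED_REGULARLY, 0, 0, b'')

        def kill_job(self, scheduler_job_id):
            subprocess.check_call(['myqdel', scheduler_job_id])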
Scheduler definition¶
This is the generic, abstract API.
- class scheduler.Scheduler[source]¶
Allows submitting, killing and getting the status of jobs.
The Scheduler class is an abstract class which specifies the jobs management API. It has several implementations, located in soma_workflow.schedulers.*_scheduler. A scheduler implementation class can be retrieved using the global function get_scheduler_implementation(), or instantiated using build_scheduler(). New schedulers can be written to support computing resource types that are currently not supported (typically a cluster with a DRMS which has no DRMAA implementation). The various methods of the Scheduler API have to be overloaded in this case.
- classmethod build_scheduler(config)[source]¶
Create a scheduler of the expected type, using configuration to parameterize it.
- Parameters:
config (Configuration) – configuration object instance
- get_job_exit_info(scheduler_job_id)[source]¶
The exit info consists of 4 values returned in a tuple:
- exit_status: string
one of the constants.JOB_EXIT_STATUS values
- exit_value: int
exit code of the command (normally 0 in case of success)
- term_sig: int
termination signal, 0 if OK
- resource_usage: bytes
bytes string in the shape b'cpupercent=60 mem=13530kb cput=00:00:12' etc. Items may include: cpupercent, cput, mem, vmem, ncpus, walltime
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
exit_info – exit_status, exit_value, term_sig, resource_usage
- Return type:
tuple
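As an aside, the resource_usage string can be turned into a dictionary with a few lines of parsing. A possible, purely illustrative helper, assuming the space-separated key=value shape described above:

    def parse_resource_usage(resource_usage):
        # e.g. b'cpupercent=60 mem=13530kb cput=00:00:12'
        # -> {'cpupercent': '60', 'mem': '13530kb', 'cput': '00:00:12'}
        usage = {}
        for item in resource_usage.decode().split():
            key, _, value = item.partition('=')
            usage[key] = value
        return usage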
- get_job_status(scheduler_job_id)[source]¶
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
status – Job status as defined in constants.JOB_STATUS
- Return type:
string
- scheduler.build_scheduler(scheduler_type, config)[source]¶
Create a scheduler of the expected type, using configuration to parameterize it.
- Parameters:
scheduler_type (string) – type of scheduler to be built
config (Configuration) – configuration object
- Returns:
scheduler – Scheduler instance
- Return type:
Scheduler
- scheduler.get_scheduler_implementation(scheduler_type)[source]¶
Get the scheduler class implementation corresponding to the expected one.
- Parameters:
scheduler_type (str) – scheduler type: ‘drmaa’, ‘drmaa2’, ‘local_basic’, ‘mpi’, or other custom scheduler
- Returns:
scheduler_class
- Return type:
Scheduler subclass
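To illustrate the two module-level helpers above, a server-side caller could obtain and build a scheduler roughly as follows (the config object is assumed to be an existing soma_workflow Configuration instance, created elsewhere from the server configuration file):

    from soma_workflow import scheduler

    # config: an existing soma_workflow Configuration instance (assumed here)
    sched_class = scheduler.get_scheduler_implementation('local_basic')
    sched = sched_class.build_scheduler(config)

    # or equivalently, through the module-level helper:
    sched = scheduler.build_scheduler('local_basic', config)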
PBS Pro implementation¶
The PBS Pro scheduler does not use the DRMAA library; we had to design it because the DRMAA implementation no longer worked on our cluster. This implementation is a very simple (and probably sub-optimal) wrapping around the PBS command lines (qsub, qstat, qdel). It should also reasonably work with Torque/PBS 2.5, but has not been extensively tested this way (since DRMAA works for it).
To use the PBS Pro scheduler, specify in the server config file:
scheduler_type = pbspro
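For instance, in the section of the server configuration file describing your computing resource (the section name below is just a placeholder):

    [my_pbs_cluster]
    scheduler_type = pbspro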
- class soma_workflow.schedulers.pbspro_scheduler.PBSProScheduler(parallel_job_submission_info, tmp_file_path=None, configured_native_spec=None)[source]¶
Scheduling using a PBS Pro session.
- get_job_exit_info(scheduler_job_id)[source]¶
The exit info consists of 4 values returned in a tuple:
- exit_status: string
one of the constants.JOB_EXIT_STATUS values
- exit_value: int
exit code of the command (normally 0 in case of success)
- term_sig: int
termination signal, 0 if OK
- resource_usage: unicode
string in the shape 'cpupercent=60 mem=13530kb cput=00:00:12' etc. Items may include: cpupercent, cput, mem, vmem, ncpus, walltime
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
exit_info – exit_status, exit_value, term_sig, resource_usage
- Return type:
tuple
- get_job_extended_status(scheduler_job_id)[source]¶
Get the full job status as a dictionary (from qstat in JSON format).
- get_job_status(scheduler_job_id)[source]¶
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
status – Job status as defined in constants.JOB_STATUS
- Return type:
string
- get_pbs_version()[source]¶
Get the PBS implementation and version. May be PBS Pro or Torque/PBS.
- Returns:
pbs_version – (implementation, version)
- Return type:
tuple (2 strings)
- job_submission(jobs, signal_end=True)[source]¶
- Parameters:
jobs (soma_workflow.client.Job) – job to be submitted
signal_end (bool) –
- Returns:
job_id – job id
- Return type:
string
- kill_job(scheduler_job_id)[source]¶
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- static out_of_container_command()[source]¶
In case this server is running inside a container, qstat/qsub commands are not available inside the container but are available on the host. We need to get out of the container in such a situation.
Local implementation¶
To use the local scheduler, specify in the server config file:
scheduler_type = local_basic
- class soma_workflow.schedulers.local_scheduler.ConfiguredLocalScheduler(config)[source]¶
Local scheduler synchronized with a configuration object.
config: LocalSchedulerCfg
- class soma_workflow.schedulers.local_scheduler.LocalScheduler(proc_nb=0, interval=0.05, max_proc_nb=0)[source]¶
Allows submitting, killing and getting the status of jobs. Runs on one machine without dependencies.
_proc_nb: int
_queue: list of scheduler job ids
_jobs: dictionary job_id -> soma_workflow.engine_types.EngineJob
_processes: dictionary job_id -> subprocess.Popen
_status: dictionary job_id -> job status as defined in constants
_exit_info: dictionary job_id -> exit info
_loop: thread
_interval: int
_lock: threading.RLock
- get_job_exit_info(scheduler_job_id)[source]¶
This function is called only once per job by the engine, thus it also deletes references to the job in internal tables.
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
exit_info – exit_status, exit_value, term_sig, resource_usage
- Return type:
tuple
- get_job_status(scheduler_job_id)[source]¶
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
status – Job status as defined in constants.JOB_STATUS
- Return type:
string
DRMAA implementation¶
To use the DRMAA scheduler, specify in the server config file (this is actually the default on a server, so it is not mandatory to specify it):
scheduler_type = drmaa
Note that the DRMAA implementation only supports the DRMAA1 API, not DRMAA2. Work was started on a DRMAA2 implementation, but it was stopped because we could not find a working DRMAA2 C implementation for our PBS Pro cluster.
MPI implementation¶
- class soma_workflow.schedulers.mpi_scheduler.MPIScheduler(communicator, interval=0.01, nb_attempt_per_job=1)[source]¶
Allows submitting, killing and getting the status of jobs.
- classmethod build_scheduler(config)[source]¶
Create a scheduler of the expected type, using configuration to parameterize it.
- Parameters:
config (Configuration) – configuration object instance
- get_job_exit_info(scheduler_job_id)[source]¶
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
exit_info – exit_status, exit_value, term_sig, resource_usage
- Return type:
tuple
- get_job_status(scheduler_job_id)[source]¶
- Parameters:
scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
- Returns:
status – Job status as defined in constants.JOB_STATUS
- Return type:
string
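For illustration, the MPI scheduler is instantiated with an MPI communicator; the snippet below assumes the usual mpi4py entry point (how the MPI workers are actually spawned and managed is out of scope here):

    from mpi4py import MPI

    from soma_workflow.schedulers.mpi_scheduler import MPIScheduler

    # COMM_WORLD is the standard mpi4py communicator; interval and
    # nb_attempt_per_job keep their documented default values
    sched = MPIScheduler(MPI.COMM_WORLD, interval=0.01, nb_attempt_per_job=1)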