Supporting new computing resources: writing a Scheduler implementation

In Soma-Workflow, the link between the workflow and jobs API and the computing resource's job manager is handled through Scheduler subclass implementations. Several already ship with Soma-Workflow: a local scheduler to run on a standard computer (LocalScheduler), a DRMAA implementation which can drive several types of DRMS (cluster managers) (DrmaaScheduler), the MPI scheduler (MPIScheduler), and now also a PBS Pro scheduler (PBSProScheduler), since DRMAA no longer works with PBS Pro.

If the computing resource you are using is not supported by the existing implementations, it is possible, and relatively easy, to write a new Scheduler implementation, typically to handle a new type of cluster. It is a matter of writing a Scheduler subclass which overrides a few methods: mainly 4 methods have to be defined (job_submission, get_job_status, get_job_exit_info, kill_job), plus the build_scheduler class method, which simply instantiates the derived class with appropriate parameters. A minimal subclass skeleton is sketched below.

A good, easy example to look at is the PBSProScheduler implementation, which merely runs DRMS commands (qsub, qstat) and does not rely on complex C/Python bindings. That implementation was written in a few hours, so the effort involved is modest.
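
As an orientation, here is a minimal skeleton of such a subclass. This is only a sketch: the import path and class names used here are assumptions based on this reference and should be checked against your installation, and the actual interaction with the DRMS is left as comments.

    from soma_workflow.scheduler import Scheduler


    class MyClusterScheduler(Scheduler):
        """Hypothetical scheduler for a DRMS without DRMAA support."""

        def __init__(self, config):
            super().__init__()
            self.config = config

        @classmethod
        def build_scheduler(cls, config):
            # merely instantiate the derived class with appropriate parameters
            return cls(config)

        def job_submission(self, job):
            # submit the EngineJob to the DRMS (e.g. run its submission
            # command) and return the native job id as a string
            raise NotImplementedError

        def get_job_status(self, scheduler_job_id):
            # query the DRMS and translate its state into a
            # constants.JOB_STATUS value
            raise NotImplementedError

        def get_job_exit_info(self, scheduler_job_id):
            # return (exit_status, exit_value, term_sig, resource_usage)
            raise NotImplementedError

        def kill_job(self, scheduler_job_id):
            # ask the DRMS to terminate the given job
            raise NotImplementedError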

Scheduler definition

This is the generic, abstract API.

class scheduler.Scheduler[source]

Allows submitting, killing, and getting the status of jobs.

The Scheduler class is an abstract class which specifies the jobs management API. It has several implementations, located in soma_workflow.schedulers.*_scheduler.

A scheduler implementation class can be retrieved using the global function get_scheduler_implementation(), or instantiated using build_scheduler().

New schedulers can be written to support computing resource types that are currently not supported (typically a cluster with a DRMS which has no DRMAA implementation). The various methods of the Scheduler API have to be overloaded in this case.

classmethod build_scheduler(config)[source]

Create a scheduler of the expected type, using configuration to parameterize it.

Parameters:config (Configuration) – configuration object instance
get_job_exit_info(scheduler_job_id)[source]

The exit info consists of 4 values returned in a tuple:

  • exit_status: string – one of the constants.JOB_EXIT_STATUS values
  • exit_value: int – exit code of the command (normally 0 in case of success)
  • term_sig: int – termination signal, 0 if OK
  • resource_usage: bytes – bytes string in the shape b'cpupercent=60 mem=13530kb cput=00:00:12' etc. Items may include:
      • cpupercent
      • cput
      • mem
      • vmem
      • ncpus
      • walltime
Parameters:scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
Returns:exit_info – exit_status, exit_value, term_sig, resource_usage
Return type:tuple
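
For illustration, the returned tuple can be unpacked and the resource usage parsed as key=value pairs. This is a hedged usage sketch: sched and job_id stand for an existing scheduler instance and a previously submitted job id.

    exit_status, exit_value, term_sig, resource_usage = \
        sched.get_job_exit_info(job_id)
    # resource_usage is a space-separated key=value byte string, e.g.
    # b'cpupercent=60 mem=13530kb cput=00:00:12'
    usage = dict(item.split('=', 1)
                 for item in resource_usage.decode().split())
    print(usage.get('cput'), usage.get('mem'))
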
get_job_status(scheduler_job_id)[source]
Parameters:scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
Returns:status – Job status as defined in constants.JOB_STATUS
Return type:string
job_submission(job)[source]

Submit a Soma-Workflow job

Parameters:job (EngineJob) – Job to be submitted
Returns:job_id – Job id for the scheduling system (DRMAA for example, or native DRMS identifier)
Return type:string
kill_job(scheduler_job_id)[source]
Parameters:scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
scheduler.build_scheduler(scheduler_type, config)[source]

Create a scheduler of the expected type, using configuration to parameterize it.

Parameters:
  • scheduler_type (string) – type of scheduler to be built
  • config (Configuration) – configuration object
Returns:scheduler – Scheduler instance
Return type:Scheduler

scheduler.get_scheduler_implementation(scheduler_type)[source]

Get the scheduler implementation class corresponding to the given scheduler type.

Parameters:scheduler_type (str) – scheduler type: ‘drmaa’, ‘drmaa2’, ‘local_basic’, ‘mpi’, or other custom scheduler
Returns:scheduler_class
Return type:Scheduler subclass
scheduler.get_schedulers_list()[source]

List all available installed schedulers

Returns:schedulers – schedulers list. Each item is a tuple (name, enabled)
Return type:list
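
For illustration, these module-level helpers could be used as follows. This is a sketch only, assuming the module is importable as soma_workflow.scheduler and that config is an existing Configuration instance (not constructed here).

    from soma_workflow import scheduler

    # list the installed implementations: [(name, enabled), ...]
    print(scheduler.get_schedulers_list())

    # get the class for a given type, or build an instance directly
    sched_class = scheduler.get_scheduler_implementation('local_basic')
    sched = scheduler.build_scheduler('local_basic', config)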

PBS Pro implementation

The PBS Pro scheduler does not use the DRMAA library; we had to design it because the DRMAA implementation no longer worked on our cluster. This implementation is a very simple (and probably sub-optimal) wrapper around the PBS command-line tools (qsub, qstat, qdel). It should also work reasonably with Torque/PBS 2.5, but has not been extensively tested that way (since DRMAA works for it).
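
To give an idea of the approach (an illustrative sketch, not the actual PBSProScheduler code), submitting through qsub essentially amounts to running the command and capturing the job id it prints:

    import subprocess

    def submit_with_qsub(script_path, stdout_path, stderr_path):
        """Submit a job script with qsub and return the native PBS job id."""
        cmd = ['qsub', '-o', stdout_path, '-e', stderr_path, script_path]
        # qsub prints the new job id (e.g. "1234.pbs-server") on stdout
        return subprocess.check_output(cmd).decode().strip()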

To use the PBS Pro scheduler, specify in the server config file:

scheduler_type = pbspro
class soma_workflow.schedulers.pbspro_scheduler.PBSProScheduler(parallel_job_submission_info, tmp_file_path=None, configured_native_spec=None)[source]

Scheduling using a PBS Pro session.

classmethod build_scheduler(config)[source]

Build an instance of PBSProScheduler

get_job_exit_info(scheduler_job_id)[source]

The exit info consists of 4 values returned in a tuple:

  • exit_status: string – one of the constants.JOB_EXIT_STATUS values
  • exit_value: int – exit code of the command (normally 0 in case of success)
  • term_sig: int – termination signal, 0 if OK
  • resource_usage: unicode – string in the shape 'cpupercent=60 mem=13530kb cput=00:00:12' etc. Items may include:
      • cpupercent
      • cput
      • mem
      • vmem
      • ncpus
      • walltime
Parameters:scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
Returns:exit_info – exit_status, exit_value, term_sig, resource_usage
Return type:tuple
get_job_extended_status(scheduler_job_id)[source]

Get the full job status as a dictionary (from qstat in JSON format).

get_job_status(scheduler_job_id)[source]
Parameters:scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
Returns:status – Job status as defined in constants.JOB_STATUS
Return type:string
get_pbs_version()[source]

Get the PBS implementation and version. The implementation may be PBS Pro or Torque/PBS.

Returns:pbs_version – (implementation, version)
Return type:tuple (2 strings)
job_submission(job)[source]
Parameters:job (soma_workflow.client.Job) – job to be submitted
Returns:job_id – job id
Return type:string
kill_job(scheduler_job_id)[source]
Parameters:scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)
setup_pbs_variant()[source]

Determine the PBS implementation / version and set the internal behaviour accordingly.

submit_simple_test_job(outstr, out_o_file, out_e_file)[source]

Create and submit a simple test job.

wait_job(job_id, timeout=0, poll_interval=0.3)[source]

Wait for a specific job to be terminated.

Local implementation

To use the local scheduler, specify in the server config file:

scheduler_type = local_basic
class soma_workflow.schedulers.local_scheduler.ConfiguredLocalScheduler(config)[source]

Local scheduler synchronized with a configuration object.

  • config LocalSchedulerCfg
class soma_workflow.schedulers.local_scheduler.LocalScheduler(proc_nb=0, interval=1, max_proc_nb=0)[source]

Allows submitting, killing, and getting the status of jobs. Runs on a single machine without dependencies.

  • _proc_nb int
  • _queue list of scheduler jobs ids
  • _jobs dictionary job_id -> soma_workflow.engine_types.EngineJob
  • _processes dictionary job_id -> subprocess.Popen
  • _status dictionary job_id -> job status as defined in constants
  • _exit_info dictionary job_id -> exit info
  • _loop thread
  • _interval int
  • _lock threading.RLock
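
For illustration, the class can be instantiated directly (a sketch based on the constructor signature above; the exact semantics of proc_nb and max_proc_nb are not detailed here and should be checked in the sources):

    from soma_workflow.schedulers.local_scheduler import LocalScheduler

    # poll running processes every second
    sched = LocalScheduler(proc_nb=2, interval=1)
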
static create_process(engine_job)[source]
  • engine_job EngineJob
  • return: subprocess.Popen process
get_job_exit_info(scheduler_job_id)[source]
  • scheduler_job_id string
    Job id for the scheduling system (DRMAA for example)
  • return: tuple
    exit_status, exit_value, term_sig, resource_usage
get_job_status(scheduler_job_id)[source]
  • scheduler_job_id string
    Job id for the scheduling system (DRMAA for example)
  • return: string
    Job status as defined in constants.JOB_STATUS
job_submission(job)[source]
  • job EngineJob
  • return: string
    Job id for the scheduling system (DRMAA for example)
kill_job(scheduler_job_id)[source]
  • scheduler_job_id string
    Job id for the scheduling system (DRMAA for example)
soma_workflow.schedulers.local_scheduler.kill_process_tree(pid)[source]

Kill a process together with its children. Needs psutil to get the list of children.
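
The idea is essentially the following (a sketch of the approach, not the exact implementation):

    import psutil

    def kill_tree(pid):
        # enumerate the children first, then kill them and the parent
        parent = psutil.Process(pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()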

DRMAA implementation

To use the DRMAA scheduler, specify in the server config file (this is actually the default on a server, so it is not mandatory to specify it):

scheduler_type = drmaa

Note that the DRMAA implementation only supports the DRMAA1 API, not DRMAA2. Work on a DRMAA2 implementation was started, but it was stopped because we did not find a working DRMAA2 C implementation for our PBS Pro cluster.

MPI implementation

class soma_workflow.schedulers.mpi_scheduler.MPIScheduler(communicator, interval=1, nb_attempt_per_job=1)[source]

Allows submitting, killing, and getting the status of jobs.

get_job_exit_info(scheduler_job_id)[source]
  • scheduler_job_id string
    Job id for the scheduling system (DRMAA for example)
  • return: tuple
    exit_status, exit_value, term_sig, resource_usage
get_job_status(scheduler_job_id)[source]
  • scheduler_job_id string
    Job id for the scheduling system (DRMAA for example)
  • return: string
    Job status as defined in constants.JOB_STATUS
job_submission(job)[source]
  • job EngineJob
  • return: string
    Job id for the scheduling system (DRMAA for example)
kill_job(scheduler_job_id)[source]
  • scheduler_job_id string
    Job id for the scheduling system (DRMAA for example)