Supporting new computing resources: writing a Scheduler implementation

In Soma-Workflow, the link between the workflow/jobs API and the computing resource's job manager is handled through Scheduler subclass implementations. Several already exist in Soma-Workflow: a local scheduler to run on a standard computer (LocalScheduler), a DRMAA implementation which can drive several types of DRMS (cluster managers) (DrmaaScheduler), an MPI scheduler (MPIScheduler), and now also a PBS Pro scheduler (PBSProScheduler), since DRMAA no longer works with PBS Pro.

If the computing resource you are using is not supported by the existing implementations, it is possible, and relatively easy, to write a new Scheduler implementation, typically to handle a new type of cluster. It is a matter of writing a Scheduler subclass which implements the required methods: mainly 4 methods have to be defined (job_submission, get_job_status, get_job_exit_info, kill_job), plus the build_scheduler class method, which simply instantiates the derived class with appropriate parameters.

A good and easy example to look at is the PBSProScheduler implementation, which merely runs DRMS commands (qsub, qstat) and does not use complex C/Python bindings. That implementation was written in a few hours, so it is not a huge amount of work.
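
As a starting point, the sketch below shows what such a subclass may look like. It is only a hypothetical skeleton (MyClusterScheduler and its placeholder logic are made up for illustration); a real implementation would talk to the actual DRMS in each method:

    from soma_workflow import scheduler, constants


    class MyClusterScheduler(scheduler.Scheduler):
        """Hypothetical skeleton of a custom scheduler (names are made up)."""

        def __init__(self, config):
            super().__init__()
            self.config = config
            self._counter = 0  # placeholder for native DRMS job ids

        @classmethod
        def build_scheduler(cls, config):
            # instantiate the derived class from the configuration object
            return cls(config)

        def job_submission(self, jobs):
            # submit each EngineJob to the DRMS; return its native id,
            # or None when the submission failed
            job_ids = []
            for job in jobs:
                try:
                    # real code would build and run the DRMS submission command here
                    self._counter += 1
                    job_ids.append(str(self._counter))
                except Exception:
                    job_ids.append(None)
            return job_ids

        def get_job_status(self, scheduler_job_id):
            # real code would query the DRMS and map its state to a
            # constants.JOB_STATUS value
            return constants.RUNNING  # placeholder

        def get_job_exit_info(self, scheduler_job_id):
            # (exit_status, exit_value, term_sig, resource_usage)
            return (constants.FINISHED_REGULARLY, 0, 0, b'')

        def kill_job(self, scheduler_job_id):
            # real code would ask the DRMS to terminate the job
            pass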

Scheduler definition

This is the generic, abstract API.

class scheduler.Scheduler[source]

Allows submitting, killing and getting the status of jobs.

The Scheduler class is an abstract class which specifies the jobs management API. It has several implementations, located in soma_workflow.schedulers.*_scheduler.

A scheduler implementation class can be retrieved using the global function get_scheduler_implementation(), or instantiated using build_scheduler().

New schedulers can be written to support computing resource types that are currently not supported (typically a cluster with a DRMS which has no working DRMAA implementation). The various methods of the Scheduler API have to be overloaded in this case.

classmethod build_scheduler(config)[source]

Create a scheduler of the expected type, using configuration to parameterize it.

Parameters:

config (Configuration) – configuration object instance

get_job_exit_info(scheduler_job_id)[source]

The exit info consists of 4 values returned in a tuple:

exit_status: string

one of the constants.JOB_EXIT_STATUS values

exit_value: int

exit code of the command (normally 0 in case of success)

term_sig: int

termination signal, 0 if OK

resource_usage: bytes

bytes string in the shape b'cpupercent=60 mem=13530kb cput=00:00:12' etc. Items may include:

  • cpupercent

  • cput

  • mem

  • vmem

  • ncpus

  • walltime

Parameters:

scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)

Returns:

exit_info – exit_status, exit_value, term_sig, resource_usage

Return type:

tuple
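
As an illustration, assuming sched is a Scheduler instance and scheduler_job_id was returned by a previous submission, a successful job may yield something like the following (the exact values are only an example):

    exit_status, exit_value, term_sig, resource_usage = \
        sched.get_job_exit_info(scheduler_job_id)
    # exit_status: e.g. constants.FINISHED_REGULARLY
    # exit_value:  0
    # term_sig:    0
    # resource_usage: b'cpupercent=60 mem=13530kb cput=00:00:12 walltime=00:00:15'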

get_job_status(scheduler_job_id)[source]
Parameters:

scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)

Returns:

status – Job status as defined in constants.JOB_STATUS

Return type:

string

job_submission(jobs)[source]

Submit a Soma-Workflow job

Parameters:

jobs (EngineJob or list[EngineJob]) – Job to be submitted

Returns:

job_id – Job id for the scheduling system (a DRMAA job id for example, or a native DRMS identifier). If some submissions failed, None appears in the list in place of the job id.

Return type:

list[str]
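
A hypothetical usage sketch (it assumes sched is a Scheduler instance and jobs a list of EngineJob objects built by the engine):

    job_ids = sched.job_submission(jobs)
    for job, job_id in zip(jobs, job_ids):
        if job_id is None:
            print('submission failed for job:', job.name)
        else:
            print('job', job.name, 'submitted with id', job_id)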

kill_job(scheduler_job_id)[source]
Parameters:

scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)

scheduler.build_scheduler(scheduler_type, config)[source]

Create a scheduler of the expected type, using configuration to parameterize it.

Parameters:
  • scheduler_type (string) – type of scheduler to be built

  • config (Configuration) – configuration object

Returns:

scheduler – Scheduler instance

Return type:

Scheduler

scheduler.get_scheduler_implementation(scheduler_type)[source]

Get the scheduler class implementation corresponding to the expected one.

Parameters:

scheduler_type (str) – scheduler type: ‘drmaa’, ‘drmaa2’, ‘local_basic’, ‘mpi’, or other custom scheduler

Returns:

scheduler_class

Return type:

Scheduler subclass
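
For instance, to get and build a scheduler programmatically (a sketch assuming a Configuration object named config is already available):

    from soma_workflow import scheduler

    sched_class = scheduler.get_scheduler_implementation('local_basic')
    # sched_class is a Scheduler subclass; to get a usable instance:
    sched = scheduler.build_scheduler('local_basic', config)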

scheduler.get_schedulers_list()[source]

List all available installed schedulers

Returns:

schedulers – schedulers list. Each item is a tuple (name, enabled)

Return type:

list
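
For example (the names and availability depend on what is installed on the system):

    from soma_workflow import scheduler

    for name, enabled in scheduler.get_schedulers_list():
        print(name, 'is available' if enabled else 'is not available')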

PBS Pro implementation

The PBS Pro scheduler does not use the DRMAA library; we had to write it because the DRMAA implementation no longer worked on our cluster. It is a very simple (and probably sub-optimal) wrapper around the PBS command lines (qsub, qstat, qdel). It should also work reasonably well with Torque/PBS 2.5, but has not been extensively tested that way (since DRMAA works for it).
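
To give an idea of the approach, submitting a job essentially amounts to building a qsub command line and reading the job id that it prints. This is only a simplified illustration, not the actual PBSProScheduler code:

    import subprocess

    def submit_with_qsub(script_path, stdout_path, stderr_path, queue=None):
        # simplified illustration of wrapping the qsub command line
        cmd = ['qsub', '-o', stdout_path, '-e', stderr_path]
        if queue:
            cmd += ['-q', queue]
        cmd.append(script_path)
        # qsub prints the job id (e.g. "1234.pbsserver") on its standard output
        return subprocess.check_output(cmd).decode().strip()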

To use the PBS Pro scheduler, specify in the server config file:

scheduler_type = pbspro
class soma_workflow.schedulers.pbspro_scheduler.PBSProScheduler(parallel_job_submission_info, tmp_file_path=None, configured_native_spec=None)[source]

Scheduling using a PBS Pro session.

classmethod build_scheduler(config)[source]

Build an instance of PBSProScheduler

get_job_exit_info(scheduler_job_id)[source]

The exit info consists of 4 values returned in a tuple:

exit_status: string

one of the constants.JOB_EXIT_STATUS values

exit_value: int

exit code of the command (normally 0 in case of success)

term_sig: int

termination signal, 0 if OK

resource_usage: unicode

string in the shape 'cpupercent=60 mem=13530kb cput=00:00:12' etc. Items may include:

  • cpupercent

  • cput

  • mem

  • vmem

  • ncpus

  • walltime

Parameters:

scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)

Returns:

exit_info – exit_status, exit_value, term_sig, resource_usage

Return type:

tuple

get_job_extended_status(scheduler_job_id)[source]

Get the full job status as a dictionary (from qstat in JSON format)

get_job_status(scheduler_job_id)[source]
Parameters:

scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)

Returns:

status – Job status as defined in constants.JOB_STATUS

Return type:

string

get_pbs_version()[source]

Get the PBS implementation and version. The implementation may be PBS Pro or Torque/PBS.

Returns:

pbs_version – (implementation, version)

Return type:

tuple (2 strings)
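
For illustration only (the exact strings depend on the installed PBS):

    implementation, version = sched.get_pbs_version()
    # e.g. ('pbspro', '19.1.3') or ('torque', '2.5.13')  (illustrative values)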

job_submission(jobs, signal_end=True)[source]
Parameters:
  • jobs (soma_workflow.client.Job) – job to be submitted

  • signal_end (bool) –

Returns:

job_id – job id

Return type:

string

kill_job(scheduler_job_id)[source]
Parameters:

scheduler_job_id (string) – Job id for the scheduling system (DRMAA for example)

static out_of_container_command()[source]

In case this server is running inside a container, the qstat/qsub commands are not available inside the container but are available on the host. We need to get out of the container in such a situation.

setup_pbs_variant()[source]

Determine the PBS implementation / version and set the internal behaviour accordingly

submit_simple_test_job(outstr, out_o_file, out_e_file)[source]

Create a job to test

wait_job(job_id, timeout=0, poll_interval=0.3)[source]

Wait for a specific job to be terminated.

Local implementation

To use the local scheduler, specify in the server config file:

scheduler_type = local_basic
class soma_workflow.schedulers.local_scheduler.ConfiguredLocalScheduler(config)[source]

Local scheduler synchronized with a configuration object.

  • config LocalSchedulerCfg

classmethod build_scheduler(config)[source]

Create a scheduler of the expected type, using configuration to parameterize it.

Parameters:

config (Configuration) – configuration object instance

class soma_workflow.schedulers.local_scheduler.LocalScheduler(proc_nb=0, interval=0.05, max_proc_nb=0)[source]

Allows submitting, killing and getting the status of jobs. Runs on a single machine without dependencies.

  • _proc_nb int

  • _queue list of scheduler job ids

  • _jobs dictionary job_id -> soma_workflow.engine_types.EngineJob

  • _processes dictionary job_id -> subprocess.Popen

  • _status dictionary job_id -> job status as defined in constants

  • _exit_info dictionary job_id -> exit info

  • _loop thread

  • _interval int

  • _lock threading.RLock
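
As a rough usage sketch (the engine normally builds this object through build_scheduler(), so direct instantiation is only for illustration; the parameter values are arbitrary):

    from soma_workflow.schedulers.local_scheduler import LocalScheduler

    # proc_nb: number of jobs run concurrently; interval: polling interval
    # (illustrative values)
    sched = LocalScheduler(proc_nb=4, interval=0.05)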

static create_process(engine_job)[source]
  • engine_job EngineJob

  • returns: Subprocess process

get_job_exit_info(scheduler_job_id)[source]

This function is called only once per job by the engine, thus it also deletes references to the job in internal tables.

  • scheduler_job_id string

    Job id for the scheduling system (DRMAA for example)

  • return: tuple

    exit_status, exit_value, term_sig, resource_usage

get_job_status(scheduler_job_id)[source]
  • scheduler_job_id string

    Job id for the scheduling system (DRMAA for example)

  • return: string

    Job status as defined in constants.JOB_STATUS

job_submission(jobs)[source]
  • job EngineJob

  • return: string

    Job id for the scheduling system (DRMAA for example)

kill_job(scheduler_job_id)[source]
  • scheduler_job_id string

    Job id for the scheduling system (DRMAA for example)

exception soma_workflow.schedulers.local_scheduler.LocalSchedulerError[source]
soma_workflow.schedulers.local_scheduler.kill_process_tree(pid)[source]

Kill a process together with its children. Requires psutil to get the list of child processes.
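
The psutil-based approach looks roughly like this (a simplified sketch, not the exact implementation):

    import psutil

    def kill_tree(pid):
        # kill the children first, then the parent process itself
        parent = psutil.Process(pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()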

DRMAA implementation

To use the DRMAA scheduler, specify in the server config file (this is actually the default on a server, so it is not mandatory to specify it):

scheduler_type = drmaa

Note that the DRMAA implementation only supports the DRMAA1 API, not DRMAA2. Work was started on a DRMAA2 implementation, but it was stopped because we could not find a working DRMAA2 C implementation for our PBS Pro cluster.

MPI implementation

class soma_workflow.schedulers.mpi_scheduler.MPIScheduler(communicator, interval=0.01, nb_attempt_per_job=1)[source]

Allows submitting, killing and getting the status of jobs.

classmethod build_scheduler(config)[source]

Create a scheduler of the expected type, using configuration to parameterize it.

Parameters:

config (Configuration) – configuration object instance

get_job_exit_info(scheduler_job_id)[source]
  • scheduler_job_id string

    Job id for the scheduling system (DRMAA for example)

  • return: tuple

    exit_status, exit_value, term_sig, resource_usage

get_job_status(scheduler_job_id)[source]
  • scheduler_job_id string

    Job id for the scheduling system (DRMAA for example)

  • return: string

    Job status as defined in constants.JOB_STATUS

job_submission(jobs)[source]
  • job EngineJob

  • return: string

    Job id for the scheduling system (DRMAA for example)

kill_job(scheduler_job_id)[source]
  • scheduler_job_id string

    Job id for the scheduling system (DRMAA for example)
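
A minimal instantiation sketch, assuming mpi4py provides the communicator (the MPI scheduler is normally started through the soma-workflow MPI engine, so this is only illustrative):

    from mpi4py import MPI
    from soma_workflow.schedulers.mpi_scheduler import MPIScheduler

    comm = MPI.COMM_WORLD
    # the communicator is used to dispatch jobs to the MPI worker processes
    sched = MPIScheduler(comm, interval=0.01, nb_attempt_per_job=1)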