Python API

Workflow, Job and FileTransfer objects are submitted, monitored and controlled through the client.WorkflowController interface.

This page presents the documentation of the client.WorkflowController class.

class client.WorkflowController(object)[source]

Submission, control and monitoring of Job, FileTransfer and Workflow objects.

Sets up the connection to the computing resource. Looks for a soma-workflow configuration file (if not specified in the config argument).

Note

The login and password are only required for a remote computing resource.

Parameters:
  • resource_id (str) – Identifier of the computing resource to connect to. If None, the number of CPUs of the current machine is detected and the basic scheduler is launched.

  • login (str) – Required if the computing resource is remote.

  • password (str) – Required if the computing resource is remote and no RSA key was configured to log on to the remote machine with ssh.

  • config (configuration.Configuration) – Optional configuration.

  • rsa_key_pass (str) – Required if the RSA key is protected with a password.

  • isolated_light_mode (None, str, or True) – if not None, work in a custom soma-workflow directory (database, transfers, temporary files…). If the value is True, a temporary directory is generated for that purpose. Otherwise the value should be a directory name, which is used instead of the default one. If None, the class-wide variable WorkflowController.isolated_light_mode is used instead, so you can set it on the class to enable isolated mode globally.
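
The two typical ways of filling in the constructor arguments can be sketched as follows. This is only an illustration of the parameter rules listed above: the helper names local_kwargs and remote_kwargs are hypothetical, and the import path soma_workflow.client.WorkflowController mentioned in the usage note is an assumption.

```python
def local_kwargs(work_dir=None):
    """Keyword arguments for a local controller in isolated light mode.

    work_dir=None asks for a temporary directory (isolated_light_mode=True);
    otherwise the given directory name is used.
    """
    return {
        "resource_id": None,  # None: local machine, basic scheduler
        "isolated_light_mode": True if work_dir is None else work_dir,
    }


def remote_kwargs(resource_id, login, password=None, rsa_key_pass=None):
    """Keyword arguments for a remote computing resource.

    The password may be left out when an RSA key is configured for ssh;
    rsa_key_pass is only needed if that key is password protected.
    """
    kwargs = {"resource_id": resource_id, "login": login}
    if password is not None:
        kwargs["password"] = password
    if rsa_key_pass is not None:
        kwargs["rsa_key_pass"] = rsa_key_pass
    return kwargs
```

With soma-workflow installed, the result would be passed on as WorkflowController(**local_kwargs()) or WorkflowController(**remote_kwargs(resource_id, login)), where resource_id and login are your own values.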

Set up the connection with the computing resource

WorkflowController.__init__(resource_id=None, login=None, password=None, config=None, rsa_key_pass=None, isolated_light_mode=None)[source]

Submission / Registration

WorkflowController.submit_workflow(workflow, expiration_date=None, name=None, queue=None)[source]

Submits a workflow and returns a workflow identifier.

Raises WorkflowError or JobError if the workflow is not correct.

Parameters:
  • workflow (client.Workflow) – Workflow description.

  • expiration_date (datetime.datetime) – After this date the workflow will be deleted.

  • name (str) – Optional workflow name.

  • queue (str) – Optional name of the queue where to submit jobs. If it is not specified the jobs will be submitted to the default queue.

Returns:

Workflow_identifier

Return type:

int
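
A submission with an explicit expiration date might look like the sketch below. It assumes wf_ctrl is an already connected WorkflowController and workflow is a client.Workflow; the function name submit_for_days and its now argument are hypothetical and exist only to keep the example testable.

```python
from datetime import datetime, timedelta


def submit_for_days(wf_ctrl, workflow, days=7, name=None, queue=None, now=None):
    """Submit `workflow` so that it expires `days` days from `now`.

    Returns the workflow identifier given back by submit_workflow.
    """
    now = datetime.now() if now is None else now
    expiration_date = now + timedelta(days=days)
    return wf_ctrl.submit_workflow(
        workflow, expiration_date=expiration_date, name=name, queue=queue
    )
```

Leaving queue as None keeps the documented default behaviour, that is, submission to the default queue.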

WorkflowController.register_transfer(file_transfer)[source]

Registers a file transfer which is not part of a workflow and returns a file transfer identifier.

Parameters:

file_transfer (client.FileTransfer) –

Returns:

transfer

Return type:

EngineTransfer

User’s Workflows, Jobs, and FileTransfers retrieval

WorkflowController.workflow(workflow_id)[source]

Raises UnknownObjectError if the workflow_id is not valid

Parameters:

workflow_id (workflow_identifier) –

Return type:

Workflow

WorkflowController.workflows(workflow_ids=None)[source]

Lists the identifiers and general information about all the workflows submitted by the user, or about the workflows specified in the workflow_ids argument.

Parameters:

workflow_ids (sequence of workflow identifiers) –

Returns:

workflows – workflow_id -> (workflow_name, expiration_date)

Return type:

dictionary: workflow identifier -> tuple(string, date)
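
The returned dictionary can be filtered like any other mapping. The sketch below lists the workflows that expire before a given date; it assumes the (workflow_name, expiration_date) layout given in the Returns description, and the function name workflows_expiring_before is hypothetical.

```python
def workflows_expiring_before(wf_ctrl, deadline):
    """Return [(workflow_id, name, expiration_date)] for the workflows
    expiring before `deadline`, soonest first."""
    expiring = [
        (wf_id, name, expiration)
        # Assumed value layout: (workflow_name, expiration_date)
        for wf_id, (name, expiration) in wf_ctrl.workflows().items()
        if expiration < deadline
    ]
    return sorted(expiring, key=lambda item: item[2])
```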

WorkflowController.jobs(job_ids=None)[source]

Lists the identifiers and general information about all the jobs submitted by the user and which are not part of a workflow, or about the jobs specified in the job_ids argument.

Parameters:

job_ids (sequence of job identifiers) –

Returns:

jobs – job_id -> (name, command, submission date)

Return type:

dictionary: job identifiers -> tuple(string, string, date)

WorkflowController.transfers(transfer_ids=None)[source]

Lists the identifiers and information about all the user’s file transfers which are not part of a workflow or about the file transfers specified in the transfer_ids argument.

Parameters:

transfer_ids (sequence of FileTransfer identifiers) –

Returns:

transfers

transfer_id -> (
  • client_path: client file or directory path

  • expiration_date: after this date the file copied on the computing resource and all the transfer information will be deleted, unless an existing job has declared this file as output or input.

  • client_paths: sequence of file or directory path or None)

Return type:

dictionary: str -> tuple(str, date, None or sequence of str)

Monitoring

Workflows

WorkflowController.workflow_status(workflow_id)[source]

Raises UnknownObjectError if the workflow_id is not valid

Parameters:

workflow_id (workflow identifier) –

Returns:

status – Status of the workflow: see Workflow status or the constants.WORKFLOW_STATUS list.

Return type:

str or None
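
When progress has to be displayed while a workflow runs, the status can be polled. The following is a minimal sketch, not the library's own waiting mechanism (wait_workflow does that): final_status is expected to be one of the constants.WORKFLOW_STATUS values, and the function name, interval, max_polls and sleep arguments are hypothetical.

```python
import time


def poll_workflow_status(wf_ctrl, workflow_id, final_status,
                         interval=2.0, max_polls=None, sleep=time.sleep):
    """Poll workflow_status until it equals final_status.

    Returns True if final_status was reached, False if max_polls polls
    were made without reaching it (max_polls=None polls forever).
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        if wf_ctrl.workflow_status(workflow_id) == final_status:
            return True
        polls += 1
        sleep(interval)  # wait before asking the server again
    return False
```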

WorkflowController.workflow_elements_status(workflow_id, with_drms_id=True)[source]

Retrieves the status of all the workflow elements at once, minimizing the communication with the server and the number of requests to the database. TO DO: make it more user friendly.

Note: in Soma-Workflow 3.0, the last job info (drmaa_id) has been added to job status tuple.

Parameters:
  • workflow_id (workflow_identifier) –

  • with_drms_id (bool (optional, default=True)) – if True the DRMS id (drmaa_id) is also included in the returned tuple for each job. This info has been added in soma_workflow 3.0 and is thus optional to avoid breaking compatibility with earlier versions.

Returns:

  • status (tuple) –

    • sequence of tuple (job_id, status, queue, exit_info, (submission_date, execution_date, ending_date, drmaa_id), [drms_id]),

    • sequence of tuple (transfer_id, (status, progression_info, engine_path, client_path, client_paths)),

    • workflow_status,

    • workflow_queue,

    • sequence of tuple (temp_path_id, engine_path, status)

Raises UnknownObjectError if the workflow_id is not valid
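
Because the return value is a nested tuple, a small wrapper helps when only a summary is needed. The sketch below counts the jobs of a workflow by status, relying only on the first two fields of each job tuple described above; the function name count_job_statuses is hypothetical.

```python
from collections import Counter


def count_job_statuses(wf_ctrl, workflow_id):
    """Count the jobs of a workflow by status."""
    elements = wf_ctrl.workflow_elements_status(workflow_id)
    job_entries = elements[0]  # first item: sequence of job tuples
    # Each job tuple starts with (job_id, status, ...); later fields are ignored.
    return Counter(entry[1] for entry in job_entries)
```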

Jobs

WorkflowController.job_status(job_id)[source]
Parameters:

job_id (job identifier) –

Returns:

  • status (str) – Status of the job: see Job status or the list constants.JOB_STATUS.

Raises UnknownObjectError if the job_id is not valid

WorkflowController.job_termination_status(job_id)[source]

Information related to the end of the job.

Parameters:

job_id (job identifier) –

Returns:

  • status (tuple(str, int or None, str or None, str) or None) –

    • exit status: status of the terminated job: see Job exit status or the constants.JOB_EXIT_STATUS list.

    • exit value: operating system exit code of the job if the job terminated normally.

    • terminating signal: representation of the signal that caused the termination of the job if the job terminated due to the receipt of a signal.

    • resource usage: resource usage information provided as an array of strings where each string complies with the format <name>=<value>. The information provided depends on the DRMS and DRMAA implementation.

Raises UnknownObjectError if the job_id is not valid
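
The termination tuple can be unpacked into named fields as sketched below. The function name describe_termination is hypothetical, and the handling of resource usage is an assumption: the text above describes it both as str and as an array of strings, so the sketch accepts either a whitespace-separated string or a sequence of <name>=<value> strings.

```python
def describe_termination(termination):
    """Turn the tuple returned by job_termination_status into a dict.

    Returns None when no termination information is available.
    """
    if termination is None:
        return None
    exit_status, exit_value, term_signal, resource_usage = termination
    # Assumption: one whitespace-separated string, or a sequence of strings,
    # each entry formatted as <name>=<value>.
    if isinstance(resource_usage, str):
        entries = resource_usage.split()
    else:
        entries = list(resource_usage or [])
    usage = dict(entry.split("=", 1) for entry in entries if "=" in entry)
    return {
        "exit_status": exit_status,
        "exit_value": exit_value,
        "terminating_signal": term_signal,
        "resource_usage": usage,
    }
```

Values are kept as strings, since the available names and units depend on the DRMS and DRMAA implementation.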

WorkflowController.retrieve_job_stdouterr(job_id, stdout_file_path, stderr_file_path=None, buffer_size=262144)[source]

Copies the job standard output and standard error to the specified files.

Raises UnknownObjectError if the job_id is not valid

Parameters:
  • job_id (job identifier) –

  • stdout_file_path (str) – Path of the file where to copy the standard output.

  • stderr_file_path (str) – Path of the file where to copy the standard error.

  • buffer_size (int) – The file is transferred piece by piece, each piece being of size buffer_size.
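
To inspect the output of a job without keeping the files, the copy can go through a temporary directory. This is a minimal sketch assuming wf_ctrl is a connected WorkflowController; the function names and file names are hypothetical, and a file that was not written is reported as an empty string.

```python
import os
import tempfile


def _read_text(path):
    """Return the content of `path`, or "" if the file was not written."""
    if not os.path.exists(path):
        return ""
    with open(path, errors="replace") as text_file:
        return text_file.read()


def read_job_output(wf_ctrl, job_id):
    """Return (stdout_text, stderr_text) of a job as strings."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        out_path = os.path.join(tmp_dir, "stdout.txt")
        err_path = os.path.join(tmp_dir, "stderr.txt")
        wf_ctrl.retrieve_job_stdouterr(job_id, out_path, err_path)
        # Read both files before the temporary directory is removed.
        return _read_text(out_path), _read_text(err_path)
```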

File Transfers

WorkflowController.transfer_status(transfer_id)[source]

File transfer status and information related to the transfer progress.

Parameters:

transfer_id (transfer identifier) –

Returns:

  • status (tuple(transfer_status or None, tuple or None)) –

    • Status of the file transfer: see File Transfer status or the constants.FILE_TRANSFER_STATUS list.

    • Transfer progress: None if the transfer status is not constants.TRANSFERING_FROM_CLIENT_TO_CR or constants.TRANSFERING_FROM_CR_TO_CLIENT. Otherwise, tuple (file size, size already transferred) for a file transfer, or tuple (cumulated size, sequence of tuple (relative_path, file_size, size already transferred)) for a directory transfer.

Raises UnknownObjectError if the transfer_id is not valid
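
The progress information has two shapes, one for a single file and one for a directory. The sketch below turns either into a fraction; the function name transfer_progress is hypothetical, and treating a zero-size transfer as complete is a choice made for this example only.

```python
def transfer_progress(transfer_status):
    """Return the transferred fraction (0.0 to 1.0) from the value returned
    by transfer_status, or None when no progress information is given."""
    _status, info = transfer_status
    if info is None:
        return None
    total, second = info
    if isinstance(second, (int, float)):
        # File transfer: (file size, size already transferred)
        done = second
    else:
        # Directory transfer: (cumulated size,
        #   sequence of (relative_path, file_size, size already transferred))
        done = sum(item[2] for item in second)
    return done / total if total else 1.0
```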

Control

Workflows

WorkflowController.stop_workflow(workflow_id)[source]

Stops a workflow. The running jobs will be killed. The jobs in queues will be removed from queues. It will be possible to restart the workflow afterwards.

Returns:

success – True if the running jobs were killed, False if some jobs are possibly still running on the computing resource even though the workflow was stopped.

Return type:

bool

WorkflowController.restart_workflow(workflow_id, queue=None)[source]

Restarts the jobs of the workflow which failed. The jobs will be submitted again. The workflow status has to be constants.WORKFLOW_DONE.

Parameters:
  • workflow_id (workflow identifier) –

  • queue (str) – Optional name of the queue where to submit jobs. If it is not specified the jobs will be submitted to the default queue.

Returns:

  • success (bool) – True if some jobs were restarted.

Raises UnknownObjectError if the workflow_id is not valid

WorkflowController.delete_workflow(workflow_id, force=True)[source]

Deletes the workflow and all its associated elements (FileTransfers and Jobs). The workflow_id becomes invalid and cannot be used anymore. The workflow jobs which are running will be killed. If force is set to True, the call blocks until the workflow is deleted; if the workflow cannot be deleted properly, it is still removed from the Soma-workflow database. However, jobs that are still running in that case are not killed, and the return value is False.

Parameters:
  • workflow_id (workflow_identifier) –

  • force (bool) –

Returns:

  • success (bool)

Raises UnknownObjectError if the workflow_id is not valid

WorkflowController.change_workflow_expiration_date(workflow_id, new_expiration_date)[source]

Sets a new expiration date for the workflow.

Parameters:
  • workflow_id (workflow identifier) –

  • new_expiration_date (datetime.datetime) –

Returns:

  • success (bool) – True if the expiration date was changed.

Raises UnknownObjectError if the workflow_id is not valid

WorkflowController.wait_workflow(workflow_id, timeout=-1)[source]

Waits for the specified workflow to finish.

Raises UnknownObjectError if the workflow_id is not valid

Parameters:
  • workflow_id (workflow identifier) – Workflow to wait for.

  • timeout (int) – The call to wait_workflow returns after at most timeout seconds. A negative value means that the method will wait indefinitely.

Jobs

WorkflowController.wait_job(job_ids, timeout=-1)[source]

Waits for all the specified jobs to finish.

Raises UnknownObjectError if the job_id is not valid

Parameters:
  • job_ids (sequence of job identifiers) – Jobs to wait for.

  • timeout (int) – The call to wait_job returns after at most timeout seconds. A negative value means that the method will wait indefinitely.

WorkflowController.restart_jobs(workflow_id, job_ids)[source]

File Transfers

WorkflowController.transfer_files(transfer_ids, buffer_size=262144)[source]

Transfers the file(s) associated with the given transfer identifier(s). If the files are located only on the client side (that is, the transfer status is constants.FILES_ON_CLIENT), they are transferred from the client to the computing resource. If the files are located on the computing resource side (that is, the transfer status is constants.FILES_ON_CR or constants.FILES_ON_CLIENT_AND_CR), they are transferred from the computing resource to the client.

Parameters:
  • transfer_ids (FileTransfer identifier) –

  • buffer_size (int) – Depending on the transfer method, the files can be transferred piece by piece. The size of each piece can be tuned using the buffer_size argument.

Returns:

  • success (bool) – True if the transfer was done. (Proper error management is still to be implemented.)

Raises UnknownObjectError if the transfer_id is not valid

WorkflowController.delete_transfer(transfer_id)[source]

Deletes the FileTransfer and the associated files and directories on the computing resource side. The transfer_id becomes invalid and cannot be used anymore. If some jobs reference the FileTransfer as an input or an output, the FileTransfer is not deleted immediately but as soon as these jobs are deleted.

Raises UnknownObjectError if the transfer_id is not valid

Helper

static Helper.list_failed_jobs(workflow_id, wf_ctrl, include_aborted_jobs=False, include_user_killed_jobs=False, include_statuses=None)[source]

Lists the problematic jobs of a workflow.

Parameters:
  • workflow_id (workflow identifier) –

  • wf_ctrl (client.WorkflowController) –

  • include_aborted_jobs (bool) – Include the jobs whose exit status is constants.EXIT_ABORTED or constants.EXIT_NOTRUN.

  • include_user_killed_jobs (bool) – Include the jobs whose exit status is constants.USER_KILLED.

  • include_statuses (sequence or None) – Get the failed jobs whose exit status is in this list/set. When given, the include_aborted_jobs and include_user_killed_jobs parameters are ignored.

Returns:

jobs – Identifiers of the jobs whose status is constants.FAILED or whose exit value is not 0.

Return type:

list of job identifiers
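
Combined with job_termination_status, this helper gives a quick failure report. The sketch below assumes helper is the Helper class documented here and wf_ctrl a connected WorkflowController; the function name failed_job_report is hypothetical.

```python
def failed_job_report(helper, wf_ctrl, workflow_id, include_aborted=False):
    """Map each failed job identifier to its termination status tuple."""
    failed_ids = helper.list_failed_jobs(
        workflow_id, wf_ctrl, include_aborted_jobs=include_aborted
    )
    return {
        job_id: wf_ctrl.job_termination_status(job_id)
        for job_id in failed_ids
    }
```

Passing the helper class as an argument keeps the function independent of how the package is imported.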

static Helper.delete_all_workflows(wf_ctrl, force=True)[source]

Deletes all the workflows. If force is set to True, the call blocks until the workflows are deleted; if a workflow cannot be deleted properly, it is still removed from the Soma-workflow database. However, jobs that are still running in that case are not killed, and the return value is False.

Parameters:
  • wf_ctrl (client.WorkflowController) –

  • force (bool) –

Returns:

success

Return type:

bool

static Helper.wait_workflow(workflow_id, wf_ctrl)[source]

Waits for workflow execution to end.

Parameters:
  • workflow_id (workflow identifier) –

  • wf_ctrl (client.WorkflowController) –

static Helper.transfer_input_files(workflow_id, wf_ctrl, buffer_size=262144)[source]

Transfers all the input files of a workflow.

Parameters:
  • workflow_id (workflow identifier) –

  • wf_ctrl (client.WorkflowController) –

  • buffer_size (int) – Depending on the transfer method, the files can be transferred piece by piece. The size of each piece can be tuned using the buffer_size argument.

static Helper.transfer_output_files(workflow_id, wf_ctrl, buffer_size=262144)[source]

Transfers all the output files of a workflow which are ready to transfer.

Parameters:
  • workflow_id (workflow identifier) –

  • wf_ctrl (client.WorkflowController) –

  • buffer_size (int) – Depending on the transfer method, the files can be transferred piece by piece. The size of each piece can be tuned using the buffer_size argument.
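
A complete round trip with file transfers chains these helpers around a submission. The following sketch shows one plausible ordering (submit, send inputs, wait, fetch outputs); it assumes helper is the Helper class and wf_ctrl a connected WorkflowController, and the function name run_with_transfers is hypothetical.

```python
def run_with_transfers(helper, wf_ctrl, workflow, name=None):
    """Submit a workflow, transfer its inputs, wait for it to end, then
    fetch its outputs. Returns the workflow identifier."""
    workflow_id = wf_ctrl.submit_workflow(workflow, name=name)
    helper.transfer_input_files(workflow_id, wf_ctrl)   # client -> resource
    helper.wait_workflow(workflow_id, wf_ctrl)          # blocks until the end
    helper.transfer_output_files(workflow_id, wf_ctrl)  # resource -> client
    return workflow_id
```

Checking the result afterwards, for example with Helper.list_failed_jobs, is left out to keep the sketch short.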

static Helper.serialize(file_path, workflow)[source]

Saves a workflow to a file. Uses JSON format.

Raises SerializationError in case of failure

Parameters:
  • file_path (str) –

  • workflow (client.Workflow) –

static Helper.unserialize(file_path)[source]

Loads a workflow from a file. Opens JSON format or pickle (see the method: Helper.convert_wf_file_for_p2_5).

Parameters:

file_path (str) –

Returns:

  • workflow (client.Workflow)

Raises SerializationError in case of failure
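
Saving and reloading a workflow is a two-call round trip; note that serialize takes the file path first. This minimal sketch assumes helper is the Helper class documented here; the function name save_and_reload is hypothetical and no error handling is shown.

```python
def save_and_reload(helper, workflow, file_path):
    """Write `workflow` to `file_path`, then load it back from that file."""
    helper.serialize(file_path, workflow)  # argument order: path, workflow
    return helper.unserialize(file_path)
```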

static Helper.convert_wf_file_for_p2_5(origin_file_path, target_file_path)[source]

This method requires Python >= 2.6. It converts a workflow file created using Python >= 2.6 to a workflow file usable in Python 2.5.