Python API¶
Workflow, Job and FileTransfer objects are submitted, monitored and controlled through the client.WorkflowController interface.
This page presents the documentation of the client.WorkflowController class.
- class client.WorkflowController(object)[source]¶
Submission, control and monitoring of Job, FileTransfer and Workflow objects.
WorkflowController API
Set up the connection to the computing resource¶
- WorkflowController.__init__(resource_id=None, login=None, password=None, config=None, rsa_key_pass=None, isolated_light_mode=None)[source]¶
Sets up the connection to the computing resource. Looks for a soma-workflow configuration file (if not specified in the config argument).
Note
The login and password are only required for a remote computing resource.
- Parameters
resource_id (str) – Identifier of the computing resource to connect to. If None, the number of CPUs of the current machine is detected and the basic scheduler is launched.
login (str) – Required if the computing resource is remote.
password (str) – Required if the computing resource is remote and no RSA key was configured to log on to the remote machine with ssh.
config (configuration.Configuration) – Optional configuration.
rsa_key_pass (str) – Required if the RSA key is protected with a password.
isolated_light_mode (None, str, or True) – if not None, work in a custom soma-workflow directory (database, transfers, temporary files…). If the isolated_light_mode parameter value is True, then generate a temporary directory for that. Otherwise the parameter should be a directory name which will be used instead of the default one.
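The constructor arguments above can be exercised with a small wrapper. This is a minimal sketch only: open_controller and its controller_cls argument are hypothetical names introduced for illustration, and the soma_workflow.client import path is an assumption based on the client.WorkflowController name used on this page.

```python
def open_controller(resource_id=None, login=None, password=None, config=None,
                    rsa_key_pass=None, isolated_light_mode=None,
                    controller_cls=None):
    """Create a controller from the documented constructor arguments.

    `open_controller` and `controller_cls` are illustrative names, not part
    of soma-workflow. `controller_cls` lets the sketch run with a stand-in.
    """
    if controller_cls is None:
        # Assumed import path; done lazily so the sketch loads without it.
        from soma_workflow.client import WorkflowController
        controller_cls = WorkflowController

    # All arguments are passed by keyword, mirroring the documented signature.
    return controller_cls(
        resource_id=resource_id,
        login=login,
        password=password,
        config=config,
        rsa_key_pass=rsa_key_pass,
        isolated_light_mode=isolated_light_mode,
    )
```

Calling open_controller(isolated_light_mode=True) would then request a local controller working in a temporary soma-workflow directory, as described for isolated_light_mode above.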
Submission / Registration¶
- WorkflowController.submit_workflow(workflow, expiration_date=None, name=None, queue=None)[source]¶
Submits a workflow and returns a workflow identifier.
Raises WorkflowError or JobError if the workflow is not correct.
- Parameters
workflow (client.Workflow) – Workflow description.
expiration_date (datetime.datetime) – After this date the workflow will be deleted.
name (str) – Optional workflow name.
queue (str) – Optional name of the queue where to submit jobs. If it is not specified the jobs will be submitted to the default queue.
- Returns
Workflow_identifier
- Return type
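As an illustration of the expiration_date argument, the sketch below submits a workflow that should be kept for a given number of days. submit_with_lifetime is a hypothetical helper, not a soma-workflow function; it only relies on the submit_workflow signature documented above.

```python
from datetime import datetime, timedelta


def submit_with_lifetime(wf_ctrl, workflow, days=7, name=None, queue=None):
    """Submit `workflow` with an expiration date `days` days from now.

    Returns (workflow_id, expiration_date). Illustrative helper only.
    """
    expiration_date = datetime.now() + timedelta(days=days)
    workflow_id = wf_ctrl.submit_workflow(
        workflow, expiration_date=expiration_date, name=name, queue=queue)
    return workflow_id, expiration_date
```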
- WorkflowController.register_transfer(file_transfer)[source]¶
Registers a file transfer which is not part of a workflow and returns a file transfer identifier.
- Parameters
file_transfer (client.FileTransfer) –
- Returns
transfer
- Return type
EngineTransfer
User’s Workflows, Jobs, and FileTransfers retrieval¶
- WorkflowController.workflow(workflow_id)[source]¶
Raises UnknownObjectError if the workflow_id is not valid
- Parameters
workflow_id (workflow_identifier) –
- Returns
- Return type
- WorkflowController.workflows(workflow_ids=None)[source]¶
Lists the identifiers and general information about all the workflows submitted by the user, or about the workflows specified in the workflow_ids argument.
- Parameters
workflow_ids (sequence of workflow identifiers) –
- Returns
workflows – workflow_id -> (workflow_name, expiration_date)
- Return type
dictionary: workflow identifier -> tuple(date, string)
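A possible way to display the returned dictionary is sketched below. Because the Returns and Return type entries above list the tuple items in different orders, the sketch (list_workflows is a hypothetical name) does not assume which item is the name and which is the date.

```python
def list_workflows(wf_ctrl, workflow_ids=None):
    """Return one text line per workflow, sorted by identifier.

    Illustrative helper only; the tuple items are printed in the order
    in which they are returned, without interpreting them.
    """
    infos = wf_ctrl.workflows(workflow_ids)
    lines = []
    for workflow_id in sorted(infos):
        details = ", ".join(str(item) for item in infos[workflow_id])
        lines.append(f"{workflow_id}: {details}")
    return lines
```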
- WorkflowController.jobs(job_ids=None)[source]¶
Lists the identifiers and general information about all the jobs submitted by the user and which are not part of a workflow, or about the jobs specified in the job_ids argument.
- Parameters
job_ids (sequence of job identifiers) –
- Returns
jobs – job_id -> (name, command, submission date)
- Return type
dictionary: job identifiers -> tuple(string, string, date)
- WorkflowController.transfers(transfer_ids=None)[source]¶
Lists the identifiers and information about all the user’s file transfers which are not part of a workflow, or about the file transfers specified in the transfer_ids argument.
- Parameters
transfer_ids (sequence of FileTransfer identifiers) –
- Returns
transfers –
transfer_id -> (client_path, expiration_date, client_paths), where:
client_path: client file or directory path
expiration_date: after this date the file copied to the computing resource and all the transfer information will be deleted, unless an existing job has declared this file as output or input.
client_paths: sequence of file or directory paths, or None
- Return type
dictionary: str -> tuple(str, date, None or sequence of str)
Monitoring¶
Workflows¶
- WorkflowController.workflow_status(workflow_id)[source]¶
Raises UnknownObjectError if the workflow_id is not valid
- Parameters
workflow_id (workflow identifier) –
- Returns
status – Status of the workflow: see Workflow status or the constants.WORKFLOW_STATUS list.
- Return type
- WorkflowController.workflow_elements_status(workflow_id, with_drms_id=True)[source]¶
Gets the status of all the workflow elements at once, minimizing communication with the server and requests to the database. TO DO: make it more user friendly.
Note: in Soma-Workflow 3.0, the last job info (drmaa_id) has been added to the job status tuple.
- Parameters
workflow_id (workflow_identifier) –
with_drms_id (bool (optional, default=True)) – if True the DRMS id (drmaa_id) is also included in the returned tuple for each job. This info has been added in soma_workflow 3.0 and is thus optional to avoid breaking compatibility with earlier versions.
- Returns
status (tuple) –
(sequence of job tuples, sequence of transfer tuples, workflow_status, workflow_queue, sequence of temporary path tuples), where:
- each job tuple is (job_id, status, queue, exit_info, (submission_date, execution_date, ending_date, drmaa_id), [drms_id])
- each transfer tuple is (transfer_id, (status, progression_info, engine_path, client_path, client_paths))
- each temporary path tuple is (temp_path_id, engine_path, status)
Raises UnknownObjectError if the workflow_id is not valid
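Since the returned tuple is not very user friendly, a small post-processing step can help. The sketch below (summarize_elements_status is a hypothetical name) assumes only the layout described above: the job tuples come first, the workflow status is the third item, and the job status is the second item of each job tuple.

```python
from collections import Counter


def summarize_elements_status(elements_status):
    """Return (workflow_status, {job_status: count}).

    `elements_status` is the tuple returned by workflow_elements_status.
    Items are read by index so the optional trailing drms_id does not matter.
    """
    jobs = elements_status[0]
    workflow_status = elements_status[2]
    job_counts = Counter(job[1] for job in jobs)
    return workflow_status, dict(job_counts)
```

A typical call would be summarize_elements_status(wf_ctrl.workflow_elements_status(workflow_id)).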
Jobs¶
- WorkflowController.job_status(job_id)[source]¶
- Parameters
job_id (job identifier) –
- Returns
status (str) – Status of the job: see Job status or the list constants.JOB_STATUS.
Raises UnknownObjectError if the job_id is not valid
- WorkflowController.job_termination_status(job_id)[source]¶
Information related to the end of the job.
- Parameters
job_id (job identifier) –
- Returns
status (tuple(str, int or None, str or None, str) or None) –
exit status: status of the terminated job: see Job exit status or the constants.JOB_EXIT_STATUS list.
exit value: operating system exit code of the job if the job terminated normally.
terminating signal: representation of the signal that caused the termination of the job if the job terminated due to the receipt of a signal.
resource usage: resource usage information provided as an array of strings where each string complies with the format <name>=<value>. The information provided depends on the DRMS and DRMAA implementation.
Raises UnknownObjectError if the job_id is not valid
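The termination tuple can be unpacked into named fields, and the <name>=<value> resource usage entries split into a dictionary. This is a sketch under assumptions: describe_termination is a hypothetical name, and because the text above describes the resource usage both as a str and as an array of strings, the code accepts either a whitespace-separated string or a sequence.

```python
def describe_termination(termination):
    """Turn the job_termination_status result into a dictionary.

    Returns None when no termination information is available.
    """
    if termination is None:
        return None
    exit_status, exit_value, term_signal, resource_usage = termination

    # Assumption: a single string holds whitespace-separated name=value items.
    if isinstance(resource_usage, str):
        items = resource_usage.split()
    else:
        items = list(resource_usage or [])

    usage = {}
    for item in items:
        name, sep, value = item.partition("=")
        if sep:  # ignore entries that do not follow the name=value format
            usage[name] = value

    return {
        "exit_status": exit_status,
        "exit_value": exit_value,
        "terminating_signal": term_signal,
        "resource_usage": usage,
    }
```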
File Transfers¶
- WorkflowController.transfer_status(transfer_id)[source]¶
File transfer status and information related to the transfer progress.
- Parameters
transfer_id (transfer identifier) –
- Returns
status (tuple(transfer_status or None, tuple or None)) –
Status of the file transfer: see File Transfer status or the constants.FILE_TRANSFER_STATUS list.
Progression information: None if the transfer status is not constants.TRANSFERING_FROM_CLIENT_TO_CR or constants.TRANSFERING_FROM_CR_TO_CLIENT; tuple (file size, size already transferred) for a file transfer; tuple (cumulative size, sequence of tuple (relative_path, file_size, size already transferred)) for a directory transfer.
Raises UnknownObjectError if the transfer_id is not valid
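The progression tuple has two shapes, one for files and one for directories. The sketch below (transfer_progress is a hypothetical name) turns either shape into a fraction between 0 and 1; treating a total size of zero as complete is an assumption of this example.

```python
def transfer_progress(progression):
    """Return the transferred fraction, or None without progress information.

    `progression` is the second item of the transfer_status result.
    """
    if progression is None:
        return None
    total, detail = progression
    if isinstance(detail, (int, float)):
        # File transfer: (file size, size already transferred)
        done = detail
    else:
        # Directory transfer: (cumulative size, [(relative_path, file_size, transferred), ...])
        done = sum(item[2] for item in detail)
    if not total:
        return 1.0  # assumption: nothing to transfer counts as complete
    return done / total
```

It would be used as status, progression = wf_ctrl.transfer_status(transfer_id), followed by transfer_progress(progression).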
Control¶
Workflows¶
- WorkflowController.stop_workflow(workflow_id)[source]¶
Stops a workflow. The running jobs will be killed. The jobs in queues will be removed from queues. It will be possible to restart the workflow afterwards.
- Returns
success – True if the running jobs were killed, and False if some jobs are possibly still running on the computing resource even though the workflow was stopped.
- Return type
- WorkflowController.restart_workflow(workflow_id, queue=None)[source]¶
Restarts the jobs of the workflow which failed. The jobs will be submitted again. The workflow status has to be constants.WORKFLOW_DONE.
- Parameters
workflow_id (workflow identifier) –
queue (str) – Optional name of the queue where to submit jobs. If it is not specified the jobs will be submitted to the default queue.
- Returns
success (bool) – True if some jobs were restarted.
Raises UnknownObjectError if the workflow_id is not valid
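Because restart_workflow expects the workflow status to be constants.WORKFLOW_DONE, a caller may want to check the status first. In this sketch restart_if_done is a hypothetical helper, and the expected status is passed in as done_status (for example constants.WORKFLOW_DONE) so that no constant value is assumed here.

```python
def restart_if_done(wf_ctrl, workflow_id, done_status, queue=None):
    """Restart the failed jobs only if the workflow has reached `done_status`.

    Returns True if some jobs were restarted, False otherwise.
    """
    if wf_ctrl.workflow_status(workflow_id) != done_status:
        # The workflow is not finished yet (or is in another state): do nothing.
        return False
    return bool(wf_ctrl.restart_workflow(workflow_id, queue=queue))
```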
- WorkflowController.delete_workflow(workflow_id, force=True)[source]¶
Deletes the workflow and all its associated elements (FileTransfers and Jobs). The workflow_id will become invalid and cannot be used anymore. The workflow jobs which are running will be killed. If force is set to True, the call blocks until the workflow is deleted. With force set to True, if the workflow cannot be deleted properly, it is still deleted from the Soma-workflow database. However, any jobs that are still running are not killed; in this case the return value is False.
- Parameters
workflow_id (workflow_identifier) –
force (bool) –
- Returns
success (bool)
Raises UnknownObjectError if the workflow_id is not valid
- WorkflowController.change_workflow_expiration_date(workflow_id, new_expiration_date)[source]¶
Sets a new expiration date for the workflow.
- Parameters
workflow_id (workflow identifier) –
new_expiration_date (datetime.datetime) –
- Returns
success (bool) – True if the expiration date was changed.
Raises UnknownObjectError if the workflow_id is not valid
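For example, a workflow can be kept longer by pushing its expiration date forward. extend_workflow and its now argument are hypothetical names used for this sketch; only change_workflow_expiration_date comes from the API above.

```python
from datetime import datetime, timedelta


def extend_workflow(wf_ctrl, workflow_id, days=30, now=None):
    """Set the expiration date to `days` days after `now` (default: current time).

    Returns the new date if the change succeeded, otherwise None.
    """
    if now is None:
        now = datetime.now()
    new_expiration_date = now + timedelta(days=days)
    if wf_ctrl.change_workflow_expiration_date(workflow_id, new_expiration_date):
        return new_expiration_date
    return None
```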
- WorkflowController.wait_workflow(workflow_id, timeout=-1)[source]¶
Waits for the specified workflow to finish.
Raises UnknownObjectError if the workflow_id is not valid
- Parameters
workflow_id (workflow identifier) – Workflow to wait for.
timeout (int) – The call to wait_workflow exits before timeout seconds. A negative value means that the method will wait indefinitely.
Jobs¶
- WorkflowController.wait_job(job_ids, timeout=-1)[source]¶
Waits for all the specified jobs to finish.
Raises UnknownObjectError if one of the job identifiers is not valid
- Parameters
job_ids (sequence of job identifiers) – Jobs to wait for.
timeout (int) – The call to wait_job exits before timeout seconds. A negative value means that the method will wait indefinitely.
File Transfers¶
- WorkflowController.transfer_files(transfer_ids, buffer_size=262144)[source]¶
Transfers the file(s) associated with the transfer_id. If the files are only located on the client side (that is, the transfer status is constants.FILES_ON_CLIENT), the file(s) will be transferred from the client to the computing resource. If the files are located on the computing resource side (that is, the transfer status is constants.FILES_ON_CR or constants.FILES_ON_CLIENT_AND_CR), the files will be transferred from the computing resource to the client.
- Parameters
transfer_ids (FileTransfer identifier) –
buffer_size (int) – Depending on the transfer method, the files can be transferred piece by piece. The size of each piece can be tuned using the buffer_size argument.
- Returns
success (bool) – True if the transfer was done. (TBI: proper error management)
Raises UnknownObjectError if the transfer_id is not valid
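The direction rule described for transfer_files can be written down as a small lookup. This is only a sketch of that rule: expected_direction is a hypothetical name, and constants stands for any object exposing the FILES_ON_CLIENT, FILES_ON_CR and FILES_ON_CLIENT_AND_CR attributes mentioned above.

```python
def expected_direction(status, constants):
    """Return the direction transfer_files would use for a transfer status.

    `status` is the first item of the transfer_status result. Returns None
    for any status not covered by the description above.
    """
    if status == constants.FILES_ON_CLIENT:
        return "client -> computing resource"
    if status in (constants.FILES_ON_CR, constants.FILES_ON_CLIENT_AND_CR):
        return "computing resource -> client"
    return None
```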
- WorkflowController.delete_transfer(transfer_id)[source]¶
Deletes the FileTransfer and the associated files and directories on the computing resource side. The transfer_id will become invalid and cannot be used anymore. If some jobs reference the FileTransfer as an input or an output, the FileTransfer will not be deleted immediately but as soon as these jobs are deleted.
Raises UnknownObjectError if the transfer_id is not valid
Helper¶
- static Helper.list_failed_jobs(workflow_id, wf_ctrl, include_aborted_jobs=False, include_user_killed_jobs=False, include_statuses=None)[source]¶
To spot the problematic jobs in a workflow.
- Parameters
workflow_id (workflow identifier) –
wf_ctrl (client.WorkflowController) –
include_aborted_jobs (bool) – Include the jobs whose exit status is constants.EXIT_ABORTED or constants.EXIT_NOTRUN
include_user_killed_jobs (bool) – Include the jobs whose exit status is constants.USER_KILLED
include_statuses (sequence or None) – Get failed jobs with exit status in this list/set. When given, the include_aborted_jobs and include_user_killed_jobs parameters are ignored.
- Returns
jobs – List of the identifiers of the jobs whose status is constants.FAILED or whose exit value is not 0.
- Return type
list of job identifier
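The list of failed jobs can be combined with job_termination_status to see why each job failed. In this sketch failure_report is a hypothetical name, and helper stands for the Helper class documented here, passed as an argument so that the example does not depend on an import path.

```python
def failure_report(workflow_id, wf_ctrl, helper, include_aborted_jobs=False):
    """Map each failed job identifier to its termination status tuple."""
    failed_job_ids = helper.list_failed_jobs(
        workflow_id, wf_ctrl, include_aborted_jobs=include_aborted_jobs)
    return {
        job_id: wf_ctrl.job_termination_status(job_id)
        for job_id in failed_job_ids
    }
```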
- static Helper.delete_all_workflows(wf_ctrl, force=True)[source]¶
Deletes all the workflows. If force is set to True, the call blocks until the workflows are deleted. With force set to True, if a workflow cannot be deleted properly, it is still deleted from the Soma-workflow database. However, any jobs that are still running will not be killed; in this case the return value is False.
- Parameters
wf_ctrl (client.WorkflowController) –
force (bool) –
- Returns
success
- Return type
- static Helper.wait_workflow(workflow_id, wf_ctrl)[source]¶
Waits for workflow execution to end.
- Parameters
workflow_id (workflow identifier) –
wf_ctrl (client.WorkflowController) –
- static Helper.transfer_input_files(workflow_id, wf_ctrl, buffer_size=262144)[source]¶
Transfers all the input files of a workflow.
- Parameters
workflow_id (workflow identifier) –
wf_ctrl (client.WorkflowController) –
buffer_size (int) – Depending on the transfer method, the files can be transferred piece by piece. The size of each piece can be tuned using the buffer_size argument.
- static Helper.transfer_output_files(workflow_id, wf_ctrl, buffer_size=262144)[source]¶
Transfers all the output files of a workflow which are ready to transfer.
- Parameters
workflow_id (workflow identifier) –
wf_ctrl (client.WorkflowController) –
buffer_size (int) – Depending on the transfer method, the files can be transferred piece by piece. The size of each piece can be tuned using the buffer_size argument.
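The Helper functions above are typically chained around a submission. The following sketch shows one plausible ordering, not a prescribed one: run_workflow is a hypothetical name, and helper stands for the Helper class, passed as an argument so that no import path is assumed.

```python
def run_workflow(workflow, wf_ctrl, helper, name=None):
    """Submit a workflow, move its files and wait for it to finish.

    Returns (workflow_id, failed_job_ids).
    """
    workflow_id = wf_ctrl.submit_workflow(workflow, name=name)
    # Send the input files to the computing resource.
    helper.transfer_input_files(workflow_id, wf_ctrl)
    # Block until the workflow execution ends.
    helper.wait_workflow(workflow_id, wf_ctrl)
    # Fetch the output files which are ready to transfer.
    helper.transfer_output_files(workflow_id, wf_ctrl)
    failed_job_ids = helper.list_failed_jobs(workflow_id, wf_ctrl)
    return workflow_id, failed_job_ids
```

An empty failed_job_ids list suggests that no job had the FAILED status or a non-zero exit value.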
- static Helper.serialize(file_path, workflow)[source]¶
Saves a workflow to a file. Uses JSON format.
Raises SerializationError in case of failure
- Parameters
file_path (str) –
workflow (client.Workflow) –