# -*- coding: utf-8 -*-
'''
@author: Soizic Laguitton
@organization: I2BM, Neurospin, Gif-sur-Yvette, France
@organization: CATI, France
@organization: U{IFR 49<http://www.ifr49.org>}
@license: U{CeCILL version 2<http://www.cecill.info/licences/Licence_CeCILL_V2-en.html>}
'''
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from __future__ import print_function
from __future__ import absolute_import
import os
import os.path as osp
import hashlib
import stat
import operator
import random
import pickle
import types
import sys
import posixpath
import logging
import six
import tempfile
import json
# import cProfile
# import traceback
import soma_workflow.connection as connection
from soma_workflow.transfer import PortableRemoteTransfer, TransferSCP, TransferRsync, TransferMonitoring, TransferLocal
import soma_workflow.constants as constants
import soma_workflow.configuration as configuration
from soma_workflow.errors import TransferError, SerializationError, SomaWorkflowError
#-------------------------------------------------------------------------------
# Classes and functions
#-------------------------------------------------------------------------------
# imports required by the users of soma-workflow API (do not remove):
from soma_workflow.client_types import Job
from soma_workflow.client_types import EngineExecutionJob
from soma_workflow.custom_jobs import BarrierJob
from soma_workflow.custom_jobs import MapJob
from soma_workflow.custom_jobs import ReduceJob
from soma_workflow.custom_jobs import ListCatJob
from soma_workflow.custom_jobs import LeaveOneOutJob
from soma_workflow.custom_jobs import CrossValidationFoldJob
from soma_workflow.client_types import Workflow
from soma_workflow.client_types import Group
from soma_workflow.client_types import FileTransfer
from soma_workflow.client_types import SharedResourcePath
from soma_workflow.client_types import TemporaryPath
from soma_workflow.client_types import SpecialPath
from soma_workflow.client_types import OptionPath
from soma_workflow import scheduler
class WorkflowController(object):
    '''
    Submission, control and monitoring of Job, FileTransfer and Workflow
    objects.
    '''
    _connection = None
    _engine_proxy = None
    _transfer = None
    _transfer_stdouterr = None
    config = None
    engine_config_proxy = None
    _resource_id = None
    scheduler_config = None
    isolated_light_mode = None
    def __init__(self,
                 resource_id=None,
                 login=None,
                 password=None,
                 config=None,
                 rsa_key_pass=None,
                 isolated_light_mode=None):
        '''
        Sets up the connection to the computing resource.
        Looks for a soma-workflow configuration file (if not specified in the
        *config* argument).
        .. note::
          The login and password are only required for a remote computing
          resource.
        Parameters
        ----------
        resource_id: str
            Identifier of the computing resource to connect to.
            If None, the number of CPUs of the current machine is detected and
            the basic scheduler is launched.
        login: str
            Required if the computing resource is remote.
        password: str
            Required if the computing resource is remote and no RSA key was
            configured to log in to the remote machine with ssh.
        config: configuration.Configuration
            Optional configuration.
        rsa_key_pass: str
            Required if the RSA key is protected with a password.
        isolated_light_mode: None, str, or True
            If not None, work in a custom soma-workflow directory (database,
            transfers, temporary files...). If the isolated_light_mode
            parameter value is True, a temporary directory is generated for
            this purpose. Otherwise the parameter should be a directory name
            which will be used instead of the default one. If None, the
            class-wide variable WorkflowController.isolated_light_mode is used
            instead, so you can set it on the class to enable isolated mode
            globally.
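        Examples
        --------
        A minimal usage sketch (assuming a local or default configuration;
        the resource name and credentials below are purely illustrative)::

            from soma_workflow.client import WorkflowController

            # connect to the local machine, using the basic scheduler
            controller = WorkflowController()

            # or connect to a configured remote computing resource
            # controller = WorkflowController('remote_cluster',
            #                                 login='user_name',
            #                                 password='secret')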
        '''
        if isolated_light_mode is None:
            isolated_light_mode = WorkflowController.isolated_light_mode
        if isolated_light_mode is not None:
            if isolated_light_mode is True:
                isolated_dir = tempfile.mkdtemp(prefix='soma_workflow_')
            else:
                isolated_dir = isolated_light_mode
            resource_id = 'localhost'
            os.environ['SOMA_WORKFLOW_CONFIG'] = osp.join(isolated_dir,
                                                          'soma_workflow.cfg')
            db_file = osp.join(isolated_dir, 'soma-workflow.db')
            trans_dir = osp.join(isolated_dir, 'transfered_files')
            config = configuration.Configuration(
                'localhost', 'light', 'local_basic', db_file, trans_dir)
            if isolated_light_mode is True:
                config._temp_config_dir = isolated_dir
        if config is None:
            self.config = configuration.Configuration.load_from_file(
                resource_id)
        else:
            self.config = config
        if resource_id is None:
            resource_id = \
                configuration.Configuration.get_local_resource_id(config)
        if password == '':
            password = None
        self.scheduler_config = None
        mode = self.config.get_mode()
        self._resource_id = resource_id
        # LOCAL MODE
        if mode == configuration.LOCAL_MODE:
            print("soma-workflow starting in local mode")
            # setup logging
            (engine_log_dir,
             engine_log_format,
             engine_log_level) = self.config.get_engine_log_info()
            if engine_log_dir:
                logfilepath = os.path.join(
                    os.path.abspath(engine_log_dir), "log_local_mode")
                log_dir = os.path.dirname(logfilepath)
                if not os.path.exists(log_dir):
                    os.makedirs(log_dir)
                logging.basicConfig(
                    filename=logfilepath,
                    format=engine_log_format,
                    level=getattr(logging, engine_log_level))
            trial = 0
            ok = False
            # several attempts are sometimes needed here:
            # the local port is sometimes busy or something.
            # We must wait for a timeout and the subsequent
            # exception, then retry, and it often works...
            while not ok and trial < 3:
                trial += 1
                try:
                    self._connection = connection.LocalConnection(
                        resource_id,
                        "")
                    print("Local connection established")
                    ok = True
                except Exception:
                    if trial == 2:
                        raise
                    logging.info('LocalConnection failed to establish '
                                 '- trying again')
                    import time
                    time.sleep(1.)
            self._engine_proxy = self._connection.get_workflow_engine()
            self.engine_config_proxy = self._connection.get_configuration()
            self.scheduler_config = self._connection.get_scheduler_config()
            self._transfer = TransferLocal(self._engine_proxy)
            self._transfer_stdouterr = TransferLocal(self._engine_proxy)
        # REMOTE MODE
        elif mode == configuration.REMOTE_MODE:
            print("soma-workflow starting in remote mode")
            submitting_machines = self.config.get_submitting_machines()
            sub_machine = submitting_machines[random.randint(
                0, len(submitting_machines) - 1)]
            cluster_address = self.config.get_cluster_address()
            if login is None:
                login = self.config.get_login()
            print('cluster address: %s, submission machine: %s, login: %s'
                  % (cluster_address, sub_machine, login))
            trial = 0
            ok = False
            # several attempts are sometimes needed here:
            # the paramiko transport and tunnel sometimes does not start
            # correctly and remains silent (no communication can be done, no
            # error reported). We must wait for a timeout and the subsequent
            # exception, then retry, and it often works...
            while not ok and trial < 3:
                trial += 1
                try:
                    self._connection = connection.RemoteConnection(
                        login,
                        password,
                        cluster_address,
                        sub_machine,
                        resource_id,
                        "",
                        rsa_key_pass,
                        self.config)
                    print("Remote connection established")
                    ok = True
                except Exception:
                    if trial == 2:
                        raise
                    logging.info('RemoteConnection failed to establish '
                                 '- trying again')
                    import time
                    time.sleep(1.)
            self._engine_proxy = self._connection.get_workflow_engine()
            self.engine_config_proxy = self._connection.get_configuration()
            self.scheduler_config = self._connection.get_scheduler_config()
            if not password and not rsa_key_pass:
                self._transfer = TransferSCP(self._engine_proxy,
                                             username=login,
                                             hostname=sub_machine)
            else:
                self._transfer = PortableRemoteTransfer(self._engine_proxy)
            self._transfer_stdouterr = PortableRemoteTransfer(
                self._engine_proxy)
        # LIGHT MODE
        elif mode == configuration.LIGHT_MODE:
            print("soma-workflow starting in light mode")
            local_scdl_cfg_path = \
                configuration.LocalSchedulerCfg.search_config_path()
            if local_scdl_cfg_path is None:
                cpu_count = Helper.cpu_count()
                self.scheduler_config = configuration.LocalSchedulerCfg(
                    proc_nb=cpu_count)
            else:
                self.scheduler_config = \
                    configuration.LocalSchedulerCfg.load_from_file(
                        local_scdl_cfg_path)
            self.config.set_scheduler_config(self.scheduler_config)
            self._engine_proxy = _embedded_engine_and_server(self.config)
            self.engine_config_proxy = self.config
            self._connection = None
            self._transfer = TransferLocal(self._engine_proxy)
            self._transfer_stdouterr = TransferLocal(self._engine_proxy)
        self._transfer_monitoring = TransferMonitoring(self._engine_proxy)
        print("Workflow controller initialised") 
    def __del__(self):
        print('del WorkflowController')
        self.stop_engine()
        #try:
            #import gc
            #gc.collect()
        #except Exception:
            #pass
    def get_scheduler_type(self):
        '''
        Returns the scheduler type in the underlying engine ('local_basic',
        'pbs', 'pbspro', 'drmaa' ...)
        '''
        return self.engine_config_proxy.get_scheduler_type()
    def disconnect(self):
        '''
        Simulates a disconnection for TEST PURPOSE ONLY.
        !!! The current instance will not be usable anymore after this call !!!!
        '''
        if self._connection:
            self._connection.stop()
            self._connection = None
    def stop_engine(self):
        if hasattr(self, '_transfer_monitoring') \
                and self._transfer_monitoring is not None:
            del self._transfer_monitoring
        if hasattr(self, '_transfer') and self._transfer is not None:
            del self._transfer
        if hasattr(self, '_transfer_stdouterr') \
                and self._transfer_stdouterr is not None:
            del self._transfer_stdouterr
        if hasattr(self, 'engine_config_proxy') \
                and self.engine_config_proxy is not None:
            del self.engine_config_proxy
        if hasattr(self, 'scheduler_config') \
                and self.scheduler_config is not None:
            del self.scheduler_config
        if self._engine_proxy:
            if hasattr(self._engine_proxy, 'interrupt_after'):
                self._engine_proxy.interrupt_after(10.)
            try:
                self._engine_proxy.stop()
            except Exception:
                pass  # cleanup anyway
            self._engine_proxy = None
        self.disconnect()
    # SUBMISSION / REGISTRATION ####################################
    def submit_workflow(self,
                        workflow,
                        expiration_date=None,
                        name=None,
                        queue=None):
        '''
        Submits a workflow and returns a workflow identifier.
        Raises *WorkflowError* or *JobError* if the workflow is not correct.
        Parameters
        ----------
        workflow: client.Workflow
            Workflow description.
        expiration_date: *datetime.datetime*
            After this date the workflow will be deleted.
        name: str
            Optional workflow name.
        queue: str
            Optional name of the queue where to submit jobs. If it is not
            specified the jobs will be submitted to the default queue.
        Returns
        -------
        Workflow_identifier: int
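        Examples
        --------
        A minimal sketch (the job command and names are illustrative)::

            from soma_workflow.client import Job, Workflow, WorkflowController

            controller = WorkflowController()
            job = Job(command=['echo', 'hello'], name='hello job')
            workflow = Workflow(jobs=[job], name='example workflow')
            wf_id = controller.submit_workflow(workflow)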
        '''
        if self.engine_config_proxy.get_scheduler_type() \
                == configuration.MPI_SCHEDULER:
            raise SomaWorkflowError(
                "The MPI scheduler is configured for this resource. "
                "Use soma_workflow.MPI_workflow_runner to submit a workflow "
                "using the MPI scheduler.")
        # cProfile.runctx("wf_id = self._engine_proxy.submit_workflow(workflow,
        # expiration_date, name, queue)", globals(), locals(),
        # "/home/soizic/profile/profile_submit_workflow")
        wf_id = self._engine_proxy.submit_workflow(workflow,
                                                   expiration_date,
                                                   name,
                                                   queue)
        return wf_id 
    def register_transfer(self, file_transfer):
        '''
        Registers a file transfer which is not part of a workflow and returns
        the registered engine transfer.
        Parameters
        ----------
        file_transfer: client.FileTransfer
        Returns
        -------
        transfer: EngineTransfer
        '''
        engine_transfer = self._engine_proxy.register_transfer(file_transfer)
        return engine_transfer 
    # WORKFLOWS, JOBS and FILE TRANSFERS RETRIEVAL ###################
    def workflow(self, workflow_id):
        '''
        Raises *UnknownObjectError* if the workflow_id is not valid
        Parameters
        ----------
        workflow_id: workflow_identifier
        Returns
        -------
        Workflow
        '''
        return self._engine_proxy.workflow(workflow_id) 
    def workflows(self, workflow_ids=None):
        '''
        Lists the identifiers and general information about all the workflows
        submitted by the user, or about the workflows specified in the
        *workflow_ids* argument.
        Parameters
        ----------
        workflow_ids: sequence of workflow identifiers
        Returns
        -------
        workflows: dictionary: workflow identifier -> tuple(date, string)
            workflow_id -> (workflow_name, expiration_date)
        '''
        return self._engine_proxy.workflows(workflow_ids) 
    def jobs(self, job_ids=None):
        '''
        Lists the identifiers and general information about all the jobs
        submitted by the user and which are not part of a workflow, or about
        the jobs specified in the *job_ids* argument.
        Parameters
        ----------
        job_ids: sequence of job identifiers
        Returns
        -------
        jobs: dictionary: job identifiers -> tuple(string, string, date)
            job_id -> (name, command, submission date)
        '''
        return self._engine_proxy.jobs(job_ids) 
    def transfers(self, transfer_ids=None):
        '''
        Lists the identifiers and information about all the user's file
        transfers which are not part of a workflow or about the file transfers
        specified in the *transfer_ids* argument.
        Parameters
        ----------
        transfer_ids: sequence of FileTransfer identifiers
        Returns
        -------
        transfers: dictionary: str -> tuple(str, date, None or sequence of str)
            transfer_id -> (
                            * client_path: client file or directory path
                            * expiration_date: after this date the file copied
                              on the computing resource and all the transfer
                              information will be deleted, unless an existing
                              job has declared this file as output or input.
                            * client_paths: sequence of file or directory path
                              or None)
        '''
        return self._engine_proxy.transfers(transfer_ids) 
    # WORKFLOW MONITORING #########################################
    def workflow_status(self, workflow_id):
        '''
        Raises *UnknownObjectError* if the workflow_id is not valid
        Parameters
        ----------
        workflow_id: workflow identifier
        Returns
        -------
        status: str or None
            Status of the workflow: see :ref:`workflow-status` or the
            constants.WORKFLOW_STATUS list.
        '''
        return self._engine_proxy.workflow_status(workflow_id) 
    def workflow_elements_status(self, workflow_id, with_drms_id=True):
        '''
        Gets back the status of all the workflow elements at once, minimizing
        the communication with the server and request to the database.
        TO DO => make it more user friendly.
        Note: in Soma-Workflow 3.0, the last job info (drmaa_id) has been added
        to the job status tuple.
        Parameters
        ----------
        workflow_id: workflow_identifier
        with_drms_id: bool (optional, default=True)
            if True the DRMS id (drmaa_id) is also included in the returned
            tuple for each job. This info has been added in soma_workflow 3.0
            and is thus optional to avoid breaking compatibility with earlier
            versions.
        Returns
        -------
        status: tuple:
            * sequence of tuple
                (job_id, status, queue, exit_info,
                    (submission_date, execution_date, ending_date, drmaa_id),
                    [drms_id]),
            * sequence of tuple
                (transfer_id, (status, progression_info, engine_path,
                 client_path, client_paths)),
            * workflow_status,
            * workflow_queue,
            * sequence of tuple (temp_path_id, engine_path, status)
        Raises *UnknownObjectError* if the workflow_id is not valid
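        Examples
        --------
        A sketch of how the returned tuple can be unpacked (controller and
        wf_id are assumed to be a WorkflowController instance and a valid
        workflow identifier)::

            (jobs_info, transfers_info, wf_status, wf_queue,
             temp_info) = controller.workflow_elements_status(wf_id)
            for job_id, status, queue, exit_info, dates, drms_id in jobs_info:
                print(job_id, status, exit_info[0])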
        '''
        wf_status = self._engine_proxy.workflow_elements_status(
            workflow_id, with_drms_id=with_drms_id)
        # special processing for transfer status:
        new_transfer_status = []
        for transfer_id, engine_path, client_path, client_paths, status, \
                transfer_type in wf_status[1]:
            progression = self._transfer_progression(status,
                                                     transfer_type,
                                                     client_path,
                                                     client_paths,
                                                     engine_path)
            new_transfer_status.append((transfer_id,
                                        (status, progression, engine_path,
                                         client_path, client_paths)))
        new_wf_status = (
            wf_status[0], new_transfer_status, wf_status[2], wf_status[3],
            wf_status[4])
        return new_wf_status 
    # JOB MONITORING #############################################
    def job_status(self, job_id):
        '''
        Parameters
        ----------
        job_id: job identifier
        Returns
        -------
        status: str
            Status of the job: see :ref:`job-status` or the list
            constants.JOB_STATUS.
        Raises *UnknownObjectError* if the job_id is not valid
        '''
        return self._engine_proxy.job_status(job_id) 
    def get_engine_job(self, job_id):
        return self._engine_proxy.get_engine_job(job_id)
    def get_job_command(self, job_id):
        '''
        Get a job commandline from the database
        '''
        return self._engine_proxy.get_job_command(job_id)
    def updated_job_parameters(self, job_id):
        return self._engine_proxy.updated_job_parameters(job_id)
    def get_job_output_params(self, job_id):
        return self._engine_proxy.get_job_output_params(job_id)
    def drms_job_id(self, wf_id, job_id):
        return self._engine_proxy.drms_job_id(wf_id, job_id)
    def job_termination_status(self, job_id):
        '''
        Information related to the end of the job.
        Parameters
        ----------
        job_id: job identifier
        Returns
        -------
        status: tuple(str, int or None, str or None, str) or None
            * exit status: status of the terminated job: see
              :ref:`job-exit-status` or the constants.JOB_EXIT_STATUS list.
            * exit value: operating system exit code of the job if the job
              terminated normally.
            * terminating signal: representation of the signal that caused the
              termination of the job if the job terminated due to the receipt
              of a signal.
            * resource usage: resource usage information provided as an array
              of strings where each string complies with the format
              <name>=<value>.
              The information provided depends on the DRMS and DRMAA
              implementation.
        Raises *UnknownObjectError* if the job_id is not valid
        '''
        return self._engine_proxy.job_termination_status(job_id) 
    def retrieve_job_stdouterr(self,
                               job_id,
                               stdout_file_path,
                               stderr_file_path=None,
                               buffer_size=512 ** 2):
        '''
        Copies the job standard output and error to specified file.
        Raises *UnknownObjectError* if the job_id is not valid
        Parameters
        ----------
        job_id: job identifier
        stdout_file_path: str
            Path of the file where to copy the standard output.
        stderr_file_path: str
            Path of the file where to copy the standard error.
        buffer_size: int
            The file is transferred in pieces of size buffer_size.
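        Examples
        --------
        A sketch (job_id is assumed to be a valid job identifier, the output
        file names are illustrative)::

            controller.retrieve_job_stdouterr(job_id,
                                              '/tmp/job_stdout',
                                              '/tmp/job_stderr')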
        '''
        stdout_file_path = os.path.abspath(stdout_file_path)
        stderr_file_path = os.path.abspath(stderr_file_path)
        (engine_stdout_file,
         engine_stderr_file) = self._engine_proxy.stdouterr_file_path(job_id)
        self._transfer_stdouterr.transfer_from_remote(engine_stdout_file,
                                                      stdout_file_path)
        self._transfer_stdouterr.transfer_from_remote(engine_stderr_file,
                                                      stderr_file_path) 
    # FILE TRANSFER MONITORING ###################################
    def transfer_status(self, transfer_id):
        '''
        File transfer status and information related to the transfer progress.
        Parameters
        ----------
        transfer_id: transfer identifier
        Returns
        -------
        status: tuple(transfer_status or None, tuple or None)
            * Status of the file transfer : see :ref:`file-transfer-status` or
              the constants.FILE_TRANSFER_STATUS list.
            * None if the transfer status is not
              constants.TRANSFERING_FROM_CLIENT_TO_CR or
              constants.TRANSFERING_FROM_CR_TO_CLIENT.
              tuple (file size, size already transferred) if it is a file
              transfer.
              tuple (cumulated size, sequence of tuple (relative_path,
              file_size, size already transferred)) if it is a directory
              transfer.
        Raises *UnknownObjectError* if the transfer_id is not valid
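        Examples
        --------
        A sketch (transfer_id is assumed to be a valid transfer identifier)::

            status, progression = controller.transfer_status(transfer_id)
            print('transfer status:', status, 'progression:', progression)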
        '''
        (transfer_id,
         engine_path,
         client_path,
         expiration_date,
         workflow_id,
         client_paths,
         transfer_type,
         status) = self._engine_proxy.transfer_information(transfer_id)
        progression = self._transfer_progression(status,
                                                 transfer_type,
                                                 client_path,
                                                 client_paths,
                                                 engine_path)
        return (status, progression) 
    # WORKFLOW CONTROL ############################################
    def restart_workflow(self, workflow_id, queue=None):
        '''
        Restarts the jobs of the workflow which failed. The jobs will be
        submitted again.
        The workflow status has to be constants.WORKFLOW_DONE.
        Parameters
        ----------
        workflow_id: workflow identifier
        queue: str
            Optional name of the queue where to submit jobs. If it is not
            specified the jobs will be submitted to the default queue.
        Returns
        -------
        success: bool
            True if some jobs were restarted.
        Raises *UnknownObjectError* if the workflow_id is not valid
        '''
        if self.engine_config_proxy.get_scheduler_type() \
                == configuration.MPI_SCHEDULER:
            raise SomaWorkflowError(
                "The MPI scheduler is configured for this resource. "
                "Use soma_workflow.MPI_workflow_runner to restart a workflow "
                "using the MPI scheduler.")
        return self._engine_proxy.restart_workflow(workflow_id, queue) 
    def delete_workflow(self, workflow_id, force=True):
        '''
        Deletes the workflow and all its associated elements (FileTransfers and
        Jobs). The workflow_id will become invalid and can not be used anymore.
        The workflow jobs which are running will be killed.
        If force is set to True, the call will block until the workflow is
        deleted. With force set to True, if the workflow can not be deleted
        properly it is removed from the Soma-workflow database. However, if
        some jobs are still running they will not be killed. In this case the
        return value is False.
        Parameters
        ----------
        workflow_id: workflow_identifier
        force: bool
        Returns
        -------
        success: bool
        Raises *UnknownObjectError* if the workflow_id is not valid
        '''
        # cProfile.runctx("self._engine_proxy.delete_workflow(workflow_id)",
        # globals(), locals(), "/home/soizic/profile/profile_delete_workflow")
        return self._engine_proxy.delete_workflow(workflow_id, force) 
    def stop_workflow(self, workflow_id):
        '''
        Stops a workflow.
        The running jobs will be killed.
        The jobs in queues will be removed from queues.
        It will be possible to restart the workflow afterwards.
        Returns
        -------
        success: bool
            Returns True if the running jobs were killed, and False if some
            jobs may still be running on the computing resource even though
            the workflow was stopped.
        '''
        if self.engine_config_proxy.get_scheduler_type() \
                == configuration.MPI_SCHEDULER:
            raise SomaWorkflowError(
                "The MPI scheduler is configured for this resource. "
                "Kill the soma_workflow.MPI_workflow_runner job to stop the "
                "workflow.")
        return self._engine_proxy.stop_workflow(workflow_id) 
    def stop_jobs(self, workflow_id, job_ids):
        return self._engine_proxy.stop_jobs(workflow_id, job_ids)
    def restart_jobs(self, workflow_id, job_ids):
        return self._engine_proxy.restart_jobs(workflow_id, job_ids) 
    def change_workflow_expiration_date(self, workflow_id,
                                        new_expiration_date):
        '''
        Sets a new expiration date for the workflow.
        Parameters
        ----------
        workflow_id: workflow identifier
        new_expiration_date: datetime.datetime
        Returns
        -------
        success: bool
            True if the expiration date was changed.
        Raises *UnknownObjectError* if the workflow_id is not valid
        '''
        return self._engine_proxy.change_workflow_expiration_date(
            workflow_id, new_expiration_date) 
    # JOB CONTROL #################################################
    def wait_job(self, job_ids, timeout=-1):
        '''
        Waits for all the specified jobs to finish.
        Raises *UnknownObjectError* if the job_id is not valid
        Parameters
        ----------
        job_ids: sequence of job identifier
            Jobs to wait for.
        timeout: int
            The call to wait_job returns before timeout seconds have elapsed.
            A negative value means that the method will wait indefinitely.
        '''
        self._engine_proxy.wait_job(job_ids, timeout) 
    def wait_workflow(self, workflow_id, timeout=-1):
        '''
        Waits for the specified workflow to finish.
        Raises *UnknownObjectError* if the job_id is not valid
        Parameters
        ----------
        workflow_id: workflow identifier
            Workflow to wait for.
        timeout: int
            The call to wait_workflow returns before timeout seconds have
            elapsed.
            A negative value means that the method will wait indefinitely.
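        Examples
        --------
        A sketch (workflow is assumed to be a client.Workflow instance)::

            wf_id = controller.submit_workflow(workflow)
            # block until the workflow has finished (no timeout)
            controller.wait_workflow(wf_id)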
        '''
        self._engine_proxy.wait_workflow(workflow_id, timeout) 
    def log_failed_workflow(self, workflow_id, file=sys.stderr):
        '''
        If the workflow has any failed jobs, log their status and outputs to
        the given file.
        '''
        workflow_status = self.workflow_status(workflow_id)
        if workflow_status != constants.WORKFLOW_DONE:
            print('** Workflow did not finish regularly: %s' % workflow_status,
                  file=file)
        else:
            print('** Workflow status OK', file=file)
        elements_status = self.workflow_elements_status(workflow_id)
        failed_jobs = [element for element in elements_status[0]
                       if element[1] == constants.FAILED
                       or (element[1] == constants.DONE and
                           (element[3][0]
                            not in (constants.FINISHED_REGULARLY, None)
                            or element[3][1] != 0))]
        failed_jobs_info = self.jobs(
            [element[0] for element in failed_jobs
             if element[3][0] != constants.EXIT_NOTRUN])
        if len(failed_jobs) != 0:
            # failure
            print('** Jobs failure, the following jobs ended with failed '
                  'status:', file=file)
            for element in failed_jobs:
                # skip those aborted for their dependencies
                if element[3][0] != constants.EXIT_NOTRUN:
                    job = failed_jobs_info[element[0]]
                    print('+ job:', job[0], ', status:', element[1],
                          ', exit:', element[3][0],
                          ', value:', element[3][1],
                          file=file)
                    print('  commandline:', file=file)
                    print(job[1], file=file)
            print('\n** Failed jobs outputs:\n', file=file)
            # log outputs
            for element in failed_jobs:
                # skip those aborted for their dependencies
                if element[3][0] != constants.EXIT_NOTRUN:
                    job = failed_jobs_info[element[0]]
                    ejob = self.get_engine_job(element[0])
                    if ejob.env:
                        env = dict(ejob.env)
                    else:
                        env = {}
                    print('+ job %d:' % element[0], job[0],
                          ', status:', element[1],
                          ', exit:', element[3][0],
                          ', value:', element[3][1],
                          file=file)
                    print(
                        '  =================================================',
                        file=file)
                    print('  commandline:', file=file)
                    print('  ------------:', file=file)
                    print(job[1], file=file)
                    print('\n  input parameters:', file=file)
                    print('  -----------------', file=file)
                    print(repr(dict(self.updated_job_parameters(element[0]))),
                          file=file)
                    in_param_file = ejob.plain_input_params_file()
                    if in_param_file:
                        env['SOMAWF_INPUT_PARAMS'] = in_param_file
                    out_param_file = ejob.plain_output_params_file()
                    if out_param_file:
                        env['SOMAWF_OUTPUT_PARAMS'] = out_param_file
                    print('\n  output parameters:', file=file)
                    print('  ------------------', file=file)
                    print(self.get_job_output_params(element[0]), file=file)
                    print('\n  environment:', file=file)
                    print('  ------------', file=file)
                    print(env, file=file)
                    tmp_stdout = tempfile.mkstemp(prefix='swf_job_stdout_')
                    tmp_stderr = tempfile.mkstemp(prefix='swf_job_stderr_')
                    os.close(tmp_stdout[0])
                    os.close(tmp_stderr[0])
                    self.retrieve_job_stdouterr(element[0], tmp_stdout[1],
                                                tmp_stderr[1])
                    print('\n  standard output:', file=file)
                    print('  ----------------\n', file=file)
                    with open(tmp_stdout[1]) as f:
                        print(f.read(), file=file)
                    os.unlink(tmp_stdout[1])
                    print('\n  standard error:', file=file)
                    print('  ---------------\n', file=file)
                    with open(tmp_stderr[1]) as f:
                        print(f.read(), file=file)
                    os.unlink(tmp_stderr[1])
                    print(file=file)
            print('---- full host env ----', file=file)
            print(repr(os.environ), file=file)
        return workflow_status == constants.WORKFLOW_DONE \
            and len(failed_jobs) == 0
    # FILE TRANSFER CONTROL #######################################
    def transfer_files(self, transfer_ids, buffer_size=512 ** 2):
        '''
        Transfers the file(s) associated with the given transfer_ids.
        If the files are only located on the client side (that is, the transfer
        status is constants.FILES_ON_CLIENT) the file(s) will be transferred
        from the client to the computing resource.
        If the files are located on the computing resource side (that is, the
        transfer status is constants.FILES_ON_CR or
        constants.FILES_ON_CLIENT_AND_CR) the files will be transferred from
        the computing resource to the client.
        Parameters
        ----------
        transfer_ids: FileTransfer identifier or sequence of identifiers
        buffer_size: int
            Depending on the transfer method, the files can be transferred
            piece by piece. The size of each piece can be tuned using the
            buffer_size argument.
        Returns
        -------
        success: bool
            The transfer was done. (TBI right error management)
        Raises *UnknownObjectError* if the transfer_id is not valid
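        Examples
        --------
        A sketch mirroring what Helper.transfer_output_files does (wf_id is
        assumed to be a valid workflow identifier)::

            from soma_workflow import constants

            elements = controller.workflow_elements_status(wf_id)
            ready = [t[0] for t in elements[1]
                     if t[1][0] == constants.FILES_ON_CR]
            controller.transfer_files(ready)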
        '''
        # Raises *TransferError*
        if not isinstance(transfer_ids, six.string_types):
            for transfer_id in transfer_ids:
                self._transfer_file(transfer_id, buffer_size)
        else:
            self._transfer_file(transfer_ids, buffer_size) 
    def delete_transfer(self, transfer_id):
        '''
        Deletes the FileTransfer and the associated files and directories on
        the computing resource side. The transfer_id will become invalid and
        can not be used anymore. If some jobs reference the FileTransfer as an
        input or an output, the FileTransfer will not be deleted immediately,
        but only once these jobs are deleted.
        Raises *UnknownObjectError* if the transfer_id is not valid
        '''
        self._engine_proxy.delete_transfer(transfer_id) 
    # PRIVATE #############################################
    def _initialize_transfer(self, transfer_id):
        '''
        Initializes the transfer and returns the transfer type.
        Parameters
        ----------
        transfer_id: FileTransfer identifier
        Returns
        -------
        transfer_type: str
            Transfer type constant describing the direction of the transfer
            and whether it concerns a file, a directory or a list of paths
            (for instance constants.TR_FILE_C_TO_CR).
        Raises *UnknownObjectError* if the transfer_id is not valid
        '''
        (transfer_id,
         engine_path,
         client_path,
         expiration_date,
         workflow_id,
         client_paths,
         transfer_type,
         status) = self._engine_proxy.transfer_information(transfer_id)
        if status == constants.FILES_ON_CLIENT:
            if not client_paths:
                if os.path.isfile(client_path):
                    transfer_type = constants.TR_FILE_C_TO_CR
                    self._engine_proxy.set_transfer_status(
                        transfer_id, constants.TRANSFERING_FROM_CLIENT_TO_CR)
                    self._engine_proxy.set_transfer_type(transfer_id,
                                                         transfer_type)
                elif os.path.isdir(client_path):
                    transfer_type = constants.TR_DIR_C_TO_CR
                    self._engine_proxy.set_transfer_status(
                        transfer_id, constants.TRANSFERING_FROM_CLIENT_TO_CR)
                    self._engine_proxy.set_transfer_type(
                        transfer_id, constants.TR_DIR_C_TO_CR)
                else:
                    print("WARNING: The file or directory %s doesn't exist "
                          "on the client machine." % (client_path))
            else:  # client_paths
                for path in client_paths:
                    if not os.path.isfile(path) and not os.path.isdir(path):
                        print("WARNING: The file or directory %s doesn't "
                              "exist on the client machine." % (path))
                transfer_type = constants.TR_MFF_C_TO_CR
            self._engine_proxy.set_transfer_status(
                transfer_id, constants.TRANSFERING_FROM_CLIENT_TO_CR)
            self._engine_proxy.set_transfer_type(transfer_id,
                                                 transfer_type)
            return transfer_type
        elif status == constants.FILES_ON_CR \
                or status == constants.FILES_ON_CLIENT_AND_CR:
            # transfer_type = self._engine_proxy.init_transfer_from_cr(transfer_id,
                                                   # client_path,
                                                   # expiration_date,
                                                   # workflow_id,
                                                   # client_paths,
                                                   # status)
            if not client_paths:
                if self._engine_proxy.is_file(engine_path):
                    transfer_type = constants.TR_FILE_CR_TO_C
                elif self._engine_proxy.is_dir(engine_path):
                    transfer_type = constants.TR_DIR_CR_TO_C
                else:
                    print("WARNING: The file or directory %s doesn't exist "
                          "on the computing resource side." % (engine_path))
            else:  # client_paths
                engine_dir = os.path.dirname(engine_path)
                for path in client_paths:
                    relative_path = os.path.basename(path)
                    r_path = posixpath.join(engine_dir, relative_path)
                    if not self._engine_proxy.is_file(r_path) and \
                       not self._engine_proxy.is_dir(r_path):
                        print("WARNING: The file or directory %s doesn't "
                              "exist on the computing resource side."
                              % (r_path))
                transfer_type = constants.TR_MFF_CR_TO_C
            self._engine_proxy.set_transfer_status(
                transfer_id, constants.TRANSFERING_FROM_CR_TO_CLIENT)
            self._engine_proxy.set_transfer_type(transfer_id,
                                                 transfer_type)
            return transfer_type
    def _transfer_file(self, transfer_id, buffer_size):
        (transfer_id,
         engine_path,
         client_path,
         expiration_date,
         workflow_id,
         client_paths,
         transfer_type,
         status) = self._engine_proxy.transfer_information(transfer_id)
        if status == constants.FILES_ON_CLIENT or \
           status == constants.TRANSFERING_FROM_CLIENT_TO_CR:
            # transfer from client to computing resource
            # overwrite = False
            # if not transfer_type or \
             # transfer_type == constants.TR_FILE_CR_TO_C or \
             # transfer_type == constants.TR_DIR_CR_TO_C or \
             # transfer_type == constants.TR_MFF_CR_TO_C:
                # transfer reset
                # overwrite = True
            transfer_type = self._initialize_transfer(transfer_id)
            remote_path = engine_path
            if transfer_type == constants.TR_FILE_C_TO_CR or \
               transfer_type == constants.TR_DIR_C_TO_CR:
                self._transfer.transfer_to_remote(client_path,
                                                  remote_path)
                self._engine_proxy.set_transfer_status(
                    transfer_id, constants.FILES_ON_CLIENT_AND_CR)
                self._engine_proxy.signalTransferEnded(
                    transfer_id, workflow_id)
                return True
            if transfer_type == constants.TR_MFF_C_TO_CR:
                for path in client_paths:
                    relative_path = os.path.basename(path)
                    r_path = posixpath.join(remote_path, relative_path)
                    self._transfer.transfer_to_remote(path,
                                                      r_path)
                self._engine_proxy.set_transfer_status(
                    transfer_id, constants.FILES_ON_CLIENT_AND_CR)
                self._engine_proxy.signalTransferEnded(
                    transfer_id, workflow_id)
                return True
        if status == constants.FILES_ON_CR or \
           status == constants.TRANSFERING_FROM_CR_TO_CLIENT or \
           status == constants.FILES_ON_CLIENT_AND_CR:
            # transfer from computing resource to client
            # overwrite = False
            # if not transfer_type or \
             # transfer_type == constants.TR_FILE_C_TO_CR or \
             # transfer_type == constants.TR_DIR_C_TO_CR or \
             # transfer_type == constants.TR_MFF_C_TO_CR :
                # TBI remove existing files
                # overwrite = True
            transfer_type = self._initialize_transfer(transfer_id)
            remote_path = engine_path
            if transfer_type == constants.TR_FILE_CR_TO_C or \
               transfer_type == constants.TR_DIR_CR_TO_C:
                # file case
                self._transfer.transfer_from_remote(remote_path,
                                                    client_path)
                self._engine_proxy.set_transfer_status(
                    transfer_id, constants.FILES_ON_CLIENT_AND_CR)
                self._engine_proxy.signalTransferEnded(
                    transfer_id, workflow_id)
                return True
            if transfer_type == constants.TR_MFF_CR_TO_C:
                for path in client_paths:
                    relative_path = os.path.basename(path)
                    r_path = posixpath.join(remote_path, relative_path)
                    self._transfer.transfer_from_remote(r_path,
                                                        path)
                self._engine_proxy.set_transfer_status(
                    transfer_id, constants.FILES_ON_CLIENT_AND_CR)
                self._engine_proxy.signalTransferEnded(
                    transfer_id, workflow_id)
                return True
        return False
    def _transfer_progression(self,
                              status,
                              transfer_type,
                              client_path,
                              client_paths,
                              engine_path):
        if status == constants.TRANSFERING_FROM_CLIENT_TO_CR:
            if transfer_type == constants.TR_MFF_C_TO_CR:
                data_size = 0
                data_transfered = 0
                for path in client_paths:
                    relative_path = os.path.basename(path)
                    r_path = posixpath.join(engine_path, relative_path)
                    (ds, dt) = \
                        self._transfer_monitoring.transfer_to_remote_progression(
                            path, r_path)
                    data_size = data_size + ds
                    data_transfered = data_transfered + dt
                progression = (data_size, data_transfered)
            else:
                progression = \
                    self._transfer_monitoring.transfer_to_remote_progression(
                        client_path, engine_path)
        elif status == constants.TRANSFERING_FROM_CR_TO_CLIENT:
            if transfer_type == constants.TR_MFF_CR_TO_C:
                data_size = 0
                data_transfered = 0
                for path in client_paths:
                    relative_path = os.path.basename(path)
                    r_path = posixpath.join(engine_path, relative_path)
                    (ds,
                     dt) = self._transfer_monitoring.transfer_from_remote_progression(r_path,
                                                                                      path)
                    data_size = data_size + ds
                    data_transfered = data_transfered + dt
                progression = (data_size, data_transfered)
            else:
                progression = self._transfer_monitoring.transfer_from_remote_progression(
                    engine_path,
                    client_path)
        else:
            progression = (100, 100)
        return progression 
def _embedded_engine_and_server(config):
    '''
    Creates the workflow engine and workflow database server in the client
    process.
    The client process can not finish before the workflows and jobs are done.
    Using several client processes simultaneously (thus several database
    servers with the same database file) can cause errors (notably database
    locking problems).
    Parameters
    ----------
    config: configuration.Configuration
    Returns
    -------
    engine: WorkflowEngine
    '''
    import logging
    from soma_workflow.engine import WorkflowEngine, ConfiguredWorkflowEngine
    from soma_workflow.database_server import WorkflowDatabaseServer
    # configure logging
    log_config = {'version': 1}
    (engine_log_dir,
     engine_log_format,
     engine_log_level) = config.get_engine_log_info()
    if engine_log_dir:
        logfilepath = os.path.join(
            os.path.abspath(engine_log_dir), "log_light_mode")
        log_config['loggers'] = {
            'engine': {
                'level': getattr(logging, engine_log_level),
                'handlers': ['engine'],
                'propagate': False,
            }
        }
        log_config['handlers'] = {
            'engine': {
                'class': 'logging.FileHandler',
                'filename': logfilepath,
                'level': getattr(logging, engine_log_level),
                'formatter': 'engine',
            }
        }
        log_config['formatters'] = {
            'engine': {
                'format': engine_log_format,
            }
        }
    (server_log_file,
     server_log_format,
     server_log_level) = config.get_server_log_info()
    if server_log_file:
        log_config.setdefault('loggers', {})['jobServer'] = {
            'level': getattr(logging, server_log_level),
            'handlers': ['jobServer'],
            'propagate': False,
        }
        log_config.setdefault('handlers', {})['jobServer'] = {
            'class': 'logging.FileHandler',
            'filename': server_log_file,
            'level': getattr(logging, server_log_level),
            'formatter': 'jobServer',
        }
        log_config.setdefault('formatters', {})['jobServer'] = {
            'format': server_log_format,
        }
        if not os.path.exists(os.path.dirname(server_log_file)):
            os.makedirs(os.path.dirname(server_log_file))
    import logging.config
    logging.config.dictConfig(log_config)
    if engine_log_dir:
        logger = logging.getLogger('engine')
        logger.info(" ")
        logger.info("****************************************************")
        logger.info("****************************************************")
    if server_log_file:
        logger = logging.getLogger('jobServer')
        logger.info(" ")
        logger.info("****************************************************")
        logger.info("****************************************************")
    # database server
    database_server = WorkflowDatabaseServer(config.get_database_file(),
                                             config.get_transfered_file_dir(),
                                             remove_orphan_files=config.get_remove_orphan_files())
    sch = scheduler.build_scheduler(config.get_scheduler_type(), config)
    workflow_engine = ConfiguredWorkflowEngine(database_server,
                                               sch,
                                               config)
    return workflow_engine
class Helper(object):
    def __init__(self):
        pass
    @staticmethod
    def list_failed_jobs(workflow_id,
                         wf_ctrl,
                         include_aborted_jobs=False,
                         include_user_killed_jobs=False,
                         include_statuses=None):
        '''
        To spot the problematic jobs in a workflow.
        Parameters
        ----------
        workflow_id: workflow identifier
        include_aborted_jobs: bool
            Include the jobs whose exit status is constants.EXIT_ABORTED
            or constants.EXIT_NOTRUN
        include_user_killed_jobs: bool
            Include the jobs whose exit status is constants.USER_KILLED
        include_statuses: sequence or None
            Get failed jobs with an exit status in this list/set. If given,
            the ``include_aborted_jobs`` and ``include_user_killed_jobs``
            parameters are ignored.
        Returns
        -------
        jobs: list of job identifiers
            The list of identifiers of the jobs whose status is
            constants.FAILED or whose exit value is not 0.
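        Examples
        --------
        A sketch (wf_id is assumed to be a valid workflow identifier)::

            controller = WorkflowController()
            failed = Helper.list_failed_jobs(wf_id, controller,
                                             include_aborted_jobs=True)
            if failed:
                print('%d job(s) failed' % len(failed))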
        '''
        (jobs_info,
         transfers_info,
         workflow_status,
         workflow_queue,
         transfers_temp_info) = wf_ctrl.workflow_elements_status(
            workflow_id, with_drms_id=True)
        failed_job_ids = []
        if include_statuses is None:
            include_statuses = set(constants.JOB_EXIT_STATUS)
            if not include_user_killed_jobs:
                include_statuses.remove(constants.USER_KILLED)
            if not include_aborted_jobs:
                include_statuses.remove(constants.EXIT_ABORTED)
                include_statuses.remove(constants.EXIT_NOTRUN)
        for (job_id, status, queue, exit_info, dates, drmaa_id) in jobs_info:
            if (status == constants.DONE and exit_info[1] != 0) or \
                    (status == constants.FAILED and
                     exit_info[0] in include_statuses):
                failed_job_ids.append(job_id)
        return failed_job_ids 
    @staticmethod
    def delete_all_workflows(wf_ctrl, force=True):
        '''
        Delete all the workflows.
        If force is set to True, the call will block until the workflows are
        deleted. With force set to True, if a workflow can not be deleted
        properly it is removed from the Soma-workflow database. However, if
        some jobs are still running they will not be killed. In this case the
        return value is False.
        Parameters
        ----------
        wf_ctrl: client.WorkflowController
        force: bool
        Returns
        -------
        success: bool
        '''
        deleted_properly = True
        while wf_ctrl.workflows():
            wf_id = next(iter(wf_ctrl.workflows().keys()))
            deleted_properly = deleted_properly and wf_ctrl.delete_workflow(
                wf_id, force)
        return deleted_properly 
    @staticmethod
    def wait_workflow(workflow_id,
                      wf_ctrl):
        '''
        Waits for workflow execution to end.
        Parameters
        ----------
        workflow_id: workflow identifier
        wf_ctrl: client.WorkflowController
        '''
        wf_ctrl.wait_workflow(workflow_id) 
    @staticmethod
    def transfer_output_files(workflow_id,
                              wf_ctrl,
                              buffer_size=512 ** 2):
        '''
        Transfers all the output files of a workflow which are ready to be
        transferred.
        Parameters
        ----------
        workflow_id: workflow identifier
        wf_ctrl: client.WorkflowController
        buffer_size: int
            Depending on the transfer method, the files can be transferred
            piece by piece. The size of each piece can be tuned using the
            buffer_size argument.
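        Examples
        --------
        A sketch of a typical sequence in remote mode (wf_id and controller
        are assumed to be a valid workflow identifier and a
        WorkflowController instance)::

            Helper.wait_workflow(wf_id, controller)
            Helper.transfer_output_files(wf_id, controller)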
        '''
        transfer_info = None
        wf_elements_status = wf_ctrl.workflow_elements_status(workflow_id)
        to_transfer = []
        for transfer_info in wf_elements_status[1]:
            status = transfer_info[1][0]
            if status == constants.FILES_ON_CR:
                engine_path = transfer_info[0]
                to_transfer.append(engine_path)
            if status == constants.TRANSFERING_FROM_CR_TO_CLIENT:
                engine_path = transfer_info[0]
                to_transfer.append(repr(engine_path))
        wf_ctrl.transfer_files(to_transfer, buffer_size) 
    @staticmethod
    def serialize(file_path, workflow):
        '''
        Saves a workflow to a file.
        Uses JSON format.
        Raises *SerializationError* in case of failure
        Parameters
        ----------
        file_path: str
        workflow: client.Workflow
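        Examples
        --------
        A sketch of a save / reload round trip (the file name is
        illustrative)::

            Helper.serialize('/tmp/workflow.json', workflow)
            restored = Helper.unserialize('/tmp/workflow.json')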
        '''
        from soma_workflow import utils
        try:
            file = open(file_path, "w")
            workflow_dict = workflow.to_dict()
            json.dump(utils.to_json(workflow_dict), file, indent=4)
            file.close()
        except Exception as e:
            six.reraise(SerializationError,
                        SerializationError("%s: %s" % (type(e), e)),
                        sys.exc_info()[2]) 
    @staticmethod
    def unserialize(file_path):
        '''
        Loads a workflow from a file.
        Reads JSON format, or pickle format (see the method
        Helper.convert_wf_file_for_p2_5).
        Parameters
        ----------
        file_path: str
        Returns
        -------
        workflow: client.Workflow
        Raises *SerializationError* in case of failure
        '''
        from soma_workflow import utils
        try:
            file = open(file_path, "r")
        except Exception as e:
            raise SerializationError("%s: %s" % (type(e), e))
        workflow = None
        try:
            dict_from_json = utils.from_json(json.load(file))
        except ValueError as e:
            pass
        else:
            workflow = Workflow.from_dict(dict_from_json)
        if not workflow:
            file.close()
            file = open(file_path, "r")
            try:
                workflow = pickle.load(file)
            except Exception as e:
                raise SerializationError("%s: %s" % (type(e), e))
        try:
            file.close()
        except Exception as e:
            raise SerializationError("%s: %s" % (type(e), e))
        # compatibility with version 2.2 and previous
        for job in workflow.jobs:
            if not hasattr(job, "native_specification"):
                job.native_specification = None
        return workflow 
    @staticmethod
    def convert_wf_file_for_p2_5(origin_file_path, target_file_path):
        '''
        This method requires Python >= 2.6.
        It converts a workflow file created using Python >= 2.6 to a workflow
        file usable in Python 2.5.
        '''
        from soma_workflow import utils
        try:
            o_file = open(origin_file_path, "r")
            dict_from_json = utils.from_json(json.load(o_file))
            workflow = Workflow.from_dict(dict_from_json)
            o_file.close()
            t_file = open(target_file_path, "w")
            pickle.dump(workflow, t_file)
            t_file.close()
        except Exception as e:
            raise SerializationError("%s: %s" % (type(e), e))
    @staticmethod
    def cpu_count():
        """
        Detects the number of CPUs on a system.
        """
        return configuration.cpu_count()