# -*- coding: utf-8 -*-
'''
author: Soizic Laguitton
organization: I2BM, Neurospin, Gif-sur-Yvette, France
organization: CATI, France
organization: U{IFR 49
license: CeCILL version: 2 http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
'''
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------
from __future__ import print_function
from __future__ import absolute_import
import warnings
import sys
import soma_workflow.constants as constants
import re
import importlib
# python2/3 compatibility
import six
from six.moves import range
#-------------------------------------------------------------------------------
# Classes and functions
#-------------------------------------------------------------------------
[docs]class Job(object):
    '''
    Job representation.
    .. note::
      The command is the only argument required to create a Job.
      It is also useful to fill the job name for the workflow display in the GUI.
    **Parallel jobs**
    When a job is designed to run on multiple processors, cluster managements systems normally do the necessary work to run or duplicate the job processes on multiple computing nodes. There are basically 3 classical ways to do it:
      * use MPI (whatever implementation): job commands are run through a launcher program (``mpirun``) which will run the processes and establish inter-process communications.
      * use OpenMP: this threading-based system allows to use several cores on the same computing node (using shared memory). The OpenMP allows to use the required nuber of threads.
      * manual threading or forking ("native" mode).
    In all cases one job runs on several processors/cores. The MPI variant additionally allows to run the same job on several computing nodes (which do not share memory), the others should run on the same node (as far as I know - I'm not an expert of OpenMP). The job specifications should then precise which kind of parallelism they are using, the number of nodes the job should run on, and the number of CPU cores which should be allocated on each node. Thus the **parallel_job_info** variable of a job is a dictionary giving these 3 information, under the respective keys `config_name`, `nodes_number` and `cpu_per_node`. In OpenMP and native modes, the nodes_number should be 1.
    Attributes
    ----------
    command: sequence of string or/and FileTransfer or/and SharedResourcePath or/and TemporaryPath or/and tuple (FileTransfer, relative_path) or/and sequence of FileTransfer or/and sequence of SharedResourcePath or/and sequence of tuple (FileTransfer, relative_path)
        The command to execute. It can not be empty. In case of a shared file system
        the command is a sequence of string.
        In the other cases, the FileTransfer, SharedResourcePath, and TemporaryPath
        objects will be replaced by the appropriate path before the job execution.
        The tuples (FileTransfer, relative_path) can be used to refer to a file in a
        transfered directory.
        The sequences of FileTransfer, SharedResourcePath or tuple (FileTransfer,
        relative_path) will be replaced by the string "['path1', 'path2', 'path3']"
        before the job execution. The FileTransfer, SharedResourcePath or tuple
        (FileTransfer, relative_path) are replaced by the appropriate path inside
        the sequence.
    name: string
        Name of the Job which will be displayed in the GUI
    referenced_input_files: sequence of SpecialPath (FileTransfer, TemporaryPath...)
        List of the FileTransfer which are input of the Job. In other words,
        FileTransfer which are requiered by the Job to run. It includes the
        stdin if you use one.
    referenced_output_files: sequence of SpecialPath (FileTransfer, TemporaryPath...)
        List of the FileTransfer which are output of the Job. In other words,
        the FileTransfer which will be created or modified by the Job.
    stdin: string or FileTransfer or SharedResourcePath
        Path to the file which will be read as input stream by the Job.
    join_stderrout: boolean
        Specifies whether the error stream should be mixed with the output stream.
    stdout_file: string or FileTransfer or SharedResourcePath
        Path of the file where the standard output stream of the job will be
        redirected.
    stderr_file: string or FileTransfer or SharedResourcePath
        Path of the file where the standard error stream of the job will be
        redirected.
        .. note::
          Set stdout_file and stderr_file only if you need to redirect the
          standard output to a specific file. Indeed, even if they are not set
          the standard outputs will always be available through the
          WorklfowController API.
    working_directory: string or FileTransfer or SharedResourcePath
        Path of the directory where the job will be executed. The working directory
        is useful if your Job uses relative file path for example.
    priority: int
        Job priority: 0 = low priority. If several Jobs are ready to run at the
        same time the jobs with higher priority will be submitted first.
    native_specification: string
        Some specific option/function of the computing resource you want to use
        might not be available among the list of Soma-workflow Job attributes.
        Use the native specification attribute to use these specific functionalities.
        If a native_specification is defined here, the configured native
        specification will be ignored (documentation configuration item: NATIVE_SPECIFICATION).
        *Example:* Specification of a job walltime and more:
        * using a PBS cluster: native_specification="-l walltime=10:00:00,pmem=16gb"
        * using a SGE cluster: native_specification="-l h_rt=10:00:00"
    parallel_job_info: dict
        The parallel job information must be set if the Job is parallel (ie. made to
        run on several CPU).
        The parallel job information is a dict, with the following supported items:
        * config_name: name of the configuration (native, MPI, OpenMP)
        * nodes_number: number of computing nodes used by the Job,
        * cpu_per_node: number of CPU or cores needed for each node
        The configuration name is the type of parallel Job. Example: MPI or OpenMP.
        .. warning::
          The computing resources must be configured explicitly to use this feature.
    user_storage: picklable object
        Should have been any small and picklable object for user need but was never
        fully implemented. This parameter is simply ignored.
    env: dict(string, string)
        Environment variables to use when the job gets executed.
    param_dict: dict
        New in 3.1.
        Optional dictionary for job "parameters values". In case of dynamic
        outputs from a job, downstream jobjs values have to be set accordingly
        during the workflow execution. Thus we must be able to know how to
        replace the parameters values in the commandline. To do so, jobs should
        provide commandlines not with builtin values, but with replacement
        strings, and a dict of parameters with names::
            command = ['cp', '%(source)s', '%(dest)s']
            param_dict = {'source': '/data/file1.txt',
                          'dest': '/data/file2.txt'}
        Parameters names can be linked in the workflow to some other jobs
        outputs.
    use_input_params_file: bool
        if True, input parameters from the param_dict will not be passed using
        substitutions in the commandline, but through a JSON file.
    has_outputs: bool
        New in 3.1.
        Set if the job will write a special JSON file which contains output
        parameters values, when the job is a process with outputs.
    input_params_file: string or FileTransfer or SharedResourcePath
        Path to the file which will contain input parameters of the
        job.
    output_params_file: string or FileTransfer or SharedResourcePath
        Path to the file which will be written for output parameters of the
        job.
    disposal_timeout: int
        Only requiered outside of a workflow
    '''
    # sequence of sequence of string or/and FileTransfer or/and
    # SharedResourcePath or/and tuple (relative_path, FileTransfer) or/and
    # sequence of FileTransfer or/and sequence of SharedResourcePath or/and
    # sequence of tuple (relative_path, FileTransfers.)
    command = None
    # string
    name = None
    # sequence of FileTransfer
    referenced_input_files = None
    # sequence of FileTransfer
    referenced_output_files = None
    # string (path)
    stdin = None
    # boolean
    join_stderrout = None
    # string (path)
    stdout_file = None
    # string (path)
    stderr_file = None
    # string (path)
    working_directory = None
    # int
    priority = None
    # string
    native_specification = None
    # tuple(string, int)
    parallel_job_info = None
    # int (in hours)
    disposal_timeout = None
    # any small and picklable object needed by the user
    user_storage = None
    # dict (name -> value)
    env = None
    # dict (dst_job -> dict(dst_param_name: (src_job, src_param_name))
    param_dict = {}
    # bool
    use_input_params_file = False
    # bool
    has_outputs = False
    # string (path)
    input_params_file = None
    # string (path)
    output_params_file = None
    # dict (config options)
    configuration = {}
    def __init__(self,
                 command,
                 referenced_input_files=None,
                 referenced_output_files=None,
                 stdin=None,
                 join_stderrout=False,
                 disposal_timeout=168,
                 name=None,
                 stdout_file=None,
                 stderr_file=None,
                 working_directory=None,
                 parallel_job_info=None,
                 priority=0,
                 native_specification=None,
                 user_storage=None,
                 env=None,
                 param_dict=None,
                 use_input_params_file=False,
                 has_outputs=False,
                 input_params_file=None,
                 output_params_file=None,
                 configuration={}):
        if not name and len(command) != 0:
            self.name = command[0]
        else:
            self.name = name
        self.command = command
        if referenced_input_files:
            self.referenced_input_files = referenced_input_files
        else:
            self.referenced_input_files = []
        if referenced_output_files:
            self.referenced_output_files = referenced_output_files
        else:
            self.referenced_output_files = []
        self.stdin = stdin
        self.join_stderrout = join_stderrout
        self.disposal_timeout = disposal_timeout
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file
        self.working_directory = working_directory
        self.parallel_job_info = parallel_job_info
        self.priority = priority
        self.native_specification = native_specification
        self.env = env
        self.param_dict = param_dict or dict()
        self.use_input_params_file = use_input_params_file
        self.has_outputs = has_outputs
        self.input_params_file = input_params_file
        self.output_params_file = output_params_file
        self.configuration = configuration
        # this deson't seem to be really hamful.
        # for command_elem in self.command:
        #     if isinstance(command_elem, six.string_types):
        #         if "'" in command_elem:
        #             warnings.warn("%s contains single quote. It could fail using DRMAA"
        #                           % command_elem, UserWarning)
    def _attributes_equal(self, element, other_element):
        if element.__class__ is not other_element.__class__:
            # special case str / unicode
            if not isinstance(element, six.string_types) or not isinstance(element, six.string_types):
                # print('differ in class:', element.__class__,
                # other_element.__class__)
                return False
        if isinstance(element, FileTransfer) or \
            
isinstance(element, SharedResourcePath) or \
                
isinstance(element, TemporaryPath) or \
                
isinstance(element, OptionPath):
            return element.attributs_equal(other_element)
        if isinstance(element, (list, tuple)):
            if len(element) != len(other_element):
                # print('len differ:', len(element), '!=', len(other_element))
                return False
            for i in range(0, len(element)):
                if not self._attributes_equal(element[i], other_element[i]):
                    # print('list element differ:', element[i], '!=',
                    # other_element[i])
                    return False
            return True
        if isinstance(element, dict):
            if sorted(element.keys()) != sorted(other_element.keys()):
                return False
            for key, item in six.iteritems(element):
                other_item = other_element[key]
                if not self._attributes_equal(item, other_item):
                    return False
            return True
        return element == other_element
    def get_commandline(self):
        return self.commandline_repl(self.command)
[docs]    def commandline_repl(self, command):
        '''
        Get "processed" commandline list. Each element in the commandline list
        which contains a replacement string in the shame %(var)s is replaced
        using the param_dict values.
        '''
        # return [x % self.param_dict for x in self.command]
        if not self.param_dict:
            return command
        cmd = []
        r = re.compile('%\((.+)\)[dsf]')
        for e in command:
            if isinstance(e, (list, tuple)):
                cmd.append(self.commandline_repl(e))
            else:
                m = r.split(e)
                t = False
                for i in range(int(len(m) / 2)):
                    var = m[i * 2 + 1]
                    value = self.param_dict.get(var)
                    if value is not None:
                        t = True
                    else:
                        value = '%%(%s)s' % var
                    m[i * 2 + 1] = value
                if not t:
                    cmd.append(e)
                else:
                    if len(m) > 3 or m[0] != '' or m[2] != '':
                        # if m[0] == '':
                            # m = m[1:]
                        # if m[-1] == '':
                            # m = m[:-1]
                        # WARNING: returns a string, losing
                        # SpecialPath instances
                        cmd.append(''.join(m))
                    else:
                        cmd.append(m[1])
        return cmd 
    def attributs_equal(self, other):
        # TODO a better solution would be to overload __eq__ and __neq__ operator
        # however these operators are used in soma-workflow to test if
        # two objects are the same instance. These tests have to be replaced
        # first using the id python function.
        if other.__class__ is not self.__class__:
            return False
        attributes = [
            "name",
            "input_params_file",
            "has_outputs",
            "stdin",
            "join_stderrout",
            "stdout_file",
            "stderr_file",
            "working_directory",
            "priority",
            "native_specification",
            "parallel_job_info",
            "disposal_timeout",
            "env",
            "command",
            "referenced_input_files",
            "referenced_output_files",
            "param_dict",
            "input_params_file",
            "output_params_file",
            "configuration",
        ]
        for attr_name in attributes:
            attr = getattr(self, attr_name)
            other_attr = getattr(other, attr_name)
            if not self._attributes_equal(attr, other_attr):
                # print('differ in:', attr_name)
                # print(attr, '!=', other_attr)
                return False
        return True
[docs]    @classmethod
    def from_dict(cls,
                  d,
                  tr_from_ids,
                  srp_from_ids,
                  tmp_from_ids,
                  opt_from_ids):
        '''
         * d *dictionary*
         * tr_from_id *id -> FileTransfer*
         * srp_from_id *id -> SharedResourcePath*
         * tmp_from_ids *id -> TemporaryPath*
         * opt_from_ids *id -> OptionPath*
        '''
        job = cls(command=d["command"],
                  configuration=d.get("configuration", {}))
        for key, value in six.iteritems(d):
            setattr(job, key, value)
        new_command = list_from_serializable(job.command,
                                             tr_from_ids,
                                             srp_from_ids,
                                             tmp_from_ids,
                                             opt_from_ids)
        job.command = new_command
        if job.referenced_input_files:
            ref_in_files = list_from_serializable(job.referenced_input_files,
                                                  tr_from_ids,
                                                  srp_from_ids,
                                                  tmp_from_ids,
                                                  opt_from_ids)
            job.referenced_input_files = ref_in_files
        if job.referenced_output_files:
            ref_out_files = list_from_serializable(job.referenced_output_files,
                                                   tr_from_ids,
                                                   srp_from_ids,
                                                   tmp_from_ids,
                                                   opt_from_ids)
            job.referenced_output_files = ref_out_files
        if job.stdin:
            job.stdin = from_serializable(job.stdin,
                                          tr_from_ids,
                                          srp_from_ids,
                                          tmp_from_ids,
                                          opt_from_ids)
        if job.stdout_file:
            job.stdout_file = from_serializable(job.stdout_file,
                                                tr_from_ids,
                                                srp_from_ids,
                                                tmp_from_ids,
                                                opt_from_ids)
        if job.stderr_file:
            job.stderr_file = from_serializable(job.stderr_file,
                                                tr_from_ids,
                                                srp_from_ids,
                                                tmp_from_ids,
                                                opt_from_ids)
        if job.working_directory:
            job.working_directory = from_serializable(job.working_directory,
                                                      tr_from_ids,
                                                      srp_from_ids,
                                                      tmp_from_ids,
                                                      opt_from_ids)
        if job.output_params_file:
            job.output_params_file = from_serializable(job.output_params_file,
                                                       tr_from_ids,
                                                       srp_from_ids,
                                                       tmp_from_ids,
                                                       opt_from_ids)
        job.param_dict = {}
        for k, v in six.iteritems(d.get('param_dict', {})):
            job.param_dict[k] = from_serializable(v, tr_from_ids,
                                                  srp_from_ids,
                                                  tmp_from_ids,
                                                  opt_from_ids)
        return job 
[docs]    def to_dict(self,
                id_generator,
                transfer_ids,
                shared_res_path_id,
                tmp_ids,
                opt_ids):
        '''
        * id_generator *IdGenerator*
        * transfer_ids *dict: client.FileTransfer -> int*
            This dictonary will be modified.
        * shared_res_path_id *dict: client.SharedResourcePath -> int*
            This dictonary will be modified.
        * tmp_ids *dict: client.TemporaryPath -> int*
        * opt_ids *dict: client.OptionPath -> int*
        '''
        job_dict = {}
        attributes = [
            "name",
            "join_stderrout",
            "priority",
            "native_specification",
            "parallel_job_info",
            "disposal_timeout",
            "env",
            "use_input_params_file",
            "has_outputs",
            "configuration",
            "uuid",
        ]
        job_dict["class"] = '%s.%s' % (self.__class__.__module__,
                                       self.__class__.__name__)
        for attr_name in attributes:
            if hasattr(self, attr_name):
                job_dict[attr_name] = getattr(self, attr_name)
        # command, referenced_input_files, referenced_output_files
        # stdin, stdout_file, stderr_file and working_directory
        # can contain FileTransfer et SharedResourcePath.
        ser_command = list_to_serializable(self.command,
                                           id_generator,
                                           transfer_ids,
                                           shared_res_path_id,
                                           tmp_ids,
                                           opt_ids)
        job_dict['command'] = ser_command
        if self.referenced_input_files:
            ser_ref_in_files = list_to_serializable(
                self.referenced_input_files,
                id_generator,
                transfer_ids,
                shared_res_path_id,
                tmp_ids,
                opt_ids)
            job_dict['referenced_input_files'] = ser_ref_in_files
        if self.referenced_output_files:
            ser_ref_out_files = list_to_serializable(
                self.referenced_output_files,
                id_generator,
                transfer_ids,
                shared_res_path_id,
                tmp_ids,
                opt_ids)
            job_dict['referenced_output_files'] = ser_ref_out_files
        if self.stdin:
            job_dict['stdin'] = to_serializable(self.stdin,
                                                id_generator,
                                                transfer_ids,
                                                shared_res_path_id,
                                                tmp_ids,
                                                opt_ids)
        if self.stdout_file:
            job_dict['stdout_file'] = to_serializable(self.stdout_file,
                                                      id_generator,
                                                      transfer_ids,
                                                      shared_res_path_id,
                                                      tmp_ids,
                                                      opt_ids)
        if self.stderr_file:
            job_dict['stderr_file'] = to_serializable(self.stderr_file,
                                                      id_generator,
                                                      transfer_ids,
                                                      shared_res_path_id,
                                                      tmp_ids,
                                                      opt_ids)
        if self.input_params_file:
            job_dict['input_params_file'] \
                
= to_serializable(self.input_params_file,
                                  id_generator,
                                  transfer_ids,
                                  shared_res_path_id,
                                  tmp_ids,
                                  opt_ids)
        if self.output_params_file:
            job_dict['output_params_file'] \
                
= to_serializable(self.output_params_file,
                                  id_generator,
                                  transfer_ids,
                                  shared_res_path_id,
                                  tmp_ids,
                                  opt_ids)
        if self.working_directory:
            job_dict[
                'working_directory'] = to_serializable(self.working_directory,
                                                       id_generator,
                                                       transfer_ids,
                                                       shared_res_path_id,
                                                       tmp_ids,
                                                       opt_ids)
        if self.param_dict:
            param_dict = {}
            for k, v in six.iteritems(self.param_dict):
                param_dict[k] = to_serializable(v, id_generator,
                                                transfer_ids,
                                                shared_res_path_id,
                                                tmp_ids,
                                                opt_ids)
            job_dict['param_dict'] = param_dict
        return job_dict 
    def __getstate__(self):
        # filter out some instance attributes which should / can not be pickled
        no_picke = getattr(self, '_do_not_pickle', None)
        state_dict = self.__dict__
        if not no_picke:
            return state_dict
        copied = False
        for attribute in no_picke:
            if hasattr(self, attribute):
                if not copied:
                    state_dict = dict(state_dict)
                    copied = True
                del state_dict[attribute]
        return state_dict 
[docs]class EngineExecutionJob(Job):
    '''
    EngineExecutionJob: a lightweight job which will not run as a "real" job,
    but as a python function, on the engine server side.
    Such jobs are meant to perform fast, simple operations on their inputs in
    order to produce modified inputs for other downstream jobs, such as string
    substituitons, lists manipulations, etc. As they will run in the engine
    process (generally the jobs submission machine) they should not perform
    expensive processing (CPU or memory-consuming).
    They are an alternative to link functions in Workflows.
    The only method an EngineExecutionJob defines is :meth:`engine_execution`,
    which will be able to use its parameters dict (as defined in its param_dict
    as any other job), and will return an output parameters dict.
    Warning: the :meth:`engine_execution` is actually a **class method**, not a
    regular instance method. The reason for this is that it will be used with
    an :class:`~soma_workflow.engine_types.EngineJob` instance, which inherits
    :class:`Job`, but not the exact subclass. Thus in the method, ``self`` is
    not a real instance of the class.
    The default implementation just passes its input parameters as outputs in
    order to allow later jobs to reuse their parameters. Subclasses define
    their own :meth:`engine_execution` methods.
    See :ref:`engine_execution_job` for more details.
    '''
    @classmethod
    def engine_execution(cls, self):
        output_dict = dict(self.param_dict)
        return output_dict 
[docs]class BarrierJob(EngineExecutionJob):
    '''
    Barrier job: it is a "fake" job which does nothing (and will not become a
    real job on the DRMS) but has dependencies.
    It may be used to make a dependencies hub, to avoid too many dependencies
    with fully connected jobs sets.
    BarrierJob is implemented as an EngineExecutionJob, and just differs in its
    name, as its :meth:`~EngineExecutionJob.engine_execution` method does
    nothing.
    Ex:
      (Job1, Job2, Job3) should be all connected to (Job4, Job5, Job6)
      needs 3*3 = 9 (N^2) dependencies.
      With a barrier: ::
        Job1              Job4
              \         /
        Job2 -- Barrier -- Job5
              /         \.
        Job3              Job6
      needs 6 (2*N).
    BarrierJob constructor accepts only a subset of Job constructor parameter:
    **referenced_input_files**
    **referenced_output_files**
    **name**
    '''
    def __init__(self,
                 command=[],
                 referenced_input_files=None,
                 referenced_output_files=None,
                 name=None):
        super(BarrierJob, self).__init__(command=[],
                                         referenced_input_files=referenced_input_files,
                                         referenced_output_files=referenced_output_files,
                                         name=name)
[docs]    @classmethod
    def from_dict(cls,
                  d,
                  tr_from_ids,
                  srp_from_ids,
                  tmp_from_ids,
                  opt_from_ids):
        '''
         * d *dictionary*
         * tr_from_id *id -> FileTransfer*
         * srp_from_id *id -> SharedResourcePath*
         * tmp_from_ids *id -> TemporaryPath*
         * opt_from_ids *id -> OptionPath*
        '''
        job = cls()
        for key, value in six.iteritems(d):
            setattr(job, key, value)
        if job.referenced_input_files:
            ref_in_files = list_from_serializable(job.referenced_input_files,
                                                  tr_from_ids,
                                                  srp_from_ids,
                                                  tmp_from_ids,
                                                  opt_from_ids)
            job.referenced_input_files = ref_in_files
        if job.referenced_output_files:
            ref_out_files = list_from_serializable(job.referenced_output_files,
                                                   tr_from_ids,
                                                   srp_from_ids,
                                                   tmp_from_ids,
                                                   opt_from_ids)
            job.referenced_output_files = ref_out_files
        return job 
[docs]    def to_dict(self,
                id_generator,
                transfer_ids,
                shared_res_path_id,
                tmp_ids,
                opt_ids):
        '''
        * id_generator *IdGenerator*
        * transfer_ids *dict: client.FileTransfer -> int*
            This dictonary will be modified.
        * shared_res_path_id *dict: client.SharedResourcePath -> int*
            This dictonary will be modified.
        * tmp_ids *dict: client.TemporaryPath -> int*
        * opt_ids *dict: client.OptionPath -> int*
        '''
        job_dict = {}
        attributes = [
            "name",
            "disposal_timeout",
        ]
        for attr_name in attributes:
            job_dict[attr_name] = getattr(self, attr_name)
        # referenced_input_files, referenced_output_files
        # stdin, stdout_file, stderr_file and working_directory
        # can contain FileTransfer et SharedResourcePath.
        job_dict["class"] = '%s.%s' % (self.__class__.__module__,
                                       self.__class__.__name__)
        if self.referenced_input_files:
            ser_ref_in_files = list_to_serializable(
                self.referenced_input_files,
                id_generator,
                transfer_ids,
                shared_res_path_id,
                tmp_ids,
                opt_ids)
            job_dict['referenced_input_files'] = ser_ref_in_files
        if self.referenced_output_files:
            ser_ref_out_files = list_to_serializable(
                self.referenced_output_files,
                id_generator,
                transfer_ids,
                shared_res_path_id,
                tmp_ids,
                opt_ids)
            job_dict['referenced_output_files'] = ser_ref_out_files
        return job_dict  
class Workflow(object):
    '''
    Workflow representation.
    Attributes
    ----------
    name: string
        Name of the workflow which will be displayed in the GUI.
        Default: workflow_id once submitted
    jobs: sequence of Job
        Workflow jobs.
    dependencies: sequence of tuple (element, element), element being Job or Group
        Dependencies between the jobs of the workflow.
        If a job_a needs to be executed before a job_b can run: the tuple
        (job_a, job_b) must be added to the workflow dependencies. job_a and job_b
        must belong to workflow.jobs.
        In Soma-Workflow 2.7 or higher, dependencies may use groups. In this case,
        dependencies are replaced internally to setup the groups jobs dependencies.
        2 additional barrier jobs (see BarrierJob) are used for each group.
    root_group: *sequence of Job and/or Group*
        Recursive description of the workflow hierarchical structure. For displaying
        purpose only.
        .. note::
          root_group is only used to display nicely the workflow in the GUI. It
          does not have any impact on the workflow execution.
          If root_group is not set, all the jobs of the workflow will be
          displayed at the root level in the GUI tree view.
    user_storage: picklable object
        For the user needs, any small and picklable object can be stored here.
    env: dict(string, string)
        Environment variables to use when the job gets executed. The workflow-
        level env variables are set to all jobs.
    env_builder_code: string
        python source code. This code will be executed from the engine, on
        server side, but not in a processing node (in a separate python process
        in order not to pollute the engine process) when a workflow is
        starting. The code should print on the standard output a json
        dictionary of environment variables, which will be set into all jobs,
        in addition to the *env* variable above.
    param_links: dict
        New in 3.1.
        Job parameters links. Links are in the following shape::
            dest_job: {dest_param: [(source_job, param, <function>), ...]}
        Links are used to get output values from jobs which have completed
        their run, and to set them into downstream jobs inputs. This system
        allows "dynamic outputs" in workflows.
        The optional function item is the name of a function that will be
        called to transform values from the source to the destination of the
        link at runtime. It is basically a string "module.function", or a
        tuple for passing some arguments (as in partial):
        ("module.function", 12, "param"). The function is called with
        additional arguments: parameter name, parameter value, destination
        parameter name, destination parameter current value. The destination
        parameter value is typically used to build / update a list in the
        destination job from a series of values in source jobs.
        See :ref:`params_link_functions` for details.
    '''
    # string
    name = None
    # sequence of Job
    jobs = None
    # sequence of tuple (Job, Job)
    dependencies = None
    # sequence of Job and/or Group
    root_group = None
    # sequence of Groups built from the root_group
    groups = None
    # any small and picklable object needed by the user
    user_storage = None
    # environment variables
    env = {}
    # environment builder code
    env_builder_code = None
    param_links = {}
    def __init__(self,
                 jobs,
                 dependencies=None,
                 root_group=None,
                 disposal_timeout=168,
                 user_storage=None,
                 name=None,
                 env={},
                 env_builder_code=None,
                 param_links=None):
        '''
        In Soma-Workflow 3.1, some "jobs outputs" have been added. This concept
        is somewhat contradictory with the commandline execution model, which
        basically does not produce outputs other than files. To handle this,
        jobs which actually produce "outputs" (names parameters with output
        values) should write a JSON file containing the output values
        dictionary.
        Output values are then read by Soma-Workflow, and values are set in the
        downstream jobs which depend on these values.
        For this, "parameters links" have been added, to tell Soma-Workflow
        which input parameters should be replaced by output parameter values
        from an upstream job.
        param_links is an (optional) dict which specifies these links::
            {dest_job: {dest_param: [(source_job, param, <function>), ...]}}
        Such links de facto define new jobs dependencies, which are added to
        the dependencies manually specified.
        The optional function item is the name of a function that will be
        called to transform values from the source to the destination of the
        link at runtime. It is basically a string "module.function", or a
        tuple for passing some arguments (as in partial):
        ("module.function", 12, "param"). The function is called with
        additional arguments: parameter name, parameter value, destination
        parameter name, destination parameter current value. The destination
        parameter value is typically used to build / update a list in the
        destination job from a series of values in source jobs.
        Parameters
        ----------
        jobs
        dependencies
        root_group
        disposal_timeout
        user_storage
        name
        env
        env_builder_code
        param_links
        '''
        import logging
        logging.debug("Within Workflow constructor")
        self.name = name
        self.jobs = jobs
        if dependencies != None:
            self.dependencies = dependencies
        else:
            self.dependencies = []
        self.disposal_timeout = disposal_timeout
        # Groups
        if root_group:
            if isinstance(root_group, Group):
                self.root_group = root_group.elements
            else:
                self.root_group = root_group
            self.groups = []
            to_explore = []
            for element in self.root_group:
                if isinstance(element, Group):
                    to_explore.append(element)
            while to_explore:
                group = to_explore.pop()
                self.groups.append(group)
                for element in group.elements:
                    if isinstance(element, Group):
                        to_explore.append(element)
        else:
            # self.root_group = self.jobs
            self.groups = []
        # replace groups in deps
        self.__convert_group_dependencies()
        if not root_group:
            self.root_group = self.jobs
        self.env = env
        self.env_builder_code = env_builder_code
        self.param_links = param_links or dict()
        self.add_dependencies_from_links()
    def add_workflow(self, workflow, as_group=None):
        '''
        Concatenates a workflow into the current one.
        Parameters
        ----------
        workflow: Workflow
            workflow to be added to self
        as_group: string (optional)
            if specified, the workflow will be added as a group with the given
            name
        Returns
        -------
        group or None
            if as_group is specified, the group created for the sub-workflow
            will be returned, otherwise the function returns None.
        '''
        self.jobs += workflow.jobs
        if type(self.dependencies) in (list, tuple):
            self.dependencies += workflow.dependencies
        else:  # assume set
            self.dependencies.update(workflow.dependencies)
        if as_group:
            group = Group(workflow.root_group, name=as_group)
            self.root_group.append(group)
            self.groups += [group] + workflow.groups
        else:
            group = None
            self.root_group += workflow.root_group
            self.groups += workflow.groups
        self.param_links.update(workflow.param_links)
        self.env.update(workflow.env)
        # self.env_builder_code ?
        return group
    def add_dependencies(self, dependencies):
        '''
        Add additional dependencies in the workflow.
        '''
        if type(self.dependencies) in (list, tuple):
            self.dependencies += dependencies
        else:  # assume set
            self.dependencies.update(dependencies)
        self.__convert_group_dependencies(dependencies)
    def add_dependencies_from_links(self):
        '''
        Process parameters links and add missing jobs dependencies accordingly
        '''
        deps = set()
        if isinstance(self.dependencies, set):
            current_deps = self.dependencies
        else:
            current_deps = set(self.dependencies)
        for dest_job, links in six.iteritems(self.param_links):
            for p, linkl in six.iteritems(links):
                for link in linkl:
                    deps.add((link[0], dest_job))
        if isinstance(self.dependencies, list):
            for dep in deps:
                if dep not in current_deps:
                    self.dependencies.append(dep)
        else:  # deps as set
            for dep in deps:
                if dep not in current_deps:
                    self.dependencies.add(dep)
    def attributs_equal(self, other):
        if not isinstance(other, self.__class__):
            return False
        seq_attributes = [
            "jobs",
            "dependencies",
            "root_group",
            "groups"
        ]
        for attr_name in seq_attributes:
            attr = getattr(self, attr_name)
            other_attr = getattr(other, attr_name)
            if not len(attr) == len(other_attr):
                return False
            for i in range(0, len(attr)):
                if isinstance(attr[i], Job) or\
                        isinstance(attr[i], Group):
                    if not attr[i].attributs_equal(other_attr[i]):
                        return False
                elif isinstance(attr[i], tuple):
                    if not isinstance(other_attr[i], tuple) or\
                       not len(attr[i]) == len(other_attr[i]):
                        return False
                    if not attr[i][0].attributs_equal(other_attr[i][0]):
                        return False
                    if not attr[i][1].attributs_equal(other_attr[i][1]):
                        return False
                elif not attr[i] == other_attr[i]:
                    return False
        return self.name == other.name and self.env == other.env \
            and self.env_builder_code == other.env_builder_code \
            and self.param_links == other.param_links
    def to_dict(self):
        '''
        The keys must be string to serialize with JSON.
        '''
        id_generator = IdGenerator()
        job_ids = {}  # Job -> id
        wf_dict = {}
        wf_dict["name"] = self.name
        new_jobs = []
        for job in self.jobs:
            ident = id_generator.generate_id()
            new_jobs.append(ident)
            job_ids[job] = ident
        wf_dict["jobs"] = new_jobs
        new_dependencies = []
        for dep in self.dependencies:
            if dep[0] not in job_ids or dep[1] not in job_ids:
                raise Exception("Unknown jobs in dependencies.")
            new_dependencies.append((job_ids[dep[0]], job_ids[dep[1]]))
        wf_dict["dependencies"] = new_dependencies
        new_links = {}
        for dest_job, links in six.iteritems(self.param_links):
            wdjob = job_ids[dest_job]
            wlinks = {}
            for dest_par, linkl in six.iteritems(links):
                for link in linkl:
                    wlinks.setdefault(dest_par, []).append(
                        (job_ids[link[0]], ) + link[1:])
            new_links[wdjob] = wlinks
        wf_dict['param_links'] = new_links
        group_ids = {}
        new_groups = []
        for group in self.groups:
            ident = id_generator.generate_id()
            new_groups.append(ident)
            group_ids[group] = ident
        wf_dict["groups"] = new_groups
        new_root_group = []
        for element in self.root_group:
            if element in job_ids:
                new_root_group.append(job_ids[element])
            elif element in group_ids:
                new_root_group.append(group_ids[element])
            else:
                raise Exception("Unknown root group element.")
        wf_dict["root_group"] = new_root_group
        ser_groups = {}
        for group, group_id in six.iteritems(group_ids):
            ser_groups[str(group_id)] = group.to_dict(group_ids, job_ids)
        wf_dict["serialized_groups"] = ser_groups
        ser_jobs = {}
        ser_barriers = {}
        transfer_ids = {}  # FileTransfer -> id
        shared_res_path_ids = {}  # SharedResourcePath -> id
        temporary_ids = {}  # TemporaryPath -> id
        option_ids = {}  # OptionPath -> id
        for job, job_id in six.iteritems(job_ids):
            ser_jobs[str(job_id)] = job.to_dict(id_generator,
                                                transfer_ids,
                                                shared_res_path_ids,
                                                temporary_ids,
                                                option_ids)
        wf_dict["serialized_jobs"] = ser_jobs
        ser_transfers = {}
        for file_transfer, transfer_id in six.iteritems(transfer_ids):
            ser_transfers[str(transfer_id)] = file_transfer.to_dict()
        wf_dict["serialized_file_transfers"] = ser_transfers
        ser_srp = {}
        for srp, srp_id in six.iteritems(shared_res_path_ids):
            ser_srp[str(srp_id)] = srp.to_dict()
        wf_dict["serialized_shared_res_paths"] = ser_srp
        ser_tmp = {}
        for tmpf, tmp_id in six.iteritems(temporary_ids):
            ser_tmp[str(tmp_id)] = tmpf.to_dict()
        wf_dict["serialized_temporary_paths"] = ser_tmp
        ser_opt = {}
        for optf, opt_id in six.iteritems(option_ids):
            ser_opt[str(opt_id)] = optf.to_dict(id_generator,
                                                transfer_ids,
                                                shared_res_path_ids,
                                                temporary_ids,
                                                option_ids)
        wf_dict["serialized_option_paths"] = ser_opt
        # user_storage
        user_storage = self.user_storage
        if hasattr(user_storage, 'to_dict'):
            user_storage = user_storage.to_dict()
        if self.env:
            wf_dict['env'] = self.env
        if self.env_builder_code is not None:
            wf_dict['env_builder_code'] = self.env_builder_code
        if hasattr(self, 'uuid'):
            wf_dict['uuid'] = self.uuid
        return wf_dict
    @classmethod
    def from_dict(cls, d):
        name = d.get("name", None)
        # shared resource paths
        serialized_srp = d.get("serialized_shared_res_paths", {})
        srp_from_ids = {}
        for srp_id, srp_d in six.iteritems(serialized_srp):
            srp = SharedResourcePath.from_dict(srp_d)
            srp_from_ids[int(srp_id)] = srp
        # file transfers
        serialized_tr = d.get("serialized_file_transfers", {})
        tr_from_ids = {}
        for tr_id, tr_d in six.iteritems(serialized_tr):
            file_transfer = FileTransfer.from_dict(tr_d)
            tr_from_ids[int(tr_id)] = file_transfer
        # file transfers
        serialized_tmp = d.get("serialized_temporary_paths", {})
        tmp_from_ids = {}
        for tmp_id, tmp_d in six.iteritems(serialized_tmp):
            temp_file = TemporaryPath.from_dict(tmp_d)
            tmp_from_ids[int(tmp_id)] = temp_file
        # option paths
        serialized_opt = d.get("serialized_option_paths", {})
        opt_from_ids = {}
        for opt_id, opt_d in six.iteritems(serialized_opt):
            opt_file = OptionPath.from_dict(
                opt_d, tr_from_ids, srp_from_ids, tmp_from_ids, opt_from_ids)
            opt_from_ids[int(opt_id)] = opt_file
        # jobs
        serialized_jobs = d.get("serialized_jobs", {})
        job_from_ids = {}
        for job_id, job_d in six.iteritems(serialized_jobs):
            cls_name = job_d.get("class", "soma_workflow.client_types.Job")
            cls_mod = cls_name.rsplit('.', 1)
            if len(cls_mod) == 1:
                jcls = sys.modules[__name__].__dict__[cls_name]
            else:
                module = importlib.import_module(cls_mod[0])
                jcls = getattr(module, cls_mod[1])
            job = jcls.from_dict(job_d, tr_from_ids, srp_from_ids,
                                 tmp_from_ids, opt_from_ids)
            job_from_ids[int(job_id)] = job
        # barrier jobs
        # obsolete: barriers are now part of jobs definitions, but this helps
        # reloading older workflows.
        serialized_jobs = d.get("serialized_barriers", {})
        for job_id, job_d in six.iteritems(serialized_jobs):
            job = BarrierJob.from_dict(
                job_d, tr_from_ids, srp_from_ids, tmp_from_ids, opt_from_ids)
            job_from_ids[int(job_id)] = job
        jobs = list(job_from_ids.values())
        # groups
        serialized_groups = d.get("serialized_groups", {})
        group_from_ids = {}
        to_convert = list(serialized_groups.keys())
        converted_or_stuck = False
        while not converted_or_stuck:
            new_converted = []
            for group_id in to_convert:
                group = Group.from_dict(serialized_groups[group_id],
                                        group_from_ids,
                                        job_from_ids)
                if group != None:
                    new_converted.append(group_id)
                    group_from_ids[int(group_id)] = group
            for group_id in new_converted:
                to_convert.remove(group_id)
            converted_or_stuck = not to_convert or not new_converted
        groups = list(group_from_ids.values())  # WARNING, not used
        # root group
        id_root_group = d.get("root_group", [])
        root_group = []
        for el_id in id_root_group:
            if el_id in group_from_ids:
                root_group.append(group_from_ids[el_id])
            elif el_id in job_from_ids:
                root_group.append(job_from_ids[el_id])
        # dependencies
        dependencies = []
        id_dependencies = d.get("dependencies", [])
        for id_dep in id_dependencies:
            dep = (job_from_ids[id_dep[0]], job_from_ids[id_dep[1]])
            dependencies.append(dep)
        # param links
        param_links = {}
        id_links = d.get("param_links", {})
        for dest_job, links in six.iteritems(id_links):
            ddest_job = job_from_ids[int(dest_job)]
            dlinks = {}
            for lname, linkl in six.iteritems(links):
                dlinkl = []
                for link in linkl:
                    dsrc_job = job_from_ids[link[0]]
                    dlinkl.append((dsrc_job, ) + tuple(link[1:]))
                dlinks[lname] = dlinkl
            param_links[ddest_job] = dlinks
        # user storage, TODO: handle objects in it
        user_storage = d.get('user_storage', None)
        env = d.get('env', {})
        env_builder_code = d.get('env_builder_code')
        workflow = cls(jobs,
                       dependencies,
                       root_group=root_group,
                       user_storage=user_storage,
                       name=name,
                       env=env,
                       env_builder_code=env_builder_code,
                       param_links=param_links)
        return workflow
    def __getstate__(self):
        # filter out some instance attributes which should / can not be pickled
        no_picke = getattr(self, '_do_not_pickle', None)
        state_dict = self.__dict__
        if not no_picke:
            return state_dict
        copied = False
        for attribute in no_picke:
            if hasattr(self, attribute):
                if not copied:
                    state_dict = dict(state_dict)
                    copied = True
                del state_dict[attribute]
        return state_dict
    def __group_hubs(self, group, group_to_hub):
        '''
        Replace a group with a BarrierJob pair for inputs and ouputs).
        All jobs inside the group depends on its input hub, and the output hub
        depends on all jobs in the group
        '''
        ghubs = group_to_hub.get(group, None)
        if ghubs is not None:
            return ghubs
        ghubs = (BarrierJob(name=group.name + '_input'),
                 BarrierJob(name=group.name + '_output'))
        group_to_hub[group] = ghubs
        if type(self.jobs) is list:
            self.jobs += [ghubs[0], ghubs[1]]
        elif type(self.jobs) is set:
            self.jobs.update([ghubs[0], ghubs[1]])
        elif type(self.jobs) is tuple:
            self.jobs = list(self.jobs) + [ghubs[0], ghubs[1]]
        else:
            raise TypeError('Unsupported jobs list type: %s'
                            % repr(type(self.jobs)))
        return ghubs
    def __group_hubs_recurs(self, group, group_to_hub):
        '''
        Replace a group with a BarrierJob pair for inputs and ouputs).
        Same as __group_hubs() but also create hubs for sub-groups in group
        '''
        groups = [group]
        ghubs = None
        while groups:
            group = groups.pop(0)
            ghubs_tmp = self.__group_hubs(group, group_to_hub)
            if ghubs is None:
                ghubs = ghubs_tmp
            groups += [element for element in group.elements
                       if isinstance(element, Group)]
        return ghubs
    def __make_group_hubs_deps(self, group, group_to_hub):
        '''
        Build and return intra-group dependencies list
        '''
        dependencies = []
        in_hub, out_hub = self.__group_hubs(group, group_to_hub)
        for item in group.elements:
            if isinstance(item, Group):  # depends on a sub-group
                sub_hub = self.__group_hubs(item, group_to_hub)
                # TODO: check that these dependencies are not already here
                # (directly or indirectly)
                dependencies.append((in_hub, sub_hub[0]))
                dependencies.append((sub_hub[1], out_hub))
            else:  # regular job
                # TODO: check that these dependencies are not already here
                # (directly or indirectly)
                dependencies.append((in_hub, item))
                dependencies.append((item, out_hub))
        return dependencies
    def __convert_group_dependencies(self, dependencies=None):
        '''
        Converts dependencies using groups into barrier jobs when needed
        Parameters
        ----------
        dependencies: list, tuple, set (optional)
            dependencies list to check. If not specified, chek all dependencies
            in the workflow. When specified, the dependencies list should be
            a subset of the workflow dependencies (all must exist in
            self.dependencies)
        '''
        new_deps_list = []
        group_to_hub = {}
        deps_to_remove = []
        if dependencies is None:
            dependencies = self.dependencies
            reindex = False
        else:
            reindex = True
        for index, dependency in enumerate(dependencies):
            j1, j2 = dependency
            if not isinstance(j1, Group) and not isinstance(j2, Group):
                continue
            if type(self.dependencies) in (list, tuple):
                if reindex:
                    index = self.dependencies.index(dependency)
                deps_to_remove.insert(0, index)  # reverse order index list
            else:
                deps_to_remove.append(dependency)
            if isinstance(j1, Group):
                # a group is replaced with a BarrierJob pair for inputs and
                # ouputs)
                ghubs = self.__group_hubs_recurs(j1, group_to_hub)
                j1 = ghubs[1]  # replace input group with the group ouput hub
            if isinstance(j2, Group):
                # a group is replaced with a BarrierJob pair for inputs and
                # ouputs)
                ghubs = self.__group_hubs_recurs(j2, group_to_hub)
                j2 = ghubs[0]  # replace output group with the group input hub
            new_deps_list.append((j1, j2))
        # rebuild intra-group links
        for group, ghubs in six.iteritems(group_to_hub):
            new_deps_list += self.__make_group_hubs_deps(group, group_to_hub)
        if type(self.dependencies) is set:
            self.dependencies.difference_update(deps_to_remove)
            self.dependencies.update(new_deps_list)
        elif type(self.dependencies) is list:
            # remove converted dependencies
            for index in deps_to_remove:
                del self.dependencies[index]
            # add new ones
            self.dependencies += new_deps_list
        elif type(self.dependencies) is tuple:
            self.dependencies = list(self.dependencies)
            # remove converted dependencies
            for index in deps_to_remove:
                del self.dependencies[index]
            # add new ones
            self.dependencies += new_deps_list
        else:
            raise TypeError('Unsupported dependencies type: %s'
                            % repr(type(self.dependencies)))
class Group(object):
    '''
    Hierarchical structure of a workflow.
    .. note:
      It only has a displaying role and does not have any impact on the workflow
      execution.
    **elements**: *sequence of Job and/or Group*
      The elements (Job or Group) belonging to the group.
    **name**: *string*
      Name of the Group which will be displayed in the GUI.
    **user_storage**: *picklable object*
      For the user needs, any small and picklable object can be stored here.
    '''
    # string
    name = None
    # sequence of Job and/or Group
    elements = None
    # any small and picklable object needed by the user
    user_storage = None
    def __init__(self, elements, name, user_storage=None):
        '''
        @type  elements: sequence of Job and/or Group
        @param elements: the elements belonging to the group
        @type  name: string
        @param name: name of the group
        '''
        self.elements = elements
        self.name = name
    def attributs_equal(self, other):
        if not isinstance(other, self.__class__):
            return False
        if len(self.elements) != len(other.elements):
            return False
        for i in range(0, len(self.elements)):
            if not self.elements[i].attributs_equal(other.elements[i]):
                return False
        if self.name != other.name:
            return False
        return True
    def to_dict(self, group_ids, job_ids):
        group_dict = {}
        group_dict["name"] = self.name
        new_gp_elements = []
        for element in self.elements:
            if element in job_ids:
                new_gp_elements.append(job_ids[element])
            elif element in group_ids:
                new_gp_elements.append(group_ids[element])
            else:
                raise Exception("Unknown group element.")
        group_dict["elements"] = new_gp_elements
        return group_dict
    @classmethod
    def from_dict(cls, d, group_from_ids, job_from_ids):
        id_elements = d["elements"]
        elements = []
        for el_id in id_elements:
            if el_id in job_from_ids:
                elements.append(job_from_ids[el_id])
            elif el_id in group_from_ids:
                elements.append(group_from_ids[el_id])
            else:
                return None
        name = d["name"]
        group = cls(elements, name)
        return group
class SpecialPath(object):
    '''
    Abstract base class for special file or directory path, which needs specific handling in the engine.
    FileTransfer, TemporaryPath, and SharedResourcePath are SpecialPath.
    '''
    def __init__(self, path=None):
        super(SpecialPath, self).__init__()
        if isinstance(path, self.__class__):
            self.pattern = path.pattern
            self.ref = path.referent()
        else:
            self.pattern = u'%s'
            self.ref = None
    def referent(self):
        return self.ref if self.ref else self
    def __add__(self, other):
        res = type(self)(self)
        res.pattern = self.pattern + six.text_type(other)
        res.ref = self.referent()
        return res
    def __radd__(self, other):
        res = type(self)(self)
        res.pattern = six.text_type(other) + self.pattern
        res.ref = self.referent()
        return res
    def __iadd__(self, other):
        self.pattern += six.text_type(other)
        super(SpecialPath, self).__iadd__(six.text_type(other))
    def __hash__(self):
        if self.ref:
            return self.referent().__hash__()
        return super(SpecialPath, self).__hash__()
    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.referent() is other.referent()
    def __lt__(self, other):
        return hash(self) < hash(other)
    def __gt__(self, other):
        return hash(self) > hash(other)
    def __le__(self, other):
        return hash(self) <= hash(other)
    def __ge__(self, other):
        return hash(self) >= hash(other)
class FileTransfer(SpecialPath):
    '''
    File/directory transfer representation
    .. note::
      FileTransfers objects are only required if the user and computing resources
      have a separate file system.
    **client_path**: *string*
      Path of the file or directory on the user's file system.
    **initial_status**: *constants.FILES_DO_NOT_EXIST or constants.FILES_ON_CLIENT*
      * constants.FILES_ON_CLIENT for workflow input files
        The file(s) will need to be transfered on the computing resource side
      * constants.FILES_DO_NOT_EXIST for workflow output files
        The file(s) will be created by a job on the computing resource side.
    **client_paths**: *sequence of string*
      Sequence of path. Files to transfer if the FileTransfers concerns a file
      series or if the file format involves several associated files or
      directories (see the note below).
    **name**: *string*
      Name of the FileTransfer which will be displayed in the GUI.
      Default: client_path + "transfer"
    When a file is transfered via a FileTransfer, the Job it is used in has to be
    built using the FileTransfer object in place of the file name in the command
    list. The FileTransfer object has also to be on the referenced_input_files
    or referenced_output_files lists in Job constructor.
    .. note::
      Use client_paths if the transfer involves several associated files and/or
      directories. Examples:
        * file series
        * file format associating several file and/or directories
          (ex: a SPM images are stored in 2 associated files: .img and .hdr)
          In this case, set client_path to one the files (ex: .img) and
          client_paths contains all the files (ex: .img and .hdr files)
      In other cases (1 file or 1 directory) the client_paths must be set to
      None.
      When client_paths is not None, the server-side handling of paths is
      different: the server directory is used istead of files. This has slight
      consequences on the behaviour of the workflow:
      * in soma-workflow 2.6 and earlier, the commandline will be using the
        directory instead of a file name, which is often not what you expect.
      * in soma-workflow 2.7 and later, the commandline will be using the
        main file name (client_path) translated to the server location. This is
        more probably what is expected.
      * in any case it is possible to specify the commandline path using a
        tuple as commandline argument:
        ::
          myfile = FileTransfer(is_input=True, client_path='/home/bubu/plof.nii',
              client_paths=['/home/bubu/plof.nii', '/home/bubu/plof.nii.minf'])
          # job1 will work with SWF >= 2.7, not in 2.6
          job1 = Job(command=['AimsFileInfo', myfile],
              referenced_input_files=[myfile])
          # job2 will use <engine_path>/plof.nii as input
          job2 = Job(command=['AimsFileInfo', (myfile, 'plof.nii')],
              referenced_input_files=[myfile]))
    '''
    # string
    _client_path = None
    # sequence of string
    _client_paths = None
    # constants.FILES_DO_NOT_EXIST constants.FILES_ON_CLIENT
    _initial_status = None
    # int (hours)
    _disposal_timeout = None
    # string
    _name = None
    def __init__(self,
                 is_input,
                 client_path=None,
                 disposal_timeout=168,
                 name=None,
                 client_paths=None,
                 ):
        '''
        Parameters
        ----------
        is_input: bool
            specifies if the files have to be transferred from the client before
            job execution, or back to the client after execution.
        client_path: string
            main file name
        disposal_timeout: int (optional)
            default: 168
        name: string (optional)
            name displayed in the GUI
        client_paths: list (optional)
            when several files are involved
        '''
        if isinstance(is_input, self.__class__):
            if client_path is not None or name is not None \
                    or client_paths is not None:
                raise TypeError('FileTransfer as copy constructor '
                                'should have only one argument')
            super(FileTransfer, self).__init__(is_input)
            return
        if client_path is None:
            raise TypeError('FileTransfer.__init__ takes at least '
                            '3 arguments')
        super(FileTransfer, self).__init__()
        if name:
            ft_name = name
        else:
            ft_name = client_path + "transfer"
        self.name = ft_name
        self._client_path = client_path
        self._disposal_timeout = disposal_timeout
        self._client_paths = client_paths
        if is_input:
            self._initial_status = constants.FILES_ON_CLIENT
        else:
            self._initial_status = constants.FILES_DO_NOT_EXIST
    @property
    def client_path(self):
        return self.referent()._client_path
    @client_path.setter
    def client_path(self, value):
        self.referent()._client_path = value
    @property
    def client_paths(self):
        return self.referent()._client_paths
    @client_paths.setter
    def client_paths(self, value):
        self.referent()._client_paths = value
    @property
    def initial_status(self):
        return self.referent()._initial_status
    @initial_status.setter
    def initial_status(self, value):
        self.referent()._initial_status = value
    @property
    def disposal_timeout(self):
        return self.referent()._disposal_timeout
    @disposal_timeout.setter
    def disposal_timeout(self, value):
        self.referent()._disposal_timeout = value
    @property
    def name(self):
        return self.referent()._name
    @name.setter
    def name(self, value):
        self.referent()._name = value
    def attributs_equal(self, other):
        if not isinstance(other, self.__class__):
            return False
        attributes = [
            "client_path",
            "client_paths",
            "initial_status",
            "disposal_timeout",
            "name",
            "pattern",
        ]
        for attr_name in attributes:
            attr = getattr(self, attr_name)
            other_attr = getattr(other, attr_name)
            if not other_attr == attr:
                return False
        return True
    def to_dict(self):
        transfer_dict = {}
        attributes = [
            "client_path",
            "client_paths",
            "initial_status",
            "disposal_timeout",
            "name",
            "pattern",
        ]
        for attr_name in attributes:
            transfer_dict[attr_name] = getattr(self, attr_name)
        return transfer_dict
    @classmethod
    def from_dict(cls, d):
        transfer = cls(is_input=True,
                       client_path="foo")
        for key, value in six.iteritems(d):
            setattr(transfer, key, value)
        return transfer
    def __str__(self):
        return self.pattern % self.referent().client_path
    def __repr__(self):
        return repr(self.__str__())
class SharedResourcePath(SpecialPath):
    '''
    Representation of path which is valid on either user's or computing resource
    file system.
    .. note::
      SharedResourcePath objects are only required if the user and computing
      resources have a separate file system.
    **namespace**: *string*
      Namespace for the path. That way several applications can use the same
      identifiers without risk.
    **uuid**: *string*
      Identifier of the absolute path.
    **relative_path**: *string*
      Relative path of the file if the absolute path is a directory path.
    .. warning::
      The namespace and uuid must exist in the translations files configured on
      the computing resource side.
    '''
    _relative_path = None
    _namespace = None
    _uuid = None
    _disposal_timeout = None
    def __init__(self,
                 relative_path,
                 namespace=None,
                 uuid=None,
                 disposal_timeout=168):
        if isinstance(relative_path, self.__class__):
            if namespace is not None or uuid is not None:
                raise TypeError('SharedResourcePath as copy constructor '
                                'should have only one argument')
            super(SharedResourcePath, self).__init__(relative_path)
            return
        if namespace is None or uuid is None:
            raise TypeError('SharedResourcePath.__init__ takes at least '
                            '4 arguments')
        super(SharedResourcePath, self).__init__()
        self.relative_path = relative_path
        self.namespace = namespace
        self.uuid = uuid
        self.disposal_timout = disposal_timeout
    @property
    def relative_path(self):
        return self.referent()._relative_path
    @relative_path.setter
    def relative_path(self, value):
        self.referent()._relative_path = value
    @property
    def namespace(self):
        return self.referent()._namespace
    @namespace.setter
    def namespace(self, value):
        self.referent()._namespace = value
    @property
    def uuid(self):
        return self.referent()._uuid
    @uuid.setter
    def uuid(self, value):
        self.referent()._uuid = value
    @property
    def disposal_timeout(self):
        return self.referent()._disposal_timeout
    @disposal_timeout.setter
    def disposal_timeout(self, value):
        self.referent()._disposal_timeout = value
    def attributs_equal(self, other):
        if not isinstance(other, self.__class__):
            return False
        attributes = [
            "relative_path",
            "namespace",
            "uuid",
            "disposal_timeout",
            "pattern",
        ]
        ref = self.referent()
        for attr_name in attributes:
            attr = getattr(ref, attr_name)
            other_attr = getattr(other, attr_name)
            if not attr == other_attr:
                return False
        return True
    def to_dict(self):
        srp_dict = {}
        attributes = [
            "relative_path",
            "namespace",
            "uuid",
            "disposal_timeout",
            "pattern",
        ]
        ref = self.referent()
        for attr_name in attributes:
            srp_dict[attr_name] = getattr(ref, attr_name)
        return srp_dict
    @classmethod
    def from_dict(cls, d):
        shared_res_path = cls(relative_path="toto",
                              namespace="toto",
                              uuid="toto")
        for key, value in six.iteritems(d):
            setattr(shared_res_path, key, value)
        return shared_res_path
    def __str__(self):
        ref = self.referent()
        return self.pattern % ("%s:%s:%s" % (ref.namespace, ref.uuid,
                                             ref.relative_path))
class TemporaryPath(SpecialPath):
    '''
    Temporary file representation. This temporary file will never exist on client
    side: its filename will be created on server side, used during the workflow
    execution, and removed when not used any longer.
    Parameters
    ----------
    is_directory: bool (optional)
        default: False
    disposal_timeout: int (optional)
        default: 168
    name: string (optional)
        name for the TemporaryPath object, displayed in GUI for instance
    suffix: string (optional)
        suffix (typically: extension) applied to the generated file name
    '''
    # bool
    _is_directory = False
    # int (hours)
    _disposal_timeout = None
    # string
    _name = None
    # string
    _suffix = None
    def __init__(self,
                 is_directory=False,
                 disposal_timeout=168,
                 name=None,
                 suffix=''):
        if isinstance(is_directory, self.__class__):
            if name is not None or suffix != '':
                raise TypeError('TemporaryPath as copy constructor should '
                                'have only one argument')
            super(TemporaryPath, self).__init__(is_directory)
            return
        super(TemporaryPath, self).__init__()
        self._is_directory = is_directory
        self._disposal_timeout = disposal_timeout
        self._suffix = suffix
        if name is None:
            self._name = 'temporary'
        else:
            self._name = name
    @property
    def is_directory(self):
        return self.referent()._is_directory
    @is_directory.setter
    def is_directory(self, value):
        self.referent()._is_directory = value
    @property
    def disposal_timeout(self):
        return self.referent()._disposal_timeout
    @disposal_timeout.setter
    def disposal_timeout(self, value):
        self.referent()._disposal_timeout = value
    @property
    def name(self):
        return self.referent()._name
    @name.setter
    def name(self, value):
        self.referent()._name = value
    @property
    def suffix(self):
        return self.referent()._suffix
    @suffix.setter
    def suffix(self, value):
        self.referent()._suffix = value
    def attributs_equal(self, other):
        if not isinstance(other, self.__class__):
            return False
        attributes = [
            "is_directory",
            "disposal_timeout",
            "name",
            "suffix",
            "pattern",
        ]
        for attr_name in attributes:
            attr = getattr(self, attr_name)
            other_attr = getattr(other, attr_name)
            if not attr == other_attr:
                return False
        return True
    def to_dict(self):
        srp_dict = {}
        attributes = [
            "is_directory",
            "disposal_timeout",
            "name",
            "suffix",
            "pattern",
        ]
        for attr_name in attributes:
            srp_dict[attr_name] = getattr(self, attr_name)
        return srp_dict
    @classmethod
    def from_dict(cls, d):
        is_directory = d.get("is_directory", False)
        disposal_timeout = d.get("disposal_timeout", 168)
        suffix = d.get("suffix", "")
        temp_file = cls(is_directory=is_directory,
                        disposal_timeout=disposal_timeout,
                        suffix=suffix)
        for key, value in six.iteritems(d):
            setattr(temp_file, key, value)
        return temp_file
    def __str__(self):
        return self.pattern % self.name
class OptionPath(SpecialPath):
    '''
    File with reading or writing parameters given through a URI (or any
    other suffix system). The file can be passed as a string or as a
    SpecialPath object.
    Parameters
    ----------
    parent_path : :obj:`str` or :obj:`SpecialPath`
        Path to the input file. If it is a :obj:`FileTransfer` or
        :obj:`TemporayPath`, this parent path should be added to
        the Job's referenced_input_files or referenced_output_files.
    uri : :obj:`str` or :obj:`dict`
        * If the provided URI is a string, it is tored as is and will be
        added at the end of the path when the server-side command is
        generated. A URI is of the form '?option1=value1&option2=value2'.
        However, since the provided `uri` is untouched, any other option
        passing system can be used.
        * If the provided URI is a dictionary, it is converted to a URI
        string. Each key is considered an option name, mapped to its
        associated value.
    name : :obj:`str` (optional)
        Name of the path. If not provided, the full client-side path + URI is
        used.
    '''
    _parent_path = None
    _parent_type = None
    _uri = ''
    _name = None
    def __init__(self, parent_path=None, uri=None, name=None):
        # copy constructor
        if isinstance(parent_path, OptionPath):
            if not uri is None:
                raise TypeError('OptionPath as copy constructor should '
                                'have only one argument')
            super(OptionPath, self).__init__(parent_path)
            return
        # normal constructor
        super(OptionPath, self).__init__()
        if isinstance(parent_path, SpecialPath):
            self._parent_path = parent_path
        else:
            self._parent_path = str(parent_path)
        self._parent_type = type(self._parent_path).__name__
        if isinstance(uri, dict):
            from six import iteritems
            build_uri = '?'
            for key, value in iteritems(uri):
                build_uri += str(key) + '=' + str(value) + '&'
            build_uri = build_uri[:-1]
            self._uri = build_uri
        elif str(uri):
            self._uri = str(uri)
        if name is None:
            self._name = 'parent_path' + self._uri
        else:
            self._name = name
    @property
    def parent_type(self):
        return self.referent()._parent_type
    @property
    def parent_path(self):
        return self.referent()._parent_path
    @parent_path.setter
    def parent_path(self, value):
        self.referent()._path = value
        if isinstance(value, SpecialPath):
            self.referent()._parent_path = value
        else:
            self.referent()._parent_path = str(value)
        self.referent()._parent_type = type(
            self.referent()._parent_path).__name__
    @property
    def uri(self):
        return self.referent()._uri
    @uri.setter
    def uri(self, value):
        self.referent()._uri = value
    @property
    def name(self):
        return self.referent()._name
    @name.setter
    def name(self, value):
        self.referent()._name = value
    def attributs_equal(self, other):
        if not isinstance(other, self.__class__):
            return False
        attributes = [
            "parent_type",
            "parent_path",
            "uri",
            "name",
            "pattern",
        ]
        for attr_name in attributes:
            attr = getattr(self, attr_name)
            other_attr = getattr(other, attr_name)
            if not attr == other_attr:
                return False
        return True
    def to_dict(self,
                id_generator,
                transfer_ids,
                shared_res_path_id,
                tmp_ids,
                opt_ids):
        opt_dict = {}
        attributes = [
            "parent_type",
            "parent_path",
            "uri",
            "name",
            "pattern",
        ]
        for attr_name in attributes:
            if attr_name == "parent_path" and getattr(self, "parent_type") != 'str':
                opt_dict[attr_name] = to_serializable(getattr(self, attr_name),
                                                      id_generator,
                                                      transfer_ids,
                                                      shared_res_path_id,
                                                      tmp_ids,
                                                      opt_ids)
            else:
                opt_dict[attr_name] = getattr(self, attr_name)
        return opt_dict
    @classmethod
    def from_dict(cls, d,
                  tr_from_ids,
                  srp_from_ids,
                  tmp_from_ids,
                  opt_from_ids):
        parent_type = d.get("parent_type", "str")
        parent_path = d.get("parent_path", None)
        if parent_type != "str":
            parent_path = from_serializable(parent_path,
                                            tr_from_ids,
                                            srp_from_ids,
                                            tmp_from_ids,
                                            opt_from_ids)
        uri = d.get("uri", None)
        name = d.get("name", None)
        temp_file = cls(parent_path=parent_path, uri=uri, name=name)
        for key, value in six.iteritems(d):
            if not key in ("parent_path", "parent_type", "uri", "name"):
                setattr(temp_file, key, value)
        return temp_file
    def __str__(self):
        return self.pattern % ("%s%s" % (str(self.parent_path), self.uri))
    def __repr__(self):
        return repr(self.__str__())
class IdGenerator(object):
    def __init__(self):
        self.current_id = 0
    def generate_id(self):
        current_id = self.current_id
        self.current_id = self.current_id + 1
        return current_id
def to_serializable(element,
                    id_generator,
                    transfer_ids,
                    shared_res_path_ids,
                    tmp_ids,
                    opt_ids):
    if isinstance(element, FileTransfer):
        if element in transfer_ids:
            return ('<id>', transfer_ids[element])
        else:
            ident = id_generator.generate_id()
            transfer_ids[element] = ident
            return ('<id>', ident)
    elif isinstance(element, SharedResourcePath):
        if element in shared_res_path_ids:
            return ('<id>', shared_res_path_ids[element])
        else:
            ident = id_generator.generate_id()
            shared_res_path_ids[element] = ident
            return ('<id>', ident)
    elif isinstance(element, TemporaryPath):
        if element in tmp_ids:
            return ('<id>', tmp_ids[element])
        else:
            ident = id_generator.generate_id()
            tmp_ids[element] = ident
            return ('<id>', ident)
    elif isinstance(element, OptionPath):
        if element in opt_ids:
            return ('<id>', opt_ids[element])
        else:
            ident = id_generator.generate_id()
            opt_ids[element] = ident
            return ('<id>', ident)
    elif isinstance(element, list):
        return list_to_serializable(element,
                                    id_generator,
                                    transfer_ids,
                                    shared_res_path_ids,
                                    tmp_ids,
                                    opt_ids)
    elif isinstance(element, tuple):
        return tuple(list_to_serializable(element,
                                          id_generator,
                                          transfer_ids,
                                          shared_res_path_ids,
                                          tmp_ids,
                                          opt_ids))
        # return ["soma-workflow-tuple",
                # to_serializable(element[0],
                                # id_generator,
                                # transfer_ids,
                                # shared_res_path_ids,
                                # tmp_ids,
                                # opt_ids),
                # element[1]]
    else:
        return element
def from_serializable(element,
                      tr_from_ids,
                      srp_from_ids,
                      tmp_from_ids,
                      opt_from_ids):
    if isinstance(element, list):
        if len(element) == 3 and element[0] == "soma-workflow-tuple":
            return (from_serializable(element[1],
                                      tr_from_ids,
                                      srp_from_ids,
                                      tmp_from_ids,
                                      opt_from_ids),
                    element[2])
        else:
            return list_from_serializable(element, tr_from_ids, srp_from_ids,
                                          tmp_from_ids, opt_from_ids)
    elif isinstance(element, tuple):
        if len(element) >= 1:
            code = element[0]
            if code == '<id>' and len(element) == 2:
                el = element[1]
                if el in tr_from_ids:
                    return tr_from_ids[el]
                elif el in srp_from_ids:
                    return srp_from_ids[el]
                elif el in tmp_from_ids:
                    return tmp_from_ids[el]
                elif el in opt_from_ids:
                    return opt_from_ids[el]
            return tuple(
                list_from_serializable(element, tr_from_ids, srp_from_ids,
                                       tmp_from_ids, opt_from_ids))
    else:
        return element
def list_to_serializable(list_to_convert,
                         id_generator,
                         transfer_ids,
                         shared_res_path_ids,
                         tmp_ids,
                         opt_ids):
    ser_list = []
    for element in list_to_convert:
        ser_element = to_serializable(element,
                                      id_generator,
                                      transfer_ids,
                                      shared_res_path_ids,
                                      tmp_ids,
                                      opt_ids)
        ser_list.append(ser_element)
    return ser_list
def list_from_serializable(list_to_convert,
                           tr_from_ids,
                           srp_from_ids,
                           tmp_from_ids,
                           opt_from_ids):
    us_list = []
    for element in list_to_convert:
        us_element = from_serializable(element,
                                       tr_from_ids,
                                       srp_from_ids,
                                       tmp_from_ids,
                                       opt_from_ids)
        us_list.append(us_element)
    return us_list