Source code for capsul.study_config.process_instance

# -*- coding: utf-8 -*-
'''
Process instance factory

Functions
=========
:func:`is_process`
------------------
:func:`is_pipeline_node`
------------------------
:func:`get_process_instance`
----------------------------
:func:`get_node_class`
----------------------
:func:`get_node_instance`
-------------------------
'''

# System import
from __future__ import absolute_import
import sys
import os.path as osp
import importlib
import types
import re
import six
import os
import inspect

# Capsul import
from capsul.process.process import Process
from capsul.pipeline.pipeline import Pipeline
from capsul.process.nipype_process import nipype_factory
from capsul.process.xml import create_xml_process
from capsul.pipeline.xml import create_xml_pipeline
from capsul.pipeline.pipeline_nodes import Node
from soma.controller import Controller

# Nipype import
try:
    from nipype.interfaces.base import Interface
# If nipype is not found create a dummy Interface class
except ImportError:
    Interface = type("Interface", (object, ), {})


process_xml_re = re.compile(r'<process.*</process>', re.DOTALL)


def is_process(item):
    """ Tell whether ``item`` can be turned into a process.

    An item qualifies when it is either:

    * a class deriving from Process or from a Nipype Interface, the base
      classes Pipeline and Process themselves being excluded;
    * a plain function carrying a ``capsul_xml`` attribute, or whose
      docstring embeds a ``<process>...</process>`` XML description.

    Returns
    -------
    result: bool
        True if the item is seen as a process, False otherwise.
    """
    if inspect.isclass(item):
        # A class is never a function, so the answer is fully decided here.
        if item not in (Pipeline, Process) \
                and (issubclass(item, Process)
                     or issubclass(item, Interface)):
            return True
        return False
    if not inspect.isfunction(item):
        return False
    # Functions: either explicitly tagged, or described in their docstring.
    if hasattr(item, 'capsul_xml'):
        return True
    docstring = item.__doc__
    return bool(docstring and process_xml_re.search(docstring))
def is_pipeline_node(item):
    """ Tell whether ``item`` is an instance of a pipeline Node.

    The Node class object itself is explicitly rejected.
    """
    if item is Node:
        return False
    return isinstance(item, Node)
def get_process_instance(process_or_id, study_config=None, **kwargs):
    """ Build (or pass through) a Process instance from an identifier.

    It is usually more convenient to go through a StudyConfig instance:
    StudyConfig.get_process_instance()

    Accepted identifiers are:

        * a derived Process class.
        * a derived Process class instance.
        * a Nipype Interface instance.
        * a Nipype Interface class.
        * a string description of the class `<module>.<class>`.
        * a string description of a function to wrap `<module>.<function>`.
        * a string description of a module containing a single process
          `<module>`
        * a string description of a pipeline `<module>.<fname>.xml`.
        * an XML filename for a pipeline.
        * a Python (.py) filename with process name in it:
          `/path/process_source.py#ProcessName`.
        * a Python (.py) filename for a file containing a single process.

    Additional keyword arguments are applied as default values of the
    process instance parameters.

    .. note::

        If no process is found an ImportError is raised.

    .. note::

        If the 'process_or_id' parameter is not valid a ValueError is raised.

    .. note::

        If the function to wrap does not contain a process description in
        its docstring ('<process>...</process>') a ValueError is raised.

    Parameters
    ----------
    process_or_id: instance or class description (mandatory)
        a process/nipype interface instance/class or a string description.
    study_config: StudyConfig instance (optional)
        A Process instance belongs to a StudyConfig framework. If not
        specified the study_config can be set afterwards.
    kwargs:
        default values of the process instance parameters.

    Returns
    -------
    result: Process
        an initialized process instance.
    """
    # NOTE
    # This is a hack that makes study_config reachable from process
    # constructors (used for instance in ProcessIteration): the module-level
    # default study config is temporarily replaced while the process is built.
    # This is not elegant, not thread-safe, and forbids one pipeline to build
    # a second one in a different study_config context.
    import capsul.study_config.study_config as study_cmod

    nothing_to_swap = (
        study_config is None
        or study_cmod._default_study_config is study_config)
    if nothing_to_swap:
        return _get_process_instance(process_or_id,
                                     study_config=study_config, **kwargs)

    previous_default = study_cmod._default_study_config
    study_cmod._default_study_config = study_config
    try:
        return _get_process_instance(process_or_id,
                                     study_config=study_config, **kwargs)
    finally:
        # Always put the former default back, even if construction failed.
        study_cmod._default_study_config = previous_default
def _execfile(filename):
    """ Execute a Python source file and return the resulting namespace.

    Parameters
    ----------
    filename: str
        path of the Python file to execute.

    Returns
    -------
    glob_dict: dict
        the dictionary used as both globals and locals during execution.
    """
    # This chunk of code cannot be put inline in python 2.6
    glob_dict = {}
    # Read through a context manager so the file handle is always closed.
    with open(filename, "rb") as source_file:
        source = source_file.read()
    exec(compile(source, filename, 'exec'), glob_dict, glob_dict)
    return glob_dict


def _load_module(filename, modname=None):
    """ Load a Python source file as a module and register it in sys.modules.

    Parameters
    ----------
    filename: str
        path of the Python source file.
    modname: str (optional)
        name given to the module. When empty, it is derived from the file
        base name.

    Returns
    -------
    mod: module
        the loaded module.
    """
    if not modname:
        modname = os.path.basename(filename).rsplit('.', 2)[0]
    if sys.version_info >= (3, 5):
        import importlib.util
        spec = importlib.util.spec_from_file_location(modname, filename)
        mod = importlib.util.module_from_spec(spec)
        # The module is registered before execution; if execution fails,
        # sys.modules is put back in its former state so that no
        # half-initialized module stays registered.
        previous = sys.modules.get(modname)
        sys.modules[modname] = mod
        try:
            spec.loader.exec_module(mod)
        except BaseException:
            if previous is not None:
                sys.modules[modname] = previous
            else:
                sys.modules.pop(modname, None)
            raise
        return mod
    elif sys.version_info[0] >= 3:
        from importlib.machinery import SourceFileLoader
        mod = SourceFileLoader(modname, filename).load_module()
    else:
        import imp
        mod = imp.load_source(modname, filename)
    if mod is not None:
        sys.modules[modname] = mod
    return mod


def _get_function_xml(function):
    """ Return the XML process description attached to a function: its
    'capsul_xml' attribute if any, otherwise the '<process>...</process>'
    section of its docstring, otherwise None.
    """
    xml = getattr(function, 'capsul_xml', None)
    if xml is None and function.__doc__:
        match = process_xml_re.search(function.__doc__)
        if match:
            xml = match.group(0)
    return xml


def _get_process_instance(process_or_id, study_config=None, **kwargs):
    """ Build a process instance from an identifier.

    Parameters
    ----------
    process_or_id: Process / Interface instance or class, function, or str
        what to build the process from. Strings are tried, in order, as a
        Python file (optionally followed by '#ProcessName'), as a
        '<module>.<object>' description or a module holding a single
        process, then as an XML pipeline file.
    study_config: StudyConfig instance (optional)
        if given, it is set on the resulting process.
    kwargs:
        default values set on the process through set_parameter().

    Returns
    -------
    result: Process
        the process instance.

    Raises
    ------
    ValueError
        if no process can be built from 'process_or_id', if a function has
        no XML description, or if the process already belongs to a
        different study_config.
    KeyError
        if a file or module scanned for a single process contains several.
    ImportError
        if the module expected to contain an XML pipeline cannot be
        imported.
    """

    def _find_single_process(module_dict, filename):
        ''' Scan objects in module_dict and find out if a single one of them is
        a process
        '''
        object_name = None
        for name, item in six.iteritems(module_dict):
            if is_process(item):
                if object_name is not None:
                    raise KeyError(
                        'file %s contains several processes. Please '
                        'specify which one shoule be used using '
                        'filename.py#ProcessName or '
                        'module.submodule.ProcessName' % filename)
                object_name = name
        return object_name

    result = None
    # If the function 'process_or_id' parameter is already a Process
    # instance.
    if isinstance(process_or_id, Process):
        result = process_or_id

    # If the function 'process_or_id' parameter is a Process class.
    elif (isinstance(process_or_id, type) and
          issubclass(process_or_id, Process)):
        result = process_or_id()

    # If the function 'process_or_id' parameter is already a Nipype
    # interface instance, wrap this structure in a Process class
    elif isinstance(process_or_id, Interface):
        result = nipype_factory(process_or_id)

    # If the function 'process_or_id' parameter is an Interface class.
    elif (isinstance(process_or_id, type) and
          issubclass(process_or_id, Interface)):
        result = nipype_factory(process_or_id())

    # If the function 'process_or_id' parameter is a function.
    elif isinstance(process_or_id, types.FunctionType):
        xml = _get_function_xml(process_or_id)
        if xml:
            result = create_xml_process(process_or_id.__module__,
                                        process_or_id.__name__,
                                        process_or_id, xml)()
        else:
            raise ValueError('Cannot find XML description to make function {0} a process'.format(process_or_id))

    # If the function 'process_or_id' parameter is a class string
    # description
    elif isinstance(process_or_id, six.string_types):
        py_url = os.path.basename(process_or_id).split('#')
        object_name = None
        as_xml = False
        as_py = False
        module_dict = None
        module = None
        if len(py_url) >= 2 and py_url[-2].endswith('.py') \
                or len(py_url) == 1 and py_url[0].endswith('.py'):
            # python file + process name: something.py#ProcessName
            # or just something.py if it contains only one process class
            if len(py_url) >= 2:
                filename = process_or_id[:-len(py_url[-1]) - 1]
                # an empty name after '#' is handled like no name at all
                object_name = py_url[-1] or None
            else:
                filename = process_or_id
            module = _load_module(filename)
            module_name = module.__name__
            module_dict = module.__dict__
            # Only scan the file when no process name was given: an explicit
            # '#ProcessName' must not be overridden by the scan.
            if object_name is None:
                object_name = _find_single_process(module_dict, filename)
            if object_name is not None:
                module_name = process_or_id
                as_py = True

        if object_name is None:
            elements = process_or_id.rsplit('.', 1)
            if len(elements) < 2:
                module_name, object_name = '__main__', elements[0]
            else:
                module_name, object_name = elements
            try:
                module = importlib.import_module(module_name)
                if object_name not in module.__dict__ \
                        or not is_process(getattr(module, object_name)):
                    # maybe a module with a single process in it
                    module = importlib.import_module(process_or_id)
                    module_dict = module.__dict__
                    object_name = _find_single_process(
                        module_dict, module_name)
                    if object_name is not None:
                        module_name = process_or_id
                        as_py = True
                else:
                    as_py = True
            except ImportError:
                # Not an importable python object: the XML lookup below is
                # tried next, so the failure is deliberately ignored here.
                pass

        if not as_py:
            # maybe XML filename or URL
            xml_url = process_or_id + '.xml'
            if osp.exists(xml_url):
                object_name = None
            elif process_or_id.endswith('.xml') and osp.exists(process_or_id):
                xml_url = process_or_id
                object_name = None
            else:
                # maybe XML file with pipeline name in it
                xml_url = module_name + '.xml'
                if not osp.exists(xml_url) and module_name.endswith('.xml') \
                        and osp.exists(module_name):
                    xml_url = module_name
                if not osp.exists(xml_url):
                    # try XML file in a module directory + class name
                    basename = None
                    module_name2 = None
                    if module_name in sys.modules:
                        basename = object_name
                        module_name2 = module_name
                        object_name = None  # to allow unmatching class / xml
                        # object_name may be None here (module imported but
                        # no single process found in it): guard the call so
                        # that the final ValueError is reached instead of an
                        # AttributeError.
                        if basename is not None \
                                and basename.endswith('.xml'):
                            basename = basename[:-4]
                    else:
                        elements = module_name.rsplit('.', 1)
                        if len(elements) == 2:
                            module_name2, basename = elements
                    if module_name2 and basename:
                        try:
                            importlib.import_module(module_name2)
                            mod_dirname = osp.dirname(
                                sys.modules[module_name2].__file__)
                            xml_url = osp.join(mod_dirname, basename + '.xml')
                            if not osp.exists(xml_url):
                                # if basename includes .xml extension
                                xml_url = osp.join(mod_dirname, basename)
                        except ImportError as e:
                            raise ImportError('Cannot import %s: %s'
                                              % (module_name, str(e)))
            as_xml = True
            if osp.exists(xml_url):
                result = create_xml_pipeline(module_name, object_name,
                                             xml_url)()

        if result is None and not as_xml:
            if module_dict is not None:
                module_object = module_dict.get(object_name)
            else:
                module = sys.modules[module_name]
                module_object = getattr(module, object_name, None)
            if module_object is not None:
                if (isinstance(module_object, type) and
                        issubclass(module_object, Process)):
                    result = module_object()
                elif isinstance(module_object, Interface):
                    # If we have a Nipype interface, wrap this structure in a
                    # Process class. The interface found in the module is
                    # what must be wrapped ('result' is still None here).
                    result = nipype_factory(module_object)
                elif (isinstance(module_object, type) and
                      issubclass(module_object, Interface)):
                    result = nipype_factory(module_object())
                elif isinstance(module_object, types.FunctionType):
                    xml = _get_function_xml(module_object)
                    if xml:
                        result = create_xml_process(module_name, object_name,
                                                    module_object, xml)()
            if result is None and module is not None:
                # last chance: an XML pipeline file next to the module file
                xml_file = osp.join(osp.dirname(module.__file__),
                                    object_name + '.xml')
                if osp.exists(xml_file):
                    result = create_xml_pipeline(module_name, None,
                                                 xml_file)()

    if result is None:
        raise ValueError("Invalid process_or_id argument. "
                         "Got '{0}' and expect a Process instance/string "
                         "description or an Interface instance/string "
                         "description".format(process_or_id))

    # Set the instance default parameters
    for name, value in six.iteritems(kwargs):
        result.set_parameter(name, value)

    if study_config is not None:
        if result.study_config is not None \
                and result.study_config is not study_config:
            raise ValueError("StudyConfig mismatch in get_process_instance "
                             "for process %s" % result)
        result.set_study_config(study_config)
    return result
def get_node_class(node_type):
    """ Get a custom node class from a module + class string.

    The class name is optional if the module contains only one node class.
    It is OK to pass a Node subclass or a Node instance also.

    Parameters
    ----------
    node_type: str or Node subclass or Node instance
        either a module name (the first Node subclass found in it is used),
        a `<module>.<class>` string, a Node subclass, or a Node instance.

    Returns
    -------
    result: tuple (name, class) or None
        the node name and node class, or None if no node class could be
        found (including when a class which is not a Node subclass is
        given).
    """
    if inspect.isclass(node_type):
        if issubclass(node_type, Node):
            return node_type.__name__, node_type  # already a Node class
        # Not a Node subclass: report "not found" with None, like the other
        # failure paths, so that callers testing for None then unpacking a
        # (name, class) tuple behave correctly.
        return None
    if isinstance(node_type, Node):
        return node_type.__class__.__name__, node_type.__class__
    cls = None
    try:
        mod = importlib.import_module(node_type)
        # node_type is a module: pick the first Node subclass found in it
        for name, val in six.iteritems(mod.__dict__):
            if inspect.isclass(val) and val.__name__ != 'Node' \
                    and issubclass(val, Node):
                cls = val
                break
        else:
            return None
    except ImportError:
        # node_type is not an importable module: split it as
        # <module>.<class>
        name = node_type.split('.')[-1]
        modname = node_type[:-len(name) - 1]
        mod = importlib.import_module(modname)
        cls = getattr(mod, name)
    if cls is None:
        return None
    return name, cls
def get_node_instance(node_type, pipeline, conf_dict=None, name=None,
                      **kwargs):
    """ Instantiate a custom node from a module + class name (see
    :func:`get_node_class`) and a configuration dict or Controller.

    The configuration holds the parameters needed to instantiate the node
    type. A node class may expose them through a `configure_controller`
    class attribute, and may build itself through `build_node`.

    Parameters
    ----------
    node_type: str or Node subclass or Node instance
        node type to be built. Either a class (Node subclass) or a Node
        instance (the node will be re-instantiated), or a string describing
        a module and class.
    pipeline: Pipeline
        pipeline in which the node will be inserted.
    conf_dict: dict or Controller
        configuration dict or Controller defining parameters needed to build
        the node. A Controller is used as is; a dict is imported into the
        controller returned by the node class `configure_controller()`. If
        not given the node is supposed to be built with no parameters, which
        will not work for every node type.
    name: str (optional)
        node name; defaults to the name returned by :func:`get_node_class`.
    kwargs:
        default values of the node instance parameters.

    Returns
    -------
    node: Node
        the node instance.
    """
    found = get_node_class(node_type)
    if found is None:
        raise ValueError("Could not find node class %s" % node_type)
    default_name, node_cls = found
    name = name or default_name

    # Select the controller describing the node configuration
    if isinstance(conf_dict, Controller):
        conf_controller = conf_dict
    elif not hasattr(node_cls, 'configure_controller'):
        conf_controller = Controller()
    else:
        conf_controller = node_cls.configure_controller()
        if conf_dict is not None:
            if conf_controller is None:
                raise ValueError("node type %s has a configuration controller "
                                 "problem (see %s.configure_controller()"
                                 % (node_type, node_type))
            conf_controller.import_from_dict(conf_dict)

    builder = getattr(node_cls, 'build_node', None) \
        if hasattr(node_cls, 'build_node') else None
    if hasattr(node_cls, 'build_node'):
        node = builder(pipeline, name, conf_controller)
    else:
        # probably bound to fail...
        node = node_cls(pipeline, name, [], [])

    # Set the instance default parameters
    for param_name, param_value in six.iteritems(kwargs):
        setattr(node, param_name, param_value)
    return node