Source code for capsul.attributes.fom_completion_engine

# -*- coding: utf-8 -*-

'''
Completion engine for File Organization Models (FOM).

Classes
=======
:class:`FomProcessCompletionEngine`
-----------------------------------
:class:`FomPathCompletionEngine`
--------------------------------
:class:`FomProcessCompletionEngineIteration`
--------------------------------------------
'''

import os
from traits.api import Str, Undefined

from soma.controller import Controller, ControllerTrait
from capsul.pipeline.pipeline import Pipeline
from capsul.pipeline.pipeline_nodes import Node, Switch, ProcessNode
from capsul.attributes.completion_engine import ProcessCompletionEngine, \
    PathCompletionEngine
from capsul.attributes.completion_engine_iteration \
    import ProcessCompletionEngineIteration
from capsul.attributes.attributes_schema import ProcessAttributes, \
    EditableAttributes
from soma.fom import DirectoryAsDict
from soma.path import split_path
from soma.sorted_dictionary import SortedDictionary
from soma.utils.weak_proxy import get_ref
import collections


[docs] class FomProcessCompletionEngine(ProcessCompletionEngine): """ FOM (File Organization Model) implementation of completion engine. * A capsul.study_config.StudyConfig also needs to be configured with FOM module, and selected FOMS and directories: :: from capsul.api import StudyConfig from capsul.study_config.config_modules.fom_config import FomConfig study_config = StudyConfig(modules=StudyConfig.default_modules + ['FomConfig', 'BrainVISAConfig']) study_config.update_study_configuration('study_config.json') * Only then a FomProcessCompletionEngine can be created: :: process = get_process_instance('morphologist') fom_completion_engine = FomProcessCompletionEngine( process, study_config) But generally this creation is handled via the ProcessCompletionEngine.get_completion_engine() function: :: fom_completion_engine = ProcessCompletionEngine.get_completion_engine( process) Parameters ---------- name: string (optional) name of the process in the FOM dictionary. By default the process.name variable will be used. Methods ------- create_attributes_with_fom """ def __init__(self, process, name=None): super(FomProcessCompletionEngine, self).__init__( process=process, name=name) self.input_fom = None self.output_fom = None self.shared_fom = None
[docs] def get_attribute_values(self): ''' Get attributes Controller associated to a process Returns ------- attributes: Controller ''' if not self._rebuild_attributes \ and self.trait('capsul_attributes') is not None \ and hasattr(self, 'capsul_attributes'): return self.capsul_attributes self.create_attributes_with_fom() self._rebuild_attributes = False return self.capsul_attributes
[docs] def create_attributes_with_fom(self): """To get useful attributes by the fom""" # print('create_attributes_with_fom for', self.process, ', FCE:', self) process = get_ref(self.process) if isinstance(process, ProcessNode): process = process.process study_config = process.study_config modules_data = study_config.modules_data # Get attributes in input fom, # search in processes list with a specific to generic order id = getattr(process, 'id', None) names_search_list = [] if hasattr(process, 'context_name'): names_search_list.append(process.context_name) if self.name: names_search_list.append(self.name) names_search_list.append(process.name) if id: names_search_list.append(id) schemas = self._get_schemas() if not hasattr(self, 'capsul_attributes'): self.add_trait('capsul_attributes', ControllerTrait(Controller())) self.capsul_attributes = ProcessAttributes(self.process, schemas) capsul_attributes = self.capsul_attributes # print('create_attributes_with_fom for', self.process, ', FCE:', self, ', foms:', {n: f.fom_names[-1] for n, f in modules_data.foms.items()}) foms = SortedDictionary() foms.update(modules_data.foms) if study_config.auto_fom: # in auto-fom mode, also search in additional and non-loaded FOMs foms.update(modules_data.all_foms) def editable_attributes(attributes, fom, init_values): ea = EditableAttributes() for attribute in attributes: if attribute.startswith('fom_'): continue # skip FOM internals default_value = init_values.get(attribute) ea.fom_fixed = init_values if default_value is None: default_value = fom.attribute_definitions[attribute].get( 'default_value', '') ea.add_trait(attribute, Str(default_value, optional=True)) return ea sel_foms = {} if study_config.input_fom: sel_foms['input'] = study_config.input_fom if study_config.output_fom: sel_foms['output'] = study_config.output_fom if study_config.shared_fom: sel_foms['shared'] = study_config.shared_fom for schema, fom in foms.items(): if fom is None: fom, atp, pta \ = study_config.modules['FomConfig'].load_fom(schema) foms[schema] = fom if schema not in ('input', 'output', 'shared'): # exclude incompatible FOMs for fs in ('input', 'output', 'shared'): found = 0 f = modules_data.foms.get(fs) if f is None: if fs == 'shared': if 'shared' in schema: sel_foms['shared'] = schema found = True elif 'shared' not in schema: sel_foms[fs] = schema found = True elif fom is not f: if sum([n in fom.fom_names for n in f.fom_names]) \ == len(f.fom_names): # all sub-foms of f included in fom: # possibly keep it if fs != 'shared': # print('extend fom:', schema, 'for', fs, ':', f.fom_names) nfound = len( [True for name in names_search_list if fom.patterns.get(name) is not None]) if nfound > found: found = nfound sel_foms[fs] = schema else: found = 1 sel_foms[fs] = schema # print('new foms:', sel_foms) fom_modified = False found = False for schema, fom_type in sel_foms.items(): # print('schema:', schema, fom_type) fom = modules_data.all_foms[fom_type] atp = modules_data.fom_atp['all'].get(fom_type) \ or modules_data.fom_atp.get(schema) if atp is None: continue # we don't stop at first process match as earlier: # we build an attributes dict incrementally using all definitions # so that a specialized process def may just specify attributes, # then reuse a more generic definition, unless the # ".skip_generic" rule is used. fom_patterns = collections.OrderedDict() att_val = {} names = [] for name in names_search_list: fom_patterns_x = fom.patterns.get(name) if fom_patterns_x is not None: found = True if name in names: continue names.append(name) if '.process_attributes' in fom_patterns_x: # .process_attributes set atts to the whole process. att_val.update( fom_patterns_x['.process_attributes']) keys = [k for k in fom_patterns_x.keys() if k not in ('.process_attributes', '.skip_generic')] for k in keys: if k not in fom_patterns: fom_patterns[k] = fom_patterns_x[k] if '.skip_generic' in fom_patterns_x \ and fom_patterns_x['.skip_generic']: break if not found: # print('process', names_search_list, 'not found in', fom_type) continue # print('completion using FOM:', schema, fom_type, 'for', process.id, ', atp:', atp) done_params = set() for name in names: # for each process name, in order for parameter in fom_patterns: if parameter in done_params: continue param_attributes = atp.find_discriminant_attributes( fom_parameter=parameter, fom_process=name) ea = editable_attributes(param_attributes, fom, att_val) if len(ea.user_traits()) != 0: done_params.add(parameter) capsul_attributes.set_parameter_attributes( parameter, schema, ea, att_val, force=True) # normally att_vas passed as fixed_attributes_values in # set_parameter_attributes() should be OK but apparently # it does not work. Wo we record here in addition. # To be improved... proc_att = getattr(capsul_attributes, 'fom_fixed', {}) proc_att.update(att_val) capsul_attributes.fom_fixed = proc_att if (schema not in modules_data.foms or modules_data.foms[schema] != fom or schema not in modules_data.fom_atp or modules_data.fom_atp[schema] != atp or getattr(study_config, '%s_fom' % schema) != fom_type): modules_data.foms[schema] = fom modules_data.fom_atp[schema] = atp setattr(study_config, '%s_fom' % schema, fom_type) fom_modified = True # the shema is found, no need to look into others. break if not found: # no FOM contains the process raise KeyError('No matching FOM for process %s' % repr(names_search_list)) # in a pipeline, we still must iterate over nodes to find switches, # which have their own behaviour. if isinstance(process, Pipeline): attributes = self.capsul_attributes name = process.name for node_name, node in list(process.nodes.items()): if isinstance(node, Switch): subprocess = node if subprocess is None: continue pname = '.'.join([name, node_name]) subprocess_compl = \ ProcessCompletionEngine.get_completion_engine( subprocess, pname) try: sub_attributes \ = subprocess_compl.get_attribute_values() except Exception: try: subprocess_compl = self.__class__(subprocess) sub_attributes \ = subprocess_compl.get_attribute_values() except Exception: continue fom_fixed = getattr(sub_attributes, 'fom_fixed', []) for attribute, trait \ in sub_attributes.user_traits().items(): if attribute in fom_fixed: # don't assign attributes which are set internally continue if attributes.trait(attribute) is None: attributes.add_trait(attribute, trait) setattr(attributes, attribute, getattr(sub_attributes, attribute)) self._get_linked_attributes() # remember foms for later if 'input' in sel_foms: self.input_fom = sel_foms['input'] else: self.input_fom = study_config.input_fom if 'output' in sel_foms: self.output_fom = sel_foms['output'] else: self.output_fom = study_config.output_fom if 'shared' in sel_foms: self.shared_fom = sel_foms['shared'] else: self.shared_fom = study_config.shared_fom
@staticmethod def setup_fom(process): completion_engine \ = ProcessCompletionEngine.get_completion_engine(process) if not isinstance(completion_engine, FomProcessCompletionEngine): return if not hasattr(completion_engine, 'input_fom') \ or completion_engine.input_fom is None: if process.study_config.input_fom != completion_engine.input_fom \ or process.study_config.output_fom \ != completion_engine.output_fom \ or process.study_config.shared_fom \ != completion_engine.shared_fom: # FOMs have changed: rebuild attributes completion_engine._rebuild_attributes = True completion_engine.create_attributes_with_fom() if process.study_config.input_fom != completion_engine.input_fom: process.study_config.input_fom = completion_engine.input_fom if process.study_config.output_fom != completion_engine.output_fom: process.study_config.output_fom = completion_engine.output_fom if process.study_config.shared_fom != completion_engine.shared_fom: process.study_config.shared_fom = completion_engine.shared_fom
[docs] def path_attributes(self, filename, parameter=None): """By the path, find value of attributes""" pta = self.process.study_config.modules_data.fom_pta['input'] # Extract the attributes from the first result returned by # parse_directory liste = split_path(filename) len_element_to_delete = 1 for element in liste: if element != os.sep: len_element_to_delete \ = len_element_to_delete + len(element) + 1 new_value = filename[len_element_to_delete:len(filename)] try: #import logging #logging.root.setLevel( logging.DEBUG ) #path, st, self.attributes = pta.parse_directory( # DirectoryAsDict.paths_to_dict( new_value), # log=logging ).next() path, st, attributes = next(pta.parse_directory( DirectoryAsDict.paths_to_dict( new_value) )) break except StopIteration: if element == liste[-1]: raise ValueError( '%s is not recognized for parameter "%s" of "%s"' % (new_value, parameter, self.process.name)) attrib_values = self.get_attribute_values().export_to_dict() for att in attributes: if att in list(attrib_values.keys()): setattr(attrib_values, att, attributes[att]) return attributes
[docs] def get_path_completion_engine(self): ''' ''' return FomPathCompletionEngine()
@staticmethod def _fom_completion_factory(process, name): ''' Factoty inserted in attributed_processFactory ''' study_config = process.get_study_config() if study_config is None \ or 'FomConfig' not in study_config.modules \ or study_config.use_fom == False: #print("no FOM:", study_config, study_config.modules.keys()) return None # Non Fom config, no way it could work try: pfom = FomProcessCompletionEngine(process, name) return pfom except KeyError: # process not in FOM pass return None
[docs] class FomPathCompletionEngine(PathCompletionEngine):
[docs] def attributes_to_path(self, process, parameter, attributes): ''' Build a path from attributes Parameters ---------- process: Process instance parameter: str attributes: ProcessAttributes instance (Controller) ''' FomProcessCompletionEngine.setup_fom(process) input_fom = process.study_config.modules_data.foms['input'] output_fom = process.study_config.modules_data.foms['output'] input_atp = process.study_config.modules_data.fom_atp['input'] output_atp = process.study_config.modules_data.fom_atp['output'] #Create completion names_search_list = [] # same as in create_attributes_with_fom, search in specific to generic # order if getattr(process, 'context_name', ''): names_search_list.append(process.context_name) if process.name not in names_search_list: names_search_list.append(process.name) if isinstance(process, Node): trait = process.get_trait(parameter) name = process.name if hasattr(process, 'process'): cn = getattr(process.process, 'context_name', None) if cn and cn not in names_search_list: names_search_list.append(process.process.context_name) if process.process.name not in names_search_list: names_search_list.append(process.process.name) else: trait = process.trait(parameter) name = process.id if name not in names_search_list: names_search_list.append(name) if trait.output: atp = output_atp fom = output_fom else: atp = input_atp fom = input_fom for fname in names_search_list: fom_patterns = fom.patterns.get(fname) if fom_patterns is not None and parameter in fom_patterns: name = fname break else: raise KeyError( f'Process not found in FOMs amongst {names_search_list}; ' f'fom_names: {fom.fom_names}') allowed_attributes = set(attributes.user_traits().keys()) allowed_attributes.discard('parameter') allowed_attributes.discard('process_name') #allowed_attributes = set(attributes.get_parameters_attributes()[ #parameter].keys()) #allowed_attributes.discard('type') #allowed_attributes.discard('generated_by_parameter') #allowed_attributes.discard('generated_by_process') # Select only the attributes that are discriminant for this # parameter otherwise other attributes can prevent the appropriate # rule to match parameter_attributes = atp.find_discriminant_attributes( fom_parameter=parameter, fom_process=name) d = {i: getattr(attributes, i) for i in parameter_attributes if i in allowed_attributes and getattr(attributes, i) not in (None, Undefined)} # merge values set internally d.update(getattr(attributes, 'fom_fixed', {})) d['fom_process'] = name d['fom_parameter'] = parameter d['fom_format'] = 'fom_preferred' path_value = None #path_values = [] #debug = getattr(self, 'debug', None) for h in atp.find_paths(d): # , debug=debug): path_value = h[0] # find_paths() is a generator which can sometimes generate # several values (formats). We are only interested in the # first one. #path_values.append(h[0]) break return path_value
[docs] def open_values_attributes(self, process, parameter): ''' Attributes with "open" values, not restricted to a list of possible values ''' # print('open_values_attributes', process.id, parameter) FomProcessCompletionEngine.setup_fom(process) for schema in ('input', 'output', 'shared'): fom = process.study_config.modules_data.foms[schema] atp = process.study_config.modules_data.fom_atp[schema] # print('fom:', fom.fom_names) # print('atp:', atp) name = getattr(process, 'id', process.name) names_search_list = [] if hasattr(process, 'id'): names_search_list.append(process.id) names_search_list += [process.name, getattr(process, 'context_name', '')] for fname in names_search_list: fom_patterns = fom.patterns.get(fname) if fom_patterns is not None: name = fname break else: continue values = atp.find_attributes_values() attributes = [k for k, v in values.items() if k not in ('fom_name', 'fom_process', 'fom_parameter', 'fom_format') and len(v) == 2 and v[1] == (u'', )] return attributes return None
[docs] def allowed_formats(self, process, parameter): ''' List of possible formats names associated with a parameter ''' FomProcessCompletionEngine.setup_fom(process) formats = [] for schema in ('input', 'output', 'shared'): atp = process.study_config.modules_data.fom_atp[schema] sub_f = atp.allowed_formats_for_parameter(process.name, parameter) formats += [f for f in sub_f if f not in formats] return formats
[docs] def allowed_extensions(self, process, parameter): ''' List of possible file extensions associated with a parameter ''' FomProcessCompletionEngine.setup_fom(process) exts = set() for schema in ('input', 'output', 'shared'): atp = process.study_config.modules_data.fom_atp[schema] sub_e = atp.allowed_extensions_for_parameter( process_name=process.name, param=parameter) exts.update(sub_e) # sort and add dots exts2 = sorted(['.%s' % e for e in exts if e]) if '' in exts: exts2.append('') return exts2
[docs] class FomProcessCompletionEngineIteration(ProcessCompletionEngineIteration): # the general iteration system actually works in this case. # No need to specialize. pass