# -*- coding: utf-8 -*-
'''
Completion engine for File Organization Models (FOM).
Classes
=======
:class:`FomProcessCompletionEngine`
-----------------------------------
:class:`FomPathCompletionEngine`
--------------------------------
:class:`FomProcessCompletionEngineIteration`
--------------------------------------------
'''
from __future__ import print_function
from __future__ import absolute_import
import os
import six
try:
from traits.api import Str, HasTraits, List
except ImportError:
from enthought.traits.api import Str, HasTraits, List
from soma.controller import Controller, ControllerTrait
from capsul.pipeline.pipeline import Pipeline
from capsul.pipeline.pipeline_nodes import Node, Switch, ProcessNode
from capsul.attributes.completion_engine import ProcessCompletionEngine, \
ProcessCompletionEngineFactory, PathCompletionEngine, \
PathCompletionEngineFactory
from capsul.attributes.completion_engine_iteration \
import ProcessCompletionEngineIteration
from capsul.pipeline.process_iteration import ProcessIteration
from capsul.attributes.attributes_schema import ProcessAttributes, \
EditableAttributes
from soma.fom import DirectoryAsDict
from soma.path import split_path
from soma.sorted_dictionary import SortedDictionary
[docs]class FomProcessCompletionEngine(ProcessCompletionEngine):
"""
FOM (File Organization Model) implementation of completion engine.
* A capsul.study_config.StudyConfig also needs to be configured with FOM
module, and selected FOMS and directories:
::
from capsul.api import StudyConfig
from capsul.study_config.config_modules.fom_config import FomConfig
study_config = StudyConfig(modules=StudyConfig.default_modules
+ ['FomConfig', 'BrainVISAConfig'])
study_config.update_study_configuration('study_config.json')
* Only then a FomProcessCompletionEngine can be created:
::
process = get_process_instance('morphologist')
fom_completion_engine = FomProcessCompletionEngine(
process, study_config)
But generally this creation is handled via the
ProcessCompletionEngine.get_completion_engine() function:
::
fom_completion_engine = ProcessCompletionEngine.get_completion_engine(
process)
Parameters
----------
name: string (optional)
name of the process in the FOM dictionary. By default the
process.name variable will be used.
Methods
-------
create_attributes_with_fom
"""
def __init__(self, process, name=None):
super(FomProcessCompletionEngine, self).__init__(
process=process, name=name)
self.input_fom = None
self.output_fom = None
self.shared_fom = None
[docs] def get_attribute_values(self):
''' Get attributes Controller associated to a process
Returns
-------
attributes: Controller
'''
if not self._rebuild_attributes \
and self.trait('capsul_attributes') is not None \
and hasattr(self, 'capsul_attributes'):
return self.capsul_attributes
schemas = self._get_schemas()
#schemas = self.process.get_study_config().modules_data.foms.keys()
if not hasattr(self, 'capsul_attributes'):
self.add_trait('capsul_attributes', ControllerTrait(Controller()))
self.capsul_attributes = ProcessAttributes(self.process, schemas)
self._rebuild_attributes = False
self.create_attributes_with_fom()
return self.capsul_attributes
[docs] def create_attributes_with_fom(self):
"""To get useful attributes by the fom"""
process = self.process
if isinstance(process, ProcessNode):
process = process.process
study_config = process.study_config
modules_data = study_config.modules_data
#Get attributes in input fom
id = getattr(process, 'id', None)
names_search_list = [self.name]
if id:
names_search_list.append(id)
names_search_list += [process.name,
getattr(process, 'context_name', '')]
capsul_attributes = self.get_attribute_values()
matching_fom = False
input_found = False
output_found = False
foms = SortedDictionary()
foms.update(modules_data.foms)
if study_config.auto_fom:
# in auto-fom mode, also search in additional and non-loaded FOMs
for schema, fom in six.iteritems(modules_data.all_foms):
if schema not in (study_config.input_fom,
study_config.output_fom,
study_config.shared_fom):
foms[schema] = fom
def editable_attributes(attributes, fom):
ea = EditableAttributes()
for attribute in attributes:
if attribute.startswith('fom_'):
continue # skip FOM internals
default_value = fom.attribute_definitions[attribute].get(
'default_value', '')
ea.add_trait(attribute, Str(default_value, optional=True))
return ea
for schema, fom in six.iteritems(foms):
if fom is None:
fom, atp, pta \
= study_config.modules['FomConfig'].load_fom(schema)
else:
atp = modules_data.fom_atp.get(schema) \
or modules_data.fom_atp['all'].get(schema)
if atp is None:
continue
for name in names_search_list:
fom_patterns = fom.patterns.get(name)
if fom_patterns is not None:
break
else:
continue
if not matching_fom:
matching_fom = True
if schema == 'input':
input_found = True
elif schema == 'output':
output_found = True
elif matching_fom in (False, True, None):
matching_fom = schema, fom, atp, fom_patterns
# print('completion using FOM:', schema, 'for', process.id)
#break
for parameter in fom_patterns:
param_attributes = atp.find_discriminant_attributes(
fom_parameter=parameter, fom_process=name)
ea = editable_attributes(param_attributes, fom)
try:
capsul_attributes.set_parameter_attributes(
parameter, schema, ea, {})
except KeyError:
# param already registered
pass
if not matching_fom:
raise KeyError('Process not found in FOMs')
#if isinstance(matching_fom, tuple): print('matching_fom:', matching_fom[0])
#else: print('matching fom:', matching_fom)
if not input_found and matching_fom is not True:
fom_type, fom, atp, fom_patterns = matching_fom
schema = 'input'
for parameter in fom_patterns:
param_attributes = atp.find_discriminant_attributes(
fom_parameter=parameter, fom_process=name)
ea = editable_attributes(param_attributes, fom)
try:
capsul_attributes.set_parameter_attributes(
parameter, schema, ea, {})
except KeyError:
# param already registered
pass
modules_data.foms[schema] = fom
modules_data.fom_atp[schema] = atp
study_config.input_fom = fom_type
if not output_found and matching_fom is not True:
fom_type, fom, atp, fom_patterns = matching_fom
schema = 'output'
for parameter in fom_patterns:
param_attributes = atp.find_discriminant_attributes(
fom_parameter=parameter, fom_process=name)
ea = editable_attributes(param_attributes, fom)
try:
capsul_attributes.set_parameter_attributes(
parameter, schema, ea, {})
except KeyError:
# param already registered
pass
modules_data.foms[schema] = fom
modules_data.fom_atp[schema] = atp
study_config.output_fom = fom_type
# in a pipeline, we still must iterate over nodes to find switches,
# which have their own behaviour.
if isinstance(process, Pipeline):
attributes = self.capsul_attributes
name = process.name
for node_name, node in six.iteritems(process.nodes):
if isinstance(node, Switch):
subprocess = node
if subprocess is None:
continue
pname = '.'.join([name, node_name])
subprocess_compl = \
ProcessCompletionEngine.get_completion_engine(
subprocess, pname)
try:
sub_attributes \
= subprocess_compl.get_attribute_values()
except Exception:
try:
subprocess_compl = self.__class__(subprocess)
sub_attributes \
= subprocess_compl.get_attribute_values()
except Exception:
continue
for attribute, trait \
in six.iteritems(sub_attributes.user_traits()):
if attributes.trait(attribute) is None:
attributes.add_trait(attribute, trait)
setattr(attributes, attribute,
getattr(sub_attributes, attribute))
self._get_linked_attributes()
# remember foms for later
self.input_fom = study_config.input_fom
self.output_fom = study_config.output_fom
self.shared_fom = study_config.shared_fom
@staticmethod
def setup_fom(process):
completion_engine \
= ProcessCompletionEngine.get_completion_engine(process)
if not isinstance(completion_engine, FomProcessCompletionEngine):
return
if not hasattr(completion_engine, 'input_fom') \
or completion_engine.input_fom is None:
completion_engine.create_attributes_with_fom()
if process.study_config.input_fom != completion_engine.input_fom:
process.study_config.input_fom = completion_engine.input_fom
if process.study_config.output_fom != completion_engine.output_fom:
process.study_config.output_fom = completion_engine.output_fom
if process.study_config.shared_fom != completion_engine.shared_fom:
process.study_config.shared_fom = completion_engine.shared_fom
[docs] def path_attributes(self, filename, parameter=None):
"""By the path, find value of attributes"""
pta = self.process.study_config.modules_data.fom_pta['input']
# Extract the attributes from the first result returned by
# parse_directory
liste = split_path(filename)
len_element_to_delete = 1
for element in liste:
if element != os.sep:
len_element_to_delete \
= len_element_to_delete + len(element) + 1
new_value = filename[len_element_to_delete:len(filename)]
try:
#import logging
#logging.root.setLevel( logging.DEBUG )
#path, st, self.attributes = pta.parse_directory(
# DirectoryAsDict.paths_to_dict( new_value),
# log=logging ).next()
path, st, attributes = next(pta.parse_directory(
DirectoryAsDict.paths_to_dict( new_value) ))
break
except StopIteration:
if element == liste[-1]:
raise ValueError(
'%s is not recognized for parameter "%s" of "%s"'
% (new_value, parameter, self.process.name))
attrib_values = self.get_attribute_values().export_to_dict()
for att in attributes:
if att in list(attrib_values.keys()):
setattr(attrib_values, att, attributes[att])
return attributes
[docs] def get_path_completion_engine(self):
'''
'''
return FomPathCompletionEngine()
@staticmethod
def _fom_completion_factory(process, name):
''' Factoty inserted in attributed_processFactory
'''
study_config = process.get_study_config()
if study_config is None \
or 'FomConfig' not in study_config.modules \
or study_config.use_fom == False:
#print("no FOM:", study_config, study_config.modules.keys())
return None # Non Fom config, no way it could work
try:
pfom = FomProcessCompletionEngine(process, name)
return pfom
except KeyError:
# process not in FOM
pass
return None
class FomPathCompletionEngine(PathCompletionEngine):
def attributes_to_path(self, process, parameter, attributes):
''' Build a path from attributes
Parameters
----------
process: Process instance
parameter: str
attributes: ProcessAttributes instance (Controller)
'''
FomProcessCompletionEngine.setup_fom(process)
input_fom = process.study_config.modules_data.foms['input']
output_fom = process.study_config.modules_data.foms['output']
input_atp = process.study_config.modules_data.fom_atp['input']
output_atp = process.study_config.modules_data.fom_atp['output']
#Create completion
names_search_list = []
if isinstance(process, Node):
trait = process.get_trait(parameter)
name = process.name
if hasattr(process, 'process'):
if hasattr(process.process, 'context_name'):
names_search_list.append(process.process.context_name)
names_search_list.append(process.process.name)
else:
trait = process.trait(parameter)
name = process.id
names_search_list.append(name)
if trait.output:
atp = output_atp
fom = output_fom
else:
atp = input_atp
fom = input_fom
names_search_list += [process.name,
getattr(process, 'context_name', '')]
for fname in names_search_list:
fom_patterns = fom.patterns.get(fname)
if fom_patterns is not None:
name = fname
break
else:
raise KeyError('Process not found in FOMs amongst %s' \
% repr(names_search_list))
allowed_attributes = set(attributes.user_traits().keys())
allowed_attributes.discard('parameter')
allowed_attributes.discard('process_name')
#allowed_attributes = set(attributes.get_parameters_attributes()[
#parameter].keys())
#allowed_attributes.discard('type')
#allowed_attributes.discard('generated_by_parameter')
#allowed_attributes.discard('generated_by_process')
# Select only the attributes that are discriminant for this
# parameter otherwise other attibutes can prevent the appropriate
# rule to match
parameter_attributes = atp.find_discriminant_attributes(
fom_parameter=parameter, fom_process=name)
d = dict((i, getattr(attributes, i)) \
for i in parameter_attributes if i in allowed_attributes)
d['fom_process'] = name
d['fom_parameter'] = parameter
d['fom_format'] = 'fom_preferred'
path_value = None
#path_values = []
#debug = getattr(self, 'debug', None)
for h in atp.find_paths(d): # , debug=debug):
path_value = h[0]
# find_paths() is a generator which can sometimes generate
# several values (formats). We are only interested in the
# first one.
#path_values.append(h[0])
break
return path_value
def open_values_attributes(self, process, parameter):
''' Attributes with "open" values, not restricted to a list of possible
values
'''
# print('open_values_attributes', process.id, parameter)
FomProcessCompletionEngine.setup_fom(process)
for schema in ('input', 'output', 'shared'):
fom = process.study_config.modules_data.foms[schema]
atp = process.study_config.modules_data.fom_atp[schema]
# print('fom:', fom.fom_names)
# print('atp:', atp)
name = getattr(process, 'id', process.name)
names_search_list = []
if hasattr(process, 'id'):
names_search_list.append(process.id)
names_search_list += [process.name,
getattr(process, 'context_name', '')]
for fname in names_search_list:
fom_patterns = fom.patterns.get(fname)
if fom_patterns is not None:
name = fname
break
else:
continue
values = atp.find_attributes_values()
attributes = [k for k, v in six.iteritems(values)
if k not in ('fom_name', 'fom_process',
'fom_parameter', 'fom_format')
and len(v) == 2 and v[1] == (u'', )]
return attributes
return None
def allowed_formats(self, process, parameter):
''' List of possible formats names associated with a parameter
'''
FomProcessCompletionEngine.setup_fom(process)
formats = []
for schema in ('input', 'output', 'shared'):
atp = process.study_config.modules_data.fom_atp[schema]
sub_f = atp.allowed_formats_for_parameter(process.name, parameter)
formats += [f for f in sub_f if f not in formats]
return formats
def allowed_extensions(self, process, parameter):
''' List of possible file extensions associated with a parameter
'''
FomProcessCompletionEngine.setup_fom(process)
exts = set()
for schema in ('input', 'output', 'shared'):
atp = process.study_config.modules_data.fom_atp[schema]
sub_e = atp.allowed_extensions_for_parameter(
process_name=process.name, param=parameter)
exts.update(sub_e)
# sort and add dots
exts2 = sorted(['.%s' % e for e in exts if e])
if '' in exts:
exts2.append('')
return exts2
class FomProcessCompletionEngineIteration(ProcessCompletionEngineIteration):
def get_iterated_attributes(self):
process = self.process
if isinstance(process, ProcessNode):
process = process.process
subprocess = process.process
FomProcessCompletionEngine.setup_fom(subprocess)
input_fom = subprocess.study_config.modules_data.foms['input']
output_fom = subprocess.study_config.modules_data.foms['output']
input_atp = subprocess.study_config.modules_data.fom_atp['input']
output_atp = subprocess.study_config.modules_data.fom_atp['output']
name = subprocess.id
names_search_list = (subprocess.id, subprocess.name,
getattr(subprocess, 'context_name', ''))
for fom in (input_fom, output_fom):
for fname in names_search_list:
fom_patterns = fom.patterns.get(fname)
if fom_patterns is not None:
name = fname
break
else:
continue
break
else:
raise KeyError('Process not found in FOMs amongst %s' \
% repr(names_search_list))
iter_attrib = set()
if not self.process.iterative_parameters:
params = list(subprocess.user_traits().keys())
else:
params = self.process.iterative_parameters
for parameter in params:
if subprocess.trait(parameter).output:
atp = output_atp
else:
atp = input_atp
parameter_attributes = set([
x for x in atp.find_discriminant_attributes(
fom_parameter=parameter, fom_process=name)
if not x.startswith('fom_')])
iter_attrib.update(parameter_attributes)
return iter_attrib
#class FomPathCompletionEngineFactory(PathCompletionEngineFactory):
#factory_id = 'fom'
#def get_path_completion_engine(self, process):
#return FomPathCompletionEngine(process)