#!/usr/bin/env python
from __future__ import print_function
from __future__ import absolute_import
import os
from brainvisa.axon import processes
from brainvisa import processes as procbv
# from brainvisa.data import neuroData
# from brainvisa.data import neuroDiskItems
from brainvisa.data.readdiskitem import ReadDiskItem
from brainvisa.data.writediskitem import WriteDiskItem
# from traits import api as traits
from brainvisa.data import neuroHierarchy
import sys
from argparse import ArgumentParser
import yaml
import json
from six.moves import range
from collections import OrderedDict
from . import axon_to_capsul
import six
class AxonFsoToFom(object):
    '''
    Converter for Axon hierarchies (File System Ontologies) to CAPSUL/Soma-Base
    FOM (File Organization Model).

    Converts parameters for a given process according to rules taken from
    actual data: a main input data is used for Axon completion, then all
    parameters are analyzed and converted to FOM entries.
    '''

    def __init__(self, init_fom_def=None, formats_fom=None):
        '''
        Parameters
        ----------
        init_fom_def: dict or (preferably) collections.OrderedDict
            FOM to be completed. An existing one may be used, otherwise a new
            FOM dictionary (OrderedDict) is created.
        formats_fom: dict or collections.OrderedDict
            Formats and formats lists definitions to be used. They are expected
            to match Axon formats and formats lists definitions. A new empty
            dict is created if not given.
        '''
        # Defaults are created per instance: a mutable default argument would
        # be shared (and silently accumulated into) by every converter built
        # without arguments. "is None" is used rather than truthiness so that
        # an empty dictionary passed by the caller is kept and filled in place.
        if init_fom_def is None:
            init_fom_def = OrderedDict()
        if formats_fom is None:
            formats_fom = {}
        self.current_fom_def = init_fom_def
        self.formats_fom = formats_fom

    def _find_rule(self, item):
        '''
        Find the Axon FSO rule matching a given diskitem.

        Returns
        -------
        rule: FSO rule or None
            the first rule of the item type whose pattern matches the item
            file name (relative to its database), or None if the item is not
            in a database or if no rule matches.
        matched_attr: dict
            result of the pattern match for this rule ({} if no rule is found)
        '''
        database_name = item.get('_database')
        if not database_name:
            print(item, 'not in any database.')
            return (None, {})
        # strip the database directory and the following separator
        name_in_db = item.name[len(database_name) + 1:]
        database = neuroHierarchy.databases.database(database_name)
        rules = database.fso.typeToPatterns.get(item.type, None)
        if not rules:
            # no pattern is registered for this type in the FSO: nothing can
            # match (iterating over None would raise a TypeError)
            return (None, {})
        attribs = item.hierarchyAttributes()
        for rule in rules:
            m = rule.pattern.match(name_in_db, attribs)
            if m:
                return (rule, m)
        return (None, {})

    def _get_fom_formats(self, formats):
        '''
        Get the FOM format definition name in the formats FOM, adding it to
        the formats FOM if it is not found in it. Formats lists are handled.

        Returns
        -------
        the name of a formats list defined in the formats FOM if one contains
        exactly the same format names, otherwise the list of format names.
        '''
        formats_lists = self.formats_fom.get('format_lists', {})
        fnames = set()
        # remove duplicates, keeping the first occurrence of each name
        nformats = []
        fdir = None
        for f in formats:
            if f.name not in fnames:
                fnames.add(f.name)
                nformats.append(f)
                if f.name == 'Directory':
                    fdir = f
        formats = nformats
        # special case of Directory format: dropped when other formats exist
        if len(fnames) >= 2 and 'Directory' in fnames:
            fnames.remove('Directory')
            formats.remove(fdir)
        for flist_name, flist in six.iteritems(formats_lists):
            if set(flist) == fnames:
                return flist_name
        all_formats = self.formats_fom.setdefault('formats', {})
        # complete global formats list
        for fmt in formats:
            if fmt.name == 'Directory' and fmt.name not in fnames:
                continue
            if fmt.name not in all_formats:
                exts = self._extensions_from_format(fmt)
                # a format with no well-formed pattern yields no extension at
                # all: record an empty extension rather than failing on [0]
                all_formats[fmt.name] = exts[0] if exts else ''
        return [f.name for f in formats]

    def _extensions_from_format(self, format):
        '''
        File extensions for a given Axon Format.

        Each pattern of the format is expected to be of the form
        "<type>|<file pattern>". The extension is the part following the last
        "*." of the file pattern, or an empty string if there is none.
        Malformed patterns are reported and skipped.
        '''
        exts = []
        for pattern in format.patterns.patterns:
            sp = pattern.pattern.split('|')
            if len(sp) != 2:
                print('malformed pattern for format:', format, ':', pattern)
                continue
            file_pattern = sp[1]
            parts = file_pattern.split('*.')
            if len(parts) == 1:
                exts.append('')
            else:
                exts.append(parts[-1])
        return exts

    def _translate_param_name(self, process, name):
        '''
        Translate a parameter name for a given process into the resulting one
        in Capsul pipeline.

        By default, just output the input name.
        '''
        return name

    def _transform_rule(self, rule, matched_attr, input_attr):
        '''
        Generates the FOM rule from FSO rule and attributes

        Parameters
        ----------
        rule: FSO rule
        matched_attr: dict
            FSO attributes that made the rule match
        input_attr: dict
            input attributes and values

        Returns
        -------
        fom_pattern: string
        fom_added_attr: dict
            new attributes used in the rule, not in input_attr
        defaults: dict
            default attribute values used in the rule
        '''
        # every matched attribute is replaced by its "<name>" placeholder
        rule_attribs = dict((k, '<%s>' % k) for k in matched_attr)
        non_transformed = ['filename_variable', ]
        # such attributes are not considered attributes: take their real value
        for k in non_transformed:
            value = matched_attr.get(k)
            if value is not None:
                rule_attribs[k] = value
        fom_pattern = rule.pattern.unmatch(rule_attribs, {})
        fom_added_attr = {}
        defaults = {}
        for k, value in six.iteritems(matched_attr):
            if k in rule.defaultAttributesValues:
                defaults[k] = {'default_value':
                               rule.defaultAttributesValues[k]}
            if k not in input_attr and k not in non_transformed \
                    and k not in rule.defaultAttributesValues:
                fom_added_attr[k] = value
        return fom_pattern, fom_added_attr, defaults

    def fso_to_fom(self, proc_name, node_name, data):
        '''
        Transform a process or pipeline parameters into FOM rules. This is the
        main function in the class.

        Subprocesses of a pipeline will be added to the FOM too.

        Parameters
        ----------
        proc_name: string
            identifier of the Axon process
        node_name: string
            name to be used in the FOM
        data: string
            input data (file name) for the first input param of the process.
            It will be used to perform Axon completion, then to get values
            and patterns for all parameters. Thus it must be a valid input
            data, existing in an Axon database.

        Returns
        -------
        new_fom_def: collections.OrderedDict
            new FOM definition (also found in self.current_fom_def)
        default_atts: dict
            attributes which have default values. Also added in the FOM, in
            the "attribute_definitions" section.

        Raises
        ------
        ValueError
            if the process has no ReadDiskItem parameter, or if the first one
            could not be set from data.
        '''
        process = procbv.getProcessInstance(proc_name)
        signature = process.signature
        # the first ReadDiskItem parameter of the signature is the main input
        for name, param in six.iteritems(signature):
            if isinstance(param, ReadDiskItem):
                break
        else:
            raise ValueError('No ReadDiskItem in signature of process %s'
                             % proc_name)
        setattr(process, name, data)
        value = getattr(process, name)
        if value is None:
            raise ValueError(
                'The input value for param %s.%s could not be set (using: %s)'
                % (node_name, name, data))
        # only the matched attributes of the main input are needed here
        _, input_attr = self._find_rule(value)
        self._done_params = set()
        try:
            self.current_fom_def, default_atts = self._fso_to_fom_parse(
                process, node_name, input_attr)
            node_names = {}
            self._process_nodes_fso_to_fom(process, node_name, default_atts,
                                           input_attr, node_names)
        finally:
            # the done list is only meaningful during one conversion: do not
            # leave it behind, even if the conversion failed
            del self._done_params
        return self.current_fom_def, default_atts

    def _fso_to_fom_parse(self, process, node_name, input_attr,
                          current_fom_def=None):
        '''
        Parse a single Axon process parameters and generate FOM rules for them.

        Parameters must all be already set.
        Doesn't handle children of a pipeline.

        Returns
        -------
        current_fom_def: dict
            the FOM definition, completed in place
        default_atts: collections.OrderedDict
            attributes with default values found in the rules used
        '''
        if current_fom_def is None:
            current_fom_def = self.current_fom_def
        proc_fom = current_fom_def.setdefault(node_name, OrderedDict())
        signature = process.signature
        default_atts = OrderedDict()
        for name, param in six.iteritems(signature):
            if not isinstance(param, ReadDiskItem):
                # skip this param
                continue
            param_name = self._translate_param_name(process, name)
            value = getattr(process, name)
            if value is None:
                continue
            if self._check_and_mark_done(process, name):
                # already handled, possibly through a linked parameter
                continue
            database_name = value.get('_database')
            rule, matched_attr = self._find_rule(value)
            if rule is None:
                continue
            database = neuroHierarchy.databases.database(database_name)
            if database.fso.name == 'shared':
                fom_type = 'shared'
            elif self._is_output(process, name, param):
                fom_type = 'output'
            else:
                fom_type = 'input'
            fom_rule, fom_added_attr, added_defaults \
                = self._transform_rule(rule, matched_attr, input_attr)
            default_atts.update(added_defaults)
            fom_format = self._get_fom_formats(param.formats)
            fom_pattern = [['%s:%s' % (fom_type, fom_rule), fom_format]]
            if fom_added_attr:
                fom_pattern[0].append(fom_added_attr)
            proc_fom[param_name] = fom_pattern
        if not proc_fom:
            # no rule was generated: do not leave an empty node in the FOM
            del current_fom_def[node_name]
        return current_fom_def, default_atts

    def _is_output(self, process, param_name, param):
        '''
        Checks if a given Axon process parameter is an output.

        A parameter is an output if it is a WriteDiskItem, or if it is linked
        to a WriteDiskItem parameter of another process holding the same value.
        '''
        if isinstance(param, WriteDiskItem):
            return True
        # check if this input is connected to an upstream output
        linkdefs = process._links.get(param_name)
        if linkdefs is not None:
            for linkdef in linkdefs:
                other_proc, other_param_name = linkdef[:2]
                if other_proc is None \
                        or other_proc == axon_to_capsul.use_weak_ref(process):
                    # internal link, doesn't count
                    continue
                other_param = other_proc.signature[other_param_name]
                if isinstance(other_param, WriteDiskItem) \
                        and getattr(process, param_name) \
                        == getattr(other_proc, other_param_name):
                    return True
        return False

    def _check_and_mark_done(self, process, param_name):
        '''
        Set param_name parameter of process process into processed list. Links
        are followed recursively to avoid having another parameter duplicating
        the current one.

        Returns
        -------
        True if the parameter was already in the done list, False otherwise
        '''
        proc_ref = axon_to_capsul.use_weak_ref(process)
        if (proc_ref, param_name) in self._done_params:
            return True
        stack = [(proc_ref, param_name)]
        value = getattr(process, param_name)
        while stack:
            cprocess, cparam_name = stack.pop(0)
            self._done_params.add((cprocess, cparam_name))
            # propagate through links
            links = cprocess()._links.get(cparam_name, [])
            for link in links:
                other_end = (axon_to_capsul.use_weak_ref(link[0]), link[1])
                # only follow links to parameters holding the same value
                if other_end[0] is not None \
                        and getattr(other_end[0](), link[1]) == value \
                        and other_end not in self._done_params:
                    stack.append(other_end)
        return False

    def _process_nodes_fso_to_fom(self, process, node_name, default_atts,
                                  input_attr, node_names):
        '''
        Walks a pipeline tree structure and adds all its children to the FOM

        default_atts is updated in place with the default attributes found in
        children processes. node_names is passed to the node names generator
        (axon_to_capsul.AxonToCapsul.make_node_name).
        '''
        if not hasattr(process, 'executionNode') \
                or process.executionNode() is None:
            return  # not a pipeline
        a_to_c = axon_to_capsul.AxonToCapsul()
        nodes = [(process.executionNode(), node_name, self.current_fom_def)]
        # breadth-first walk: the order determines generated node names and
        # the order of entries in the FOM
        while nodes:
            node, current_node_name, current_fom_def = nodes.pop(0)
            if isinstance(node, procbv.ProcessExecutionNode):
                new_def, added_default_atts = self._fso_to_fom_parse(
                    node._process, current_node_name, input_attr,
                    current_fom_def)
                default_atts.update(added_default_atts)
            for child_name in node.childrenNames():
                child = node.child(child_name)
                if isinstance(child, procbv.ProcessExecutionNode):
                    new_node_name = a_to_c.make_node_name(
                        '.'.join([current_node_name, child_name]),
                        node_names,
                        None)
                else:
                    # non-process nodes keep the name of their parent
                    new_node_name = current_node_name
                nodes.append((child, new_node_name, current_fom_def))
def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
    '''
    Load a YAML stream, building every mapping with object_pairs_hook
    (collections.OrderedDict by default) from its (key, value) pairs.

    Adapted from:
    http://stackoverflow.com/questions/5121931/in-python-how-can-you-load-yaml-mappings-as-ordereddicts

    NOTE(review): the default Loader is yaml.Loader; confirm that callers only
    pass trusted streams, or pass yaml.SafeLoader explicitly.
    '''
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG

    def _build_mapping(loader, node):
        loader.flatten_mapping(node)
        pairs = loader.construct_pairs(node)
        return object_pairs_hook(pairs)

    # the mapping constructor is registered on a local subclass of Loader
    class OrderedLoader(Loader):
        pass

    OrderedLoader.add_constructor(mapping_tag, _build_mapping)
    return yaml.load(stream, OrderedLoader)
# usage example:
# ordered_load(stream, yaml.SafeLoader)
def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
    '''
    Dump data as YAML, representing collections.OrderedDict instances as
    regular YAML mappings built from their items.

    Additional keyword arguments are passed to yaml.dump().

    Adapted from:
    http://stackoverflow.com/questions/5121931/in-python-how-can-you-load-yaml-mappings-as-ordereddicts
    '''
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG

    def _represent_ordered(dumper, data):
        pairs = list(data.items())
        return dumper.represent_mapping(mapping_tag, pairs)

    # the representer is registered on a local subclass of Dumper
    class OrderedDumper(Dumper):
        pass

    OrderedDumper.add_representer(OrderedDict, _represent_ordered)
    return yaml.dump(data, stream, OrderedDumper, **kwds)
# usage:
# ordered_dump(data, Dumper=yaml.SafeDumper)
def _strip_extension(filename):
    '''
    Remove the last extension (from the last dot) of a file name. Names
    without a dot are returned unchanged.
    '''
    dot = filename.rfind('.')
    if dot >= 0:
        return filename[:dot]
    return filename


def fso_to_fom_main(argv):
    '''
    Main FSO hierarchy to FOM conversion for one or several processes.

    Contains an argument parser for the __main__ function.

    Parameters
    ----------
    argv: list of strings
        command-line arguments, without the program name. At least one
        -p/--process option and the -o/--output option are required: the
        parser exits with a usage error otherwise.
    '''
    # the "share/foms" directory is looked for 4 levels above this file
    basedir = os.path.dirname(__file__)
    for _ in range(3):
        basedir = os.path.dirname(basedir)
    foms_dir = os.path.join(basedir, 'share', 'foms')
    def_formats_fom = os.path.join(foms_dir, 'brainvisa-formats-3.2.0.json')

    parser = ArgumentParser(
        description='Convert an Axon FSO hierarchy into FOM entries, for '
        'given Axon processes.')
    parser.add_argument(
        '-p', '--process', dest='process', action='append', required=True,
        help='input process ID. Ex: NobiasHistoAnalysis. Several -p options '
        'are allowed. Processes may be specified as id,name (no space) to '
        'force a new name')
    parser.add_argument(
        '-o', '--output', dest='output', required=True,
        help='output FOM rules files')
    parser.add_argument(
        '-a', '--append', action='store_true',
        help='append output to the end of an existing FOM file')
    parser.add_argument(
        '-d', '--data', dest='data',
        help='input data file for process 1st arg (ex: '
        '/home/bob/bvdata/center/subject01/t1mri/default_acquisition/subject01.nii')
    parser.add_argument(
        '-f', '--formats', dest='formats',
        help='formats FOM file [default: %s]' % def_formats_fom)
    parser.add_argument(
        '-n', '--name', dest='fom_name',
        help='set this FOM name [default: output file name]')
    parser.add_argument(
        '-F', '--Formats', dest='output_formats',
        help='output file name for updated formats FOM')
    # arguments are checked before the (costly) processes initialization
    args = parser.parse_args(argv)

    # from brainvisa.configuration import neuroConfig
    # neuroConfig.ignoreValidation = True
    processes.initializeProcesses()

    fom_def = OrderedDict()
    if args.append:
        with open(args.output) as f:
            fom_def = ordered_load(f)

    # formats FOM file: command line option first, then the first "formats"
    # import of the existing FOM, then the builtin default
    formats = args.formats
    if not formats:
        fom_imports = fom_def.get('fom_import')
        if fom_imports:
            formats_list = [x for x in fom_imports if 'formats' in x]
            if formats_list:
                formats = os.path.join(foms_dir, formats_list[0] + '.json')
    if not formats:
        formats = def_formats_fom
    print('using formats FOM:', formats)
    with open(formats) as f:
        # an empty file loads as None: fall back to an empty definition
        formats_fom = ordered_load(f) or {}

    fom_name = args.fom_name
    if 'fom_name' not in fom_def or fom_name is not None:
        if fom_name is None:
            fom_name = _strip_extension(os.path.basename(args.output))
        fom_def['fom_name'] = fom_name

    fom_imports = fom_def.setdefault('fom_import', [])
    if formats_fom:
        # a file name without extension must be kept whole (slicing with the
        # -1 returned by rfind would drop its last character)
        formats_bname = _strip_extension(os.path.basename(formats))
        if formats_bname not in fom_imports:
            fom_imports.append(formats_bname)
    if 'shared-brainvisa-1.0' not in fom_imports:
        fom_imports.append('shared-brainvisa-1.0')

    default_atts = fom_def.setdefault("attribute_definitions", OrderedDict())
    current_fom_def = fom_def.setdefault('processes', OrderedDict())

    fso_to_fom = AxonFsoToFom(current_fom_def, formats_fom)
    for proc_spec in args.process:
        # "id,name" forces a node name; a single "id" is used for both
        proc_spec_list = proc_spec.split(',')
        proc_name = proc_spec_list[0]
        node_name = proc_spec_list[-1]
        new_def, added_default_atts = fso_to_fom.fso_to_fom(
            proc_name, node_name, args.data)
        default_atts.update(added_default_atts)

    with open(args.output, 'w') as f:
        json.dump(fom_def, f, indent=4)
    if args.output_formats:
        with open(args.output_formats, 'w') as f:
            json.dump(fso_to_fom.formats_fom, f, indent=4)
if __name__ == '__main__':
    # script entry point: forward the command-line arguments, without the
    # program name, to the converter main function
    cli_args = sys.argv[1:]
    fso_to_fom_main(cli_args)