Source code for brainvisa.processing.axon_to_capsul

#!/usr/bin/env python

from __future__ import print_function
from __future__ import absolute_import
from brainvisa.axon import processes
from capsul.api import Process
from capsul.api import Pipeline
from brainvisa import processes as procbv
from brainvisa.data import neuroData
from brainvisa.data.readdiskitem import ReadDiskItem
from brainvisa.data.writediskitem import WriteDiskItem
from brainvisa.data import neuroDiskItems
from brainvisa.processing.capsul_process import CapsulProcess
from traits import api as traits
import weakref
import sys
import re
import io
import inspect
import six

from optparse import OptionParser
from six.moves import zip


def choice_options(choice):
    '''Build the constructor arguments of an Enum trait from a choice.

    Each entry of ``choice.values`` is indexed at position 1 when it has at
    least two items, otherwise at its last (only) item. The ``repr()`` of
    each selected item is returned, in the same order as ``choice.values``.
    '''
    options = []
    for entry in choice.values:
        # index 1 for (a, b, ...) entries, index 0 for 1-item entries
        value_index = min(1, len(entry) - 1)
        options.append(repr(entry[value_index]))
    return options


def open_choice_options(choice):
    '''Build the trait constructor arguments for an open choice.

    Returns an empty list when ``choice.values`` is empty. Otherwise the
    list holds an optional ``trait=<Type>()`` item (only when
    get_choice_type() finds a common type) followed by a ``default_value=``
    item built from the first entry of ``choice.values``.
    '''
    if len(choice.values) == 0:
        return []
    options = []
    trait_type = get_choice_type(choice)
    if trait_type is not None:
        options.append('trait=%s()' % trait_type.__name__)
    # the default is taken from the first entry: item 1, or the only item
    first_entry = choice.values[0]
    default = first_entry[min(1, len(first_entry) - 1)]
    options.append('default_value=' + repr(default))
    return options


def get_choice_type(choice):
    '''Find a traits type able to hold every value of a choice.

    The value of each entry of ``choice.values`` is its item at index 1, or
    its only item for 1-item entries.

    Returns
    -------
    The common traits class when all values map to the same one,
    ``traits.Float`` when values mix Int and Float, and ``None`` when there
    are no values, when values have incompatible types, or when a value has
    a python type with no known traits equivalent (for instance ``None``).
    '''
    if len(choice.values) == 0:
        return None
    element_types = {
        bool: traits.Bool,
        int: traits.Int,
        float: traits.Float,
        str: traits.Str,
        six.text_type: traits.Str,
        list: traits.List,
        tuple: traits.List,
    }
    choice_types = []
    for entry in choice.values:
        value = entry[min(1, len(entry) - 1)]
        trait_type = element_types.get(type(value))
        if trait_type is None:
            # unknown python type: no single trait type describes the
            # choice. Report "no common type" instead of raising KeyError.
            return None
        choice_types.append(trait_type)
    first_type = choice_types[0]
    if all(elem is first_type for elem in choice_types[1:]):
        return first_type
    # test compatible types, ie (int, float) -> float
    if all(elem in (traits.Int, traits.Float) for elem in choice_types):
        return traits.Float
    return None


def get_openchoice_type(choice):
    '''Return the traits type of an open choice.

    This is the type found by get_choice_type(), or ``traits.Str`` when it
    finds none.
    '''
    common_type = get_choice_type(choice)
    # default to string when values have no common type
    return traits.Str if common_type is None else common_type


def point3d_options(point):
    '''Return the List trait arguments describing a 3D point.

    The ``point`` argument is not used: the result is always a list of
    exactly 3 floats defaulting to ``[0, 0, 0]``.
    '''
    # note: traits.ListFloat is not used because its constructor doesn't
    # seem to take parameters into account (minlen, maxlen, value)
    options = ['trait=Float()']
    options.extend(('minlen=3', 'maxlen=3'))
    options.append('value=[0, 0, 0]')
    return options


def matrix_options(list_trait):
    '''Return the List trait arguments describing a matrix.

    The ``list_trait`` argument is not used: a matrix is always declared as
    a list of lists.
    '''
    options = []
    options.append('trait=List()')
    return options


def diskitem_type(diskitem):
    '''Choose between the File and Directory traits for a disk item.

    ``traits.Directory`` is returned only when ``diskitem.formats`` is not
    empty and every format is a directory format. As soon as one format is
    not a directory (or when there is no format), ``traits.File`` is
    returned.
    '''
    seen_directory = False
    for format_name in diskitem.formats:
        fmt = neuroDiskItems.getFormat(format_name)
        if fmt.fileOrDirectory() is not neuroDiskItems.Directory:
            # a single file format is enough to make it a File
            return traits.File
        seen_directory = True
    if seen_directory:
        return traits.Directory
    return traits.File


def diskitem_options(diskitem):
    '''Build the File / Directory trait arguments for a disk item.

    The result may contain:

    * ``allowed_extensions=[...]``: for every file name pattern of every
      format of the disk item, the text following the ``*`` wildcard (after
      the last ``|``), without duplicates, in formats declaration order.
      Omitted when no extension is found.
    * ``output=True``: when the disk item is a WriteDiskItem.
    '''
    # raw string: "\|" and "\*" are regex escapes, not string escapes (as a
    # regular literal they are invalid escape sequences, which recent
    # python versions warn about)
    extension_re = re.compile(r'^.*\|[^*]*\*(.*)$')
    exts = []
    # formats are deliberately not sorted: declaration order is kept
    for format_name in diskitem.formats:
        fmt = neuroDiskItems.getFormat(format_name)
        for pat in fmt.patterns.patterns:
            m = extension_re.match(pat.pattern)
            if m is not None and m.group(1) not in exts:
                exts.append(m.group(1))
    options = []
    if exts:
        options.append('allowed_extensions=%s' % repr(exts))
    if isinstance(diskitem, WriteDiskItem):
        options.append('output=True')
    return options


# Conversion table: Axon parameter class -> Capsul trait description.
# A value is either a trait type, or a tuple (type, options_function) where:
# * type is a trait type, or a function called with the Axon parameter and
#   returning the trait type
# * options_function is called with the Axon parameter and returns the list
#   of constructor arguments (strings) of the trait
param_types_table = {
    # plain values
    neuroData.Boolean: traits.Bool,
    neuroData.String: traits.Str,
    neuroData.Number: traits.Float,
    neuroData.Float: traits.Float,
    neuroData.Integer: traits.Int,
    # files and directories
    ReadDiskItem: (diskitem_type, diskitem_options),
    WriteDiskItem: (diskitem_type, diskitem_options),
    # choices
    neuroData.Choice: (traits.Enum, choice_options),
    neuroData.OpenChoice: (get_openchoice_type, open_choice_options),
    # lists
    neuroData.ListOf: traits.List,
    neuroData.Point3D: (traits.List, point3d_options),
    neuroData.Matrix: (traits.List, matrix_options),
}


def capsul_param_type(axon_param):
    '''Convert an Axon parameter into a Capsul trait description.

    The parameter class is looked up in ``param_types_table``: the exact
    class first, then its base classes in method resolution order, so that
    a subclass of a known Axon parameter type is converted like its parent
    instead of falling back to a string. When nothing matches, a message is
    printed and ``traits.Str`` is used.

    Parameters
    ----------
    axon_param: Axon parameter instance
        must provide ``mandatory`` and ``getSectionTitleIfDefined()``

    Returns
    -------
    (newtype, paramoptions): tuple
        newtype is the trait type name (or the table value itself if it has
        no ``__name__``); paramoptions is the list of constructor arguments
        of the trait, as strings of the form ``name=value``.
    '''
    newtype = None
    for klass in inspect.getmro(type(axon_param)):
        newtype = param_types_table.get(klass)
        if newtype is not None:
            break
    paramoptions = []
    if newtype is None:
        print('write_process_signature: type', type(axon_param), 'not found')
        newtype = traits.Str
    if isinstance(newtype, tuple):
        # (type, options function) entry
        paramoptions = newtype[1](axon_param)
        newtype = newtype[0]
    if not axon_param.mandatory:
        paramoptions.append('optional=True')
    if inspect.isfunction(newtype):
        # newtype is a function: call it to get the actual type
        newtype = newtype(axon_param)
    if hasattr(newtype, '__name__'):
        # take name of a type class
        newtype = newtype.__name__
    section = axon_param.getSectionTitleIfDefined()
    if section is not None:
        paramoptions.append('groups=("%s",)' % section)
    return newtype, paramoptions


def capsul_merged_param_type(axon_params):
    '''Get a "common" capsul parameter type for a list of axon parameters,
    typically to form a switch output from its possible inputs.

    The type is the one of the first parameter; a warning is printed for
    each following parameter converted to a different type. Options are
    merged without duplicates, except ``allowed_extensions``: the output
    allowed_extensions will be the union of input extensions (which may not
    always be OK).

    Parameters
    ----------
    axon_params: iterable of Axon parameters

    Returns
    -------
    (ctype, coptions): tuple
        same form as the result of capsul_param_type(). ctype is None when
        axon_params is empty.
    '''
    import ast

    ctype = None
    coptions = []
    allowed_extensions = []
    for axon_param in axon_params:
        newtype, paramoptions = capsul_param_type(axon_param)
        if ctype is None:
            ctype = newtype
        elif ctype != newtype:
            print('warning: unmatching input types (for switch) %s and %s'
                  % (ctype, newtype))
        for opt in paramoptions:
            # split on the first "=" only: option values may contain "="
            # (default values, section titles...)
            oname, _, oval = opt.partition('=')
            if oname.strip() == 'allowed_extensions':
                # the value is the repr() of a list of strings: parse it as
                # a literal rather than evaluating arbitrary code
                for ext in ast.literal_eval(oval):
                    if ext not in allowed_extensions:
                        allowed_extensions.append(ext)
            elif opt not in coptions:
                coptions.append(opt)
    if allowed_extensions:
        coptions.append('allowed_extensions=%s' % repr(allowed_extensions))
    return ctype, coptions
# NOTE(review): the four definitions below were collapsed onto two physical
# lines; their indentation and the whitespace inside the generated-code string
# literals are lost. The code is kept verbatim rather than guessed at.
#
# write_process_signature(p, out, buffered_lines, get_all_values=True):
#     for each (name, param) of p.signature, writes a "self.add_trait(...)"
#     line to out, using capsul_param_type(param). When get_all_values is set
#     or the value is not the default one, a "self.<name> = <repr(value)>" line
#     is appended to buffered_lines['initialization'] for non-None values; a
#     "self.<name> = Undefined" line is appended for Boolean / Number / Float /
#     Integer parameters (presumably when the value is None - nesting of the
#     elif cannot be confirmed from the collapsed text).
# write_process_execution(p, out):
#     writes the text of a _run_process() method which collects the trait
#     values into kwargs and calls context.runProcess() with the Axon process
#     id p.id().
# write_process_definition(p, out, get_all_values=True):
#     calls write_process_signature(), flushes the 'initialization' section
#     with write_buffered_lines(), then calls write_process_execution().
# str_to_name(s):
#     returns s with each of the characters space ( ) ' " , - / + * replaced
#     by an underscore.
def write_process_signature(p, out, buffered_lines, get_all_values=True): # write signature for name, param in six.iteritems(p.signature): newtype, paramoptions = capsul_param_type(param) out.write(u' self.add_trait(\'%s\', %s(%s))\n' % (name, newtype, ', '.join(paramoptions))) if get_all_values or not p.isDefault(name): value = getattr(p, name) if value is not None: buffered_lines['initialization'].append( ' self.%s = %s\n' % (name, repr(value))) # print('non-default value for %s in %s' % (name, p.name)) elif type(param) in (neuroData.Boolean, neuroData.Number, neuroData.Float, neuroData.Integer): # None as number is a forced optional value buffered_lines['initialization'].append( ' self.%s = %s\n' % (name, 'Undefined')) out.write(u'\n\n') def write_process_execution(p, out): axon_name = p.id() out.write(u''' def _run_process(self): from brainvisa import axon from brainvisa.configuration import neuroConfig import brainvisa.processes neuroConfig.gui = False neuroConfig.fastStart = True neuroConfig.logFileName = '' axon.initializeProcesses() kwargs = {} for name in self.user_traits(): value = getattr(self, name) if value is Undefined: continue if isinstance(self.trait(name).trait_type, File) and value != '' \ and value is not Undefined: kwargs[name] = value elif isinstance(self.trait(name).trait_type, List): kwargs[name] = list(value) else: kwargs[name] = value context = brainvisa.processes.defaultContext() context.runProcess('%s', **kwargs) ''' % axon_name) def write_process_definition(p, out, get_all_values=True): buffered_lines = {'initialization': []} write_process_signature(p, out, buffered_lines, get_all_values=get_all_values) write_buffered_lines(out, buffered_lines, sections=('initialization', )) write_process_execution(p, out) def str_to_name(s): s = s.replace(' ', '_') s = s.replace('(', '_') s = s.replace(')', '_') s = s.replace('\'', '_') s = s.replace('"', '_') s = s.replace(',', '_') s = s.replace('-', '_') s = s.replace('/', '_') s = s.replace('+', 
'_') s = s.replace('*', '_') return s
def make_module_name(procid, module_name_prefix=None, use_process_names={},
                     lowercase_modules=True):
    '''Build the full name (module + process class) of a converted process,
    ex: morpho.morphologist.morphologist.

    Parameters
    ----------
    procid: string
        Axon process id
    module_name_prefix: string (optional)
        base module name (ex: morpho). If not specified, no base module
    use_process_names: dict (optional)
        If specified, override some complete process names. Key is the axon
        ID, value is the full name. Ex:
        {'morphologist': 'morpho.morphologist.Morphologist'}
    lowercase_modules: bool (optional)
        passed to fix_case() to build the module part of the name from
        procid

    Returns
    -------
    string: the overriding name if there is a non-empty one for procid,
    otherwise [prefix.]module.procid
    '''
    override = use_process_names.get(procid)
    if override:
        return override
    module = fix_case(procid, lowercase_modules)
    if module_name_prefix is None:
        return '%s.%s' % (module, procid)
    return '%s.%s.%s' % (module_name_prefix, module, procid)
# NOTE(review): the definitions below were collapsed onto a few physical lines;
# their indentation and the whitespace inside the generated-code string
# literals are lost. The code is kept verbatim rather than guessed at.
#
# Links are handled as tuples (src, src_param, dst, dst_param[, weak_link])
# where src / dst are weak references to processes or execution nodes.
#
# use_weak_ref(obj): None for None, obj.__weakref__ for a weakref proxy, obj
#     itself for a weakref.ref, otherwise weakref.ref(obj).
# parse_param_link(pipeline, proc, param, linkdefs, weak_outputs=False):
#     converts the Axon link definitions of proc.param into link tuples,
#     dropping intra-process links and incompatible parameter types, and
#     swapping source and destination in some DiskItem cases.
# parse_links(pipeline, proc, weak_outputs=False): parse_param_link() results
#     for every entry of proc._links.
# is_output(proc, param, verbose=False): tells if param is an output: the
#     switch output for a SelectionExecutionNode, otherwise a WriteDiskItem in
#     the signature (also looked for in execution node children when the
#     parameter is not in the signature; True with a warning if not found).
# converted_link(...): translates a link to the pipeline level using
#     find_param_in_parent() and the exported parameters maps; returns None
#     when link info is missing or when the link is already in links.
# export_output(...), export_input(...): append "self.export_parameter(...)"
#     lines to buffered_lines['exports'] and record the export in the
#     selfoutparams / revoutparams (resp. selfinparams / revinparams) maps and
#     in processed_links. declare_output_trait(...) only records it.
# make_node_name(name, nodenames, parents): str_to_name(name), suffixed with
#     a counter when the dotted full name was already used.
# is_linked_to_parent(proc, param, parent): name of the parent parameter
#     linked with proc.param, or None.
# find_param_in_parent(proc, param, procmap): (process, node name, parameter
#     name) of the exported node holding the parameter, or (None, None, None).
# write_pipeline_links(...): appends "self.add_link(...)" lines to
#     buffered_lines['links'], exporting parameters when needed.
# write_switch(...): appends a "self.add_switch(...)" line to
#     buffered_lines['switches'] and returns the switch node name.
# reorder_exports(buffered_lines, p): reorders buffered_lines['exports']
#     using the position of parameters in p.signature.sortedKeys;
#     unrecognized lines are moved to the end.
# write_buffered_lines(out, buffered_lines, sections=None): writes the
#     non-empty sections to out, each preceded by a "# <name> section" line.
#
# NOTE(review): in write_switch, inside "for out in output_names", the fallback
# "out_types[output_name] = (traits.Any, [])" uses output_name, which looks
# like a leftover variable of an earlier loop ("out" was probably intended) -
# TODO confirm.
# NOTE(review): in write_pipeline_links, the duplicate name fallback is
# "sparam2 + '2'" in the "both outputs" case but "sparam + '2'" in the
# "both inputs" case - TODO confirm which one is intended.
def use_weak_ref(obj): if obj is None: return None if type(obj) is weakref.ProxyType: return obj.__weakref__ if type(obj) is weakref.ReferenceType: return obj return weakref.ref(obj) def parse_param_link(pipeline, proc, param, linkdefs, weak_outputs=False): links = [] for dstproc, dstparam, mlink, unknown, force in linkdefs: dstproc = use_weak_ref(dstproc) # check if link is compatible if dstproc is None or dstparam is None or dstproc is proc: # intra-process links are dropped. continue srcsign = proc().signature[param] dstsign = dstproc().signature[dstparam] if type(srcsign) is not type(dstsign) \ and (not isinstance(srcsign, ReadDiskItem) or not isinstance(dstsign, ReadDiskItem)): # incompatible parameters types continue if isinstance(srcsign, ReadDiskItem): if srcsign.type.isA(dstsign.type.name) \ or dstsign.type.isA(srcsign.type.name): # compatible type if isinstance(dstsign, WriteDiskItem) \ or (not isinstance(srcsign, WriteDiskItem) and dstproc is use_weak_ref(pipeline)): # swap input/output this_weak_output = False if weak_outputs and proc is use_weak_ref(pipeline): this_weak_output = True if (dstproc, dstparam, proc, param) not in links: links.append((dstproc, dstparam, proc, param, this_weak_output)) else: this_weak_output = False if weak_outputs and dstproc is use_weak_ref(pipeline): this_weak_output = True if (proc, param, dstproc, dstparam) not in links: links.append((proc, param, dstproc, dstparam, this_weak_output)) else: # not DiskItems this_weak_output = False if weak_outputs and dstproc is use_weak_ref(pipeline): this_weak_output = True if (proc, param, dstproc, dstparam) not in links: links.append((proc, param, dstproc, dstparam, this_weak_output)) return links def parse_links(pipeline, proc, weak_outputs=False): links = [] proc = use_weak_ref(proc) for param, linkdefs in six.iteritems(proc()._links): links += parse_param_link( pipeline, proc, param, linkdefs, weak_outputs) return links def is_output(proc, param, verbose=False): if verbose: 
print('is_output', proc().name, param) print('selection?:', isinstance(proc(), procbv.SelectionExecutionNode)) if isinstance(proc(), procbv.SelectionExecutionNode): # SelectionExecutionNode nodes may be used for switch nodes. # They do not have parameters in the Axon API since they are not # processes. But the Capsul pipeline side (Switch node) has input # parameters which should be connected from children outputs, and # output parameters which should be exported. if (hasattr(proc(), 'switch_output') and proc().switch_output == param) \ or (not hasattr(proc(), 'switch_output') and param == 'switch_out'): return True else: return False signp = proc().signature.get(param) if signp is None: # non-exported parameter # print('internal') # parse sub_nodes if hasattr(proc(), 'executionNode'): en = proc().executionNode() for cn in en.childrenNames(): if param.startswith(cn + '_'): node = getattr(en, cn) sub_param = param[len(cn) + 1:] if hasattr(node, 'signature') \ and sub_param in node.signature: return isinstance(node.signature[sub_param], WriteDiskItem) # not found: guess it's an output (may be wrong) print('Warning: internal param %s.%s not foud. 
Assuming output' % (proc().name, param)) return True return isinstance(signp, WriteDiskItem) def converted_link(linkdef, links, pipeline, selfinparams, revinparams, selfoutparams, revoutparams, procmap): # selfinparams: outputs which are in self (pipeline) and are thus inputs # selfoutparams: inputs which are in self (pipeline) and are thus outpupts # revinparams: input params which should be translated to pipeline # revoutparams: dest params which should be translated to pipeline # find exported source/dest weak_link = linkdef[4] real_source = find_param_in_parent(linkdef[0], linkdef[1], procmap) real_dest = find_param_in_parent(linkdef[2], linkdef[3], procmap) if real_source[0] is None or real_dest[0] is None: print('Warning, missing link info for:', linkdef[0]().name, ',', linkdef[1], ' ->', linkdef[2]().name, ',', linkdef[3]) return None linkdef = (real_source[0], real_source[2], real_dest[0], real_dest[2], weak_link) pipeline = use_weak_ref(pipeline) if linkdef[2] is pipeline and linkdef[3] in selfinparams: # output in pipeline inputs: invert link linkdef = (linkdef[2], linkdef[3], linkdef[0], linkdef[1], weak_link) if linkdef[:4] in links: return None if linkdef[0] is pipeline and linkdef[1] in selfoutparams: # source in pipeline outputs: either needs translation, or inversion if is_output(linkdef[2], linkdef[3]): # dest is an output: needs inversion linkdef = (linkdef[2], linkdef[3], linkdef[0], linkdef[1], weak_link) else: altp = selfoutparams.get(linkdef[1]) if altp is None: print('** warning, probably bad link:', linkdef[0]().name, ',', linkdef[1], ' ->', linkdef[2]().name, ',', linkdef[3]) print(revoutparams) return None linkdef = (altp[0], altp[1], linkdef[2], linkdef[3], weak_link) if linkdef[:4] in links: return None if linkdef[0] is not pipeline \ and (linkdef[0], linkdef[1]) in revinparams: # source has an equivalent in exported inputs altp = revinparams[(linkdef[0], linkdef[1])] linkdef = (pipeline, altp, linkdef[2], linkdef[3], weak_link) if 
linkdef[2] is not pipeline \ and (linkdef[2], linkdef[3]) in revoutparams: # dest has an equivalent in exported outputs altp = revoutparams[(linkdef[2], linkdef[3])] linkdef = (linkdef[0], linkdef[1], pipeline, altp, weak_link) if linkdef[:4] in links: return None if linkdef[2] is not pipeline \ and (linkdef[2], linkdef[3]) in revinparams: # dest has an equivalent in exported inputs altp = revinparams[(linkdef[2], linkdef[3])] linkdef = (pipeline, altp, linkdef[0], linkdef[1], weak_link) if linkdef[:4] in links: return None return linkdef def export_output(buffered_lines, src, sname, sparam, p, dparam, selfoutparams, revoutparams, processed_links, selfouttraits, weak_outputs=False): if dparam in selfouttraits: # a trait has been manually declared declare_output_trait(buffered_lines, src, sname, sparam, p, dparam, selfoutparams, revoutparams, processed_links) else: # global output param in pipeline signature buffered_lines['exports'].append(' # export output parameter\n') if weak_outputs: buffered_lines['exports'].append( ' self.export_parameter(\'%s\', \'%s\', \'%s\', ' 'weak_link=True)\n' % (sname, sparam, dparam)) else: buffered_lines['exports'].append( ' self.export_parameter(\'%s\', \'%s\', \'%s\')\n' % (sname, sparam, dparam)) selfoutparams[dparam] = (src, sparam) revoutparams[(src, sparam)] = dparam processed_links.add((src, sparam, use_weak_ref(p), dparam)) processed_links.add((use_weak_ref(p), dparam, src, sparam)) def declare_output_trait(buffered_lines, src, sname, sparam, p, dparam, selfoutparams, revoutparams, processed_links): # global output param in pipeline signature, as a trait selfoutparams[dparam] = (src, sparam) revoutparams[(src, sparam)] = dparam processed_links.add((src, sparam, use_weak_ref(p), dparam)) processed_links.add((use_weak_ref(p), dparam, src, sparam)) def export_input(buffered_lines, dst, dname, dparam, p, sparam, selfinparams, revinparams, processed_links): # global input param in pipeline signature 
buffered_lines['exports'].append(' # export input parameter\n') buffered_lines['exports'].append( ' self.export_parameter(\'%s\', \'%s\', \'%s\')\n' % (dname, dparam, sparam)) selfinparams[sparam] = (dst, dparam) revinparams[(dst, dparam)] = sparam processed_links.add((use_weak_ref(p), sparam, dst, dparam)) processed_links.add((dst, dparam, use_weak_ref(p), sparam)) def make_node_name(name, nodenames, parents): name = str_to_name(name) full_name = '.'.join((parents or []) + [name]) if full_name in nodenames: nodenames[full_name] += 1 return '%s_%d' % (name, nodenames[full_name]) else: nodenames[full_name] = 0 return name def is_linked_to_parent(proc, param, parent): # get links from proc.param if isinstance(proc(), procbv.Process): linkdefs = proc()._links.get(param) if linkdefs: for dstproc, dstparam, mlink, unknown, force in linkdefs: if use_weak_ref(dstproc) == parent: return dstparam # get links to parent for pparam, linkdefs in six.iteritems(parent()._links): for srcproc, srcparam, mlink, unknown, force in linkdefs: if use_weak_ref(srcproc) == proc and srcparam == param: return pparam return None def find_param_in_parent(proc, param, procmap): # parse all nodes since there is no notion of parent verbose = False # debug flag - TODO: remove it when all is OK pname, exported = procmap.get(proc, (None, None)) if exported: # exported node: direct, OK if verbose: print(' find_param_in_parent:', proc().name, '/', param, ': direct export') return (proc, pname, param) last = (proc, param) allnotfound = False if verbose: print(' find_param_in_parent:', proc().name, '/', param, ':', pname) while not allnotfound: if verbose: print(' try as child:', last[0]().name, '/', last[1]) if last[0] not in procmap: # probably an external link to a parent pipeline if verbose: print(' Warning: link to external node in "parent" ' 'pipeline:', last[0]().name) return (None, None, None) child_name = procmap[last[0]][0] # look for parent enode for new_proc, (new_node_name, exported) in 
six.iteritems(procmap): if isinstance(new_proc(), procbv.Process): new_node = use_weak_ref(new_proc().executionNode()) if new_node is None: # leaf: not a possible parent continue else: new_node = new_proc children = set() for n in new_node().children(): if isinstance(n, procbv.ProcessExecutionNode): children.add(n._process) else: children.add(n) if last[0]() in children: if verbose: print(' test parent:', new_node_name) new_pname = '_'.join((child_name, last[1])) if exported: parent_pname = last[1] if verbose: print(' find_param_in_parent:', proc().name, '/', param) print(' ** found:', new_proc().name, '/', new_pname) # now check if it is an exported param in the sub-pipeline opname = is_linked_to_parent(proc, param, new_proc) if opname is not None: # then return the exported parent one # parent_pname = opname new_pname = opname if verbose: print(' parent param translated:', new_pname) elif verbose: print(' not linked to parent: take:', new_pname) return (new_proc, new_node_name, new_pname) last = (new_proc, new_pname) break else: # loop through the end of procmap allnotfound = True # not found print('Warning: find_param_in_parent: NOT FOUND') print(' was:', proc().name, '/', param) return (None, None, None) def write_pipeline_links(p, buffered_lines, procmap, links, processed_links, selfoutparams, revoutparams, selfouttraits): # parse and set pipeline links selfinparams = {} revinparams = {} for link in links: link = converted_link(link, processed_links, p, selfinparams, revinparams, selfoutparams, revoutparams, procmap) if link is None: continue src, sparam, dst, dparam, weak_link = link sname, sexported = procmap.get(src, (None, None)) if sname is None: print('warning, src process', src().name, 'not found in pipeline.') # print('procmap:', [ k[0]().name for k in procmap ]) continue # skip this one dname, dexported = procmap.get(dst, (None, None)) if dname is None: print('warning, dst process', dst().name, 'not found in pipeline.') continue # skip this one spname 
= sparam if sname: spname = '%s.%s' % (sname, sparam) dpname = dparam if dname: dpname = '%s.%s' % (dname, dparam) if sname == '' and sparam not in selfinparams: export_input(buffered_lines, dst, dname, dparam, p, sparam, selfinparams, revinparams, processed_links) elif dname == '' and dparam not in selfoutparams: export_output(buffered_lines, src, sname, sparam, p, dparam, selfoutparams, revoutparams, processed_links, selfouttraits) else: if dname == '' and dparam in selfinparams: # swap input/output tmp = spname spname = dpname dpname = tmp if sname == '' and sparam in selfoutparams: spname = selfoutparams[sparam][1] if dname != '' and (dst, dpname) in revoutparams: dpname = revoutparams[(dst, dpname)] if spname == dpname: continue # check for non-exported links with same IO status if sname != '' and dname != '' \ and is_output(src, sparam) == is_output(dst, dparam): print('Warning: write_pipeline_links, sname: %s, ' 'sparam: %s, dname: %s, dparam: %s: both same IO type:' % (sname, sparam, dname, dparam), is_output(src, sparam)) if is_output(src, sparam): # both outputs: export 1st sparam2 = sname + '_' + sparam if sparam2 in selfoutparams or sparam2 in selfinparams: # avoid duplicate name sparam2 = sparam2 + '2' export_output(buffered_lines, src, sname, sparam, p, sparam2, selfoutparams, revoutparams, processed_links, selfouttraits) # and link 2nd to this exported output (and switch link) src = dst sparam = dparam spname = dpname dst = use_weak_ref(p) dparam = sparam2 dpname = sparam2 else: # both inputs: export 1st sparam2 = sname + '_' + sparam if sparam2 in selfinparams or sparam2 in selfoutparams: # duplicate name sparam2 = sparam + '2' export_input(buffered_lines, src, sname, sparam, p, sparam2, selfinparams, revinparams, processed_links) # and link 2nd to this exported input sparam = sparam2 spname = sparam2 if weak_link: buffered_lines['links'].append( ' self.add_link(\'%s->%s\', weak_link=True)\n' % (spname, dpname)) else: buffered_lines['links'].append( 
' self.add_link(\'%s->%s\')\n' % (spname, dpname)) processed_links.add((src, sparam, dst, dparam)) processed_links.add((dst, dparam, src, sparam)) def write_switch(enode, buffered_lines, nodenames, links, p, processed_links, selfoutparams, revoutparams, self_out_traits, exported, parent_names, enode_name=None, weak_outputs=False): if enode_name is None: enode_name = 'select_' + enode.name() nodename = make_node_name(enode_name, nodenames, parent_names) input_names = [input_name for input_name in enode.childrenNames()] have_optional = any([getattr(enode.child(x), 'skip_invalid', False) for x in input_names]) output_names = ['switch_out'] if hasattr(enode, 'switch_output'): output_names = enode.switch_output if isinstance(output_names, str) \ or isinstance(output_names, six.text_type): output_names = [output_names] # have a list elif exported: buffered_lines['switches'].append( ' # warning, the switch output trait should be ' 'renamed to a more comprehensive name\n') if exported and not hasattr(enode, 'selection_outputs'): buffered_lines['switches'].append( ' # warning, input items should be connected to ' 'adequate output items in each subprocess in the switch.\n') if exported: # postpone add_switch line after we have determined its params types for output_name in output_names: export_output(buffered_lines, use_weak_ref(enode), nodename, output_name, use_weak_ref( p), output_name, selfoutparams, revoutparams, processed_links, self_out_traits, weak_outputs) out_types = {} if hasattr(enode, 'selection_outputs'): # connect children outputs to the switch sel_out = enode.selection_outputs sw_options = '' for link_src, link_pars in zip(input_names, sel_out): if not isinstance(link_pars, list) \ and not isinstance(link_pars, tuple): link_pars = [link_pars] for link_par, output_name in zip(link_pars, output_names): if link_par is None: # not connected input_name = '_switch_'.join((link_src, output_name)) buffered_lines['exports'].append( ' self.do_not_export.add((\'%s\', 
\'%s\'))\n' % (nodename, input_name)) continue if link_par.startswith('/'): # absolute name, not in child src = enode link_par = link_par[1:] else: src = enode.child(link_src) link_par_split = link_par.split('.') if len(link_par_split) == 1: src = use_weak_ref(src._parameterized) else: while len(link_par_split) > 1: srcname_short = link_par_split.pop(0) src = src.child(srcname_short) src = use_weak_ref(src._parameterized) link_par = link_par_split[-1] # in switches, input params are the concatenation of declared # input params and the output "group" name input_name = '_switch_'.join((link_src, output_name)) src_par = src().signature[link_par] out_types.setdefault(output_name, []).append(src_par) # input_name = link_src # has changed again in Switch... links.append((src, link_par, use_weak_ref(enode), input_name, weak_outputs)) processed_links.add( (src, link_par, use_weak_ref(p), output_name)) processed_links.add( (use_weak_ref(p), output_name, src, link_par)) processed_links.add( (src, link_par, use_weak_ref(p), input_name)) processed_links.add( (use_weak_ref(p), input_name, src, link_par)) if exported: for out_name in out_types.keys(): src = out_types[out_name] out_types[out_name] = capsul_merged_param_type(src) out_types_list = [] for out in output_names: if out in out_types: ptype, options = out_types[out] todel = None for opt in options: if opt.startswith('output='): todel = opt break if todel: options.remove(todel) out_types_list.append('%s(%s)' % (ptype, ', '.join(options))) else: out_types[output_name] = (traits.Any, []) out_types_list.append('Any()') if have_optional: sw_options += ', opt_nodes=True' input_names = repr(input_names) buffered_lines['switches'].append( ' self.add_switch(\'%s\', %s, %s, output_types=[%s]%s)\n' % (nodename, input_names, repr(output_names), ', '.join(out_types_list), sw_options)) # select the right child for sub_node_name in enode.childrenNames(): node = enode.child(sub_node_name) if node.isSelected(): 
buffered_lines['initialization'].append( ' if \'%s\' in self.nodes:\n' % sub_node_name) buffered_lines['initialization'].append( ' self.nodes[\'%s\'].switch = \'%s\'\n' % (nodename, sub_node_name)) return nodename def reorder_exports(buffered_lines, p): old_lines = buffered_lines['exports'] reordered = [0] * len(old_lines) delayed = False linkre = re.compile( '^ *self.export_parameter\(\'([^,]+)\', \'([^\']+)\'(, \'' '([^\']+)\')(, weak_link=True)?\)$') omax = 0 for i, line in enumerate(old_lines): if line.startswith(' #'): delayed = True reordered[i] = -1 else: m = linkre.match(line) if not m: reordered[i] = -1 else: if len(m.groups()) >= 4: out_name = m.group(4) else: out_name = m.group(2) if out_name in p.signature: sign_ind = p.signature.sortedKeys.index(out_name) reordered[i] = sign_ind * 2 + 1 if reordered[i] > omax: omax = reordered[i] if delayed: # move comment line just before its command line reordered[i - 1] = reordered[i] - 1 else: reordered[i] = -1 delayed = False omax += 1 revorder = {} for i in six.moves.xrange(len(reordered)): # move non-recognized lines at the end if reordered[i] < 0: reordered[i] = omax omax += 1 # inverse map revorder[reordered[i]] = i new_lines = [] for i in sorted(revorder.keys()): j = revorder[i] new_lines.append(old_lines[j]) buffered_lines['exports'] = new_lines def write_buffered_lines(out, buffered_lines, sections=None): if sections is None: sections = ('nodes', 'switches', 'exports', 'links', 'initialization') for section in sections: if buffered_lines.get(section): out.write(u' # %s section\n' % section) for line in buffered_lines[section]: out.write(six.ensure_text(line)) out.write(u'\n')
# NOTE(review): this definition was collapsed onto a few physical lines; its
# indentation and the whitespace inside the generated-code string literals are
# lost. The code is kept verbatim rather than guessed at.
#
# write_pipeline_definition writes to out the text of the
# pipeline_definition() method of the converted pipeline, then the text of an
# autoexport_nodes_parameters() method.
# * Execution nodes are processed as a queue starting from p.executionNode().
#   Process nodes without children (or any process node when
#   parse_subpipelines is False) give a "self.add_process(...)" line when
#   exported, using make_module_name() for the process name unless the
#   process is a CapsulProcess wrapper. Exported SelectionExecutionNode nodes
#   are handled by write_switch(). Unselected exported nodes get a
#   "nodes_activation.<name> = False" initialization line, except children of
#   a selection node.
# * Links collected with parse_links() are written by write_pipeline_links(),
#   exports are reordered by reorder_exports(), and sections are flushed with
#   write_buffered_lines(): nodes, switches, exports, links first, then
#   initialization. "pass" is written when these sections are all empty
#   (switches are not part of that test).
# * procmap maps weak references of processes / nodes to (node name, exported).
# NOTE(review): the get_all_values parameter does not appear anywhere in the
# body below - presumably kept for signature compatibility, TODO confirm.
[docs]def write_pipeline_definition(p, out, parse_subpipelines=False, get_all_values=False, module_name_prefix=None, use_process_names={}, lowercase_modules=True): '''Write a pipeline structure in the out file, and links between pipeline nodes. If parse_subpipelines is set, the pipeline structure inside sub-pipelines which are already Axon processes is also parsed (this is generally unneeded). ''' # writing will be buffered so as to allow reordering buffered_lines = {'nodes': [], 'switches': [], 'exports': [], 'links': [], 'initialization': []} out.write(u'\n\n') out.write(u' def pipeline_definition(self):\n') # enodes list: each element is a 6-tuple: # axon_node, name, exported, weak_outputs, parents, parentnode enodes = [(p.executionNode(), None, True, False, None, None)] links = parse_links(p, p) processed_links = set() # procmap: weak_ref(process) -> (node_name, exported) # non-exported nodes are not built as nodes, but may be used by links procmap = {weakref.ref(p): ('', True)} nodenames = {} selfoutparams = {} revoutparams = {} self_out_traits = [] while enodes: enode, enode_name, exported, weak_outputs, parents, parentnode \ = enodes.pop(0) nodename = None if isinstance(enode, procbv.ProcessExecutionNode) \ and (len(list(enode.children())) == 0 or not parse_subpipelines): if enode_name is None: enode_name = enode.name() nodename = make_node_name(enode_name, nodenames, parents) proc = enode._process if isinstance(proc, CapsulProcess): # we have actually a wrapped Capsul process: remove the # wrapping layer and use it directly moduleprocid = '.'.join( [proc.get_capsul_process().__module__, proc.get_capsul_process().__class__.__name__]) else: procid = proc.id() moduleprocid = make_module_name(procid, module_name_prefix, use_process_names, lowercase_modules) procmap[use_weak_ref(proc)] = (nodename, exported) if exported: skip_invalid = getattr(enode, 'skip_invalid', False) if skip_invalid: opt = ', skip_invalid=True' else: opt = '' 
buffered_lines['nodes'].append( ' self.add_process(\'%s\', \'%s\'%s)\n' % (nodename, moduleprocid, opt)) if weak_outputs: ind = '' if skip_invalid: buffered_lines['nodes'].append( ' if \'%s\' in self.nodes:\n' % nodename) ind = ' ' buffered_lines['nodes'].append( ' %sself.nodes[\'%s\']._weak_outputs = True\n' % (ind, nodename)) links += parse_links(p, proc, weak_outputs) for param_name in six.iterkeys(proc.signature): if not proc.isDefault(param_name): value = getattr(proc, param_name) buffered_lines['initialization'].append( ' self.nodes[\'%s\'].%s = %s\n' % (nodename, param_name, repr(value))) new_parents = (parents or []) + [nodename] enodes += [(enode.child(name), name, False, weak_outputs, new_parents, enode) for name in enode.childrenNames()] # parse process signature, look for non-default values else: if isinstance(enode, procbv.SelectionExecutionNode) and exported: # FIXME: BUG: if not exported, should we rebuild switch params # list, and doing this, export again internal params ? nodename = write_switch(enode, buffered_lines, nodenames, links, p, processed_links, selfoutparams, revoutparams, self_out_traits, exported, parents, enode_name, weak_outputs) procmap[use_weak_ref(enode)] = (nodename, exported) # children should have weak outputs so that they can be # deactivated by the switch weak_outputs = True new_parents = parents enodes += [(enode.child(name), name, exported, weak_outputs, new_parents, enode) for name in enode.childrenNames()] if nodename and not enode.isSelected() and exported: # FIXME: the exported flag filters out sub-nodes of sub-pipelines # so it is not possible this way to unselect a node inside a # sub-pipeline. # To do so we would have to remove the exported test here, # but it would cause other problems, such as sub-nodes naming, # which should not follow the "global" rule make_node_name(). 
if parentnode \ and isinstance(parentnode, procbv.SelectionExecutionNode): continue # switches have already their selection activation if parents: sub_node_address = '.'.join( ['nodes[\'%s\'].process' % sub_name for sub_name in parents]) buffered_lines['initialization'].append( ' self.%s.nodes_activation.%s = False\n' % (sub_node_address, nodename)) else: buffered_lines['initialization'].append( ' self.nodes_activation.%s = False\n' % nodename) write_pipeline_links(p, buffered_lines, procmap, links, processed_links, selfoutparams, revoutparams, self_out_traits) do_not_export = getattr(p, 'capsul_do_not_export', set()) if do_not_export: buffered_lines['exports'].append( ' self.do_not_export.update(%s)\n' % repr(do_not_export)) # try to respect pipeline main parameters ordering reorder_exports(buffered_lines, p) # flush the write buffer write_buffered_lines(out, buffered_lines, sections=('nodes', 'switches', 'exports', 'links')) # remove this when there is a more convenient method in Pipeline # out.write( # ''' # export orphan output parameters # self.export_internal_parameters() #''') # buffered_lines['initialization'] += [ " # export orphan parameters\n", " if not hasattr(self, '_autoexport_nodes_parameters') \\\n", " or self._autoexport_nodes_parameters:\n", " self.autoexport_nodes_parameters()\n"] # flush the init section buffer write_buffered_lines(out, buffered_lines, sections=('initialization', )) if all([not buffered_lines.get(section) for section in ('nodes', 'exports', 'links', 'initialization')]): out.write(u' pass\n') out.write( u''' def autoexport_nodes_parameters(self): \'\'\'export orphan and internal output parameters\'\'\' for node_name, node in six.iteritems(self.nodes): if node_name == '': continue # skip main node if hasattr(node, '_weak_outputs'): weak_outputs = node._weak_outputs else: weak_outputs = False for parameter_name, plug in node.plugs.items(): if parameter_name in ('nodes_activation', 'selection_changed'): continue if (node_name, 
parameter_name) not in self.do_not_export: if not plug.output and plug.links_from: continue weak_link = False if plug.output: if plug.links_to: # or plug.links_from: # some links exist if [True for x in plug.links_to \\ if x[0]=='' or isinstance(x[2], Switch)] \\ or \\ [True for x in plug.links_from \\ if x[0]=='' or isinstance(x[2], Switch)]: # a link to the main pipeline or to a switch # already exists continue # links exist but not to the pipeline: export # weak_link = True if weak_outputs and plug.output: weak_link = True self.export_parameter(node_name, parameter_name, '_'.join((node_name, parameter_name)), weak_link=weak_link, is_optional=True) ''')
# ----
def axon_to_capsul(proc, outfile, module_name_prefix=None,
                   parse_subpipelines=False, get_all_values=True,
                   capsul_process_name=None, use_process_names=None,
                   lowercase_modules=True):
    '''Converts an Axon process or pipeline into a CAPSUL process or pipeline.
    The output is a file, named with the outfile parameter.

    The generated code is first built in memory, optionally reformatted with
    autopep8 (when that module can be imported), and only then written to
    ``outfile``, so that a failure during the conversion does not leave a
    truncated file behind.

    Parameters
    ----------
    proc: axon process ID (string) or instance
        process to be converted
    outfile: filename
        output file name for the converted process in CAPSUL API
    module_name_prefix: module path (string) (optional)
        if specified, this prefix will be prepended to processes module names
    parse_subpipelines: bool (optional)
        if True, sub-pipelines internals will be extracted in the current one
        Experimental. Expect strange effects when you use it.
        Default is False.
    get_all_values: bool (optional)
        if True, the current values of the input process instance will all be
        reported to the output process.
        Default is True.
    capsul_process_name: string (optional)
        if specified, name of the converted Capsul process. Otherwise use the
        same name as the Axon process ID.
    use_process_names: dict string:string (optional)
        names mapping table between Axon process IDs and Capsul process names
        when used as pipeline nodes. Default: same as Axon IDs.
    lowercase_modules: bool (optional)
        forwarded to the pipeline writer; according to the command-line help
        of this module, it requests lowercase module names for the
        process/pipeline classes. Default is True.
    '''
    # a fresh dict per call: a dict literal used as default value would be
    # shared (and possibly modified) across all calls
    if use_process_names is None:
        use_process_names = {}

    # try using autopep8
    try:
        import autopep8
    except ImportError:
        autopep8 = None

    if isinstance(proc, procbv.Process):
        procid = proc.id()
        p = proc
    else:
        procid = proc
        p = procbv.getProcessInstance(procid, ignoreValidation=True)
    if capsul_process_name is None:
        capsul_process_name = procid
    # a process with an execution node is converted as a pipeline
    if p.executionNode():
        proctype = Pipeline
    else:
        proctype = Process

    # always write in a string buffer: the actual file is only opened once
    # the whole code has been generated
    out = six.StringIO()
    out.write(u'''# -*- coding: utf-8 -*-
try:
    from traits.api import File, Directory, Float, Int, Bool, Enum, Str, \\
        List, Any, Undefined
except ImportError:
    from enthought.traits.api import File, Directory, Float, Int, Bool, Enum, \\
        Str, List, Any, Undefined

from capsul.api import Process
import six
''')
    if proctype is Pipeline:
        out.write(u'''from capsul.api import Pipeline
from capsul.api import Switch
''')
    out.write(u'''

class ''')
    out.write(six.ensure_text(capsul_process_name)
              + u'(%s):\n' % proctype.__name__)

    if proctype is Pipeline:
        out.write(u'''    def __init__(self, autoexport_nodes_parameters=True, **kwargs):
        self._autoexport_nodes_parameters = autoexport_nodes_parameters
        super(%s, self).__init__(False, **kwargs)
        del self._autoexport_nodes_parameters
#        if autoexport_nodes_parameters:
#            self.autoexport_nodes_parameters()\n''' % capsul_process_name)
        write_pipeline_definition(p, out,
                                  parse_subpipelines=parse_subpipelines,
                                  get_all_values=get_all_values,
                                  module_name_prefix=module_name_prefix,
                                  use_process_names=use_process_names,
                                  lowercase_modules=lowercase_modules)
    else:
        out.write(u'    def __init__(self, **kwargs):\n')
        out.write(u'        super(%s, self).__init__()\n'
                  % capsul_process_name)
        write_process_definition(p, out, get_all_values=get_all_values)

    code = out.getvalue()
    if autopep8 is not None:
        # Only the major version number matters here. Comparing it alone
        # also copes with 2-component versions ("1.0") and with suffixed
        # minor/patch components, which the full list comparison did not.
        if int(autopep8.__version__.split('.')[0]) >= 1:
            code = autopep8.fix_code(code)
        else:
            # old versions of autopep8
            code = autopep8.fix_string(code)

    # the context manager guarantees the file is flushed and closed
    with io.open(outfile, 'w', encoding='utf-8') as out_file:
        out_file.write(code)
def get_subprocesses(procid):
    '''Collect every process found in the execution tree of a pipeline.

    The execution node tree is walked breadth-first, one depth level at a
    time. Each node which is a ProcessExecutionNode contributes its process
    to the result, and the children of every visited node are visited in
    turn.

    Parameters
    ----------
    procid: str or brainvisa Process
        process (pipeline) to parse. A value which is not a Process instance
        is resolved through getProcessInstance().

    Returns
    -------
    set of processes found inside procid. The set is empty when the process
    has no execution node.
    '''
    if isinstance(procid, procbv.Process):
        pipeline = procid
    else:
        pipeline = procbv.getProcessInstance(procid, ignoreValidation=True)

    found = set()
    root = pipeline.executionNode()
    if root is None:
        return found

    level = list(root.children())
    while level:
        next_level = []
        for child in level:
            if isinstance(child, procbv.ProcessExecutionNode):
                found.add(child._process)
            next_level.extend(child.children())
        level = next_level
    return found
def get_process_id(proc):
    '''Return the ID of a process.

    A Process instance is asked for its id(); any other value is assumed to
    already be an ID and is returned unchanged.
    '''
    return proc.id() if isinstance(proc, procbv.Process) else proc
def fix_case(module_name, lowercase_modules):
    '''Return module_name, converted to lowercase if lowercase_modules is
    true.
    '''
    if lowercase_modules:
        return module_name.lower()
    return module_name


def module_filename(process_name, lowercase_modules, gen_process_names):
    '''Base name (without extension) of the module generated for a process.

    Parameters
    ----------
    process_name: str
        Axon process ID
    lowercase_modules: bool
        if true, the resulting name is converted to lowercase
    gen_process_names: dict string:string
        renaming table; process_name itself is used when it is not a key of
        this table
    '''
    return fix_case(gen_process_names.get(process_name, process_name),
                    lowercase_modules)


def _parse_name_pairs(specs, option):
    '''Parse a list of "axon_name:capsul_name" strings into a dict.

    Raises ValueError, naming the offending command-line option, if an item
    does not contain exactly one ":" separator.
    '''
    pairs = {}
    for spec in specs:
        parts = spec.split(':')
        if len(parts) != 2:
            raise ValueError(
                'Invalid value %r for option %s: the expected syntax is '
                'axon_name:capsul_name' % (spec, option))
        pairs[parts[0]] = parts[1]
    return pairs


def axon_to_capsul_main(argv):
    '''Command-line entry point: convert Axon processes to Capsul ones.

    Parameters
    ----------
    argv: list of str
        command-line arguments, without the program name

    Raises
    ------
    ValueError
        if the numbers of -p and -o options differ, or if a -n / -u value
        does not follow the axon_name:capsul_name syntax.
    SystemExit
        if positional arguments are given (the help is printed first).
    '''
    parser = OptionParser('Convert an Axon process into a Capsul '
                          'process.\nAlso works for pipeline structures.\n'
                          'Parameters links for completion are not preserved '
                          '(yet), but '
                          'inter-process links in pipelines are (normally) '
                          'rebuilt.')
    parser.add_option('-p', '--process', dest='process', action='append',
                      default=[],
                      help='input process ID. Ex: NobiasHistoAnalysis. '
                      'Several -p options '
                      'are allowed and should each correspond to a -o '
                      'option.')
    parser.add_option('-o', '--output', dest='output', metavar='FILE',
                      action='append', default=[],
                      help='output .py file for the converted process code')
    parser.add_option('-n', '--name', dest='name', action='append',
                      default=[],
                      help='name of converted Capsul processes. Default: '
                      'same as Axon. '
                      'Individual processes may be renamed. The syntax is '
                      'axon_name:capsul_name, ex: '
                      '"-n BiasCorrection:bias_correction". '
                      'Several -n options may be specified')
    parser.add_option('-m', '--module', dest='module',
                      help='module name used as namespace to get the '
                      'sub-processes in a '
                      'pipeline')
    parser.add_option('-u', '--use', dest='use_proc', action='append',
                      default=[],
                      help='names of processes to use in pipeline nodes. '
                      'As for -n '
                      'option, the syntax is axon_name:capsul_name. But '
                      'this table is not '
                      'used when generating a Capsul process class name, '
                      'but only when '
                      'using it to get a process inside a pipeline. The '
                      'capsul name is '
                      'the full module + class name, ex: '
                      'morpho.morphologist.Morphologist')
    parser.add_option('-r', '--recursive_sub', dest='parse_subpipelines',
                      action='store_true', default=False,
                      help='recursively parse sub-pipelines of a pipeline. '
                      'This is mostly '
                      'a debugging feature, since it is generally not '
                      'needed because '
                      'sub-pipelines are processes and can be converted '
                      'and used '
                      'directly. Moreover with this option, pipeline '
                      'processes are not '
                      'exported as themselves, but may contain parameters '
                      'which will not '
                      'be exported and may cause missing or broken links.')
    parser.add_option('-s', '--subprocess', dest='subprocess',
                      action='store_true', default=False,
                      help='automatically convert sub-processes of a '
                      'pipeline, using the '
                      'process IDs as both class name and output file '
                      'names. Names '
                      'conversion through the -u option applies. This '
                      'option mainly '
                      'avoids to specify all pipeline processes via series '
                      'of -p/-o '
                      'parameters. Additional sub-processes specified '
                      'through -p/-o may '
                      'replace them.')
    parser.add_option('-l', '--lowercase_modules', dest='lowercase_modules',
                      action='store_true', default=True,
                      help='convert process/pipeline classes modules names '
                      'to lowercase. '
                      'Used with the -s option. Default=True')
    # -l alone is a no-op since it is already the default: this counterpart
    # is the only way to actually disable the lowercase conversion.
    parser.add_option('-L', '--no-lowercase_modules',
                      dest='lowercase_modules', action='store_false',
                      help='keep the case of process/pipeline classes '
                      'modules names (opposite of -l)')

    options, args = parser.parse_args(argv)
    if len(args) != 0:
        parser.print_help()
        sys.exit(1)
    if len(options.process) != len(options.output):
        raise ValueError(
            'There should be the same number of -p options and -o options')

    # validate the renaming tables before the (costly) initialization of
    # Axon processes, so that syntax errors are reported immediately
    gen_process_names = _parse_name_pairs(options.name, '-n')
    explicit_use_names = _parse_name_pairs(options.use_proc, '-u')

    from brainvisa.configuration import neuroConfig
    neuroConfig.ignoreValidation = True
    processes.initializeProcesses()

    lowercase_modules = options.lowercase_modules

    # processes renamed with -n are used under their new module name in
    # pipelines, unless -u explicitly says otherwise
    use_process_names = {
        axon: make_module_name(capsul, options.module, {}, lowercase_modules)
        for axon, capsul in gen_process_names.items()}
    use_process_names.update(explicit_use_names)

    todo = list(zip([procbv.getProcessInstance(p, ignoreValidation=True)
                     for p in options.process],
                    options.output))
    if options.subprocess:
        added_processes = []
        for proc, _ in todo:
            added_processes += get_subprocesses(proc)
        # Sub-processes are appended AFTER the explicit -p/-o entries, and
        # the list order is kept (no conversion to a set): duplicates are
        # filtered by process ID below, where the first occurrence wins, so
        # an explicit -p/-o always takes precedence over the automatic
        # conversion of the same process.
        todo += [(sub, module_filename(sub.id(), lowercase_modules,
                                       gen_process_names) + '.py')
                 for sub in added_processes]

    done_processes = set()
    for proc, outfile in todo:
        if isinstance(proc, CapsulProcess):
            # already a capsul process
            continue
        procid = get_process_id(proc)
        if procid in done_processes:
            continue
        done_processes.add(procid)

        axon_to_capsul(proc, outfile, module_name_prefix=options.module,
                       parse_subpipelines=options.parse_subpipelines,
                       get_all_values=True,
                       capsul_process_name=gen_process_names.get(procid),
                       use_process_names=use_process_names,
                       lowercase_modules=lowercase_modules)


if __name__ == '__main__':
    axon_to_capsul_main(sys.argv[1:])