Source code for brainvisa.processes

# -*- coding: utf-8 -*-
#  This software and supporting documentation are distributed by
#      Institut Federatif de Recherche 49
#      CEA/NeuroSpin, Batiment 145,
#      91191 Gif-sur-Yvette cedex
#      France
#
# This software is governed by the CeCILL license version 2 under
# French law and abiding by the rules of distribution of free software.
# You can  use, modify and/or redistribute the software under the
# terms of the CeCILL license version 2 as circulated by CEA, CNRS
# and INRIA at the following URL "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license version 2 and that you accept its terms.
"""
This module contains classes defining Brainvisa **processes and pipelines**.

The main class in this module is :py:class:`Process`. It is the base class for all Brainvisa processes. It inherits from the class :py:class:`Parameterized` that defines an object with a list of parameters that we call a signature.
When the processes are loaded at Brainvisa startup, information about them is stored in :py:class:`ProcessInfo` objects.

Several functions are available in this module to get information about the processes or to get instances of the processes:
  * :py:func:`getProcessInfo`
  * :py:func:`allProcessesInfo`
  * :py:func:`getProcess`
  * :py:func:`getProcessInstance`
  * :py:func:`getProcessInstanceFromProcessEvent`
  * :py:func:`getProcessFromExecutionNode`
  * :py:func:`getConverter`
  * :py:func:`getConvertersTo`
  * :py:func:`getConvertersFrom`
  * :py:func:`getViewer`
  * :py:func:`runViewer`
  * :py:func:`getDataEditor`
  * :py:func:`getImporter`
To modify, the lists of processes, several functions are also available:
  * :py:func:`addProcessInfo`
  * :py:func:`readProcess`
  * :py:func:`readProcesses`


A pipeline is defined as a specific process that has an execution node that describes the pipeline structure. The base class for execution nodes is :py:class:`ExecutionNode`. This class is specialized into several other classes, defining different types of pipelines:
  * :py:class:`ProcessExecutionNode`: only one process
  * :py:class:`SerialExecutionNode`: a list of execution nodes that have to be executed serially.
  * :py:class:`ParallelExecutionNode`: a list of execution nodes that can be executed in parallel.
  * :py:class:`SelectionExecutionNode`: a choice between several execution nodes.

Specialized Process classes that use the different types of execution nodes also exist:
  * :py:class:`IterationProcess`: an iteration of a process on a set of data. Uses a :py:class:`ParallelExecutionNode`.
  * :py:class:`ListOfIterationProcess`
  * :py:class:`DistributedProcess`: a pipeline that have a :py:class:`ParallelExecutionNode`
  * :py:class:`SelectionProcess`: a pipeline that have a :py:class:`SelectionExecutionNode`.

As processes can be run in different contexts, an object representing this context is passed as a parameter in the processes execution function. This object is an intance of the class :py:class:`ExecutionContext`. A default context associated to the application also exists, to get it use the function :py:func:`defaultContext`.

After loading, Brainvisa processes are stored in an object :py:class:`ProcessTrees` that represents the processes organization in toolboxes and categories. The function :py:func:`updatedMainProcessTree` creates this object if it doesn't exist yet and to returns it.

:Inheritance diagram:

.. inheritance-diagram:: Parameterized Process ExecutionNode SerialExecutionNode ProcessExecutionNode SelectionExecutionNode ParallelExecutionNode ProcessInfo ExecutionContext IterationProcess SelectionProcess DistributedProcess ListOfIterationProcess ProcessTree ProcessTrees


:Classes:

.. autoclass:: Parameterized
  :show-inheritance:

.. autoclass:: Process
  :show-inheritance:

.. autoclass:: ProcessInfo
  :members:
  :show-inheritance:

.. autoclass:: ExecutionNode
  :members:
  :show-inheritance:

.. autoclass:: ProcessExecutionNode
  :members:
  :show-inheritance:

.. autoclass:: SerialExecutionNode
  :members:
  :show-inheritance:

.. autoclass:: ParallelExecutionNode
  :members:
  :show-inheritance:

.. autoclass:: SelectionExecutionNode
  :members:
  :show-inheritance:

.. autoclass:: IterationProcess
  :members:
  :show-inheritance:

.. autoclass:: ListOfIterationProcess
  :members:
  :show-inheritance:

.. autoclass:: DistributedProcess
  :members:
  :show-inheritance:

.. autoclass:: SelectionProcess
  :members:
  :show-inheritance:

.. autoclass:: ExecutionContext
  :members:
  :show-inheritance:

.. autoclass:: ProcessTree
  :members:

.. autoclass:: ProcessTrees
  :members:
  :show-inheritance:

:Functions:

.. autofunction:: getProcessInfo
.. autofunction:: allProcessesInfo
.. autofunction:: getProcess
.. autofunction:: getProcessInstance
.. autofunction:: getProcessInstanceFromProcessEvent
.. autofunction:: getProcessFromExecutionNode
.. autofunction:: getConverter
.. autofunction:: getConvertersTo
.. autofunction:: getConvertersFrom
.. autofunction:: getViewer
.. autofunction:: runViewer
.. autofunction:: getDataEditor
.. autofunction:: getImporter
.. autofunction:: addProcessInfo
.. autofunction:: readProcess
.. autofunction:: readProcesses
.. autofunction:: updatedMainProcessTree
.. autofunction:: allProcessesTree
.. autofunction:: updateProcesses
.. autofunction:: mainThread
.. autofunction:: defaultContext
.. autofunction:: initializeProcesses
.. autofunction:: cleanupProcesses

"""
from __future__ import print_function

from __future__ import absolute_import
from six.moves import range
from six.moves import zip
__docformat__ = 'restructuredtext en'

# Be careful, it is necessary to initialize
# subprocess with subprocess32 when it is possible because of known
# issues in subprocess module that can lead to lock in subprocess run
import soma.subprocess

import traceback
import threading
import formatter
import operator
import inspect
import signal
import shutil
import imp
import types
import copy
import weakref
import string
import distutils.spawn
import os
import errno
import time
import calendar
import tempfile

from soma.sorted_dictionary import SortedDictionary
from soma.functiontools import numberOfParameterRange, hasParameter, partial
from soma.minf.api import readMinf, writeMinf, createMinfWriter, iterateMinf, minfFormat
from soma.minf.xhtml import XHTML
from soma.minf.xml_tags import xhtmlTag
from soma.notification import EditableTree, ObservableSortedDictionary, \
    ObservableAttributes, Notifier
from soma.minf.api import createMinfWriter, iterateMinf, minfFormat
from soma.html import htmlEscape
from soma.somatime import timeDifferenceToString
from soma.path import relative_path

from brainvisa.data.neuroData import *
from brainvisa.data.neuroDiskItems import *
from brainvisa.data.readdiskitem import ReadDiskItem
from brainvisa.data.writediskitem import WriteDiskItem
from brainvisa.configuration import neuroConfig
from brainvisa.data import neuroDiskItems
from brainvisa.processing import neuroLog
from brainvisa.processing import neuroException
from brainvisa.processing.neuroException import *
from brainvisa.validation import ValidationError
from brainvisa.debug import debugHere
from brainvisa.data.sqlFSODatabase import Database, NotInDatabaseError
import brainvisa.toolboxes
from brainvisa.data import fileSystemOntology
from soma.qt_gui.qt_backend.QtCore import QProcess
from brainvisa.processing.qtgui.command import CommandWithQProcess as Command
from soma import safemkdir
from soma.qtgui.api import QtThreadCall, FakeQtThreadCall

import six
from six.moves import cPickle, getcwd, StringIO
from six.moves.html_parser import HTMLParser


global _mainThreadActions
_mainThreadActions = FakeQtThreadCall()
global _defaultContext
_defaultContext = None

#----------------------------------------------------------------------------


[docs]def pathsplit(path): '''Returns a tuple corresponding to a recursive call to os.path.split for example on Unix: :: pathsplit('toto/titi/tata') == ('toto', 'titi', 'tata') pathsplit('/toto/titi/tata') == ('/', 'toto', 'titi', 'tata') ''' if isinstance(path, six.string_types): if path: return pathsplit((path, )) else: return () else: if path[0]: d, b = os.path.split(path[0]) if b: if d: return pathsplit(d) + (b,) + path[1:] else: return (b,) + path[1:] else: return (d,) + path[1:]
#----------------------------------------------------------------------------
[docs]def getProcdocFileName(processId): """ Returns the name of the file (.procdoc) that contains the documentation of the process in parameter. """ processInfo = getProcessInfo(processId) fileName = getattr(processInfo, 'fileName', None) if fileName is None: return None newFileName = os.path.join(os.path.dirname(fileName), processInfo.id + ".procdoc") return newFileName
#----------------------------------------------------------------------------
[docs]def readProcdoc(processId): """ Returns the content of the documentation file (.procdoc) of the process in parameter. """ processInfo = getProcessInfo(processId) procdoc = {} if processInfo is not None: procdoc = processInfo.procdoc if procdoc is None: procdoc = {} fileName = getProcdocFileName(processInfo) if fileName: if os.path.exists(fileName): try: procdoc = readMinf(fileName)[0] except Exception: print('!Error in!', fileName) raise else: # if the process is Capsul-derived, try to use the Capsul # process doc proc = getProcessInstance(processId) from brainvisa.processing import capsul_process if isinstance(proc, capsul_process.CapsulProcess): procdoc = proc.procdoc_from_capsul() processInfo.procdoc = procdoc return procdoc
#----------------------------------------------------------------------------
[docs]def writeProcdoc(processId, documentation): """ Writes the ``documentation`` in the process documentation file (.procdoc). """ fileName = getProcdocFileName(processId) if not os.path.exists(fileName): processInfo = getProcessInfo(processId) procFileName = getattr(processInfo, 'fileName', None) procSourceFileName = os.path.realpath(procFileName) # take care of keeping the .procdoc in the same location as the .py, # whatever symlinks if os.path.islink(procFileName) and procFileName != procSourceFileName: sourceFileName = os.path.join(os.path.dirname(procSourceFileName), os.path.basename(fileName)) if os.path.islink(fileName): os.unlink(fileName) elif os.path.exists(fileName): raise OSError('procdoc file %s should be a symlink, but ' 'is not' % fileName) os.symlink(sourceFileName, fileName) fileName = sourceFileName writeMinf(fileName, (documentation, ))
#----------------------------------------------------------------------------
[docs]def procdocToXHTML(procdoc): """ Converts HTML tags in the content of a .procdoc file to XHTML tags. Checks its syntax. """ stack = [(procdoc, key, key) for key in six.iterkeys(procdoc)] while stack: d, k, h = stack.pop() value = d[k] if isinstance(value, six.string_types): # Convert HTML tags to XML valid tags # Put all tags in lower-case because <TAG> ... </tag> is illegal # XML def lowerTag(x): result = '<' + x.group(1).lower() + x.group(2) return result value = re.sub( '<(/?[A-Za-z_][a-zA-Z_0-9]*)(>|[^a-zA-Z_0-9][^>]*>)', lowerTag, value) # Add a '/' at the end of non closed tags for l in ('img', 'br', 'hr'): expr = '<(' + l + \ '(([^A-Za-z_0-9>/]?)|([^A-Za-z_0-9][^>]*[^/>])))>(?!\s*</' + \ l + '>)' value = re.sub(expr, '<\\1/>', value) # convert <s> tag to <xhtml> tag value = re.sub( '<(/?)s(>|[^a-zA-Z_0-9][^>]*>)', '<\\1' + xhtmlTag + '\\2', value) goOn = True while goOn: goOn = False try: newValue = XHTML.buildFromHTML(value) except Exception as e: # Build a text editor from soma.qt4gui.designer import loadUi from soma.qt_gui.qt_backend import QtGui, QtCore editor = loadUi( os.path.join(mainPath, '..', 'python', 'brainvisa', 'textEditor.ui')) # editor.setAttribute( QtCore.Qt.WA_DeleteOnClose, True ) def f(): l = editor.content.textCursor().blockNumber() c = editor.content.textCursor().columnNumber() editor.cursorPosition.setText( str(l + 2) + ' : ' + str(c)) editor.info.setText( '<h2><font color="red">Error in ' + h + ':<br> ' + str(e) + '</font></h1>') editor.content.setAcceptRichText(False) editor.content.setPlainText(value) editor.content.cursorPositionChanged.connect(f) editor.btnOk.setText('Check and save as XHTML') editor.btnCancel.setText('Save as simple text') line = getattr(e, 'getLineNumber', None) if line is not None: line = line() - 2 else: line = 0 column = getattr(e, 'getColumnNumber', None) if column is not None: column = column() else: column = 0 if line == 0 and column == 0: x = re.match('^[^0-9]*:([0-9]+):([0-9]+):', str(e)) # print(x) # print(str(e)) if x: line = int(x.group(1)) - 3 column = int(x.group(2)) # it's not the column ! # (maybe the XML tag number ?) editor.content.moveCursor(QtGui.QTextCursor.Start) for i in six.moves.xrange(line - 1): editor.content.moveCursor(QtGui.QTextCursor.NextBlock) # for i in six.moves.xrange( column - 1 ): # editor.content.moveCursor( QtGui.QTextCursor.Right ) if editor.exec_() == QDialog.Accepted: value = six.text_type(editor.content.toPlainText()) goOn = True else: newValue = six.text_type(editor.content.toPlainText()) goOn = False d[k] = newValue elif type(value) is dict: stack += [(value, key, h + '.' + key) for key in six.iterkeys(value)]
#----------------------------------------------------------------------------
[docs]def getHTMLFileName(processId, documentation=None, language=None): """ Gets the path to the html page corresponding to the documentation of the process in parameter. """ processInfo = getProcessInfo(processId) if processInfo is None: # no process info: probably an iteration or a process built on-the-fly return None if documentation is None: documentation = readProcdoc(processId) if language is None: language = neuroConfig.language htmlPath = XHTML.html(documentation.get('htmlPath', '')) if htmlPath: defaultPath = htmlPath else: defaultPath = os.path.dirname(neuroConfig.docPath) return os.path.join(defaultPath, language, 'processes', processInfo.id.replace(' ', '_') + '.html')
#---------------------------------------------------------------------------- #----------------------------------------------------------------------------
[docs]def generateHTMLProcessesDocumentation(procId=None): """ Generates HTML pages for the documentation of the process in parameter or for all processes if `procId` is None. The process generateDocumentation is used. """ if procId is None: defaultContext().runProcess("generateDocumentation") else: docproc = getProcessInstance('generateDocumentation') translators = {} for l in neuroConfig._languages: translators[l] = neuroConfig.Translator(l) ontology = 'all' docproc.generateHTMLDocumentation( procId, translators, defaultContext(), ontology)
#----------------------------------------------------------------------------
[docs]def mapValuesToChildrenParameters( destNode, sourceNode, dest, source, value=None, defaultProcess=None, defaultProcessOptions={}, name=None, resultingSize=-1, allow_remove=False): """ Maps values of parameter *sourceNode*.*source* (which is a list) to *destNode*.*dest*. *value* will receive parameter value when calling this function from a link. If children are fewer than source values and a *defaultProcess* is given, lacking children are added, with chosen *defaultProcessOptions* and *name* if given. Should *resultingSize* value differ from -1, the resulting number of children will be set to this value. If *allow_remove* is False (the default), nodes will never be removed even if source list is shorter than the number of nodes. *sourceNode*, *source*, and *dest* may be lists of parameters (of matching sizes if they are lists) to map. The longest size will determine the number of nodes. *destNode* should not be a list since it is the child node we are working on. """ if not isinstance(dest, list): dest = [dest] if not isinstance(sourceNode, list): sourceNode = [sourceNode] if not isinstance(source, list): source = [source] nparam = max(len(dest), len(sourceNode), len(source)) dest += [dest[-1]] * (nparam - len(dest)) sourceNode += [sourceNode[-1]] * (nparam - len(sourceNode)) source += [source[-1]] * (nparam - len(source)) sources = [n.parseParameterString(s) for n, s in zip(sourceNode, source)] sources = [s for s in sources if s is not None] # print('destination attributes', dest, #'source nodes', len(sourceNode), #'source attributes', len(source), #'number of values:', nparam);sys.stdout.flush() # sourceObject, sourceParameter = sourceNode.parseParameterString(source) # l = getattr(sourceObject, sourceParameter, []) ls = [getattr(s[0], s[1], []) for s in sources] lsize = max([len(v) for v in ls]) csize = len(destNode.childrenNames()) if allow_remove: rsize = lsize else: # Resulting size is the max between children size and list size rsize = max(csize, lsize) # print('list of source values', ls, #'maximum size of source values', lsize, #'number of destination children', csize, #'predicted resulting size', rsize, #'fixed resulting size', resultingSize);sys.stdout.flush() if resultingSize < 0: resultingSize = rsize if (defaultProcess is not None) and (name is None): name = getProcessInstance(defaultProcess).name created_nodes = [] for i in six.moves.xrange(resultingSize): if i == csize: if defaultProcess is not None: # Add a new child node child = brainvisa.processes.ProcessExecutionNode( defaultProcess, optional=defaultProcessOptions.get( 'optional', True), selected=defaultProcessOptions.get( 'selected', True), expandedInGui=defaultProcessOptions.get( 'expandedInGui', False), altname=str(i + 1) + '. ' + getProcessInstance( defaultProcess).name ) destNode.addChild(name=name, node=child) created_nodes.append((i, child)) else: return for d, l in zip(dest, ls): if i < len(l): v = l[i] elif len(l) == 1: v = l[0] else: v = None # Set node value if len(l) > 0: k = destNode.childrenNames()[i] destChild = destNode._children[k] dst = destChild.parseParameterString(d) if d is None: # invalid destination node continue destObject, destParameter = dst destObject.setValue(destParameter, v) # update csize csize = len(destNode.childrenNames()) # trigger callbacks to fill new nodes parameters # PROBLEM: we only parse links from the *same* sourceObject - others may # come from elsewhere, we don't have them here. if len(created_nodes) != 0: for (sourceObject, s) in sources: for param, plinks in six.iteritems(sourceObject._links): for link in plinks: function = link[2] if not isinstance(function, ExecutionNode.MultiParameterLink) \ or not isinstance(function.function, partial) \ or function.function.func \ is not mapValuesToChildrenParameters: continue part_func = function.function if len(part_func.args) >= 1: dest_node = part_func.args[0] else: dest_node = part_func.kwargs.get('destNode') if dest_node is not destNode: continue if len(part_func.args) >= 2: src_node = part_func.args[1] else: src_node = part_func.kwargs.get('sourceNode') if not src_node: src_node = sourceNode if len(part_func.args) >= 3: dest_par = part_func.args[2] else: dest_par = part_func.kwargs.get('dest') if not dest_par or dest_par == dest: # we are currently doing dest continue if len(part_func.args) >= 4: src_par = part_func.args[3] else: src_par = part_func.kwargs.get('source') if not src_par: continue src = src_node.parseParameterString(src_par) if src is None: continue src_obj, src_par = src value = getattr(src_obj, src_par, []) nval = len(value) if nval == 0: continue for i, child in created_nodes: v = None if i >= nval: if nval == 1: v = value[0] else: v = value[i] if v is not None: child.setValue(dest_par, v) if allow_remove: # if we have too many nodes, remove the tailing ones while len(destNode.childrenNames()) > resultingSize: destNode.removeChild(destNode.childrenNames()[-1])
#---------------------------------------------------------------------------- def mapChildrenParametersToValues(destNode, sourceNode, dest, source, value=None): d = destNode.parseParameterString(dest) if d is None: # invalid dest node return destObject, destParameter = d r = [] for k in sourceNode.childrenNames(): sourceChild = sourceNode._children[k] src = sourceChild.parseParameterString(source) if src is None: continue sourceObject, sourceParameter = src s = getattr(sourceObject, sourceParameter, None) r.append(s) # setattr(destObject, destParameter, r) destObject.setValue(destParameter, r) #----------------------------------------------------------------------------
[docs]class Parameterized(object): """ This class represents an object that have a signature, that is to say a list of typed parameters. A `Parameterized` object can notify the changes in its signature. The parameters can be linked together, that is to say, if the value of a parameter changes, the value of all linked parameters may change. This object has an :py:func:`initialization` function that can define the links between the parameters and their initial values. :Attributes: .. py:attribute:: signature The signature is a :py:class:`brainvisa.data.neuroData.Signature`. It contains the list of parameters accepted by the object and their types. The possible types are described in :py:mod:`brainvisa.data.neuroData`. .. py:attribute:: signatureChangeNotifier This variable is a :py:class:`soma.notification.Notifier`. It calls its notify function when the signature of the :py:class:`Parameterized` object changes. """ def __init__(self, signature): # print('create Parameterized', self) self.__dict__['_deleted'] = False # safety to avoid double deletion # see http://code.activestate.com/lists/python-list/191512/ self.__dict__['signature'] = signature self._convertedValues = {} self._links = {} self._isParameterSet = {} self._isDefault = {} self._immutable = {} self._warn = {} self.signatureChangeNotifier = Notifier(1) self.deleteCallbacks = [] self._blocklinks = False for i, p in list(self.signature.items()): np = copy.copy(p) self.signature[i] = np np.copyPostprocessing() np.setNameAndParameterized(i, self) # Values initialization for i, p in self.signature.items(): self.setValue(i, p.defaultValue()) self._force_default_in_setattr = None self.initialization() # Take into account links set during self.initialization() : self.linksInitialization() self._force_default_in_setattr = False def linksInitialization(self, parameterizedObjects=None, params=None, excluded=None): if parameterizedObjects: r = [] for o in parameterizedObjects: if not isinstance(o, weakref.ProxyType): o = weakref.proxy(o) r.append(o) parameterizedObjects = r # Call parameterHasChanged for the parameters that have not their # default value anymore or that have a not None value for name in [n for n, v in self.signature.items() if ((self.__dict__[n] != v.defaultValue()) or (self.__dict__[n] != None))]: if (not params or (name in params)) \ and (not excluded or (not name in excluded)): self._parameterHasChanged(name, getattr(self, name), parameterizedObjects=parameterizedObjects) def __del__(self): if not hasattr(self, '_deleted') or self._deleted: return # print('del Parameterized', self) self._deleted = True debugHere() for x in self.deleteCallbacks: x(self) def _parameterHasChanged(self, name, newValue, parameterizedObjects=None): """ This function is called when the value of an attribute described in the signature changes. """ debug = neuroConfig.debugParametersLinks if debug: print('parameter', name, 'changed in', self, 'with value', newValue, file=debug) for function in self._warn.get(name, []): if debug: print(' call (_warn)', function, '(', name, ',', newValue, ')', file=debug) function(self, name, newValue) for parameterized, attribute, function, force, destDefaultUpdate in self._links.get(name, []): if parameterized is None: if debug: print(' call (_links)', function, '(', self, ',', self, ')', file=debug) function(self, self) elif (not parameterizedObjects) or (parameterized in parameterizedObjects): if debug: print(' ', name, 'is linked to parameter', attribute, 'of', parameterized, 'from', self, '(', len( self._links.get(name, [])), ')', file=debug) linkParamType = parameterized.signature[attribute] if not parameterized._isImmutable(attribute) and (force or parameterized.parameterLinkable(attribute, debug=debug)): linkParamDebug = getattr(linkParamType, '_debug', None) if linkParamDebug is not None: print('parameter', name, 'changed in', self, 'with value', newValue, file=linkParamDebug) if destDefaultUpdate: parameterized.setDefault( attribute, self.isDefault(name)) if function is None: if debug: print(' ' + str(parameterized) + '.setValue(', repr( attribute), ',', newValue, ')', file=debug) if linkParamDebug is not None: print(' ==> ' + str(parameterized) + '.setValue(', repr( attribute), ',', newValue, ')', file=linkParamDebug) valueSet = newValue parameterized.setValue(attribute, newValue) else: if debug: print( ' call', function, '(', parameterized, ',', self, ')', file=debug) if linkParamDebug is not None: print( ' ==> call', function, '(', parameterized, ',', self, ')', file=linkParamDebug) v = function(parameterized, self) valueSet = v if debug: print(' ' + str(parameterized) + '.setValue(', repr( attribute), ',', v, ')', file=debug) if linkParamDebug is not None: print(' ' + str(parameterized) + '.setValue(', repr( attribute), ',', v, ')', file=linkParamDebug) parameterized.setValue(attribute, v) # activate the notifier with the parameter that receive a # linked value and with the new value after evaluation of a # link function. parameterized.signature[attribute].valueLinkedNotifier.notify( parameterized, attribute, valueSet) if debug: debug.flush()
[docs] def isDefault(self, key): """Returns True if the parameter `key` has kept its default value.""" return self._isDefault.get(key, True)
[docs] def setDefault(self, key, value): """Stores if the parameter `key` have kept its default value or not.""" debug = neuroConfig.debugParametersLinks if debug: print(' setDefault(', key, ',', value, ')', file=debug) self._isDefault[key] = value
def _isImmutable(self, key): """Returns True if the parameter `key` is immutable, ie can really be changed by a link. This is an internal state, used only temporarily during parameters assignment.""" return self._immutable.get(key, False) def _clearImmutableParameters(self): self._immutable = {} def _setImmutable(self, key, value): """Stores if the parameter `key` immutable, ie can really be changed by a link.""" self._immutable[key] = value
[docs] def parameterLinkable(self, key, debug=None): """Indicates if the value of the parameter can change through a parameter link.""" if debug is None: debug = neuroConfig.debugParametersLinks result = bool(self.signature[key].linkParameterWithNonDefaultValue or self.isDefault(key)) if debug: print(' parameterLinkable =', result, file=debug) return result
[docs] def initialization(self): """This function does nothing by default but it may be overrideed in processes classes to define initial values for the parameters or links between parameters.""" pass
[docs] def checkArguments(self): """Checks the value of the parameters described in the signature.""" for p, o in self.signature.items(): o.checkValue(p, getattr(self, p, None))
[docs] def findValue(self, attributeName, value): """Calls :py:func:`setValue`.""" self.setValue(attributeName, value)
def __setattr__(self, name, value): """Calls :py:func:`setValue` if the parameter is described in the signature.""" if name in self.signature: default = self._force_default_in_setattr self.setValue(name, value, default) else: self.__dict__[name] = value
[docs] def setValue(self, name, value, default=None): """ Checks the value, sets the attribute `name`. If the value has changed, :py:meth:`_parameterHasChanged` is called to apply the links. """ debug = neuroConfig.debugParametersLinks if debug: print(str(self) + '.setValue(', repr( name), ',', repr(value), ',', repr(default), ')', file=debug) changed = False if default is not None: changed = self.isDefault(name) != default self.setDefault(name, default) if self._isParameterSet.get(name, False): oldValue = getattr(self, name, None) newValue = self.signature[name].findValue(value) changed = changed or newValue != oldValue else: self._isParameterSet[name] = True newValue = self.signature[name].findValue(value) changed = True self.__dict__[name] = newValue if changed: self._parameterHasChanged(name, newValue)
@staticmethod def _explicit_default_link_function(parameter, src_param, parameterized, source, link_attributes={}): value = getattr(source, src_param) if link_attributes: # link_attributes only works on DiskItem objects which have # attributes and their signature has the findValue() method attr_values = {} for dst_attr, src_attr in six.iteritems(link_attributes): src_spl = src_attr.split(':') if len(src_spl) >= 2: # source is "param:attribute", get value in source.param src_value = getattr(source, src_spl[0]) src_attr = src_spl[-1] else: src_value = value if src_value is None: return None attr_values[dst_attr] = src_value.get(src_attr) new_value = parameterized.signature[parameter].findValue( value, requiredAttributes=attr_values) return new_value return value
[docs] def linkParameters(self, destName, sources, function=None, link_attributes={}): """ Links the parameters. When one of the `sources` parameters change, the value of `destName` parameter may change. It is possible to give a specific link function that will be called when the link is applied but it is not mandatory, a default function exists according to the type of parameter. Parameters ---------- destName: string name of the parameter that may change when the sources parameters change. If None, the link function will be called every time the source parameters change. sources: string, tuple or list of strings one or several parameters, whose modification will activate the link function. function: function specific function to call instead of the default one when the link is activated. The signature of the function is function(self, process), returning destination link_attributes: dict (optional) A dictionary of mandatory linked attributes. This is only meaningful for DiskItem parameters, which have attributes. Attributes listed here will be added as requiredAttributes to ReadDiskItem.findValue(). The dict maps destination attributes from source parameters attributes values. Ex: * {'dst_attrib': 'src_attrib'} will get in the source parameter value the attribute 'src_attrib', and its value will be forced as the value of the 'dst_attrib' attribute of the destination parameter. * {'dst_attrib1': 'src_param1:src_attrib1', 'dst_attrib2': 'src_param2:src_attrib2'} will get the attribute value of 'src_attrib1' in source parameter 'src_param1', and set it as the value of 'dst_attrib1' of the destination parameter. Same for 'dst_attrib2', but the value will be taken from another source parameter. As values are passed as requiredAttributes, they are thus mandatory in the destination parameter value, and are "stronger" than standard links. In this respect, it can be meaningful to get an attribute which has already the same name/value in the source parameter: {'my_attrib': 'my_attrib'} will just reject values with non-matching attribute 'my_attrib' (compared to a standard link). """ if type(sources) is str: sourcesList = [sources] else: sourcesList = sources for p in [dest for dest in [destName] if dest is not None] \ + list(sourcesList): if p not in self.signature: raise ValueError( HTMLMessage(_t_('<em>%s</em> is not a valid parameter name') % p)) if function is None: function = getattr( self.signature[destName], 'defaultLinkParametersFunction', None) elif inspect.ismethod(function) and function.__self__ is self: function = partial(function.__func__, weakref.proxy(self)) for p in sourcesList: if destName is None: self._links.setdefault(p, []).append( (None, None, function, False, False)) else: if link_attributes: if function is None: function \ = partial(self._explicit_default_link_function, destName, p, link_attributes=link_attributes) else: function = partial(function, link_attributes=link_attributes) self._links.setdefault(p, []).append( (weakref.proxy(self), destName, function, False, False))
[docs] def addParameterObserver(self, parameterName, function): """Associates a callback function to the modifications of the parameter value. Parameters ---------- parameterName: str the name of the parameter whose modification will activate the callback. function: function the callback function. its signature is *function(self, parameterName, newValue)* """ minimum, maximum = numberOfParameterRange(function) if maximum == 0: tmp = lambda x, y, z, f=function: f() tmp._save_function = function function = tmp self._warn.setdefault(parameterName, []).append(function)
[docs] def removeParameterObserver(self, parameterName, function): """Removes the callback function from the parameter observers.""" l = self._warn.get(parameterName, None) if l is not None: l.remove(function) if not l: del self._warn[parameterName]
[docs] def setOptional(self, *args): """Indicates that the parameters are not mandatory.""" for k in args: self.signature[k].mandatory = False
[docs] def setMandatory(self, *args): """Indicates that the parameters are mandatory.""" for k in args: self.signature[k].mandatory = True
[docs] def setVisible(self, *args): """Indicates that the parameters are visible.""" for k in args: self.signature[k].visible = True
[docs] def setHidden(self, *args): """Indicates that the parameters are hidden.""" for k in args: self.signature[k].visible = False
[docs] def setUserLevel(self, userLevel, *args): """Assign a userLevel to a list of parameters.""" from copy import copy for k in args: self.signature[k].userLevel = copy(userLevel)
[docs] def setEnable(self, *args, **kwargs): """Indicates parameters visibility and mandatory using examples : self.setEnable( *args) self.setEnable( *args, userLevel=0) self.setEnable( *args, userLevel=0, mandatory=True) *optional keyword paramerers* userLevel: int indicates that the parameters are visible or hidden regarding the userLevel. ( default value : the previous userLevel is kept ) mandatory: boolean indicates that the parameters are mandatory(True) or optional(False). ( default value : True ) """ self.setVisible(*args) if 'userLevel' in list(kwargs.keys()): userLevel = kwargs['userLevel'] self.setUserLevel(userLevel, *args) if 'mandatory' in list(kwargs.keys()): if kwargs['mandatory']: self.setMandatory(*args) else: self.setOptional(*args) else: self.setMandatory(*args)
[docs] def setDisable(self, *args): """Indicates that the parameters are hidden and optional.""" self.setHidden(*args) self.setOptional(*args)
[docs] def setSection(self, section, *args): """Sets the section of the parameters. Parameters are then sorted by section in the GUI""" from copy import copy for k in args: self.signature[k]._section = copy(section)
[docs] def setConvertedValue(self, name, value): """Sets the value but stores the previous value in an internal dictionary.""" self._convertedValues[name] = getattr(self, name) self.__dict__[name] = value
[docs] def restoreConvertedValues(self): """Restore values as they were before conversions using the values stored in an internal dictionary.""" self.__dict__.update(self._convertedValues) self._convertedValues.clear()
[docs] def changeSignature(self, signature): """Sets a new signature. Previous values of attributes are kept if the attributes are still in the signature. Links and observer callbacks that are no more associated to the signature parameters are deleted. The :py:attr:`signatureChangeNotifier` is notified. """ # Temporarily disable setting non-default flags _force_default_in_setattr = self._force_default_in_setattr self._force_default_in_setattr = None # Change signature self.signature = signature for n in self.signature.keys(): setattr(self, n, getattr(self, n, None)) # Remove unused links for n in list(self._links.keys()): if n not in self.signature: del self._links[n] for n in list(self._warn.keys()): if n not in self.signature: del self._warn[n] # Notify listeners self.signatureChangeNotifier.notify(self) # Restore setting non-default flags self._force_default_in_setattr = _force_default_in_setattr
[docs] def clearLinksTo(self, *args): """Removes all links that have a parameter in `args` as a destination.""" for i in args: if isinstance(i, six.string_types): destObject, destParameter = None, i else: destObject, destParameter = i if destObject is None: destObject = self if destParameter not in destObject.signature: raise KeyError(_t_('Object %(object)s has no such parameter: %(param)s') % {'object': six.text_type(destObject), 'param': destParameter}) for k, l in self._links.items(): i = 0 while i < len(l): do, dp, ml, f, du = l[i] # There is no API to retrieve a strong reference from a # weakref.proxy. This hack uses an undocumented property of # weakref.proxy: it will always return the same object when # called for a given object. # # FIXME: _links should only store weakref.ref objects (no # strong references, no proxies) if ((do is destObject or do is weakref.proxy(destObject)) and destParameter == dp): del l[i] else: i += 1
[docs] def clearLinksFrom(self, *args): """Removes all links associated to a parameter in `args` as a source. """ for k in args: if k in self._links: del self._links[k]
[docs] def cleanup(self): """Removes all links, observers, and stored converted values, reinitializes the signature change notifier.""" if debugHere is not None: # at exit time, the debug module might already be gone debugHere() self._convertedValues = {} self._links = {} self._warn = {} self.signatureChangeNotifier = Notifier(1)
def convertStateValue(self, value): if value is not None and not isinstance(value, (int, float, six.string_types, list, dict, tuple)): result = six.text_type(value) elif isinstance(value, list): result = [self.convertStateValue(itervalue) for itervalue in value] elif isinstance(value, tuple): result = tuple(self.convertStateValue(itervalue) for itervalue in value) elif isinstance(value, dict): result = dict((key, self.convertStateValue(itervalue)) for key, itervalue in six.iteritems(value)) else: result = value return result def saveStateInDictionary(self, result=None): if result is None: result = {} selected = {} default = {} for n in six.iterkeys(self.signature): value = getattr(self, n, None) value = self.convertStateValue(value) if self.isDefault(n): default[n] = value else: selected[n] = value result['parameters'] = { 'selected': selected, 'default': default, } return result
#----------------------------------------------------------------------------
[docs]class Process(Parameterized): """ This class represents a Brainvisa process or pipeline. This object has a **signature** that describes its inputs and outputs and an **execution function** :py:meth:`execution`. If it is a **pipeline**, it also have an **execution node** that describes the structure of the pipeline. :Attributes: .. py:attribute:: signature The signature is a :py:class:`brainvisa.data.neuroData.Signature`. It contains the list of parameters accepted by the object and their types. The possible types are described in :py:mod:`brainvisa.data.neuroData`. .. py:attribute:: category (string) The processes are organized into categories. Generally, the category is the name of the directory where the process file is located. .. py:attribute:: userLevel (integer) The process is available in Brainvisa interface if its userLevel is lower or equal than the userLevel selected in Brainvisa options. 0 : Basic, 1: Advanced, 2: Expert. .. py:attribute:: showMaximized (boolean) If true, the process window is shown maximized with a frame around it. :Methods: .. automethod:: id .. automethod:: validation .. automethod:: execution .. automethod:: executionNode .. automethod:: setExecutionNode .. automethod:: getAllParameters .. automethod:: allProcesses .. automethod:: pipelineStructure .. automethod:: sourceFile .. automethod:: sourcePath .. automethod:: inlineGUI .. automethod:: _iterate .. automethod:: _copy """ signature = Signature() category = 'BrainVISA' userLevel = 2 showMaximized = False def __init__(self): # The following attributes can be set in user defined initialization() # mathod which is called by Parameterized constructor. Therefore, it # must be set before or never. self._executionNode = None # Copy signature because there is only one instance of each Parameter # object in signature for each Process class. This is an old mistake in # BrainVISA design, there should be one Signature instance by Process # instance. Parameterized.__init__(self, self.signature.shallowCopy()) self._log = None self._outputLog = None self._outputLogFile = None # Main processes are opposed to subprocessed. There is more information # displayed to the user on main processes. For example, start/stop # notification and time elapsed in process are only displayed on main # processes. By default all processes called by another process are not # main process. It can be changed by setting isMainProcess to True. self.isMainProcess = False if hasattr(self.__class__, '_instance'): self.__class__._instance += 1 else: self.__class__._instance = 1 self.instance = self.__class__._instance def __del__(self): if self._deleted: # print('*** Process already deleted ***') return try: Parameterized.__del__(self) except Exception: # can happen when quitting the application: the current module is # not available any longer pass def _checkIterateParam(self, warn=True, **kwargs): requiredLength = 0 for values in six.itervalues(kwargs): length = len(values) if length > 0: if requiredLength > 1 and length > 1 and requiredLength != length: if warn is True: raise Exception( _t_('all lists of arguments with more than one value must have the same size')) elif length > requiredLength: requiredLength = length return requiredLength
[docs] def _iterate(self, warn=True, **kwargs): """ Returns a list of copies of the current process with different parameters values. Parameters ---------- warn: bool raise an exception if iterated parameters lists sizes do not match kwargs: dictionary containing a list of values for each parameter name. The first value is for the first process of the iteration and so on... """ # Find iteration size requiredLength = self._checkIterateParam(warn, **kwargs) # Set lists of values finalValues = {} for key, values in six.iteritems(kwargs): if values: if len(values) == 1: finalValues[key] = [ self.signature[key].findValue(values[0])] * requiredLength else: finalValues[key] = [ self.signature[key].findValue(v) for v in values] result = [] for i in six.moves.xrange(requiredLength): p = self._copy(withparams=True) # should copy only non-default params for argumentName in finalValues.keys(): p._setImmutable(argumentName, True) for argumentName, values in six.iteritems(finalValues): p.setValue(argumentName, values[i], default=0) p._clearImmutableParameters() result.append(p) return result
[docs] def _copy(self, withparams=True): """Returns a copy of the process. The value of the parameters are also copied if withparams is True (which is the default)""" result = self.__class__() if withparams: # disable links self.blockLinks(True) # set params for (n, p) in self.signature.items(): # if not self.isDefault( n ): # result.setValue( n, getattr( self, n, None ), # default=False ) result.setValue(n, getattr(self, n, None), default=self.isDefault(n)) if self._executionNode: self._executionNode._copy( result.executionNode(), withparams=withparams) self.blockLinks(False) return result
[docs] def inlineGUI(self, values, context, parent, externalRunButton=False): """This method can be overrideed in order to specialize buttons of the process window. Parameters ---------- context: the execution context of the process parent: The parent widget Returns ------- widget: QWidget the widget containing the buttons that will replace the default buttons (Run and Iterate) """ return None
[docs] def validation(self): """This method can be overrideed in order to check if the process dependencies are available. It will be called at Brainvisa startup when the processes are loaded. If the method raises an exception, the process will not be available. :raises: :py:class:`brainvisa.validation.ValidationError` """ return 1
[docs] def id(self): """Returns the process id.""" return self._id
[docs] def sourceFile(self): """Returns the name of the source file of the process.""" return self._fileName
[docs] def sourcePath(self): """Returns the path to the source file of the process.""" return os.path.dirname(self._fileName)
def __str__(self): instance = getattr(self, '_instance', None) if instance is None: return self.id() else: return self.id() + '_' + six.text_type(instance) def addLink(self, destination, source, function=None, destDefaultUpdate=True): eNode = getattr(self, '_executionNode', None) if inspect.ismethod(function) and function.__self__ is self: function = partial(function.__func__, weakref.proxy(self)) if eNode is None: Parameterized.addLink( self, destination, source, function, destDefaultUpdate) else: eNode.addLink(destination, source, function, destDefaultUpdate)
[docs] def setExecutionNode(self, eNode): """Sets the execution node of the pipeline. Parameters ---------- eNode: :py:class:`ExecutionNode` object that describes the structure of the pipeline. """ self._executionNode = eNode
[docs] def execution(self, context): """ Execution function that is called when the process is run. """ if self._executionNode is not None: return self._executionNode.run(context) else: raise RuntimeError( HTMLMessage(_t_('No <em>execution</em> method provided')))
[docs] def executionNode(self): """Returns the execution node of the pipeline.""" return self._executionNode
[docs] def pipelineStructure(self): """Returns the description of a pipeline in a dictionary or the id of the process if it is a simple process.""" return self.id()
[docs] def allProcesses(self): """Returns the current process and all its children if it is a pipeline. :rtype: generator """ yield self if self._executionNode is not None: stack = [self._executionNode] while stack: eNode = stack.pop(0) if isinstance(eNode, ProcessExecutionNode): yield eNode._process stack.extend(eNode.children())
[docs] def parent_pipeline(self): ''' Returns the root pipeline process from which this process is part of, if it is actually part of a pipeline (otherwise None is returned). New in Axon 4.6. ''' node = getattr(self, '_parent', None) if node is None: ref_process = getattr(self, 'reference_process', None) if ref_process is not None: # point to another parent = ref_process.parent_pipeline() if parent is not None: return parent return ref_process return None return node().parent_pipeline()
[docs] def allParameterFiles(self): """Get recursively all parameters which are DiskItems, descending through the pipeline structure. """ params = [] files = set() for paramname in six.iterkeys(self.signature): param = self.__dict__.get(paramname) if param is not None and isinstance(param, DiskItem): filename = param.fullPath() if filename not in files: params.append(param) files.add(filename) # parse pipeline structure eNodes = [] eNode = self.executionNode() if eNode is not None: eNodes.append(eNode) while eNodes: eNode = eNodes.pop() eNodes += list(eNode.children()) if isinstance(eNode, ProcessExecutionNode): process = eNode._process for paramname in six.iterkeys(process.signature): param = process.__dict__.get(paramname) if param is not None and isinstance(param, DiskItem): filename = param.fullPath() if filename not in files: params.append(param) files.add(filename) return params
[docs] def saveStateInDictionary(self, result=None): """Returns the description of the process in a dictionary.""" if result is None: result = {} result['pipelineStructure'] = self.pipelineStructure() if self._executionNode is not None: if self._executionNode._parameterized is not None: Parameterized.saveStateInDictionary( self._executionNode._parameterized(), result) eNodesState = {} for eNodeKey in self._executionNode.childrenNames(): eNode = self._executionNode.child(eNodeKey) eNodeDict = {} eNode.saveStateInDictionary(eNodeDict) eNodesState[eNodeKey] = eNodeDict result['executionNodes'] = eNodesState else: Parameterized.saveStateInDictionary(self, result) return result
[docs] def getAllParameters(self): """ Returns all the parameters of the current process and its children if it is a pipeline. :returns: tuples (Parameterized, attribute name, attribute type) :rtype: generator """ stack = [self] while stack: node = stack.pop(0) if isinstance(node, Process): parameterized = node node = node.executionNode() if node is not None: stack += [node.child(i) for i in node.childrenNames()] else: parameterized = node._parameterized if parameterized is not None: parameterized = parameterized() stack += [node.child(i) for i in node.childrenNames()] if parameterized is not None: for attribute, type in six.iteritems(parameterized.signature): yield (parameterized, attribute, type)
#----------------------------------------------------------------------------
[docs]class IterationProcess(Process): """ This class represents a set of process instances that can be executed in parallel. It is used to iterate the same process on a set of data. """ def __init__(self, name, processes, base=None): self._id = name + 'Iteration' self.name = name self.instance = 1 self.base = base self._processes = [getProcessInstance(p) for p in processes] Process.__init__(self) for sp, p in zip(list(self._executionNode._children.values()), processes): if isinstance(p, ExecutionNode): sp._optional = p._optional sp._selected = p._selected # def __del__( self ): # print('del IterationProcess', self, ', children:', len( self._processes)) # print('children:', self._processes) # del self._processes # import gc # print('refs to execution node:', len( gc.get_referrers( self._executionNode ) )) # print([ i.keys() for i in gc.get_referrers( self._executionNode ) ]) def pipelineStructure(self): return {'type': 'iteration', 'name': self.name, 'children': [p.pipelineStructure() for p in self._processes]} def initialization(self): if not self.base is None: dp = self.base elif len(self._processes) != 0: dp = self._processes[0].name else: dp = None eNode = ParallelExecutionNode(self.name, stopOnError=False, possibleChildrenProcesses=dp, notify=True) for i in six.moves.xrange(len(self._processes)): n = self._processes[i].name self._processes[i].isMainProcess = True self._processes[i].name = repr(i + 1) + ". " + n eNode.addChild(name=n, node=ProcessExecutionNode(self._processes[i], optional=True, selected=True)) # Add callbacks to maintain synchronization eNode.beforeChildAdded.add( ExecutionNode.MethodCallbackProxy(self.beforeChildAdded)) eNode.beforeChildRemoved.add( ExecutionNode.MethodCallbackProxy(self.beforeChildRemoved)) self._executionNode = eNode def beforeChildAdded(self, parent, key, child): child._process.isMainProcess = True child._process.name = repr( parent._internalIndex) + ". " + child._process.name self._processes.append(child._process) def beforeChildRemoved(self, parent, key, child): if child._process in self._processes: self._processes.remove(child._process)
#----------------------------------------------------------------------------
[docs]class ListOfIterationProcess(IterationProcess): ''' An IterationProcess which has on its main signature a list of the first element of each sub-process. Used for viewers and editors of ListOf() ''' class linkP(object): def __init__(self, proc, i): self.proc = proc self.num = i def __call__(self, par): if len(self.proc.param) > self.num: return self.proc.param[self.num] def __init__(self, name, processes): IterationProcess.__init__(self, name, processes) en = self.executionNode() chs = list(en.children())[0]._process.signature self.changeSignature(Signature('param', ListOf(list(chs.values())[0]))) en._parameterized = weakref.ref(self) chkeys = list(self.executionNode()._children.keys()) for i, p in enumerate(en.children()): s = p._process.signature en.addLink(str(chkeys[i]) + '.' + list(s.keys())[0], 'param', self.linkP(self, i))
#----------------------------------------------------------------------------
[docs]class DistributedProcess(Process): """ This class represents a set of process instances that can be executed in parallel. """ def __init__(self, name, processes): self._id = name + 'DistributedIteration' self.name = name self.instance = 1 self._processes = [getProcessInstance(p) for p in processes] Process.__init__(self) for sp, p in zip(list(self._executionNode._children.values()), processes): if isinstance(p, ExecutionNode): sp._optional = p._optional sp._selected = p._selected def pipelineStructure(self): return {'type': 'distributed', 'name': self.name, 'children': [p.pipelineStructure() for p in self._processes]} def initialization(self): eNode = ParallelExecutionNode(self.name) for i in six.moves.xrange(len(self._processes)): self._processes[i].isMainProcess = True subENode = self._processes[i]._executionNode eNode.addChild(str(i), ProcessExecutionNode(self._processes[i], optional=True, selected=True)) self._executionNode = eNode
#----------------------------------------------------------------------------
[docs]class SelectionProcess(Process): """ This class represents a choice between a list of processes. """ def __init__(self, name, processes): self._id = name + 'Selection' self.name = name self.instance = 1 self._processes = [getProcessInstance(p) for p in processes] Process.__init__(self) for sp, p in zip(list(self._executionNode._children.values()), processes): if isinstance(p, ExecutionNode): sp._optional = p._optional sp._selected = p._selected def pipelineStructure(self): return {'type': 'selection', 'name': self.name, 'children': [p.pipelineStructure() for p in self._processes]} def initialization(self): eNode = SelectionExecutionNode(self.name) for i in six.moves.xrange(len(self._processes)): self._processes[i].isMainProcess = True eNode.addChild(str(i), ProcessExecutionNode(self._processes[i], optional=True, selected=True)) self._executionNode = eNode
#---------------------------------------------------------------------------- class TimeoutCall(object): def __init__(self, function, *args, **kwargs): self.function = function self.args = args self.kwargs = kwargs self.event = threading.Event() self.callFunction = 0 self.functionLock = threading.RLock() def __del__(self): self.stop() def _thread(self): self.event.wait(self.timeout) self.functionLock.acquire() try: if self.callFunction: self.function(*self.args, **self.kwargs) finally: self.functionLock.release() def start(self, timeout): self.stop() # Just in case of multiple start() call during timeout self.functionLock.acquire() try: self.callFunction = 1 self.event.clear() self.timeout = timeout threading.Thread(target=self._thread) finally: self.functionLock.release() def stop(self): self.functionLock.acquire() try: self.callFunction = 0 self.event.set() finally: self.functionLock.release() #---------------------------------------------------------------------------- def signalName(signalNumber): for key, value in signal.__dict__.items(): if key[:3] == 'SIG' and value == signalNumber: return key return str(signalNumber) #---------------------------------------------------------------------------- def escapeQuoteForShell(s): return s.replace("'", "'\"'\"'") #-------------------------------------------------------------------------
[docs]class ExecutionNode(object): """ Base class for the classes that describe a pipeline structure. """ class MultiParameterLink(object): def __init__(self, sources, dest, function): self.sources = [] for p, n in sources: if type(p) is weakref.ReferenceType: self.sources.append((p, n)) else: self.sources.append((weakref.ref(p), n)) # dest is the single destination given as a tuple containing # the parameterized object and the attribute to set self.dest = (dest[0], dest[1]) self.function = function self.hasParameterized = hasParameter(function, 'parameterized') self.hasNames = hasParameter(function, 'names') self.hasDestParameterized = hasParameter( function, 'destparameterized') self.hasDestNames = hasParameter(function, 'destnames') def __call__(self, dummy1, dummy2): kwargs = {} if self.hasParameterized: kwargs['parameterized'] = [i[0]() for i in self.sources] if self.hasNames: kwargs['names'] = [i[1] for i in self.sources] if self.hasDestParameterized: kwargs['destparameterized'] = self.dest[0] if self.hasDestNames: kwargs['destnames'] = self.dest[1] return self.function( *[getattr(i[0](), i[1], None) for i in self.sources], **kwargs) class MethodCallbackProxy(object): def __init__(self, method): self.object = weakref.ref(method.__self__) self.method = method.__func__ def __call__(self, *args, **kwargs): o = self.object() if o is not None: self.method(o, *args, **kwargs) def __eq__(self, other): if isinstance(other, ExecutionNode.MethodCallbackProxy): return self.object() == other.object() and self.method == other.method if self.object() is None: return other is None return self.method.__get__(self.object()) == other def __init__(self, name='', optional=False, selected=True, guiOnly=False, parameterized=None, expandedInGui=False): """ Parameters ---------- name: str name of the node - default ''. optional: boolean indicates if this node is optional in the pipeline - default False. selected: boolean indicates if the node is selected in the pipeline - default True. guiOnly: boolean default False. parameterized: :py:class:`Parameterized` containing the signature of the node - default None. """ # Initialize an empty execution node # print('ExecutionNode.__init__', self) self.__dict__['_deleted'] = False # safety to avoid double deletion # see http://code.activestate.com/lists/python-list/191512/ self.__dict__['_children'] = SortedDictionary() if parameterized is not None: parameterized = weakref.ref(parameterized) self.__dict__['_parameterized'] = parameterized self.__dict__['_name'] = str(name) self.__dict__['_optional'] = optional self.__dict__['_selected'] = selected self.__dict__['_guiOnly'] = guiOnly self.__dict__['_selectionChange'] = Notifier(1) self.__dict__['_expandedInGui'] = expandedInGui self.__dict__['_dependencies'] = [] self.__dict__['_parent'] = None self.__dict__['_invalid_children'] = set() def __del__(self): # print('del ExecutionNode', self) if not hasattr(self, '_deleted') or self.__dict__['_deleted']: # print('*** ExecutionNode already deleted ! ***') return self.__dict__['_deleted'] = True debugHere() def _copy(self, node, withparams=True): """ Uses non default parameters values to initialize the parameters of the node given in argument, if withparams is True (which is the default). """ # if execution node contains a process, copy the process parameters and # copy its execution node parameters if any process = getattr(self, "_process", None) if process: processCopy = node._process if withparams: for (n, v) in process.signature.items(): # if not self.isDefault( n ): # processCopy.setValue( n, getattr( process, n, None ), # default=False ) processCopy.setValue(n, getattr(process, n, None), default=process.isDefault(n)) processNode = process.executionNode() if processNode: processNode._copy( processCopy.executionNode(), withparams=withparams) node.setSelected(self._selected) # if execution node have children nodes, copy the parameters of these # nodes for name in self.childrenNames(): child = self.child(name) child._copy(node.child(name), withparams=withparams)
[docs] def addChild(self, name, node, index=None): '''Add a new child execution node. :param string name: name which identifies the node :param node: an :py:class:`ExecutionNode` which will be added to this node's children. ''' if name in self._children or name in self._invalid_children: raise KeyError( HTMLMessage(_t_('<em>%s</em> already defined') % (name, ))) if not isinstance(node, ExecutionNode): raise RuntimeError( HTMLMessage('<em>node</em> argument must be an execution node')) if not node.is_valid(): # invalid nodes are just marked in a list, not really added self._invalid_children.add(name) else: if not index is None: self._children.insert(index, name, node) else: self._children[name] = node node._parent = weakref.ref(self)
def is_valid(self): return True
[docs] def removeChild(self, name): '''Remove child execution node. :param string name: name which identifies the node ''' if name in self._invalid_children: self._invalid_children.remove(name) return None if name not in self._children: raise KeyError( HTMLMessage(_t_('<em>%s</em> not defined') % (name, ))) c = self._children[name] del self._children[name] return c
[docs] def childrenNames(self): ''' Returns the list of names of the children execution nodes. ''' return list(self._children.keys())
[docs] def children(self): """Returns the list of children execution nodes.""" return six.itervalues(self._children)
[docs] def hasChildren(self): """Returns True if this node has children.""" return bool(self._children)
[docs] def parent_node(self): ''' Returns the parent node, if any. New in Axon 4.6. ''' if self._parent is None: return None return self._parent()
[docs] def parent_pipeline(self): ''' Returns the root pipeline process from which this node is part of, if it is actually part of a pipeline (otherwise None is returned). New in Axon 4.6. ''' parent = self.parent_node() next_parent = parent while next_parent is not None: next_parent = parent.parent_node() if next_parent is not None: parent = next_parent elif parent._parameterized is not None: node = getattr(parent._parameterized(), '_parent', None) if node is not None: parent = node() next_parent = node() if parent is not None and parent._parameterized is not None: return parent._parameterized() return None
[docs] def setSelected(self, selected): """Change the selection state of the node. :param bool selected: new selection state of the node. If the selection changes, a selection change notifier is notified. """ if selected != self._selected: self._selected = selected self._selectionChange.notify(self)
[docs] def isSelected(self): """True if this node is selected.""" return self._selected
@property def parameterized(self): """The Parameterized object associated with this node. The ExecutionNode keeps only a weak reference in _parameterized. This property returns a strong reference to the Parameterized object if it is still alive, otherwise it returns None. """ weak_ref = self._parameterized if weak_ref is not None: return weak_ref() else: return None def __setattr__(self, attribute, value): """ If the attribute is in the signature of the corresponding parameterized object, it is modified. """ if self._parameterized is not None and \ attribute in self._parameterized().signature: setattr(self._parameterized(), attribute, value) elif attribute in self._children: raise RuntimeError( HTMLMessage(_t_('Direct modification of execution node <em>%s</em> is not allowed.') % (attribute, ))) else: self.__dict__[attribute] = value def __getattr__(self, attribute): p = self.__dict__.get('_parameterized') if p is not None: p = p() if p is not None and hasattr(p, attribute): return getattr(p, attribute) children = self.__dict__['_children'] if attribute in children: return children[attribute] raise AttributeError(attribute)
[docs] def child(self, name, default=None): """Get a child node by name.""" return self._children.get(name, default)
[docs] def allParameterFiles(self): """Get recursively all parameters which are DiskItems, descending through the pipeline structure. """ params = [] files = set() # parse pipeline structure eNodes = [self] while eNodes: eNode = eNodes.pop() eNodes += list(eNode.children()) if isinstance(eNode, ProcessExecutionNode): process = eNode._process for paramname in six.iterkeys(process.signature): param = process.__dict__.get(paramname) if param is not None and isinstance(param, DiskItem): filename = param.fullPath() if filename not in files: params.append(param) files.add(filename) return params
[docs] def run(self, context): """ Calls :py:meth:`_run` method if the node is selected. """ if self._optional and (not self._selected): context.write( '<font color=orange>Skip unselected node: ' + str(self.name()) + '</font>') return if self._guiOnly and not neuroConfig.gui: context.write( '<font color=orange>Skip GUI-only node: ' + str(self.name()) + '</font>') return return self._run(context)
def _run(self, context): """ Does nothing in the base class. It is overriden by derived classes. """ pass
[docs] def name(self): """Returns the name of the node.""" return self._name
[docs] def gui(self, parent, processView=None): """ Returns the graphical user interface of this node. """ from brainvisa.processing.qtgui.neuroProcessesGUI import ExecutionNodeGUI if self._parameterized is not None: if processView != None and processView.read_only: return ExecutionNodeGUI(parent, self._parameterized(), read_only=True) return ExecutionNodeGUI(parent, self._parameterized()) return None
[docs] def setNodeHidden(self, hidden=True): """ Set the status of this node for graphical user interface. """ self._hidden = hidden gui_item = getattr(self, '_guiItem', None) if gui_item: gui_item.setHidden(getattr(self, '_hidden', False))
[docs] def parseParameterString(self, parameterString): """ Returns a tuple containing the :class:`Parameterized` object of the child node indicated in the parameter string and the name of the parameter. May return None if the node is invalid. Parameters ---------- parameterString: str references a parameter of a child node with a path like <node name 1>.<node name 2>...<parameter name> """ if parameterString is None: return (None, None) l = parameterString.split('.') node = self for nodeName in l[: -1]: if not node.is_valid(): # invalid process node return None if nodeName in node._invalid_children: return None node = node.child(nodeName) parameterized = node._parameterized if parameterized is not None: parameterized = parameterized() name = l[-1] if parameterized is None or name not in parameterized.signature: raise KeyError(name) return (parameterized, name)
def saveStateInDictionary(self, result=None): if result is None: result = {} result['name'] = self._name result['selected'] = self._selected if self._parameterized is not None: Parameterized.saveStateInDictionary(self._parameterized(), result) eNodesState = {} for eNodeKey in self.childrenNames(): eNode = self.child(eNodeKey) eNodesState[eNodeKey] = eNode.saveStateInDictionary() result['executionNodes'] = eNodesState return result
[docs] def addExecutionDependencies(self, deps): '''Adds to the execution node dependencies on the execution of other nodes. This allows to build a dependencies structure which is not forced to be a tree, but can be a grapĥ. Dependencies are used to build Soma-Workflow workflows with correct dependencies. ''' if type(deps) not in (list, tuple): deps = [deps] self._dependencies += [weakref.ref(x) for x in deps]
#-------------------------------------------------------------------------
[docs]class ProcessExecutionNode(ExecutionNode): ''' An execution node that has no children and run one process ''' def __init__(self, process, optional=False, selected=True, guiOnly=False, expandedInGui=False, altname=None, skip_invalid=False): ''' Parameters ---------- process: process id or instance or class optional: bool may be unchecked in the pipeline (not run) selected: bool checked in the pipeline, will be the selected one in a selection node guiOnly: bool will be skipped in a non-interactive environment expandedInGui: bool in the tree widget altname: str alternative name displayed in the GUI skip_invalid: bool marks the node as "optional": if the process cannot be instantiated, the pipeline construction will not fail, but the node will not be added. Links involving this node will be also skipped silently. This is useful to buils pipelines with several alternative nodes which may be unavailable in some contexts (due to missing dependencies or external software, typically) ''' self.__dict__['failing_node'] = False self.__dict__['skip_invalid'] = skip_invalid if skip_invalid: try: process = getProcessInstance(process) except Exception: process = None self.__dict__['failing_node'] = True else: process = getProcessInstance(process) pname = '' if process is not None: pname = process.name # print('ProcessExecutionNode.__init__:', self, pname) ExecutionNode.__init__(self, pname, optional=optional, selected=selected, guiOnly=guiOnly, parameterized=process, expandedInGui=expandedInGui) self.__dict__['_process'] = process if process is not None: process._parent = weakref.ref(self) if altname is not None: self.__dict__['_name'] = altname if process is not None: reloadNotifier = getattr(process, 'processReloadNotifier', None) if reloadNotifier is not None: reloadNotifier.add(ExecutionNode.MethodCallbackProxy( self.processReloaded)) def __del__(self): # print('del ProcessExecutionNode', self) if not hasattr(self, '_deleted') or self._deleted: # print('*** already deleted !***') return if hasattr(self, '_process'): # print(' del proc:', self._process.name) process = self._process if process is not None: reloadNotifier = getattr( self._process, 'processReloadNotifier', None) if reloadNotifier is not None: try: l = len(reloadNotifier._listeners) z = ExecutionNode.MethodCallbackProxy( self.processReloaded) # bidouille: hack z so as to contain a weakref to None # since we are in __del__ and existing weakrefs to self # have already been neutralized class A(object): pass w = weakref.ref(A()) # w points to None immediately z.object = w x = reloadNotifier.remove(z) except AttributeError: # this try..except is here to prevent an error when quitting # BrainVisa: # ProcessExecutionNode class is set to None during module # destruction pass else: # print('del ProcessExecutionNode', self) print('no _process in ProcessExecutionNode !') try: ExecutionNode.__del__(self) except Exception: # same as above pass def is_valid(self): return not getattr(self, 'failing_node', False) def addChild(self, name, node, index=None): raise RuntimeError( _t_('A ProcessExecutionNode cannot have children')) def _run(self, context): return context.runProcess(self._process) def gui(self, parent, processView=None): if processView is not None: return ProcessView(self._process, parent, externalInfo=processView.info, read_only=processView.read_only) else: return ProcessView(self._process, parent) def children(self): eNode = getattr(self._process, '_executionNode', None) if eNode is not None: return six.itervalues(eNode._children) else: return [] def childrenNames(self): eNode = getattr(self._process, '_executionNode', None) if eNode is not None: return list(eNode._children.keys()) else: return [] def __setattr__(self, attribute, value): if self._parameterized is not None and \ attribute in self._parameterized().signature: setattr(self._parameterized(), attribute, value) else: eNode = getattr(self._process, '_executionNode', None) if eNode is not None and attribute in eNode._children: raise RuntimeError( HTMLMessage(_t_('Direct modification of execution node <em>%s</em> is not allowed.') % (attribute, ))) self.__dict__[attribute] = value def __getattr__(self, attribute): p = self.__dict__.get('_parameterized') if p is not None: # None if in deletion ? p = p() if p is not None and hasattr(p, attribute): return getattr(p, attribute) try: proc = self.__dict__.get('_process') if proc is not None: eNode = getattr(proc, '_executionNode', None) if eNode is not None: c = eNode.child(attribute) if c is not None: return c except ReferenceError: pass raise AttributeError(attribute) def child(self, name, default=None): eNode = getattr(self._process, '_executionNode', None) if eNode is not None: return eNode.child(name, default) return default
[docs] def processReloaded(self, newProcess): """ If the associated process has an attribute *processReloadNotifier*, this callback is attached to the notifier. So, the node is reloaded when the process is reloaded. """ event = ProcessExecutionEvent() event.setProcess(self._process) self._process.processReloadNotifier.remove( ExecutionNode.MethodCallbackProxy(self.processReloaded)) self.__dict__['_process'] = getProcessInstanceFromProcessEvent(event) self._process._parent = weakref.ref(self) self._process.processReloadNotifier.add( ExecutionNode.MethodCallbackProxy(self.processReloaded))
def addExecutionDependencies(self, deps): ExecutionNode.addExecutionDependencies(self, deps) eNode = self._process._executionNode if eNode: eNode.addExecutionDependencies(deps)
#-------------------------------------------------------------------------
[docs]class SerialExecutionNode(ExecutionNode): '''An execution node that run all its children sequentially''' def __init__(self, name='', optional=False, selected=True, guiOnly=False, parameterized=None, stopOnError=True, expandedInGui=False, possibleChildrenProcesses=None, notify=False): # print('SerialExecutionNode.__init__', self) ExecutionNode.__init__( self, name, optional, selected, guiOnly, parameterized, expandedInGui=expandedInGui) self.stopOnError = stopOnError self.notify = notify if possibleChildrenProcesses: if not isinstance(possibleChildrenProcesses, dict): if not isinstance( possibleChildrenProcesses, list ) \ and not isinstance(possibleChildrenProcesses, tuple): possibleChildrenProcesses = [possibleChildrenProcesses] r = {} for i in six.moves.xrange(len(possibleChildrenProcesses)): r[possibleChildrenProcesses[i]] = {'optional': True, 'selected': True, 'expandedInGui': False} possibleChildrenProcesses = r self._internalIndex = 0 self.possibleChildrenProcesses = possibleChildrenProcesses if self.notify: # Add child changes notifiers self.beforeChildRemoved = Notifier(4) self.afterChildRemoved = Notifier(4) self.beforeChildAdded = Notifier(4) self.afterChildAdded = Notifier(4) def _run(self, context): result = [] pi, p = context.getProgressInfo(self) pi.children = [None] * len(self._children) if self.stopOnError: for node in self._children.values(): npi, proc = context.getProgressInfo(node, parent=pi) context.progress() result.append(node.run(context)) del npi else: for node in self._children.values(): npi, proc = context.getProgressInfo(node, parent=pi) context.progress() try: result.append(node.run(context)) del npi except ExecutionContext.UserInterruptionStep as e: context.error(six.text_type(e)) except ExecutionContext.UserInterruption: raise except Exception as e: context.error("Error in execution node : " + six.text_type(e)) context.progress() return result def addChild(self, name=None, node=None, index=None): if self.possibleChildrenProcesses: # As it is possible to add other children from GUI # so it is necessary to suffix children keys with an internal index if name is None: if isinstance(node, ExecutionNode): name = node.name() + '_' + str(self._internalIndex) else: raise RuntimeError( HTMLMessage('<em>node</em> argument must be an execution node')) else: name += '_' + str(self._internalIndex) if not node: node = self.possibleChildrenProcesses self._internalIndex += 1 if self.notify and node.is_valid(): self.beforeChildAdded.notify( weakref.proxy(self), name, weakref.proxy(node)) super(SerialExecutionNode, self).addChild(name, node, index) if self.notify and node.is_valid(): self.afterChildAdded.notify( weakref.proxy(self), name, weakref.proxy(node)) def removeChild(self, name): valid = name not in self._invalid_children if self.possibleChildrenProcesses: if name not in self._children and valid: raise KeyError( HTMLMessage(_t_('<em>%s</em> not defined') % (name, ))) if self.notify and valid: node = self._children[name] self.beforeChildRemoved.notify( weakref.proxy(self), name, weakref.proxy(node)) super(SerialExecutionNode, self).removeChild(name) if self.notify and valid: self.afterChildRemoved.notify( weakref.proxy(self), name, weakref.proxy(node))
#-------------------------------------------------------------------------
[docs]class ParallelExecutionNode(SerialExecutionNode): """ An execution node that run all its children in any order (and in parallel if possible) """ def _run(self, context): pi, p = context.getProgressInfo(self) # do as for serial node return super(ParallelExecutionNode, self)._run(context)
#-------------------------------------------------------------------------
[docs]class SelectionExecutionNode(ExecutionNode): '''An execution node that run one of its children''' def __init__(self, *args, **kwargs): # print('SelectionExecutionNode.__init__', self) ExecutionNode.__init__(self, *args, **kwargs) def __del__(self): # print('SelectionExecutionNode.__del__', self) if not hasattr(self, '_deleted') or self._deleted: # print('*** SelectionExecutionNode already deleted') return for node in self._children.values(): node._selectionChange.remove(ExecutionNode.MethodCallbackProxy( self.childSelectionChange)) # print('__del__ finished') def _run(self, context): 'Run the selected child' if self._selected is None: raise RuntimeError(_t_('No children selected')) pi, p = context.getProgressInfo(self) pi.children = [None] for node in self._children.values(): if node._selected: npi, proc = context.getProgressInfo(node, parent=pi) context.progress() res = node.run(context) del npi context.progress() return res context.progress()
[docs] def addChild(self, name, node, index=None): 'Add a new child execution node' ExecutionNode.addChild(self, name, node, index) if node.is_valid(): node._selectionChange.add(ExecutionNode.MethodCallbackProxy( self.childSelectionChange)) node._dependencies += self._dependencies
[docs] def childSelectionChange(self, node): '''This callback is called when the selection state of a child has changed. If the child is selected, all the other children must be unselected because this node is a selectionNode.''' if node._selected: for child in self.children(): if child != node: child.setSelected(False)
def addExecutionDependencies(self, deps): ExecutionNode.addExecutionDependencies(self, deps) for node in self._children.values(): node.addExecutionDependencies(deps)
#-------------------------------------------------------------------------
[docs]class ExecutionContext(object): """ This object represents the execution context of the processes. Indeed, a process can be started in different contexts : * The user starts the process by clicking on the Run button in the graphical interface. * The process is started via a script. It is possible to run brainvisa in batch mode (without any graphical interface) and to run a process via a python function : brainvisa.processes.defaultContext().runProcess(...). * The process is a converter, so it can be run automatically by BrainVISA when a conversion is needed for another process parameters. * The process is a viewer or an editor, it is run when the user clicks on the corresponding icon to view or edit another process parameter. The interactions with the user are different according to the context. That's why the context object offers several useful functions to interact with BrainVISA and to call system commands. Here are these functions : * :py:meth:`write`, :py:meth:`warning`, :py:meth:`error` : prints a message, either in the graphical process window (in GUI mode) or in the terminal (in batch mode). * :py:meth:`log` : writes a message in the BrainVISA log file. * :py:meth:`ask`, :py:meth:`dialog` : asks a question to the user. * :py:meth:`temporary` : creates a temporary file. * :py:meth:`system`: calls a system command. * :py:meth:`runProcess` : runs a BrainVISA process. * :py:meth:`checkInterruption` : defines a breakpoint. """ remote = None class UserInterruption(Exception): def __init__(self): Exception.__init__(self, _t_('user interruption')) class UserInterruptionStep(Exception): def __init__(self): Exception.__init__( self, _t_('user interruption of current step')) class StackInfo(object): def __init__(self, process): self.process = process self.processCount = {} self.thread = None self.debug = None self.log = None self.time = time.localtime() def __init__(self, userLevel=None, debug=None): if userLevel is None: self.userLevel = neuroConfig.userLevel else: self.userLevel = userLevel # self._processStack = [] self._lock = threading.RLock() self._processStackThread = {} self._processStackHead = None self.manageExceptions = 1 self._systemOutputLevel = 0 self._stdoutInContext = False self._systemLog = None self._systemLogFile = None self._interruptionRequest = None self._interruptionActions = {} self._interruptionActionsId = 0 self._interruptionLock = threading.RLock() self._allowHistory = True def _processStack(self): self._lock.acquire() try: stack = self._processStackThread[threading.currentThread()] except Exception: stack = [] self._processStackThread[threading.currentThread()] = stack self._lock.release() return stack def _popStack(self): self._lock.acquire() stack = self._processStackThread[threading.currentThread()] stackinfo = stack.pop() if len(stack) == 0: del self._processStackThread[threading.currentThread()] if stackinfo is self._processStackHead: self._processStackHead = None self._lock.release() return stackinfo def _pushStack(self, stackinfo): self._lock.acquire() stack = self._processStack() stack.append(stackinfo) if self._processStackHead is None: self._processStackHead = stackinfo self._lock.release() def _stackTop(self): stack = self._processStack() if len(stack) == 0: return None return stack[-1] def _processStackParent(self): stack = self._processStack() if len(stack) == 0: return self._processStackHead return stack[-1] def _get_process_and_argname(self, process, param_name): '''Find process or subprocess and parameter name from a param name which may address a sub-process: subprocess.param ''' argnames = param_name.split('.') current_proc = process if len(argnames) > 1: enode_meth = getattr(current_proc, 'executionNode') if not enode_meth: raise KeyError('Process %s has no children (looking for %s)' % (current_proc.name, pname)) enode = enode_meth() for pname in argnames[:-1]: enode = enode.child(pname) current_proc = enode._process return current_proc, argnames[-1] def _setArguments(self, _process, *args, **kwargs): # Set arguments to_restore = set() for i, v in enumerate(args): n = list(_process.signature.keys())[i] _process._setImmutable(n, True) to_restore.add(_process) # performing this 2 pass loop allows to set parameters with # a forced value to immutable (ie non-linked) before actually # setting values and running links. This avoids a bunch of unnecessary # links to work (often several times) for (n, v) in kwargs.items(): proc, argname = self._get_process_and_argname(_process, n) proc._setImmutable(argname, True) to_restore.add(proc) for i, v in enumerate(args): n = list(_process.signature.keys())[i] _process.setDefault(n, 0) if v is not None: _process.setValue(n, v) else: setattr(_process, n, None) # Set kwargs in the order that they appear in the Signature, instead of # the order that the kwargs dict returns them (which is not # predictable). # FIXME: some values may not be set if they are added dynamically to # the signature by parameter links. for n in _process.signature.keys(): if n in kwargs: v = kwargs[n] proc, argname = self._get_process_and_argname(_process, n) proc.setDefault(argname, 0) if v is not None: proc.setValue(argname, v) else: setattr(proc, argname, None) for proc in to_restore: proc._clearImmutableParameters() # FIXME TODO WARNING # calling checkArguments() on children breaks links from a WriteDiskItem # to a child ReadDiskItem since it will fail if an input file is not # actually existing. But not calling it will result in errors not being # checked in children. However this used to be the behavior... _process.checkArguments() # instead of: # to_restore.add(_process) # for proc in to_restore: # proc.checkArguments() def _startProcess(self, _process, executionFunction, *args, **kwargs): if not isinstance(_process, Process): _process = getProcessInstance(_process) self._setArguments(_process, *args, **kwargs) # Launch process t = threading.Thread(target=self._processExecutionThread, args=(_process, executionFunction)) t.start() return _process
[docs] def runProcess(self, _process, *args, **kwargs): """ It is possible to call a sub-process in the current process by calling context.runProcess. The first argument is the process identifier, which is either the filename wihtout extension of the process or its english name. The other arguments are the values of the process parameters. All mandatory argument must have a value. The function returns the value returned by the sub-process execution method. *Example* >>> context.runProcess('do_something', self.input, self.output, value=3.14) In this example, the process do_something is called with self.input as the first paramter value, self.ouput as the second parameter value and 3.14 to the parameter named value. """ _process = getProcessInstance(_process) self.checkInterruption() self._setArguments(_process, *args, **kwargs) result = self._processExecution(_process, None) self.checkInterruption() if self._lastProcessRaisedException: e = self._lastProcessRaisedException self._lastProcessRaisedException = None # reset exception once used. if isinstance(e, tuple) and len(e) == 2: # e = ( exception, traceback ) six.reraise(type(e[0]), e[0], e[1]) else: raise e return result
@staticmethod def createContext(): return ExecutionContext()
[docs] def runInteractiveProcess(self, callMeAtTheEnd, process, *args, **kwargs): """ Runs a process in a new thread and calls a callback function when the execution is finished. :param function callMeAtTheEnd: callback function which will be called the process execution is finished. :param process: id of the process which will be run. """ context = self.createContext() process = getProcessInstance(process) self.checkInterruption() self._setArguments(process, *args, **kwargs) thread = threading.Thread(target=self._runInteractiveProcessThread, args=(context, process, callMeAtTheEnd)) thread.start()
def _runInteractiveProcessThread(self, context, process, callMeAtTheEnd): try: result = context.runProcess(process) except Exception as e: result = e callMeAtTheEnd(result) def _processExecutionThread(self, *args, **kwargs): self._processExecution(*args, **kwargs) neuroHierarchy.databases.currentThreadCleanup() def _processExecution(self, process, executionFunction=None): '''Execute the process "process". The value return is stored to avoid the garbage-collection of some of the objects created by the process itself (GUI for example). ''' result = None stackTop = None process = getProcessInstance(process) stack = self._processStack() stackTop = self._processStackParent() if stackTop: # if neuroConfig.userLevel > 0: # self.write( '<img alt="" src="' + os.path.join( neuroConfig.iconPath, 'icon_process.png' ) + '" border="0">' \ # + _t_(process.name) + ' '\ # + str(process.instance) + '<p>' ) # Count process execution count = stackTop.processCount.get(process._id, 0) stackTop.processCount[process._id] = count + 1 newStackTop = self.StackInfo(process) self._pushStack(newStackTop) ishead = not stackTop # Logging process start if not stackTop or stackTop.process._executionNode is None: process.isMainProcess = True try: # finally -> processFinished try: # show exception # check write parameters if the process is the main process (check all parameters in child nodes if it is a pipeline) # or if it has a parent which is not a pipeline that is to say, # the current process is run throught context.runProcess if ishead: self._allWriteDiskItems = {} if ishead or (stackTop and stackTop.process._executionNode is None): writeParameters = [] for parameterized, attribute, type in process.getAllParameters(): if isinstance(type, WriteDiskItem): item = getattr(parameterized, attribute) if item is not None: writeParameters.append(item) elif isinstance(type, ListOf) and isinstance(type.contentType, WriteDiskItem): itemList = getattr(parameterized, attribute) if itemList: writeParameters.extend( item for item in itemList if item is not None ) for item in writeParameters: dirname = os.path.dirname(item.fullPath()) if not os.path.exists(dirname): safemkdir.makedirs(dirname) uuid = item.uuid() self._allWriteDiskItems[uuid] = [ item, item.modificationHash()] if ishead: log = neuroConfig.mainLog else: if len(stack) >= 2: log = stack[-2].log else: # FIXME: # attaching to head log is not always the right solution # if a sub-process has parallel sub-nodes, then a new thread # and a new stack will be created, but the logs will not be # appended to the correct parent log = self._processStackHead.log if log is not None: # print("Create subLog for process ", process.name) newStackTop.log = log.subLog() process._log = newStackTop.log content = '<html><body><h1>' + \ _t_(process.name) + '</h1><h2>' + _t_( 'Process identifier') + '</h2>' + process._id content += '<h2>' + _t_('Execution platform') + '</h2>' if(hasattr(process, 'executionWorkflow')): content += 'Soma-Workflow' else: content += 'BrainVISA' content += '<h2>' + _t_('Parameters') + '</h2>' for n in process.signature.keys(): content += '<em>' + n + '</em> = ' + \ htmlEscape( str(getattr(process, n, None))) + '<p>' content += '<h2>' + _t_('Output') + '</h2>' try: # print("Create subTextLog for process ", process.name) process._outputLog = log.subTextLog() process._outputLogFile = open( process._outputLog.fileName, 'w') print(content, file=process._outputLogFile) process._outputLogFile.flush() content = process._outputLog except Exception: content += '<font color=red>' + \ _t_('Unabled to open log file') + \ '</font></html></body>' process._outputLog = None process._outputLogFile = None if stackTop: self._lastStartProcessLogItem = log.append( _t_(process.name), html=content, children=newStackTop.log, icon='icon_process.png') else: self._lastStartProcessLogItem = log.append( _t_(process.name) + ' ' + str(process.instance), html=content, children=newStackTop.log, icon='icon_process.png') else: newStackTop.log = None self._lastStartProcessLogItem = None if ishead and self._allowHistory: self._historyBookEvent, self._historyBooksContext = HistoryBook.storeProcessStart( self, process) self._processStarted() newStackTop.thread = threading.currentThread() self._lastProcessRaisedException = None # Check arguments and conversions def _getConvertedValue(v, p): # v: value # p: parameter (Read/WriteDiskItem) if v and getattr(v, "type", None) and ((not isSameDiskItemType(v.type, p.type)) or v.format not in p.formats): c = None formats = p.get_formats_order(v.get('_database')) for destinationFormat in formats: converter = getConversionInfo((v.type, v.format), (p.type, destinationFormat), checkUpdate=False) \ .converter() # print('converter', converter, #'source', (v.type, v.format), #'destination', (p.type, destinationFormat)) if converter: tmp = self.temporary(destinationFormat) tmp.type = v.type tmp.copyAttributes(v) convargs = {'read': v, 'write': tmp} c = getProcessInstance(converter.name) if c is not None: try: self._setArguments(c, **convargs) if c.write is not None: break except Exception: pass # if not converter: raise Exception( _t_('Cannot convert format <em>%s</em> to format <em>%s</em> for parameter <em>%s</em>') % ( _t_( v.format.name ), _t_( destinationFormat.name ), n ) ) # tmp = self.temporary( destinationFormat ) # tmp.type = v.type # tmp.copyAttributes( v ) # self.runProcess( converter.name, read = v, write = # tmp ) if not c: raise Exception( HTMLMessage(_t_('Cannot convert format <em>%s</em> to format <em>%s</em> for parameter <em>%s</em>') % (_t_(v.format.name), _t_(formats[0].name), n))) self.runProcess(c) return tmp converter = None for (n, p) in process.signature.items(): if isinstance(p, ReadDiskItem) and p.enableConversion: v = getattr(process, n) tmp = _getConvertedValue(v, p) if tmp is not None: process.setConvertedValue(n, tmp) elif isinstance(p, WriteDiskItem): v = getattr(process, n) # test if data is locked if (v is not None): if v.isLockData() and (process.execution.__func__ != super(process.__class__, process).execution.__func__): # raise an error if the diskitem is an output # of a process which has an execution function # (not the default execution function of the # Process class) raise IOError( HTMLMessage(_t_('<b>The file: <em>%s</em> is locked</b>. It cannot be opened for writing. You can unlock it if necessary using the contextual menu of the parameter %s') % (str(v), n))) # end test if data is locked if v is not None: v.createParentDirectory() elif isinstance(p, ListOf): needsconv = False converted = [] lv = getattr(process, n) for v in lv: tmp = _getConvertedValue(v, p.contentType) if tmp is not None: converted.append(tmp) needsconv = True else: converted.append(v) if needsconv: process.setConvertedValue(n, converted) if executionFunction is None: if(hasattr(process, 'executionWorkflow')): from soma.wip.application.api import Application from soma_workflow.client import WorkflowController, \ Workflow, Helper workflow = process.executionWorkflow(context=self) if isinstance(workflow, tuple): if len(workflow) == 3: jobs, dependencies, root_group = workflow else: raise ValueError( 'Bad workflow in %s.executionWorkflow()' % process.name) workflow = Workflow( jobs=jobs, dependencies=dependencies, root_group=root_group, name='brainvisa_' + process.name) controller = WorkflowController() wid = controller.submit_workflow( workflow=workflow, name=process.name) Helper.wait_workflow(wid, controller) list_failed_jobs = Helper.list_failed_jobs( wid, controller, include_aborted_jobs=True, include_user_killed_jobs=True) result = workflow configuration = Application().configuration if (len(list_failed_jobs) > 0): if not configuration.soma_workflow.somaworkflow_keep_failed_workflows: # Delete the submitted workflow controller.delete_workflow(wid, True) raise RuntimeError( 'run through soma workflow failed, see ' 'details with soma-workflow-gui') else: if not configuration.soma_workflow.somaworkflow_keep_succeeded_workflows: # Delete the submitted workflow controller.delete_workflow(wid, True) else: result = process.execution(self) else: result = executionFunction(self) except Exception as e: self._lastProcessRaisedException = (e, sys.exc_info()[2]) try: self._showException() except SystemExit as e: neuroConfig.exitValue = e.args[0] except Exception: import traceback info = sys.exc_info() sys.stderr.write( '\n%s: %s\n' % (info[0].__name__, six.text_type(info[1]))) traceback.print_tb(info[2], None, sys.stderr) logException(context=self) if self._depth() != 1 or not self.manageExceptions: raise finally: self._processFinished(result) process.restoreConvertedValues() # update history if self._allowHistory and ishead and hasattr(self, '_historyBookEvent') and self._historyBookEvent is not None: # one of these conditions may be false when an exception occurs during execution HistoryBook.storeProcessFinished( self, process, self._historyBookEvent, self._historyBooksContext) self._historyBookEvent = None self._historyBooksContext = None processWriteParameters = [] for attribute, type in six.iteritems(process.signature): if isinstance(type, WriteDiskItem): item = getattr(process, attribute) if item is not None: processWriteParameters.append(item) elif isinstance(type, ListOf) and isinstance(type.contentType, WriteDiskItem): itemList = getattr(process, attribute) if itemList: processWriteParameters.extend( item for item in itemList if item is not None ) wdi_to_release = set() #for uuid, item_hash in self._allWriteDiskItems.items(): for item in processWriteParameters: uuid = item.uuid() item_hash = self._allWriteDiskItems.get(uuid) if not item_hash: continue hash = item_hash[1] if item.isReadable(): wdi_to_release.add(uuid) if item.modificationHash() != hash: try: # do not try to insert in the database an item that doesn't have any reference to a database # or which is temporary if item.get("_database", None) and \ (not hasattr(item, '_isTemporary') or not item._isTemporary): neuroHierarchy.databases.insertDiskItem( item, update=True) except NotInDatabaseError: pass except Exception: showException() item_hash[1] = item.modificationHash() for uuid in wdi_to_release: del self._allWriteDiskItems[uuid] self._clear_unused_minfs(process) # Close output log file if process._outputLogFile is not None: print('</body></html>', file=process._outputLogFile) process._outputLogFile.close() process.outputLogFile = None if process._outputLog is not None: process._outputLog.close() process._outputLog = None if process._log is not None: process._log.close() process._log = None # Expand log to put sublogs inline log = self._stackTop().log if log is not None: if process.isMainProcess and neuroConfig.mainLog: neuroConfig.mainLog.expand() if self._depth() == 1: self._lastStartProcessLogItem = None self._popStack().thread = None # WARNING !!! not pop() return result def _clear_unused_minfs(self, process): # clear unused minfs only when the main process is finished to avoid # clearing minf that will be used in next steps if not process.isMainProcess: return writeParameters = [] for parameterized, attribute, type in process.getAllParameters(): if isinstance(type, WriteDiskItem): item = getattr(parameterized, attribute) if item is not None: writeParameters.append(item) elif isinstance(type, ListOf) and isinstance(type.contentType, WriteDiskItem): itemList = getattr(parameterized, attribute) if itemList: writeParameters.extend( item for item in itemList if item is not None ) for item in writeParameters: if not item.isReadable(): item.eraseFiles(True) def _currentProcess(self): stackTop = self._stackTop() if stackTop is None: return None else: return stackTop.process def _depth(self): return len(self._processStack()) def _showSystemOutput(self): return self._systemOutputLevel >= 0 and self.userLevel >= self._systemOutputLevel def _processStarted(self): if self._currentProcess().isMainProcess: msg = '<p><img alt="" src="' + \ os.path.join( neuroConfig.iconPath, 'process_start.png' ) + \ '" border="0"><em>' + _t_( 'Process <b>%s</b> started on %s') % \ (_t_(self._currentProcess().name), time.strftime(_t_('%Y/%m/%d %H:%M'), self._stackTop().time ) ) + \ '</em></p>' self.write(msg) def _processFinished(self, result): if self._currentProcess().isMainProcess: finalTime = time.localtime() elapsed = calendar.timegm( finalTime) - calendar.timegm(self._stackTop().time) msg = '<br><img alt="" src="' + \ os.path.join( neuroConfig.iconPath, 'process_end.png' ) + \ '" border="0"><em>' + _t_( 'Process <b>%s</b> finished on %s (%s)' ) % \ (_t_(self._currentProcess().name), time.strftime( _t_( '%Y/%m/%d %H:%M' ), finalTime), timeDifferenceToString( elapsed ) ) + \ '</em>' self.write(msg)
[docs] def system(self, *args, **kwargs): """ This function is used to call system commands. It is very similar to functions like ``os.system()`` in Python and ``system()`` in C. The main difference is the management of messages sent on standard output. These messages are intercepted and reported in *BrainVISA* interface according to the current execution context. Moreover, a command started using this function can be interrupted via the *Interrupt* button in the interface which is not the case if the python ``os.system()`` function is used directly. If the command is given as one argument, it is converted to a string and passed to the system. If there are several arguments, each argument is converted to a string, surrounded by simple quotes and all elements are joined, separated by spaces. The resulting command is passed to the system. The second method is recommended because the usage of quotes enables to pass arguments that contain spaces. The function returns the value returned by the system command. *optional keyword paramerers* outputLevel: (int) if >=0 and < userLevel, write stdout in log stdoutInContext: (bool) if True, write stdout in the current context output ignoreReturnValue: (bool) if True, ignore the command return value. Useful when you know the command will exit badly even if the work is done. stdout: (file object) if specified, stdout will be written in this stream. It may be a StringIO object. stderr: (file object) if specified, stderr will be written in this stream. It may be a StringIO object. nativeEnv: (bool or None) if True, forces execution within the "native" system environment variables (for commands external to the brainvisa binary distribution). If False, force execution within the current brainvisa environment. If None (default), guess if the executed command path is external to the main brainvisa path. cwd: (str or None) Current directory of the child process (by default or if None, it is inherited from the parent process i.e. BrainVISA). env: dict Environment variables to be set. Contrarily to soma.subprocess.Popen, they do not completely replace the current environment variables, but only add / replace the given variables. If both env and nativeEnv keyword arguments are used, nativeEnv acts before env, thus native environment can be overriden by env. *Example* >>> arg1 = 'x' >>> arg2 = 'y z' >>> context.system('command ' + arg1 + ' ' + arg2) >>> context.system('command', arg1, arg2) The first call generates the command command x y z which calls the commands with 3 parameters. The second call generates the command 'command' 'x' 'y z' which calls the command with two parameters. """ self._systemOutputLevel = kwargs.get('outputLevel', 0) old_stdoutInContext = self._stdoutInContext self._stdoutInContext = kwargs.get('stdoutInContext', False) self._stdout = kwargs.get('stdout', None) self._stderr = kwargs.get('stderr', None) ignoreReturnValue = kwargs.get('ignoreReturnValue', 0) nativeEnv = kwargs.get('nativeEnv', None) cwd = kwargs.get('cwd', None) env = kwargs.get('env', None) command = [str(i) for i in args] ret = self._system(command, self._systemStdout, self._systemStderr, nativeEnv=nativeEnv, cwd=cwd, env=env) self._stdoutInContext = old_stdoutInContext self._stdout = None self._stderr = None if ret and not ignoreReturnValue: raise RuntimeError( _t_('System command exited with non null value : %s') % str(ret)) return ret
[docs] def pythonSystem(self, *args, **kwargs): """Specialization of system() for calling Python scripts The script passed as the first agrument is called through the Python interpreter (``sys.executable``). The script will be searched in ``PATH`` and its full path will be passed to the interpreter, unless the first argument begins with ``-`` (which allows using ``-c`` or ``-m``), On Unix, this should have the same result as using system(), but on Windows the script may not be recognized as a Python script by the system, so needs this wrapping. NOTE: this method will always use the same interpreter as BrainVISA (i.e. ``sys.executable``), regardless of the shebang line in your Python script. Therefore, you should make sure that your script is compatible with both Python 2 and Python 3. """ if args[0].startswith('-'): # 1st arg is a flag, like -m or -c: use it as is exe = args[0] else: # otherwise it should be a script name: find it in the path exe = neuroConfig.executableWithPath(args[0]) return self.system(sys.executable, exe, *args[1:], **kwargs)
def _systemStdout(self, line, logFile=None): if not line: return if logFile is None: logFile = self._systemLogFile if self._stdoutInContext: lineInHTML = '<tt>' + \ htmlEscape(line).replace('\n', '<br/>') + '</tt>' self.write(lineInHTML) if self._stdout is not None: self._stdout.write(line) if logFile is not None and self._showSystemOutput(): if line[-1] not in ('\b', '\r'): logFile.write(htmlEscape(line)) logFile.flush() def _systemStderr(self, line, logFile=None): if not line: return if logFile is None: logFile = self._systemLogFile lineInHTML = '<font color=red>' + htmlEscape(line) + '</font>' self.write(lineInHTML) if self._stderr is not None: self._stderr.write(line) if logFile is not None: logFile.write(lineInHTML) logFile.flush() def _system(self, command, stdoutAction=None, stderrAction=None, nativeEnv=None, cwd=None, env=None): self.checkInterruption() stackTop = self._stackTop() if isinstance(command, six.string_types): c = Command(command) else: c = Command(*command) # Logging system call start if stackTop: log = stackTop.log else: log = neuroConfig.mainLog systemLogFile = None systemLog = None if log is not None: # print("Create subTextLog for command ", command[0]) systemLog = log.subTextLog() self._systemLog = systemLog systemLogFile = open(systemLog.fileName, 'w') self._systemLogFile = systemLogFile log.append(c.commandName(), html=systemLog, icon='icon_system.png') try: commandName = distutils.spawn.find_executable(c.commandName()) if not commandName: commandName = c.commandName() if systemLogFile: print('<html><body><h1>' + commandName + ' </h1><h2>' + _t_('Command line') + '</h2><code>' + htmlEscape(str(c)) + '</code></h2><h2>' + _t_( 'Output') + '</h2><pre>', file=systemLogFile) systemLogFile.flush() # if self._showSystemOutput() > 0: # self.write( '<img alt="" src="' + os.path.join( neuroConfig.iconPath, # 'icon_system.png' ) + '">' + c.commandName() + '<p>' ) # Set working directory for the command if cwd is not None: c.setWorkingDirectory(cwd) # Set environment for the command if env is not None or nativeEnv is not None: own_env = {} if nativeEnv or \ (nativeEnv is None and not commandName.startswith(os.path.dirname(neuroConfig.mainPath))): # external command if neuroConfig.brainvisaSysEnv: own_env.update( neuroConfig.brainvisaSysEnv.getVariables()) if env is not None: own_env.update(env) c.setEnvironment(own_env) if stdoutAction is not None: if stdoutAction is self._systemStdout: c.setStdoutAction(lambda line: stdoutAction(line, logFile=systemLogFile)) else: c.setStdoutAction(stdoutAction) if stderrAction is not None: if stderrAction is self._systemStderr: c.setStderrAction(lambda line: stderrAction(line, logFile=systemLogFile)) else: c.setStderrAction(stderrAction) retry = 1 first = True while (retry > 0): try: c.start() retry = 0 except RuntimeError as e: if c.error() == QProcess.FailedToStart: if first: retry = 2 first = False else: retry = retry - 1 self._systemStderr(six.text_type(e) + "\n", systemLogFile) if (retry != 0): self._systemStderr( "Try to restart the command...\n", systemLogFile) else: raise e else: raise e intActionId = self._addInterruptionAction(c.stop) try: try: result = c.wait() finally: self._removeInterruptionAction(intActionId) except Command.UserInterruption: pass self.checkInterruption() if systemLogFile is not None: print('</pre><h2>' + _t_('Result') + '</h2>' + _t_('Value returned') + ' = ' + str(result) + '</body></html>', file=systemLogFile) finally: if systemLogFile is not None: systemLogFile.close() self._systemLogFile = None if systemLog is not None: systemLog.close() # no need to expand the log associated to the command as it is the log of the parent process, # it will be expanded at the end of the process # unless we are not during a process execution... if log is not None and log is not neuroConfig.mainLog and \ self._processStackHead is None: log.expand() self._systemLog = None return result
[docs] def temporary(self, format, diskItemType=None, suffix=None, prefix=None): """ This method enables to create a temporary DiskItem. The argument format is the temporary data format. The optional argument type is the data type. It generates one or several unique filenames (according to the format) in the temporary directory of BrainVISA (it can be changed in BrainVISA configuration). No file is created by this function. The process has to create it. The temporary files are deleted automatically when the temporary diskitem returned by the function is no later used. *Example* >>> tmp = context.temporary('GIS image') >>> context.runProcess('threshold', self.input, tmp, self.threshold) >>> tmp2 = context.temporary('GIS image') >>> context.system('AimsMorphoMath', '-m', 'ero', '-i', tmp.fullPath(), '-o', tmp2.fullPath(), '-r', self.size) >>> del tmp In this example, a temporary data in GIS format is created and it is used to store the output of the process threshold. Then a new temporary data is created to store the output of a command line. At the end, the variable tmp is deleted, so the temporary data is no more referenced and the corresponding files are deleted. """ result = getTemporary(format, diskItemType, suffix=suffix, prefix=prefix) return result
[docs] def write(self, *messages, **kwargs): """ This method is used to print information messages during the process execution. All arguments are converted into strings and joined to form the message. This message may contain HTML tags for an improved display. The result vary according to the context. If the process is run via its graphical interface, the message is displayed in the process window. If the process is run via a script, the message is displayed in the terminal. The message can also be ignored if the process is called automatically by brainvisa or another process. :: context.write("Computing threshold of <i>", self.input.name, "</i>..." ) """ self.checkInterruption() if messages: msg = u' '.join(six.text_type(i) for i in messages) self._writeMessageInLogFile(msg) self._write(msg)
def _writeMessageInLogFile(self, msg): stackTop = self._stackTop() if stackTop: outputLogFile = stackTop.process._outputLogFile if outputLogFile and not outputLogFile.closed: print(msg, file=outputLogFile) outputLogFile.flush() def _write(self, html): neuroException.print_html(html, file=sys.stdout, flush=True)
[docs] def warning(self, *messages): """ This method is used to print a warning message. This function adds some HTML tags to change the appearance of the message and calls the :py:meth:`write` function. """ self.checkInterruption() bmsg = '<table width=100% border=1><tr><td><font color=orange><img alt="WARNING: " src="' \ + os.path.join(neuroConfig.iconPath, 'warning.png') + '">' emsg = '</font></td></tr></table>' self.write(*((bmsg, ) + messages + (emsg, )))
[docs] def error(self, *messages): """ This method is used to print an error message. Like the above function, it adds some HTML tags to change the appearance of the message and calls :py:meth:`write` function. """ self.checkInterruption() bmsg = '<table width=100% border=1><tr><td><font color=red><img alt="ERROR: " src="' \ + os.path.join(neuroConfig.iconPath, 'error.png') + '">' emsg = '</font></td></tr></table>' self.write(*((bmsg, ) + messages + (emsg, )))
[docs] def ask(self, message, *buttons, **kwargs): """ This method asks a question to the user. The message is displayed and the user is invited to choose a value among the propositions. The method returns the index of the chosen value, beginning by 0. If the answer is not valid, the returned value is -1. Sometimes, when the process is called automatically (in batch mode), these calls to context.ask are ignored and return directly -1 without asking question. *Example* >>> if context.ask('Is the result ok ?', 'yes', 'no') == 1: >>> try_again() """ self.checkInterruption() self.write('<pre>' + message) i = 0 for b in buttons: self.write(' %d: %s' % (i, str(b))) i += 1 sys.stdout.write('Choice: ') sys.stdout.flush() line = sys.stdin.readline()[:-1] self.write('</pre>') try: result = int(line) except Exception: result = None return result
[docs] def dialog(self, *args): """ This method is available only in a graphical context. Like ask, it is used to ask a question to the user, but the dialog interface is customisable. It is possible to add a signature to the dialog : fields that the user has to fill in. *Example* >>> dial = context.dialog(1, 'Enter a value', Signature('param', Number()), _t_('OK'), _t_('Cancel')) >>> dial.setValue('param', 0) >>> r = dial.call() >>> if r == 0: >>> v=dial.getValue('param') """ self.checkInterruption() return None
def _showException(self, showCallStack=False): stackTop = self._stackTop() try: self.checkInterruption() except Exception: pass beforeError = _t_('in <em>%s</em>') % ( _t_(stackTop.process.name) + ' ' + str(stackTop.process.instance)) exceptionType, exceptionMessage, _traceBack = sys.exc_info() msgFull = self._messageFromException( beforeError, (exceptionType, exceptionMessage, _traceBack)) msgBasic = self._messageFromException( beforeError, (exceptionType, exceptionMessage, None)) self.checkInterruption() isNonExpertUser = neuroConfig.userLevel < 2 if(isNonExpertUser): self._write(msgBasic) else: self._write(msgFull) self._writeMessageInLogFile(msgFull) if neuroConfig.fastStart and not neuroConfig.gui: sys.exit(1)
[docs] def showException(self, beforeError='', afterError='', exceptionInfo=None): '''same as the global brainvisa.processing.neuroException.showException() but displays in the current context (the process output box for instance) ''' if exceptionInfo is None: exceptionInfo = sys.exc_info() stackTop = self._stackTop() if not beforeError: beforeError = _t_('in <em>%s</em>') % ( _t_(stackTop.process.name) + ' ' + str(stackTop.process.instance)) exceptionType, exceptionMessage, _traceBack = exceptionInfo msgFull = self._messageFromException( beforeError, (exceptionType, exceptionMessage, _traceBack), afterError=afterError) isNonExpertUser = neuroConfig.userLevel < 2 if(isNonExpertUser): msgBasic = self._messageFromException( beforeError, (exceptionType, exceptionMessage, None)) self._write(msgBasic) else: self._write(msgFull) self._writeMessageInLogFile(msgFull)
def _messageFromException(self, beforeError, exceptionInfo, afterError=''): exceptionMsgFull = exceptionMessageHTML( exceptionInfo, beforeError=beforeError, afterError=afterError) + '<hr>' + exceptionTracebackHTML(exceptionInfo) return '<table width=100% border=1><tr><td>' + exceptionMsgFull + '</td></tr></table>'
[docs] def checkInterruption(self): """ This function is used to define breakpoints. When the process execution reach a breakpoint, the user can interrupt the process. There are 4 types of breakpoints automatically added : * before each system call (context.system) * after each system call (context.system) * before each sub-process call (context.runProcess) * after each sub-process call (context.runProcess) To allow the user to interrupt the process at another place, you have to use the function context.checkInterruption. If the user has clicked on the Interrupt button while the process runs, it will stop when reaching the checkInterruption point. """ self._interruptionLock.acquire() try: self._checkInterruption() exception = self._interruptionRequest if exception is not None: self._interruptionRequest = None raise exception finally: self._interruptionLock.release()
def _checkInterruption(self): self._interruptionLock.acquire() try: if self._interruptionRequest is not None: for function, args, kwargs in self._interruptionActions.values(): function(*args, **kwargs) self._interruptionActions.clear() finally: self._interruptionLock.release() return None def _addInterruptionAction(self, function, *args, **kwargs): self._interruptionLock.acquire() try: result = self._interruptionActionsId self._interruptionActionsId += 1 self._interruptionActions[result] = (function, args, kwargs) finally: self._interruptionLock.release() return result def _removeInterruptionAction(self, number): self._interruptionLock.acquire() try: if number in self._interruptionActions: del self._interruptionActions[number] finally: self._interruptionLock.release() def _setInterruptionRequest(self, interruptionRequest): self._interruptionLock.acquire() try: self._interruptionRequest = interruptionRequest self._checkInterruption() finally: self._interruptionLock.release()
[docs] def log(self, *args, **kwargs): """ `context.log(what, when=None, html='', children=[], icon=None)` This method is used to add a message to BrainVISA log. The first parameter what is the name of the entry in the log, the message to write is in the html parameter. """ stackTop = self._stackTop() if stackTop: logFile = stackTop.log else: logFile = neuroConfig.mainLog if logFile is not None: logFile.append(*args, **kwargs)
[docs] def getConverter(self, source, dest, checkUpdate=True): """ Gets a converter process which can convert the source diskitem from its format to the destination format. """ # Check and convert source type if isinstance(source, DiskItem): source = (source.type, source.format) elif isinstance(source, ReadDiskItem): if source.formats: source = (source.type, source.formats[0]) else: source = (source.type, None) # Check and convert dest type if isinstance(dest, DiskItem): dest = (dest.type, dest.format) elif isinstance(dest, ReadDiskItem): if dest.formats: dest = (dest.type, dest.formats[0]) else: dest = (dest.type, None) st, sf = source dt, df = dest return getConverter((getDiskItemType(st), getFormat(sf)), (getDiskItemType(dt), getFormat(df)), checkUpdate=checkUpdate)
def createProcessExecutionEvent(self): from brainvisa.history import ProcessExecutionEvent event = ProcessExecutionEvent() process = None if hasattr(self, 'process'): process = self.process stack = self._processStack() if stack: if process is None: process = stack[0].process log = stack[0].log if log is not None: event.setLog(log) event.setProcess(process) return event def _attachProgress(self, parent, count=None, process=None): '''Create a new ProgressInfo object. If parent is provided, it is the parent ProgressInfo, or the parent process. If not specified, the new ProgressInfo will be attached to the top-level ProgressInfo in the context. count is the number of children that the new ProgressInfo will hold. It is not the maximum value of a numeric progress value (see progress() method). process is the current (child) process which will be attached with the new ProgressInfo. This method is called internally in pipeline execution nodes. Regular processes need not to call it directly. They should call getProcessInfo() instead. ''' if parent is not None: parent, parentproc = self._findProgressInfo(parent) if parent is None: parent = self._topProgressinfo() # if parent is not None and len( parent.children ) == 0: # parent.childrendone = 0 # reset if parent is None: parent = ProgressInfo() self._progressinfo = weakref.ref(parent) if process is None: pi = parent pi.children = [None] * count else: pi = ProgressInfo(parent, count, process=process) else: pi = ProgressInfo(parent, count, process=process) pig = self._topProgressinfo() if process is not None: plist = getattr(pig, 'processes', None) if plist is None: plist = weakref.WeakKeyDictionary() pig.processes = plist plist[process] = weakref.ref(pi) return pi
[docs] def getProgressInfo(self, process, childrencount=None, parent=None): '''Get the progress info for a given process or execution node, or create one if none already exists. A regular process may call it. The output is a tuple containing the ProgressInfo and the process itself, just in case the input process is in fact a ProgressInfo instance. A ProgressInfo has no hard reference in BrainVISA: when you don't need it anymore, it is destroyed via Python reference counting, and is considered done 100% for its parent. :param childrencount: it is the number of children that the process will have, and is not the same as the own count of the process in itself, which is in addition to children (and independent), and specified when using the progress() method. ''' pinfo, process = self._findProgressInfo(process) if pinfo is None: if process is None: pinfo = self._topProgressinfo() if pinfo is None: pinfo = self._attachProgress(parent=parent, process=process, count=childrencount) return pinfo, process
def _topProgressinfo(self): '''internal.''' if hasattr(self, '_progressinfo'): return self._progressinfo() return None def _findProgressInfo(self, processOrProgress): '''internal.''' if processOrProgress is None: return None, None if isinstance(processOrProgress, ProgressInfo): return processOrProgress, getattr(processOrProgress, 'process', None) # in case it is a ProcessExecutionNode down = True while down: down = False if isinstance( processOrProgress, ExecutionNode ) and \ hasattr(processOrProgress, '_process'): processOrProgress = processOrProgress._process down = True if not isinstance( processOrProgress, ExecutionNode ) and \ hasattr(processOrProgress, '_executionNode'): p = processOrProgress._executionNode if p is not None: processOrProgress = p down = True if hasattr(processOrProgress, '_progressinfo'): return processOrProgress._progressinfo, processOrProgress pinfo = getattr(self, '_progressinfo', None) if pinfo is None: return None, processOrProgress procs = getattr(self._topProgressinfo(), 'processes', None) if procs is None: return None, processOrProgress pi = procs.get(processOrProgress, None) if pi is not None: pi = pi() return pi, processOrProgress
[docs] def progress(self, value=None, count=None, process=None): '''Set the progress information for the parent process or ProgressInfo instance, and output it using the context output mechanisms. :param value: is the progress value to set. If none, the value will not be changed, but the current status will be shown. :param count: is the maximum value for the process own progress value (not taking children into account). :param process: is either the calling process, or the ProgressInfo. ''' if value is not None: pinfo, process = self.getProgressInfo(process) pinfo.setValue(value, count) tpi = self._topProgressinfo() if tpi is not None: self.showProgress(tpi.value() * 100)
[docs] def showProgress(self, value, count=None): '''Output the given progress value. This is just the output method which is overriden in subclassed contexts. Users should normally not call it directory, but use progress() instead. ''' if count is None: self.write('progress:', value, '% ...') else: self.write('progress:', value, '/', count, '...')
#----------------------------------------------------------------------------
[docs]class ProgressInfo(object): '''ProgressInfo is a tree-like structure for progression information in a process or a pipeline. The final goal is to provide feedback to the user via a progress bar. ProgressInfo has children for sub-processes (when used in a pipeline), or a local value for its own progression. A ProgressInfo normally registers itself in the calling Process, and is destroyed when the process is destroyed, or when the process _progressinfo variable is deleted. ''' def __init__(self, parent=None, count=None, process=None): ''' :param parent: is a ProgressInfo instance. :param count: is a number of children which will be attached. :param process: is the calling process. ''' if count is None: self.children = [] else: self.children = [None] * count self.done = False self.childrendone = 0 if parent is not None: parent.attach(self) self.parent = parent # prevent parent from deleting self._localvalue = 0. self._localcount = None if process is not None: self.process = weakref.ref(process) # process._progressinfo = self else: self.process = None def __del__(self): if self.process is not None: proc = self.process() if proc is not None and hasattr(proc, '_progressinfo'): del proc._progressinfo
[docs] def value(self): '''Calculate the progress value including those of children. ''' if self.done: return 1. n = self.childrendone + len(self.children) if self._localcount is not None: n += 1 # self._localcount if n == 0: return self._localvalue done = float(self.childrendone) for c in self.children: if c is not None: done += c().value() if c().done: self._delchild(c) done += self._localvalue return done / n
[docs] def setValue(self, value, count=None): '''Set the ProgressInfo own progress value (not its children) ''' if count is None: count = self._localcount else: self._localcount = count if self.done: if ( count is not None and value != count ) \ or (count is None and value != 1.): self.done = False if count: value = float(value) / count self._localvalue = value
[docs] def setdone(self): '''Marks the ProgressInfo as done 100%, the children are detached. ''' self.done = True del self.children
def _delchild(self, child): if child in self.children: del self.children[self.children.index(child)] self.childrendone += 1 if len(self.children) == 0: self.done = True
[docs] def attach(self, pinfo): '''Don't use this method directly, it is part of the internal mechanism, called by the constructor. ''' wr = weakref.ref(pinfo, self._delchild) if None in self.children: i = self.children.index(None) self.children[i] = wr else: self.children.append(wr) self.done = False
def debugDump(self): print('ProgressInfo:', self) if hasattr(self, 'process'): print(' process:', self.process) print(' local value:', self._localvalue, '*', self._localcount) print(' value:', self.value()) print(' children:', len(self.children) + self.childrendone) print(' done:', self.childrendone) print(' not started:', len([x for x in self.children if x is None])) print(' running:') todo = [(x(), 1) for x in self.children if x is not None] while len(todo) != 0: pi, indent = todo[0] del todo[0] print(' ' * indent, pi, pi.childrendone, end=' ') if hasattr(pi, 'process'): print(pi.process()) else: print() todo = [(x(), indent + 1) for x in pi.children if x is not None] + todo
#----------------------------------------------------------------------------
[docs]class ProcessInfo(object): """ This object stores information about a process. Such objects are created at BrainVISA startup when the processes are loaded. .. py:attribute:: id Id of the process. It is the name of the file without extension in lowercase. .. py:attribute:: name Name of the process as it is displayed in the GUI. .. py:attribute:: signature Process excepted parameters. .. py:attribute:: userLevel User level needed to see the process. .. py:attribute:: category Process category path: <toolbox>/<category1>/<category2>/... .. py:attribute:: showMaximized Process window maximized state. .. py:attribute:: fileName Path to the file containing the source code of the process. .. py:attribute:: roles Tuple containing the specific roles of the process: viewer, converter, editor, importer. .. py:attribute:: valid False if the validation method of the process fails - default True. .. py:attribute:: procdoc The content of the .procdoc file associated to this process in a dictionary. It represents the documentation of the process. .. py:attribute:: toolbox The id of the toolbox containing the process. .. py:attribute:: module Module path to the source of the process related to the toolbox directory. <processes>.<category1>...<process> """ def __init__(self, id, name, signature, userLevel, category, fileName, roles, toolbox, module=None, showMaximized=False): self.id = id self.name = name # TODO: Signature cannot be pickeled self.signature = None self.userLevel = userLevel self.category = category self.fileName = fileName self.roles = tuple(roles) self.valid = True # set to False if process' validation method fails self.procdoc = None self.toolbox = toolbox self.showMaximized = showMaximized if module is None: for p in (neuroConfig.mainPath, neuroConfig.homeBrainVISADir): if self.fileName.startswith(p): module = split_path(self.fileName[len(p) + 1:]) if module: if module[0] == 'toolboxes': module = module[2:] module = '.'.join(module) if module.endswith('.py'): module = module[:-3] self.module = module
[docs] def html(self): """ Returns the process information in html format. """ return '\n'.join(['<b>' + n + ': </b>' + six.text_type(getattr(self, n)) + '<br>\n' for n in ('id', 'name', 'toolbox', 'signature', 'userLevel', 'category', 'fileName', 'roles')])
#----------------------------------------------------------------------------
[docs]def getProcessInfo(processId): """ Gets information about the process whose id is given in parameter. :return type: :py:class:`ProcessInfo` """ if isinstance(processId, ProcessInfo): result = processId else: if isinstance(processId, six.string_types): processId = processId.lower() result = _processesInfo.get(processId) if result is None: process = getProcess(processId, checkUpdate=False) if process not in (None, type(None)) and hasattr(process, '_id'): result = _processesInfo.get(process._id.lower()) return result
#----------------------------------------------------------------------------
[docs]def addProcessInfo(processId, processInfo): """Stores information about the process.""" _processesInfo[processId.lower()] = processInfo
#----------------------------------------------------------------------------
[docs]def getProcess(processId, ignoreValidation=False, checkUpdate=True): """ Gets the class associated to the process id given in parameter. When the processes are loaded, a new class called :py:class:`NewProcess` is created for each process. This class inherits from :py:class:`Process` and adds an instance counter which is incremented each time a new instance of the process is created. :param processId: the id or the name of the process, or a dictionary *{'type' : 'iteration|distributed|selection', 'children' : [...] }* to create an :py:class:`IterationProcess`, a :py:class:`DistributedProcess` or a :py:class:`SelectionProcess`. :param boolean ignoreValidation: if True the validation function of the process won't be executed if the process need to be reloaded - default False. :param boolean checkUpdate: If True, the modification date of the source file of the process will be checked. If the file has been modified since the process loading, it may need to be reloaded. The user will be asked what he wants to do. Default True. :returns: a :py:class:`NewProcess` class which inherits from :py:class:`Process`. """ global _askUpdateProcess if processId is None: return None if isinstance(processId, Process) \ or (isinstance(processId, type) and issubclass(processId, Process)) \ or isinstance(processId, weakref.ProxyType): result = processId id = getattr(processId, '_id', None) if id is not None: process = getProcess(id, checkUpdate=False) if process is not None: result = process else: if isinstance(processId, Process): result = processId.__class__ else: result = processId elif isinstance(processId, dict): if processId['type'] == 'iteration': return IterationProcess(processId.get('name', 'Iteration'), [getProcessInstance(i) for i in processId['children']]) elif processId['type'] == 'distributed': return DistributedProcess(processId.get('name', 'Distributed iteration'), [getProcessInstance(i) for i in processId['children']]) elif processId['type'] == 'selection': return SelectionProcess(processId.get('name', 'Selection'), [getProcessInstance(i) for i in processId['children']]) else: raise TypeError(_t_('Unknown process type: %s') % (six.text_type(processId['type']))) else: if isinstance(processId, six.string_types): if processId.startswith('capsul://'): from brainvisa.processing import capsul_process capsul_pid = processId[9:] instance = capsul_process.CapsulProcess. \ axon_process_from_capsul_module(capsul_pid) return instance.__class__ processId = processId.lower() result = _processes.get(processId) if result is None: info = _processesInfo.get(processId) if info is None: info = _processesInfoByName.get(processId) if info is not None: result = _processes.get(info.id.lower()) if result is None: result = readProcess( info.fileName, ignoreValidation=ignoreValidation) checkUpdate = False if result is not None: # Check if process source file have changed if checkUpdate: fileName = getattr(result, '_fileName', None) if fileName is not None: ask = _askUpdateProcess.get(result._id, 0) # if the user choosed never updating the process, no need to # check if it needs update if (ask != 2): ntime = os.path.getmtime(fileName) if ntime > result._fileTime: update = 0 if ask == 0: # if neuroConfig.userLevel > 0: r = defaultContext().ask(_t_('%s has been modified, would you like to update the process <em>%s</em> processes ? You should close all processes windows before reloading a process.') % (result._fileName, _t_(result.name)), _t_('Yes'), _t_('No'), _t_('Always'), _t_('Never')) if r == 0: update = 1 elif r == 2: update = 1 _askUpdateProcess[result._id] = 1 elif r == 3: update = 0 _askUpdateProcess[result._id] = 2 elif ask == 1: update = 1 if update: result = readProcess(fileName) return result
#----------------------------------------------------------------------------
[docs]def getProcessInstanceFromProcessEvent(event): """ Gets an instance of a process described in a :py:class:`brainvisa.history.ProcessExecutionEvent`. :param event: a :py:class:`brainvisa.history.ProcessExecutionEvent` that describes the process: its structure and its parameters. :returns: an instance of the :py:class:`NewProcess` class associated to the described process. Parameters may have been set. """ pipelineStructure = event.content.get('id') if pipelineStructure is None: pipelineStructure = event.content.get('pipelineStructure') result = getProcessInstance(pipelineStructure) procs = set() if result is not None: # 1st pass: disable links to parameters which have a saved value selected = event.content.get('parameters', {}).get('selected', {}) procs.add(result) for n, v in six.iteritems(selected): try: result._setImmutable(n, True) except KeyError: pass defaultp = event.content.get('parameters', {}).get('default', {}) for n, v in six.iteritems(defaultp): try: result._setImmutable(n, True) except KeyError: pass stackp = [(result.executionNode(), k, e.get('parameters'), e['selected'], e.get('executionNodes', {})) for k, e in six.iteritems(event.content.get('executionNodes', {}))] stack = list(stackp) # copy list while stack: eNodeParent, eNodeName, eNodeParameters, eNodeSelected, eNodeChildren = stack.pop( 0) eNode = eNodeParent.child(eNodeName) if eNode: eNode.setSelected(eNodeSelected) if eNodeParameters: for n, v in six.iteritems(eNodeParameters['selected']): try: eNode._setImmutable(n, True) procs.add(eNode) except KeyError: pass for n, v in six.iteritems(eNodeParameters['default']): try: eNode._setImmutable(n, True) procs.add(eNode) except KeyError: pass stackadd = [(eNode, k, e.get('parameters'), e['selected'], e.get('executionNodes', {})) for k, e in six.iteritems(eNodeChildren)] stackp += stackadd stack += stackadd # 2nd pass: now really set values, in the order that they appear in the # Signature, instead of the order of dict entries (which is not # predictable). # FIXME: some values may not be set if they are added dynamically to # the signature by parameter links. for n in result.signature.keys(): if n in selected: try: result.setValue(n, selected[n], default=False) except KeyError: pass for n in result.signature.keys(): if n in defaultp: v = defaultp[n] try: result.setValue(n, v, default=True) except KeyError: pass except Exception: defaultContext().showException( beforeError='<em>while loading process %s, setting ' 'parameter %s with value: %s</em>' % (result.name, n, str(v))) stack = stackp for eNodeParent, eNodeName, eNodeParameters, eNodeSelected, eNodeChildren in stack: eNode = eNodeParent.child(eNodeName) if eNode: eNode.setSelected(eNodeSelected) if eNodeParameters: # FIXME: do these parameters concern the ExecutionNode # itself or are they forwarded to the underlying # Parameterized? In the latter case, they should also be # set in the order of its signature. for n, v in six.iteritems(eNodeParameters['selected']): try: eNode.setValue(n, v, default=False) except KeyError: pass for n, v in six.iteritems(eNodeParameters['default']): try: eNode.setValue(n, v, default=True) except KeyError: pass for p in procs: p._clearImmutableParameters() windowGeometry = event.content.get('window') if windowGeometry is not None: result._windowGeometry = windowGeometry return result
#----------------------------------------------------------------------------
[docs]def getProcessFromExecutionNode(node): """ Gets a process instance corresponding to the given execution node. :param node: a process :py:class:`ExecutionNode` :returns: According to the type of node, it returns: * a :py:class:`NewProcess` instance if the node is :py:class:`ProcessExecutionNode`, * an :py:class:`IterationProcess` if the node is a :py:class:`SerialExecutionNode` * a :py:class:`DistributedProcess` if the node is a :py:class:`ParallelExecutionNode` * a :py:class:`SelectionProcess` if the node is a :py:class:`SelectionExecutionNode`. """ nt = type(node) if nt is ProcessExecutionNode: return node._process elif nt is SerialExecutionNode: return IterationProcess(node.name(), node.children()) elif isinstance(node, ParallelExecutionNode): return DistributedProcess(node.name(), list(node._children.values())) elif nt is SelectionExecutionNode: return SelectionProcess(node.name(), node.children())
#----------------------------------------------------------------------------
[docs]def getProcessInstance(processIdClassOrInstance, ignoreValidation=False): """ Gets an instance of the process given in parameter. Parameters ---------- processIdClassOrInstance: a process id, name, class, instance, execution node, or a the name of a file containing a backup copy of a process. ignoreValidation: bool (optional) if True, a validation failure will not prevent from building an instance of the process. Returns ------- an instance of the :py:class:`NewProcess` class associated to the described process. """ if isinstance(processIdClassOrInstance, weakref.ProxyType): processIdClassOrInstance = copy.copy(processIdClassOrInstance) result = getProcess(processIdClassOrInstance, ignoreValidation=ignoreValidation) if isinstance(processIdClassOrInstance, Process): if result is processIdClassOrInstance or result is processIdClassOrInstance.__class__: result = processIdClassOrInstance else: event = ProcessExecutionEvent() event.setProcess(processIdClassOrInstance) try: result = getProcessInstanceFromProcessEvent(event) except KeyError: # custom process built on-the-fly ? return the same instance result = processIdClassOrInstance elif result is None: if isinstance(processIdClassOrInstance, ExecutionNode): result = getProcessFromExecutionNode(processIdClassOrInstance) else: try: if (isinstance(processIdClassOrInstance, six.string_types) or hasattr(processIdClassOrInstance, 'readline')) and minfFormat(processIdClassOrInstance)[1] == minfHistory: event = readMinf(processIdClassOrInstance)[0] result = getProcessInstanceFromProcessEvent(event) if result is not None and isinstance(processIdClassOrInstance, six.string_types): result._savedAs = processIdClassOrInstance except IOError as e: raise KeyError('Could not get process "' + repr(processIdClassOrInstance) + '": invalid identifier or process file: ' + repr(e)) elif not isinstance(result, Process): result = result() return result
#----------------------------------------------------------------------------
[docs]def allProcessesInfo(): """ Returns a list of :py:class:`ProcessInfo` objects for the loaded processes. """ return list(_processesInfo.values())
#----------------------------------------------------------------------------
[docs]def getConverter(source, destination, checkUpdate=True): """ Gets a converter (a process that have the role converter) which can convert data from source format to destination format. Such converters can be used to extend the set of formats that a process accepts. :param source: tuple (type, format). If a converter is not found directly, parent types are tried. :param destination: tuple (type, format) :param boolean checkUpdate: if True, Brainvisa will check if the converter needs to be reloaded. Default True. :returns: the :py:class:`NewProcess` class associated to the found converter. """ global _processes result = _converters.get((source, destination)) if result is None: result = _converters_to.get(destination, {}).get(source) if result is None: dt, df = destination st, sf = source if isSameDiskItemType(st, dt): while result is None and st: st = st.parent result = _converters.get(((st, sf), (st, df))) return getProcess(result, checkUpdate=checkUpdate)
#-----------------------------------------------------------------------------
[docs]def resetConverters(): """ Reset converters dictionaries. """ global _converters, _converters_dist, _converters_from, _converters_to _converters = {} _converters_dist = {} _converters_from = {} _converters_to = {}
#-----------------------------------------------------------------------------
[docs]def registerConverter(source, dest, process_id): """ Registers converter from source to destination. :param source: tuple (type, format). If a converter is not found directly, parent types are tried. :param dest: tuple (type, format). If a converter is not found directly, parent types are tried. :param proc: :py:class:`NewProcess` class associated to the converter """ global _converters, _converters_dist, _converters_from, _converters_to # print('===== registering converter for (', source, dest, '):', proc) _converters[(source, dest)] = process_id _converters_dist[(source, dest)] = ((0, 0), process_id) _converters_from.setdefault(source, {})[dest] = process_id _converters_to.setdefault(dest, {})[source] = process_id
#----------------------------------------------------------------------------- def __getConverters(registry, key, keepType=1, checkUpdate=True): """ Get converters from a registry using a key (type and format). If keepType is False, parent types are tried to get converters. :param registry: dict of dict of converters. :param key: tuple (type, format). The type and format to get converters for. :param boolean keepType: if True, parent types won't be tried. Default True. :param boolean checkUpdate: if True, Brainvisa will check if the converters needs to be reloaded. Default True. :returns: a dict (type, format) -> :py:class:`NewProcess` class associated to the found converter. """ type, format = key stack = [] result = {} # Stack converters for parent types while type: d = registry.get((type, format), {}) if keepType: result = d break # Stack types to update stack.append((type, d)) type = type.parent # Update converters from the stack while len(stack) > 0: type, d = stack.pop() result.update(d) converters = {} for k, p in six.iteritems(result): try: pp = getProcess(p, checkUpdate=checkUpdate) converters[k] = pp except Exception: pass # invalid process return converters #-----------------------------------------------------------------------------
[docs]def getConvertersTo(dest, keepType=1, checkUpdate=True): """ Gets the converters which can convert data to destination format. :param destination: tuple (type, format). If a converter is not found directly, parent types are tried. :param boolean keepType: if True, parent type won't be tried. Default True. :param boolean checkUpdate: if True, Brainvisa will check if the converters needs to be reloaded. Default True. :returns: a dict (type, format) -> :py:class:`NewProcess` class associated to the found converter. """ global _converters_to return __getConverters(_converters_to, dest, keepType=keepType, checkUpdate=checkUpdate)
#----------------------------------------------------------------------------
[docs]def getConvertersFrom(source, keepType=1, checkUpdate=True): """ Gets the converters which can convert data from source format to whatever format. :param source: tuple (type, format). If a converter is not found directly, parent types are tried. :param boolean checkUpdate: if True, Brainvisa will check if the converters needs to be reloaded. Default True. :returns: a dict (type, format) -> :py:class:`NewProcess` class associated to the found converter. """ global _converters_from return __getConverters(_converters_from, source, keepType=keepType, checkUpdate=checkUpdate)
#-----------------------------------------------------------------------------
[docs]def getNearestConverter(source, dest, checkUpdate=True): """ Gets the nearest converter which can convert data from source to dest. :param source: tuple (type, format). If a converter is not found directly, parent types are tried. :param dest: tuple (type, format). If a converter is not found directly, parent types are tried. :param boolean checkUpdate: if True, Brainvisa will check if the converters needs to be reloaded. Default True. :returns: a tuple ((source_distance, dest_distance), process) :py:class:`NewProcess` class associated to the found converter. """ global _converters_dist stack = [] result = _converters_dist.get((source, dest)) if result is None: # print('Updating converter for source', source, 'dest', dest) # Go through the list of converters import itertools dst_type, dst_format = dest src_type, src_format = source src_parents = src_type.levels() dst_parents = dst_type.levels() result = ((-1, -1), None) for (sl, s), (dl, d) in itertools.product(src_parents, dst_parents): r = _converters_dist.get(((s, src_format), (d, dst_format))) if r is not None: (csl, cdl), c = r if c: result = ((sl + csl, dl + cdl), c) break _converters_dist[(source, dest)] = result if result[1] is not None: return (result[0], getProcess(result[1], checkUpdate=checkUpdate)) else: return ((-1, -1), None)
#----------------------------------------------------------------------------
[docs]def getConverters(): """ Gets the converter name list. """ global _converters results = [] for v in six.itervalues(_converters): if not v in results and type(v) == str: results.append(v) return sorted(results)
#-------------------------------------------------------------------------
[docs]def getDiskItemSourceInfo(source): """ Gets a tuple containing source :py:class:`neuroDiskItems.DiskItemType` and :py:class:`neuroDiskItems.Format`. :param source: a :py:class:`neuroDiskItems.DiskItem`, a list of :py:class:`neuroDiskItems.DiskItem` (only the first will be taken into account), a tuple (type, format). """ if isinstance(source, DiskItem): return (source.type, source.format) elif isinstance(source, list): if source != [] and isinstance(source[0], DiskItem): return (source[0].type, source[0].format) # TODO: May a proper exception should be raised in case # that no DiskItem was given in the source list, because currently # t0 and f have no valid values in the else case else: t, f = source return (t, f)
#-------------------------------------------------------------------------
[docs]class DiskItemConversionInfo(object): """ Contains information about :py:class:`neuroDiskItems.DiskItem` conversions. The conversion needs a source and a destination The object can be used to determine, if the conversion is possible, if it uses inheritance mechanisms, if it needs type conversion, if it needs format conversion. If the conversion is possible, a distance between :py:class:`neuroDiskItems.DiskItem` can be processed. This is useful to relevantly sort processes.DiskItemConversionInfo. """ def __init__(self, source, dest, checkUpdate=True): """ :param source: the source of :py:class:`neuroDiskItems.DiskItem` conversion. It can be :py:class:`neuroDiskItems.DiskItem`, a list of :py:class:`neuroDiskItems.DiskItem` (only the first will be taken into account), a tuple (type, format). :param dest: the destination of :py:class:`neuroDiskItems.DiskItem` conversion. It can be :py:class:`neuroDiskItems.DiskItem`, a list of :py:class:`neuroDiskItems.DiskItem` (only the first will be taken into account), a tuple (type, format). :param boolean checkUpdate: if the converters :py:class:`NewProcess` must be reloaded when they changed. Default True. """ self.source = source self.dest = dest self.__converter = None self.__converter_distance = (-1, -1) self.__need_conversion = -1 self.__types_distance = -1 self.__formats_distance = -1 self.__check_update = checkUpdate self.__updated = False self.__update__() def __update__(self): """ Update processes.DiskItemConversionInfo internal information. :param boolean checkUpdate: if the converters :py:class:`NewProcess` must be reloaded when they changed. Default True. """ if not self.__updated: ts, fs = getDiskItemSourceInfo(self.source) td, fd = getDiskItemSourceInfo(self.dest) converter = None converter_dist = (-1, -1) need_conversion, types_dist, formats_dist = (-1, -1, -1) # Search type matching the inheritance tree types_dist = ts.inheritanceLevels(td) if fs == fd: formats_dist = 0 if types_dist != -1 and formats_dist == 0: # No conversion is necessary need_conversion = 0 else: # Sets distance of 1 between types if source type and destination # are not the same and need conversion if types_dist != 0: types_dist = 1 formats_dist = -1 converter_dist, converter = getNearestConverter( (ts, fs), (td, fd), checkUpdate=self.__check_update) if converter is not None: need_conversion = 1 formats_dist = 0 if fs == fd else 1 # Store found converters self.__converter = converter self.__converter_distance = converter_dist # Store distance information self.__need_conversion = need_conversion self.__types_distance = types_dist self.__formats_distance = formats_dist self.__updated = True
[docs] def exactDestTypeConverter(self): """ Get converter between source and exact destination type if needed and possible. :returns: the converter :py:class:`NewProcess` to exact destination type if needed and possible else None. """ if self.__converter_distance[1] == 0: return self.__converter return None
[docs] def converter(self, exactConversionTypeOnly=False): """ Get converter between source and destination type if needed and possible. :returns: the converter :py:class:`NewProcess` if needed and possible else None. """ if (not exactConversionTypeOnly or self.__converter_distance[1] == 0): return self.__converter
[docs] def distance(self, useInheritanceOnly=False, exactConversionTypeOnly=False): """ Gets a distance between source :py:class:`neuroDiskItems.DiskItem` and destination :py:class:`neuroDiskItems.DiskItem` of the processes.DiskItemConversionInfo. This is useful to relevantly sort processes.DiskItemConversionInfo :param boolean useInheritanceOnly: Specify if the distance must be processed only when :py:class:`neuroDiskItems.DiskItemType` of the source inherits from :py:class:`neuroDiskItems.DiskItemType` of the destination and does not need a :py:class:`NewProcess` converter call. Default False. :param boolean exactConversionTypeOnly: Specify if the conversion is valid only when converter is registered for the exact destination :py:class:`neuroDiskItems.DiskItemType`. Default False :returns: None when conversion is not possible, else a tuple that contains: 1) if the conversion needs a call to a :py:class:`NewProcess` converter. When no converter call is needed, the value is 0, else the value is 1. 2) the distance between source :py:class:`neuroDiskItems.DiskItemType` and destination :py:class:`neuroDiskItems.DiskItemType`. When source type is equal to destination type, the value is 0. When source inherits from destination and no conversion is needed, the value is the number of levels between them, else the value is 1. 3) if the source :py:class:`neuroDiskItems.Format` is identical to destination :py:class:`neuroDiskItems.Format` value is 0, else 1 """ if self.__types_distance > -1 \ and self.__formats_distance > -1 \ and self.__need_conversion > -1 \ and (not useInheritanceOnly or self.__need_conversion == 0) \ and (self.__need_conversion == 0 or not exactConversionTypeOnly or self.exactDestTypeConverter() is not None): return (self.__need_conversion, self.__types_distance, self.__formats_distance) else: return None
[docs] def exists(self, useInheritanceOnly=False, exactConversionTypeOnly=False): """ Checks that a conversion between source :py:class:`neuroDiskItems.DiskItem` and destination :py:class:`neuroDiskItems.DiskItem` exists. :param boolean useInheritanceOnly: Specify if the check must only consider inheritance between source :py:class:`neuroDiskItems.DiskItemType` and destination :py:class:`neuroDiskItems.DiskItemType`, without checking existing converter :py:class:`NewProcess`. Default False. :param boolean exactConversionTypeOnly: Specify if the conversion is valid only when converter is registered for the exact destination :py:class:`neuroDiskItems.DiskItemType`. Default False :returns: True if conversion is possible between source :py:class:`neuroDiskItems.DiskItem` and destination :py:class:`neuroDiskItems.DiskItem`, else False """ return self.distance(useInheritanceOnly=useInheritanceOnly, exactConversionTypeOnly=exactConversionTypeOnly) \ is not None
#------------------------------------------------------------------------- _conversion_infos = dict()
[docs]def getConversionInfo(source, dest, checkUpdate=True): """ Gets information about conversion of a source :py:class:`neuroDiskItems.DiskItem` and destination :py:class:`neuroDiskItems.DiskItem`. :param source: the source of :py:class:`neuroDiskItems.DiskItem` conversion. It can be :py:class:`neuroDiskItems.DiskItem`, a list of :py:class:`neuroDiskItems.DiskItem` (only the first will be taken into account), a tuple (type, format). :param dest: the destination of :py:class:`neuroDiskItems.DiskItem` conversion. It can be :py:class:`neuroDiskItems.DiskItem`, a list of :py:class:`neuroDiskItems.DiskItem` (only the first will be taken into account), a tuple (type, format). :param boolean checkUpdate: if the converters :py:class:`NewProcess` must be reloaded when they changed. Default True. :returns: a :py:class:`processes.DiskItemConversionInfo` """ global _conversion_infos try: ts, fs = getDiskItemSourceInfo(source) td, fd = getDiskItemSourceInfo(dest) except TypeError: # may occur if source or dest is not valid return None # Check that conversion info is in cache for source->dest or initialize it ci = _conversion_infos.setdefault((ts, fs), dict()) \ .setdefault((td, fd), None) # print('===== getting from cache source', (ts, fs), \ #'dest', (td, fd), 'ci', ci) if ci is None: # Update cache for source->dest conversion ci = DiskItemConversionInfo((ts, fs), (td, fd), checkUpdate=checkUpdate) _conversion_infos[(ts, fs)][(td, fd)] = ci return ci
#-------------------------------------------------------------------------
[docs]class ProcessSet(set): """ Bases: :py:class:`set` Set of processes that can process :py:class:`neuroDiskItems.DiskItem` of a particular :py:class:`neuroDiskItems.DiskItemType` and :py:class:`neuroDiskItems.Format` """ def __init__(self, type, format, ids=(), listof=False): """ :param :py:class:`neuroDiskItems.DiskItemType` type: type of :py:class:`neuroDiskItems.DiskItem` that can be used by the set of processes. :param :py:class:`neuroDiskItems.Format` format: format of :py:class:`neuroDiskItems.DiskItem` that can be used by the processes of the ProcessSet. :param tuple ids: identifiers of processes :py:class:`NewProcess` of the ProcessSet. :param bool listof: if the registered processes are able to process a list of :py:class:`neuroDiskItems.DiskItem` or not. """ set.__init__(self, ids) if type is None: raise TypeError('Type can not be None') if not issubclass(type.__class__, neuroDiskItems.DiskItemType): raise TypeError('Class of type %s must be a subclass of' ' DiskItemType' % type) if format is None: raise TypeError('Format can not be None') self.__type = type self.__format = format self.__listof = listof
[docs] def source(self): """ :returns: a tuple containing :py:class:`neuroDiskItems.DiskItemType` and :py:class:`neuroDiskItems.Format` associated to the ProcessSet. """ return (self.__type, self.__format)
[docs] def accept(self, source, enableConversion=1, exactConversionTypeOnly=False, checkUpdate=True): """ Check that a :py:class:`neuroDiskItems.DiskItem` source is processable using the processes of the current ProcessSet :param int enableConversion: if the source can be converted to be processed by the processes of the current ProcessSet. Default 1. :param boolean checkUpdate: if the converters :py:class:`NewProcess` must be reloaded when they changed. Default True. :param boolean exactConversionTypeOnly: Specify if the conversion is valid only when converter is registered for the exact destination :py:class:`neuroDiskItems.DiskItemType`. Default False :returns: True if the source is processable using the processes of the current ProcessSet, else False. """ return getConversionInfo(source, self.source(), checkUpdate=checkUpdate) \ .exists(useInheritanceOnly=not enableConversion)
[docs] def processes(self, checkUpdate=True): """ Get processes associated to the current ProcessSet and filtered for the current user level :param boolean checkUpdate: if the processes :py:class:`NewProcess` must be reloaded when they changed. Default True. :returns: a list of processes :py:class:`NewProcess` associated to the current ProcessSet and filtered for the current user level. """ # Filter processes using user level procs = [] for v in self: p = getProcess(v, checkUpdate=checkUpdate) if p and p.userLevel <= neuroConfig.userLevel: procs.append(p) return procs
#-------------------------------------------------------------------------
[docs]def getProcessesBySourceDist(registry, source, enableConversion=1, exactConversionTypeOnly=False, checkUpdate=True): """ Get processes :py:class:`NewProcess` able to process a :py:class:`neuroDiskItems.DiskItem` source using a registry. Processes are ordered using the distance of :py:class:`processes.DiskItemConversionInfo` between the :py:class:`neuroDiskItems.DiskItem` source to process and the :py:class:`neuroDiskItems.DiskItem` source registered for the process. The processes registered with closest sources appear first. :param dict registry: the registry of :py:class:`processes.ProcessSet`. Keys are a tuple containing :py:class:`neuroDiskItems.DiskItemType` and :py:class:`neuroDiskItems.Format`. Values are :py:class:`processes.ProcessSet`. :param int enableConversion: if the source can be converted to be processed. Default 1. :param boolean exactConversionTypeOnly: Specify if the conversion is valid only when converter is registered for the exact destination :py:class:`neuroDiskItems.DiskItemType`. Default False :param boolean checkUpdate: if processes :py:class:`NewProcess` must be reloaded when they changed. Default True. :returns: a list of processes :py:class:`NewProcess`. """ # Create a list of process set that can be ordered by distance vector r = list() if not source is None: src_type, src_format = source converters = getConvertersFrom(source, keepType=0, checkUpdate=checkUpdate) possible_sources = [source] possible_sources += [(p, src_format) for p in src_type.parents()] possible_sources += list(converters.keys()) for s in possible_sources: ps = registry.get(s) if ps is not None: ci = getConversionInfo(source, s, checkUpdate=checkUpdate) if ci is not None: d = ci.distance(useInheritanceOnly=not enableConversion, exactConversionTypeOnly=exactConversionTypeOnly) if d is not None: r.append((d, ps)) # Sort process sets using distance vector s = sorted(r) # Create result list r = list() unique = set() for d, ps in s: for p in ps.processes(checkUpdate=checkUpdate): if not p in unique: unique.add(p) r.append(p) # print(p.name, 'with distance', d) return r
#----------------------------------------------------------------------------
[docs]def getDefaultListOfProcesses(source, role, enableConversion=1, checkUpdate=True, process=None): """ Get a default processes able to play the given role for a list of :py:class:`neuroDiskItems.DiskItem`. role: str the role of processes to get (mainly 'viewer' or 'editor) enableConversion: int if the source can be converted to be used. Default 1. checkUpdate: boolean if processes :py:class:`NewProcess` must be reloaded when they changed. Default True. :returns: Callable Object that can play the given role for a list of :py:class:`neuroDiskItems.DiskItem`. """ if role == 'viewer': default_func = getViewer elif role == 'editor': default_func = getDataEditor else: raise RuntimeError('Unable to retrieve processes with role %s' % role) t, f = getDiskItemSourceInfo(source) pcs = [] if isinstance(source, tuple) and len(source) == 2: pcs = [default_func(source, enableConversion=enableConversion, checkUpdate=checkUpdate, process=process)] else: pcs = [default_func(s, enableConversion=enableConversion, checkUpdate=checkUpdate, process=process) for s in source] if len(pcs) != 0 and None not in pcs: if role in ('viewer', 'editor'): proc = getProcess('inspectMultipleData') if proc is not None: return proc class iterproc(object): def __init__(self, name, procs): self.name = name self.procs = procs def __call__(self): ip = ListOfIterationProcess(self.name, self.procs) return ip return iterproc(_t_('%s for list of ' % role.title()) + t.name, pcs)
#-------------------------------------------------------------------------
[docs]def getProcessesBySource(source, role, enableConversion=1, checkUpdate=True, listof=False, process=None, check_values=False): """ Get processes :py:class:`NewProcess` able to play the given role for a :py:class:`neuroDiskItems.DiskItem` source. Processes are ordered using the distance of :py:class:`processes.DiskItemConversionInfo` between the :py:class:`neuroDiskItems.DiskItem` source to use and the :py:class:`neuroDiskItems.DiskItem` source registered for the process. The processes registered with closest sources appear first. Parameters ---------- role: str the role of processes to get (mainly 'viewer' or 'editor') enableConversion: int if the source can be converted to find appropriate processes. Default 1. checkUpdate: boolean if processes :py:class:`NewProcess` must be reloaded when they changed. Default True. listof: boolean if the processes :py:class:`NewProcess` must be able to use list of :py:class:`neuroDiskItems.DiskItem`. Default False. process: None or NewProcess class or instance if specified, specialized processes having a variable 'allowed_processes' which list this process, will be sorted first check_values: bool if True, check if the 1st parameter of each process actually accepts the source value. This is not always true because some filtering may happen using some requiredAttribues. Returns ------- processes: a list of processes :py:class:`NewProcess`. """ if role == 'viewer': global _viewers global _listViewers if not listof: registry = _viewers else: registry = _listViewers default_list_of_func = getDefaultListOfViewer elif role == 'editor': global _dataEditors global _listDataEditors if not listof: registry = _dataEditors else: registry = _listDataEditors default_list_of_func = getDefaultListOfDataEditor else: raise RuntimeError('Unable to retrieve processes with role %s' % role) # Create a list of processset that can be ordered by distance vector r = getProcessesBySourceDist(registry, getDiskItemSourceInfo(source), enableConversion=enableConversion, exactConversionTypeOnly=True, checkUpdate=checkUpdate) # print('=== found process by distance', r, 'with process:', process) if listof: # Gets default process for list dv = default_list_of_func(source, process=process) if dv: r.append(dv) if process is not None: r1 = [] r2 = [] for p in r: rp = getattr(p, 'allowed_processes', None) if rp: if (isinstance(rp, types.FunctionType) and rp(process)) \ or (not isinstance(rp, types.FunctionType) and process.id() in rp): r1.append(p) else: r2.append(p) r = r1 + r2 else: r = [p for p in r if not hasattr(p, 'allowed_processes')] if check_values and isinstance(source, DiskItem): r2 = [] for p in r: # get process signature, 1st param param = p.signature[list(p.signature.keys())[0]] if param.findValue(source) == source: r2.append(p) r = r2 return r
#----------------------------------------------------------------------------
[docs]def runProcessBySource(source, role, context=None, process=None, continueOnError=True): """ Searches for a viewer for source data and runs the process. If viewer fail to display source, tries to get another. :param source: a :py:class:`neuroDiskItems.DiskItem` or something that enables to find a :py:class:`neuroDiskItems.DiskItem`. :param context: the :py:class:`ExecutionContext`. If None, the default context is used. :returns: the result of the execution of the found viewer. """ def __runProcess(runnable_proc, source, reference_proc): runnable_proc = getProcessInstance(runnable_proc) if reference_proc is not None \ and hasattr(runnable_proc, 'allowed_processes'): runnable_proc.reference_process = reference_proc # print('__runProcess => Try to run ', runnable_proc.name, 'for', # source) context.runProcess(runnable_proc, source) if not isinstance(source, DiskItem): source = ReadDiskItem('Any Type', list(formats.keys())).findValue(source) if context is None: context = defaultContext() if role == 'viewer': __get_procs = getViewers elif role == 'editor': __get_procs = getDataEditors else: raise RuntimeError('Unable to retrieve processes with role %s' % role) runnable_procs = __get_procs(source, checkUpdate=False, process=process) for runnable_proc in runnable_procs: if continueOnError: try: return __runProcess(runnable_proc, source, process) except Exception as e: print('Failed to run process', runnable_proc.name, 'for', source) continue else: return __runProcess(runnable_proc, source, process)
#-------------------------------------------------------------------------
[docs]def getViewers(source, enableConversion=1, checkUpdate=True, listof=False, process=None, check_values=False): """ Get viewers :py:class:`NewProcess` able to visualize a :py:class:`neuroDiskItems.DiskItem` source. Viewers are ordered using the distance of :py:class:`processes.DiskItemConversionInfo` between the :py:class:`neuroDiskItems.DiskItem` source to visualize and the :py:class:`neuroDiskItems.DiskItem` source registered for the viewer. The viewers registered with closest sources appear first. Parameters ---------- enableConversion: int if the source can be converted to be visualized. Default 1. checkUpdate: boolean if converters and viewers :py:class:`NewProcess` must be reloaded when they changed. Default True. listof: boolean if the viewers :py:class:`NewProcess` must be able to display list of :py:class:`neuroDiskItems.DiskItem`. Default False. process: None or NewProcess class or instance if specified, specialized viewers having a variable 'allowed_processes' which list this process, will be sorted first check_values: bool if True, check if the 1st parameter of each viewer actually accepts the source value. This is not always true because some filtering may happen using some requiredAttribues. Returns ------- viewers: a list of viewers :py:class:`NewProcess`. """ return getProcessesBySource(source, 'viewer', enableConversion=enableConversion, checkUpdate=checkUpdate, listof=listof, process=process, check_values=check_values)
#----------------------------------------------------------------------------
[docs]def getDefaultListOfViewer(source, enableConversion=1, checkUpdate=True, process=None): """ Get a default viewer for a list of :py:class:`neuroDiskItems.DiskItem`. :param int enableConversion: if the source can be converted to be visualized. Default 1. :param boolean checkUpdate: if converters and viewers :py:class:`NewProcess` must be reloaded when they changed. Default True. :returns: Callable Object that can be used to visualize a list of :py:class:`neuroDiskItems.DiskItem`. """ return getDefaultListOfProcesses(source, 'viewer', enableConversion=enableConversion, checkUpdate=checkUpdate, process=process)
#----------------------------------------------------------------------------
[docs]def getViewer(source, enableConversion=1, checkUpdate=True, listof=False, index=0, process=None, check_values=False): """ Gets a viewer (a process that have the role viewer) which can visualize source data. The viewer is returned only if its userLevel is lower than the current userLevel. Parameters ---------- source: a :py:class:`neuroDiskItems.DiskItem`, a list of :py:class:`neuroDiskItems.DiskItem` (only the first will be taken into account), a tuple (type, format). enableConversion: boolean if True, a viewer that accepts a format in which source can be converted is also accepted. Default True checkUpdate: boolean if True, Brainvisa will check if the viewer needs to be reloaded. Default True. listof: boolean If True, we need a viewer for a list of data. If there is no specific viewer for a list of this type of data, a :py:class:`ListOfIterationProcess` is created from the associated simple viewer. Default False. index: int index of the viewer to find. Default 0. process: None or NewProcess class or instance if specified, specialized viewers having a variable 'allowed_processes' which list this process, will be sorted first check_values: bool if True, check if the 1st parameter of viewer actually accepts the source value. This is not always true because some filtering may happen using some requiredAttribues. Returns ------- viewer: the :py:class:`NewProcess` class associated to the found viewer or None if not found at the specified index. """ vl = getViewers(source, enableConversion=enableConversion, checkUpdate=checkUpdate, listof=listof, process=process, check_values=check_values) # print('===== getViewer, viewers list', [v.name for v in vl], '=====') if len(vl) > index: # print('===== getViewer, found viewer ', vl[index].name, '=====') return vl[index]
#----------------------------------------------------------------------------
[docs]def runViewer(source, context=None, process=None): """ Searches for a viewer for source data and runs the process. If viewer fail to display source, tries to get another. :param source: a :py:class:`neuroDiskItems.DiskItem` or something that enables to find a :py:class:`neuroDiskItems.DiskItem`. :param context: the :py:class:`ExecutionContext`. If None, the default context is used. :returns: the result of the execution of the found viewer. """ runProcessBySource(source, 'viewer', context=context, process=process)
#-------------------------------------------------------------------------
[docs]def getDataEditors(source, enableConversion=1, checkUpdate=True, listof=False, process=None, check_values=False): """ Get data editors :py:class:`NewProcess` able to edit a :py:class:`neuroDiskItems.DiskItem` source. Data editors are ordered using the distance of :py:class:`processes.DiskItemConversionInfo` between the :py:class:`neuroDiskItems.DiskItem` source to edit and the :py:class:`neuroDiskItems.DiskItem` source registered for the data editor. The data editors registered with closest sources appear first. Parameters ---------- enableConversion: int if the source can be converted to be edited. Default 1. checkUpdate: boolean if converters and data editors :py:class:`NewProcess` must be reloaded when they changed. Default True. listof: boolean if the viewers :py:class:`NewProcess` must be able to edit list of :py:class:`neuroDiskItems.DiskItem`. Default False. process: None or NewProcess class or instance if specified, specialized data editors having a variable 'allowed_processes' which list this process, will be sorted first check_values: bool if True, check if the 1st parameter of each data editor actually accepts the source value. This is not always true because some filtering may happen using some requiredAttribues. Returns ------- data editors: a list of data editors :py:class:`NewProcess`. """ return getProcessesBySource(source, 'editor', enableConversion=enableConversion, checkUpdate=checkUpdate, listof=listof, process=process, check_values=check_values)
#----------------------------------------------------------------------------
[docs]def getDefaultListOfDataEditor(source, enableConversion=1, checkUpdate=True, process=None): """ Get a default data editor for a list of :py:class:`neuroDiskItems.DiskItem`. :param int enableConversion: if the source can be converted to be edited. Default 1. :param boolean checkUpdate: if converters and data editors :py:class:`NewProcess` must be reloaded when they changed. Default True. :returns: Callable Object that can be used to edit a list of :py:class:`neuroDiskItems.DiskItem`. """ return getDefaultListOfProcesses(source, 'editor', enableConversion=enableConversion, checkUpdate=checkUpdate, process=process)
#----------------------------------------------------------------------------
[docs]def getDataEditor(source, enableConversion=0, checkUpdate=True, listof=False, index=0, process=None, check_values=False): """ Gets a data editor (a process that have the role editor) which can open source data for edition (modification). The data editor is returned only if its userLevel is lower than the current userLevel. Parameters ---------- source: a :py:class:`neuroDiskItems.DiskItem`, a list of :py:class:`neuroDiskItems.DiskItem` (only the first will be taken into account), a tuple (type, format). enableConversion: boolean if True, a data editor that accepts a format in which source can be converted is also accepted. Default False checkUpdate: boolean if True, Brainvisa will check if the editor needs to be reloaded. Default True. listof: boolean If True, we need an editor for a list of data. If there is no specific editor for a list of this type of data, a :py:class:`ListOfIterationProcess` is created from the associated simple editor. Default False. process: None or NewProcess class or instance if specified, specialized viewers having a variable 'allowed_processes' which list this process, will be sorted first check_values: bool if True, check if the 1st parameter of each data editor actually accepts the source value. This is not always true because some filtering may happen using some requiredAttribues. Returns ------- editor: the :py:class:`NewProcess` class associated to the found editor. """ dl = getDataEditors(source, enableConversion=enableConversion, checkUpdate=checkUpdate, listof=listof, process=process, check_values=check_values) # print('===== getDataEditor, data editor list', [d.name for d in dl], # '=====') if len(dl) > index: # print('===== getDataEditor, found data editor ', dl[index].name, # '=====') return dl[index]
#----------------------------------------------------------------------------
[docs]def runDataEditor(source, context=None, process=None): """ Searches for a data editor for source data and runs the process. If data editor fail to edit source, tries to get another. :param source: a :py:class:`neuroDiskItems.DiskItem` or something that enables to find a :py:class:`neuroDiskItems.DiskItem`. :param context: the :py:class:`ExecutionContext`. If None, the default context is used. :returns: the result of the execution of the found data editor. """ runProcessBySource(source, 'editor', context=context, process=process)
#----------------------------------------------------------------------------
[docs]def getImporter(source, checkUpdate=True): """ Gets a importer (a process that have the role importer) which can import data in the database. :param source: a :py:class:`neuroDiskItems.DiskItem` or a tuple (type, format). :param boolean checkUpdate: if True, Brainvisa will check if the process needs to be reloaded. Default True. :returns: the :py:class:`NewProcess` class associated to the found process. """ global _processes if isinstance(source, DiskItem): t0 = source.type f = source.format else: t0, f = source t = t0 v = _importers.get((t, f)) while not v and t: t = t.parent v = _importers.get((t, f)) p = getProcess(v, checkUpdate=checkUpdate) if p: return p return None
#---------------------------------------------------------------------------- _extToModuleDescription = { 'py': ('.py', 'r', imp.PY_SOURCE), 'pyo': ('.py', 'r', imp.PY_COMPILED), 'pyc': ('.py', 'r', imp.PY_COMPILED), 'so': ('.so', 'rb', imp.C_EXTENSION), } #----------------------------------------------------------------------------
[docs]def readProcess(fileName, category=None, ignoreValidation=False, toolbox='brainvisa'): """ Loads a process from its source file. The source file is a python file which defines some variables (signature, name, userLevel) and functions (validation, initialization, execution). The process is indexed in the global lists of processes so it can be retrieved through the functions :py:func:`getProcess`, :py:func:`getProcessInfo`, :py:func:`getViewer`, ... :param string fileName: the name of the file containing the source code of the process. :param string category: category of the process. If None, it is the name of the directory containing the process. :param boolean ignoreValidation: if True, the validation function of the process won't be executed. :param string toolbox: The id of the toolbox containing the process. Defaut is 'brainvisa', it indicates that the process is not in a toolbox. :returns: A :py:class:`NewProcess` class representing the process if no exception is raised during the loading of the process. A new class derived from :py:class:`Process` is defined to store the content of the file: .. py:class:: NewProcess Bases: :py:class:`Process` All the elements defined in the file are added to the class. .. py:attribute:: name Name of the process. If it is not defined in the process file, it is the base name of the file without extension. .. py:attribute:: category The category of the process. If it is not given in parameter, it is the name of the directory containing the process file. .. py:attribute:: dataDirectory The data directory of the process is a directory near the process file with the same name and the extension .data. It is optional. .. py::attribute:: toolbox Name of the toolbox containing the process. .. py:attribute:: processReloadNotifier A :py:class:`soma.notification.Notifier` that will notify its observers when the process is reload. .. py:attribute:: signature The parameters excepted by the process. .. py:attribute:: userLevel Minimum userLevel needed to see the process. .. py:attribute:: roles Roles of the process: viewer, converter, editor, impoter. .. py:method:: execution(self, context) Execution function. """ result = None try: global _processModules, _processes, _processesInfo, _processesInfoByName, _readProcessLog, _askUpdateProcess # If we do not remove user level here, default userLevel for process # will be this one. g = globals() try: del g['userLevel'] except KeyError: pass extPos = fileName.rfind('.') fileExtension = fileName[extPos + 1:] moduleName = os.path.basename(fileName[: extPos]) dataDirectory = fileName[: extPos] + '.data' if not os.path.exists(dataDirectory): dataDirectory = None # Load module moduleDescription = _extToModuleDescription.get(fileExtension) if moduleDescription is None: raise RuntimeError( HTMLMessage(_t_('Cannot load a process from file <em>%s</em>') % (fileName,))) currentDirectory = getcwd() fopts = {} if six.PY2 else {'encoding': 'utf-8'} fileIn = open(fileName, moduleDescription[1], **fopts) try: if dataDirectory: os.chdir(dataDirectory) try: processModule = imp.load_module( moduleName, fileIn, fileName, moduleDescription) except NameError as e: showException(beforeError=(_t_('In <em>%s</em>')) % (fileName, ), afterError=_t_( ' (perharps you need to add the line <tt>"from brainvisa.processes import *"</tt> at the begining of the process)')) return # raise RuntimeError( HTMLMessage( _t_('In <em>%s</em>') % ( # fileName, ) + " <b>"+str(e)+"</b> "+_t_(' (perharps you need # to add the line <tt>"from brainvisa.processes import *"</tt> # at the begining of the process)') )) except Exception as e: # Add context information to the exception message (file name # where the exception occurred) six.reraise(type(e), type(e)(*(('%s: %s' % (fileName, e.args[0]), ) + e.args[1:])), sys.exc_info()[2]) finally: fileIn.close() if dataDirectory: os.chdir(currentDirectory) _processModules[moduleName] = processModule if category is None: category = os.path.basename(os.path.dirname(fileName)) ProcessClass = getattr(processModule, 'base_class', Process) class NewProcess(ProcessClass): _instance = 0 _id = None NewProcess._id = moduleName NewProcess.name = moduleName NewProcess.category = category NewProcess.dataDirectory = dataDirectory NewProcess.toolbox = toolbox # The callback registered in processReloadNotifier are called whenever # a change in the process source file lead to a reload of the process. # The argument is the new process. NewProcess.processReloadNotifier = Notifier(1) # Optional attributes for n in ('signature', 'execution', 'name', 'userLevel', 'roles', 'category', 'showMaximized', 'allowed_processes', 'rolePriority'): v = getattr(processModule, n, None) if v is not None: setattr(NewProcess, n, v) # Other attributes for n, v in list(processModule.__dict__.items()): if g.get(n) is not v: if type(v) is types.FunctionType: code = v.__code__ args = inspect.getargs(code)[0] if args and args[0] == 'self': setattr(NewProcess, n, v) delattr(processModule, n) else: setattr(NewProcess, n, staticmethod(v)) NewProcess._fileName = fileName NewProcess._fileTime = os.path.getmtime(fileName) processInfo = ProcessInfo(id=NewProcess._id, name=NewProcess.name, signature=NewProcess.signature, userLevel=NewProcess.userLevel, category=NewProcess.category, fileName=NewProcess._fileName, roles=getattr(NewProcess, 'roles', ()), toolbox=toolbox, showMaximized=NewProcess.showMaximized ) _processesInfo[processInfo.id.lower()] = processInfo _processesInfoByName[NewProcess.name.lower()] = processInfo NewProcess.module = processInfo.module # Process validation valid = True v = getattr(processModule, 'validation', None) if v is not None: try: v() except Exception as e: if not ignoreValidation: processInfo.valid = False raise ValidationError(HTMLMessage("The process <em>" + relative_path( processInfo.fileName, neuroConfig.toolboxesDir) + "</em> is not available: <b>" + six.text_type(e) + "</b>")) else: valid = False oldProcess = _processes.get(NewProcess._id.lower()) if oldProcess is not None: if fileName != oldProcess._fileName: defaultContext().warning("Two processes have the same id : " + NewProcess._id.lower() + ".", fileName, " process will override ", oldProcess._fileName) NewProcess.toolbox = oldProcess.toolbox processInfo.toolbox = oldProcess.toolbox for n in ('execution', 'initialization', 'checkArguments'): if six.PY2: setattr(oldProcess, n, getattr(NewProcess, n).__func__) else: setattr(oldProcess, n, getattr(NewProcess, n)) oldProcess._fileTime = NewProcess._fileTime if valid or neuroConfig.ignoreValidation: _processes[processInfo.id.lower()] = NewProcess result = NewProcess def warnRole(processInfo, role): print('WARNING: process', processInfo.name, '(' + processInfo.fileName + ') is not a valid', role + '. Add the following line in the process to make it a', role + ':\nroles =', (role, ), file=sys.stderr) roles = getattr(NewProcess, 'roles', ()) def _setConverter(source, dest, proc): global _converters oldc = _converters.get((source, dest)) if oldc: # print('converter conflict for', source, dest) oldproc = _processes.get(oldc.lower()) oldpriority = 0 if oldproc: oldpriority = getattr(oldproc, 'rolePriority', 0) # print('old:', oldproc._id, oldpriority) # else: print('old', oldc, 'does not exist !') newpriority = getattr(proc, 'rolePriority', 0) # print('new:', proc._id, newpriority) if oldpriority > newpriority: return # don't register because priority is not sufficient # Register all source parents types # print('==== registering converter for source', source, 'dest', # dest, 'proc', proc.name) registerConverter(source, dest, proc._id) if 'converter' in roles: possibleConversions = getattr( NewProcess, 'possibleConversions', None) if possibleConversions is None: sourceArg, destArg = list(NewProcess.signature.values())[: 2] for destFormat in destArg.formats: for sourceFormat in sourceArg.formats: _setConverter((sourceArg.type, sourceFormat), (destArg.type, destFormat), NewProcess) else: for source, dest in possibleConversions(): source = (getDiskItemType(source[0]), getFormat(source[1])) dest = (getDiskItemType(dest[0]), getFormat(dest[1])) _setConverter(source, dest, NewProcess) elif NewProcess.category.lower() == 'converters/automatic': warnRole(processInfo, 'converter') if 'viewer' in roles: global _viewers global _listViewers arg = list(NewProcess.signature.values())[0] if isinstance(arg, ListOf): arg = arg.contentType if hasattr(arg, 'formats'): for format in arg.formats: # print('===== registering viewer', NewProcess.name, #'for list of type', arg.type, 'and format', format) _listViewers.setdefault((arg.type, format), ProcessSet( arg.type, format)) \ .add(NewProcess._id) elif hasattr(arg, 'formats'): for format in arg.formats: # print('===== registering viewer', NewProcess.name, #'for type', arg.type, 'and format', format) _viewers.setdefault((arg.type, format), ProcessSet( arg.type, format)) \ .add(NewProcess._id) elif NewProcess.category.lower() == 'viewers/automatic': warnRole(processInfo, 'viewer') if 'editor' in roles: global _dataEditors global _listDataEditors arg = list(NewProcess.signature.values())[0] if isinstance(arg, ListOf): arg = arg.contentType if hasattr(arg, 'formats'): for format in arg.formats: # print('===== registering data editor', NewProcess.name, #'for list of type', arg.type, 'and format', format) _listDataEditors.setdefault((arg.type, format), ProcessSet( arg.type, format)) \ .add(NewProcess._id) elif hasattr(arg, 'formats'): for format in arg.formats: # print('===== registering data editor', NewProcess.name, #'for type', arg.type, 'and format', format) _dataEditors.setdefault((arg.type, format), ProcessSet( arg.type, format)) \ .add(NewProcess._id) elif NewProcess.category.lower() == 'editors/automatic': warnRole(processInfo.fileName, 'editor') if 'importer' in roles: global _importers sourceArg, destArg = list(NewProcess.signature.values())[: 2] if hasattr(destArg, 'formats'): for format in destArg.formats: _importers[(destArg.type, format)] = NewProcess._id if _readProcessLog is not None: _readProcessLog.append(processInfo.id, html='<h1>' + processInfo.id + '</h1>' + processInfo.html(), icon='icon_process.png') if oldProcess is not None: oldProcess.processReloadNotifier.notify(result) except ValidationError: if _readProcessLog is not None: _readProcessLog.append( NewProcess._id, html=exceptionHTML(), icon='warning.png') raise except Exception: if _readProcessLog is not None: _readProcessLog.append( os.path.basename(fileName), html=exceptionHTML(), icon='error.png') raise return result
#----------------------------------------------------------------------------
[docs]def readProcesses(processesPath): """ Read all the processes found in toolboxes and in a list of directories. The toolboxes are found with the function :py:func:`brainvisa.toolboxes.allToolboxes`. A global object representing a tree of processes is created, it is an instance of the :py:class:`ProcessTree` :param list processesPath: list of paths to directories containing processes files. """ # New style processes initialization global _converters global _viewers, _listViewers global _dataEditors, _listDataEditors, _importers global _allProcessesTree _processesInfo = {} _viewers = {} _listViewers = {} _dataEditors = {} _listDataEditors = {} _importers = {} resetConverters() processesCacheFile = os.path.join( neuroConfig.homeBrainVISADir, 'processCache-' + neuroConfig.shortVersion) processesCache = {} if neuroConfig.fastStart and os.path.exists(processesCacheFile): try: with open(processesCacheFile, 'rb') as f: _processesInfo, converters = cPickle.load(f) # change _converters keys to use the same instances as the global # types / formats list for (src, dst), process_id in six.iteritems(converters): src = (getDiskItemType(src[0].name), getFormat(src[1].name)) dst = (getDiskItemType(dst[0].name), getFormat(dst[1].name)) registerConverter(src, dst, process_id) except Exception: _processesInfo, _converters = {}, {} if neuroConfig.mainLog is not None: neuroConfig.mainLog.append('Cannot read processes cache', html=exceptionHTML( beforeError=_t_( 'Cannot read processes cache file <em>%s</em>') % (processesCacheFile, )), icon='warning.png') if neuroConfig.gui or not neuroConfig.fastStart or not _processesInfo: # create all processes tree while reading processes in processesPath _allProcessesTree = ProcessTree( "Various processes", "all processes", editable=False, user=False) for processesDir in processesPath: _allProcessesTree.addDir(processesDir, "", processesCache) for toolbox in brainvisa.toolboxes.allToolboxes(): toolbox.getProcessTree() # save processes cache try: # the file is first written completely to a temporary file name, # then atomically moved to its final destination, to avoid race # conditions where two processes write to the file concurrently or # a process reads from an incomplete file. with tempfile.NamedTemporaryFile(mode='wb', prefix=os.path.basename(processesCacheFile), dir=os.path.dirname(processesCacheFile), delete=False) as f: # Use version 2 of the pickle protocol (more optimized than # version 0, but still compatible with Python 2) cPickle.dump((_processesInfo, _converters), f, 2) if hasattr(os, 'replace'): os.replace(f.name, processesCacheFile) else: try: os.remove(processesCacheFile) except OSError: pass os.rename(f.name, processesCacheFile) except Exception: if neuroConfig.mainLog is not None: neuroConfig.mainLog.append('Cannot write processes cache', html=exceptionHTML( beforeError=_t_( 'Cannot write processes cache file <em>%s</em>') % (processesCacheFile, )), icon='warning.png')
#----------------------------------------------------------------------------
[docs]class ProcessTree(EditableTree): """ Bases: :py:class:`soma.notification.EditableTree` Represents a hierarchy of processes. It is used to represent the processes of a toolbox or a set of personal bookmarks on processes. The tree contains branches: categories or directories, and leaves: processes. This object can be saved in a minf file (in userProcessTree.minf for user bookmarks). """ defaultName = "New" def __init__(self, name=None, id=None, icon=None, tooltip=None, editable=True, user=True, content=[]): """ :param string name: name of the tree :param string id: id of the process. if None, it is set to the name in lowercase. :param string icon: filename of an icon that represents the process tree. :param string tooltip: description associated to the process tree :param boolean editable: if True, the tree can be modified after its creation. :param boolean user: if True, this tree is a custom process tree created by the user (personal bookmarks on processes) :param list content: initial content, list of children to add in the tree. """ if id is None and name is not None: id = name.lower() super(ProcessTree, self).__init__(_t_(name), id, editable, content) self.initName = name self.onAttributeChange("name", self.updateName) self.user = bool(user) if icon is not None: self.icon = icon elif self.user: self.icon = 'folder_home.png' else: self.icon = 'list.png' if tooltip != None: self.tooltip = _t_(tooltip) else: self.tooltip = self.name self.setVisible() # tag the tree as visible or not : it is visible if it # contains at least one visible child def __getinitargs__(self): content = list(self.values()) return (self.initName, self.id, self.icon, self.tooltip, self.modifiable, self.user, content) def __getinitkwargs__(self): """ This object can be saved in a minf file (in userProcessTree.minf for user bookmarks). That's why it defines __getinitkwargs__ method. this method's result is stored in the file and passed to the constructor to restore the object. Some changes to the constructor attributes must be reflected in getinitkwargs method, but changes can affect the reading of existing minf files. """ content = list(self.values()) return ((), {'name': self.initName, 'id': self.id, 'icon': self.icon, 'editable': self.modifiable, 'user': self.user, 'content': content})
[docs] def addDir(self, processesDir, category="", processesCache={}, toolbox='brainvisa'): """ Adds the processes from a directory to the current tree. Subdirectories will become the branches of the tree and processes will become the leaves of the tree. :param string processesDir: directory where processes are recursively searched. :param string category: category prefix for all processes found in this directory (useful for toolboxes : all processes category begins with toolbox's name. :param dictionary processesCache: a dictionary containing previously saved processes info stored by id. Processes that are in this cache are not reread. """ if os.path.isdir(processesDir): stack = [(self, processesDir, category)] while stack: parent, dir, category = stack.pop(0) p = [] try: listdir = os.listdir(dir) except Exception: showException() listdir = [] for f in sorted(listdir): ff = os.path.join(dir, f) if os.path.isdir(ff): if not ff.endswith('.data') and f != '__pycache__': if category: c = category + '/' + f else: c = f b = ProcessTree.Branch( name=f, id=c.lower(), editable=False) parent.add(b) stack.append((b, ff, c)) else: continue elif ff.endswith('.py') or ff.endswith('.so'): p.append((f, ff)) for f, ff in p: if not os.path.exists(ff): continue id = f[:-3] try: processInfo = processesCache.get(id) if processInfo is None: readProcess(ff, category=category, ignoreValidation=neuroConfig.ignoreValidation, toolbox=toolbox) # two arguments : process fullpath and category (directories separated by /) else: addProcessInfo(id, processInfo) except ValidationError: # it may occur a validation error on reading process pass except Exception: showException() processInfo = getProcessInfo(id) if processInfo is not None: l = ProcessTree.Leaf(id=processInfo.id, editable=False) parent.add(l)
[docs] def setEditable(self, bool): """ Makes the tree editable. All its children becomes modifiable and deletable. """ self.modifiable = bool for item in self.values(): item.setAllModificationsEnabled(bool)
[docs] def setName(self, n): """ Renames the tree. The tooltip is also changed it was equal to the name. """ if self.name == self.tooltip: self.tooltip = n EditableTree.setName(self, n)
[docs] def setVisible(self): """ Sets the tree as visible if it is a user tree or if it has at least one visible child. An empty user tree is visible because it can be a newly created user tree and the user may want to fill it later. """ visible = False if self.user: visible = True else: for item in self.values(): if item.visible: visible = True break self.visible = visible
[docs] def update(self): """ Recursively Updates `visible` attribute for each item in the tree. This method must be called when the visibility may have change. For exemple when the userLevel has changed, some process must become visibles. """ visibleChild = False for item in self.values(): item.update(self.user) if item.visible: visibleChild = True self.visible = (self.user or visibleChild)
[docs] def updateName(self): """ When the tree name is changed after construction. The new name must be saved if the tree is saved in minf file. So change the initName. """ self.initName = self.name
#-------------------------------------------------------------------------
[docs] class Branch(EditableTree.Branch): """ Bases: :py:class:`soma.notification.EditableTree.Branch` A directory that contains processes and/or another branches. Enables to organise processes by category. """ _defaultIcon = 'folder.png' defaultName = "New category" def __init__(self, name=None, id=None, editable=True, icon=None, content=[]): if icon is None: icon = self._defaultIcon defaultName = _t_(self.defaultName) if id is None or id == defaultName: if name is not None and name != defaultName: id = name.lower() else: id = None # from brainvisa.toolboxes import getToolbox # toolbox=getToolbox(id) # if toolbox is not None: # name=toolbox.name # even if the tree isn't editable, copy of elements is enabled # (that doesn't change the tree but enable to copy elements in another tree) super(ProcessTree.Branch, self).__init__( _t_(name), id, icon, _t_("category"), True, editable, editable, content) self.initName = name # store the name given in parameters to return in getinitkwargs, so save in minf format will store init name before potential traduction self.onAttributeChange("name", self.updateName) # EditableTree.Branch.__init__(self, unicode(name), unicode(icon), # _t_("category"), True, editable, editable, content) self.setVisible() # set the visibility of the branch relatively to # its content. As the branch can be constructed # with a content (when it is restored from minf # file for example), it is usefull to do so. def __getinitargs__(self): content = list(self.values()) return (self.initName, self.id, self.modifiable, self.icon, content) def __getinitkwargs__(self): content = list(self.values()) return ((), {'name': self.initName, 'id': self.id, 'editable': self.modifiable, 'content': content}) def __reduce__(self): """This method is redefined for enable deepcopy of this object (and potentially pickle). It gives the arguments to pass to the init method of the object when creating a copy """ return (self.__class__, self.__getinitargs__(), None, None, None)
[docs] def setVisible(self): """ Sets the branch as visible if it has no child or if it has at least one visible child. Empty branch is visible because it can be a newly created user branch and the user may want to fill it later. """ visible = False if len(self) == 0: visible = True else: for item in self.values(): if item.visible: visible = True break self.visible = visible
[docs] def update(self, userTree=False): """ Updates recursively visible attribute for each item in the branch. This method must be called when the visibility may have change. For exemple when the userLevel has changed, some processes must become visibles. """ if len(self) == 0: self.visible = True else: visibleChild = False for item in self.values(): item.update(userTree) if item.visible: visibleChild = True self.visible = visibleChild
def updateName(self): self.initName = self.name
#-------------------------------------------------------------------------
[docs] class Leaf(EditableTree.Leaf): """ Bases: :py:class:`soma.notification.EditableTree.Leaf` A ProcessTree.Leaf represents a process. """ def __init__(self, id, name=None, editable=True, icon=None, userLevel=None, *args, **kwargs): processInfo = getProcessInfo(id) pname = name if name is None: pname = id if processInfo is not None: if name is None: pname = processInfo.name if icon is None: # set icon according to process role and user level if 'viewer' in processInfo.roles: icon = 'viewer.png' elif 'editor' in processInfo.roles: icon = 'editor.png' elif 'converter' in processInfo.roles: icon = 'converter.png' else: icon = 'icon_process_' + \ str(min(processInfo.userLevel if userLevel is None else userLevel, 3 ) ) + \ '.png' super(ProcessTree.Leaf, self).__init__( _t_(pname), id, icon, _t_("process"), True, editable, editable) self.initName = name self.userLevel = userLevel self.onAttributeChange("name", self.updateName) self.setVisible(processInfo) if processInfo is not None: self.enabled = processInfo.valid else: self.enabled = True def __getinitargs__(self): return (self.id, self.initName, self.modifiable, self.icon, self.userLevel) def __getinitkwargs__(self): return ((), {'id': self.id, 'name': self.initName, 'editable': self.modifiable, 'userLevel': self.userLevel}) # do not save icon in minf file for processes because it is determined by its role and user level def __reduce__(self): """This method is redefined for enable deepcopy of this object (and potentially pickle). It gives the arguments to pass to the init method of the object when creating a copy """ return (self.__class__, self.__getinitargs__(), None, None, None)
[docs] def setVisible(self, processInfo): """ A ProcessTree.Leaf is valid if the id references a process in _processesInfo and if the process' userLevel is lower or equal than global userLevel and the related process is valid (validation function succeeded). """ visible = False if processInfo is not None: if (self.userLevel is not None and (self.userLevel <= neuroConfig.userLevel)) \ or (self.userLevel is None and (processInfo.userLevel <= neuroConfig.userLevel)): visible = True self.visible = visible
[docs] def update(self, userTree=False): """ Called when the parent tree is updated because some visibility conditions have changed. Evaluates the visibility of the reprensented process. """ processInfo = getProcessInfo(self.id) self.setVisible(processInfo)
def updateName(self): self.initName = self.name
#----------------------------------------------------------------------------
[docs]class ProcessTrees(ObservableAttributes, ObservableSortedDictionary): """ Model for the list of process trees in brainvisa. A process tree is an instance of the class :py:class:`ProcessTree`. It is a dictionary which maps each tree with its id. It contains several process trees : * default process tree : all processes in brainvisa/processes (that are not in a toolbox). Not modifiable by user. * toolboxes : processes grouped by theme. Not modifiable by user. * user process trees (personal bookmarks): lists created by the user and saved in a minf file. A tree can be set as default. It becomes the current tree at Brainvisa start. .. py:attribute:: name Name of the object. .. py:attribute:: userProcessTreeMinfFile Path to the file which stores the process trees created by the user as bookmarks. Default filename is in brainvisa home directory and is called `userProcessTrees.minf`. .. py:attribute:: selectedTree :py:class:`ProcessTree` that is the current tree when Brainvisa starts. """ def __init__(self, name=None): """ :param string name: Name of the object. Default is 'Toolboxes'. """ if name is None: name = _t_('Toolboxes') # set the selected tree super(ProcessTrees, self).__init__() self.name = name self.userProcessTreeMinfFile = os.path.join( neuroConfig.homeBrainVISADir, 'userProcessTrees.minf') self.selectedTree = None self.load()
[docs] def add(self, processTree): """ Add an item in the dictionary. If this item's id is already present in the dictionary as a key, add the item's content in the corresponding key. recursive method """ key = processTree.id if key in self: for v in processTree.values(): # item is also a dictionary and contains several elements, add each value in the tree item self[key].add(v) else: # new item self[key] = processTree
[docs] def load(self): """ Loads process trees : - a tree containing all processes that are not in toolboxes: the function :py:func:`allProcessesTree` returns it; - toolboxes as new trees; - user trees that are saved in a minf file in user's .brainvisa directory. """ allTree = allProcessesTree() self.add(allTree) # append toolboxes process trees from brainvisa.toolboxes import allToolboxes for toolbox in allToolboxes(): # add each toolbox's process tree self.add(toolbox.getProcessTree()) # no longer add toolboxes in allProcessesTree, it was redundant # and add the toolbox as a branch in all processes tree # allTree.add(ProcessTree.Branch(toolbox.processTree.name, # toolbox.processTree.id, False, toolbox.processTree.icon, # toolbox.processTree.values())) for toolbox in allToolboxes(): # when all toolbox are created, add links from each toolbox to others for processTree in toolbox.links(): self.add(processTree) # if a toolbox with the same id already exists, it doesn't create a new tree but update the existing one # report the links in the toolbox that are in all processes tree # if processTree.id != allTree.id: # except if the links points directly to all processes tree, in that case, there's nothing else to do # allTree.add(ProcessTree.Branch(processTree.name, processTree.id, False, processTree.icon, processTree.values())) # sort processes in alphabetical order in toolboxes and in all # processes tree for toolbox in allToolboxes(): toolbox.processTree.sort() allTree.sort() # append other trees here if necessary # .... # load user trees from minf file userTrees = None currentTree = None if os.access(self.userProcessTreeMinfFile, os.F_OK): # if the file exists, read it try: format, reduction = minfFormat(self.userProcessTreeMinfFile) if (format == "XML") and (reduction == "brainvisa-tree_2.0"): userTrees, currentTree = iterateMinf( self.userProcessTreeMinfFile) except Exception: print("Error while reading", self.userProcessTreeMinfFile) if userTrees != None: for userTree in userTrees: self.add(userTree) # search selected tree. if currentTree is not None: # The id of the selected tree is stored in the minf file. But # before, the name was stored, so if the value is not a key, search # by names if currentTree in self: self.selectedTree = self[currentTree] else: for tree in self.values(): if tree.name == currentTree: self.selectedTree = tree break else: self.selectedTree = None # update items visibility it depends on processes user level : must # update to invisible branches that only contain invisible items self.update()
[docs] def save(self): """ Write trees created by the user in a minf file to restore them next time Brainvisa starts. """ writer = createMinfWriter( self.userProcessTreeMinfFile, reducer='brainvisa-tree_2.0') # save trees created by user writer.write([i for i in self.values() if i.user]) # save selected tree name if self.selectedTree is not None: writer.write(self.selectedTree.id) else: writer.write(None) writer.close()
[docs] def update(self): """ Updates all trees (evaluates visibility of each items). """ for item in self.values(): item.update()
#---------------------------------------------------------------------------- _mainProcessTree = None
[docs]def updatedMainProcessTree(): """ :rtype: :py:class:`ProcessTrees` :returns: Brainvisa list of process trees : all processes tree, toolboxes, user trees """ global _mainProcessTree if _mainProcessTree is None: _mainProcessTree = ProcessTrees() return _mainProcessTree
#----------------------------------------------------------------------------
[docs]def allProcessesTree(): """ Get the tree that contains all processes. It is created when processes in processesPath are first read. Toolboxes processes are also added in this tree. :rtype: :py:class:`ProcessTrees` :return: the tree that contains all processes. """ global _allProcessesTree return _allProcessesTree
#----------------------------------------------------------------------------
[docs]def updateProcesses(): """ Called when option userLevel has changed. Associated widgets will be updated automatically because they listens for changes. """ if _mainProcessTree is not None: _mainProcessTree.update()
#----------------------------------------------------------------------------
[docs]def mainThread(): """ Gets Brainvisa main thread. """ return _mainThread
#----------------------------------------------------------------------------
[docs]def defaultContext(): """ Gets the default execution context. :rtype: :py:class:`ExecutionContext` :return: The default execution context associated to Brainvisa application. """ return _defaultContext
#----------------------------------------------------------------------------
[docs]def initializeProcesses(): """ Intializes the global variables of the module. The current thread is stored as the main thread. A default execution context is created. """ # TODO: A class would be more clean instead of all these global variables global _processModules, _processes, _processesInfo, _processesInfoByName, \ _viewers, _listViewers, _mainThread, _defaultContext, _dataEditors, \ _listDataEditors, _importers, _askUpdateProcess, _readProcessLog _mainThread = threading.currentThread() _processesInfo = {} _processesInfoByName = {} _processes = {} _processModules = {} _askUpdateProcess = {} _viewers = {} _listViewers = {} _dataEditors = {} _listDataEditors = {} _importers = {} _defaultContext = ExecutionContext() resetConverters() if neuroConfig.mainLog is not None: _readProcessLog = neuroConfig.brainvisaSessionLog.subLog() neuroConfig.brainvisaSessionLog.append(_t_('Read processes'), html='<em>processesPath</em> = ' + str(neuroConfig.processesPath), children=_readProcessLog, icon='icon_process.png') else: _readProcessLog = None
#----------------------------------------------------------------------------
[docs]def cleanupProcesses(): """ Callback associated to the application exit. The global variables are cleaned. """ global _processModules, _processes, _processesInfo, _processesInfoByName, \ _viewers, _listViewers, _mainThread, _defaultContext, _dataEditors, \ _listDataEditors, _importers, _askUpdateProcess, _readProcessLog resetConverters() _viewers = {} _listViewers = {} _dataEditors = {} _listDataEditors = {} _importers = {} _processesInfo = {} _processesInfoByName = {} _processes = {} _processModules = {} _askUpdateProcess = {} _mainThread = None _defaultContext = None if _readProcessLog is not None: _readProcessLog.close() _readProcessLog = None
#----------------------------------------------------------------------------
[docs]def reloadToolboxes(): """ Reloads toolboxes, processes, types, ontology rules, databases. Useful to take into account new files without having to quit and start again Brainvisa. """ from brainvisa.data import neuroHierarchy global _mainProcessTree # init typesPath and fileSystemOntologiesPath neuroConfig.initializeOntologyPaths() # read toolboxes directories: useful if there are new toolbox, process, # types, or hierarchy files brainvisa.toolboxes.readToolboxes( neuroConfig.toolboxesDir, neuroConfig.homeBrainVISADir) # execute intialization files of toolboxes for toolbox in brainvisa.toolboxes.allToolboxes(): toolbox.init() # reload lists of types and formats neuroDiskItems.reloadTypes() # reload processes readProcesses(neuroConfig.processesPath) # update the list of processes _mainProcessTree = None updatedMainProcessTree() # update databases and ontology rules fileSystemOntology.FileSystemOntology.clear() neuroHierarchy.initializeDatabases() neuroHierarchy.openDatabases() neuroDiskItems.pruneObsoleteTypes()
#---------------------------------------------------------------------------- # helper function for ipython, def _my_ioloop_start(self): # this is a hacked version of tornado.ioloop.PollIOLoop.start() # with smaller timeout (2 instead of 3600 s) # and breaks when idle with this max timeout from zmq.eventloop import ioloop from zmq.eventloop.ioloop import IOLoop import signal import six.moves._thread import logging import heapq if not logging.getLogger().handlers: # The IOLoop catches and logs exceptions, so it's # important that log output be visible. However, python's # default behavior for non-root loggers (prior to python # 3.2) is to print an unhelpful "no handlers could be # found" message rather than the actual log entry, so we # must explicitly configure logging if we've made it this # far without anything. logging.basicConfig() if self._stopped: self._stopped = False return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = six.moves._thread.get_ident() self._running = True # signal.set_wakeup_fd closes a race condition in event loops: # a signal may arrive at the beginning of select/poll/etc # before it goes into its interruptible sleep, so the signal # will be consumed without waking the select. The solution is # for the (C, synchronous) signal handler to write to a pipe, # which will then be seen by select. # # In python's signal handling semantics, this only matters on the # main thread (fortunately, set_wakeup_fd only works on the main # thread and will raise a ValueError otherwise). # # If someone has already set a wakeup fd, we don't want to # disturb it. This is an issue for twisted, which does its # SIGCHILD processing in response to its own wakeup fd being # written to. As long as the wakeup fd is registered on the IOLoop, # the loop will still wake up and everything should work. old_wakeup_fd = None if os.name == 'posix': # Requires Unix. set_wakeup_fd exists but crashes # the python process on windows. try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: # Already set, restore previous value. This is a little racy, # but there's no clean get_wakeup_fd and in real use the # IOLoop is just started once at the # beginning. signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: # non-main thread pass iter = 0 while True: do_break = True poll_timeout = 2.0 # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. with self._callback_lock: callbacks = self._callbacks self._callbacks = [] for callback in callbacks: do_berak = False self._run_callback(callback) if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # the timeout was cancelled do_break = False heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: do_break = False timeout = heapq.heappop(self._timeouts) self._run_callback(timeout.callback) else: seconds = self._timeouts[0].deadline - now poll_timeout = min(seconds, poll_timeout) if poll_timeout < 2.: do_break = False break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it's # more than half cancellations. self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) if self._callbacks: do_break = False # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) if do_break: break # END LOOP NOW. try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if (getattr(e, 'errno', None) == errno.EINTR or (isinstance(getattr(e, 'args', None), tuple) and len(e.args) == 2 and e.args[0] == errno.EINTR)): continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: self._handlers[fd](fd, events) except (OSError, IOError) as e: if e.args[0] == errno.EPIPE: # Happens when the client closes the connection pass else: app_log.error("Exception in I/O handler for fd %s", fd, exc_info=True) except Exception: app_log.error("Exception in I/O handler for fd %s", fd, exc_info=True) # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) #---------------------------------------------------------------------------- # tool function to open/get a IPython kernel _ipsubprocs = [] def runIPConsoleKernel(mode='qtconsole'): import IPython from IPython.lib import guisupport qtapp = QApplication.instance() qtapp._in_event_loop = True guisupport.in_event_loop = True ipversion = [int(x) for x in IPython.__version__.split('.')] if False: # ipversion >= [3, 0]: # embedded ipython engine + qt loop in the same process. # works for ipython >= 3 but forbids connection from outside # so it is not so interesting after all. os.environ['QT_API'] = 'pyqt' from IPython.qt.inprocess import QtInProcessKernelManager kernel_manager = QtInProcessKernelManager() kernel_manager.start_kernel() kernel = kernel_manager.kernel kernel.gui = 'qt4' # kernel.shell.push({'foo': 43, 'print_process_id': print_process_id}) kernel_client = kernel_manager.client() kernel_client.start_channels() def stop(): kernel_client.stop_channels() kernel_manager.shutdown_kernel() from IPython.qt.console.rich_ipython_widget import RichIPythonWidget control = RichIPythonWidget() control.kernel_manager = kernel_manager control.kernel_client = kernel_client control.exit_requested.connect(stop) control.show() return None elif ipversion >= [1, 0]: if ipversion >= [4, 0]: from ipykernel.kernelapp import IPKernelApp else: from IPython.kernel.zmq.kernelapp import IPKernelApp app = IPKernelApp.instance() if not app.initialized() or not app.kernel: print('runing IP console kernel') app.hb_port = 50042 # don't know why this is not set automatically app.initialize([mode, '--gui=qt', # '--pylab=qt', "--KernelApp.parent_appname='ipython-%s'" % mode]) # in ipython >= 1.2, app.start() blocks until a ctrl-c is issued in # the terminal. Seems to block in tornado.ioloop.PollIOLoop.start() # # So, don't call app.start because it would begin a zmq/tornado loop # instead we must just initialize its callback. #if app.poller is not None: #app.poller.start() app.kernel.start() from zmq.eventloop import ioloop if ipversion >= [2, 0]: # IP 2 allows just calling the current callbacks. # For IP 1 it is not sufficient. import tornado if tornado.version_info >= (4, 5): # tornado 5 is using a decque for _callbacks, not a # list + explicit locking def my_start_ioloop_callbacks(self): ncallbacks = len(self._callbacks) for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) else: def my_start_ioloop_callbacks(self): with self._callback_lock: callbacks = self._callbacks self._callbacks = [] for callback in callbacks: self._run_callback(callback) my_start_ioloop_callbacks(ioloop.IOLoop.instance()) else: # For IP 1, use the hacked copy of the start() method try: _my_ioloop_start(ioloop.IOLoop.instance()) except KeyboardInterrupt: pass return app else: # ipython 0.x API from IPython.zmq.ipkernel import IPKernelApp app = IPKernelApp.instance() if not app.initialized() or not app.kernel: print('runing IP console kernel') app.hb_port = 50042 # don't know why this is not set automatically app.initialize(['qtconsole', '--pylab=qt', "--KernelApp.parent_appname='ipython-qtconsole'"]) app.start() return app #---------------------------------------------------------------------------- # (YB) Experimental class. It allows use of iterations inside pipelines. class IterationExecutionNode(ParallelExecutionNode): class IterationParameterized(Parameterized): signature = Signature() base = None def getNewSignature(self, base): process = getProcessInstance(base) sig = [] for (name, value) in process.signature.items(): sig += [name, ListOf(value)] signature = Signature