# -*- coding: utf-8 -*-
# This software and supporting documentation are distributed by
# Institut Federatif de Recherche 49
# CEA/NeuroSpin, Batiment 145,
# 91191 Gif-sur-Yvette cedex
# France
#
# This software is governed by the CeCILL license version 2 under
# French law and abiding by the rules of distribution of free software.
# You can use, modify and/or redistribute the software under the
# terms of the CeCILL license version 2 as circulated by CEA, CNRS
# and INRIA at the following URL "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license version 2 and that you accept its terms.
# from brainvisa.data.neuroHierarchy import databases
"""
This module contains classes defining data items that are called **diskItems** in Brainvisa.
These diskItems are associated to files that store data on a filesystem,
and attributes that are used to index the data in a database.
The main class in this module is :py:class:`DiskItem`. This class is derived into two sub-classes :py:class:`File` for the diskitems that are stored as files and :py:class:`Directory` for the diskitems that represent directories.
A diskItem has a **type** indicating what the data represents (a Volume, a T1 MRI, fMRI data...) and a **format** indicating the file format used to write the data in files.
The class :py:class:`DiskItemType` represents data types.
The class :py:class:`Format` represents file formats.
Two general formats are defined in this module as global variables:
.. py:data:: directoryFormat
This format matches any directory.
.. py:data:: fileFormat
This format matches any file.
The available types and formats in Brainvisa ontology are defined in each toolbox in python files under the ``types`` directory.
They are loaded at Brainvisa startup using the function :py:func:`readTypes` and stored in global maps:
.. py:data:: formats
A global map which associates each format id to the matching object :py:class:`Format`.
.. py:data:: formatLists
A global map which associates each list of formats id to the matching object :py:class:`NamedFormatList`.
.. py:data:: diskItemTypes
A global map which associates each type id to the matching object :py:class:`DiskItemType`
.. py:data:: mef
A global instance of the class :py:class:`TypesMEF` which is used to read the Brainvisa ontology types files.
The following function are available to get a format or a type object from its id:
* :py:func:`getFormat`
* :py:func:`getFormats`
* :py:func:`getAllFormats`
* :py:func:`getDiskItemType`
* :py:func:`getAllDiskItemTypes`
* :py:func:`isSameDiskItemType`
This module also defines classes for temporary files and directories: :py:class:`TemporaryDiskItem`, :py:class:`TemporaryDirectory`.
The function :py:func:`getTemporary` enables to create a new temporary item.
All temporary files and directories are written in Brainvisa global temporary directory which can be choosen in Brainvisa options. The diskitem corresponding to this global directory is stored in a global variable:
.. py:data:: globalTmpDir
It is the default parent diskItem for temporary diskitems.
:Inheritance diagram:
.. inheritance-diagram:: brainvisa.data.neuroDiskItems
:Classes:
"""
from __future__ import print_function
from __future__ import absolute_import
import types
import sys
import os
import errno
import stat
import operator
import time
import traceback
from weakref import ref, WeakValueDictionary
from threading import RLock
import json
from soma.html import htmlEscape
from soma.undefined import Undefined
from soma.uuid import Uuid
from soma.path import parse_query_string, update_query_string, \
remove_query_string, split_path
from soma.minf.api import readMinf, MinfError
from soma.wip.application.api import Application
from soma.sqlite_tools import sqlite3, ThreadSafeSQLiteConnection
from brainvisa.configuration import neuroConfig
from brainvisa.processing.neuroException import HTMLMessage, showException
from brainvisa.data import temporary
from brainvisa.data.patterns import DictPattern
from brainvisa import shelltools
from brainvisa.multipleExecfile import MultipleExecfile
from soma.qt_gui.qt_backend.QtCore import QObject, Signal
import re
import six
from six.moves import UserList, range, zip
if not six.PY2:
long = int
#----------------------------------------------------------------------------
[docs]def sameContent(a, b, as_dict=False):
"""
Checks if *a* and *b* have the same content.
if as_dict it True, b may be a dict when a is an instance: b is an
instance.__dict__ for example.
If the two objects are lists, the function is called on each element of the
list.
If the first object has a method named *sameContent*, it is called.
Else, the result of a comparison with *==* operator is returned.
"""
result = 0
if as_dict or type(a) is type(b):
if type(a) in (list, tuple):
result = 1
i = 0
for x in a:
if not sameContent(x, b[i]):
result = 0
break
i += 1
elif hasattr(a, 'sameContent'):
result = a.sameContent(b, as_dict=as_dict)
else:
if as_dict and type(b) is dict and type(a) is not dict:
result = a.__dict__ == b
else:
result = a == b
return result
#----------------------------------------------------------------------------
[docs]def modificationHashOrEmpty(f):
"""
Returns a tuple containing information about the file from :py:func:`os.lstat`.
Returns an empty tuple if an exception occurs.
"""
try:
s = os.lstat(f)
return (s.st_mode, s.st_uid, s.st_gid, s.st_size, s.st_mtime, s.st_ctime)
except OSError:
return ()
#----------------------------------------------------------------------------
[docs]class DiskItem(QObject):
"""
This class represents data stored in one or several files on a filesystem.
It can have additional information stored in attributes and may be indexed in a Brainvisa database.
A diskItem can have hierarchy attributes (comes from Brainvisa ontology),
minf attributes (that are stored in a .minf file), and possibly other attributes.
Several methods enable to access the values of the attributes: :py:meth:`getHierarchy`, :py:meth:`getNonHierarchy`, :py:meth:`attributes`, :py:meth:`globalAttributes`, :py:meth:`hierarchyAttributes`, :py:meth:`localAttributes`, :py:meth:`minf`.
The attributes of a diskItem can also be requested using the dictionary notation ``d["attribute_name"]``, which calls :py:meth:`get` method.
A diskItem can be identified by a unique identifier called *uuid*. This uuid is stored in minf attributes and it is an instance of :py:class:`soma.uuid.Uuid` class.
Several methods enable to request the name of the files associated to the diskItem: :py:meth:`fullPath`, :py:meth:`fullName`, :py:meth:`fullPaths`, :py:meth:`fullPathSerie`, :py:meth:`fullPathsSerie`.
:Attributes:
.. py:attribute:: name
name of the diskItem, generally the filename of the first file.
.. py:attribute:: parent
a parent diskItem, generally the diskItem associated to the directory that contains the data files of this diskItem.
.. py:attribute:: type
Data type of the diskItem, indicating the meaning of the data. It is an instance of :py:class:`DiskItemType`.
.. py:attribute:: format
DiskItem file format, indicated by the files extensions. It is an instance of :py:class:`Format`.
:Methods:
"""
lockChanged = Signal(bool)
_minfLock = RLock()
def __init__(self, name, parent):
super(DiskItem, self).__init__()
self.name = remove_query_string(name)
if name and name[-5:] != '.minf':
self._files = [remove_query_string(name)]
else:
self._files = []
self.parent = parent
if self.parent is None:
self._topParentRef = ref(self)
else:
self._topParentRef = self.parent._topParentRef
self._localAttributes = {}
self._globalAttributes = {}
self._minfAttributes = {}
self._otherAttributes = {}
self._queryStringAttributes = parse_query_string(name) if name else {}
self.type = None
self.format = None
self._setLocal('name_serie', [])
self._isTemporary = 0
self._uuid = None
self._write = False
self._identified = False
self._lock = RLock()
def __getstate__(self):
state = {
'name': self.name,
'_files': self._files,
'parent': self.parent,
'_localAttributes': self._localAttributes,
'_globalAttributes': self._globalAttributes,
'_minfAttributes': self._minfAttributes,
'_otherAttributes': self._otherAttributes,
'_queryStringAttributes': self._queryStringAttributes,
'_uuid': self._uuid,
'_write': self._write,
'_identified': self._identified,
}
if self.type:
state['type'] = self.type.id
else:
state['type'] = None
if self.format:
state['format'] = self.format.id
else:
state['format'] = None
priority = getattr(self, '_priority', None)
if priority is not None:
state['_priority'] = priority
return state
def __setstate__(self, state):
self.name = state['name']
self._files = state['_files']
self.parent = state['parent']
if self.parent is None:
self._topParentRef = ref(self)
t = state['type']
if t:
self.type = getDiskItemType(t)
else:
self.type = None
t = state['format']
if t:
self.format = getFormat(t)
else:
self.format = None
self._localAttributes = state['_localAttributes']
self._globalAttributes = state['_globalAttributes']
self._minfAttributes = state['_minfAttributes']
self._otherAttributes = state['_otherAttributes']
self._queryStringAttributes = state['_queryStringAttributes']
self._isTemporary = 0
priority = state.get('_priority')
if priority is not None:
self._priority = priority
self._changeUuid(state.get('_uuid'))
self._write = state['_write']
self._identified = state['_identified']
if not hasattr(self, '_lock'):
self._lock = RLock()
def __eq__(self, other):
if isinstance(other, six.string_types):
return other in self.fullPaths()
return self is other or (isinstance(other, DiskItem) and self.fullPath() == other.fullPath())
def __hash__(self):
return hash(self.fullPath())
def __ne__(self, other):
if isinstance(other, six.string_types):
return other not in self.fullPaths()
return self is not other and ((not isinstance(other, DiskItem)) or self.fullPath() != other.fullPath())
def __lt__(self, other):
if isinstance(other, six.string_types):
return self.fullPath() < other
return self is not other and isinstance(other, DiskItem) \
and self.fullPath() < other.fullPath()
[docs] def clone(self):
"""
Returns a deep copy of the current diskItem object. The associated files are not copied.
"""
result = self.__class__(self.name, self.parent)
result.__setstate__(self.__getstate__())
# Copy attributes so that they can be modified without
# changing cloned item attributes
self.copyAttributes(result)
return result
def _topParent(self):
try:
return self._topParentRef()
except AttributeError:
pass
if self.parent:
p = self.parent
while p.parent is not None:
p = p.parent
self._topParentRef = p._topParentRef
else:
self._topParentRef = ref(self)
return self._topParentRef()
[docs] def attributes(self):
"""
Returns a dictionary containing the name and value of all the attributes associated to this diskItem.
"""
result = {}
self._mergeAttributes(result)
return result
[docs] def globalAttributes(self):
"""
Returns a dictionary containing the name and value of the global attributes.
They may come from parent diskItem and are transmitted to child diskItem.
The global attributes are the attributes coming from data ontoloy.
"""
result = {}
self._mergeGlobalAttributes(result)
return result
[docs] def localAttributes(self):
"""
Returns a dictionary containing the name and value of the local attributes.
The local attributes are valid only for this diskItem, they are no transmitted to child diskItem.
For example the attribute ``name_serie`` that indicates the list of numbers of a serie of files that store the data, is a local attribute.
"""
result = {}
self._mergeLocalAttributes(result)
return result
[docs] def copyAttributes(self, other):
"""
Copy all the attributes of the given diskItem in the current diskItem.
:param other: the :py:class:`DiskItem` whose attributes will be copied.
"""
self._localAttributes = other._localAttributes.copy()
self._globalAttributes = other._globalAttributes.copy()
self._minfAttributes = other._minfAttributes.copy()
self._otherAttributes = other._otherAttributes.copy()
self._queryStringAttributes = other._queryStringAttributes.copy()
def _mergeAttributes(self, result):
"""
Updates result dictionary with all attributes of the current diskItem and its parent global attributes.
"""
result.update(self._globalAttributes)
if self.parent:
self.parent._mergeAttributes(result)
result.update(self._localAttributes)
result.update(self._otherAttributes)
result.update(self._minfAttributes)
result.update(self._queryStringAttributes)
def _mergeGlobalAttributes(self, result):
"""
Updates result dictionary with all global attributes of the current diskItem and its parent global attributes.
"""
result.update(self._globalAttributes)
if self.parent:
self.parent._mergeGlobalAttributes(result)
def _mergeLocalAttributes(self, result):
if self.parent:
self.parent._mergeLocalAttributes(result)
result.update(self._localAttributes)
result.update(self._otherAttributes)
result.update(self._minfAttributes)
result.update(self._queryStringAttributes)
def _mergeHierarchyAttributes(self, result):
if self.parent:
self.parent._mergeHierarchyAttributes(result)
result.update(self._localAttributes)
result.update(self._globalAttributes)
def _mergeNonHierarchyAttributes(self, result):
if self.parent:
self.parent._mergeNonHierarchyAttributes(result)
result.update(self._otherAttributes)
result.update(self._minfAttributes)
result.update(self._queryStringAttributes)
[docs] def fileName(self, index=0, withQueryString=True):
"""
Returns the filename of the file number index in the list of files associated to this diskItem.
The absolute files paths are generally stored in diskItems. So this function is equivalent to :py:meth:`fullPath`.
"""
name_serie = self.get('name_serie')
if name_serie:
fname = self.fileNameSerie(index / len(self._files),
index % len(self._files))
if self._files:
fname = self._files[index]
else:
fname = self.name
if withQueryString:
return update_query_string(fname, self._queryStringAttributes)
else:
return fname
[docs] def fileNames(self, withQueryString=True):
"""
Returns the list of filenames of the files associated to this diskItem.
The absolute files paths are generally stored in diskItems. So this function is equivalent to :py:meth:`fullPaths`.
"""
name_serie = self.get('name_serie')
if name_serie:
result = []
for number in name_serie:
result += list(map(
lambda x, number=number: expand_name_serie(x, number),
self._files))
elif self._files:
result = self._files
else:
result = [self.name]
return [update_query_string(r, self._queryStringAttributes) \
if withQueryString else r for r in result]
[docs] def fileNameSerie(self, serie, index=0):
"""
This function can be used only on diskItems that are a serie of data. For example a serie of 3D volumes, each volume being stored in two files .ima and .dim (GIS format).
Returns the name of the index number file of the serie number item of the serie.
The absolute files paths are generally stored in diskItems. So this function is equivalent to :py:meth:`fullPathSerie`.
:param int serie: index of the item in the serie.
:param int index: index of the file in one item of the serie.
"""
name_serie = self.get('name_serie')
if name_serie:
return expand_name_serie(self._files[index],
name_serie[serie])
raise RuntimeError(
HTMLMessage(_t_('<em>%s</em> is not a file series')))
[docs] def fileNamesSerie(self, serie):
"""
Returns all the files of one item of the serie.
The absolute files paths are generally stored in diskItems. So this function is equivalent to :py:meth:`fullPathsSerie`.
:param int serie: index of the item in the serie.
"""
name_serie = self.get('name_serie')
if name_serie:
return list(map(
lambda x, number=name_serie[
serie]: expand_name_serie(x, number),
self._files))
raise RuntimeError(
HTMLMessage(_t_('<em>%s</em> is not a file series')))
[docs] def fullName(self):
"""
Returns the absolute path to the diskItem with its name (without extension).
"""
if self.parent is None:
return self.name
else:
return os.path.join(self.parent.fullName(), self.name)
[docs] def relativePath(self, index=0):
"""
Gets the file path of this diskitem, relatively to the path its database directory.
If there is no database information in the attributes it returns the full path of the diskItem.
"""
database = self.get("database")
if database is None:
database = self.get("_database")
path = self.fullPath(index)
if database and path.startswith(database):
path = path[len(database) + 1:]
return path
[docs] def fullPath(self, index=0, withQueryString=True):
"""
Returns the absolute file name of the index number file of the diskItem.
"""
if self.parent is None:
fp = self.fileName(index, withQueryString=withQueryString)
else:
fp = os.path.join(self.parent.fullPath(withQueryString = False),
self.fileName(index,
withQueryString=withQueryString))
return fp
[docs] def fullPaths(self, withQueryString=True):
"""
Returns the absolute file names of the all the files of the diskItem.
"""
if self.parent is None:
return self.fileNames(withQueryString=withQueryString)
else:
return list(map(
lambda x, p=self.parent.fullPath(withQueryString = False): \
os.path.join(p, x), \
self.fileNames(withQueryString=withQueryString)))
[docs] def existingFiles(self):
"""
Returns all files associated to this diskitem, that really exist plus its minf file if it exists.
"""
files=[f for f in self.fullPaths(withQueryString=False) if os.path.exists(f)]
minfFile = self.minfFileName()
if minfFile != self.fullPath() and os.path.exists(minfFile):
files.append(minfFile)
return files
[docs] def fullPathSerie(self, serie, index=0):
"""
This function can be used only on a diskItem that is a serie of data. For example a serie of 3D volumes, each volume being stored in two files *.ima* and *.dim* (*GIS format*).
Returns the absolute file name of the index number file of the serie number item of the serie.
:param int serie: index of the item in the serie.
:param int index: index of the file in one item of the serie.
"""
if self.parent is None:
return self.fileNameSerie(serie, index)
else:
return os.path.join(self.parent.fullPath(), self.fileNameSerie(serie, index))
[docs] def fullPathsSerie(self, serie):
"""
This function can be used only on a diskItem that is a serie of data. For example a serie of 3D volumes, each volume being stored in two files *.ima* and *.dim* (*GIS format*).
Returns all the absolute file names of the serie number item of the serie.
:param int serie: index of the item in the serie.
"""
if self.parent is None:
return self.fileNamesSerie(serie)
else:
return list(map(
lambda x, p=self.parent.fullPath(): os.path.join(p, x),
self.fileNamesSerie(serie)))
[docs] def firstFullPathsOfEachSeries(self):
"""
Returns the first file name of each item of the serie.
"""
return list(map(lambda i, self=self: self.fullPathSerie(i),
list(range(len(self.get('name_serie'))))))
def _getGlobal(self, attrName, default=None):
r = self._globalAttributes.get(attrName)
if r is None:
if self.parent:
return self.parent._getGlobal(attrName, default)
else:
return default
return r
def _getLocal(self, attrName, default=None):
r = self._localAttributes.get(attrName)
if r is None:
if self.parent:
return self.parent._getLocal(attrName, default)
else:
return default
return r
def _getOther(self, attrName, default=None):
r = self._otherAttributes.get(attrName)
if r is None:
if self.parent:
return self.parent._getOther(attrName, default)
else:
return default
return r
def _getQueryString(self, attrName, default=None):
r = self._queryStringAttributes.get(attrName)
if r is None:
if self.parent:
return self.parent._getQueryString(attrName, default)
else:
return default
return r
[docs] def get(self, attrName, default=None, search_header=False):
"""
Gets the value of an attribute.
If the attribute is not found in the attributes stored in this diskItem object,
it can be searched for in data file header using :py:func:`aimsFileInfo` function if the option search_header is True.
.. warning::
If search_header is True, the method can take more time to execute than :py:func:`getHierarchy` and :py:func:`getNonHierarchy`
when the attribute is not found because it reads the header of the file on the filesystem.
:param string attrName: name of the attribute
:param default: value returned if the attribute is not set in this diskItem.
"""
r = self._globalAttributes.get(attrName)
if r is None:
r = self._minfAttributes.get(attrName)
if r is None:
r = self._otherAttributes.get(attrName)
if r is None:
r = self._localAttributes.get(attrName)
if r is None:
r = self._queryStringAttributes.get(attrName)
if r is None and search_header:
info = aimsFileInfo(self.fullPath())
for k, v in six.iteritems(info):
self._otherAttributes.setdefault(k, v)
r = info.get(attrName)
if r is None:
if self.parent:
return self.parent.get(attrName, default)
else:
return default
return r
def __getitem__(self, attrName):
"""
Enables to use d[attrname] notation.
"""
r = self.get(attrName)
if r is None:
raise KeyError(attrName)
return r
[docs] def getInTree(self, attrPath, default=None, separator='.'):
"""
This function could be used to get an attribute value from an object that is an attribute value of a diskItem.
::
d.getInTree("attr1.attr2...") <=> d.get(attr1).get(attr2)...
:param string attrPath: the attributes path, each attribute is separated by a separator character
:param default: default value is the attribute value is not found
:param string separator: character separator used to separate the different attributes in the attributes path.
"""
item = self
stack = attrPath.split(separator)
while stack and item is not None:
item = item.get(stack.pop(0))
if item is None or stack:
return default
return item
[docs] def getHierarchy(self, attrName, default=None):
"""
Gets the attribute from the global attributes or the local attributes or the parent hierarchy attributes.
:param string attrName: name of the attribute
:param default: value returned if the attribute is not found.
"""
r = self._globalAttributes.get(attrName)
if r is None:
r = self._localAttributes.get(attrName)
if r is None:
if self.parent:
return self.parent.getHierarchy(attrName, default)
else:
return default
return r
[docs] def getNonHierarchy(self, attrName, default=None):
"""
Gets the attribute value from the attributes written in minf file or in other attributes or in parent non hierarchy attributes.
:param string attrName: name of the attribute
:param default: value returned if the attribute is not found.
"""
r = self._minfAttributes.get(attrName)
if r is None:
r = self._otherAttributes.get(attrName)
if r is None:
r = self._queryStringAttributes.get(attrName)
if r is None:
if self.parent:
return self.parent.getNonHierarchy(attrName, default)
else:
return default
return r
[docs] def nonHierarchyAttributes(self):
"""
Returns all non hierarchy attributes in a dictionary.
"""
result = {}
self._mergeNonHierarchyAttributes(result)
return result
[docs] def hierarchyAttributes(self):
"""
Returns all hierarchy attributes as a dictionary.
"""
result = {}
self._mergeHierarchyAttributes(result)
return result
def __contains__(self, attrName):
"""
Returns True if the diskItem has an attribute with this name in one of its dictionaries of attributes.
"""
r = attrName in self._minfAttributes
if r:
return r
r = attrName in self._otherAttributes
if r:
return r
r = attrName in self._queryStringAttributes
if r:
return r
r = attrName in self._localAttributes
if r:
return r
r = attrName in self._globalAttributes
if r:
return r
if self.parent:
return attrName in self.parent
return 0
def has_key(self, attrName):
return self.__contains__(attrName)
def _setGlobal(self, attrName, value):
if self.parent and self.parent._getGlobal(attrName) is not None:
raise AttributeError(
HTMLMessage(_t_('a global attribute <em>%s</em> already exists in item <em><code>%s</code></em>') % (str(attrName), str(self))))
self._globalAttributes[attrName] = value
def _updateGlobal(self, dict):
for attrName, value in dict.items():
self._setGlobal(attrName, value)
def _setLocal(self, attrName, value):
if self._getGlobal(attrName) is not None:
raise AttributeError(
HTMLMessage(_t_('a global attribute <em>%s</em> already exists in item <em><code>%s</code></em>') % (str(attrName), str(self))))
self._localAttributes[attrName] = value
def _updateLocal(self, dict):
for attrName, value in dict.items():
self._setLocal(attrName, value)
def _setOther(self, attrName, value):
if self._getGlobal(attrName) is not None:
raise AttributeError(
HTMLMessage(_t_('a global attribute <em>%s</em> already exists in item <em><code>%s</code></em>') % (str(attrName), str(self))))
minfValue = self._minfAttributes.get(attrName, Undefined)
if minfValue is Undefined:
self._otherAttributes[attrName] = value
elif minfValue != value:
raise AttributeError(
HTMLMessage(_t_('a MINF attribute <em>%s</em> already exists in item <em><code>%s</code></em>') % (str(attrName), str(self))))
def _updateOther(self, dict):
for attrName, value in dict.items():
self._setOther(attrName, value)
def _setQueryString(self, attrName, value):
if self._getOther(attrName) is not None:
raise AttributeError(
HTMLMessage(_t_('an attribute <em>%s</em> already exists in item <em><code>%s</code></em>') % (str(attrName), str(self))))
self._queryStringAttributes[attrName] = value
def _updateQueryString(self, dict):
for attrName, value in dict.items():
self._setQueryString(attrName, value)
[docs] def setMinf(self, attrName, value, saveMinf=True):
"""
Adds this attribute to the minf attributes of the diskItem.
:param string attrName: name of the attribute
:param value: value sets for the attribute
:param bool saveMinf: if True the modified attributes are written to the diskItem minf file.
"""
self._otherAttributes.pop(attrName, None)
self._queryStringAttributes.pop(attrName, None)
self._minfAttributes[attrName] = value
if saveMinf:
minf = self._readMinf()
if minf is None:
minf = {}
minf[attrName] = value
self._writeMinf(minf)
[docs] def minf(self):
"""
Returns a dictionary containing the minf attributes of the diskItem.
"""
return self._minfAttributes
[docs] def updateMinf(self, dict, saveMinf=True):
"""
Adds new attributes to the minf attributes of the diskItem and possibly save the attributes in a minf file.
Note: it is not possible to remove attributes using this function, as
it only changes the values of the attributes given in the dict.
:param dict: dictionary containing the attributes that will be added to this diskItem minf attributes.
:param bool saveMinf: if True the minf attributes will be saved in the minf file of the diskItem.
"""
# print('!neuroDiskItems !: updateMinf : ', dict)
for attrName, value in dict.items():
self._otherAttributes.pop(attrName, None)
self._queryStringAttributes.pop(attrName, None)
if attrName in self._localAttributes:
self._localAttributes[attrName] = value
elif attrName in self._globalAttributes:
self._globalAttributes[attrName] = value
self._minfAttributes[attrName] = value
if saveMinf:
minf = self._readMinf()
if minf is None:
minf = {}
minf.update(dict)
self._writeMinf(minf)
[docs] def isReadable(self):
"""
Returns True if all the files associated to this diskItem exist and are readable.
"""
result = 1
for p in self.fullPaths(withQueryString=False):
if not os.access(p, os.F_OK + os.R_OK):
result = 0
break
return result
[docs] def isWriteable(self):
"""
Returns True if all the files associated to this diskItem which exist are readable and writable
and if they do not exist if the parent directories exist and are writable and executable.
"""
result = 1
for p in self.fullPaths(withQueryString=False):
if os.access(p, os.F_OK):
if not os.access(p, os.R_OK + os.W_OK):
result = 0
break
else:
# Check if the parent directory is writable for the
# inexistant files
p_dir = os.path.dirname(p)
if not os.access(p_dir, os.W_OK + os.X_OK):
result = 0
break
return result
def __repr__(self):
return repr(self.fullPath())
[docs] def childs(self):
"""
Virtual function, returns None. It is overriden in derived class :py:class:`Directory`.
"""
return None
def __str__(self):
return self.fullPath()
[docs] def priority(self):
"""
Returns the value of priority attribute if found
else the parent diskItem priority,
else the default priority attribute, else 0.
This priority is an attribute associated to the ontology rule that enabled to identify the diskItem.
"""
if getattr(self, '_priority', None) is not None:
return self._priority
if self.parent is not None:
return self.parent.priority()
return getattr(self, '_defaultPriority', 0)
[docs] def setPriority(self, newPriority, priorityOffset=0):
"""
Sets a value for the priority attribute: ``newPriority + priorityOffset``.
"""
self._priority = newPriority
if priorityOffset:
self._priority = self.priority() + priorityOffset
[docs] def minfFileName(self):
"""
Returns the name of the minf file associated to this diskItem. It is generally the name of the main file of the diskItem + the *.minf* extension.
"""
if self.format is not None and (isinstance(self.format, MinfFormat) or self.format.name == 'Minf'):
return self.fullPath(withQueryString=False)
else:
return self.fullPath(withQueryString=False) + '.minf'
[docs] def saveMinf(self, overrideMinfContent=None):
"""
Writes minf attributes or the content of overrideMifContent if given to the minf file.
"""
minfContent = {}
if self._uuid is not None:
minfContent['uuid'] = str(self._uuid)
if overrideMinfContent is None:
minfContent.update(self._minfAttributes)
else:
minfContent.update(overrideMinfContent)
if self._isTemporary:
temporary.manager.registerPath(self.minfFileName())
self._writeMinf(minfContent)
[docs] def removeMinf(self, attrName, saveMinf=True):
"""
Remove the attribute from minf attributes.
:param string attrName: name of the attribute to remove
:param bool saveMinf: if True, the minf attributes are saved in the minf file.
"""
del self._minfAttributes[attrName]
if saveMinf:
minf = self._readMinf()
if minf is not None:
if minf.pop(attrName, Undefined) is not Undefined:
self._writeMinf(minf)
[docs] def clearMinf(self, saveMinf=True):
"""
Deletes all minf attributes. Also removes the minf file if saveMinf is True.
"""
self._minfAttributes.clear()
if saveMinf:
minf = self.minfFileName()
if os.path.exists(minf):
try:
os.remove(minf)
except OSError:
# maybe the file has already been removed
if os.path.exists(minf):
# if not, really trigger an error
raise
def _readMinf(self):
"""
Reads the minf file and returns its content.
"""
attrFile = self.minfFileName()
if os.path.exists(attrFile):
try:
if six.PY2:
f = open(attrFile)
else:
f = open(attrFile, encoding='UTF-8')
# f = open(attrFile)
minfContent = readMinf(f)[0]
# Ignor huge DICOM information produced by NMR
# and stored in 'dicom' key.
if minfContent:
minfContent.pop('dicom', None)
f.close()
return minfContent
except Exception:
showException(beforeError=_t_(
'in file <em>%s</em><br>') % attrFile)
return None
def _writeMinf(self, minfContent):
"""
Writes the given content to the minf file.
if minfContent is None, removes the minf file.
"""
minf = self.minfFileName()
# print("!neuroDiskItems : _writeMinf : ", minf)
if minfContent:
if six.PY2:
file = open(minf, 'w')
else:
file = open(minf, 'w', encoding='UTF-8')
file.write('attributes = ' + str(minfContent))
file.close()
else:
if os.path.exists(minf):
os.remove(minf)
def readDeclaredAttributes(self):
result = {}
for path in set(six.itervalues(self.get('_declared_attributes_location', {}))):
if os.path.exists(path):
result .update(json.load(open(path)))
return result
def readAndUpdateDeclaredAttributes(self):
self._globalAttributes.update(self.readDeclaredAttributes())
[docs] def readAndUpdateMinf(self):
"""
Reads the content of the minf file and updates the minf attribute dictionary accordingly.
"""
self._lock.acquire()
try:
attrs = self._readMinf()
# print('! neuroDiskItems : readAndUpdateMinf : ', attrs)
if attrs is not None:
if 'uuid' in attrs:
self._changeUuid(Uuid(attrs['uuid']))
del attrs['uuid']
self.clearMinf(saveMinf=False)
self.updateMinf(attrs, saveMinf=False)
finally:
self._lock.release()
[docs] def createParentDirectory(self):
"""
According to the file path of the diskItem, creates the directory that should contain the files if it doesn't exist.
"""
p = os.path.dirname(self.fullPath(withQueryString=False))
if not os.path.exists(p):
try:
os.makedirs(p)
except OSError as e:
if not e.errno == errno.EEXIST:
# filter out 'File exists' exception, if the same dir has been created
# concurrently by another instance of BrainVisa or another
# thread
raise
[docs] def isTemporary(self):
"""
Returns True if it is a temporary diskItem, that is to say its files will be automatically deleted when its is no more referenced.
"""
return self._isTemporary
[docs] def distance(self, other):
'''Returns a value that represents a sort of distance between two DiskItems.
The distance is not a number but distances can be sorted, it is a tuple of numbers.'''
# Count the number of common hierarchy attributes
hierarchyCommon = \
sum([other.getHierarchy(nv[0]) == nv[1]
for nv in self.hierarchyAttributes().items()],
self.type is other.type)
# Count the number of common non hierarchy attributes
nonHierarchyCommon = \
sum(
[other.getNonHierarchy(nv[0]) == nv[1]
for nv in self.nonHierarchyAttributes().items()],
self.type is other.type)
return (-hierarchyCommon, self.priority() - other.priority(), -nonHierarchyCommon, )
def _changeUuid(self, newUuid):
self._uuid = newUuid
if newUuid is not None:
_uuid_to_DiskItem[newUuid] = self
[docs] def setUuid(self, uuid, saveMinf=True):
"""
Sets a new uuid to this diskItem.
"""
self._changeUuid(Uuid(uuid))
if saveMinf:
attrs = self._readMinf()
if not attrs:
attrs = {}
if attrs.get('uuid') != self._uuid:
attrs['uuid'] = self._uuid
try:
self._writeMinf(attrs)
except Exception as e:
raise MinfError(
six.text_type(_t_('uuid cannot be saved in minf file') + ': ') + six.text_type(e))
[docs] def uuid(self, saveMinf=True):
"""
Gets the uuid of the diskItem.
"""
if self._uuid is None:
self._minfLock.acquire()
try:
attrs = self._readMinf()
if attrs and 'uuid' in attrs:
self._changeUuid(Uuid(attrs['uuid']))
else:
try:
self.setUuid(Uuid(), saveMinf=saveMinf)
except MinfError:
# probably read-only
self.setUuid(Uuid(), saveMinf=False)
finally:
self._minfLock.release()
return self._uuid
[docs] def modificationHash(self):
"""
Return a value that can be used to assess modification of this
DiskItem. Two calls to modificationHash will return the same value if and
only if all files in the DiskItem have not changed. Note that the contents
of the files are not read, the modification hash rely only on os.stat.
This method uses the funciton :py:func:`modificationHashOrEmpty` to create a hash code for each file.
"""
files = self.fullPaths() + [self.minfFileName()]
return tuple([(f,) + tuple(modificationHashOrEmpty(f)) for f in files])
[docs] def eraseFiles(self, remove_from_database=False):
"""
Deletes all files associated to this diskItem.
"""
if remove_from_database:
database = self.get('_database')
if database:
from brainvisa.data import neuroHierarchy
db = neuroHierarchy.databases.database(database)
db.removeDiskItem(self, eraseFiles=False)
for fp in self.fullPaths():
if os.path.exists(fp):
shelltools.rm(fp)
fp = self.minfFileName()
if os.path.exists(fp):
shelltools.rm(fp)
[docs] def isLockData(self):
"""
Return True or False to know if a file is locked.
"""
# print("-- FUNCTION isLockData : neuroDiskItems -- ")
nameFileLock = str(self.fileName()) + ".lock"
# print("File to test")
# print(nameFileLock)
return os.path.isfile(nameFileLock)
[docs] def lockData(self):
"""
function to lock file
add a filename.lock file if the filename exists
"""
# print("-- FUNCTION lockData : neuroDiskItems -- ")
nameFileLock = str(self.fileName()) + ".lock"
if os.path.isfile(self.fileName()):
# print("File to lock" + nameFileLock)
fd = open(nameFileLock, 'a')
fd.close()
self.lockChanged.emit(True)
return(True)
else:
return(False)
[docs] def unlockData(self):
"""
function to unlock file
remove a .lock file
"""
# print("-- FUNCTION unlockData : neuroDiskItems -- ")
nameFileLock = str(self.fileName()) + ".lock"
if os.path.isfile(nameFileLock):
fd = os.remove(nameFileLock)
self.lockChanged.emit(False)
[docs] def getFileNameFromUuid(self, uuid):
"""
get the bvproc filname where is stored the history
"""
from brainvisa.data.neuroHierarchy import databases
from brainvisa.processes import defaultContext
# print("! getFileNameFromUuid : ")
# donner directement le bvprocfile
database = self.get("database")
if database is None:
database = self.get("_database")
# print("!neuroDiskItems : getFileNameFromUuid :", database)
# print("!neuroDiskItems : getFileNameFromUuid :", type(database))
# print("!neuroDiskItems : getFileNameFromUuid :", databases._databases)
db = databases._databases.get(database)
# print("!neuroDiskItems : getFileNameFromUuid :", db)
# print("!neuroDiskItems : getFileNameFromUuid :", type(db))
cursor = db._getDatabaseCursor()
bvproc_file = None
try:
sql = "SELECT filename FROM _FILENAMES_ WHERE _uuid='" + uuid + "'"
# print(sql)
bvproc_file = cursor.execute(sql).fetchone()
except sqlite3.OperationalError as e:
defaultContext().warning(
"Cannot question database " + db.name + ". You should update this database.")
finally:
db._closeDatabaseCursor(cursor)
if bvproc_file is not None:
bvproc_file = bvproc_file[0]
# print("!neuroDiskItems : getFileNameFromUuid :", bvproc_file)
return bvproc_file
[docs] def setResolutionLevel(self, resolution_level):
"""
Sets the resolution level in the queryString of a DiskItem.
It allows to use a specific resolution for multi resolution data.
Once set, the fullPath of the DiskItem becomes
'/dir/name.ext?resolution_level=<resolution_level>'
:param resolution_level: The DiskItem resolution level
to use.
"""
dims = self.get('resolutions_dimension', search_header = True)
value_resolution_level = self.get('resolution_level')
if dims is not None and len(dims) > 1:
# Remove the query string resolution_level option if the default
# value is selected
if resolution_level == (len(dims) - 1):
resolution_level = None
if value_resolution_level != resolution_level:
# Update query string only when levels are not the same
if resolution_level is None:
# Remove the resolution level because it is the default
# level
del self._queryStringAttributes['resolution_level']
else:
self._setQueryString('resolution_level', resolution_level)
elif value_resolution_level is not None:
# Not a multiresolution file so remove resolution_level
del self._queryStringAttributes['resolution_level']
[docs] def resolutionLevel(self):
'''
Get the current resolution level of a DiskItem. The resolution level
is read from the querystring.
:return: The current resolution level of DiskItem.
'''
dims = self.get('resolutions_dimension', search_header = True)
if dims is not None:
opts = parse_query_string(self.fullPath())
resolution_level = opts.get('resolution_level')
if resolution_level is None:
resolution_level = len(dims) - 1
else:
resolution_level = int(resolution_level)
if resolution_level < 0:
resolution_level = len(dims) + resolution_level
if resolution_level < 0 or resolution_level >= len(dims):
# Set default resolution level if resolution level
# is out of valid resolution levels
resolution_level = len(dims) - 1
return resolution_level
return 0
[docs] def getResolutionDimensions(self):
'''
Get the multi resolution dimensions for a . If the file
is not an image, None is returned.
:return: The current resolution level of :py:class:`DiskItem`.
'''
dims = self.get('resolutions_dimension', search_header = True)
if dims is None:
dims = self.get('volume_dimension', search_header = True)
if dims is not None:
dims = [ self.get('volume_dimension', search_header = True) ]
return dims
#----------------------------------------------------------------------------
[docs]class File(DiskItem):
"""
This class represents a diskItem that cannot contain other diskItems (it is not a directory).
"""
def __init__(self, name, parent):
DiskItem.__init__(self, name, parent)
#----------------------------------------------------------------------------
[docs]class Directory(DiskItem):
"""
This class represents a directory, that is to say a diskItem that can contain other diskItems.
"""
def __init__(self, name, parent):
DiskItem.__init__(self, name, parent)
self._childs = []
self.lastModified = 0
self.scanner = None
if self.parent is None:
self._automatic_update = True
self._check_directory_time_only = False
def __getstate__(self):
state = DiskItem.__getstate__(self)
state['_childs'] = self._childs
state['lastModified'] = self.lastModified
state['scanner'] = self.scanner
return state
def __setstate__(self, state):
DiskItem.__setstate__(self, state)
self._childs = state['_childs']
self.lastModified = state['lastModified']
self.scanner = state['scanner']
if self.parent is None:
self._automatic_update = True
self._check_directory_time_only = False
[docs] def childs(self):
"""
Returns the children diskItems scaning the directory using the ontology rules.
Not used.
"""
self._lock.acquire()
try:
if self.scanner is None:
return []
if not self._topParent()._automatic_update:
return self._childs
fullName = self.fullPath()
if not os.path.isdir(fullName):
return []
currentTime = int(time.time())
listdir = None
if not self._topParent()._check_directory_time_only:
modificationTime = 0
# print('directory', fullName, 'NOT smart')
# sys.stdout.flush()
listdir = []
for n in os.listdir(fullName):
try:
modificationTime = max(modificationTime,
os.stat(os.path.join(fullName, n))[stat.ST_MTIME])
listdir.append(n)
except Exception:
pass
else:
modificationTime = os.stat(fullName)[stat.ST_MTIME]
debug = neuroConfig.debugHierarchyScanning
if modificationTime >= self.lastModified:
if debug:
print(
'----------------------------------------------', file=debug)
print(fullName, 'modified', file=debug)
print(
'----------------------------------------------', file=debug)
print('modification time:', time.ctime(
modificationTime), file=debug)
print('last modification:', time.ctime(self.lastModified),
file=debug)
debug.flush()
# Rescan directory
childs = []
if listdir is None:
listdir = os.listdir(fullName)
for n in listdir:
if os.path.isdir(os.path.join(fullName, n)):
childs.append(Directory(n, self))
else:
childs.append(File(n, self))
if debug:
print('children count:', len(childs), file=debug)
# Identify files
if self.scanner:
if self._childs:
oldChilds = {}
for i in self._childs:
oldChilds[(i.fileName(), i.type, i.format)] = i
self._childs = self.scanner.scan(childs)
for i in self._childs:
old = oldChilds.get(
(i.fileName(), i.type, i.format))
if isinstance(old, Directory):
i.lastModified = old.lastModified
i._childs = old._childs
else:
self._childs = self.scanner.scan(childs)
else:
self._childs = childs
self.lastModified = currentTime
result = self._childs
finally:
self._lock.release()
return result
#----------------------------------------------------------------------------
[docs]def getId(name):
"""
Returns the name in lowercase.
"""
return name.lower()
#----------------------------------------------------------------------------
[docs]class BackwardCompatiblePattern(DictPattern):
"""
This class represents a file pattern.
The pattern is described with an expression like ``f|*.jpg``.
The first character (before the ``|``) indicates if the pattern recognizes files (``f``) or directories (``d``).
This part is optional. The rest of the pattern is a simple regular expression that describes the file name.
It can contain ``*`` character to replace any character and ``#`` character to replace numbers.
See the parent class :py:class:`brainvisa.data.patterns.DictPattern` for more details about the patterns.
A method :py:meth:`match` enables to check if a :py:class:`DiskItem` file name matches this format pattern.
"""
_msgBadPattern = '<em><code>%s</code></em> is not a valid pattern'
def __init__(self, pattern):
i = pattern.find('|')
if i >= 0:
fileType = pattern[:i]
# Check file type
if fileType == 'fd':
self.fileType = None
elif fileType == 'f':
self.fileType = File
elif fileType == 'd':
self.fileType = Directory
else:
raise ValueError(
HTMLMessage(_t_(self._msgBadPattern) % pattern))
p = pattern[i + 1:]
else:
self.fileType = None
p = pattern
DictPattern.__init__(self, p)
self.pattern = pattern
[docs] def match(self, diskItem):
"""
Checks if the diskitem file name matches the pattern.
:param diskItem: The :py:class:`DiskItem` whose file format is checked.
:returns: a dictionary containing the value found in the diskItem file name for each named expression of the pattern.
The ``*`` character in the pattern is associated to a ``filename_variable`` key in the result dictionary.
The ``#`` character in the pattern is associated to a ``name_serie`` key in the result dictionary.
``None`` is returned if the diskitem file name doesn't match the pattern.
"""
# Check File / Directory / both
if self.fileType is not None:
if diskItem.__class__ is not DiskItem and \
not isinstance(diskItem, self.fileType):
return None
result = DictPattern.match(
self, os.path.basename(diskItem.name), diskItem)
return result
[docs] def unmatch(self, diskItem, matchResult, force=False):
"""
The opposite of :py:meth:`match` method: the matching string is found from a match result and a dictionary of attributes values.
:param matchResult: dictionary which associates a value to each named expression of the pattern.
:param dict: dictionary which associates a value to each attribute name of the pattern.
:param bool force: If True default values are set in the match result for ``filename_variable`` and ``name_serie`` attributes.
:rtype: string
:returns: The rebuilt matching string.
"""
if matchResult is None:
return None
if force:
matchResult.setdefault('filename_variable', '')
matchResult.setdefault('name_serie', [])
return DictPattern.unmatch(self, matchResult, diskItem)
#----------------------------------------------------------------------------
[docs]def getResolutionsFromItems(items):
"""
Get the multi resolutions string for common resolution levels of a list of
items.
:param items: list of py:class:`DiskItem`.
:returns: The list of common resolutions.
"""
def format_size(size, unit = None, precision = None):
if unit:
unit = unit + (str(len(size)) if len(size) > 1 else '')
return 'x'.join([('%.' + str(precision) + 'f') % s \
if precision else str(s) \
for s in size]) + (' ' + unit if unit else '')
def get_resolution_string(level, ldim = None):
if len(uniform_ratios) > level:
res_vs = [(r * d) for r, d in zip(uniform_ratios[level][:ldim],
uniform_vs[:ldim])]
if len(res_vs) > 0:
res_dim = []
if len(uniform_dims) > level:
res_dim = uniform_dims[level][:ldim]
return '%s: %s (%s)' % (level,
format_size(res_dim)
if len(res_dim) > 0
else 'item specific',
format_size(res_vs, unit = 'mm',
precision = 6))
return str(level)
resolutions = []
if items is not None:
if not isinstance(items, (list, tuple)):
items = [items]
items_dims = [i.getResolutionDimensions() for i in items]
# Process items level ratios for each dimension
items_ratios = []
for i_dims in items_dims:
i_ratios = []
if i_dims is not None:
for d in six.moves.xrange(len(i_dims)):
i_ratios.append([(a / b) for a, b in zip(i_dims[0],
i_dims[d])])
items_ratios.append(i_ratios)
# Process item full resolution voxel size
items_vs = list()
for i, r in zip(items, items_ratios):
vs = i.get('voxel_size', search_header = True)
res = i.resolutionLevel()
if vs is not None and res is not None:
items_vs.append([s / f for s, f in zip(vs, r[res])])
else:
items_vs.append([])
# Check that each resolution level has uniform ratios for all items
uniform_ratios = dict()
for i in six.moves.xrange(len(items_ratios)):
i_ratios = items_ratios[i]
for d in six.moves.xrange(len(i_ratios)):
ratios = uniform_ratios.get(d)
if i == 0:
uniform_ratios[d] = i_ratios[d]
elif ratios is None:
uniform_ratios[d] = []
elif len(ratios) > 0:
if ratios != i_ratios[d]:
# Update to non uniform ratios
uniform_ratios[d] = []
# Check that voxel sizes at highest resolution are uniform
uniform_vs = None
for i_vs in items_vs:
if uniform_vs is None:
uniform_vs = i_vs
elif uniform_vs != i_vs:
uniform_vs = []
break
# Check that each resolution level has uniform dimensions for all items
uniform_dims = dict()
for i in six.moves.xrange(len(items_dims)):
i_dims = items_dims[i]
for d in six.moves.xrange(len(i_dims)):
dims = uniform_dims.get(d)
if i == 0:
uniform_dims[d] = i_dims[d]
elif dims is None:
uniform_dims[d] = []
elif len(dims) > 0:
if dims != i_dims[d]:
# Update to non uniform dimensions
uniform_dims[d] = []
# Get dimension limits
max_ldim = 0
for i in six.moves.xrange(len(items_dims)):
dims = items_dims[i]
ldim = len(dims[0])
# Does not display dimension with size 1
for d in six.moves.xrange(len(dims[0]) - 1, -1, -1):
if dims[0][d] == 1:
ldim -= 1
else:
break
if ldim > max_ldim:
max_ldim = ldim
resolutions = [get_resolution_string(level, ldim = max_ldim)
for level in six.moves.xrange(len(uniform_dims))]
return resolutions
#----------------------------------------------------------------------------
[docs]class BackwardCompatiblePatterns(object):
"""
This class represents several file patterns.
Each pattern is a :py:class:`BackwardCompatiblePattern`.
"""
_typeMsg = '<em><code>%s</code></em> is not a valid pattern list'
def __init__(self, patterns):
"""
:param patterns: a string, a list of string, or a list of :py:class:`BackwardCompatiblePattern`. They are used to create the internal list of :py:class:`BackwardCompatiblePattern`.
"""
# Build Pattern list in self.patterns
if type(patterns) is str:
self.patterns = [BackwardCompatiblePattern(patterns)]
elif type(patterns) in (tuple, list):
self.patterns = []
for i in patterns:
if isinstance(i, BackwardCompatiblePattern):
self.patterns.append(i)
else:
self.patterns.append(BackwardCompatiblePattern(i))
else:
raise TypeError(HTMLMessage(_t_(self._typeMsg) % str(patterns)))
[docs] def match(self, diskItem, returnPosition=0):
"""
Checks if the diskitem matches one of the patterns of this :py:class:`BackwardCompatiblePatterns`.
:param diskItem: The diskitem which files names should match the patterns.
:param bool returnPosition: if True, the index of the matching pattern is returned as well as the match result.
:returns: the match result or a tuple (match result, index) if ``returnPosition`` is True.
"""
pos = 0
for i in self.patterns:
m = i.match(diskItem)
if m:
if returnPosition:
return (m, pos)
return m
pos += 1
return None
[docs] def unmatch(self, diskItem, matchResult, force=0):
"""
Returns the list of file names generated from a diskitem and its match result.
"""
return [p.unmatch(diskItem, matchResult, force)
for p in self.patterns]
[docs] def fileOrDirectory(self):
"""
Checks if all the patterns match files or directories. If they all match the same type (file or directory), returns this type.
"""
if self.patterns:
result = self.patterns[0].fileType
for p in self.patterns[1:]:
if p.fileType is not result:
return None
return result
return None
def __cmp__(self, other):
return self.patterns != other.patterns
def __eq__(self, other):
return self.patterns == other.patterns
def __lt__(self, other):
return self.patterns < other.patterns
def __le__(self, other):
return self.patterns <= other.patterns
def __gt__(self, other):
return self.patterns > other.patterns
def __ge__(self, other):
return self.patterns >= other.patterns
#----------------------------------------------------------------------------
formats = {}
formatLists = {}
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
directoryFormat = Format('Directory', 'd|*', ignoreExclusive=1)
fileFormat = Format('File', 'f|*', ignoreExclusive=1)
#----------------------------------------------------------------------------
diskItemTypes = {}
#----------------------------------------------------------------------------
[docs]class DiskItemType(object):
"""
This class represents a data type. It is used to define the type attribute of :py:class:`DiskItem` objects.
The types are defined hierarchically, that's why a type can have a parent type
and a method :py:meth:`DiskItemType.isA` enables to request if a type is derived from another type.
:Attributes:
.. py:attribute:: name
Name of the Type. For example *T1 MRI*.
.. py:attribute:: fileName
Name of the python file that contains the definition of this type.
.. py:attribute:: id
Identifier associated to this type.
.. py:attribute:: parent
As types are defined hierarchically, a type can have a parent type.
.. py:attribute:: toolbox
Name of the toolbox that defined the type, default is 'axon' (no toolbox).
:Methods:
"""
_reloaded_types = {}
def __new__(cls, typeName, *args, **kwargs):
# print'** new **', cls, typeName, args, kwargs
global diskItemTypes
id = getId(typeName)
instance = diskItemTypes.get(id)
if instance is None:
instance = super(DiskItemType, cls).__new__(cls)
# to check double definition and removed types
cls._reloaded_types.setdefault(id, 0)
cls._reloaded_types[id] += 1
return instance
def __getnewargs__(self):
return (self.name,)
def __init__(self, typeName, parent=None, attributes=None):
"""
:param string typeName: name of the type
:param parent: can be the name of the parent. If it already registered, it will be found from its name.
:param attributes: dictionary containing attributes associated to this type.
"""
# Check name
if type(typeName) is not str:
raise ValueError(_t_('a type name must be a string'))
self.name = typeName
self.id = getId(typeName)
# try to compare to an older copy
if hasattr(self, '__older_type'):
other = self.__older_type
del self.__older_type
elif self._reloaded_types.get(self.id):
other = diskItemTypes.get(self.id)
if other:
other = dict([(k, v) for k, v in six.iteritems(other.__dict__)
if not k.startswith('_')])
else:
other = None
tb = traceback.extract_stack(None, 4)
self.fileName = tb[0][0]
self.parse_line = tb[0][1]
self.toolbox = 'axon'
if parent is None:
self.parent = None
else:
self.parent = getDiskItemType(parent)
if other:
if not sameContent(self, other, as_dict=True):
raise ValueError(HTMLMessage(_t_(
'line %d, invalid redefinition for type %s, previous definition in file: %s, line %d')
% (self.parse_line, self.name, other[
'fileName'],
other['parse_line'])))
else:
diskItemTypes[self.id] = self
if attributes is None:
if self.parent is not None:
self._typeAttributes = self.parent._typeAttributes
else:
self._typeAttributes = None
else:
self._typeAttributes = attributes
def __getstate__(self):
return {'diskItemType': self.name}
def __setstate__(self, state):
dit = getDiskItemType(state['diskItemType'])
self.__dict__.update(dit.__dict__)
[docs] def setType(self, item, matchResult, formatPosition):
"""
Sets this type to the given item.
"""
item.type = self
[docs] def isA(self, diskItemType):
"""
Returns True if the given diskItem type is a parent (direct or indirect) of the current diskItem.
"""
diskItemType = getDiskItemType(diskItemType)
if diskItemType is self:
return 1
if self.parent is None:
return 0
return self.parent.isA(diskItemType)
[docs] def inheritanceLevels(self, diskItemType):
"""
Returns -1 if the given diskItem type is not a parent (direct or indirect)
of the current diskItem, else the number of levels between the given diskItem
type and the current diskItem type.
"""
# levels = -1
# t = getDiskItemType( diskItemType )
# while t:
# levels += 1
# if t == self:
# return levels
# t = t.parent
# return -1
diskItemType = getDiskItemType(diskItemType)
if diskItemType is self:
return 0
if self.parent is None:
return -1
p = self.parent.inheritanceLevels(diskItemType)
return p if p < 0 else p + 1
[docs] def parents(self):
"""
Returns the list of parents of this diskItem, that is to say its parent and its parent's parents.
"""
if self.parent:
return [self.parent] + self.parent.parents()
return []
[docs] def levels(self):
"""
Returns hierarchy levels from the current diskItem. Each level is a tuple
that contains an integer and a diskItem type.
"""
i = 0
t = self
while t:
yield (i, t)
t = t.parent
i += 1
def __str__(self):
return self.name
def __repr__(self):
return '<' + str(self) + '>'
[docs] def setAttributes(self, item, writeOnly=0):
"""
Add this type attributes to the given diskItem minf attributes.
"""
attrs = self._typeAttributes
if attrs is not None:
if type(attrs) is dict:
item.updateMinf(attrs)
elif callable(attrs):
item.updateMinf(attrs(item, writeOnly=writeOnly))
else:
raise ValueError(
HTMLMessage(_t_('Invalid attributes: <em>%s</em>') % htmlEscape(str(attrs))))
#----------------------------------------------------------------------------
[docs]def getDiskItemType(item):
"""
Gets the :py:class:`DiskItemType` whose id is the given item.
"""
if isinstance(item, DiskItemType):
return item
elif type(item) is str or type(item) is six.text_type:
result = diskItemTypes.get(getId(item))
if result:
return result
raise ValueError(
HTMLMessage(_t_('<em><code>%s</code></em> is not a valid file or directory type') % str(item)))
#----------------------------------------------------------------------------
[docs]def getAllDiskItemTypes():
"""
Gets all registered :py:class:`DiskItemType`.
"""
return list(diskItemTypes.values())
#----------------------------------------------------------------------------
[docs]def isSameDiskItemType(base, ref):
"""
Returns True if ref is a parent of base.
"""
if base:
return base.isA(ref)
else:
return ref is None
#----------------------------------------------------------------------------
[docs]class FileType(DiskItemType):
"""
This class represents a type for a DiskItem associated to files (a :py:class:`File`).
It is used to define new data types in brainvisa ontology files.
It can store file formats associated to this type.
"""
def __init__(self, typeName, parent=None, formats=None, minfAttributes=None):
"""
:param string typeName: name of the type
:param formats: list of file formats associated to this type of data.
:param minfAttributes: dictionary containing attributes associated to this type.
"""
# try to compare to an older copy
id = getId(typeName)
if not hasattr(self, '__older_type') and self._reloaded_types.get(id):
other = diskItemTypes.get(id)
if other:
other = dict([(k, v) for k, v in six.iteritems(other.__dict__)
if not k.startswith('_')])
self.__older_type = other
# Check formats
if formats:
self.formats = getFormats(formats)
elif parent:
parent = getDiskItemType(parent)
self.formats = parent.formats
else:
self.formats = None
# Register type
DiskItemType.__init__(self, typeName, parent, minfAttributes)
[docs] def sameContent(self, other, as_dict=False):
"""
Returns True if the two file types are instances of the same class and have the same list of formats and the same parent.
The formats and parents are compared using the function :py:func:`sameContent`.
"""
if as_dict and isinstance(other, dict):
return sameContent(self.formats, other['formats']) \
and sameContent(self.parent, other['parent'])
return isinstance( other, FileType ) and \
self.__class__ is other.__class__ and \
sameContent( self.formats, other.formats ) and \
sameContent(self.parent, other.parent)
#----------------------------------------------------------------------------
[docs]def expand_name_serie(text, number):
"""
Replaces ``#`` character in *text* by *number*.
"""
l = text.split('#')
if len(l) == 2:
return l[0] + number + l[1]
return text
#----------------------------------------------------------------------------
[docs]class TemporaryDirectory(Directory):
"""
This class represents a temporary directory.
It will be deleted when Brainvisa closes.
The deletion of temporary files and directories is managed by a :py:class:`brainvisa.data.temporary.TemporaryFileManager`.
"""
def __init__(self, name, parent):
self._isTemporary = 1
if parent:
fullPath = os.path.join(parent.fullPath(), name)
else:
fullPath = name
if not os.path.isdir(fullPath):
try:
os.mkdir(fullPath, 0o770)
except OSError as e:
if not e.errno == errno.EEXIST:
# filter out 'File exists' exception, if the same dir has been created
# concurrently by another instance of BrainVisa or another
# thread
raise
Directory.__init__(self, name, parent)
#----------------------------------------------------------------------------
[docs]class TemporaryDiskItem(File):
"""
This class represents a temporary diskitem.
The associated files are automatically deleted when the corresponding object is no more used and so garbage collected.
The deletion of temporary files and directories is managed by a :py:class:`brainvisa.data.temporary.TemporaryFileManager`.
"""
def __init__(self, name, parent):
File.__init__(self, name, parent)
self._isTemporary = 1
def __del__(self):
if Application().configuration.brainvisa.removeTemporary and \
temporary.manager is not None:
toDelete = self.fullPaths()
toDelete.append(toDelete[0] + '.minf')
# print('deleting temp DI:', toDelete)
for f in toDelete:
n = 0
while 1:
try:
temporary.manager.removePath(f)
break
except OSError:
if n < 100:
n += 1
time.sleep(0.01)
# print('can\' delete', f, 'yet. waiting')
# sys.stdout.flush()
else:
# print('exception while removing', f)
showException(beforeError=_t_('temporary file <em>%s</em> not '
'deleted<br>') % f, gui=0)
# giving up, let it for later
temporary.manager.registerPath(f)
print('continuing after failed rm on %s' % f)
break
[docs] def clear(self):
"""
Removes all files associated to this diskItem.
"""
for f in self.fullPaths():
try:
temporary.manager.removePath(f)
temporary.manager.registerPath(f)
except Exception:
showException(
beforeError=_t_(
'temorary file <em>%s</em> not deleted<br>') % f,
gui=0)
#----------------------------------------------------------------------------
globalTmpDir = None
#----------------------------------------------------------------------------
[docs]def getTemporary(format, diskItemType=None, parent=None, name=None, suffix=None, prefix=None):
"""
Creates a new temporary diskitem. It will be automatically deleted when there is no more references on it or when Brainvisa closes.
:param format: format of the diskitem.
If the format correspond to a directory, a :py:class:`TemporaryDirectory` will be created, else a :py:class:`TemporaryDiskItem` will be created.
:param diskItemType: type of the diskItem.
:param parent: parent diskitem: directory which contains the diskItem.
By default it will be Brainvisa global temporary directory whose path can be choosen in the options.
The global temporary directory diskitem is defined in the global variable :py:data:`globalTmpDir`.
:param name: filename for the diskitem. By default, a new name is generated by the :py:class:`brainvisa.data.temporary.TemporaryFileManager`.
:returns: a :py:class:`TemporaryDirectory` or a :py:class:`TemporaryDiskItem`.
"""
global globalTmpDir
format = getFormat(format)
if diskItemType is not None:
diskItemType = getDiskItemType(diskItemType)
if parent is None:
if globalTmpDir is None:
globalTmpDir = TemporaryDirectory(
neuroConfig.temporaryDirectory, None)
parent = globalTmpDir
if name is None:
name = temporary.manager.newFileName(directory=parent.fullPath(),
suffix=suffix,
prefix=prefix)
if format.fileOrDirectory() is Directory:
item = TemporaryDirectory(name, parent)
else:
item = TemporaryDiskItem(name, parent)
item._files = format.unmatch(item, {'filename_variable': name,
'name_serie': None}, 1)
item.format = format
item.type = diskItemType
toDelete = list(item.fullPaths())
toDelete.append(toDelete[0] + '.minf')
for f in toDelete:
temporary.manager.registerPath(f)
return item
#----------------------------------------------------------------------------
_uuid_to_DiskItem = WeakValueDictionary()
#----------------------------------------------------------------------------
[docs]def getDataFromUuid(uuid):
"""
Gets a :py:class:`DiskItem` from its uuid.
"""
if not isinstance(uuid, Uuid):
uuid = Uuid(uuid)
return _uuid_to_DiskItem.get(uuid)
#----------------------------------------------------------------------------
[docs]class HierarchyDirectoryType(FileType):
"""
This type represents a directory which represents data in Brainvisa ontology.
It is associated to the format :py:data:`directoryFormat`.
"""
def __init__(self, typeName, parent=None, **kwargs):
FileType.__init__(self, typeName, parent, directoryFormat, **kwargs)
#----------------------------------------------------------------------------
typesLastModification = 0
# mef is global to handle multiple call to readTypes since allready read
# file types are stored in it to prevent multiple loads which cause troubles.
[docs]class TypesMEF(MultipleExecfile):
"""
This class enables to read Brainvisa ontology files which contain the definition of formats and types.
A global instance of this object :py:data:`mef` is used to load types and formats at Brainvisa startup.
The Brainvisa types files will define new formats with :py:class:`Format`, new types with :py:class:`FileType`
and new format lists with :py:func:`createFormatList`.
With the :py:attr:`brainvisa.multipleExecfile.MultipleExecfile.localDict`, callback methods are associated to the functions that will be called in the types files:
:py:meth:`create_format`, :py:meth:`create_format_list`, :py:meth:`create_type`.
"""
def __init__(self):
super(TypesMEF, self).__init__()
self.localDict['Format'] = self.create_format
self.localDict['createFormatList'] = self.create_format_list
self.localDict['changeToFormatSeries'] = self.create_format_serie
self.localDict['FileType'] = self.create_type
self.localDict['HierarchyDirectoryType'] = self.create_hie_dir_type
[docs] def create_type(self, *args, **kwargs):
"""
This method is called when a new type is created in one of the executed files.
The *toolbox*, *module* and fileName attributes of the new :py:class:`FileType` are set.
:param args: The arguments will be passed to the constructor of :py:class:`FileType`.
:returns: The new type.
"""
type = FileType(*args, **kwargs)
toolbox, module = self.currentToolbox()
type.toolbox = toolbox
type.module = module
type.fileName = self.localDict["__name__"]
return type
[docs] def create_hie_dir_type(self, *args, **kwargs):
"""
This method is called when a new type is created in one of the executed files.
The *toolbox*, *module* and fileName attributes of the new :py:class:`HierarchyDirectoryType` are set.
:param args: The arguments will be passed to the constructor of :py:class:`HierarchyDirectoryType`.
:returns: The new type.
"""
hdtype = HierarchyDirectoryType(*args, **kwargs)
toolbox, module = self.currentToolbox()
hdtype.toolbox = toolbox
hdtype.module = module
hdtype.fileName = self.localDict["__name__"]
return hdtype
mef = TypesMEF()
mef.fileExtensions.append('.py')
[docs]def readTypes():
"""
This function loads types and formats by reading Brainvisa ontology types files.
"""
global typesLastModification
global mef
mef.includePath.update(neuroConfig.typesPath)
try:
files = shelltools.filesFromShPatterns(
*[os.path.join(path, '*.py') for path in neuroConfig.typesPath])
files.sort()
exc = mef.execute(continue_on_error=True, *files)
if exc:
for e in exc:
try:
six.reraise(e[0], e[1], e[2])
except Exception:
showException(
beforeError="Error while reading types file: ")
typesLastModification = max(
(os.stat(f).st_mtime for f in mef.executedFiles()))
except Exception:
showException()
[docs]def reloadTypes():
"""
This function reinitializes the global variables used to store the types and formats before calling :py:func:`readTypes`.
"""
global formats
global formatLists
global diskItemTypes
global typesLastModification
global mef
global directoryFormat
global fileFormat
Format._reloaded_formats = {}
DiskItemType._reloaded_types = {}
formatLists = {}
diskItemTypes = {}
typesLastModification = 0
mef = TypesMEF()
mef.fileExtensions.append('.py')
directoryFormat = Format('Directory', 'd|*', ignoreExclusive=1)
fileFormat = Format('File', 'f|*', ignoreExclusive=1)
readTypes()
def pruneObsoleteTypes():
global formats
for format in list(formats.keys()):
if format not in Format._reloaded_formats:
del formats[format]
Format._reloaded_formats = {}
global diskItemTypes
for type in list(diskItemTypes.keys()):
if type not in DiskItemType._reloaded_types:
del diskItemTypes[type]
DiskItemType._reloaded_types = {}
#----------------------------------------------------------------------------
try:
from soma import aims
# Fix libxml2 multithreaded application issue by initializing parser from
# the main thread
aims.xmlInitParser()
_finder = aims.Finder()
# don't resolve symlinks if file browser to be consistent with
# all DiskItem namings
try:
aims.setQtResolveSymlinks(False)
except Exception:
pass
except Exception:
_finder = None
#----------------------------------------------------------------------------
[docs]def aimsFileInfo(fileName):
"""
Reads the header of the file *fileName* and returns its attributes as a dictionary.
"""
from brainvisa.processes import defaultContext
from brainvisa.processes import mainThreadActions
global _finder
result = {}
if fileName.endswith('.ima.gz') or fileName.endswith('.dim.gz') or fileName.endswith('.ima.Z') or fileName.endswith('.dim.Z'):
context = defaultContext()
tmp = context.temporary('GIS image')
context.runProcess('uncompressGIS', fileName, tmp, False)
fileName = tmp.fullPath()
try:
try:
import numpy
nan = numpy.nan
except ImportError:
nan = None
if _finder is not None:
finder = aims.Finder()
fileName = six.ensure_str(fileName,
sys.getfilesystemencoding()
or sys.getdefaultencoding())
# Finder is not thread-safe (yet)
if mainThreadActions().call(finder.check, fileName):
result = eval(str(finder.header()), locals())
else:
if neuroConfig.platform == 'windows':
f = os.popen('AimsFileInfo -i "' + fileName + '"', 'r')
else:
f = os.popen(
'AimsFileInfo -i "' + fileName + '" 2> /dev/null', 'r')
s = f.readline()
while s and s != 'attributes = {\n':
s = f.readline()
s = s[13:-1] + f.read()
result = eval(s, locals())
f.close()
except Exception:
pass
return result