Source code for brainvisa.data.actions

# -*- coding: utf-8 -*-
#  This software and supporting documentation are distributed by
#      Institut Federatif de Recherche 49
#      CEA/NeuroSpin, Batiment 145,
#      91191 Gif-sur-Yvette cedex
#      France
#
# This software is governed by the CeCILL license version 2 under
# French law and abiding by the rules of distribution of free software.
# You can  use, modify and/or redistribute the software under the
# terms of the CeCILL license version 2 as circulated by CEA, CNRS
# and INRIA at the following URL "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license version 2 and that you accept its terms.

from __future__ import print_function
from __future__ import absolute_import
import os
import shutil
import re
import brainvisa.processes
from brainvisa import registration
from brainvisa.data import neuroHierarchy
import six

#


[docs]class FileProcess(object): """ Represents a process to execute on a file. @type file: string @ivar file: the file to process @type action: Action @ivar action: the action to do to process the file @type selected: boolean @ivar selected: if the process is selected, the action will be executed by process method of a DBProcessor. @type pattern: regexp @ivar pattern: regular expression that describes the files to process in the directory self.file @type files: list of string @ivar files: if pattern is not None, files corresponding to that pattern that have been treated by doit method. useful for undoCmd. """ def __init__(self, file, action, pattern=None, diskItem=None): self.file = file self.diskItem = diskItem self.pattern = pattern if pattern is not None: self.pattern = re.compile(pattern) self.action = action if self.action: self.selected = True else: self.selected = False def filePattern(self): if self.pattern is not None: return os.path.join(self.file, self.pattern.pattern) else: return self.file
[docs] def doit(self, debug=False, context=None): """ Executes the action on the file. """ self.files = [] ret = True if self.pattern is not None: if os.path.isdir(self.file): content = os.listdir(self.file) self.action.initialize() # must get directory content before # intialize because initialize may # create directories that must not be # taken into acount for f in content: if self.pattern.match(f): # if debug: # print("-- ", self.action.tooltip, str(self.action), " # pattern:"+self.pattern.pattern+" [ ", # os.path.join(self.file, f)," ]") if self.action.doit(os.path.join(self.file, f), debug=debug, context=context): self.files.append(f) elif self.diskItem: # a diskitem can represent several files self.action.initialize() if isinstance(self.action, CallProcess) or isinstance(self.action, SetTransformationInfo): # a process can use directly the diskitem, no use to call it for every file # if debug: # print("-- ", self.action.tooltip, str(self.action), " [ # ", self.file," ]") if self.action.doit(self.file, debug=debug, context=context): self.files.append(self.file) else: for f in self.diskItem.existingFiles(): # if debug: # print("-- ", self.action.tooltip, str(self.action), " # [ ", f," ]") if self.action.doit(f, debug=debug, context=context): self.files.append(f) else: self.action.initialize() # if debug: # print("-- ", self.action.tooltip, str(self.action), " [ ", # self.file," ]") if not self.action.doit(self.file, debug=debug, context=context): self.file = None
[docs] def undoCmd(self): """ Gets the command to undo the action. @rtype : string @return: the undo command """ cmd = "" if self.pattern is not None: for f in self.files: cmd += self.action.undoCmd(os.path.join(self.file, f)) elif self.diskItem: for f in self.files: cmd += self.action.undoCmd(f) elif self.file is not None: cmd = self.action.undoCmd(self.file) cmd += self.action.finalizeUndo() return cmd
def __getattr__(self, name): """ Called when trying to access to name attribute, which is not defined. Used to give a value to centralRef attribute first time it is accessed. """ if name == "tooltip": return self.action.tooltip else: raise AttributeError def __str__(self): return self.action.__str__()
# # classes to represent actions to do in order to convert a database class Action(object): tooltip = "Action" icon = "run.png" def __init__(self): pass def initialize(self): pass def doit(self, file, debug=False, context=None): return True def undoCmd(self, file): return "" def finalizeUndo(self): return "" def __str__(self): return "" #
[docs]class Move(Action): """ Action to move src item to dest item. items can be files or directories. If intermediate directories in dest don't exist they are created. Paths must be explicite, * cannot be used instead of file name. It is possible to rename at the same time : Examples : Move(destDir).doit(srcDir/item) : srcDir/item -> destDir/item (not renamed) Move(destDir, None, newItem).doit(srcDir/item) : srcDir/item -> destDir/newItem (renamed with fixed name) Move(destDir, ".*", "\0-suffix").doit(srcDir/item) : srcDir/item -> destDir/item-suffix (renamed with a name related to src name) """ tooltip = "Move" icon = "move.png" def __init__(self, dest, patternSrc=None, patternDest=None): """ @type dest: string @param dest: directory where to move src @type patternSrc: string @param patternSrc: regular expression that will be matched on files to move @type patternDest: string @param patternDest: regular expression to expand the match object found on file to move in order to get the new name """ super(Move, self).__init__() self.dest = dest self.newDirs = [] self.patternSrc = patternSrc self.patternDest = patternDest if patternSrc: self.patternSrc = re.compile(patternSrc) def initialize(self): # create directories to prepare the move dest = os.path.normpath(self.dest) # store intermediate created directories to undo while dest and not os.path.exists(dest): self.newDirs.append(dest) dest = os.path.dirname(dest) it = reversed(self.newDirs) for d in it: os.mkdir(d) def doit(self, src, debug=False, context=None): # print(str(self)) ret = True if self.patternSrc: match = self.patternSrc.match(os.path.basename(src)) dest = os.path.join(self.dest, match.expand(self.patternDest)) elif self.patternDest: dest = os.path.join(self.dest, self.patternDest) else: dest = os.path.join(self.dest, os.path.basename(src)) if debug: if context is None: context = brainvisa.processes.defaultContext() context.write("-- ", self.tooltip, src, " -> ", dest) # print("move", src, "->", dest) # exception for graphs : must use AimsGraphConvert to copy .data with # the graph if src[-4:] == ".arg": os.system("AimsGraphConvert -i '" + src + "' -o '" + dest + "'") os.remove(src) else: if os.path.exists(dest): context.write( "!Warning MOVE : destination ", dest, " already exists ; move cancelled.") ret = False # if os.path.isdir(src): # shutil.rmtree(src) # else: # os.remove(src) else: shutil.move(src, dest) return ret def undoCmd(self, src): if self.patternSrc: match = self.patternSrc.match(os.path.basename(src)) dest = os.path.join(self.dest, match.expand(self.patternDest)) elif self.patternDest: dest = os.path.join(self.dest, self.patternDest) else: dest = os.path.join(self.dest, os.path.basename(src)) if dest[-4:] == ".arg": undo = "os.system(\"AimsGraphConvert -i '" + dest + \ "' -o '" + src + "'\")\n" + \ "os.remove('" + dest + "')\n" else: undo = "if not os.path.exists('" + dest + "'): print('!Warning MOVE : source " + dest + " do not exist, not moved to " + src + ".')\n" +\ "else: shutil.move('" + dest + "','" + src + "')\n" return undo def finalizeUndo(self): undo = "" for newDir in self.newDirs: undo += "shutil.rmtree('" + newDir + "')\n" self.newDirs = [] return undo def __str__(self): if self.patternDest: return os.path.join(self.dest, self.patternDest) return self.dest
#
[docs]class Remove(Move): tooltip = "Remove" icon = "remove.png" """ This action doesn't really remove the file, it is moved to trash directory. """ def __init__(self, srcDir=""): """ src: directory in trash where to put files to remove """ super(Remove, self).__init__(os.path.join("trash", srcDir))
# print("Remove ", src, " in ", self.dest) # # terminer de remplacer avec FileProcess pour pouvoir remplacer une action # par une autre... class Mkdir(Action): tooltip = "Create directory" icon = "folder_new.png" def __init__(self): super(Mkdir, self).__init__() def doit(self, newDir, debug=False, context=None): if debug: if context is None: context = brainvisa.processes.defaultContext() context.write("-- ", self.tooltip, newDir) # print(str(self)) os.mkdir(newDir) return True def undoCmd(self, newDir): return"shutil.rmtree('" + newDir + "')" # class CallProcess(Action): tooltip = "Call process" def __init__(self, processName, *args, **kwargs): self.processName = processName self.args = args self.kwargs = kwargs def doit(self, file, debug=False, context=None, *args, **kwargs): if self.args: args = self.args if self.kwargs: kwargs = self.kwargs if debug: if context is None: context = brainvisa.processes.defaultContext() context.write("") context.write("-- ", str(self), args, kwargs) if context is None: brainvisa.processes.defaultContext() try: context.runProcess(self.processName, *args, **kwargs) except Exception as e: context.warning( "Error while executing " + self.processName + " : " + six.text_type(e)) return True def __str__(self): return self.processName # class ImportData(CallProcess): tooltip = "Import data" def __init__(self, item, dest=None): """ @type item: DiskItem @param item: the data to import in the database @type dest: WriteDiskItem @param dest: where the data must be copied in the database """ super(ImportData, self).__init__("GeneralImport") self.src = item self.dest = dest def initialize(self): self.newDir = None if not os.path.exists("trash"): os.mkdir("trash") self.newDir = "trash" def doit(self, file, debug=False, context=None): # creates dest files from src files, deletes src files self.kwargs = {'input': self.src.fullPath(), 'output': self.dest} if debug: if context is None: context = brainvisa.processes.defaultContext() context.write("-- ", self.tooltip, self.kwargs) super(ImportData, self).doit(None, debug, context) for f in self.src.existingFiles(): shutil.move(f, "trash") return True def undoCmd(self, *args): cmd = "" # delete dest files for f in self.dest.fullPaths(): cmd += "shutil.move('" + f + "', 'trash')\n" destMinf = self.dest.minfFileName() cmd += "if os.path.exists('" + destMinf + \ "'): shutil.move('" + \ destMinf + "', 'trash')\n" # restore src files for f in self.src.fullPaths(): cmd += "shutil.move(os.path.join('trash', os.path.basename('" + f + "')), '" + \ f + "')\n" srcMinf = self.src.minfFileName() trashSrcMinf = os.path.join('trash', os.path.basename(srcMinf)) cmd += "if os.path.exists('" + trashSrcMinf + \ "'): shutil.move('" + \ trashSrcMinf + \ "', '" + \ srcMinf + "')\n" return cmd def finalizeUndo(self): undo = "" if self.newDir is not None: undo += "shutil.rmtree('" + self.newDir + "')\n" self.newDir = None return undo def __str__(self): return six.text_type(self.dest.relativePath()) # class SetTransformationInfo(Action): def __init__(self, transformation, sourceRef, destRef): self.transformation = transformation self.sourceRef = sourceRef self.destRef = destRef def doit(self, file, debug=False, context=None): if debug: if context is None: context = brainvisa.processes.defaultContext() context.write("") context.write("-- ", str(self), self.transformation, " : ") context.write(self.sourceRef, " -> ", self.destRef) tm = registration.getTransformationManager() if not self.sourceRef.isReadable(): tm.createNewReferential(self.sourceRef) if not self.destRef.isReadable(): tm.createNewReferential(self.destRef) tm.setNewTransformationInfo( self.transformation, source_referential=self.sourceRef, destination_referential=self.destRef) return True def __str__(self): return _t_("set transformation info")