# -*- coding: utf-8 -*-
# This software and supporting documentation are distributed by
# Institut Federatif de Recherche 49
# CEA/NeuroSpin, Batiment 145,
# 91191 Gif-sur-Yvette cedex
# France
#
# This software is governed by the CeCILL license version 2 under
# French law and abiding by the rules of distribution of free software.
# You can use, modify and/or redistribute the software under the
# terms of the CeCILL license version 2 as circulated by CEA, CNRS
# and INRIA at the following URL "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license version 2 and that you accept its terms.
from __future__ import absolute_import
import os
import time
import shutil
from glob import glob
import weakref
import types
from gzip import open as gzipOpen
import threading
from soma.minf.api import writeMinf
from soma.uuid import Uuid
from soma.translation import translate as _
from soma.undefined import Undefined
from soma.minf.api import readMinf
from soma import safemkdir
from brainvisa.processing import neuroLog
from brainvisa.configuration import neuroConfig
from brainvisa.data import neuroHierarchy
from brainvisa.processing import neuroException
from brainvisa.data.writediskitem import WriteDiskItem
from brainvisa.data.neuroDiskItems import DiskItem
from brainvisa.processes import defaultContext
import six
import sys
minfHistory = 'brainvisa-history_2.0'
_sessionIDs = {}
_sessionsLock = threading.RLock()
[docs]def sessionId(database):
'''
Manage an id / database in order to give an id for each bvsession.
'''
global _sessionIDs
global _sessionsLock
#_sessionsLock.lock()
if database is None:
# return Uuid()
return neuroConfig.sessionID
_sessionsLock.acquire()
try:
idDb = _sessionIDs.get(database, None)
if idDb is None:
idDb = Uuid()
_sessionIDs[database] = idDb
return idDb
finally:
# _sessionsLock.acquire()
_sessionsLock.release()
[docs]class HistoryBook(object):
'''
An L{HistoryBook} contains some L{HistoricalEvent}.
'''
_allBooks = weakref.WeakValueDictionary()
def __new__(cls, directory=None, database=None, dirBvsession=None, compression=False):
book = HistoryBook._allBooks.get(directory)
if book is None:
book = object.__new__(cls)
HistoryBook._allBooks[directory] = book
return book
def __init__(self, directory=None, database=None, dirBvsession=None, compression=False):
if hasattr(self, '_HistoryBook__dir'):
# self has already been created but __init__ is always
# called after __new__
return
if dirBvsession is None:
dirBvsession = os.path.join(directory, 'bvsession')
self.uuid = Uuid()
self.__compression = compression
if not os.path.isdir(directory):
safemkdir.makedirs(directory)
self.__dir = directory
self.__database = database
if not os.path.isdir(dirBvsession):
safemkdir.makedirs(dirBvsession)
self.__dirBvsession = dirBvsession
def storeEvent(self, event, compression=None, storeBvproc=False):
if isinstance(event, ProcessExecutionEvent):
# Store an event corresponding to current BrainVISA session
# if it is not already done.
bvsessionEvent = self.findEvent(event.content['bvsession'], None)
if bvsessionEvent is None:
bvsessionEvent = BrainVISASessionEvent()
if self.__database is not None:
bvsessionEvent.setCurrentBrainVISASession(
sessionId(self.__database.name))
else: # no database
bvsessionEvent.setCurrentBrainVISASession(
sessionId(self.__dirBvsession))
# bvsessionEvent.setCurrentBrainVISASession( sessionId(
# None ) )
self.storeEvent(bvsessionEvent)
if compression is None:
compression = self.__compression
if event.eventType == "bvsession":
eventFileName = os.path.join(
self.__dirBvsession, str(event.uuid) + '.' + event.eventType)
# eventFileName = os.path.join( self.__dirBvsession, str(
# sessionId( self.__database.name ) ) + '.' + event.eventType )
elif event.eventType == "bvproc":
timeNameDirectory = time.strftime('%Y-%m-%d', time.localtime())
eventDirectory = os.path.join(self.__dir, timeNameDirectory)
if not os.path.exists(eventDirectory):
safemkdir.mkdir(eventDirectory)
eventFileName = os.path.join(
eventDirectory, str(event.uuid) + '.' + event.eventType)
event.save(eventFileName, compression, storeBvproc)
def findEvent(self, uuid, default=Undefined):
try:
fileName = self._findEventFileName(uuid)
except ValueError:
if default is Undefined:
raise
return default
return readMinf(fileName)[0]
def _findEventFileName(self, uuid):
eventFilePattern = os.path.join(self.__dir, str(uuid) + '.*')
l = glob(eventFilePattern)
if l:
return l[0]
raise ValueError(_('History book %(book)s does not contain event %(uuid)s') %
{'book': six.text_type(self), 'uuid': str(uuid)})
def removeEvent(self, uuid):
os.remove(self._findEventFileName(uuid))
@staticmethod
def getHistoryBookDirectories(item):
if item is None:
return (None, None, None)
historyBook = None
db = None
if hasattr(neuroConfig, 'historyBookDirectory'): # used for distributed executions
historyBook = neuroConfig.historyBookDirectory
dirBvsession = None
if not historyBook:
database = item.getHierarchy('_database')
if database:
db = neuroHierarchy.databases.database(database)
if db is not None and db.activate_history:
historyBook = os.path.join(database, 'history_book')
# ini the bvSession Directory
di = WriteDiskItem('Bvsession', 'Directory')
dirBvsession = str(di.findValue({'_database': database}))
sessionId(database)
if isinstance(historyBook, six.string_types):
historyBook = [historyBook]
return (historyBook, db, dirBvsession)
@staticmethod
def storeProcessStart(executionContext, process):
historyBooksContext = {}
for parameterized, attribute, type in process.getAllParameters():
if isinstance(type, WriteDiskItem):
item = getattr(parameterized, attribute)
historyBooks, db, dirBvsession = HistoryBook.getHistoryBookDirectories(
item)
if historyBooks:
for historyBook in historyBooks:
# event = None
if not os.path.exists(historyBook):
safemkdir.mkdir(historyBook)
historyBook = HistoryBook(
historyBook, db, dirBvsession, compression=True)
dHistoryBook = {}
historyBooksContext.setdefault(historyBook, dHistoryBook)[
item.fullPath()] = (item, item.modificationHash())
event = None
if historyBooksContext:
for book in six.iterkeys(historyBooksContext):
event = executionContext.createProcessExecutionEvent()
if book.__database is not None:
event.setBvsession(sessionId(book.__database.name))
else: # no database, use the dirBvsession instead of the database name
# self.__dirBvsession
# event.setBvsession(sessionId( None ) )
event.setBvsession(sessionId(book.__dirBvsession))
dirBook = historyBooksContext.get(book).copy()
dirBook["processExcutionEvent"] = event
historyBooksContext[book] = dirBook
book.storeEvent(event)
event._logItem = executionContext._lastStartProcessLogItem
return event, historyBooksContext
@staticmethod
def storeProcessFinished(executionContext, process, event, historyBooksContext):
for book, items in six.iteritems(historyBooksContext):
historyBooksContext[book].get('processExcutionEvent').setLog(
historyBooksContext[book].get('processExcutionEvent')._logItem)
changedItems = []
g = six.itervalues(items)
for i in g:
if not isinstance(i, ProcessExecutionEvent):
val = [j for j in i]
if val[1] != val[0].modificationHash():
changedItems.append(val[0])
historyBooksContext[book].get('processExcutionEvent').content[
'modified_data'] = [six.text_type(item) for item in changedItems]
book.storeEvent(historyBooksContext[book].get(
'processExcutionEvent'), storeBvproc=True)
# update the the lastHistoricalEvent of each diskitems
for item in changedItems:
# need to test if the path of the item is compatible with the book (coming from the same database) otherwise
# we could set a wrong uuid for lasthistoricalEvent
head, tail = [os.path.normcase(x)
for x in os.path.split(book.__dir)]
listToCompare = [head, os.path.normcase(str(item))]
compare = os.path.commonprefix(listToCompare)
if (tail == "history_book" and compare == head):
try:
item.setMinf('lastHistoricalEvent', historyBooksContext[
book].get('processExcutionEvent').uuid)
except Exception:
neuroException.showException()
[docs]class HistoricalEvent(object):
"""
"""
def __init__(self, uuid=None):
if uuid is None:
uuid = Uuid()
self.uuid = uuid
def save(self, eventFileName, compression=False, storeBvproc=False):
close = True
writeMinFile = False
bvProcDiskItem = None
if not hasattr(eventFileName, 'write'):
if compression:
eventFile = gzipOpen(eventFileName, mode='w')
else:
eventFile = open(eventFileName, mode='w')
else:
eventFile = eventFileName
close = False
writeMinf(eventFile, (self, ), reducer=minfHistory)
if self.eventType == 'bvproc':
if storeBvproc:
# move the bvproc if the time is new
timeNameDirectory = time.strftime(
'%Y-%m-%d', time.localtime())
s = os.stat(eventFileName)
fileDate = time.strftime(
'%Y-%m-%d', time.localtime(s.st_mtime))
if fileDate != timeNameDirectory:
eventDirectory = os.path.join(
self.__dir, timeNameDirectory)
if not os.path.exists(eventDirectory):
safemkdir.mkdir(eventDirectory)
newFile = os.path.join(
eventDirectory, os.path.basename(eventFileName))
shutil.move(eventFileName, newFile)
bvProcDiskItem = WriteDiskItem(
'Process execution event', 'Process execution event').findValue(eventFileName)
else:
bvProcDiskItem = WriteDiskItem(
'Process execution event', 'Process execution event').findValue(eventFileName)
elif self.eventType == 'bvsession':
bvProcDiskItem = WriteDiskItem(
'BrainVISA session event', 'BrainVISA session event').findValue(eventFileName)
if bvProcDiskItem is not None:
minf = {}
minf['uuid'] = self.uuid
bvProcDiskItem.saveMinf(minf)
database = bvProcDiskItem.getHierarchy('_database')
if database:
db = neuroHierarchy.databases.database(database)
db.insertDiskItem(bvProcDiskItem, update=True)
# else :
# defaultContext().warning("No diskitem found for BrainVISA session event
# or Process execution event")
if close:
eventFile.close()
[docs]class ProcessExecutionEvent(HistoricalEvent):
"""
This object enables to store the state of a :py:class:`Process` instance in a dictionary format.
"""
eventType = 'bvproc'
def __init__(self, uuid=None, content={}):
HistoricalEvent.__init__(self, uuid)
self.content = {}
self.content.update(content)
def __getinitargs__(self):
return (self.uuid, self.content)
def setBvsession(self, uuid):
self.content['bvsession'] = uuid
def setProcess(self, process):
process.saveStateInDictionary(self.content)
def setWindow(self, processView):
self.content['window'] = {
'position': [processView.x(), processView.y()],
'size': [processView.width(), processView.height()],
#'state': processView.windowState(),
}
def setLog(self, log):
if log:
if isinstance(log, neuroLog.LogFile.Item):
log._expand({})
self.content['log'] = [log]
else:
self.content['log'] = list(
neuroLog.expandedReader(log.fileName))
def __str__(self):
if self.content.get('id', None):
return 'bvproc<' + str(self.uuid) + ',' + self.content['id'] + '>'
else:
return str(self.content)
class BrainVISASessionEvent(HistoricalEvent):
eventType = 'bvsession'
# def __init__( self, uuid=None, content={}, database):
def __init__(self, uuid=None, content={}):
HistoricalEvent.__init__(self, uuid)
self.content = content.copy()
def __getinitargs__(self):
return (self.uuid, self.content)
def setCurrentBrainVISASession(self, uuidDb):
self.content['version'] = neuroConfig.versionString()
self.uuid = uuidDb
# if uuidDb is not None:
# self.uuid = uuidDb
# else:
# self.uuid = Uuid()
# self.uuid = neuroConfig.sessionID
if neuroConfig.brainvisaSessionLogItem:
neuroConfig.brainvisaSessionLogItem._expand({})
self.content['log'] = [neuroConfig.brainvisaSessionLogItem]
def __str__(self):
return 'bvsession<' + str(self.uuid) + '>'