# -*- coding: utf-8 -*-
"""
This module enables to make a function to be executed in qt thread (main thread).
It is useful when you want to call qt functions from another thread.
It enables to do thread safe calls because all tasks sent are executed in the same thread (qt main thread).
* author: Dominique Geffroy
* organization: NeuroSpin
* license: `CeCILL B <http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html>`_
"""
__docformat__ = "restructuredtext en"
import atexit
import sys
import threading
import six
from soma.qt_gui.qt_backend.QtCore import QObject, QTimer, QEvent, QCoreApplication
from soma import singleton
[docs]
class FakeQtThreadCall(QObject):
'''
Fake QtThreadCall that behave as if always used from main thread.
'''
def isInMainThread():
return True
isInMainThread = staticmethod(isInMainThread)
def push(function, *args, **kwargs):
if kwargs is None or not kwargs:
function(*args)
else:
function(*args, **kwargs)
push = staticmethod(push)
def call(function, *args, **kwargs):
if kwargs is None or not kwargs:
return function(*args)
else:
return function(*args, **kwargs)
call = staticmethod(call)
# QThreadCall used to be a class deriving from Singleton
# and QObject. This led to very strange bugs that where
# related to Qt but too difficult to understand. Finally,
# the class was renamed to _QThreadCall and is no more a
# Singleton. For backward compatibility with existing code,
# QThreadCall is now a function that manages the singleton
# creation.
qt_thread_call_instance = None
def QtThreadCall():
global qt_thread_call_instance
if not qt_thread_call_instance:
qt_thread_call_instance = _QtThreadCall()
return qt_thread_call_instance
class _QtThreadCall(QObject):
"""
This object enables to send tasks to be executed by qt thread (main
thread).
This object must be initialized in the qt thread (normally the main
thread).
It starts a QTimer and periodically execute tasks in its actions list.
Attributes
----------
lock: RLock
lock to prevent concurrent access to actions list
actions: list
tasks to execute
mainThread: Thread
current thread at object initialisation
timer: QTimer
timer to wake this object periodically
"""
def __init__(self):
super().__init__()
self.lock = threading.RLock()
self.actions = []
# look for the main thread
mainthreadfound = False
for thread in threading.enumerate():
if isinstance(thread, threading._MainThread):
mainthreadfound = True
self.mainThread = thread
break
if not mainthreadfound:
print('Warning: main thread not found')
self.mainThread = threading.current_thread()
def _postEvent(self):
class QtThreadCallEvent(QEvent):
def __init__(self, qthreadcall):
QEvent.__init__(self, QEvent.Type(QEvent.User + 24))
self.qthreadcall = qthreadcall
QCoreApplication.instance().postEvent(self,
QtThreadCallEvent(self))
def event(self, e):
sys.stdout.flush()
if e.type() == QEvent.User + 24:
self.doAction()
return True
return QObject.event(self, e)
def push(self, function, *args, **kwargs):
"""
Add a function call to the actions list. the call is executed immediately if current thread is main thread. Otherwise it will be executed some time in the main thread (asynchronously to the current thread). The function return value is ignored and will be lost.
Parameters
----------
function: function
the function to call in main thread.
"""
if self.isInMainThread():
if kwargs is None or len(kwargs) == 0:
function(*args)
else:
function(*args, **kwargs)
else:
self.lock.acquire()
try:
self.actions.append((function, args, kwargs))
self._postEvent()
finally:
self.lock.release()
def call(self, function, *args, **kwargs):
"""
Send the function call to be executed in the qt main thread and wait for the result. The result will be returned to the calling thread.
.. warning::
If returned objects are thread-depenendent (typically, Qt widgets),
they must be destroyed within the thread which created them, namely
the main thread. The :class:`MainThreadLife` object wrapper may be
helpful for this.
Parameters
----------
function: function
the function to call in main thread.
Returns
-------
function call result
"""
if self.isInMainThread():
if kwargs is None or len(kwargs) == 0:
return function(*args)
return function(*args, **kwargs)
else:
semaphore = threading.Semaphore(0)
self.lock.acquire()
try:
self.actions.append(
(self._callAndWakeUp, (semaphore, function, args, kwargs), {}))
self._postEvent()
finally:
self.lock.release()
semaphore.acquire()
# block until semaphore is released in
# _callAndWakeUp method
result = semaphore._mainThreadActionResult
exception = semaphore._mainThreadActionException
if exception is not None:
six.reraise(*exception)
return result
def _callAndWakeUp(self, semaphore, function, args, kwargs):
"""
Call the function, set the result in semaphore attributes and release the semaphore.
Parameters
----------
semaphore: threading.Semaphore
thread which has added this task is blocked on this semaphore. function call's result will be kept in this semaphore attributes.
function: function
the function to call in main thread.
"""
semaphore._mainThreadActionResult = None
semaphore._mainThreadActionException = None
try:
if kwargs is None or len(kwargs) == 0:
semaphore._mainThreadActionResult = function(*args)
else:
semaphore._mainThreadActionResult = function(*args, **kwargs)
except: # noqa: E722
semaphore._mainThreadActionException = sys.exc_info()
semaphore.release()
# release the semaphore to unblock the thread which
# waits for the function call's result
def isInMainThread(self):
"""
Returns
-------
boolean: True if the current thread is the main thread
"""
# return threading.currentThread().getName() == 'MainThread'
return threading.current_thread() is self.mainThread
def doAction(self):
"""
This method is called each time the timer timeout.
It executes all functions in actions list.
"""
self.lock.acquire()
try:
actions = self.actions
# print("actions to do", self.actions)
self.actions = []
finally:
self.lock.release()
for (function, args, kwargs) in actions:
try:
if kwargs is None or len(kwargs) == 0:
function(*args)
else:
function(*args, **kwargs)
except: # noqa: E722
# Should call a customizable function here
raise
[docs]
class MainThreadLife(object):
'''This wrapper class ensures the contained object is deleted in the main
thread, and not in the current non-GUI thread. The principle is the
following:
* acquire a lock
* pass the object to something in the main thread
* the main thread waits on the lock while holding a reference on the object
* we delete the object in the calling thread
* the lock is releasd from the calling thread
* now the main thread can go on, and del / release the ref on the object:
it is the last ref on it, so it is actually deleted there.
.. warning::
The thing is only working if no other reference is held anywhere on the underlying object, otherwise we do not control its deletion.
Ex:
::
# from a secondary thread
widget = QtThreadCall().call(Qt.QWidget)
# widget.show() # should crash
QtThreadCall().push(widget.show)
# ... use it ...
# del widget # should crash
::
# from a secondary thread
widget = MainThreadLife(QtThreadCall().call(Qt.QWidget))
QtThreadCall().push(widget.ref().show)
# ... use it ...
del widget # OK
'''
def __init__(self, obj_life=None, *args, **kwargs):
super(MainThreadLife, self).__init__(*args, **kwargs)
if obj_life is not None:
self._obj_life = obj_life
def __del__(self):
if (not isinstance(threading.current_thread(), threading._MainThread)
and hasattr(self, '_obj_life')):
lock = threading.Lock()
lock.acquire()
QtThreadCall().push(MainThreadLife.delInMainThread, lock,
self._obj_life)
del self._obj_life
lock.release()
[docs]
def ref(self):
'''Access the underlying object'''
return self._obj_life
@staticmethod
def delInMainThread(lock, thing):
# wait for the lock to be released in the process thread
lock.acquire()
lock.release()
# now the process thread should have removed its reference on thing:
# we can safely delete it from here, in the main thread.
del thing # probably useless