# -*- coding: utf-8 -*-
'''
A Pipeline structure viewer widget, which displays pipeline nodes as boxes and links as lines, and provides pipelin editor features.
The only main class you should look at is the :class:`PipelineDevelopperView` widget, the remaining is internal infrastructure::
pv = PipelineDevelopperView(pipeline, allow_open_controller=True,
enable_edition=True,show_sub_pipelines=True)
pv.show()
Classes
=======
:class:`ColorType`
------------------
:class:`Plug`
-------------
:class:`EmbeddedSubPipelineItem`
--------------------------------
:class:`boxItem`
----------------
:class:`NodeGWidget`
--------------------
:class:`HandleItem`
-------------------
:class:`Link`
-------------
:class:`PipelineScene`
----------------------
:class:`PipelineDevelopperView`
-------------------------------
'''
from __future__ import print_function
from __future__ import absolute_import
# System import
import os
from pprint import pprint
import weakref
import tempfile
import soma.subprocess
import distutils.spawn
import importlib
import sys
import types
import inspect
import six
import json
import io
# Capsul import
from soma.qt_gui.qt_backend import QtCore, QtGui, Qt
from soma.qt_gui.qt_backend.Qt import QMessageBox
from soma.sorted_dictionary import SortedDictionary
from capsul.api import Switch, PipelineNode, OptionalOutputSwitch
from capsul.pipeline import pipeline_tools
from capsul.api import Pipeline
from capsul.api import Process
from capsul.api import get_process_instance
from capsul.pipeline.pipeline_nodes import Node, ProcessNode
from soma.qt_gui.qt_backend.Qt import QGraphicsView
from capsul.qt_gui.widgets.pipeline_file_warning_widget \
import PipelineFileWarningWidget
import capsul.pipeline.xml as capsulxml
from capsul.study_config import process_instance
from capsul.pipeline.process_iteration import ProcessIteration
from soma.controller import Controller
from soma.utils.functiontools import SomaPartial
from six.moves import range
from six.moves import zip
try:
from traits import api as traits
except ImportError:
from enthought.traits import api as traits
from soma.qt_gui import qt_backend
qt_backend.init_traitsui_handler()
from soma.qt_gui.controller_widget import ScrollControllerWidget
from capsul.qt_gui.widgets.attributed_process_widget \
import AttributedProcessWidget
# -----------------------------------------------------------------------------
# Globals and constants
# -----------------------------------------------------------------------------
GRAY_1 = QtGui.QColor.fromRgbF(0.7, 0.7, 0.8, 0.1)
GRAY_2 = QtGui.QColor.fromRgbF(0.4, 0.4, 0.4, 1)
LIGHT_GRAY_1 = QtGui.QColor.fromRgbF(0.2, 0.2, 0.2, 1)
LIGHT_GRAY_2 = QtGui.QColor.fromRgbF(0.2, 0.2, 0.2, 1)
# Colors for links and plugs
ORANGE_1 = QtGui.QColor.fromRgb(220, 80, 20)
ORANGE_2 = QtGui.QColor.fromRgb(220, 120, 20)
BLUE_1 = QtGui.QColor.fromRgb(50, 150, 250)
BLUE_2 = QtGui.QColor.fromRgb(50, 50, 250)
PURPLE_2 = QtGui.QColor.fromRgb(200, 0, 200)
RED_2 = QtGui.QColor.fromRgb(200, 0, 0)
GREEN_2 = QtGui.QColor.fromRgb(0, 100, 0)
BLACK_2 = QtGui.QColor.fromRgb(10, 10, 10)
WHITE_2 = QtGui.QColor.fromRgb(255, 255, 255)
ANTHRACITE_1 = QtGui.QColor.fromRgbF(0.05, 0.05, 0.05)
LIGHT_ANTHRACITE_1 = QtGui.QColor.fromRgbF(0.25, 0.25, 0.25)
# -----------------------------------------------------------------------------
# Classes and functions
# -----------------------------------------------------------------------------
class ColorType(object):
def __init__(self):
pass
def colorLink(self, x):
if not isinstance(x, str):
# x is a trait
trait_type_str = x.trait_type.__class__.__name__
if x.output and x.input_filename is False:
trait_type_str = 'File_out'
x = trait_type_str
return {
'Str': PURPLE_2,
'Float': ORANGE_1,
'Int': BLUE_2,
'List': RED_2,
'File': ORANGE_2,
'File_out': GREEN_2,
}[x]
class Plug(QtGui.QGraphicsPolygonItem):
def __init__(self, color, name, height, width, activated=True,
optional=False, parent=None):
super(Plug, self).__init__(parent)
self.name = name
# self.color = self._color(activated, optional)
self.color = color
if optional:
brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
brush.setColor(self.color)
polygon = QtGui.QPolygonF([QtCore.QPointF(0, 0),
QtCore.QPointF(width / 1.5, 0),
QtCore.QPointF(width / 1.5,
(height - 5)),
QtCore.QPointF(0, (height - 5))
])
# self.setPen(QtGui.QPen(QtCore.Qt.NoPen))
else:
brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
brush.setColor(self.color)
polygon = QtGui.QPolygonF([QtCore.QPointF(0, 0),
QtCore.QPointF(
width, (height - 5) / 2.0),
QtCore.QPointF(0, height - 5)
])
self.setPolygon(polygon)
self.setBrush(brush)
self.setZValue(3)
self.setAcceptedMouseButtons(QtCore.Qt.LeftButton)
# def _color(self, activated, optional):
# if optional:
# if activated:
# color = QtCore.Qt.darkGreen
# else:
# color = QtGui.QColor('#BFDB91')
# else:
# if activated:
# color = QtCore.Qt.black
# else:
# color = QtCore.Qt.gray
# return color
# def update_plug(self, activated, optional):
# color = self._color(activated, optional)
# brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
# brush.setColor(color)
# self.setBrush(brush)
def update_plug(self, color):
brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
brush.setColor(color)
self.setBrush(brush)
def get_plug_point(self):
point = QtCore.QPointF(
self.boundingRect().size().width() / 2.0,
self.boundingRect().size().height() / 2.0)
return self.mapToParent(point)
def mousePressEvent(self, event):
super(Plug, self).mousePressEvent(event)
if event.button() == QtCore.Qt.LeftButton:
self.scene().plug_clicked.emit(self.name)
event.accept()
elif event.button() == QtCore.Qt.RightButton:
# print('plug: right click')
self.scene().plug_right_clicked.emit(self.name)
event.accept()
[docs]class EmbeddedSubPipelineItem(QtGui.QGraphicsProxyWidget):
'''
QGraphicsItem containing a sub-pipeline view
'''
def __init__(self, sub_pipeline_wid):
super(EmbeddedSubPipelineItem, self).__init__()
old_height = sub_pipeline_wid.sizeHint().height()
sizegrip = QtGui.QSizeGrip(None)
new_height = old_height \
+ sub_pipeline_wid.horizontalScrollBar().height()
sub_pipeline_wid.setCornerWidget(sizegrip)
sub_pipeline_wid.setHorizontalScrollBarPolicy(
QtCore.Qt.ScrollBarAlwaysOn)
sub_pipeline_wid.resize(sub_pipeline_wid.sizeHint().width(), new_height)
self.setWidget(sub_pipeline_wid)
class boxItem(QtGui.QGraphicsRectItem):
def __init__(self, parent=None):
super(boxItem, self).__init__(parent)
# self.setFlags(self.ItemIsFocusable)
self.penBox = 0;
self.name = ""
def focusInEvent(self, event):
self.setPen(QtGui.QPen(QtGui.QColor(150, 150, 250), 3, QtCore.Qt.DashDotLine))
return QtGui.QGraphicsRectItem.focusInEvent(self, event)
def focusOutEvent(self, event):
self.setPen(self.penBox)
return QtGui.QGraphicsRectItem.focusOutEvent(self, event)
def keyPressEvent(self, event):
if event.key() == QtCore.Qt.Key_Delete:
self.scene()._node_keydelete_clicked(self)
else:
super(boxItem, self).keyPressEvent(event)
class NodeGWidget(QtGui.QGraphicsItem):
def __init__(self, name, parameters, pipeline,
parent=None, process=None, sub_pipeline=None,
colored_parameters=True,
logical_view=False, labels=[],
show_opt_inputs=True, show_opt_outputs=True,
userlevel=0):
super(NodeGWidget, self).__init__(parent)
self.infoActived = QtGui.QGraphicsTextItem('', self)
self.colType = ColorType()
self._userlevel = userlevel
self.setFlags(self.ItemIsSelectable)
self.setCursor(Qt.QCursor(QtCore.Qt.PointingHandCursor))
self.style = 'default'
self.name = name
#print('GNode userlevel:', self.userlevel)
#print([(pname, param) for pname, param in six.iteritems(parameters)
#if not getattr(param, 'hidden', False)
#and (getattr(param, 'userlevel', None) is None
#or param.userlevel <= self.userlevel)]
if isinstance(process, ProcessNode):
controller = process.process
else:
controller = process
self.parameters = SortedDictionary()
for pname, param in six.iteritems(parameters):
show = True
if controller:
trait = controller.trait(pname)
if getattr(trait, 'hidden', False):
show = False
elif getattr(trait, 'userlevel', None) is not None:
if trait.userlevel > self.userlevel:
show = False
if show:
self.parameters[pname] = param
self.setFlag(QtGui.QGraphicsItem.ItemIsMovable)
self.in_plugs = SortedDictionary()
self.in_params = {}
self.out_plugs = SortedDictionary()
self.out_params = {}
self.process = process
self.sub_pipeline = sub_pipeline
self.embedded_subpipeline = None
self.colored_parameters = colored_parameters
self.logical_view = logical_view
self.pipeline = pipeline
# Added to choose to visualize optional parameters
self.show_opt_inputs = show_opt_inputs
self.show_opt_outputs = show_opt_outputs
self.labels = []
self.scene_labels = labels
self.label_items = []
my_labels = []
steps = getattr(pipeline, 'pipeline_steps', None)
if steps:
for step_name, step in six.iteritems(steps.user_traits()):
step_nodes = step.nodes
if name in step_nodes:
my_labels.append('step: %s' % step_name)
selects = pipeline.get_processes_selections()
for sel_plug in selects:
groups = pipeline.get_processes_selection_groups(sel_plug)
for group, nodes in six.iteritems(groups):
if name in nodes:
my_labels.append('select: %s' % sel_plug)
for label in my_labels:
self._get_label(label)
self._set_brush()
self.setAcceptedMouseButtons(
QtCore.Qt.LeftButton | QtCore.Qt.RightButton | QtCore.Qt.MiddleButton)
self._build()
if colored_parameters:
process.on_trait_change(self._repaint_parameter, dispatch='ui')
process.on_trait_change(self.update_parameters, 'user_traits_changed',
dispatch='ui')
def __del__(self):
#print('NodeGWidget.__del__')
self._release()
# super(NodeGWidget, self).__del__()
@property
def userlevel(self):
return self._userlevel
@userlevel.setter
def userlevel(self, value):
self._userlevel = value
self.update_parameters()
def _release(self):
# release internal connections / callbacks / references in order to
# allow deletion of self
self.process.on_trait_change(self.update_parameters,
'user_traits_changed', remove=True)
if self.colored_parameters:
try:
self.process.on_trait_change(self._repaint_parameter, remove=True)
except Exception:
pass
self.colored_parameters = None
self.sizer = None
def get_title(self):
if self.sub_pipeline is None:
return self.name
else:
return "[{0}]".format(self.name)
def update_parameters(self):
forbidden = ['nodes_activation', 'activated', 'enabled', 'name',
'node_type']
if isinstance(self.process, ProcessNode):
controller = self.process.process
else:
controller = self.process
self.parameters = SortedDictionary()
for pname, param in six.iteritems(self.process.user_traits()):
show = True
if self.name == 'inputs' and param.output:
continue
elif self.name == 'outputs' and not param.output:
continue
if controller:
trait = controller.trait(pname)
if getattr(trait, 'hidden', False):
show = False
elif getattr(trait, 'userlevel', None) is not None:
if trait.userlevel > self.userlevel:
show = False
if show:
self.parameters[pname] = param
self.update_node()
def update_labels(self, labels):
''' Update colored labels
'''
self.labels = []
for item in self.label_items:
item.deleteLater() # FIXME there should be another way !
self.label_items = []
for label in labels:
self._get_label(label)
self._create_label_marks()
def _get_label(self, label, register=True):
class Label(object):
def __init__(self, label, color):
self.text = label
self.color = color
for l in self.scene_labels:
if label == l.text:
if register and l not in self.labels:
self.labels.append(l)
return l
color = self.new_color(len(self.scene_labels))
label_item = Label(label, color)
if register:
self.labels.append(label_item)
self.scene_labels.append(label_item)
return label_item
def new_color(self, num):
colors = [[1, 0.3, 0.3],
[0.3, 1, 0.3],
[0.3, 0.3, 1],
[1, 1, 0],
[0, 1, 1],
[1, 0, 1],
[1, 1, 1],
[1, 0.7, 0],
[1, 0, 0.7],
[1, 0.7, 0.7],
[0.7, 1, 0],
[0., 1., 0.7],
[0.7, 1, 0.7],
[0.7, 0, 1],
[0., 0.7, 1],
[0.7, 0.7, 1],
[1, 1, 0.5],
[0.5, 1, 1],
[1, 0.5, 1]]
c = colors[num % len(colors)]
code = (int(c[0] * 255), int(c[1] * 255), int(c[2] * 255))
return code
def _repaint_parameter(self, param_name, new_value):
if self.logical_view or param_name not in self.parameters:
return
param_text = self._parameter_text(param_name)
param_item = self.in_params.get(param_name)
if param_item is None:
param_item = self.out_params[param_name]
if isinstance(param_item, QtGui.QGraphicsProxyWidget):
# colored parameters are widgets
param_item.widget().findChild(
QtGui.QLabel, 'label').setText(param_text)
else:
param_item.setHtml(param_text)
def _build(self):
margin = 0
self.title = QtGui.QGraphicsTextItem(self.get_title(), self)
# font = self.title.font()
font = QtGui.QFont("Times", 11, QtGui.QFont.Bold)
# font.setWeight(QtGui.QFont.Bold)
self.title.setFont(font)
self.title.setPos(margin, margin)
self.title.setZValue(2)
self.title.setDefaultTextColor(QtCore.Qt.white)
self.title.setParentItem(self)
if self.logical_view:
self._build_logical_view_plugs()
else:
self._build_regular_view_plugs()
self._create_label_marks()
ctr = self.contentsRect()
self.wmin = ctr.width()
self.hmin = ctr.height()
font1 = QtGui.QFont("Times", 12, QtGui.QFont.Normal)
font1.setItalic(True)
self.infoActived.setFont(font1)
self.infoActived.setZValue(2)
self.infoActived.setDefaultTextColor(QtCore.Qt.red)
self.infoActived.setParentItem(self)
self.box = boxItem(self)
self.box.setFlags(self.box.ItemIsFocusable)
self.box.setBrush(self.bg_brush)
self.box.setPen(QtGui.QPen(QtCore.Qt.NoPen))
self.box.setZValue(-1)
self.box.penBox = self.box.pen()
self.box.name = self.name
self.box.setParentItem(self)
self.sizer = HandleItem(self)
self.sizer.wmin = self.wmin
self.sizer.hmin = self.hmin
self.sizer.setPos(ctr.width(), ctr.height())
self.sizer.posChangeCallbacks.append(self.changeSize)
self.sizer.setFlag(self.sizer.ItemIsSelectable, True)
self.box_title = QtGui.QGraphicsRectItem(self)
self.box_title.setBrush(self.title_brush)
self.box_title.setPen(QtGui.QPen(QtCore.Qt.NoPen))
self.box_title.setZValue(1)
self.box_title.setParentItem(self)
self.changeSize(ctr.width(), ctr.height())
def changeSize(self, w, h):
limit = False
factor_h = 35
if h < self.hmin:
h = self.hmin
limit = True
if w < self.wmin:
w = self.wmin
limit = True
winMax, woutMax = 0, 0
self.box.setRect(0.0, 0.0, w, h)
self.box_title.setRect(0.0, 0.0, w, 30)
self.title.setPos(w / 2 - self.title.boundingRect().size().width() / 2, 0)
self.infoActived.setPos(w / 2 - self.infoActived.boundingRect().size().width() / 2, h + 2)
# rect = self.title.mapRectToParent(self.title.boundingRect())
# rect.setWidth(w)
# self.box_title.setRect(rect)
y1 = h / (len(self.in_plugs) + 1)
dy = y1
for inp in self.in_plugs:
self.in_plugs[inp].setPos(0, y1)
self.in_params[inp].setPos(self.in_plugs[inp].boundingRect().size().width(), y1)
if winMax < self.in_params[inp].boundingRect().size().width():
winMax = self.in_params[inp].boundingRect().size().width()
y1 += dy
y2 = h / (len(self.out_plugs) + 1)
dy = y2
for outp in self.out_plugs:
self.out_plugs[outp].setPos(w, y2)
self.out_params[outp].setPos(w - self.out_params[outp].boundingRect().size().width() - 5, y2)
if woutMax < self.out_params[outp].boundingRect().size().width():
woutMax = self.out_params[outp].boundingRect().size().width()
y2 += dy
if w < winMax + woutMax + 15:
w = winMax + woutMax + 15
self.updateSize(w, h)
#self.sizer.setPos(w, h)
self.wmin = w
if limit:
self.sizer.setPos(w, h)
self.update_labels([l.text for l in self.labels])
# if self.hmin < factor_h * len(self.in_plugs):
# self.hmin = factor_h * len(self.in_plugs)
# self.updateSize(w, self.hmin)
# if self.hmin < factor_h * len(self.out_plugs):
# self.hmin = factor_h * len(self.out_plugs)
# self.updateSize(w, self.hmin)
def updateSize(self, w, h):
# print("wmin =",self.wmin,", w=",w)
if w<self.wmin:
w=self.wmin
margin = 20
factor_h = 35.0
h = factor_h * len(self.in_plugs)+margin
self.hmin=h
if h < factor_h * len(self.out_plugs):
h = factor_h * len(self.out_plugs) + margin
self.hmin = h
self.sizer.hmin = h
self.changeSize(w, h + margin)
self.sizer.setPos(w, h + margin)
def _colored_text_item(self, label, text=None, margin=2):
labelc = self._get_label(label, False)
color = labelc.color
if text is None:
text = label
# I can't make rounded borders with appropriate padding
# without using 2 QLabels. This is probably overkill. We could
# replace this code of we find a simpler way.
label_w = QtGui.QLabel('')
label_w.setStyleSheet("background: rgba(255, 255, 255, 0);")
lay = QtGui.QVBoxLayout()
lay.setContentsMargins(margin, margin, margin, margin)
label_w.setLayout(lay)
label2 = QtGui.QLabel(text)
label2.setObjectName('label')
label2.setStyleSheet(
"background: rgba({0}, {1}, {2}, 255); "
"border-radius: 7px; border: 0px solid; "
"padding: 1px;".format(*color))
lay.addWidget(label2)
label_item = QtGui.QGraphicsProxyWidget(self)
label_item.setWidget(label_w)
return label_item
def _build_regular_view_plugs(self):
margin = 5
plug_width = 12
pos = margin + margin + self.title.boundingRect().size().height()
pos0 = pos
if self.name == 'inputs':
selections = self.pipeline.get_processes_selections()
else:
selections = []
for in_param, pipeline_plug in six.iteritems(self.parameters):
output = (not pipeline_plug.output if self.name in (
'inputs', 'outputs') else pipeline_plug.output)
if output or (not self.show_opt_inputs and pipeline_plug.optional):
continue
param_text = self._parameter_text(in_param)
param_name = QtGui.QGraphicsTextItem(self)
param_name.setHtml(param_text)
plug_name = '%s:%s' % (self.name, in_param)
try:
# color = self.colorLink(trait_type_str)
color = self.colType.colorLink(self.process.trait(in_param))
except Exception:
color = ORANGE_2
plug = Plug(color, plug_name,
param_name.boundingRect().size().height(),
plug_width, activated=pipeline_plug.activated,
optional=pipeline_plug.optional, parent=self)
param_name.setZValue(2)
plug.setPos(margin, pos)
param_name.setPos(plug.boundingRect().size().width() + margin, pos)
param_name.setParentItem(self)
plug.setParentItem(self)
self.in_plugs[in_param] = plug
self.in_params[in_param] = param_name
pos = pos + param_name.boundingRect().size().height()
pos = pos0
for out_param, pipeline_plug in six.iteritems(self.parameters):
output = (not pipeline_plug.output if self.name in (
'inputs', 'outputs') else pipeline_plug.output)
if not output or (not self.show_opt_outputs and pipeline_plug.optional):
continue
param_text = self._parameter_text(out_param)
if out_param in selections:
param_name = self._colored_text_item('select: ' + out_param,
param_text, 0)
else:
param_name = QtGui.QGraphicsTextItem(self)
param_name.setHtml(param_text)
plug_name = '%s:%s' % (self.name, out_param)
try:
# color = self.colorLink(trait_type_str)
color = self.colType.colorLink(self.process.trait(out_param))
except Exception:
color = ORANGE_2
plug = Plug(color, plug_name,
param_name.boundingRect().size().height(),
plug_width, activated=pipeline_plug.activated,
optional=pipeline_plug.optional, parent=self)
param_name.setZValue(2)
param_name.setPos(plug.boundingRect().size().width() + margin, pos)
plug.setPos(plug.boundingRect().size().width() + margin +
param_name.boundingRect().size().width() + margin, pos)
param_name.setParentItem(self)
plug.setParentItem(self)
self.out_plugs[out_param] = plug
self.out_params[out_param] = param_name
pos = pos + param_name.boundingRect().size().height()
def change_input_view(self):
self.show_opt_inputs = not self.show_opt_inputs
def change_output_view(self):
self.show_opt_outputs = not self.show_opt_outputs
def _build_logical_view_plugs(self):
margin = 5
plug_width = 12
pos = margin + margin + self.title.boundingRect().size().height()
has_input = False
has_output = False
for in_param, pipeline_plug in six.iteritems(self.parameters):
output = (not pipeline_plug.output if self.name in (
'inputs', 'outputs') else pipeline_plug.output)
if output:
has_output = True
else:
has_input = True
if has_input and has_output:
break
if has_input:
param_name = QtGui.QGraphicsTextItem(self)
param_name.setHtml('')
plug_name = '%s:inputs' % self.name
color = QtCore.Qt.black
plug = Plug(color, plug_name,
param_name.boundingRect().size().height(),
plug_width, activated=True,
optional=False, parent=self)
param_name.setZValue(2)
plug.setPos(margin, pos)
param_name.setPos(plug.boundingRect().size().width() + margin, pos)
param_name.setParentItem(self)
plug.setParentItem(self)
self.in_plugs['inputs'] = plug
self.in_params['inputs'] = param_name
if has_output:
param_name = QtGui.QGraphicsTextItem(self)
param_name.setHtml('')
plug_name = '%s:outputs' % self.name
color = QtCore.Qt.black
plug = Plug(color, plug_name,
param_name.boundingRect().size().height(),
plug_width, activated=True,
optional=False, parent=self)
param_name.setZValue(2)
param_name.setPos(plug.boundingRect().size().width() + margin, pos)
plug.setPos(self.title.boundingRect().width()
- plug.boundingRect().width(), pos)
param_name.setParentItem(self)
plug.setParentItem(self)
self.out_plugs['outputs'] = plug
self.out_params['outputs'] = param_name
def _create_label_marks(self):
labels = self.labels
if labels:
margin = 5
plug_width = 12
xpos = margin + plug_width
ypos = None
params = dict(self.in_params)
params.update(self.out_params)
child = None
for param in params.values():
y = self.mapRectFromItem(param, param.boundingRect()).bottom()
if ypos is None or ypos < y:
ypos = y
child = param
#if ypos is None:
#ypos = margin * 2 + self.title.boundingRect().size().height()
if child is None:
child = self.childItems()[-1]
item_rect = self.mapRectFromItem(child, child.boundingRect())
ypos = item_rect.bottom()
for label in labels:
color = label.color
text = label.text
label_item = self._colored_text_item(label.text, label.text)
label_item.setPos(xpos, ypos)
label_item.setParentItem(self)
self.label_items.append(label_item)
ypos = self.mapRectFromItem(
label_item, label_item.boundingRect()).bottom()
def clear_plugs(self):
for plugs, params in ((self.in_plugs, self.in_params),
(self.out_plugs, self.out_params)):
for plug_name, plug in six.iteritems(plugs):
param_item = params[plug_name]
self.scene().removeItem(param_item)
self.scene().removeItem(plug)
self.in_params = {}
self.in_plugs = {}
self.out_params = {}
self.out_plugs = {}
def updateInfoActived(self, state):
if state:
self.infoActived.setPlainText('')
else:
self.infoActived.setPlainText('disabled')
def fonced_viewer(self, det):
if det:
# color=QtGui.QColor(150, 150, 250)
self.setOpacity(0.2)
else:
# color=self.color
self.setOpacity(1)
# self._set_pen(self.active, self.weak, color)
def _set_brush(self):
pipeline = self.pipeline
if self.name in ('inputs', 'outputs'):
node = pipeline.pipeline_node
else:
node = pipeline.nodes[self.name]
color_1, color_2, color_3, style = pipeline_tools.pipeline_node_colors(
pipeline, node)
self.style = style
color_1 = QtGui.QColor.fromRgbF(*color_1)
color_2 = QtGui.QColor.fromRgbF(*color_2)
# color_1 = ANTHRACITE_1
# color_2 = LIGHT_ANTHRACITE_1
gradient = QtGui.QLinearGradient(0, 0, 0, 50)
gradient.setColorAt(0, color_1)
gradient.setColorAt(1, color_2)
self.bg_brush = QtGui.QBrush(gradient)
if node.activated:
# color_1 = GRAY_1
# color_2 = GRAY_2
self.updateInfoActived(True)
else:
# color_1 = LIGHT_GRAY_1
# color_2 = LIGHT_GRAY_2
self.updateInfoActived(False)
if node in pipeline.disabled_pipeline_steps_nodes():
color_1 = self._color_disabled(color_1)
color_2 = self._color_disabled(color_2)
gradient = QtGui.QLinearGradient(0, 2, 5, 100)
gradient.setColorAt(1, GRAY_1)
gradient.setColorAt(0, GRAY_2)
self.title_brush = QtGui.QBrush(LIGHT_GRAY_2)
def _color_disabled(self, color):
target = [220, 240, 220]
new_color = QtGui.QColor((color.red() + target[0]) / 2,
(color.green() + target[1]) / 2,
(color.blue() + target[2]) / 2)
return new_color
def _create_parameter(self, param_name, pipeline_plug):
plug_width = 12
margin = 5
output = (not pipeline_plug.output if self.name in (
'inputs', 'outputs') else pipeline_plug.output)
if self.logical_view:
if output:
param_name = 'outputs'
else:
param_name = 'inputs'
param_text = self._parameter_text(param_name)
if self.name == 'inputs' and not self.logical_view \
and 'select: ' + param_name in \
[l.text for l in self.scene_labels]:
param_name_item = self._colored_text_item('select: ' + param_name,
param_text, 0)
else:
param_name_item = QtGui.QGraphicsTextItem(self)
param_name_item.setHtml(param_text)
plug_name = '%s:%s' % (self.name, param_name)
color = QtCore.Qt.black
plug = Plug(color, plug_name,
param_name_item.boundingRect().size().height(),
plug_width, activated=pipeline_plug.activated,
optional=pipeline_plug.optional, parent=self)
param_name_item.setZValue(2)
if output:
plugs = self.out_plugs
params = self.out_params
params_size = len(params) + len(self.in_params)
# FIXME: sub-pipeline size
xpos = plug.boundingRect().size().width() + margin
pxpos = plug.boundingRect().size().width() + margin * 2 \
+ param_name_item.boundingRect().size().width()
else:
plugs = self.in_plugs
params = self.in_params
params_size = len(params)
xpos = plug.boundingRect().size().width() + margin
pxpos = margin
if self.logical_view:
params_size = 0
if output:
pxpos = self.title.boundingRect().width() \
- plug.boundingRect().width()
pos = margin * 2 + self.title.boundingRect().size().height() \
+ param_name_item.boundingRect().size().height() * params_size
param_name_item.setPos(xpos, pos)
plug.setPos(pxpos, pos)
param_name_item.setParentItem(self)
plug.setParentItem(self)
plugs[param_name] = plug
params[param_name] = param_name_item
if output:
self._shift_params()
self.updateSize(self.box.boundingRect().size().width(), self.box.boundingRect().size().height())
self.sizer.setPos(self.box.boundingRect().size().width(), self.box.boundingRect().size().height())
# self.hmin=self.box.boundingRect().size().height()
def _shift_params(self):
margin = 5
if not self.in_params:
if not self.out_params:
param_item = None
else:
param_item = list(self.out_params.values())[0]
else:
param_item = list(self.in_params.values())[0]
ni = 0
no = 0
bottom_pos = 0
if param_item:
for param_name, pipeline_plug in six.iteritems(self.parameters):
output = (not pipeline_plug.output if self.name in (
'inputs', 'outputs') else pipeline_plug.output)
if output:
# Added to choose to visualize optional parameters
if not pipeline_plug.optional or (self.show_opt_outputs and pipeline_plug.optional):
params = self.out_params
plugs = self.out_plugs
npos = no + len(self.in_params)
no += 1
else:
continue
else:
# Added to choose to visualize optional parameters
if not pipeline_plug.optional or (self.show_opt_inputs and pipeline_plug.optional):
params = self.in_params
plugs = self.in_plugs
npos = ni
ni += 1
else:
continue
pos = margin * 2 + self.title.boundingRect().size().height() \
+ param_item.boundingRect().size().height() * npos
new_param_item = params.get(param_name)
if new_param_item is None:
continue
param_item = new_param_item
plug = plugs[param_name]
ppos = param_item.pos()
param_item.setPos(ppos.x(), pos)
ppos = plug.pos()
plug.setPos(ppos.x(), pos)
pos += param_item.boundingRect().size().height()
bottom_pos = max(pos, bottom_pos)
if self.logical_view:
nparams = 1
else:
nparams = len(self.in_params) + len(self.out_params)
pos = margin * 2 + self.title.boundingRect().size().height() \
+ param_item.boundingRect().size().height() * nparams
else:
pos = margin * 2 + self.title.boundingRect().size().height()
for label_item in self.label_items:
ppos = label_item.pos()
label_item.setPos(ppos.x(), pos)
pos += label_item.boundingRect().size().height()
def _remove_parameter(self, param_name):
if param_name in self.in_params:
params = self.in_params
plugs = self.in_plugs
else:
params = self.out_params
plugs = self.out_plugs
param_item = params[param_name]
self.scene().removeItem(param_item)
plug = plugs[param_name]
self.scene().removeItem(plug)
del params[param_name]
del plugs[param_name]
self._shift_params()
def _parameter_text(self, param_name):
if self.logical_view:
return ''
pipeline_plug = self.parameters[param_name]
# output = (not pipeline_plug.output if self.name in (
# 'inputs', 'outputs') else pipeline_plug.output)
output = pipeline_plug.output
if output:
param_text = '<font color="#400000"><b>%s</b></font>' % param_name
else:
param_text = '<font color="#111111"><b>%s</b></font>' % param_name
try:
value = getattr(self.process, param_name)
except traits.TraitError:
value = traits.Undefined
if value is None or value is traits.Undefined or value == '':
param_text = '<em>%s</em>' % param_text
else:
trait = self.process.user_traits()[param_name]
if (isinstance(trait.trait_type, traits.File) \
or isinstance(trait.trait_type, traits.Directory)) \
and os.path.exists(value):
param_text = '<b>%s</b>' % param_text
return param_text
def update_node(self):
self._set_brush()
self.box_title.setBrush(self.title_brush)
self.box.setBrush(self.bg_brush)
for param, pipeline_plug in six.iteritems(self.parameters):
output = (not pipeline_plug.output if self.name in (
'inputs', 'outputs') else pipeline_plug.output)
if output:
plugs = self.out_plugs
params = self.out_params
if self.logical_view:
param = 'outputs'
else:
plugs = self.in_plugs
params = self.in_params
if self.logical_view:
param = 'inputs'
gplug = plugs.get(param)
if gplug is None: # new parameter ?
self._create_parameter(param, pipeline_plug)
gplug = plugs.get(param)
if not self.logical_view:
# gplug.update_plug(pipeline_plug.activated,
# pipeline_plug.optional)
try:
# color = self.colorLink(trait_type_str)
color = self.colType.colorLink(self.process.trait(param))
except Exception:
color = ORANGE_2
gplug.update_plug(color)
if isinstance(params[param], QtGui.QGraphicsProxyWidget):
# colored parameters are widgets
params[param].widget().findChild(
QtGui.QLabel, 'label').setText(
self._parameter_text(param))
else:
params[param].setHtml(self._parameter_text(param))
if not self.logical_view:
# check removed params
to_remove = []
# Added to choose to visualize optional parameters
for param, pipeline_plug in six.iteritems(self.parameters):
output = (not pipeline_plug.output if self.name in (
'inputs', 'outputs') else pipeline_plug.output)
if output:
if pipeline_plug.optional and not self.show_opt_outputs:
to_remove.append(param)
else:
if pipeline_plug.optional and not self.show_opt_inputs :
to_remove.append(param)
for param in self.in_params:
if param not in self.parameters:
to_remove.append(param)
for param in self.out_params:
if param not in self.parameters:
to_remove.append(param)
for param in to_remove:
self._remove_parameter(param)
self._shift_params()
# rect = self.title.mapRectToParent(self.title.boundingRect())
# margin = 5
# brect = self.boundingRect()
# brect.setWidth(brect.right() - margin)
# rect.setWidth(brect.width())
# self.box_title.setRect(rect)
# self.box.setRect(self.boundingRect())
################a dd by Irmage OM #############################################
try :
dim = self.scene().dim.get(self.box.name)
if isinstance(dim, Qt.QPointF):
dim=(dim.x(),dim.y())
self.updateSize(dim[0],dim[1])
# self.scene().dim[self.box.name] = (dim[0],dim[1])
# print("update_node : self.scene().dim ",dim)
except Exception:
dim = (self.box.boundingRect().size().width(), self.box.boundingRect().size().height())
self.updateSize(dim[0],dim[1])
# self.scene().dim[self.box.name] = (dim[0],dim[1])
# print("update_node : boundingRect()")
##############################################################################
def contentsRect(self):
brect = QtCore.QRectF(0, 0, 0, 0)
first = True
excluded = []
for name in ('box', 'box_title'):
if hasattr(self, name):
excluded.append(getattr(self, name))
for child in self.childItems():
if not hasattr(child, 'isVisible'):
# we sometimes get some QObject here, I don't know who they are
continue
if not child.isVisible() or child in excluded:
continue
item_rect = self.mapRectFromItem(child, child.boundingRect())
if first:
first = False
brect = item_rect
else:
if child is self.embedded_subpipeline:
margin = 5
item_rect.setBottom(item_rect.bottom() + margin)
if item_rect.left() < brect.left():
brect.setLeft(item_rect.left())
if item_rect.top() < brect.top():
brect.setTop(item_rect.top())
if item_rect.right() > brect.right():
brect.setRight(item_rect.right())
if item_rect.bottom() > brect.bottom():
brect.setBottom(item_rect.bottom())
return brect
def boundingRect(self):
margin = 0
brect = self.contentsRect()
if self.embedded_subpipeline and self.embedded_subpipeline.isVisible():
margin = 5
brect.setRight(brect.right())
brect.setBottom(brect.bottom() + margin)
return brect
def paint(self, painter, option, widget=None):
pass
def postscript(self, file_name):
printer = QtGui.QPrinter(QtGui.QPrinter.HighResolution)
printer.setOutputFormat(QtGui.QPrinter.PostScriptFormat)
printer.setOutputFileName(file_name)
# qreal xmargin = contentRect.width()*0.01;
# qreal ymargin = contentRect.height()*0.01;
# printer.setPaperSize(10*contentRect.size()*1.02,QPrinter::DevicePixel);
# printer.setPageMargins(xmargin,ymargin,xmargin,ymargin,QPrinter::DevicePixel);
painter = QtGui.QPainter()
painter.begin(printer)
painter.setPen(QtCore.Qt.blue)
painter.setFont(QtGui.QFont('Arial', 30))
painter.drawText(0, 0, 'Ca marche !')
# render(&painter,QRectF(QPointF(0,0),10*contentRect.size()),contentRect);
painter.end()
def resize_subpipeline_on_show(self):
margin = 5
param_width = self.in_params_width()
pos = margin * 2 + self.title.boundingRect().size().height()
opos = param_width \
+ self.embedded_subpipeline.boundingRect().width() # + margin ?
for name, param in six.iteritems(self.out_params):
param.setPos(opos, param.pos().y())
plug = self.out_plugs[name]
plug.setPos(opos + margin + param.boundingRect().size().width(),
plug.pos().y())
# rect = self.box_title.boundingRect()
rect = self.box.boundingRect()
rect.setWidth(self.contentsRect().width())
# self.box_title.setRect(rect)
self.box.setRect(self.boundingRect())
def resize_subpipeline_on_hide(self):
margin = 5
for name, param in six.iteritems(self.out_params):
plug = self.out_plugs[name]
param.setPos(plug.boundingRect().width() + margin, param.pos().y())
plug.setPos(plug.boundingRect().size().width() + margin +
param.boundingRect().size().width() + margin, plug.pos().y())
# rect = self.box_title.boundingRect()
rect = self.box.boundingRect()
rect.setWidth(self.contentsRect().width())
# self.box_title.setRect(rect)
self.box.setRect(self.boundingRect())
def in_params_width(self):
margin = 5
width = 0
pwidth = 0
for param_name, param in six.iteritems(self.in_params):
if param.boundingRect().width() > width:
width = param.boundingRect().width()
if pwidth == 0:
plug = self.in_plugs[param_name]
pwidth = plug.boundingRect().width()
return width + margin + pwidth
def out_params_width(self):
width = 0
for param_name, param in six.iteritems(self.out_params):
if param.boundingRect().width() > width:
width = param.boundingRect().width()
return width
def add_subpipeline_view(
self,
sub_pipeline,
allow_open_controller=True,
scale=None):
if self.embedded_subpipeline:
if self.embedded_subpipeline.isVisible():
self.embedded_subpipeline.hide()
self.resize_subpipeline_on_hide()
else:
self.embedded_subpipeline.show()
self.resize_subpipeline_on_show()
else:
sub_view = PipelineDevelopperView(
sub_pipeline,
show_sub_pipelines=True,
allow_open_controller=allow_open_controller,
enable_edition=self.scene().edition_enabled(),
userlevel=self.userlevel)
if scale is not None:
sub_view.scale(scale, scale)
pwid = EmbeddedSubPipelineItem(sub_view)
sub_view._graphics_item = weakref.proxy(pwid)
margin = 5
pos = margin * 2 + self.title.boundingRect().size().height()
pwid.setParentItem(self)
pwid.setPos(self.in_params_width(), pos)
self.embedded_subpipeline = pwid
self.resize_subpipeline_on_show()
self.setFiltersChildEvents(False)
pwid.geometryChanged.connect(self.resize_subpipeline_on_show)
def mouseDoubleClickEvent(self, event):
if self.sub_pipeline:
if isinstance(self.sub_pipeline, weakref.ProxyTypes):
# get the "real" object
process = self.sub_pipeline.__init__.__self__
else:
process = self.sub_pipeline
self.scene().subpipeline_clicked.emit(self.name, process,
event.modifiers())
event.accept()
else:
event.ignore()
def mousePressEvent(self, event):
item = self.scene().itemAt(event.scenePos(), Qt.QTransform())
# print('NodeGWidget click, item:', item)
if isinstance(item, Plug):
item.mousePressEvent(event)
return
super(NodeGWidget, self).mousePressEvent(event)
if isinstance(self.process, weakref.ProxyTypes):
process = self.process.__init__.__self__ # get the "real" object
else:
process = self.process
if event.button() == QtCore.Qt.RightButton and process is not None:
self.scene().node_right_clicked.emit(self.name, process)
event.accept()
if event.button() == QtCore.Qt.LeftButton and process is not None:
if isinstance(process, Process):
self.scene().process_clicked.emit(self.name, process)
else:
self.scene().node_clicked.emit(self.name, process)
if event.button() == QtCore.Qt.LeftButton and event.modifiers() == QtCore.Qt.ControlModifier:
self.scene().node_clicked_ctrl.emit(self.name, process)
self.scene().clearSelection()
self.box.setSelected(True)
return QtGui.QGraphicsItem.mousePressEvent(self, event)
event.accept()
def keyPressEvent(self, event):
super(NodeGWidget, self).keyPressEvent(event)
if event.key() == QtCore.Qt.Key_Up:
self.setPos(self.x(), self.y() - 1)
if event.key() == QtCore.Qt.Key_Down:
self.setPos(self.x(), self.y() + 1)
if event.key() == QtCore.Qt.Key_Left:
self.setPos(self.x() - 1, self.y())
if event.key() == QtCore.Qt.Key_Right:
self.setPos(self.x() + 1, self.y())
return QtGui.QGraphicsItem.keyPressEvent(self, event)
event.accept()
[docs]class HandleItem(QtGui.QGraphicsRectItem):
""" A handle that can be moved by the mouse """
def __init__(self, parent=None):
super(HandleItem, self).__init__(Qt.QRectF(-10.0, -10.0, 10.0, 10.0), parent)
# self.setRect(Qt.QRectF(-4.0,-4.0,4.0,4.0))
self.posChangeCallbacks = []
self.setPen(QtGui.QPen(QtCore.Qt.NoPen))
self.setBrush(QtGui.QBrush(QtCore.Qt.yellow))
self.setFlag(self.ItemIsMovable, True)
self.setFlag(self.ItemSendsScenePositionChanges, True)
self.setCursor(QtGui.QCursor(QtCore.Qt.SizeFDiagCursor))
self.wmin = 0.0
self.hmin = 0.0
self.hmax = 0.0
self.effectiveOpacity()
self.setOpacity(0.01)
def itemChange(self, change, value):
if change == self.ItemPositionChange:
self.x, self.y = value.x(), value.y()
if self.x < self.wmin:
self.x = self.wmin
if self.y < self.hmin:
self.y = self.hmin
# TODO: make this a signal?
# This cannot be a signal because this is not a QObject
for cb in self.posChangeCallbacks:
res = cb(self.x, self.y)
if res:
self.x, self.y = res
if self.x < self.wmin:
self.x = self.wmin
if self.y < self.hmin:
self.y = self.hmin
value = QtCore.QPointF(self.x, self.y)
# value = Qt.QPointF(x, y) #### ??
self.hmax = value.y()
return value
# Call superclass method:
return super(HandleItem, self).itemChange(change, value)
def mouseReleaseEvent(self, mouseEvent):
self.setSelected(False)
self.setPos(self.x, self.y)
return QtGui.QGraphicsRectItem.mouseReleaseEvent(self, mouseEvent)
class Link(QtGui.QGraphicsPathItem):
def __init__(self, origin, target, active, weak, color, parent=None):
super(Link, self).__init__(parent)
self._set_pen(active, weak, color)
self.setFlag(QtGui.QGraphicsItem.ItemIsSelectable, False)
self.setFlag(QtGui.QGraphicsItem.ItemIsFocusable, True)
path = QtGui.QPainterPath()
path.moveTo(origin.x(), origin.y())
path.cubicTo(origin.x() + 90, origin.y(),
target.x() - 90, target.y(),
target.x() - 5, target.y())
self.setPath(path)
self.setZValue(0.5)
self.active = active
self.weak = weak
self.color = color
self.effectiveOpacity()
def _set_pen(self, active, weak, color):
self.pen = QtGui.QPen()
self.pen.setWidth(3)
if active:
self.pen.setBrush(color)
else:
self.pen.setBrush(QtCore.Qt.gray)
if weak:
self.pen.setStyle(QtCore.Qt.DashLine)
self.pen.setCapStyle(QtCore.Qt.RoundCap)
self.pen.setJoinStyle(QtCore.Qt.RoundJoin)
self.setPen(self.pen)
def update(self, origin, target):
path = QtGui.QPainterPath()
path.moveTo(origin.x(), origin.y())
path.cubicTo(origin.x() + 90, origin.y(),
target.x() - 90, target.y(),
target.x() - 5, target.y())
self.setPath(path)
def update_activation(self, active, weak, color):
if color == 'current':
color = self.color
self._set_pen(active, weak, color)
self.active = active
self.weak = weak
def fonced_viewer(self, det):
if det:
# color=QtGui.QColor(150, 150, 250)
self.setOpacity(0.2)
else:
# color=self.color
self.setOpacity(1)
# self._set_pen(self.active, self.weak, color)
def mousePressEvent(self, event):
item = self.scene().itemAt(event.scenePos(), Qt.QTransform())
# print('Link click, item:', item)
if event.button() == QtCore.Qt.RightButton:
# not a signal since we don't jhave enough identity information in
# self: the scene has to help us.
self.scene()._link_right_clicked(self)
else:
super(Link, self).mousePressEvent(event)
event.accept()
def focusInEvent(self, event):
self.setPen(QtGui.QPen(QtGui.QColor(150, 150, 250), 3, QtCore.Qt.DashDotDotLine))
return QtGui.QGraphicsPathItem.focusInEvent(self, event)
def focusOutEvent(self, event):
self.setPen(self.pen)
return QtGui.QGraphicsPathItem.focusOutEvent(self, event)
def keyPressEvent(self, event):
if event.key() == QtCore.Qt.Key_Delete:
self.scene()._link_keydelete_clicked(self)
event.accept()
else:
super(Link, self).keyPressEvent(event)
class PipelineScene(QtGui.QGraphicsScene):
# Signal emitted when a sub pipeline has to be open.
subpipeline_clicked = QtCore.Signal(str, Process,
QtCore.Qt.KeyboardModifiers)
# Signal emitted when a node box is clicked
process_clicked = QtCore.Signal(str, Process)
node_clicked = QtCore.Signal(str, Node)
# Signal emitted when a node box is clicked with ctrl
node_clicked_ctrl = QtCore.Signal(str, Process)
# Signal emitted when a switch box is clicked
switch_clicked = QtCore.Signal(str, Switch)
# Signal emitted when a node box is right-clicked
node_right_clicked = QtCore.Signal(str, Controller)
# Signal emitted when a plug is clicked
plug_clicked = QtCore.Signal(str)
# Signal emitted when a plug is clicked with the right mouse button
plug_right_clicked = QtCore.Signal(str)
# Signal emitted when a link is right-clicked
link_right_clicked = QtCore.Signal(str, str, str, str)
link_keydelete_clicked = QtCore.Signal(str, str, str, str)
node_keydelete_clicked = QtCore.Signal(str)
def __init__(self, parent=None, userlevel=0):
super(PipelineScene, self).__init__(parent)
self.gnodes = {}
self.glinks = {}
self._pos = 50
self.pos = {}
self.dim = {} # add by Irmage OM for recorded dimension of Nodes
self.colored_parameters = True
self.logical_view = False
self._enable_edition = False
self.labels = []
self._userlevel = userlevel
# pen = QtGui.QPen(QtGui.QColor(250,100,0),2)
# self.l = QtCore.QLineF(-10,0,10,0)
# self.addLine(self.l,pen)
# self.l = QtCore.QLineF(0,-10,0,10)
# self.addLine(self.l,pen)
self.colType = ColorType()
self.changed.connect(self.update_paths)
def __del__(self):
#print('PipelineScene.__del__')
if hasattr(self, 'pos'):
del self.pos
if hasattr(self, 'dim'):
del self.dim
if hasattr(self, 'labels'):
del self.labels
if hasattr(self, 'glinks'):
del self.glinks
for gnode in self.gnodes.values():
gnode._release()
del self.gnodes
# force delete gnodes: needs to use gc.collect()
import gc
gc.collect()
@property
def userlevel(self):
return self._userlevel
@userlevel.setter
def userlevel(self, value):
if self._userlevel != value:
self._userlevel = value
for name, gnode in self.gnodes.items():
gnode.userlevel = value
self.update_pipeline()
def _add_node(self, name, gnode):
self.addItem(gnode)
################# add by Irmage OM ####################
dim = self.dim.get(name)
# print("_add_node : dim : ",dim," , type =",type(dim).__name__)
if dim is not None:
if isinstance(dim, Qt.QPointF):
dim=(dim.x(),dim.y())
gnode.updateSize(dim[0],dim[1])
#gnode.sizer.setPos(dim[0],dim[1])
# gnode.update_node()
######################################################
pos = self.pos.get(name)
if pos is None:
gnode.setPos(2 * self._pos, self._pos)
self._pos += 100
else:
if not isinstance(pos, Qt.QPointF):
pos = Qt.QPointF(pos[0], pos[1])
gnode.setPos(pos)
self.gnodes[name] = gnode
# gnode.update_node()
#repositioning 'inputs' node
if name == 'inputs':
pos_left_most=(0,0)
for el in self.gnodes:
if el!='inputs' and el!='outputs':
if pos_left_most[0] > self.gnodes[el].pos().x():
pos_left_most=(self.gnodes[el].pos().x(),self.gnodes[el].pos().y())
xl = pos_left_most[0]-(2*self.gnodes[name].boundingRect().size().width())
yl = pos_left_most[1]
self.gnodes[name].setPos(xl,yl)
# gnode.update_node()
#repositioning 'outputs' node
if name == 'outputs':
pos_right_most=(0,0)
for el in self.gnodes:
if el!='inputs' and el!='outputs':
if pos_right_most[0] < self.gnodes[el].pos().x() + self.gnodes[el].boundingRect().size().width() :
pos_right_most=(self.gnodes[el].pos().x() + self.gnodes[el].boundingRect().size().width(),self.gnodes[el].pos().y())
xl = pos_right_most[0]+self.gnodes[name].boundingRect().size().width()
yl = pos_right_most[1]
self.gnodes[name].setPos(xl,yl)
# gnode.update_node()
################" add by Irmage #############################################
self.setSceneRect(QtCore.QRectF())
#############################################################################
def add_node(self, node_name, node):
if not isinstance(node, ProcessNode):
process = node
if hasattr(node, 'process'):
process = node.process
if isinstance(node, PipelineNode):
sub_pipeline = process
elif process and isinstance(process, ProcessIteration):
sub_pipeline = process.process
else:
sub_pipeline = None
gnode = NodeGWidget(
node_name, node.plugs, self.pipeline,
sub_pipeline=sub_pipeline, process=process,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view, labels=self.labels,
userlevel=self.userlevel)
self._add_node(node_name, gnode)
gnode.update_node()
return gnode
def add_link(self, source, dest, active, weak):
# print("add link ", source, dest)
source_gnode_name, source_param = source
if not source_gnode_name:
source_gnode_name = 'inputs'
dest_gnode_name, dest_param = dest
if not dest_gnode_name:
dest_gnode_name = 'outputs'
if self.logical_view:
source_param = 'outputs'
dest_param = 'inputs'
try:
typeq = self.typeLink(source_gnode_name, source_param)
# color = self.colorLink(typeq)
color = self.colType.colorLink(typeq)
except Exception:
color = ORANGE_2
# verif=((str(dest_gnode_name), str(dest_param)))
# print(str(verif) in str(self.glinks.keys()))
source_dest = ((str(source_gnode_name), str(source_param)),
(str(dest_gnode_name), str(dest_param)))
if source_dest in self.glinks:
# already done
if self.logical_view:
# keep strongest link representation
glink = self.glinks[source_dest]
if active or glink.active:
active = True
if not weak or not glink.weak:
weak = False
if glink.weak != weak or glink.active != active:
glink.update_activation(active, weak, "current")
return # already done
source_gnode = self.gnodes[source_gnode_name]
dest_gnode = self.gnodes.get(dest_gnode_name)
if dest_gnode is not None:
if dest_param in dest_gnode.in_plugs \
and source_param in source_gnode.out_plugs:
glink = Link(
source_gnode.mapToScene(
source_gnode.out_plugs[source_param].get_plug_point()),
dest_gnode.mapToScene(
dest_gnode.in_plugs[dest_param].get_plug_point()),
active, weak, color)
self.glinks[source_dest] = glink
self.addItem(glink)
def _remove_link(self, source_dest):
source, dest = source_dest
source_gnode_name, source_param = source
if not source_gnode_name:
source_gnode_name = 'inputs'
source_gnode = self.gnodes[source_gnode_name]
dest_gnode_name, dest_param = dest
if not dest_gnode_name:
dest_gnode_name = 'outputs'
if self.logical_view:
# is it useful ?
source_param = 'outputs'
dest_param = 'inputs'
dest_gnode = self.gnodes.get(dest_gnode_name)
new_source_dest = ((str(source_gnode_name), str(source_param)),
(str(dest_gnode_name), str(dest_param)))
glink = self.glinks.get(new_source_dest)
if glink is not None:
self.removeItem(glink)
del self.glinks[new_source_dest]
def update_paths(self, regions=[]):
for name, i in six.iteritems(self.gnodes):
self.pos[i.name] = i.pos()
br = i.box.boundingRect()
self.dim[i.name] = (br.width(), br.height())
dropped = []
for source_dest, glink in six.iteritems(self.glinks):
source, dest = source_dest
source_gnode_name, source_param = source
dest_gnode_name, dest_param = dest
source_gnode = self.gnodes[source_gnode_name]
dest_gnode = self.gnodes[dest_gnode_name]
if source_param not in source_gnode.out_plugs \
or dest_param not in dest_gnode.in_plugs:
dropped.append(source_dest)
else:
glink.update(source_gnode.mapToScene(
source_gnode.out_plugs[source_param].get_plug_point()),
dest_gnode.mapToScene(
dest_gnode.in_plugs[dest_param].get_plug_point()))
for source_dest in dropped:
self._remove_link(source_dest)
def set_pipeline(self, pipeline):
self.pipeline = pipeline
self.labels = []
pipeline_inputs = SortedDictionary()
pipeline_outputs = SortedDictionary()
if pipeline is not None:
for name, plug in six.iteritems(pipeline.nodes[''].plugs):
if plug.output:
pipeline_outputs[name] = plug
else:
pipeline_inputs[name] = plug
if pipeline_inputs:
self._add_node(
'inputs', NodeGWidget(
'inputs', pipeline_inputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
for node_name, node in six.iteritems(pipeline.nodes):
if not node_name:
continue
self.add_node(node_name, node)
if pipeline_outputs:
self._add_node(
'outputs', NodeGWidget(
'outputs', pipeline_outputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
for source_node_name, source_node in six.iteritems(pipeline.nodes):
for source_parameter, source_plug \
in six.iteritems(source_node.plugs):
for (dest_node_name, dest_parameter, dest_node, dest_plug,
weak_link) in source_plug.links_to:
if dest_node is pipeline.nodes.get(dest_node_name):
self.add_link(
(source_node_name, source_parameter),
(dest_node_name, dest_parameter),
active=source_plug.activated \
and dest_plug.activated,
weak=weak_link)
def update_pipeline(self):
if self.logical_view:
self._update_logical_pipeline()
else:
self._update_regular_pipeline()
def _update_regular_pipeline(self):
# normal view
pipeline = self.pipeline
removed_nodes = []
# print(self.gnodes)
for node_name, gnode in six.iteritems(self.gnodes):
if gnode.logical_view:
gnode.clear_plugs()
gnode.logical_view = False
if node_name in ('inputs', 'outputs'):
node = pipeline.nodes['']
# in case traits have been added/removed
if node_name == 'inputs':
pipeline_inputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if not plug.output:
trait = node.get_trait(name)
if not trait.hidden \
and (trait.userlevel is None
or trait.userlevel <= self.userlevel):
pipeline_inputs[name] = plug
gnode.parameters = pipeline_inputs
else:
pipeline_outputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if plug.output:
trait = node.get_trait(name)
if not trait.hidden \
and (trait.userlevel is None
or trait.userlevel <= self.userlevel):
pipeline_outputs[name] = plug
gnode.parameters = pipeline_outputs
else:
node = pipeline.nodes.get(node_name)
if node is None: # removed node
removed_nodes.append(node_name)
continue
gnode.active = node.activated
gnode.update_node()
# handle removed nodes
for node_name in removed_nodes:
self.removeItem(self.gnodes[node_name])
del self.gnodes[node_name]
# check for added nodes
added_nodes = []
for node_name, node in six.iteritems(pipeline.nodes):
if node_name == '':
pipeline_inputs = SortedDictionary()
pipeline_outputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if plug.output:
pipeline_outputs[name] = plug
else:
pipeline_inputs[name] = plug
if pipeline_inputs and 'inputs' not in self.gnodes:
self._add_node(
'inputs', NodeGWidget(
'inputs', pipeline_inputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
if pipeline_outputs and 'outputs' not in self.gnodes:
self._add_node(
'outputs', NodeGWidget(
'outputs', pipeline_outputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
elif node_name not in self.gnodes:
process = None
if isinstance(node, Switch):
process = node
if hasattr(node, 'process'):
process = node.process
if isinstance(node, PipelineNode):
sub_pipeline = node.process
else:
sub_pipeline = None
self.add_node(node_name, node)
# links
to_remove = []
for source_dest, glink in six.iteritems(self.glinks):
source, dest = source_dest
source_node_name, source_param = source
dest_node_name, dest_param = dest
if source_node_name == 'inputs':
source_node_name = ''
if dest_node_name == 'outputs':
dest_node_name = ''
source_node = pipeline.nodes.get(source_node_name)
if source_node is None:
to_remove.append(source_dest)
continue
source_plug = source_node.plugs.get(source_param)
dest_node = pipeline.nodes.get(dest_node_name)
if dest_node is None:
to_remove.append(dest_node_name)
continue
dest_plug = dest_node.plugs.get(dest_param)
remove_glink = False
if source_plug is None or dest_plug is None:
# plug[s] removed
remove_glink = True
else:
active = source_plug.activated and dest_plug.activated
weak = [x[4] for x in source_plug.links_to \
if x[:2] == (dest_node_name, dest_param)]
if len(weak) == 0:
# link removed
remove_glink = True
else:
weak = weak[0]
if remove_glink:
to_remove.append(source_dest)
else:
glink.update_activation(active, weak, "current")
for source_dest in to_remove:
self._remove_link(source_dest)
# check added links
for source_node_name, source_node in six.iteritems(pipeline.nodes):
for source_parameter, source_plug \
in six.iteritems(source_node.plugs):
for (dest_node_name, dest_parameter, dest_node, dest_plug,
weak_link) in source_plug.links_to:
if dest_node is pipeline.nodes.get(dest_node_name):
self.add_link(
(source_node_name, source_parameter),
(dest_node_name, dest_parameter),
active=source_plug.activated \
and dest_plug.activated,
weak=weak_link)
self._update_steps()
def _update_steps(self):
pipeline = self.pipeline
if not hasattr(pipeline, 'pipeline_steps'):
return
steps = pipeline.pipeline_steps
if steps is None:
return
for node_name, node in six.iteritems(pipeline.nodes):
gnode = self.gnodes.get(node_name)
if gnode is None:
continue
labels = ['step: %s' % n for n in steps.user_traits()
if node_name in steps.trait(n).nodes]
#print('update step labels on', node_name, ':', labels)
gnode.update_labels(labels)
def _update_logical_pipeline(self):
# update nodes plugs and links in logical view mode
pipeline = self.pipeline
# nodes state
removed_nodes = []
for node_name, gnode in six.iteritems(self.gnodes):
if not gnode.logical_view:
gnode.clear_plugs()
gnode.logical_view = True
if node_name in ('inputs', 'outputs'):
node = pipeline.nodes['']
else:
node = pipeline.nodes.get(node_name)
if node is None: # removed node
removed_nodes.append(node_name)
continue
gnode.active = node.activated
gnode.update_node()
# handle removed nodes
for node_name in removed_nodes:
self.removeItem(self.gnodes[node_name])
del self.gnodes[node_name]
# check for added nodes
added_nodes = []
for node_name, node in six.iteritems(pipeline.nodes):
if node_name == '':
pipeline_inputs = SortedDictionary()
pipeline_outputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if plug.output:
pipeline_outputs['outputs'] = plug
else:
pipeline_inputs['inputs'] = plug
if pipeline_inputs and 'inputs' not in self.gnodes:
self._add_node(
'inputs', NodeGWidget(
'inputs', pipeline_inputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
if pipeline_outputs and 'outputs' not in self.gnodes:
self._add_node(
'outputs', NodeGWidget(
'outputs', pipeline_outputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
elif node_name not in self.gnodes:
process = None
if isinstance(node, Switch):
process = node
if hasattr(node, 'process'):
process = node.process
if isinstance(node, PipelineNode):
sub_pipeline = node.process
else:
sub_pipeline = None
self.add_node(node_name, node)
# links
# delete all links
for source_dest, glink in six.iteritems(self.glinks):
self.removeItem(glink)
self.glinks = {}
# recreate links
for source_node_name, source_node in six.iteritems(pipeline.nodes):
for source_parameter, source_plug \
in six.iteritems(source_node.plugs):
for (dest_node_name, dest_parameter, dest_node, dest_plug,
weak_link) in source_plug.links_to:
if dest_node is pipeline.nodes.get(dest_node_name):
self.add_link(
(source_node_name, source_parameter),
(dest_node_name, dest_parameter),
active=source_plug.activated \
and dest_plug.activated,
weak=weak_link)
self._update_steps()
def set_enable_edition(self, state=True):
self._enable_edition = state
def edition_enabled(self):
return self._enable_edition
def keyPressEvent(self, event):
super(PipelineScene, self).keyPressEvent(event)
if not event.isAccepted():
if event.key() == QtCore.Qt.Key_P:
# print position of boxes
event.accept()
pview = self.parent()
pview.print_node_positions()
elif event.key() == QtCore.Qt.Key_T:
for item in self.items():
if isinstance(item, boxItem):
item.focusOutEvent(Qt.QFocusEvent(Qt.QEvent.FocusOut))
# toggle logical / full view
pview = self.parent()
pview.switch_logical_view()
event.accept()
elif event.key() == QtCore.Qt.Key_A:
# auto-set nodes positions
pview = self.parent()
pview.auto_dot_node_positions()
# elif Qt.QKeySequence(event.key()+int(event.modifiers())) == Qt.QKeySequence("Ctrl+Z"):
# self.undoTyping_clicked.emit()
return QtGui.QGraphicsScene.keyPressEvent(self, event)
def link_tooltip_text(self, source_dest):
'''Tooltip text for the fiven link
Parameters
----------
source_dest: tupe (2 tuples of 2 strings)
link description:
((source_node, source_param), (dest_node, dest_param))
'''
source_node_name = source_dest[0][0]
dest_node_name = source_dest[1][0]
if source_node_name in ('inputs', 'outputs'):
proc = self.pipeline
source_node_name = ''
source_node = self.pipeline.nodes[source_node_name]
else:
source_node = self.pipeline.nodes[source_node_name]
proc = source_node
if hasattr(source_node, 'process'):
proc = source_node.process
if dest_node_name in ('inputs', 'outputs'):
dest_node_name = ''
splug = source_node.plugs[source_dest[0][1]]
link = [l for l in splug.links_to \
if l[0] == dest_node_name and l[1] == source_dest[1][1]][0]
if splug.activated and link[3].activated:
active = '<font color="#ffa000">activated</font>'
else:
active = '<font color="#a0a0a0">inactive</font>'
if link[4]:
weak = '<font color="#e0c0c0">weak</font>'
else:
weak = '<b>strong</b>'
name = source_dest[0][1]
value = getattr(proc, name)
# trait = proc.user_traits()[name]
trait_type = proc.user_traits()[name].trait_type
trait_type_str = str(trait_type)
trait_type_str = trait_type_str[: trait_type_str.find(' object ')]
trait_type_str = trait_type_str[trait_type_str.rfind('.') + 1:]
inst_type = self.get_instance_type_string(value)
typestr = ('%s (%s)' % (inst_type, trait_type_str)).replace(
'<', '').replace('>', '')
msg = '''<h3>%s</h3>
<table cellspacing="6">
<tr>
<td><b>Link:</b></td>
<td>%s</td>
<td>%s</td>
</tr>
</table>
<table>
<tr>
<td><b>type:</b></td>
<td>%s</td>
</tr>
<tr>
<td><b>value:</b></td>
<td>%s</td>
</tr>
''' \
% (source_dest[0][1], active, weak, typestr, str(value))
if isinstance(trait_type, traits.File) \
or isinstance(trait_type, traits.Directory) \
or isinstance(trait_type, traits.Any):
if self.is_existing_path(value):
msg += ''' <tr>
<td></td>
<td>existing path</td>
</tr>
'''
elif not isinstance(trait_type, traits.Any):
msg += ''' <tr>
<td></td>
<td><font color="#a0a0a0">non-existing path</font></td>
</tr>
'''
msg += '</table>'
return msg
@staticmethod
def get_instance_type_string(value):
if value is None:
return 'None'
if value is traits.Undefined:
return 'Undefined'
if isinstance(value, (list, traits.TraitListObject)):
return 'list'
return type(value).__name__
@staticmethod
def is_existing_path(value):
if value not in (None, traits.Undefined) \
and type(value) in (str, six.text_type) and os.path.exists(value):
return True
return False
@staticmethod
def html_doc(doc_text):
# TODO: sphinx transform
text = doc_text.replace('<', '<')
text = text.replace('>', '>')
return text
def plug_tooltip_text(self, node, name):
'''Tooltip text for a node plug
'''
if node.name in ('inputs', 'outputs'):
proc = self.pipeline
splug = self.pipeline.pipeline_node.plugs[name]
else:
src = self.pipeline.nodes[node.name]
splug = src.plugs[name]
proc = src
if hasattr(src, 'process'):
proc = src.process
if splug.output:
output = '<font color="#d00000">output</font>'
else:
output = '<font color="#00d000">input</font>'
if splug.enabled:
enabled = 'enabled'
else:
enabled = '<font color="#a0a0a0">disabled</font>'
if splug.activated:
activated = 'activated'
else:
activated = '<font color="#a0a0a0">inactive</font>'
if splug.optional:
optional = '<font color="#00d000">optional</font>'
else:
optional = 'mandatory'
value = getattr(proc, name)
trait = proc.user_traits()[name]
trait_type = trait.trait_type
trait_type_str = trait_type.__class__.__name__
if trait.output and trait.input_filename is False:
trait_type_str += ', output filename'
typestr = ('%s (%s)' % (self.get_instance_type_string(value),
trait_type_str)).replace(
'<', '').replace('>', '')
msg = '''<h3>%s</h3>
<table cellspacing="6">
<tr>
<td><b>Plug:</b></td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
</table>
<table>
<tr>
<td><b>type:</b></td>
<td>%s</td>
</tr>
<tr>
<td><b>value:</b></td>
<td>%s</td>
</tr>
''' \
% (name, output, optional, enabled, activated, typestr, str(value))
if isinstance(trait_type, traits.File) \
or isinstance(trait_type, traits.Directory) \
or isinstance(trait_type, traits.Any):
if self.is_existing_path(value):
msg += ''' <tr>
<td></td>
<td>existing path</td>
</tr>
'''
elif not isinstance(trait_type, traits.Any):
msg += ''' <tr>
<td></td>
<td><font color="#a0a0a0">non-existing path</font></td>
</tr>
'''
msg += '</table>'
desc = trait.desc
if desc:
msg += '\n<h3>Description:</h3>\n'
msg += self.html_doc(desc)
return msg
def node_tooltip_text(self, node):
process = node.process
msg = getattr(process, '__doc__', '')
# msg = self.html_doc(doc)
return msg
def _parentgnode(self, item):
if qt_backend.get_qt_backend() != 'PyQt5':
return item.parentItem()
# in PyQt5 (certain versions at least, Ubuntu 16.04) parentItem()
# returns something inappropriate, having the wrong type
# QGraphicsVideoItem, probably a cast mistake, and which leads to
# a segfault, so we have to get it a different way.
nodes = [node for node in self.gnodes.values()
if item in node.childItems()]
if len(nodes) == 1:
return nodes[0]
def helpEvent(self, event):
'''
Display tooltips on plugs and links
'''
if self.logical_view:
event.setAccepted(False)
super(PipelineScene, self).helpEvent(event)
return
item = self.itemAt(event.scenePos(), Qt.QTransform())
if isinstance(item, Link):
for source_dest, glink in six.iteritems(self.glinks):
if glink is item:
text = self.link_tooltip_text(source_dest)
item.setToolTip(text)
break
elif isinstance(item, Plug):
node = self._parentgnode(item)
found = False
for name, plug in six.iteritems(node.in_plugs):
if plug is item:
found = True
break
if not found:
for name, plug in six.iteritems(node.out_plugs):
if plug is item:
found = True
break
if found:
text = self.plug_tooltip_text(node, name)
item.setToolTip(text)
elif isinstance(item, QtGui.QGraphicsRectItem):
node = self._parentgnode(item)
if isinstance(node, NodeGWidget):
text = self.node_tooltip_text(node)
item.setToolTip(text)
elif isinstance(item, QtGui.QGraphicsProxyWidget):
# PROBLEM: tooltips in child graphics scenes seem not to popup.
#
# to force them we would have to translate the event position to
# the sub-scene position, and call the child scene helpEvent()
# method, with a custom event.
# However this is not possible, since QGraphicsSceneHelpEvent
# does not provide a public (nor even protected) constructor, and
# secondarily helpEvent() is protected.
event.setAccepted(False)
super(PipelineScene, self).helpEvent(event)
def remove_node(self, node_name):
gnode = self.gnodes[node_name]
todel = set()
for link, glink in six.iteritems(self.glinks):
if link[0][0] == node_name or link[1][0] == node_name:
self.removeItem(glink)
todel.add(link)
for link in todel:
del self.glinks[link]
self.removeItem(gnode)
del self.gnodes[node_name]
def _link_right_clicked(self, link):
# find the link in list
# print('Scene._link_right_clicked:', link)
for source_dest, glink in six.iteritems(self.glinks):
if glink is link:
self.link_right_clicked.emit(
source_dest[0][0], source_dest[0][1],
source_dest[1][0], source_dest[1][1])
break
def _link_keydelete_clicked(self, link):
for source_dest, glink in six.iteritems(self.glinks):
if glink is link:
self.link_keydelete_clicked.emit(
source_dest[0][0], source_dest[0][1],
source_dest[1][0], source_dest[1][1])
break
def _node_keydelete_clicked(self, node):
self.node_keydelete_clicked.emit(node.name)
def typeLink(self, name_node, name_plug):
if name_node in ('inputs', 'outputs'):
proc = self.pipeline
splug = self.pipeline.pipeline_node.plugs[name_plug]
else:
src = self.pipeline.nodes[name_node]
splug = src.plugs[name_plug]
proc = src
if hasattr(src, 'process'):
proc = src.process
value = getattr(proc, name_plug)
trait = proc.user_traits()[name_plug]
trait_type = trait.trait_type
trait_type_str = str(trait_type)
trait_type_str = trait_type_str[: trait_type_str.find(' object ')]
trait_type_str = trait_type_str[trait_type_str.rfind('.') + 1:]
return trait_type_str
[docs]class PipelineDevelopperView(QGraphicsView):
'''
Pipeline representation as a graph, using boxes and arrows.
Based on Qt QGraphicsView, this can be used as a Qt QWidget.
Qt signals are emitted on a right click on a node box, and on a double
click on a sub-pipeline box, to allow handling at a higher level. Default
behaviors can be enabled using constructor parameters.
Ctrl + double click opens sub-pipelines in embedded views inside their
parent box.
Attributes
----------
subpipeline_clicked
node_right_clicked
process_clicked
node_clicked
node_clicked_ctrl
plug_clicked
plug_right_clicked
link_right_clicked
colored_parameters
scene
Methods
-------
__init__
set_pipeline
is_logical_view
set_logical_view
zoom_in
zoom_out
openProcessController
add_embedded_subpipeline
onLoadSubPipelineClicked
onOpenProcessController
enableNode
enable_step
disable_preceding_steps
disable_following_steps
enable_preceding_steps
enable_following_steps
set_switch_value
disable_done_steps
enable_all_steps
check_files
auto_dot_node_positions
save_dot_image_ui
reset_initial_nodes_positions
window
'''
subpipeline_clicked = QtCore.Signal(str, Process,
QtCore.Qt.KeyboardModifiers)
'''Signal emitted when a sub pipeline has to be open.'''
process_clicked = QtCore.Signal(str, Process)
node_clicked = QtCore.Signal(str, Node)
'''Signal emitted when a node box has to be open.'''
node_clicked_ctrl = QtCore.Signal(str, Process)
'''Signal emitted when a node box has to be in the foreground.'''
switch_clicked = QtCore.Signal(str, Switch)
'''Signal emitted when a switch box has to be open.'''
node_right_clicked = QtCore.Signal(str, Controller)
'''Signal emitted when a node box is right-clicked'''
plug_clicked = QtCore.Signal(str)
'''Signal emitted when a plug is clicked'''
plug_right_clicked = QtCore.Signal(str)
'''Signal emitted when a plug is right-clicked'''
link_right_clicked = QtCore.Signal(str, str, str, str)
'''Signal emitted when a link is right-clicked'''
edit_sub_pipeline = QtCore.Signal(Pipeline)
'''Signal emitted when a sub-pipeline has to be edited'''
open_filter = QtCore.Signal(str)
'''Signal emitted when an Input Filter has to be opened'''
export_to_db_scans = QtCore.Signal(str)
'''Signal emitted when an Input Filter has to be linked to database_scans'''
link_keydelete_clicked = QtCore.Signal(str, str, str, str)
node_keydelete_clicked = QtCore.Signal(str)
scene = None
'''
type: PipelineScene
the main scene.
'''
colored_parameters = True
'''
If enabled (default), parameters in nodes boxes are displayed with color
codes representing their state, and the state of their values: output
parameters, empty values, existing files, non-existing files...
When colored_parameters is set, however, callbacks have to be installed to
track changes in traits values, so this actually has an overhead.
When colored_parameters is used, the color code is as follows:
* black pamameter name: input
* red parameter name: output
* italics parameter name: Undefined, None, or empty string value
* bold parameter name: existing file or directory name
* regular font parameter name: non-existing file, or non-file parameter type
* black plug: mandatory
* green plug: optional
* grey plug: mandatory, inactive
* light green plug: optional, inactive
* grey link: inactive
* orange link: active
* dotted line link: weak link
'''
[docs] class ProcessNameEdit(Qt.QLineEdit):
''' A specialized QLineEdit with completion for process name
'''
def __init__(self, parent=None,
class_type_check=process_instance.is_process):
super(PipelineDevelopperView.ProcessNameEdit,
self).__init__(parent)
self.compl = QtGui.QCompleter([])
self.setCompleter(self.compl)
self.textEdited.connect(self.on_text_edited)
self.py_cache = {} # cache for loaded python files
self.class_type_check = class_type_check
@staticmethod
def _execfile(filename):
glob_dict = {}
exec(compile(open(filename, "rb").read(), filename, 'exec'),
glob_dict, glob_dict)
return glob_dict
def load_py(self, filename):
if filename not in self.py_cache:
try:
self.py_cache[filename] = self._execfile(filename)
except Exception as e:
print('exception while executing file %s:' % filename, e)
return {}
return self.py_cache[filename]
def get_processes_or_modules(self, filename):
file_dict = self.load_py(filename)
processes = []
for name, item in six.iteritems(file_dict):
if self.class_type_check(item) or inspect.ismodule(item):
processes.append(name)
return processes
def on_text_edited(self, text):
compl = set()
modpath = str(text).split('.')
current_mod = None
paths = []
sel = set()
mod = None
if len(modpath) > 1:
current_mod = '.'.join(modpath[:-1])
try:
mod = importlib.import_module(current_mod)
except ImportError:
mod = None
if mod:
if os.path.basename(mod.__file__).startswith(
'__init__.py'):
paths = [os.path.dirname(mod.__file__)]
# add process/pipeline objects in current_mod
procs = [item for k, item
in six.iteritems(mod.__dict__)
if self.class_type_check(item)
or inspect.ismodule(item)]
compl.update(['.'.join([current_mod, c.__name__])
for c in procs])
if not mod:
# no current module
# is it a path name ?
pathname, filename = os.path.split(str(text))
if os.path.isdir(pathname):
# look for class in python file filename.py#classname
elements = filename.split('.py#')
if len(elements) == 2:
filename = elements[0] + '.py'
object_name = elements[1]
full_path = os.path.join(pathname, filename)
processes = self.get_processes_or_modules(full_path)
if object_name != '':
processes = [p for p in processes
if p.startswith(object_name)]
compl.update(['#'.join((full_path, p))
for p in processes])
else:
# look for matching xml files
for f in os.listdir(pathname):
if (f.endswith('.xml')
or os.path.isdir(os.path.join(pathname,
f))) \
and f.startswith(filename):
compl.add(os.path.join(pathname, f))
elif f.endswith('.py'):
compl.add(os.path.join(pathname, f))
else:
paths = sys.path
for path in paths:
if path == '':
path = '.'
try:
for f in os.listdir(path):
if f.endswith('.py'):
sel.add(f[:-3])
elif f.endswith('.pyc') or f.endswith('.pyo'):
sel.add(f[:-4])
elif f.endswith('.xml'):
sel.add(f)
elif '.' not in f \
and os.path.isdir(os.path.join(
path, f)):
sel.add(f)
except OSError:
pass
begin = modpath[-1]
cm = []
if current_mod is not None:
cm = [current_mod]
compl.update(['.'.join(cm + [f]) for f in sel \
if f.startswith(modpath[-1])])
model = self.compl.model()
model.setStringList(list(compl))
[docs] def __init__(self, pipeline=None, parent=None, show_sub_pipelines=False,
allow_open_controller=False, logical_view=False,
enable_edition=False, userlevel=0):
'''PipelineDevelopperView
Parameters
----------
pipeline: Pipeline (optional)
pipeline object to be displayed
If omitted an empty pipeline will be used, and edition mode will be
activated.
parent: QWidget (optional)
parent widget
show_sub_pipelines: bool (optional)
if set, sub-pipelines will appear as red/pink boxes and a double
click on one of them will open another window with the sub-pipeline
structure in it
allow_open_controller: bool (optional)
if set, a right click on any box will open another window with the
underlying node controller, allowing to see and edit parameters
values, switches states, etc.
logical_view: bool (optional)
if set, plugs and links between plugs are hidden, only links
between nodes are displayed.
enable_edition: bool (optional)
if set, pipeline edition features are available in GUI and menus:
adding process boxes, drawing links etc. If pipeline is not
specified, then edition will be activated anyway.
'''
super(PipelineDevelopperView, self).__init__(parent)
# self.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft)
self.setAlignment(QtCore.Qt.AlignCenter)
self.centerOn(0,0)
self.setRenderHints(Qt.QPainter.Antialiasing | Qt.QPainter.SmoothPixmapTransform)
self.setBackgroundBrush(QtGui.QColor(60, 60, 60))
self.scene = None
self.colored_parameters = True
self._show_sub_pipelines = show_sub_pipelines
self._allow_open_controller = allow_open_controller
self._logical_view = logical_view
self._enable_edition = enable_edition
self._pipeline_filename = ""
self._restricted_edition = False
self.disable_overwrite = False
self._userlevel = userlevel
self.set_pipeline(pipeline)
self._grab = False
self._grab_link = False
self.plug_clicked.connect(self._plug_clicked)
self.plug_right_clicked.connect(self._plug_right_clicked)
self.link_right_clicked.connect(self._link_clicked)
self.node_clicked_ctrl.connect(self._node_clicked_ctrl)
self.link_keydelete_clicked.connect(self._link_delete_clicked)
self.node_keydelete_clicked.connect(self._node_delete_clicked)
def __del__(self):
#print('PipelineDevelopperView.__del__')
self.release_pipeline(delete=True)
# super(PipelineDevelopperView, self).__del__()
@property
def userlevel(self):
return self._userlevel
@userlevel.setter
def userlevel(self, value):
self._userlevel = value
if self.scene:
self.scene.userlevel = value
for widget in self.findChildren(QtGui.QWidget):
if hasattr(widget, 'userlevel'):
widget.userlevel = value
[docs] def ensure_pipeline(self, pipeline):
'''
Check that we have a pipeline or a process
'''
if pipeline is None:
pipeline = Pipeline()
enable_edition = True
if not isinstance(pipeline, Pipeline):
if isinstance(pipeline, Process):
process = pipeline
pipeline = Pipeline()
pipeline.set_study_config(process.get_study_config())
pipeline.add_process(process.name, process)
pipeline.autoexport_nodes_parameters()
pipeline.node_position["inputs"] = (0., 0.)
pipeline.node_position[process.name] = (300., 0.)
pipeline.node_position["outputs"] = (600., 0.)
# pipeline.scene_scale_factor = 0.5
pipeline.node_dimension[process.name] = (300., 200.) #add by Irmage OM
else:
raise Exception("Expect a Pipeline or a Process, not a "
"'{0}'.".format(repr(pipeline)))
return pipeline
def _set_pipeline(self, pipeline):
pos = {}
dim = {}
if self.scene:
pos = self.scene.pos
dim = self.scene.dim #add by Irmage OM
# pprint(dict((i, (j.x(), j.y())) for i, j in six.iteritems(pos)))
if hasattr(pipeline, 'node_position'):
for i, j in six.iteritems(pipeline.node_position):
if isinstance(j, QtCore.QPointF):
pos[i] = j
else:
pos[i] = QtCore.QPointF(*j)
############### add by Irmage OM #######################
if hasattr(pipeline, 'node_dimension'):
for i, j in six.iteritems(pipeline.node_dimension):
if isinstance(j, QtCore.QPointF):
dim[i] = (j.x(), j.y())
else:
dim[i] = j
# print("_set_pipeline : ",pos," ; ",dim)
#######################################################
self.release_pipeline()
self.scene.set_pipeline(pipeline)
self.scene.pos = pos
self.scene.dim = dim
if pipeline is not None:
self.setWindowTitle(pipeline.name)
# Try to initialize the scene scale factor
if hasattr(pipeline, "scene_scale_factor"):
self.scale(
pipeline.scene_scale_factor, pipeline.scene_scale_factor)
self.reset_initial_nodes_positions()
################" add by Irmage #############################################
self.fitInView(self.sceneRect(), QtCore.Qt.KeepAspectRatio)
#############################################################################
[docs] def set_pipeline(self, pipeline):
'''
Assigns a new pipeline to the view.
'''
pipeline = self.ensure_pipeline(pipeline)
self._set_pipeline(pipeline)
if pipeline is not None:
# Setup callback to update view when pipeline state is modified
pipeline.on_trait_change(self._reset_pipeline, 'selection_changed',
dispatch='ui')
pipeline.on_trait_change(self._reset_pipeline,
'user_traits_changed', dispatch='ui')
if hasattr(pipeline, 'pipeline_steps'):
pipeline.pipeline_steps.on_trait_change(
self._reset_pipeline, dispatch='ui')
[docs] def release_pipeline(self, delete=False):
'''
Releases the pipeline currently viewed (and remove the callbacks)
If ``delete`` is set, this means the view is within deletion process
and a new scene should not be built
'''
# Setup callback to update view when pipeline state is modified
pipeline = None
if self.scene is not None:
pipeline = self.scene.pipeline
if pipeline is not None:
if hasattr(pipeline, 'pipeline_steps'):
pipeline.pipeline_steps.on_trait_change(
self._reset_pipeline, remove=True)
pipeline.on_trait_change(self._reset_pipeline, 'selection_changed',
remove=True)
pipeline.on_trait_change(self._reset_pipeline,
'user_traits_changed', remove=True)
if not delete and (pipeline is not None or self.scene is None):
self.scene = PipelineScene(self, userlevel=self.userlevel)
self.scene.set_enable_edition(self._enable_edition)
self.scene.logical_view = self._logical_view
self.scene.colored_parameters = self.colored_parameters
self.scene.subpipeline_clicked.connect(self.subpipeline_clicked)
self.scene.subpipeline_clicked.connect(self.onLoadSubPipelineClicked)
self.scene.process_clicked.connect(self._node_clicked)
self.scene.node_clicked.connect(self._node_clicked)
self.scene.node_clicked_ctrl.connect(self._node_clicked_ctrl)
self.scene.switch_clicked.connect(self.switch_clicked)
self.scene.node_right_clicked.connect(self.node_right_clicked)
self.scene.node_right_clicked.connect(self.onOpenProcessController)
self.scene.plug_clicked.connect(self.plug_clicked)
self.scene.plug_right_clicked.connect(self.plug_right_clicked)
self.scene.link_right_clicked.connect(self.link_right_clicked)
self.scene.link_keydelete_clicked.connect(self.link_keydelete_clicked)
self.scene.node_keydelete_clicked.connect(self.node_keydelete_clicked)
self.scene.pos = {}
self.scene.dim = {}
self.setWindowTitle('<no pipeline>')
self.setScene(self.scene)
[docs] def is_logical_view(self):
'''
in logical view mode, plugs and links between plugs are hidden, only
links between nodes are displayed.
'''
return self._logical_view
[docs] def set_logical_view(self, state):
'''
in logical view mode, plugs and links between plugs are hidden, only
links between nodes are displayed.
Parameters
----------
state: bool (mandatory)
to set/unset the logical view mode
'''
self._logical_view = state
self._reset_pipeline()
def _reset_pipeline(self):
# self._set_pipeline(pipeline)
self.scene.logical_view = self._logical_view
self.scene.update_pipeline()
[docs] def zoom_in(self):
'''
Zoom the view in, applying a 1.2 zoom factor
'''
self.scale(1.2, 1.2)
[docs] def zoom_out(self):
'''
Zoom the view out, applying a 1/1.2 zool factor
'''
self.scale(1.0 / 1.2, 1.0 / 1.2)
[docs] def edition_enabled(self):
'''
Get the editable state
'''
return self._enable_edition
[docs] def set_enable_edition(self, state=True):
'''
Set the editable state. Edition allows to modify a pipeline: adding /
removing process boxes and switches, drawing links, etc.
'''
self._enable_edition = state
self.scene.set_enable_edition(state)
[docs] def is_restricted_edition_mode(self):
'''
Get the restricted mode status
Returns
-------
enabled: bool
'''
return self._restricted_edition
[docs] def set_restricted_edition_mode(self, enabled):
'''
Set the restricted edition mode. In restricted mode, some background
menu actions ("add process", "open node controller"...) are not
available.
Parameters
----------
enabled: bool
'''
self._restricted_edition = enabled
def wheelEvent(self, event):
done = False
if event.modifiers() == QtCore.Qt.ControlModifier:
item = self.itemAt(event.pos())
if not isinstance(item, QtGui.QGraphicsProxyWidget):
done = True
if qt_backend.get_qt_backend() == 'PyQt5':
delta = event.angleDelta().y()
else:
delta = event.delta()
if delta < 0:
self.zoom_out()
else:
self.zoom_in()
event.accept()
if not done:
super(PipelineDevelopperView, self).wheelEvent(event)
def mousePressEvent(self, event):
super(PipelineDevelopperView, self).mousePressEvent(event)
if not event.isAccepted():
if event.button() == QtCore.Qt.RightButton:
self.open_background_menu()
else:
self._grab = True
self._grabpos = event.pos()
# print("background clicked")
for source_dest, glink in six.iteritems(self.scene.glinks):
glink.fonced_viewer(False)
for node_name, gnode in six.iteritems(self.scene.gnodes):
gnode.fonced_viewer(False)
def mouseReleaseEvent(self, event):
self._grab = False
if self._grab_link:
event.accept()
try:
self._release_grab_link(event)
except Exception:
print("source to destination types are not compatible")
super(PipelineDevelopperView, self).mouseReleaseEvent(event)
self.scene.update()
def mouseMoveEvent(self, event):
if self._grab:
event.accept()
translation = event.pos() - self._grabpos
self._grabpos = event.pos()
self.horizontalScrollBar().setValue(
self.horizontalScrollBar().value() - int(translation.x()))
self.verticalScrollBar().setValue(
self.verticalScrollBar().value() - int(translation.y()))
elif self._grab_link:
self._move_grab_link(event)
event.accept()
else:
super(PipelineDevelopperView, self).mouseMoveEvent(event)
[docs] def add_embedded_subpipeline(self, subpipeline_name, scale=None):
'''
Adds an embedded sub-pipeline inside its parent node.
'''
gnode = self.scene.gnodes.get(str(subpipeline_name))
if gnode is not None:
sub_pipeline \
= self.scene.pipeline.nodes[str(subpipeline_name)].process
gnode.add_subpipeline_view(
sub_pipeline, self._allow_open_controller, scale=scale)
[docs] def onLoadSubPipelineClicked(self, node_name, sub_pipeline, modifiers):
""" Event to load a open a sub-pipeline view.
If ctrl is pressed the new view will be embedded in its parent node
box.
"""
if self._show_sub_pipelines:
if modifiers & QtCore.Qt.ControlModifier:
try:
self.add_embedded_subpipeline(node_name)
return
except KeyError:
print('node not found in:')
print(list(self.scene.gnodes.keys()))
sub_view = PipelineDevelopperView(
sub_pipeline,
show_sub_pipelines=self._show_sub_pipelines,
allow_open_controller=self._allow_open_controller,
enable_edition=self.edition_enabled(),
logical_view=self._logical_view, userlevel=self.userlevel)
# set self.window() as QObject parent (not QWidget parent) to
# prevent the sub_view to close/delete immediately
QtCore.QObject.setParent(sub_view, self.window())
sub_view.setAttribute(QtCore.Qt.WA_DeleteOnClose)
sub_view.setWindowTitle(node_name)
if hasattr(self, 'doc_browser'):
sub_view.doc_browser = self.doc_browser
self.scene.update()
sub_view.show()
[docs] def window(self):
'''
window() is overloaded from QWidget.window() to handle embedded views
cases.
A PipelineDevelopperView may be displayed inside a NodeGWidget.
In this case, we want to go up to the parent scene's window to the
"real" top window, where QWidget.window() will end in the current
graphics scene.
'''
if hasattr(self, '_graphics_item'):
return self._graphics_item.scene().views()[0].window()
else:
return super(PipelineDevelopperView, self).window()
[docs] def onOpenProcessController(self, node_name, process):
""" Event to open a sub-process/sub-pipeline controller
"""
if self._allow_open_controller:
self.open_node_menu(node_name, process)
[docs] def openProcessController(self):
sub_view = QtGui.QScrollArea()
node_name = self.current_node_name
if node_name in ('inputs', 'outputs'):
node_name = ''
process = self.scene.pipeline.nodes[node_name]
if hasattr(process, 'process'):
process = process.process
# force instantiating a completion engine (since
# AttributedProcessWidget does not force it)
if hasattr(process, 'get_study_config'): # exclude custom nodes
engine = process.get_study_config().engine
from capsul.attributes.completion_engine \
import ProcessCompletionEngine
ce = ProcessCompletionEngine.get_completion_engine(process)
cwidget = AttributedProcessWidget(
process, enable_attr_from_filename=True, enable_load_buttons=True,
userlevel=self.userlevel)
sub_view.setWidget(cwidget)
sub_view.setWidgetResizable(True)
sub_view.setAttribute(QtCore.Qt.WA_DeleteOnClose)
sub_view.setWindowTitle(self.current_node_name)
# try to resize to a width that doesn't need an horizontal scrollbar
sub_view.resize(
cwidget.controller_widget.parent().parent().sizeHint().width(),
sub_view.sizeHint().height())
sub_view.show()
# set self.window() as QObject parent (not QWidget parent) to
# prevent the sub_view to close/delete immediately
QtCore.QObject.setParent(sub_view, self.window())
def emit_export_to_db_scans(self):
self.export_to_db_scans.emit(self.current_node_name)
def emit_open_filter(self):
self.open_filter.emit(self.current_node_name)
def emit_edit_sub_pipeline(self):
node = self.scene.pipeline.nodes[self.current_node_name]
sub_pipeline = node.process
if isinstance(sub_pipeline, weakref.ProxyTypes):
# get the "real" object
sub_pipeline = sub_pipeline.__init__.__self__
self.edit_sub_pipeline.emit(sub_pipeline)
[docs] def show_optional_outputs(self):
'''
Added to choose to visualize optional outputs.
'''
gnode = self.scene.gnodes[self.current_node_name]
connected_plugs = []
# The show_opt_outputs attribute is not changed yet
if gnode.show_opt_outputs:
# Verifying that the plugs are not connected to another node
for param, pipeline_plug in six.iteritems(gnode.parameters):
output = (not pipeline_plug.output if gnode.name in (
'inputs', 'outputs') else pipeline_plug.output)
if output:
if pipeline_plug.optional and pipeline_plug.links_to and gnode.show_opt_outputs:
connected_plugs.append(param)
if connected_plugs:
if len(connected_plugs) == 1:
text = "Please remove links from this plug:\n"
else:
text = "Please remove links from these plugs:\n"
for plug_name in connected_plugs:
text += plug_name + ", "
text = text[:-2] + '.'
msg = QMessageBox()
msg.setIcon(QMessageBox.Critical)
msg.setText(text)
msg.setWindowTitle("Error while changing the view of the node")
msg.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
msg.exec_()
return
# Changing the show_opt_outputs attribute
gnode.change_output_view()
self.scene.update_pipeline()
[docs] def enableNode(self, checked):
if self.current_node_name in ['inputs', 'outputs']:
node_name = ''
else:
node_name = self.current_node_name
self.scene.pipeline.nodes[node_name].enabled = checked
[docs] def enable_step(self, step_name, state):
setattr(self.scene.pipeline.pipeline_steps, step_name, state)
[docs] def disable_preceding_steps(self, step_name, dummy):
# don't know why we get this additionall dummy parameter (False)
steps = self.scene.pipeline.pipeline_steps
for step in steps.user_traits():
if step == step_name:
break
setattr(steps, step, False)
[docs] def disable_following_steps(self, step_name, dummy):
steps = self.scene.pipeline.pipeline_steps
found = False
for step in steps.user_traits():
if found:
setattr(steps, step, False)
elif step == step_name:
found = True
[docs] def enable_preceding_steps(self, step_name, dummy):
steps = self.scene.pipeline.pipeline_steps
for step in steps.user_traits():
if step == step_name:
break
setattr(steps, step, True)
[docs] def enable_following_steps(self, step_name, dummy):
steps = self.scene.pipeline.pipeline_steps
found = False
for step in steps.user_traits():
if found:
setattr(steps, step, True)
elif step == step_name:
found = True
[docs] def set_switch_value(self, switch, value, dummy):
switch.switch = value
[docs] def disable_done_steps(self):
pipeline_tools.disable_runtime_steps_with_existing_outputs(
self.scene.pipeline)
[docs] def enable_all_steps(self):
self.scene.pipeline.enable_all_pipeline_steps()
[docs] def check_files(self):
overwritten_outputs = pipeline_tools.nodes_with_existing_outputs(
self.scene.pipeline)
missing_inputs = pipeline_tools.nodes_with_missing_inputs(
self.scene.pipeline)
if len(overwritten_outputs) == 0 and len(missing_inputs) == 0:
QtGui.QMessageBox.information(
self, 'Pipeline ready', 'All input files are available. '
'No output file will be overwritten.')
else:
dialog = QtGui.QWidget()
layout = QtGui.QVBoxLayout(dialog)
warn_widget = PipelineFileWarningWidget(
missing_inputs, overwritten_outputs)
layout.addWidget(warn_widget)
hlay = QtGui.QHBoxLayout()
layout.addLayout(hlay)
hlay.addStretch()
ok = QtGui.QPushButton('OK')
self.ok_button = ok
hlay.addWidget(ok)
ok.clicked.connect(dialog.close)
dialog.show()
self._warn_files_widget = dialog
[docs] def auto_dot_node_positions(self):
'''
Calculate pipeline nodes positions using graphviz/dot, and place the
pipeline view nodes accordingly.
'''
scene = self.scene
scale = 67. # dpi
nodes_sizes = dict([(name,
(gnode.boundingRect().width(),
gnode.boundingRect().height()))
for name, gnode in six.iteritems(scene.gnodes)])
dgraph = pipeline_tools.dot_graph_from_pipeline(
scene.pipeline, nodes_sizes=nodes_sizes)
tfile, tfile_name = tempfile.mkstemp()
os.close(tfile)
pipeline_tools.save_dot_graph(dgraph, tfile_name)
toutfile, toutfile_name = tempfile.mkstemp()
os.close(toutfile)
cmd = ['dot', '-Tplain', '-o', toutfile_name, tfile_name]
soma.subprocess.check_call(cmd)
nodes_pos = self._read_dot_pos(toutfile_name)
rects = dict([(name, node.boundingRect())
for name, node in six.iteritems(scene.gnodes)])
pos = dict([(name, (-rects[name].width() / 2 + pos[0] * scale,
-rects[name].height() / 2 - pos[1] * scale))
for id, name, pos in nodes_pos])
minx = min([x[0] for x in six.itervalues(pos)])
miny = min([x[1] for x in six.itervalues(pos)])
pos = dict([(name, (p[0] - minx, p[1] - miny))
for name, p in six.iteritems(pos)])
# print('pos:')
# print(pos)
scene.pos = pos
for node, position in six.iteritems(pos):
gnode = scene.gnodes[node]
if isinstance(position, Qt.QPointF):
gnode.setPos(position)
else:
gnode.setPos(*position)
os.unlink(tfile_name)
os.unlink(toutfile_name)
def _read_dot_pos(self, filename):
'''
Read the nodes positions from a file generated by graphviz/dot, in
"plain" text format.
Returns
-------
nodes_pos: dict
keys are nodes IDs (names), and values are 2D positions
'''
fileobj = open(filename)
nodes_pos = []
if sys.version_info[0] >= 3:
file_iter = fileobj.readlines()
else:
file_iter = fileobj
for line in file_iter:
if line.startswith('node'):
line_els0 = line.split()
line_els = []
for el in line_els0:
if el.startswith('"') and el.endswith('"'):
line_els.append(el[1:-1])
else:
line_els.append(el)
id = line_els[1]
pos = tuple([float(x) for x in line_els[2:4]])
name = line_els[6]
nodes_pos.append((id, name, pos))
elif line.startswith('edge'):
break
return nodes_pos
[docs] def save_dot_image_ui(self):
'''
Ask for a filename using the file dialog, and save a graphviz/dot
representation of the pipeline.
The pipeline representation follows the current visualization mode
("regular" or "logical" with smaller boxes) with one link of a given
type (active, weak) between two given boxes: all parameters are not
represented.
'''
file_dialog = QtGui.QFileDialog(filter='Images (*.png *.xpm *.jpg *.ps *.eps);; All (*)')
file_dialog.setDefaultSuffix('.png')
file_dialog.setFileMode(QtGui.QFileDialog.AnyFile)
file_dialog.setAcceptMode(QtGui.QFileDialog.AcceptSave)
if file_dialog.exec_():
filename = file_dialog.selectedFiles()
'''filename = QtGui.QFileDialog.getSaveFileName(
None, 'Save image of the pipeline', '',
'Images (*.png *.xpm *.jpg *.ps *.eps);; All (*)')'''
if filename:
pipeline_tools.save_dot_image(self.scene.pipeline, filename[0])
[docs] def reset_initial_nodes_positions(self):
'''
Set each pipeline node to its "saved" position, ie the one which may
be found in the "node_position" variable of the pipeline.
'''
scene = self.scene
if scene.pipeline is None:
return
# ############## add by Irmage OM ###################
# dim = getattr(scene.pipeline, 'node_dimension')
# if dim is not None:
# scene.dim = dim
# print()
# for node, dimension in six.iteritems(dim):
# gnode = scene.gnodes.get(node)
# if gnode is not None:
# if isinstance(dimension, QtCore.QPointF):
# dimension = (dim.x(),dim.y())
# # else:
# # dimension = dim.width(),dim.height()
# gnode.update(0,0,*dimension)
# #####################################################
pos = getattr(scene.pipeline, 'node_position')
if pos is not None:
scene.pos = pos
for node, position in six.iteritems(pos):
gnode = scene.gnodes.get(node)
if gnode is not None:
if isinstance(position, QtCore.QPointF):
position = (position.x(), position.y())
gnode.setPos(*position)
def switch_logical_view(self):
self.set_logical_view(not self.is_logical_view())
def print_node_positions(self):
def conv_pos(p):
if isinstance(p, Qt.QPointF):
return (p.x(), p.y())
return p
posdict = dict([(key, conv_pos(value)) \
for key, value in six.iteritems(self.scene.pos)])
pprint(posdict)
def del_node(self, node_name=None):
pipeline = self.scene.pipeline
if not node_name:
node_name = self.current_node_name
node = pipeline.nodes[node_name]
pipeline.remove_node(node_name)
self.scene.remove_node(node_name)
self.scene.pipeline.update_nodes_and_plugs_activation()
def export_node_plugs(self, node_name, inputs=True, outputs=True,
optional=False):
pipeline = self.scene.pipeline
node = pipeline.nodes[node_name]
for parameter_name, plug in six.iteritems(node.plugs):
if parameter_name in ("nodes_activation", "selection_changed"):
continue
if (((node_name, parameter_name) not in pipeline.do_not_export and
((outputs and plug.output and not plug.links_to) or
(inputs and not plug.output and not plug.links_from)) and
(optional or not node.get_trait(parameter_name).optional))):
pipeline.export_parameter(node_name, parameter_name)
def export_plugs(self, inputs=True, outputs=True, optional=False):
for node_name in self.scene.pipeline.nodes:
if node_name != "":
self.export_node_plugs(node_name, inputs=inputs,
outputs=outputs, optional=optional)
def export_node_unconnected_mandatory_plugs(self):
self.export_node_plugs(self.current_node_name)
def export_node_all_unconnected_plugs(self):
self.export_node_plugs(self.current_node_name, optional=True)
def export_node_unconnected_mandatory_inputs(self):
self.export_node_plugs(
self.current_node_name, inputs=True, outputs=False)
def export_node_all_unconnected_inputs(self):
self.export_node_plugs(
self.current_node_name, inputs=True, outputs=False, optional=True)
def export_node_unconnected_mandatory_outputs(self):
self.export_node_plugs(
self.current_node_name, inputs=False, outputs=True)
def export_node_all_unconnected_outputs(self):
self.export_node_plugs(
self.current_node_name, inputs=False, outputs=True, optional=True)
def export_unconnected_mandatory_plugs(self):
self.export_plugs()
def export_all_unconnected_plugs(self):
self.export_plugs(optional=True)
def export_unconnected_mandatory_inputs(self):
self.export_plugs(inputs=True, outputs=False)
def export_all_unconnected_inputs(self):
self.export_plugs(inputs=True, outputs=False, optional=True)
def export_unconnected_mandatory_outputs(self):
self.export_plugs(inputs=False, outputs=True)
def export_all_unconnected_outputs(self):
self.export_plugs(inputs=False, outputs=True, optional=True)
def _change_step(self):
node_name = self.current_node_name
node = self.scene.pipeline.nodes[node_name]
steps = getattr(self.scene.pipeline, 'pipeline_steps', None)
steps_defined = True
if steps is None:
steps = Controller()
steps_defined = False
wid = Qt.QDialog()
wid.setModal(True)
lay = Qt.QVBoxLayout()
wid.setLayout(lay)
listw = Qt.QListWidget()
listw.setSelectionMode(listw.MultiSelection)
lay.addWidget(listw)
n = 0
for step in steps.user_traits():
listw.addItem(step)
nodes = steps.trait(step).nodes
if node_name in nodes:
item = listw.item(n)
item.setSelected(True)
n += 1
addlay = Qt.QHBoxLayout()
lay.addLayout(addlay)
addb = Qt.QPushButton('+')
addlay.addWidget(addb)
remb = Qt.QPushButton('-')
addlay.addWidget(remb)
def add_clicked():
d = Qt.QDialog()
d.setModal(True)
la = Qt.QHBoxLayout()
d.setLayout(la)
l = Qt.QLineEdit()
la.addWidget(l)
l.returnPressed.connect(d.accept)
r = d.exec_()
if r:
name = l.text()
if name not in steps.user_traits():
n = listw.count()
listw.addItem(name)
listw.item(n).setSelected(True)
def remove_clicked():
selected = []
for i in range(listw.count()):
item = listw.item(i)
if item.isSelected():
selected.append((i, item.text()))
if len(selected) != 0:
r = Qt.QMessageBox.question(
wid, 'remove steps',
'remove the following steps from the whole pipeline ?\n%s'
% repr([s[1] for s in selected]))
if r == Qt.QMessageBox.Yes:
for s in reversed(selected):
listw.takeItem(s[0])
def up_clicked():
selected = []
for i in range(listw.count()):
item = listw.item(i)
if item.isSelected():
selected.append(i)
if len(selected) != 0 and selected[0] != 0:
for i in selected:
item = listw.takeItem(i)
listw.insertItem(i-1, item)
item.setSelected(True)
def down_clicked():
selected = []
for i in range(listw.count()):
item = listw.item(i)
if item.isSelected():
selected.append(i)
if len(selected) != 0 and selected[-1] != listw.count() - 1:
for i in reversed(selected):
item = listw.takeItem(i)
listw.insertItem(i + 1, item)
item.setSelected(True)
addb.clicked.connect(add_clicked)
remb.clicked.connect(remove_clicked)
up = Qt.QPushButton('^')
addlay.addWidget(up)
down = Qt.QPushButton('v')
addlay.addWidget(down)
up.clicked.connect(up_clicked)
down.clicked.connect(down_clicked)
oklay = Qt.QHBoxLayout()
lay.addLayout(oklay)
ok = Qt.QPushButton('OK')
oklay.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
oklay.addWidget(cancel)
ok.clicked.connect(wid.accept)
cancel.clicked.connect(wid.reject)
res = wid.exec_()
if res:
items = set()
sitems = []
for i in range(listw.count()):
item = listw.item(i)
name = item.text()
sel = item.isSelected()
items.add(name)
sitems.append(name)
trait = steps.trait(name)
if sel:
if trait is None:
self.scene.pipeline.add_pipeline_step(
name, [node_name])
steps = self.scene.pipeline.pipeline_steps
else:
nodes = steps.trait(name).nodes
if node_name not in nodes:
nodes.append(node_name)
elif trait is not None:
if node_name in trait.nodes:
trait.nodes.remove(node_name)
steps = list(steps.user_traits().keys())
for step in steps:
if step not in items:
self.scene.pipeline.remove_pipeline_step(step)
# reorder traits if needed
steps = self.scene.pipeline.pipeline_steps
if list(steps.user_traits().keys()) != sitems:
values = [steps.trait(step).nodes for step in sitems]
for step in sitems:
steps.remove_trait(step)
for step, nodes in zip(sitems, values):
self.scene.pipeline.add_pipeline_step(step, nodes)
self.scene.update_pipeline()
class ProcessModuleInput(QtGui.QDialog):
def __init__(self, display_str='process module/name',
class_type_check=process_instance.is_process):
super(PipelineDevelopperView.ProcessModuleInput, self).__init__()
self.setWindowTitle('%s:' % display_str)
layout = QtGui.QGridLayout(self)
layout.addWidget(QtGui.QLabel('module/process:'), 0, 0)
self.proc_line = PipelineDevelopperView.ProcessNameEdit(
class_type_check=class_type_check)
layout.addWidget(self.proc_line, 0, 1)
layout.addWidget(QtGui.QLabel('node name'), 1, 0)
self.name_line = QtGui.QLineEdit()
layout.addWidget(self.name_line, 1, 1)
# hlay = QtGui.QHBoxLayout()
# layout.addLayout(hlay, 1, 1)
ok = QtGui.QPushButton('OK')
layout.addWidget(ok, 2, 0)
cancel = QtGui.QPushButton('Cancel')
layout.addWidget(cancel, 2, 1)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
[docs] def add_process(self):
'''
Insert a process node in the pipeline. Asks for the process
module/name, and the node name before inserting.
'''
proc_name_gui = PipelineDevelopperView.ProcessModuleInput()
proc_name_gui.resize(800, proc_name_gui.sizeHint().height())
res = proc_name_gui.exec_()
if res:
proc_module = six.text_type(proc_name_gui.proc_line.text())
node_name = str(proc_name_gui.name_line.text())
pipeline = self.scene.pipeline
engine = pipeline.get_study_config().engine
try:
process = engine.get_process_instance(
six.text_type(proc_name_gui.proc_line.text()))
except Exception as e:
print(e)
return
pipeline.add_process(node_name, process)
node = pipeline.nodes[node_name]
gnode = self.scene.add_node(node_name, node)
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
[docs] def add_node(self):
'''
Insert a custom node in the pipeline. Asks for the node
module/name, and the node name before inserting.
'''
def configure_node(cls):
conf_controller = cls.configure_controller()
print('configure_node crl:', conf_controller.export_to_dict())
w = Qt.QDialog()
w.setWindowTitle('Custom node parameterization')
l = Qt.QVBoxLayout()
w.setLayout(l)
c = ScrollControllerWidget(conf_controller, live=True)
l.addWidget(c)
h = Qt.QHBoxLayout()
l.addLayout(h)
ok = Qt.QPushButton('OK')
h.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
h.addWidget(cancel)
ok.clicked.connect(w.accept)
cancel.clicked.connect(w.reject)
res = w.exec_()
if res:
return conf_controller
else:
return None
def get_node_instance(class_str, pipeline):
print('get_node_instance:', class_str)
cls_and_name = process_instance.get_node_class(class_str)
print('cls:', cls_and_name)
if cls_and_name is None:
return None
name, cls = cls_and_name
print('name:', name, ', cls:', cls)
if hasattr(cls, 'configure_controller'):
conf_controller = configure_node(cls)
if conf_controller is None:
return None # abort
else:
conf_controller = Controller()
print('controller:', conf_controller.export_to_dict())
if hasattr(cls, 'build_node'):
node = cls.build_node(pipeline, name, conf_controller)
else:
# probably bound to fail...
node = cls(pipeline, name, [], [])
return node
def is_pipeline_node(item):
return item is not Node and isinstance(item, Node)
node_name_gui = PipelineDevelopperView.ProcessModuleInput(
display_str='node module/name', class_type_check=is_pipeline_node)
node_name_gui.resize(800, node_name_gui.sizeHint().height())
res = node_name_gui.exec_()
if res:
node_module = six.text_type(node_name_gui.proc_line.text())
node_name = str(node_name_gui.name_line.text())
pipeline = self.scene.pipeline
try:
node = get_node_instance(
six.text_type(node_name_gui.proc_line.text()), pipeline)
print('Node:', node)
except Exception as e:
print(e)
return
if node is None:
return
pipeline.nodes[node_name] = node
gnode = self.scene.add_node(node_name, node)
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
class IterativeProcessInput(ProcessModuleInput):
def __init__(self, engine):
super(PipelineDevelopperView.IterativeProcessInput,
self).__init__()
# hlay = Qt.QHBoxLayout()
# self.layout().addLayout(hlay)
lay = self.layout()
item = lay.itemAtPosition(2, 0)
widget = item.widget()
lay.removeItem(item)
lay.addWidget(widget, 3, 0)
item = lay.itemAtPosition(2, 1)
widget = item.widget()
lay.removeItem(item)
lay.addWidget(widget, 3, 1)
lay.addWidget(Qt.QLabel('iterative plugs:'), 2, 0)
self.plugs = Qt.QListWidget()
self.plugs.setEditTriggers(Qt.QListWidget.NoEditTriggers)
self.plugs.setSelectionMode(Qt.QListWidget.ExtendedSelection)
lay.addWidget(self.plugs, 2, 1)
self.proc_line.textChanged.connect(self.set_plugs)
# self.proc_line.editingFinished.connect(self.set_plugs)
self.engine = engine
def set_plugs(self, text):
self.plugs.clear()
try:
process = self.engine.get_process_instance(text)
except Exception:
return
traits = list(process.user_traits().keys())
self.plugs.addItems(traits)
def iterative_plugs(self):
return [item.text() for item in self.plugs.selectedItems()]
[docs] def add_iterative_process(self):
'''
Insert an iterative process node in the pipeline. Asks for the process
module/name, the node name, and iterative plugs before inserting.
'''
pipeline = self.scene.pipeline
engine = pipeline.get_study_config().engine
proc_name_gui = PipelineDevelopperView.IterativeProcessInput(engine)
proc_name_gui.resize(800, proc_name_gui.sizeHint().height())
res = proc_name_gui.exec_()
if res:
proc_module = six.text_type(proc_name_gui.proc_line.text())
node_name = str(proc_name_gui.name_line.text())
try:
process = engine.get_process_instance(
six.text_type(proc_name_gui.proc_line.text()))
except Exception as e:
print(e)
return
iterative_plugs = proc_name_gui.iterative_plugs()
do_not_export = list(process.user_traits().keys())
pipeline.add_iterative_process(node_name, process, iterative_plugs,
do_not_export=do_not_export)
node = pipeline.nodes[node_name]
gnode = self.scene.add_node(node_name, node)
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
[docs] def add_switch(self):
'''
Insert a switch node in the pipeline. Asks for the switch
inputs/outputs, and the node name before inserting.
'''
class SwitchInput(QtGui.QDialog):
def __init__(self):
super(SwitchInput, self).__init__()
self.setWindowTitle('switch parameters/name:')
layout = QtGui.QGridLayout(self)
layout.addWidget(QtGui.QLabel('inputs:'), 0, 0)
self.inputs_line = QtGui.QLineEdit()
layout.addWidget(self.inputs_line, 0, 1)
layout.addWidget(QtGui.QLabel('outputs:'), 1, 0)
self.outputs_line = QtGui.QLineEdit()
layout.addWidget(self.outputs_line, 1, 1)
layout.addWidget(QtGui.QLabel('node name'), 2, 0)
self.name_line = QtGui.QLineEdit()
layout.addWidget(self.name_line, 2, 1)
ok = QtGui.QPushButton('OK')
layout.addWidget(ok, 3, 0)
cancel = QtGui.QPushButton('Cancel')
layout.addWidget(cancel, 3, 1)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
switch_name_gui = SwitchInput()
switch_name_gui.resize(600, switch_name_gui.sizeHint().height())
res = switch_name_gui.exec_()
if res:
pipeline = self.scene.pipeline
node_name = str(switch_name_gui.name_line.text()).strip()
inputs = str(switch_name_gui.inputs_line.text()).split()
outputs = str(switch_name_gui.outputs_line.text()).split()
pipeline.add_switch(node_name, inputs, outputs)
# add_switch triggers an update
gnode = self.scene.gnodes[node_name]
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
[docs] def add_optional_output_switch(self):
'''
Insert an optional output switch node in the pipeline. Asks for the
switch inputs/outputs, and the node name before inserting.
'''
class SwitchInput(QtGui.QDialog):
def __init__(self):
super(SwitchInput, self).__init__()
self.setWindowTitle('switch parameters/name:')
layout = QtGui.QGridLayout(self)
layout.addWidget(QtGui.QLabel('input:'), 0, 0)
self.inputs_line = QtGui.QLineEdit()
layout.addWidget(self.inputs_line, 0, 1)
layout.addWidget(QtGui.QLabel('output:'), 1, 0)
self.outputs_line = QtGui.QLineEdit()
layout.addWidget(self.outputs_line, 1, 1)
layout.addWidget(QtGui.QLabel('node name'), 2, 0)
self.name_line = QtGui.QLineEdit()
layout.addWidget(self.name_line, 2, 1)
ok = QtGui.QPushButton('OK')
layout.addWidget(ok, 3, 0)
cancel = QtGui.QPushButton('Cancel')
layout.addWidget(cancel, 3, 1)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
switch_name_gui = SwitchInput()
switch_name_gui.resize(600, switch_name_gui.sizeHint().height())
res = switch_name_gui.exec_()
if res:
pipeline = self.scene.pipeline
node_name = str(switch_name_gui.name_line.text()).strip()
input = str(switch_name_gui.inputs_line.text()).strip()
output = str(switch_name_gui.outputs_line.text()).strip()
if output == '' and node_name != '':
output = node_name
elif output != '' and node_name == '':
node_name = output
pipeline.add_optional_output_switch(node_name, input, output)
# add_optional_output_switch does *not* trigger an update
self._reset_pipeline()
gnode = self.scene.gnodes[node_name]
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
def _plug_clicked(self, name):
if self.is_logical_view() or not self.edition_enabled():
# in logival view, links are not editable since they do not reflect
# the details of reality
return
node_name, plug_name = str(name).split(':')
plug_name = str(plug_name)
gnode = self.scene.gnodes[node_name]
plug = gnode.out_plugs.get(plug_name)
typeq = self.scene.typeLink(node_name, plug_name)
try:
# color = self.scene.colorLink(typeq)
color = self.scene.colType.colorLink(typeq)
except Exception:
color = ORANGE_2
if not plug:
return # probably an input plug
plug_pos = plug.mapToScene(plug.mapFromParent(plug.get_plug_point()))
self._grabpos = self.mapFromScene(plug_pos)
self._temp_link = Link(
plug_pos,
self.mapToScene(self.mapFromGlobal(QtGui.QCursor.pos())),
True, False, color)
self._temp_link.pen.setBrush(RED_2)
self.scene.addItem(self._temp_link)
self._grab_link = True
self._grabbed_plug = (node_name, plug_name)
def _move_grab_link(self, event):
pos = self.mapToScene(event.pos())
self._temp_link.update(self.mapToScene(self._grabpos), pos)
def _release_grab_link(self, event, ret=False):
max_square_dist = 100.
self._grab_link = False
# delete the temp link
self.scene.removeItem(self._temp_link)
del self._temp_link
pos = self.mapToScene(event.pos())
item = self.scene.itemAt(pos, Qt.QTransform())
plug = None
if isinstance(item, Link):
# look for its dest plug
plug = None
for source_dest, link in six.iteritems(self.scene.glinks):
if link is item:
plug = source_dest[1]
break
if plug is not None:
# check the plug is not too far from the drop point
gnode = self.scene.gnodes[plug[0]]
gplug = gnode.in_plugs[plug[1]]
plug_pos = gplug.mapToScene(
gplug.mapFromParent(gplug.get_plug_point()))
pdiff = plug_pos - pos
dist2 = pdiff.x() * pdiff.x() + pdiff.y() * pdiff.y()
if dist2 > max_square_dist:
plug = None
elif isinstance(item, Plug):
plug = str(item.name).split(':')
if plug is not None:
if self._grabbed_plug[0] not in ('', 'inputs'):
src = '%s.%s' % self._grabbed_plug
else:
src = self._grabbed_plug[1]
if plug[0] not in ('', 'outputs'):
dst = '%s.%s' % tuple(plug)
else:
dst = plug[1]
# if (src != dst) and ("inputs."+src != dst) and not self.isInputYet(dst) :
if (src != dst) and ("inputs." + src != dst):
self.scene.pipeline.add_link('%s->%s' % (src, dst))
self.scene.update_pipeline()
if ret:
self._grabbed_plug = None
return '%s->%s' % (src, dst)
self._grabbed_plug = None
# def isInputYet(self,dest):##################################################################### add by OM
# for listK in self.scene.glinks.keys():
# if ( eval(str(eval(str(listK))[1]))[0]+"."+ eval(str(eval(str(listK))[1]))[1]==dest or eval(str(eval(str(listK))[1]))[0]+"."+ eval(str(eval(str(listK))[1]))[1]=="outputs."+dest):
# print("input '",dest, "' already used !!")
# return True
# return False
def _node_delete_clicked(self, name_node):
#
if name_node not in ('inputs', 'outputs'):
self.current_node_name = name_node
self.del_node()
def _link_delete_clicked(self, src_node, src_plug, dst_node, dst_plug):
src_node = str(src_node)
src_plug = str(src_plug)
dst_node = str(dst_node)
dst_plug = str(dst_plug)
# print(src_node,",",src_plug,",",dst_node,",",dst_plug)
if self.is_logical_view() or not self.edition_enabled():
# in logical view, links are not real links
return
if src_node in ('', 'inputs'):
src = src_plug
snode = self.scene.pipeline.pipeline_node
else:
src = '%s.%s' % (src_node, src_plug)
snode = self.scene.pipeline.nodes[src_node]
if dst_node in ('', 'outputs'):
dst = dst_plug
dnode = self.scene.pipeline.pipeline_node
else:
dst = '%s.%s' % (dst_node, dst_plug)
dnode = self.scene.pipeline.nodes[dst_node]
name = '%s->%s' % (src, dst)
self._current_link = name # (src_node, src_plug, dst_node, dst_plug)
self._del_link()
del self._current_link
def _link_clicked(self, src_node, src_plug, dst_node, dst_plug):
src_node = str(src_node)
src_plug = str(src_plug)
dst_node = str(dst_node)
dst_plug = str(dst_plug)
if self.is_logical_view() or not self.edition_enabled():
# in logical view, links are not real links
return
if src_node in ('', 'inputs'):
src = src_plug
snode = self.scene.pipeline.pipeline_node
else:
src = '%s.%s' % (src_node, src_plug)
snode = self.scene.pipeline.nodes[src_node]
if dst_node in ('', 'outputs'):
dst = dst_plug
dnode = self.scene.pipeline.pipeline_node
else:
dst = '%s.%s' % (dst_node, dst_plug)
dnode = self.scene.pipeline.nodes[dst_node]
name = '%s->%s' % (src, dst)
self._current_link = name # (src_node, src_plug, dst_node, dst_plug)
self._current_link_def = (src_node, src_plug, dst_node, dst_plug)
menu = QtGui.QMenu('Link: %s' % name)
title = menu.addAction('Link: %s' % name)
title.setEnabled(False)
menu.addSeparator()
weak = False
splug = snode.plugs[src_plug]
for link in splug.links_to:
if link[0] == dst_node and link[1] == dst_plug:
weak = link[4]
break
weak_action = menu.addAction('Weak link')
weak_action.setCheckable(True)
weak_action.setChecked(bool(weak))
weak_action.toggled.connect(self._change_weak_link)
menu.addSeparator()
del_link = menu.addAction('Delete link')
del_link.triggered.connect(self._del_link)
menu.exec_(QtGui.QCursor.pos())
del self._current_link
del self._current_link_def
def get_doc_browser(self, create=False):
doc_browser = getattr(self, 'doc_browser', None)
if doc_browser or not create:
return doc_browser
try:
# use the newer Qt5 QtWebEngine
from soma.qt_gui.qt_backend import QtWebEngine
from soma.qt_gui.qt_backend.QtWebEngineWidgets \
import QWebEngineView, QWebEnginePage
use_webengine = True
except ImportError:
from soma.qt_gui.qt_backend import QtWebKit
QWebEngineView = QtWebKit.QWebView
QWebPage = QtWebKit.QWebPage
QWebEnginePage = QWebPage
use_webengine = False
self._use_webengine = use_webengine
self.doc_browser = QWebEngineView()
self.doc_browser.show()
return self.doc_browser
def _node_clicked(self, name, node):
self.show_node_doc(node)
if isinstance(node, Process):
self.process_clicked.emit(name, node)
else:
self.node_clicked.emit(name, node)
@staticmethod
def get_node_html_doc(node):
doc_path = getattr(node, '_doc_path', None)
if doc_path and os.path.isabs(doc_path):
return doc_path
modname = node.__module__
init_modname = modname
while True:
mod = sys.modules[modname]
mod_doc_path = getattr(mod, '_doc_path', None)
if mod_doc_path:
if doc_path:
return os.path.join(mod_doc_path, doc_path)
node_type = 'process'
if isinstance(node, Pipeline):
node_type = 'pipeline'
path = os.path.join(
mod_doc_path, node_type,
'%s.html' % '.'.join((node.__module__,
node.__class__.__name__)))
if os.path.exists(path) or path.startswith('http://') \
or path.startswith('https://'):
return path
# try using the 1st sub-module
modsplit = init_modname.split('.')
if len(modsplit) >= 3:
path = os.path.join(
mod_doc_path, modsplit[1], node_type,
'%s.html' % '.'.join((node.__module__,
node.__class__.__name__)))
if os.path.exists(path) or path.startswith('http://') \
or path.startswith('https://'):
return path
return None
s = modname.rsplit('.', 1)
if len(s) == 1:
break
modname = s[0]
def show_doc(self, node_name=None):
pipeline = self.scene.pipeline
if not node_name:
node_name = self.current_node_name
if node_name in ('inputs', 'outputs'):
node = pipeline.pipeline_node
else:
node = pipeline.nodes[node_name]
if isinstance(node, ProcessNode):
node = node.process
doc_browser = self.get_doc_browser(create=True)
self.show_node_doc(node)
def show_node_doc(self, node):
doc_browser = self.get_doc_browser()
if doc_browser:
doc_path = self.get_node_html_doc(node)
if doc_path:
if not doc_path.startswith('http://') \
and not doc_path.startswith('https://') \
and not doc_path.startswith('file://'):
doc_path = 'file://%s' % os.path.abspath(doc_path)
doc_browser.setUrl(Qt.QUrl(doc_path))
else:
gethelp = getattr(node, 'get_help')
msg = None
if gethelp:
msg = node.get_help(returnhelp=True)
if not msg:
msg = node.getattr(node, '__doc__', None)
if msg:
doc_browser.setContent(Qt.QByteArray(msg.encode('utf-8')),
'text/plain')
def _node_clicked_ctrl(self, name, process):
for source_dest, glink in six.iteritems(self.scene.glinks):
glink.fonced_viewer(False)
# print("source-dest ",source_dest)
if name not in str(source_dest):
glink.fonced_viewer(True)
# else:
# print(source_dest[0])
for node_name, gnode in six.iteritems(self.scene.gnodes):
# print(" node_name",node_name)
gnode.fonced_viewer(False)
if name not in str(node_name):
gnode.fonced_viewer(True)
def _change_weak_link(self, weak):
# src_node, src_plug, dst_node, dst_plug = self._current_link
link_def = self._current_link
self.scene.pipeline.remove_link(link_def)
self.scene.pipeline.add_link(link_def, weak_link=weak)
self.scene.update_pipeline()
def _del_link(self):
print('\nRemoving the link: ', self._current_link)
src_node, src_plug, dst_node, dst_plug = self._current_link_def
link_def = self._current_link
pipeline = self.scene.pipeline
pipeline.remove_link(link_def)
if (src_node in ('', 'inputs') and
len(pipeline.pipeline_node.plugs[src_plug].links_to) == 0):
# remove orphan pipeline plug
pipeline.remove_trait(src_plug)
elif (dst_node in ('', 'outputs') and
len(pipeline.pipeline_node.plugs[dst_plug].links_from) == 0):
# remove orphan pipeline plug
pipeline.remove_trait(dst_plug)
self.scene.update_pipeline()
def _plug_right_clicked(self, name):
for node_name, gnode in six.iteritems(self.scene.gnodes):
if node_name in 'inputs':
self.inputYet = True
if node_name in 'outputs':
self.outputYet = False
if self.is_logical_view() or not self.edition_enabled():
# in logival view, links are not editable since they do not reflect
# the details of reality
return
node_name, plug_name = str(name).split(':')
plug_name = str(plug_name)
if node_name in ('inputs', 'outputs'):
node = self.scene.pipeline.pipeline_node
else:
node = self.scene.pipeline.nodes[node_name]
plug = node.plugs[plug_name]
output = plug.output
self._temp_node = node
self._temp_plug = plug
self._temp_plug_name = (node_name, plug_name)
menu = QtGui.QMenu('Plug: %s' % name)
title = menu.addAction('Plug: %s' % name)
title.setEnabled(False)
menu.addSeparator()
if node_name not in ('inputs', 'outputs'):
# not a main node: allow export
if output:
links = plug.links_to
else:
links = plug.links_from
existing = False
for link in links:
if link[0] == '':
existing = True
break
export_action = menu.addAction('export plug')
export_action.triggered.connect(self._export_plug)
if existing:
export_action.setEnabled(False)
if isinstance(node, ProcessNode) \
and isinstance(node.process, ProcessIteration):
iter_action = menu.addAction('iterative plug')
iter_action.setCheckable(True)
iter_action.setChecked(
plug_name in node.process.iterative_parameters)
iter_action.toggled[bool].connect(self._change_iterative_plug)
else:
del_plug = menu.addAction('Remove plug')
del_plug.triggered.connect(self._remove_plug)
edit_plug = menu.addAction('Rename / edit plug')
edit_plug.triggered.connect(self._edit_plug)
menu.exec_(QtGui.QCursor.pos())
del self._temp_plug
del self._temp_plug_name
del self._temp_node
class _PlugEdit(QtGui.QDialog):
def __init__(self, show_weak=True, parent=None):
super(PipelineDevelopperView._PlugEdit, self).__init__(parent)
layout = QtGui.QVBoxLayout(self)
hlay1 = QtGui.QHBoxLayout()
layout.addLayout(hlay1)
hlay1.addWidget(QtGui.QLabel('Plug name:'))
self.name_line = QtGui.QLineEdit()
hlay1.addWidget(self.name_line)
hlay2 = QtGui.QHBoxLayout()
layout.addLayout(hlay2)
self.optional = QtGui.QCheckBox('Optional')
hlay2.addWidget(self.optional)
if show_weak:
self.weak = QtGui.QCheckBox('Weak link')
hlay2.addWidget(self.weak)
hlay3 = QtGui.QHBoxLayout()
layout.addLayout(hlay3)
ok = QtGui.QPushButton('OK')
hlay3.addWidget(ok)
cancel = QtGui.QPushButton('Cancel')
hlay3.addWidget(cancel)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
def _export_plug(self):
dial = self._PlugEdit()
dial.name_line.setText(self._temp_plug_name[1])
dial.optional.setChecked(self._temp_plug.optional)
res = dial.exec_()
if res:
# for node_name, gnode in six.iteritems(self.scene.gnodes):
# print("list Nodes",node_name)
try:
self.scene.pipeline.export_parameter(
self._temp_plug_name[0], self._temp_plug_name[1],
pipeline_parameter=str(dial.name_line.text()),
is_optional=dial.optional.isChecked(),
weak_link=dial.weak.isChecked())
# print(str(dial.name_line.text()))
# self.scene.gnodes.changeHmin(15)
except Exception as e:
print('exception while export plug:', e)
pass
self.scene.update_pipeline()
def _change_iterative_plug(self, checked):
node = self._temp_node
node_name, name = self._temp_plug_name
node.process.change_iterative_plug(name, checked)
self.scene.update_pipeline()
def _remove_plug(self):
if self._temp_plug_name[0] in ('inputs', 'outputs'):
# print 'remove plug:', self._temp_plug_name[1]
print('#' * 50)
print(self._temp_plug_name)
print(self._temp_plug)
for trait_name, trait in self.scene.pipeline.traits().items():
print(trait_name, trait)
if trait.handler is None:
print('HANDLER IS NONE')
else:
print('HANDLER:', trait.handler)
if trait.has_items:
print("HANDLER HAS ITEMS")
self.scene.pipeline.remove_trait(self._temp_plug_name[1])
self.scene.update_pipeline()
def _edit_plug(self):
dial = self._PlugEdit(show_weak=False)
dial.name_line.setText(self._temp_plug_name[1])
dial.name_line.setEnabled(False) ## FIXME
dial.optional.setChecked(self._temp_plug.optional)
res = dial.exec_()
if res:
plug = self._temp_plug
plug.optional = dial.optional.isChecked()
# print 'TODO.'
self.scene.update_pipeline()
def _prune_plugs(self):
pipeline = self.scene.pipeline
pnode = pipeline.pipeline_node
to_del = []
for plug_name, plug in six.iteritems(pnode.plugs):
if plug.output and len(plug.links_from) == 0:
to_del.append(plug_name)
elif not plug.output and len(plug.links_to) == 0:
to_del.append(plug_name)
for plug_name in to_del:
pipeline.remove_trait(plug_name)
self.scene.update_pipeline()
def confirm_erase_pipeline(self):
if len(self.scene.pipeline.nodes) <= 1:
return True
confirm = Qt.QMessageBox.warning(
self,
'New pipeline',
'The current pipeline will be lost. Continue ?',
Qt.QMessageBox.Ok | Qt.QMessageBox.Cancel,
Qt.QMessageBox.Cancel)
if confirm != Qt.QMessageBox.Ok:
return False
return True
def new_pipeline(self):
if not self.confirm_erase_pipeline():
return
w = Qt.QDialog(self)
w.setModal(True)
w.setWindowTitle('Pipeline name')
l = Qt.QVBoxLayout()
w.setLayout(l)
le = Qt.QLineEdit()
l.addWidget(le)
l2 = Qt.QHBoxLayout()
l.addLayout(l2)
ok = Qt.QPushButton('OK')
l2.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
l2.addWidget(cancel)
ok.clicked.connect(w.accept)
cancel.clicked.connect(w.reject)
le.returnPressed.connect(w.accept)
res = w.exec_()
if res:
class_kwargs = {
'__module__': '__main__',
'do_autoexport_nodes_parameters': False,
'node_position': {},
'node_dimension': {}
}
name = le.text()
if type(name) is not str: # unicode ?
name = name.encode()
pipeline_class = type(name, (Pipeline,), class_kwargs)
pipeline = pipeline_class()
self.set_pipeline(pipeline)
self._pipeline_filename = ''
def load_pipeline(self, filename='', load_pipeline=True):
class LoadProcessUi(Qt.QDialog):
def __init__(self, parent=None, old_filename=''):
super(LoadProcessUi, self).__init__(parent)
self.old_filename = old_filename
lay = Qt.QVBoxLayout()
self.setLayout(lay)
l2 = Qt.QHBoxLayout()
lay.addLayout(l2)
l2.addWidget(Qt.QLabel('Pipeline:'))
self.proc_edit = PipelineDevelopperView.ProcessNameEdit()
l2.addWidget(self.proc_edit)
self.loadbt = Qt.QPushButton('...')
l2.addWidget(self.loadbt)
l3 = Qt.QHBoxLayout()
lay.addLayout(l3)
ok = Qt.QPushButton('OK')
l3.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
l3.addWidget(cancel)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
self.loadbt.clicked.connect(self.get_filename)
self.proc_edit.returnPressed.connect(self.accept)
def get_filename(self):
filename = qt_backend.getOpenFileName(
None, 'Load the pipeline', self.old_filename,
'Compatible files (*.xml *.py);; All (*)')
if filename:
self.proc_edit.setText(filename)
if not self.confirm_erase_pipeline():
return
if not filename:
old_filename = getattr(self, '_pipeline_filename', '')
dialog = LoadProcessUi(self, old_filename=old_filename)
dialog.setWindowTitle('Load pipeline')
dialog.setModal(True)
dialog.resize(800, dialog.sizeHint().height())
res = dialog.exec_()
if res:
filename = dialog.proc_edit.text()
if filename:
if not load_pipeline:
return filename
else:
try:
if self.scene.pipeline:
# keep the same engine
engine = self.scene.pipeline.get_study_config().engine
pipeline = engine.get_process_instance(filename)
else:
pipeline = get_process_instance(filename)
except Exception as e:
print(e)
pipeline = None
if pipeline is not None:
self.set_pipeline(pipeline)
self._pipeline_filename = filename
return filename
[docs] def save_pipeline(self):
'''
Ask for a filename using the file dialog, and save the pipeline as a
XML or python file.
'''
pipeline = self.scene.pipeline
old_filename = getattr(self, '_pipeline_filename', '')
filename = qt_backend.getSaveFileName(
None, 'Save the pipeline', old_filename,
'Compatible files (*.xml *.py);; All (*)')
if filename:
posdict = {}
for key, value in six.iteritems(self.scene.pos):
if hasattr(value, 'x'):
posdict[key] = (value.x(), value.y())
else:
posdict[key] = (value[0], value[1])
dimdict = {}
for key, value in six.iteritems(self.scene.dim):
if hasattr(value, 'boundingRect'):
dimdict[key] = (value.boundingRect().width(),
value.boundingRect().height())
else:
dimdict[key] = (value[0], value[1])
pipeline.node_dimension = dimdict
old_pos = pipeline.node_position
old_dim = pipeline.node_dimension
pipeline.node_position = posdict
pipeline_tools.save_pipeline(pipeline, filename)
self._pipeline_filename = six.text_type(filename)
pipeline.node_position = old_pos
pipeline.node_dimension = old_dim
#def load_pipeline_parameters(self):
#"""
#Loading and setting pipeline parameters (inputs and outputs) from a Json file.
#"""
#pipeline = self.scene.pipeline
#filename = qt_backend.getOpenFileName(
#None, 'Load pipeline parameters', '',
#'Compatible files (*.json)')
#pipeline_tools.load_pipeline_parameters(filename, pipeline)
#def save_pipeline_parameters(self):
#"""
#Saving pipeline parameters (inputs and outputs) to a Json file.
#"""
#pipeline = self.scene.pipeline
#filename = qt_backend.getSaveFileName(
#None, 'Save pipeline parameters', '',
#'Compatible files (*.json)')
#pipeline_tools.save_pipeline_parameters(filename, pipeline)
[docs] def load_pipeline_parameters(self, root_path=''):
"""
Loading and setting pipeline parameters (inputs and outputs) from a Json file.
:return:
"""
def hinted_tuple_hook(obj):
if '__tuple__' in obj:
return tuple(obj['items'])
else:
return obj
filename = qt_backend.getOpenFileName(
None, 'Load the pipeline parameters', root_path,
'Compatible files (*.json)')
if filename:
with io.open(filename, 'r', encoding='utf8') as fileJson:
dic = json.load(fileJson)
dic = json.loads(dic, object_hook=hinted_tuple_hook)
if "pipeline_parameters" not in list(dic.keys()):
raise KeyError('No "pipeline_parameters" key found in {0}.'.format(filename))
for trait_name, trait_value in dic["pipeline_parameters"].items():
if trait_name not in list(self.scene.pipeline.user_traits().keys()):
print('No "{0}" parameter in pipeline.'.format(trait_name))
try:
setattr(self.scene.pipeline, trait_name, trait_value)
except traits.TraitError:
print("Error for the plug {0}".format(trait_name))
self.scene.pipeline.update_nodes_and_plugs_activation()
[docs] def save_pipeline_parameters(self):
"""
Saving pipeline parameters (inputs and outputs) to a Json file.
:return:
"""
class MultiDimensionalArrayEncoder(json.JSONEncoder):
def encode(self, obj):
def hint_tuples(item):
if isinstance(item, tuple):
return {'__tuple__': True,
'items': [hint_tuples(e) for e in item]}
if isinstance(item, list):
return [hint_tuples(e) for e in item]
if isinstance(item, dict):
return dict((key, hint_tuples(value)) for key, value in
item.items())
else:
return item
return super(MultiDimensionalArrayEncoder, self).encode(
hint_tuples(obj))
pipeline = self.scene.pipeline
filename = qt_backend.getSaveFileName(
None, 'Save the pipeline parameters', '',
'Compatible files (*.json)')
if not filename: # save widget was cancelled by the user
return ''
if os.path.splitext(filename)[1] == '': # which means no extension
filename += '.json'
elif os.path.splitext(filename)[1] != '.json':
msg = QMessageBox()
msg.setIcon(QMessageBox.Warning)
msg.setText('The parameters must be saved in the ".json" format, '
'not the "{0}" format'.format(
os.path.splitext(filename)[1]))
msg.setWindowTitle("Warning")
msg.setStandardButtons(QMessageBox.Ok)
msg.buttonClicked.connect(msg.close)
msg.exec_()
self.save_pipeline_parameters()
return ''
if os.path.exists(filename) and self.disable_overwrite:
msg = QMessageBox()
msg.setIcon(QMessageBox.Warning)
msg.setText('This file already exists, you do not have the '
'rights to overwrite it.')
msg.setWindowTitle("Warning")
msg.setStandardButtons(QMessageBox.Ok)
msg.buttonClicked.connect(msg.close)
msg.exec_()
self.save_pipeline_parameters()
return ''
if filename:
from traits.api import Undefined
# Generating the dictionary
param_dic = {}
for trait_name, trait in pipeline.user_traits().items():
if trait_name in ["nodes_activation"]:
continue
value = getattr(pipeline, trait_name)
if value is Undefined:
value = ""
param_dic[trait_name] = value
# In the future, more information may be added to this dictionary
dic = {}
dic["pipeline_parameters"] = param_dic
jsonstring = MultiDimensionalArrayEncoder().encode(dic)
# Saving the dictionary in the Json file
if sys.version_info[0] >= 3:
with open(filename, 'w', encoding='utf8') as file:
json.dump(jsonstring, file)
else:
with open(filename, 'w') as file:
json.dump(jsonstring, file)