{ "cells": [ { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "# CAPSUL: chain algorithms in pipelines" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

Capsul is a simple and efficient Python tool that aims to organize sets of processing steps.\n", "It is accessible to everybody, and is reusable in various contexts.\n", "The project is hosted on github: https://github.com/populse/capsul.

\n", "\n", "

Documentation: http://populse.github.io/capsul\n", "

\n", "\n", "

The following examples use CAPSUL and PyQt (or PySide). To get the GUI running in a non-blocking way, the IPython notebook should be started with the option --gui=qt:\n", "

ipython notebook --gui=qt
\n", "Otherwise, calls to the Qt event loop will block at each demo step until the windows are closed.\n", "

\n", "\n", "

\n", "## Definitions\n", "\n", "

\n", "

\n", "\n", "

\n", "## First check\n", "\n", "In order to test whether capsul is installed on your machine, you can ask for the Capsul version:\n", "

\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "# just to ensure compatibility of this notebook with python 2 and 3\n", "from __future__ import print_function\n", "# the following to avoid display when this notebook is converted to sphinx doc\n", "import os\n", "if os.environ.get('ALLOW_GUI', 'TRUE') in ('FALSE', '0'):\n", " use_gui = False\n", "else:\n", " %gui qt4\n", " use_gui = True\n", " from soma.qt_gui import qt_backend\n", " qt_backend.set_qt_backend()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "import capsul\n", "print(capsul.__version__)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

## Process and pipeline creation API

\n", "\n", "A process can be either a Process class instance or a wrapped function.\n", "\n", "

### Process and parameters

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "# Capsul import\n", "from capsul.api import Process\n", "\n", "# Trait import\n", "from traits.api import Float\n", "\n", "class Process1(Process):\n", " f = Float(output=False)\n", "\n", " def __init__(self):\n", " super(Process1, self).__init__()\n", " self.add_trait(\"ff\", Float(output=False))\n", " self.add_trait(\"out\", Float(output=True))\n", " \n", " def _run_process(self):\n", " self.out = self.f + self.ff\n", " print('Process1 execution, f:', self.f, ', ff:', self.ff)\n", "\n", "process = Process1()\n", "print(process.user_traits().keys())\n", "\n", "process.ff = 132.6\n", "process.f = 13.3\n", "\n", "#execution\n", "process()\n", "print('Process result =', process.out)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "from traits.api import Str\n", "\n", "class Process2(Process):\n", " \n", " def __init__(self):\n", " super(Process2, self).__init__()\n", " self.add_trait(\"a\", Float(output=True))\n", " self.add_trait(\"b\", Str(output=False))\n", " \n", " def get_commandline(self):\n", " return ['echo', 'Process2 execution, a: %f, b: %s' % (self.a, self.b)]\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

### Use a function as a building block

\n", "\n", "It is possible to convert a function into a Process and thus use it as a building block of a pipeline. In the following example we will use a simple Python addition function:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.process.xml import xml_process\n", "from capsul.api import get_process_instance\n", "\n", "@xml_process('''\n", "<process capsul_xml=\"2.0\">\n", "    <input name=\"a\" type=\"float\" doc=\"first operand\"/>\n", "    <input name=\"b\" type=\"float\" doc=\"second operand\"/>\n", "    <return name=\"addition\" type=\"float\" doc=\"sum of a and b\"/>\n", "</process>\n", "''')\n", "def add(a, b):\n", " return a + b" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

This is a pure Python function with an XML Process description in the @xml_process decorator. Inside the <process> and </process> elements, each input parameter is described, as well as the returned value. The parameters are typed, and a description is required in order to generate proper tooltips and documentation. See the XML specifications for more information.

\n", "\n", "

We can now create a Process from this Python function:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import get_process_instance\n", "\n", "process = get_process_instance('__main__.add')\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "We can set some input parameters and execute the process:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "process.a = 40\n", "process.b = 2\n", "process()\n", "print(process.a, '+', process.b, '=', process.addition)\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

### Output files

\n", "Output files are a particular case: a file parameter involves both a filename (a string) and the file contents themselves. When the file itself is an output, the filename may be an input, when the output filename is specified in advance, or an output, when the filename is generated by the process and returned by it. For the output file + input filename case, we have to specify that the filename is actually an input, and to which parameter it is attached:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "@xml_process('''\n", "\n", " \n", " \n", " \n", "\n", "''')\n", "def cat(a, b, c):\n", " with open(c, 'w') as f:\n", " f.write(open(a).read())\n", " f.write(open(b).read())\n", "\n", "process = get_process_instance('__main__.cat')" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "
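The two cases can be sketched in plain Python, outside of capsul (the helper names here are hypothetical): `cat_to` receives its output filename as an input parameter, while `cat_somewhere` generates the filename itself, so the filename is an output of the process.

```python
import os
import tempfile

def cat_to(a, b, c):
    # output file with an INPUT filename: the caller chooses where to write
    with open(c, 'w') as f:
        f.write(open(a).read())
        f.write(open(b).read())

def cat_somewhere(a, b):
    # output file with an OUTPUT filename: the process generates the name itself
    fd, c = tempfile.mkstemp(suffix='.txt')
    os.close(fd)
    cat_to(a, b, c)
    return c
```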

## Pipeline

\n", "\n", "

A pipeline uses processes, or sub-pipelines, in order to define a full processing chain, with links between building blocks. A pipeline may be defined either using the Python API, as a Pipeline subclass, or using an XML definition file.\n", "
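As a mental model, a pipeline is a set of callables plus links that route one node's output into another node's input. A toy sketch of that idea in plain Python (made-up node functions, not the capsul API):

```python
# A toy pipeline in plain Python: two made-up node functions and one link
# routing node2's output into node1's 'ff' input.

def node1(f, ff):
    # toy process: adds its two inputs
    return f + ff

def node2(b):
    # toy process: derives a number from a string input
    return float(len(b))

def run_pipeline(f, node_string):
    a = node2(node_string)   # link: node2 output -> node1 input 'ff'
    return node1(f, a)

print(run_pipeline(13.0, 'blop'))  # 17.0
```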

\n", "\n", "

### Pipeline API

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import Pipeline\n", "\n", "class Pipeline1(Pipeline):\n", "\n", " def pipeline_definition(self):\n", " # Create processes\n", " self.add_process(\"node1\", Process1())\n", " self.add_process(\"node2\", Process2())\n", " \n", "pipeline1 = Pipeline1()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

### Viewing / debugging a pipeline

\n", "

#### Pipeline structure

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "if globals().get('use_gui', True):\n", " import sys\n", " # note that the qt backend may be either PyQt4 or PySide.\n", " from soma.qt_gui.qt_backend import QtGui\n", " from capsul.qt_gui.widgets import PipelineDevelopperView\n", "\n", " # here we determine whether the Qt GUI is already running or not.\n", " run_qt_loop = False\n", " if QtGui.QApplication.instance() is None:\n", " app = QtGui.QApplication(sys.argv)\n", " run_qt_loop = True\n", " else:\n", " app = QtGui.QApplication.instance()\n", " # in the following we will reuse this run_qt_loop variable for simplicity\n", "\n", " # now the real thing for pipeline viewing \n", " view1 = PipelineDevelopperView(pipeline1)\n", " view1.show()\n", "\n", " if run_qt_loop:\n", " print('close window to go on...')\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "It should display something like this:\n", "![capsul pipeline view](images/capsul_pipeline1.jpg \"capsul pipeline view\")\n", "\n", "#### Entering parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "if globals().get('use_gui', True):\n", " from soma.qt_gui.controller_widget import ControllerWidget\n", "\n", " controller1 = ControllerWidget(pipeline1, live=True)\n", " controller1.show()\n", " controller2 = ControllerWidget(pipeline1, live=True)\n", " controller2.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![controller views](images/capsul_controller1.jpg \"controller views\")\n", "\n", "### Links and exportations" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true 
}, "outputs": [], "source": [ "class Pipeline2(Pipeline):\n", "\n", " def pipeline_definition(self):\n", " # Create processes\n", " self.add_process(\"node1\", Process1())\n", " self.add_process(\"node2\", Process2())\n", " # links\n", " self.add_link('node2.a->node1.ff')\n", " # custom exports\n", " self.export_parameter(\"node2\", \"b\", \"node_string\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline2 = Pipeline2()\n", "if globals().get('use_gui', True):\n", " view2 = PipelineDevelopperView(pipeline2)\n", " view2.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![pipeline with links](images/capsul_pipeline2.jpg \"pipeline with links\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline2.f = 13.2\n", "pipeline2.node_string = \"blop\"\n", "pipeline2()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

### Defining a Pipeline from an XML file

\n", "\n", "A Pipeline can be described from an xml file. For the documentation of the description glossary, please refer to the capsul documentation. In the following example we will use the 'xml_pipeline.xml' test description. See XML specifications for more information." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true, "scrolled": false }, "outputs": [], "source": [ "import os\n", "import capsul.process.test as test\n", "\n", "xmldesc = os.path.join(os.path.dirname(test.__file__), \"xml_pipeline.xml\")\n", "with open(xmldesc, \"r\") as openfile:\n", " print(\"\".join(openfile.readlines()))" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Two building blocks are connected in this example. We will soon have a graphical representation of the pipeline, which in turn will clarify the xml sections. But first we must create a Pipeline from this xml description:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import get_process_instance\n", "\n", "xmlpipe = get_process_instance(\"capsul.process.test.xml_pipeline\")\n", "xmlpipe.help()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "One major advantage of the capsul pipeline system is to be able to represent graphically the processing sequence:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "if globals().get('use_gui', True):\n", " import sys\n", " from soma.qt_gui.qt_backend import QtGui\n", " from capsul.qt_gui.widgets import PipelineDevelopperView\n", " from soma.qt_gui.controller_widget import ControllerWidget\n", "\n", " view = PipelineDevelopperView(xmlpipe)\n", " controller = ControllerWidget(xmlpipe, live=True)\n", 
" view.show()\n", " controller.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![xml pipeline](images/capsul_pipeline_xml1.jpg \"xml pipeline\")\n", "![xml pipeline controller](images/capsul_pipeline_xml1_controller.jpg \"xml pipeline controller\")\n", "\n", "## Switches\n", "\n", "In Capsul it is possible to define a building block which aims to select a sequence of processings. It is done with a Switch building block as follows:\n", "\n", "### Using the Python API" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "class Pipeline3(Pipeline):\n", " \n", " def pipeline_definition(self):\n", " # Create processes\n", " self.add_process(\"node1\", Process1())\n", " self.add_process(\"node2\", Process2())\n", " self.add_switch(\"switch\", [\"case1\", \"case2\"], [\"output\"])\n", " #links\n", " self.add_link(\"node1.out->switch.case1_switch_output\")\n", " self.add_link(\"node2.a->switch.case2_switch_output\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline3 = Pipeline3()\n", "if globals().get('use_gui', True):\n", " view3 = PipelineDevelopperView(pipeline3, allow_open_controller=True, show_sub_pipelines=True)\n", " view3.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![pipeline3](images/capsul_pipeline3.jpg \"pipeline3\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline3.switch = \"case2\"\n", "\n", "if globals().get('use_gui', True):\n", " view3.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "code", "execution_count": null, 
"metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline3(b='yup')" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "
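Conceptually, a switch is a selector node: the value of its switch parameter decides which case's input plug is forwarded to the output. A minimal plain-Python sketch of that behaviour (not the capsul Switch class):

```python
def switch_output(switch, case1_value, case2_value):
    # forward the selected case's input plug to the single output plug
    cases = {'case1': case1_value, 'case2': case2_value}
    return cases[switch]

print(switch_output('case2', 145.9, 4.5))  # 4.5
```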

### Using XML definition

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "import os\n", "import capsul.process.test as test\n", "\n", "xmldesc = os.path.join(os.path.dirname(test.__file__), \"test_pipeline.xml\")\n", "with open(xmldesc, \"r\") as openfile:\n", " print(\"\".join(openfile.readlines()))" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Again we can create a Pipeline from its xml description:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import get_process_instance\n", "\n", "xmlpipe = get_process_instance(\"capsul.process.test.test_pipeline\")\n", "xmlpipe.help()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "And generate its graphical representation:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "if globals().get('use_gui', True):\n", " from soma.qt_gui.qt_backend import QtGui\n", " from capsul.qt_gui.widgets import PipelineDevelopperView\n", " from soma.qt_gui.controller_widget import ControllerWidget\n", "\n", " view = PipelineDevelopperView(xmlpipe)\n", " controller = ControllerWidget(xmlpipe, live=True)\n", " view.show()\n", " controller.show()\n", " if run_qt_loop:\n", " app.exec_()\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![xml pipeline](images/capsul_pipeline_xml2.jpg \"xml pipeline\")\n", "![xml pipeline controller](images/capsul_pipeline_xml2_controller.jpg \"xml pipeline controller\")\n", "\n", "## Iteration over a process or pipeline\n", "A process can be used several times in parallel: some of its parameters are replaced with lists of values" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "@xml_process('''\n", " \n", " \n", "\n", "''')\n", "def sum_node(inputs):\n", " return sum(inputs)\n", "\n", "@xml_process('''\n", " \n", " \n", "\n", "''')\n", "def add_12(a):\n", " return a + 12\n", "\n", "class Pipeline4(Pipeline):\n", " \n", " def pipeline_definition(self):\n", " self.add_iterative_process(\"add_12\", '__main__.add_12', iterative_plugs=['a', 'b'])\n", " self.add_process(\"sum_node\", '__main__.sum_node')\n", " self.add_link('add_12.b->sum_node.inputs')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline4 = Pipeline4()\n", "if globals().get('use_gui', True):\n", " view4 = PipelineDevelopperView(pipeline4, allow_open_controller=True, show_sub_pipelines=True)\n", " view4.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![iteration](images/capsul_iter1.jpg \"iteration\")\n", "The green node here is an iterative node, it can be opened (double-clicked) to see what is inside it." ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Set input parameters and run.\n", "\n", "We set 3 inputs, 4.2, 6.8 and 7.\n", "\n", "Each should be added 12, then all outputs will be summed up.\n", "\n", "This should make (4.2 + 12) + (6.8 + 12) + (7. + 12) = 54." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline4.a = [4.2, 6.8, 7.]\n", "pipeline4()\n", "print(pipeline4.addition)\n", "assert(pipeline4.addition == 54)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "
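The iterative pipeline above is essentially a map followed by a reduce; sketched in plain Python with the same input values:

```python
def add_12(a):
    # the iterated process: add 12 to a single input value
    return a + 12

inputs = [4.2, 6.8, 7.]
# the iterative node maps add_12 over the inputs; the sum node reduces them
addition = sum(add_12(a) for a in inputs)
print(addition)  # 54.0, up to floating-point rounding
```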

## Optional outputs and temporary files

\n", "Let's consider the following pipeline, with an **optional output** from the intermediate output:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "@xml_process('''\n", " \n", " \n", "\n", "''')\n", "def node_first(input, output):\n", " lines = open(input).readlines()\n", " with open(output, 'w') as f:\n", " f.write('\\n'.join(['* %s *' % l[:-1] for l in lines]))\n", "\n", "@xml_process('''\n", " \n", " \n", "\n", "''')\n", "def node_second(a, b):\n", " lines = open(a).readlines()\n", " with open(b, 'w') as f:\n", " f.write(''.join([l.replace('*', '^') for l in lines]))\n", "\n", "class Pipeline5(Pipeline):\n", " \n", " def pipeline_definition(self):\n", " self.add_process(\"node1\", '__main__.node_first')\n", " self.add_process(\"node2\", '__main__.node_second')\n", " self.add_link('node1.output->node2.a')\n", " self.export_parameter('node1', 'output', 'intermediate', is_optional=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline5 = Pipeline5()\n", "if globals().get('use_gui', True):\n", " view5 = PipelineDevelopperView(pipeline5, allow_open_controller=True, show_sub_pipelines=True)\n", " view5.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![pipeline5](images/capsul_pipeline5.jpg \"pipeline5\")\n", "\n", "But there is really a problem here now: if we do not specify pipeline5.intermediate, even when it is optional, the pipeline will not work because *node1* needs to write its output into a file. As its *output* is exported, it will not get a temporary value. Thus in the current state, *pipeline5.intermediate* is actually mandatory." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "infile = '/tmp/dummy.txt'\n", "outfile = '/tmp/dummy_modified.txt'\n", "intermediate = '/tmp/dummy_intermediate.txt'\n", "open(infile, 'w').write('I ate 5 cakes this morning.\\nLet\\'s eat 3 or 4 more.\\n')\n", "pipeline5.input = infile\n", "pipeline5.b = outfile\n", "pipeline5.intermediate = intermediate\n", "pipeline5()\n", "print(open(outfile).read())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "# cleanup\n", "os.unlink(outfile)\n", "os.unlink(intermediate)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "To solve this issue, we need an element which conditionally bridges or breaks the link between *node1.output* and *intermediate*. This is the job of **optional output switches**:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "class Pipeline6(Pipeline):\n", " \n", " def pipeline_definition(self):\n", " self.add_process(\"node1\", '__main__.node_first')\n", " self.add_process(\"node2\", '__main__.node_second')\n", " self.add_optional_output_switch('b', 'input')\n", " self.add_link('node1.output->node2.a')\n", " self.add_link('node1.output->b.input_switch_b')\n", " self.export_parameter('b', 'b', 'intermediate', is_optional=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline6 = Pipeline6()\n", "if globals().get('use_gui', True):\n", " view6 = PipelineDevelopperView(pipeline6, allow_open_controller=True, show_sub_pipelines=True)\n", " view6.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { 
"deletable": true, "editable": true }, "source": [ "![pipeline6](images/capsul_pipeline6.jpg \"pipeline6\")\n", "\n", "as long as pipeline6.intermediate is empty, the switch is off. *node1.output* gets a temporary value." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline6.input = infile\n", "pipeline6.b = outfile\n", "pipeline6()\n", "print(open(outfile).read())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "# cleanup\n", "os.unlink(outfile)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline6.intermediate = intermediate" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "# the switch has been switched on automatically\n", "# node1.output gets the value assigned to pipeline6.intermediate\n", "if globals().get('use_gui', True):\n", " view6.show()\n", " if run_qt_loop:\n", " app.exec_()" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![pipeline6 connected](images/capsul_pipeline6b.jpg \"pipeline6 connected\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "pipeline6()\n", "print(open(intermediate).read())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "# cleanup\n", "os.unlink(outfile)\n", "os.unlink(intermediate)\n", "os.unlink(infile)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "
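The behaviour of an optional output switch can be sketched in plain Python (a hypothetical helper, not the capsul API): when no intermediate filename is supplied, a temporary file is used and discarded; when one is supplied, the file is kept.

```python
import os
import tempfile

def run_with_optional_intermediate(text, intermediate=None):
    # when the optional output is not requested, use a temporary file instead
    keep = intermediate is not None
    if not keep:
        fd, intermediate = tempfile.mkstemp(suffix='.txt')
        os.close(fd)
    with open(intermediate, 'w') as f:
        f.write('* %s *' % text)        # "node1": writes the intermediate file
    result = open(intermediate).read().replace('*', '^')  # "node2": final result
    if not keep:
        os.unlink(intermediate)         # the temporary intermediate is discarded
    return result

print(run_with_optional_intermediate('hello'))  # ^ hello ^
```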

## StudyConfig

\n", "\n", "

StudyConfig is a placeholder for study-wide parameters, settings, paths and so on. It is a modular configuration tool, which has modules to configure some external software.\n", "

\n", "\n", "

### A helper to configure state-of-the-art medical software

\n", "\n", "Capsul provides modules to configure external software:\n", "\n", "\n", "\n", "These modules also make it possible to configure the execution of the pipeline:\n", "\n", "\n", "\n", "For instance:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import StudyConfig\n", "# optional config modules, which need not be explicitly loaded\n", "# from capsul.study_config.config_modules.freesurfer_config import FreeSurferConfig\n", "# from capsul.study_config.config_modules.brainvisa_config import BrainVISAConfig\n", "\n", "default_config = {\"use_soma_workflow\": True}\n", "study_config = StudyConfig(initial_config=default_config, \n", " modules=StudyConfig.default_modules + \\\n", " ['BrainVISAConfig', 'FSLConfig', 'FomConfig'])\n", "\n", "# inspect config options\n", "for k in study_config.user_traits().keys(): print(k, ': ', getattr(study_config, k))" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Let's show how to configure FSL:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import StudyConfig\n", "\n", "study_config = StudyConfig(\n", " modules=[\"FSLConfig\"],\n", " fsl_config=\"/etc/fsl/5.0/fsl.sh\",\n", " use_smart_caching=True,\n", " output_directory=\"/tmp/capsul_demo\")\n", "print(study_config.run.__doc__)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

### Execution of the pipeline

\n", "\n", "In this section a simple execution is performed on your machine, using one CPU (if more than one CPU is used, it means that the called external software is itself parallelized). We just have to call the StudyConfig run method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "study_config.reset_process_counter()\n", "study_config.run(pipeline2, verbose=1)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

### Distributed execution using Soma-Workflow

\n", "\n", "Capsul can execute a pipeline through Soma-Workflow in order to address large parallelized pipelines, or huge datasets in the case of population imaging studies.\n", "\n", "**Limitation:**\n", "\n", "Currently, when used in Soma-Workflow, process outputs can only be files. Output numbers, strings, etc. will be lost." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "class Process1(Process):\n", " f = Float(output=False)\n", "\n", " def __init__(self):\n", " super(Process1, self).__init__()\n", " self.add_trait(\"ff\", Float(output=False))\n", " self.add_trait(\"out\", Float(output=True))\n", " \n", " def get_commandline(self):\n", " return ['echo', 'Process1 execution, f: %f, ff: %f' % (self.f, self.ff)]\n", "\n", "class Process2(Process):\n", " \n", " def __init__(self):\n", " super(Process2, self).__init__()\n", " self.add_trait(\"a\", Float(output=True))\n", " self.add_trait(\"b\", Str(output=False))\n", " \n", " def get_commandline(self):\n", " return ['echo', 'Process2 execution, a: %f, b: %s' % (self.a, self.b)]\n", "\n", "class Pipeline2(Pipeline):\n", "\n", " def pipeline_definition(self):\n", " # Create processes\n", " self.add_process(\"node1\", Process1())\n", " self.add_process(\"node2\", Process2())\n", " # links\n", " self.add_link('node2.a->node1.ff')\n", " # custom exports\n", " self.export_parameter(\"node2\", \"b\", \"node_string\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.pipeline.pipeline_workflow import workflow_from_pipeline\n", "\n", "pipeline2 = get_process_instance(Pipeline2)\n", "workflow = workflow_from_pipeline(pipeline2)\n", "print('jobs:', workflow.jobs)\n", "print('dependencies:', workflow.dependencies)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": 
[ "The workflow can be saved and reloaded in soma_workflow_gui, or used in a soma-workflow controller:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "from soma_workflow import client as swclient\n", "\n", "# save workflow to disk in json format\n", "swclient.Helper.serialize('/tmp/pipeline2.workflow', workflow)\n", "\n", "# run locally via a workflow controller\n", "wc = swclient.WorkflowController()\n", "wf_id = wc.submit_workflow(workflow)\n", "swclient.Helper.wait_workflow(wf_id, wc)\n", "print('execution status:', wc.workflow_status(wf_id))\n", "wc.delete_workflow(wf_id)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Since we don't use it anymore, we can now remove this workflow file." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "import os\n", "os.unlink('/tmp/pipeline2.workflow')" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "More transparently, Soma-Workflow execution can be used from StudyConfig:\n", "For this, StudyConfig should have \"SomaWorkflowConfig\" amongst its modules, and it should be enabled (which should be the default when the module is loaded in the config)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "study_config = StudyConfig(modules=['SomaWorkflowConfig'])\n", "study_config.use_soma_workflow = True\n", "study_config.run(pipeline2)\n", "# if execution fails, an exception should be raised."
] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "If we want more control over the executed workflow, we can keep it after execution and retrieve it through the Soma-Workflow controller:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "study_config.somaworkflow_keep_succeeded_workflows = True\n", "wc = study_config.modules['SomaWorkflowConfig'].get_workflow_controller()\n", "if wc is not None:\n", " init_wf_list = wc.workflows().keys()\n", "else:\n", " init_wf_list = []\n", "try:\n", " study_config.run(pipeline2)\n", "except Exception as e:\n", " print('Workflow failed:', e)\n", "# get workflow after running\n", "wc = study_config.modules['SomaWorkflowConfig'].get_workflow_controller()\n", "wf_list = wc.workflows().keys()\n", "wf_id = [wf for wf in wf_list if wf not in init_wf_list][0]\n", "print('workflow id:', wf_id)\n", "wc.delete_workflow(wf_id)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "
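Under the hood, the workflow's dependencies are (before, after) job pairs that constrain execution order. A toy sketch, in plain Python rather than the soma_workflow API, of deriving a valid execution order from such pairs:

```python
def execution_order(jobs, dependencies):
    # dependencies is a set of (before, after) pairs, as in workflow.dependencies
    order = []
    remaining = list(jobs)
    while remaining:
        # jobs whose predecessors have all been scheduled already
        ready = [j for j in remaining
                 if all(b not in remaining for b, a in dependencies if a == j)]
        if not ready:
            raise ValueError('cyclic dependencies')
        order.extend(ready)
        remaining = [j for j in remaining if j not in ready]
    return order

# node2 must run before node1 (as in Pipeline2, where node2.a feeds node1.ff)
print(execution_order(['node1', 'node2'], {('node2', 'node1')}))  # ['node2', 'node1']
```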

## Use third-party software in Capsul

\n", "\n", "Some third-party software needs to be configured before it is called. StudyConfig is used to hold this\n", "configuration. There is a set of modules that can be modified to enable/disable the configuration of third-party\n", "software.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import StudyConfig\n", "\n", "print(StudyConfig.default_modules)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "In order to change modules that are used, one must give a full module list to StudyConfig():" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "study_config = StudyConfig(modules=StudyConfig.default_modules + ['BrainVISAConfig', 'NipypeConfig'])" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

Use FSL in Capsul

\n", "\n", "FSL is called via specific subprocess-like functions taking into account a \n", "configuration done in StudyConfig. If a StudyConfig is not\n", "configured to use FSL, it may be automatically configured. Automatic\n", "configuration had been tested in the two following cases :\n", "
    \n", "
  1. FSL was installed from the FMRIB site and at least the FSLDIR \n", " environment variable is set (fsl.sh may or may not have been sourced)
  2. FSL was installed from Neurodebian packages\n", "
\n", "\n", "Automatic configuration is done lazily. To start it, one must either use `StudyConfig(use_fsl=True)` or use one of the calling functions of `capsul.subprocess.fsl`.\n", "\n", "For calling FSL command with `capsul.subprocess.fsl` module, the first argument of\n", "command line must be the FSL executable without any path nor prefix. \n", "Prefix are used in Neurodebian install. For instance on Ubuntu 16.04 \n", "Neurodebian FSL commands are prefixed with `fsl5.0-`.\n", "The appropriate path and eventually prefix are added from the configuration\n", "of the `StudyConfig` instance." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "deletable": true, "editable": true }, "outputs": [], "source": [ "import sys\n", "import os\n", "import os.path as osp\n", "import tempfile\n", "\n", "from traits.api import File, Undefined\n", "\n", "from capsul.api import StudyConfig, Process\n", "from capsul.in_context import fsl\n", " \n", "class Bet(Process):\n", " '''\n", " A process that calls bet command without any parameter except input and output files.\n", " '''\n", " input_image = File(optional=False, output=False)\n", " output_image = File(optional=False, output=True)\n", " \n", "\n", " def _run_process(self):\n", " fsl.fsl_check_call(['bet', self.input_image, self.output_image])\n", " \n", "try:\n", " study_config = StudyConfig(use_fsl=True)\n", "except EnvironmentError as e:\n", " # If FSL cannot be configured automatically, tells why\n", " print('FSL cannot be configured automatically:', str(e))\n", " study_config = None\n", "if study_config and study_config.use_fsl is True:\n", " with study_config.engine:\n", " # Try to find an image in FSL installation directory\n", " test_image = '/usr/share/data/fsl-mni152-templates/MNI152_T1_1mm_brain.nii.gz'\n", " if not osp.exists(test_image):\n", " fsl_dir = os.environ.get('FSLDIR')\n", " if not fsl_dir and study_config.fsl_config is not Undefined:\n", " fsl_dir = 
osp.dirname(osp.dirname(osp.dirname(study_config.fsl_config)))\n", " if fsl_dir:\n", " test_image = glob(osp.join(fsl_dir, 'fslpython/envs/fslpython/lib/python*/site-packages/nibabel/tests/data/anatomical.nii'))\n", " if test_image:\n", " test_image = test_image[0]\n", " else:\n", " print('FSL test data cannot be found')\n", " test_image = None\n", " \n", " if test_image:\n", " bet = study_config.get_process_instance(Bet)\n", " with tempfile.NamedTemporaryFile(suffix='.nii.gz') as tmp:\n", " bet.run(input_image=test_image, output_image=tmp.name)\n", " print('Output file size:', os.stat(bet.output_image).st_size)\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "
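The path-and-prefix completion described above can be sketched in pure Python. This is an illustration only, not the actual `capsul.subprocess.fsl` implementation; the function and parameter names (`build_fsl_command`, `fsl_bin_dir`, `fsl_prefix`) are hypothetical:

```python
import os.path as osp

def build_fsl_command(cmd, fsl_bin_dir='', fsl_prefix=''):
    # The first element must be the bare executable name, without path or
    # prefix; the configured prefix and directory are prepended here.
    executable = fsl_prefix + cmd[0]
    if fsl_bin_dir:
        executable = osp.join(fsl_bin_dir, executable)
    return [executable] + list(cmd[1:])

print(build_fsl_command(['bet', 'in.nii.gz', 'out.nii.gz'],
                        fsl_bin_dir='/usr/lib/fsl/5.0', fsl_prefix='fsl5.0-'))
# ['/usr/lib/fsl/5.0/fsl5.0-bet', 'in.nii.gz', 'out.nii.gz']
```

With an empty configuration the command is returned unchanged, which matches the "bare executable name" convention used in the examples.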

Use Nipype in Capsul

\n", "\n", "It is possible to use all the nipype interfaces (FSL, SPM, FreeSurfer, ...) as building blocks in Capsul. This step requires nipype to be properly installed as well as the software we want to use. For instance if we want to perform a brain extraction with FSL we can simply write:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.api import StudyConfig, get_process_instance\n", "from soma.path import find_in_path\n", "\n", "study_config = StudyConfig(modules=StudyConfig.default_modules + ['NipypeConfig'])\n", "\n", "# It is necessary to have FLS commands (such as \"bet\") in\n", "# the PATH in order to use FSL via Nipype\n", "if study_config.use_nipype and find_in_path('bet'):\n", " betpipe = get_process_instance(\"nipype.interfaces.fsl.BET\")\n", " betpipe.get_help()\n", " betpipe.in_file=\"/tmp/MNI152_T1_2mm.nii.gz\"\n", " betpipe.output_directory = '/tmp'\n", "else:\n", " betpipe = None\n", " print('NiPype is not present.')" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "As shown it is possible to set the BET algorithm input parameters. Note that in capsul the standard nipype outputs are prefixed with underscores. We can execute this Process but unfortunately, as mentioned by the nipype warnings, FSL needs to be configured in the study confit, otherwise the pipeline will not run. As we have done it above, we can run it:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "if betpipe:\n", " study_config.reset_process_counter()\n", " study_config.run(betpipe, verbose=1)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

Parameters completion using FOM (File Organization Model)

\n", "\n", "FOMs allow to complete file names in large pipelines with many parameters from a small set of attributes. To illustrate this feature, we will first create a pipeline with several such parameters, from a XML description." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "from capsul.pipeline.xml import create_xml_pipeline\n", "\n", "xmlstr = '''\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "'''\n", "\n", "DemoPipeline = create_xml_pipeline('__main__', 'DemoPipeline', xmlstr)\n", "pipeline = DemoPipeline()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "if globals().get('use_gui', True):\n", " from capsul.qt_gui.widgets import PipelineDevelopperView\n", "\n", " pv = PipelineDevelopperView(pipeline, allow_open_controller=True, show_sub_pipelines=True)\n", " pv.show()\n", "\n", " if run_qt_loop:\n", " app.exec_()\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![pipeline](images/capsul_pipeline7.jpg \"pipeline\")\n", "\n", "## FOM definition" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "fom_content = '''{\n", " \"fom_name\": \"demo_fom\",\n", "\n", " \"formats\": {\n", " \"NIFTI\": \"nii\",\n", " \"NIFTI gz\": \"nii.gz\"\n", " },\n", " \"format_lists\": {\n", " \"images\": [\"NIFTI gz\", \"NIFTI\"]\n", " },\n", "\n", " \"shared_patterns\": {\n", " \"subject_dir\": \"
_\"\n", " },\n", "\n", " \"processes\": {\n", " \"DemoPipeline\": {\n", " \"input_image1\": \n", " [[\"input:
_/\", \"images\"]],\n", " \"input_image2\":\n", " [[\"input:{subject_dir}/alt2_\", \"images\"]],\n", " \"input_image3\":\n", " [[\"input:{subject_dir}/alt2_\", \"images\"]],\n", " \"input_image4\":\n", " [[\"input:{subject_dir}/alt4_\", \"images\"]],\n", " \"image_out1\":\n", " [[\"output:{subject_dir}/out_image__1\", \"images\"]],\n", " \"image_out3\":\n", " [[\"output:{subject_dir}/out_image__3\", \"images\"]]\n", " },\n", " \"DemoPipeline.proc4\": {\n", " \"output_image\": [[\"output:{subject_dir}/out_image__4\", \"images\"]]\n", " }\n", " }\n", "\n", "}\n", "'''\n", "\n", "try: os.path.makedirs('/tmp/capsul_demo')\n", "except: pass\n", "open('/tmp/capsul_demo/demo_fom.json', 'w').write(fom_content)\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "

File names completion using FOM

\n", "FOM completion is used through the attributes completion system ProcessCompletionEngine." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "import sys\n", "import os\n", "from capsul.api import StudyConfig\n", "from capsul.attributes.completion_engine import ProcessCompletionEngine\n", "if globals().get('use_gui', True):\n", " from soma.qt_gui.controller_widget import ScrollControllerWidget\n", " from capsul.qt_gui.widgets.attributed_process_widget import AttributedProcessWidget\n", "\n", "# the following 4 lines are a hack to add /tmp/capsul_demo to the FOM search path\n", "# before it is used by StudyConfig\n", "from soma.application import Application\n", "soma_app = Application('capsul', plugin_modules=['soma.fom'])\n", "soma_app.initialize()\n", "soma_app.fom_manager.paths.append('/tmp/capsul_demo')\n", "\n", "config = {\n", " \"name\" : \"morphologist_fom\",\n", " \"input_directory\" : \"/data/capsul_demo\",\n", " \"output_directory\" : \"/data/capsul_demo\",\n", " \"input_fom\" : \"demo_fom\",\n", " \"output_fom\" : \"demo_fom\",\n", " \"use_soma_workflow\" : True,\n", " \"use_fom\" : True,\n", " \"volumes_format\" : \"nii.gz\",\n", " \"meshes_format\" : \"gii\",\n", "}\n", "\n", "study_config = StudyConfig(init_config=config, \n", " modules=StudyConfig.default_modules + ['FomConfig', 'BrainVISAConfig'])\n", "soma_app.fom_manager._cache = None # while debugging\n", "\n", "mp = study_config.get_process_instance(DemoPipeline)\n", "\n", "ce = ProcessCompletionEngine.get_completion_engine(mp)\n", "print('completion engine type:', type(ce).__name__)\n", "attributes = ce.get_attribute_values()\n", "ce.install_auto_completion()\n", "attributes.center = 'subjects'\n", "attributes.subject = 'irm2'\n", "\n", "if globals().get('use_gui', True):\n", " # pipeline controller GUI\n", " p_view = ScrollControllerWidget(mp, live=True)\n", " p_view.show()\n", " # 
attributed pipeline controller GUI\n", " ap_view = AttributedProcessWidget(mp)\n", " ap_view.show()\n", "\n", " pv = PipelineDevelopperView(mp, allow_open_controller=True, show_sub_pipelines=True)\n", " pv.show()\n", "\n", " if run_qt_loop:\n", " app.exec_()\n" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![pipeline with FOM](images/capsul_fom1.jpg \"pipeline with FOM\")\n", "![completion controller](images/capsul_fom1_compl.jpg \"completion controller\")\n", "![controller](images/capsul_fom1_controller.jpg \"controller\")\n", "\n", "

Note how the output \"image_out5\" depends on the proc_select2 switch value:

\n", "

While \"image_out1\" is fixed via the FOM completion, its value \"back-propagates\" to both \"proc1.output_image\" and \"proc2.output_image\". For \"image_out5\" the FOM does not impose its value, it is deduced from either \"proc3.output_image\" (in turn set via the global \"image_out3\") or \"proc4.output_image\", depending on the proc_select2 swtch value.

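The switch behaviour described above can be sketched in plain Python. This is an illustration only; Capsul's actual Switch node is richer, and the `DemoSwitch` name and API below are hypothetical:

```python
# Hypothetical sketch of a pipeline switch: one output, forwarding the value
# of the currently selected input.
class DemoSwitch:
    def __init__(self, **inputs):
        self.inputs = inputs
        self.selection = next(iter(inputs))  # first input selected by default

    @property
    def output(self):
        return self.inputs[self.selection]

sw = DemoSwitch(proc3='/tmp/out_image_irm2_3.nii.gz',
                proc4='/tmp/out_image_irm2_4.nii.gz')
sw.selection = 'proc4'
print(sw.output)  # /tmp/out_image_irm2_4.nii.gz
```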
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "mp.proc_select2 = \"proc3\"\n", "print(\"switch proc_select2 value:\", mp.proc_select2)\n", "print(\"output image_out5:\", mp.image_out5)\n", "mp.proc_select2 = \"proc4\"\n", "print(\"switch proc_select2 value:\", mp.proc_select2)\n", "print(\"output image_out5:\", mp.image_out5)" ] }, { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "![changed pipeline](images/capsul_fom2.jpg \"changed pipeline\")\n", "![changed completion controller](images/capsul_fom2_compl.jpg \"changed completion controller\")\n", "\n", "## Now Cleanup the files we have created..." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "import shutil\n", "shutil.rmtree('/tmp/capsul_demo')" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython3", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.2" } }, "nbformat": 4, "nbformat_minor": 1 }