# -*- coding: utf-8 -*-
'''
`Soma-Workflow <http://brainvisa.info/soma-workflow/>`_ configuration module
Classes
=======
:class:`ResourceController`
---------------------------
:class:`SomaWorkflowConfig`
---------------------------
'''
from __future__ import absolute_import
from traits.api import Bool, Str, Undefined, List, Dict, File
from capsul.study_config.study_config import StudyConfigModule
from soma.controller import Controller, ControllerTrait, OpenKeyController
[docs]class ResourceController(Controller):
''' Configuration options for one Soma-Workflow computing resource
Attributes
----------
queue: str
Jobs queue to be used on the computing resource for w
orkflow submissions
transfer_paths: list(str)
list of paths where files have to be transferred by soma-workflow
path_translations: dict(str, (str, str))
Soma-workflow paths translations mapping:
``{local_path: (identifier, uuid)}``
'''
def __init__(self):
super(ResourceController, self).__init__()
self.add_trait(
'queue',
Str(Undefined, output=False,
desc='Jobs queue to be used on the computing resource for '
'workflow submissions'))
self.add_trait(
'transfer_paths', List(
[],
output=False,
desc='list of paths where files have to be transferred '
'by soma-workflow'))
self.add_trait(
'path_translations',
ControllerTrait(
OpenKeyController(
value_trait=List(trait=Str(), value=('', ''),
minlen=2, maxlen=2)),
output=False,
desc='Soma-workflow paths translations mapping: '
'{local_path: (identifier, uuid)}'))
[docs]class SomaWorkflowConfig(StudyConfigModule):
''' Configuration module for :somaworkflow:`Soma-Workflow <index.html>`
Stores configuration options which are not part of Soma-Workflow own
configuration, and used to run workflows from CAPSUL pipelines.
The configuration module may also store connected Soma-Workflow
:somaworkflow:`WorkflowController <client_API.html>` objects to allow
monitoring and submiting workflows.
Attributes
----------
use_soma_workflow: bool
Use soma workflow for the execution
somaworkflow_computing_resource: str
Soma-workflow computing resource to be used to run processing
somaworkflow_config_file: filename
Soma-Workflow configuration file. Default: ``$HOME/.soma_workflow.cfg``
somaworkflow_keep_failed_workflows: bool
Keep failed workflows after pipeline execution through StudyConfig
somaworkflow_keep_succeeded_workflows: bool
Keep succeeded workflows after pipeline execution through StudyConfig
somaworkflow_computing_resources_config: dict(str, ResourceController)
Computing resource config dict, keys are resource ids. Values are
:py:class:`ResourceController` instances
Methods
-------
get_resource_id
set_computing_resource_password
get_workflow_controller
connect_resource
disconnect_resource
'''
def __init__(self, study_config, configuration):
super(SomaWorkflowConfig, self).__init__(study_config, configuration)
study_config.add_trait('use_soma_workflow', Bool(
False,
output=False,
desc='Use soma workflow for the execution',
groups=['soma-workflow']))
study_config.add_trait(
'somaworkflow_computing_resource',
Str(
Undefined,
output=False,
desc='Soma-workflow computing resource to be used to run processing',
groups=['soma-workflow']))
study_config.add_trait(
'somaworkflow_config_file',
File(Undefined, output=False, optional=True,
desc='Soma-Workflow configuration file. '
'Default: $HOME/.soma_workflow.cfg',
groups=['soma-workflow']))
study_config.add_trait(
'somaworkflow_keep_failed_workflows',
Bool(
True,
desc='Keep failed workflows after pipeline execution through '
'StudyConfig',
groups=['soma-workflow']))
study_config.add_trait(
'somaworkflow_keep_succeeded_workflows',
Bool(
False,
desc='Keep succeeded workflows after pipeline execution '
'through StudyConfig',
groups=['soma-workflow']))
study_config.add_trait(
'somaworkflow_computing_resources_config',
ControllerTrait(
OpenKeyController(
value_trait=ControllerTrait(
ResourceController(),
output=False, allow_none=False,
desc='Computing resource config')),
output=False, allow_none=False,
desc='Computing resource config',
groups=['soma-workflow']))
self.study_config.modules_data.somaworkflow = {}
def initialize_callbacks(self):
self.study_config.on_trait_change(
self.initialize_module, 'use_soma_workflow')
[docs] def get_resource_id(self, resource_id=None, set_it=False):
''' Get a computing resource name according to the (optional) input
resource_id and Soma-Workflow configuration.
For instance, ``None`` or ``"localhost"`` will be transformed to the
local host id.
Parameters
----------
resource_id: str (optional)
set_it: bool (optional)
if True, the computed resource id will be set as the current
resource in the somaworkflow_computing_resource trait value.
Returns
-------
computed resource id.
'''
if resource_id is None:
resource_id = self.study_config.somaworkflow_computing_resource
else:
self.study_config.somaworkflow_computing_resource = resource_id
if resource_id in (None, Undefined, 'localhost'):
from soma_workflow import configuration as swcf
resource_id = swcf.Configuration.get_local_resource_id()
if set_it:
if resource_id is None:
import socket
resource_id = socket.gethostname()
self.study_config.somaworkflow_computing_resource = resource_id
return resource_id
[docs] def set_computing_resource_password(self, resource_id, password=None,
rsa_key_password=None):
''' Set credentials for a given computinf resource.
Such credentials are stored in the config object, but will not be
written when the config is saved in a file. They are thus non-
persistant.
Parameters
----------
resource_id: str (optional)
password: str (optional)
rsa_key_password: str (optional)
'''
resource_id = self.get_resource_id(resource_id)
r = self.study_config.modules_data.somaworkflow.setdefault(
resource_id, Controller())
if password:
r.password = password
if rsa_key_password:
r.rsa_key_password = rsa_key_password
[docs] def get_workflow_controller(self, resource_id=None):
''' Get a connected
:somaworkflow:`WorkflowController <client_API.html>` for the given
resource
'''
resource_id = self.get_resource_id(resource_id)
r = self.study_config.modules_data.somaworkflow.setdefault(
resource_id, Controller())
wc = getattr(r, 'workflow_controller', None)
return wc
[docs] def connect_resource(self, resource_id=None, force_reconnect=False):
''' Connect a soma-workflow computing resource.
Sets the current resource to the given resource_id (transformed by
get_resource_id() if None or "localhost" is given, for instance).
Parameters
----------
resource_id: str (optional)
resource name, may be None or "localhost". If None, the current
one (study_config.somaworkflow_computing_resource) is used, or
the localhost if none is configured.
force_reconnect: bool (optional)
if True, if an existing workflow controller is already connected,
it will be disconnected (deleted) and a new one will be connected.
If False, an existing controller will be reused without
reconnection.
Returns
-------
:somaworkflow:`WorkflowController <client_API.html>` object
'''
import soma_workflow.client as swclient
resource_id = self.get_resource_id(resource_id, True)
if force_reconnect:
self.disconnect_resource(resource_id)
r = self.study_config.modules_data.somaworkflow.setdefault(
resource_id, Controller())
if not force_reconnect:
wc = self.get_workflow_controller(resource_id)
if wc is not None:
return wc
conf_file = self.study_config.somaworkflow_config_file
if conf_file in (None, Undefined):
conf_file \
= swclient.configuration.Configuration.search_config_path()
login = swclient.configuration.Configuration.get_logins(
conf_file).get(resource_id)
config = getattr(r, 'config', None)
if config is None:
config = swclient.configuration.Configuration.load_from_file(
config_file_path=conf_file)
r.config = config
password = getattr(r, 'password', None)
rsa_key_pass = getattr(r, 'rsa_key_password', None)
wc = swclient.WorkflowController(
resource_id=resource_id,
login=login,
password=password,
rsa_key_pass=rsa_key_pass,
config=config)
r.workflow_controller = wc
return wc
[docs] def disconnect_resource(self, resource_id=None):
''' Disconnect a connected
:somaworkflow:`WorkflowController <client_API.html>` and removes it
from the internal list
'''
resource_id = self.get_resource_id(resource_id, True)
wc = self.get_workflow_controller(resource_id)
if wc:
wc.disconnect()
del self.study_config.modules_data.somaworkflow[
self.study_config.somaworkflow_computing_resource
].workflow_controller