Source code for capsul.engine.database_populse
# -*- coding: utf-8 -*-
import os.path as osp
from capsul.engine.database import DatabaseEngine
from populse_db.storage import Storage
schemas = [
{
"version": "1.0.0",
"schema": {
"named_directory": [
{
"name": [str, {"primary_key": True}],
"path": str,
}
],
"json_value": [
{
"name": [str, {"primary_key": True}],
"json_dict": dict,
}
],
"path_metadata": [
{
"path": [str, {"primary_key": True}],
"named_directory": str,
}
],
"metadata": [
{
"path": [str, {"primary_key": True}],
"subject": str,
"time_point": str,
"history": list[str], # contains a list of execution_id
}
],
},
},
]
[docs]
class PopulseDBEngine(DatabaseEngine):
def __init__(self, database_engine):
self.storage = Storage(database_engine)
with self.storage.schema() as schema:
schema.add_schema("capsul.engine.database_populse")
def __del__(self):
self.close()
def close(self):
self.storage = None
[docs]
def set_named_directory(self, name, path):
if path:
path = osp.normpath(osp.abspath(path))
with self.storage.data(write=True) as db:
if path:
db.named_directory[name] = path
else:
del db.named_directory[name]
[docs]
def named_directory(self, name):
with self.storage.data() as db:
return db.named_directory[name].path.get()
[docs]
def named_directories(self):
with self.storage.data() as db:
for row in db.named_directory.search(fields=["name"], as_list=True):
yield row[0]
[docs]
def set_json_value(self, name, json_value):
with self.storage.data(write=True) as db:
db["json_value"][name].json_dict = json_value
[docs]
def json_value(self, name):
with self.storage.data(write=True) as db:
return db["json_value"][name].json_dict.get()
[docs]
def set_path_metadata(self, path, metadata):
named_directory = metadata.get("named_directory")
if named_directory:
base_path = self.named_directory("capsul_engine")
if base_path:
if not path.startswith(named_directory):
raise ValueError(
'Path "%s" is defined as relative to named directory %s but it does not start with "%s"'
% (path, named_directory, base_path)
)
path = path[len(base_path) + 1 :]
else:
if osp.isabs(path):
raise ValueError(
'Cannot determine relative path for "%s" because its base named directory "%s" is unknown'
% (path, named_directory)
)
else:
if osp.isabs(path):
for nd in self.named_directories():
if path.startswith(nd.path):
named_directory = nd.name()
path = path[len(nd.path) + 1]
break
else:
named_directory = None
else:
# capsul_engine is the default named directory for relative paths
named_directory = "capsul_engine"
doc = metadata.copy()
doc["path"] = path
if named_directory:
doc["named_directory"] = named_directory
with self.db as dbs:
dbs.add_document("path_metadata", doc)
[docs]
def path_metadata(self, path):
if osp.isabs(path):
for nd in self.named_directories():
if path.startswith(nd.path):
path = path[len(nd.path) + 1 :]
break
with self.db as dbs:
return dbs.get_document("path_metadata", path)