Source code for spinetoolbox.project
######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Spine Toolbox project class.
:authors: P. Savolainen (VTT), E. Rinne (VTT)
:date: 10.1.2018
"""
import os
import json
from PySide2.QtCore import Slot, Signal
from PySide2.QtWidgets import QMessageBox
from spine_engine import SpineEngine, SpineEngineState
from .metaobject import MetaObject
from .helpers import create_dir, inverted, erase_dir
from .config import LATEST_PROJECT_VERSION, PROJECT_FILENAME
from .dag_handler import DirectedGraphHandler
from .project_tree_item import LeafProjectTreeItem
from .spine_db_manager import SpineDBManager
from .project_commands import (
SetProjectNameCommand,
SetProjectDescriptionCommand,
AddProjectItemsCommand,
RemoveProjectItemCommand,
RemoveAllProjectItemsCommand,
)
[docs]class SpineToolboxProject(MetaObject):
"""Class for Spine Toolbox projects."""
"""Emitted just before the entire project is executed."""
def __init__(
self,
toolbox,
name,
description,
p_dir,
project_item_model,
settings,
embedded_julia_console,
embedded_python_console,
logger,
):
"""
Args:
toolbox (ToolboxUI): toolbox of this project
name (str): Project name
description (str): Project description
p_dir (str): Project directory
project_item_model (ProjectItemModel): project item tree model
settings (QSettings): Toolbox settings
embedded_julia_console (JuliaConsoleWidget): a Julia console widget for execution in the embedded console
embedded_python_console (PythonConsoleWidget): a Python console widget for execution in the embedded console
logger (LoggerInterface): a logger instance
"""
super().__init__(name, description)
self._toolbox = toolbox
self._project_item_model = project_item_model
self._logger = logger
self._settings = settings
self._embedded_julia_console = embedded_julia_console
self._embedded_python_console = embedded_python_console
self.dag_handler = DirectedGraphHandler()
self.db_mngr = SpineDBManager(settings, logger, self)
self.engine = None
self._execution_stopped = True
self._dag_execution_list = None
self._dag_execution_permits_list = None
self._dag_execution_index = None
self.project_dir = None # Full path to project directory
self.config_dir = None # Full path to .spinetoolbox directory
self.items_dir = None # Full path to items directory
self.config_file = None # Full path to .spinetoolbox/project.json file
self._toolbox.undo_stack.clear()
if not self._create_project_structure(p_dir):
self._logger.msg_error.emit("Creating project directory " "structure to <b>{0}</b> failed".format(p_dir))
[docs] def connect_signals(self):
"""Connect signals to slots."""
self.dag_handler.dag_simulation_requested.connect(self.notify_changes_in_dag)
self.dag_execution_finished.connect(self.execute_next_dag)
[docs] def _create_project_structure(self, directory):
"""Makes the given directory a Spine Toolbox project directory.
Creates directories and files that are common to all projects.
Args:
directory (str): Abs. path to a directory that should be made into a project directory
Returns:
bool: True if project structure was created successfully, False otherwise
"""
self.project_dir = directory
self.config_dir = os.path.abspath(os.path.join(self.project_dir, ".spinetoolbox"))
self.items_dir = os.path.abspath(os.path.join(self.config_dir, "items"))
self.config_file = os.path.abspath(os.path.join(self.config_dir, PROJECT_FILENAME))
try:
create_dir(self.project_dir) # Make project directory
except OSError:
self._logger.msg_error.emit("Creating directory {0} failed".format(self.project_dir))
return False
try:
create_dir(self.config_dir) # Make project config directory
except OSError:
self._logger.msg_error.emit("Creating directory {0} failed".format(self.config_dir))
return False
try:
create_dir(self.items_dir) # Make project items directory
except OSError:
self._logger.msg_error.emit("Creating directory {0} failed".format(self.items_dir))
return False
return True
[docs] def call_set_name(self, name):
self._toolbox.undo_stack.push(SetProjectNameCommand(self, name))
[docs] def call_set_description(self, description):
self._toolbox.undo_stack.push(SetProjectDescriptionCommand(self, description))
[docs] def set_name(self, name):
"""Changes project name.
Args:
name (str): New project name
"""
super().set_name(name)
self._toolbox.update_window_title()
# Remove entry with the old name from File->Open recent menu
self._toolbox.remove_path_from_recent_projects(self.project_dir)
# Add entry with the new name back to File->Open recent menu
self._toolbox.update_recent_projects()
self._logger.msg.emit("Project name changed to <b>{0}</b>".format(self.name))
[docs] def set_description(self, description):
super().set_description(description)
msg = "Project description "
if description:
msg += f"changed to <b>{description}</b>"
else:
msg += "cleared"
self._logger.msg.emit(msg)
@staticmethod
[docs] def get_connections(links):
connections = list()
for link in links:
src_connector = link.src_connector
src_anchor = src_connector.position
src_name = src_connector.parent_name()
dst_connector = link.dst_connector
dst_anchor = dst_connector.position
dst_name = dst_connector.parent_name()
conn = {"from": [src_name, src_anchor], "to": [dst_name, dst_anchor]}
connections.append(conn)
return connections
[docs] def save(self, spec_paths):
"""Collects project information and objects
into a dictionary and writes it to a JSON file.
Args:
spec_paths (list): List of absolute paths to specification files
Returns:
bool: True or False depending on success
"""
project_dict = dict() # Dictionary for storing project info
project_dict["version"] = LATEST_PROJECT_VERSION
project_dict["name"] = self.name
project_dict["description"] = self.description
project_dict["specifications"] = dict()
project_dict["specifications"]["Tool"] = spec_paths
# Compute connections directly from Links on scene
project_dict["connections"] = self.get_connections(self._toolbox.ui.graphicsView.links())
items_dict = dict() # Dictionary for storing project items
# Traverse all items in project model by category
for category_item in self._project_item_model.root().children():
category = category_item.name
# Store item dictionaries with item name as key and item_dict as value
for item in self._project_item_model.items(category):
items_dict[item.name] = item.project_item.item_dict()
# Save project to file
saved_dict = dict(project=project_dict, items=items_dict)
# Write into JSON file
with open(self.config_file, "w") as fp:
json.dump(saved_dict, fp, indent=4)
return True
[docs] def load(self, items_dict):
"""Populates project item model with items loaded from project file.
Args:
items_dict (dict): Dictionary containing all project items in JSON format
Returns:
bool: True if successful, False otherwise
"""
self._logger.msg.emit("Loading project items...")
empty = True
items_by_type = dict() # Key: item_type, value: list of items
for item_name, item_dict in items_dict.items():
empty = False
item_type = item_dict.pop("type") # ProjectItem constructors do not need this
item_dict["name"] = item_name # ProjectItem constructors need this
items_by_type.setdefault(item_type, list()).append(item_dict)
for item_type, items in items_by_type.items():
if not self.make_and_add_project_items(item_type, items, verbosity=False):
return False
if empty:
self._logger.msg_warning.emit("Project has no items")
return True
[docs] def add_project_items(self, item_type, *items, set_selected=False, verbosity=True):
"""Pushes an AddProjectItemsCommand to the toolbox undo stack.
"""
if not items:
return
self._toolbox.undo_stack.push(
AddProjectItemsCommand(self, item_type, items, set_selected=set_selected, verbosity=verbosity)
)
[docs] def make_project_tree_items(self, item_type, items):
"""Creates and returns a dictionary mapping category indexes to a list of corresponding LeafProjectTreeItem instances.
Args:
item_type (str): item type
items (list): one or more dicts of items to add
Returns:
dict(QModelIndex, list(LeafProjectTreeItem))
"""
factory = self._toolbox.item_factories.get(item_type)
if factory is None:
self._logger.msg_error.emit(f"Unknown item type <b>{item_type}</b>")
for item in items:
self._logger.msg_error.emit(f"Loading project item <b>{item['name']}</b> failed")
return {None: None}
project_items_by_category = {}
for item_dict in items:
try:
project_item = factory.make_item(self._toolbox, self, self._logger, **item_dict)
except TypeError:
self._logger.msg_error.emit(
f"Creating <b>{item_type}</b> project item <b>{item_dict['name']}</b> failed. "
"This is most likely caused by an outdated project file."
)
continue
except KeyError as error:
self._logger.msg_error.emit(
f"Creating <b>{item_type}</b> project item <b>{item_dict['name']}</b> failed. "
f"This is most likely caused by an outdated or corrupted project file "
f"(missing JSON key: {str(error)})."
)
continue
project_items_by_category.setdefault(project_item.item_category(), list()).append(project_item)
project_tree_items = {}
for category, project_items in project_items_by_category.items():
category_ind = self._project_item_model.find_category(category)
# NOTE: category_ind might be None, and needs to be handled caller side
project_tree_items[category_ind] = [
LeafProjectTreeItem(project_item, self._toolbox) for project_item in project_items
]
return project_tree_items
[docs] def do_add_project_tree_items(self, category_ind, *project_tree_items, set_selected=False, verbosity=True):
"""Adds LeafProjectTreeItem instances to project.
Args:
category_ind (QModelIndex): The category index
project_tree_items (LeafProjectTreeItem): one or more LeafProjectTreeItem instances to add
set_selected (bool): Whether to set item selected after the item has been added to project
verbosity (bool): If True, prints message
"""
for project_tree_item in project_tree_items:
project_item = project_tree_item.project_item
self._project_item_model.insert_item(project_tree_item, category_ind)
factory = self._toolbox.item_factories[project_item.item_type()]
factory.activate_project_item(self._toolbox, project_item)
# Append new node to networkx graph
self.add_to_dag(project_item.name)
if verbosity:
self._logger.msg.emit(
"{0} <b>{1}</b> added to project".format(project_item.item_type(), project_item.name)
)
if set_selected:
item = list(project_tree_items)[-1]
self.set_item_selected(item)
[docs] def set_item_selected(self, item):
"""
Selects the given item.
Args:
item (LeafProjectTreeItem)
"""
ind = self._project_item_model.find_item(item.name)
self._toolbox.ui.treeView_project.setCurrentIndex(ind)
[docs] def make_and_add_project_items(self, item_type, items, set_selected=False, verbosity=True):
"""Adds items to project at loading.
Args:
item_type (str): Item type e.g. "Tool"
items (list): one or more dict of items to add
set_selected (bool): Whether to set item selected after the item has been added to project
verbosity (bool): If True, prints message
"""
for category_ind, project_tree_items in self.make_project_tree_items(item_type, items).items():
if not category_ind:
continue
self.do_add_project_tree_items(
category_ind, *project_tree_items, set_selected=set_selected, verbosity=verbosity
)
return True
[docs] def add_to_dag(self, item_name):
"""Add new node (project item) to the directed graph."""
self.dag_handler.add_dag_node(item_name)
[docs] def remove_all_items(self):
"""Pushes a RemoveAllProjectItemsCommand to the toolbox undo stack."""
items_per_category = self._project_item_model.items_per_category()
if not any(v for v in items_per_category.values()):
self._logger.msg.emit("No project items to remove")
return
delete_data = int(self._settings.value("appSettings/deleteData", defaultValue="0")) != 0
msg = "Remove all items from project?"
if not delete_data:
msg += "Item data directory will still be available in the project directory after this operation."
else:
msg += "<br><br><b>Warning: Item data will be permanently lost after this operation.</b>"
message_box = QMessageBox(
QMessageBox.Question,
"Remove All Items",
msg,
buttons=QMessageBox.Ok | QMessageBox.Cancel,
parent=self._toolbox,
)
message_box.button(QMessageBox.Ok).setText("Remove Items")
answer = message_box.exec_()
if answer != QMessageBox.Ok:
return
links = self._toolbox.ui.graphicsView.links()
self._toolbox.undo_stack.push(
RemoveAllProjectItemsCommand(self, items_per_category, links, delete_data=delete_data)
)
[docs] def remove_item(self, name, check_dialog=False):
"""Pushes a RemoveProjectItemCommand to the toolbox undo stack.
Args:
name (str): Item's name
check_dialog (bool): If True, shows 'Are you sure?' message box
"""
delete_data = int(self._settings.value("appSettings/deleteData", defaultValue="0")) != 0
if check_dialog:
msg = "Remove item <b>{}</b> from project? ".format(name)
if not delete_data:
msg += "Item data directory will still be available in the project directory after this operation."
else:
msg += "<br><br><b>Warning: Item data will be permanently lost after this operation.</b>"
msg += "<br><br>Tip: Remove items by pressing 'Delete' key to bypass this dialog."
# noinspection PyCallByClass, PyTypeChecker
message_box = QMessageBox(
QMessageBox.Question,
"Remove Item",
msg,
buttons=QMessageBox.Ok | QMessageBox.Cancel,
parent=self._toolbox,
)
message_box.button(QMessageBox.Ok).setText("Remove Item")
answer = message_box.exec_()
if answer != QMessageBox.Ok:
return
self._toolbox.undo_stack.push(RemoveProjectItemCommand(self, name, delete_data=delete_data))
[docs] def do_remove_item(self, name):
"""Removes item from project given its name.
This method is used when closing the existing project for opening a new one.
Args:
name (str): Item's name
"""
ind = self._project_item_model.find_item(name)
category_ind = ind.parent()
item = self._project_item_model.item(ind)
self._remove_item(category_ind, item)
[docs] def _remove_item(self, category_ind, item, delete_data=False):
"""Removes LeafProjectTreeItem from project.
Args:
category_ind (QModelIndex): The category index
item (LeafProjectTreeItem): the item to remove
delete_data (bool): If set to True, deletes the directories and data associated with the item
"""
try:
data_dir = item.project_item.data_dir
except AttributeError:
data_dir = None
# Remove item from project model
if not self._project_item_model.remove_item(item, parent=category_ind):
self._logger.msg_error.emit("Removing item <b>{0}</b> from project failed".format(item.name))
# Remove item icon and connected links (QGraphicsItems) from scene
icon = item.project_item.get_icon()
self._toolbox.ui.graphicsView.remove_icon(icon)
self.dag_handler.remove_node_from_graph(item.name)
item.project_item.tear_down()
if delete_data:
if data_dir:
# Remove data directory and all its contents
self._logger.msg.emit("Removing directory <b>{0}</b>".format(data_dir))
try:
if not erase_dir(data_dir):
self._logger.msg_error.emit("Directory does not exist")
except OSError:
self._logger.msg_error.emit("[OSError] Removing directory failed. Check directory permissions.")
self._logger.msg.emit("Item <b>{0}</b> removed from project".format(item.name))
[docs] def execute_dags(self, dags, execution_permits):
"""Executes given dags.
Args:
dags (Sequence(DiGraph))
execution_permits (Sequence(dict))
"""
self._execution_stopped = False
self._dag_execution_list = list(dags)
self._dag_execution_permits_list = list(execution_permits)
self._dag_execution_index = 0
self.execute_next_dag()
@Slot()
[docs] def execute_next_dag(self):
"""Executes next dag in the execution list."""
if self._execution_stopped:
return
try:
dag = self._dag_execution_list[self._dag_execution_index]
execution_permits = self._dag_execution_permits_list[self._dag_execution_index]
except IndexError:
return
dag_identifier = f"{self._dag_execution_index + 1}/{len(self._dag_execution_list)}"
self.execute_dag(dag, execution_permits, dag_identifier)
self._dag_execution_index += 1
self.dag_execution_finished.emit()
[docs] def execute_dag(self, dag, execution_permits, dag_identifier):
"""Executes given dag.
Args:
dag (DiGraph): Executed DAG
execution_permits (dict): Dictionary, where keys are node names in dag and value is a boolean
dag_identifier (str): Identifier number for printing purposes
"""
node_successors = self.dag_handler.node_successors(dag)
if not node_successors:
self._logger.msg_warning.emit("<b>Graph {0} is not a Directed Acyclic Graph</b>".format(dag_identifier))
self._logger.msg.emit("Items in graph: {0}".format(", ".join(dag.nodes())))
edges = ["{0} -> {1}".format(*edge) for edge in self.dag_handler.edges_causing_loops(dag)]
self._logger.msg.emit(
"Please edit connections in Design View to execute it. "
"Possible fix: remove connection(s) {0}.".format(", ".join(edges))
)
return
items = [self._project_item_model.get_item(name).project_item.execution_item() for name in node_successors]
self.engine = SpineEngine(items, node_successors, execution_permits)
self.engine.dag_node_execution_started.connect(self._toolbox.ui.graphicsView._start_animation)
self.engine.dag_node_execution_finished.connect(self._toolbox.ui.graphicsView._stop_animation)
self.engine.dag_node_execution_finished.connect(self._toolbox.ui.graphicsView._run_leave_animation)
self.engine.dag_node_execution_finished.connect(self._handle_dag_node_execution_finished)
self._logger.msg.emit("<b>Starting DAG {0}</b>".format(dag_identifier))
self._logger.msg.emit("Order: {0}".format(" -> ".join(list(node_successors))))
self.engine.run()
outcome = {
SpineEngineState.USER_STOPPED: "stopped by the user",
SpineEngineState.FAILED: "failed",
SpineEngineState.COMPLETED: "completed successfully",
}[self.engine.state()]
self._logger.msg.emit("<b>DAG {0} {1}</b>".format(dag_identifier, outcome))
self.engine.dag_node_execution_started.disconnect(self._toolbox.ui.graphicsView._start_animation)
self.engine.dag_node_execution_finished.disconnect(self._toolbox.ui.graphicsView._stop_animation)
self.engine.dag_node_execution_finished.disconnect(self._toolbox.ui.graphicsView._run_leave_animation)
self.engine.dag_node_execution_finished.disconnect(self._handle_dag_node_execution_finished)
[docs] def execute_selected(self):
"""Executes DAGs corresponding to all selected project items."""
if not self.dag_handler.dags():
self._logger.msg_warning.emit("Project has no items to execute")
return
# Get selected item
selected_indexes = self._toolbox.ui.treeView_project.selectedIndexes()
if not selected_indexes:
self._logger.msg_warning.emit("Please select a project item and try again.")
return
dags = set()
executable_item_names = list()
for ind in selected_indexes:
item = self._project_item_model.item(ind)
executable_item_names.append(item.name)
dag = self.dag_handler.dag_with_node(item.name)
if not dag:
self._logger.msg_error.emit(
"[BUG] Could not find a graph containing {0}. "
"<b>Please reopen the project.</b>".format(item.name)
)
continue
dags.add(dag)
execution_permit_list = list()
for dag in dags:
execution_permits = dict()
for item_name in dag.nodes:
execution_permits[item_name] = item_name in executable_item_names
execution_permit_list.append(execution_permits)
self._logger.msg.emit("")
self._logger.msg.emit("-------------------------------------------------")
self._logger.msg.emit("<b>Executing Selected Directed Acyclic Graphs</b>")
self._logger.msg.emit("-------------------------------------------------")
self.execute_dags(dags, execution_permit_list)
for name in executable_item_names:
# Make sure transient files and file pattern resources get updated throughout the DAG
self.notify_changes_in_containing_dag(name)
[docs] def execute_project(self):
"""Executes all dags in the project."""
self.project_execution_about_to_start.emit()
dags = self.dag_handler.dags()
if not dags:
self._logger.msg_warning.emit("Project has no items to execute")
return
execution_permit_list = list()
for dag in dags:
execution_permit_list.append({item_name: True for item_name in dag.nodes})
self._logger.msg.emit("")
self._logger.msg.emit("--------------------------------------------")
self._logger.msg.emit("<b>Executing All Directed Acyclic Graphs</b>")
self._logger.msg.emit("--------------------------------------------")
self.execute_dags(dags, execution_permit_list)
# Make sure transient file resources are updated after execution.
self.notify_changes_in_all_dags()
[docs] def stop(self):
"""Stops execution. Slot for the main window Stop tool button in the toolbar."""
if self._execution_stopped:
self._logger.msg.emit("No execution in progress")
return
self._logger.msg.emit("Stopping...")
self._execution_stopped = True
if self.engine:
self.engine.stop()
[docs] def export_graphs(self):
"""Exports all valid directed acyclic graphs in project to GraphML files."""
if not self.dag_handler.dags():
self._logger.msg_warning.emit("Project has no graphs to export")
return
i = 0
for g in self.dag_handler.dags():
fn = str(i) + ".graphml"
path = os.path.join(self.project_dir, fn)
if not self.dag_handler.export_to_graphml(g, path):
self._logger.msg_warning.emit("Exporting graph nr. {0} failed. Not a directed acyclic graph".format(i))
else:
self._logger.msg.emit("Graph nr. {0} exported to {1}".format(i, path))
i += 1
@Slot(object)
[docs] def notify_changes_in_dag(self, dag):
"""Notifies the items in given dag that the dag has changed."""
node_successors = self.dag_handler.node_successors(dag)
reduced_node_successors = dict(node_successors)
reversed_ranking = []
while reduced_node_successors:
same_ranks = [node for node, successors in reduced_node_successors.items() if not successors]
for ranked_node in same_ranks:
del reduced_node_successors[ranked_node]
for node, successors in reduced_node_successors.items():
reduced_node_successors[node] = [s for s in successors if s != ranked_node]
reversed_ranking.append(same_ranks)
ranks = {node: rank for rank, nodes in enumerate(reversed(reversed_ranking)) for node in nodes}
if not node_successors:
# Not a dag, invalidate workflow
edges = self.dag_handler.edges_causing_loops(dag)
for node in dag.nodes():
ind = self._project_item_model.find_item(node)
project_item = self._project_item_model.item(ind).project_item
project_item.invalidate_workflow(edges)
return
# Make resource map and run simulation
node_predecessors = inverted(node_successors)
for item_name in node_successors:
item = self._project_item_model.get_item(item_name).project_item
resources = []
for parent_name in node_predecessors.get(item_name, set()):
parent_item = self._project_item_model.get_item(parent_name).project_item
resources += parent_item.resources_for_direct_successors()
item.handle_dag_changed(ranks[item_name], resources)
[docs] def notify_changes_in_all_dags(self):
"""Notifies all items of changes in all dags in the project."""
for g in self.dag_handler.dags():
self.notify_changes_in_dag(g)
[docs] def notify_changes_in_containing_dag(self, item):
"""Notifies items in dag containing the given item that the dag has changed."""
dag = self.dag_handler.dag_with_node(item)
# Some items trigger this method while they are being initialized
# but before they have been added to any DAG.
# In those cases we don't need to notify other items.
if dag:
self.notify_changes_in_dag(dag)
@property
@Slot(str, "QVariant", "QVariant")
[docs] def _handle_dag_node_execution_finished(self, item_name, execution_direction, engine_state):
"""Handles successful execution of a dag node.
Performs post successful execution actions in corresponding project item."""
item = self._project_item_model.get_item(item_name)
if item is None:
return
item.project_item.handle_execution_successful(execution_direction, engine_state)
[docs] def direct_successors(self, item):
"""Returns a list of direct successor nodes for given project item."""
item_name = item.name
dags = self.dag_handler.dags()
for dag in dags:
successors = self.dag_handler.node_successors(dag)
items_successors = successors.get(item_name)
if items_successors is not None:
return [self._project_item_model.get_item(successor).project_item for successor in items_successors]
return []