######################################################################################################################
# Copyright (C) 2017 - 2019 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Contains classes for managing the project item graphs (DAGs) and for executing them.
:author: P. Savolainen (VTT)
:date: 8.4.2019
"""
import fnmatch
import logging
import os
from collections import deque

from PySide2.QtCore import Signal, Slot, QObject
import networkx as nx
class DirectedGraphHandler:
    """Class for manipulating graphs according to user's actions.

    Keeps a list of directed graphs (networkx DiGraph) whose nodes are project
    item names. Connecting items merges their graphs. Disconnecting or removing
    items splits a graph whose parts are no longer connected (ignoring edge
    directions) into one graph per connected part.

    Args:
        toolbox (ToolboxUI): QMainWindow instance
    """

    def __init__(self, toolbox):
        """Class constructor."""
        self._toolbox = toolbox
        self.running_dag = None
        self.running_item = None
        self._dags = list()

    def dags(self):
        """Returns a list of graphs (DiGraph) in the project."""
        return self._dags

    def add_dag(self, dag):
        """Add graph to list.

        Args:
            dag (DiGraph): Graph to add
        """
        self._dags.append(dag)

    def remove_dag(self, dag):
        """Remove graph from instance variable list.

        Args:
            dag (DiGraph): Graph to remove
        """
        self._dags.remove(dag)

    def add_dag_node(self, node_name):
        """Create directed graph with one node and add it to list.

        Args:
            node_name (str): Project item name to add as a node
        """
        dag = nx.DiGraph()
        dag.add_node(node_name)
        self._dags.append(dag)

    def add_graph_edge(self, src_node, dst_node):
        """Adds an edge between the src and dst nodes.

        If the nodes are in different graphs, the union graph is saved and the
        original graphs are removed. If the nodes are already in the same graph,
        the edge is added to that graph. If src and dst are the same node, a
        self-loop (feedback) edge is added.

        Args:
            src_node (str): Source project item node name
            dst_node (str): Destination project item node name
        """
        src_graph = self.dag_with_node(src_node)
        dst_graph = self.dag_with_node(dst_node)
        if src_node == dst_node or src_graph is dst_graph:
            # Self-loop, or both nodes already share a graph: just add the edge
            src_graph.add_edge(src_node, dst_node)
            return
        # The nodes live in different graphs: replace both with their union
        union_dag = nx.union(src_graph, dst_graph)
        union_dag.add_edge(src_node, dst_node)
        self.add_dag(union_dag)
        self.remove_dag(src_graph)
        self.remove_dag(dst_graph)

    def remove_graph_edge(self, src_node, dst_node):
        """Removes edge from a directed graph.

        If an end node is left without any connections, it is moved into a
        graph of its own. If src and dst are no longer connected (ignoring edge
        directions), the graph is replaced by one graph per connected part.

        Args:
            src_node (str): Source project item node name
            dst_node (str): Destination project item node name
        """
        dag = self.dag_with_edge(src_node, dst_node)
        dag.remove_edge(src_node, dst_node)
        if src_node == dst_node:
            # Removed a self-loop; connectivity between nodes is unaffected
            return
        # A node left without connections is moved into a graph containing only itself.
        # src is checked first; handling one isolated node is enough because the rest of
        # the graph stays connected when a dangling node is detached.
        for node in (src_node, dst_node):
            if self.node_is_isolated(node):
                dag.remove_node(node)
                isolated = nx.DiGraph()
                isolated.add_node(node)
                self.add_dag(isolated)
                return
        # If src still has a path (ignoring edge directions) to dst, the graph stays whole
        if self.nodes_connected(dag, src_node, dst_node):
            return
        # The removed edge was the only link between two parts: split the graph
        self._split_into_components(dag)

    def remove_node_from_graph(self, node_name):
        """Removes node from the graph that contains it.

        Called when a project item is removed from the project. Any part of the
        graph that becomes disconnected (ignoring edge directions) gets a graph
        of its own, and a graph left without nodes is removed.

        Args:
            node_name (str): Project item name
        """
        # This is called every time a previous project is closed and another is opened.
        g = self.dag_with_node(node_name)
        if g is None:
            # dag_with_node() has already logged the error
            return
        # remove_node() also removes every edge (including self-loops) attached to the node
        g.remove_node(node_name)
        self._split_into_components(g)

    def _split_into_components(self, dag):
        """Replaces dag in the graph list with one new graph per weakly connected component.

        Does nothing if dag is still a single connected component. An empty
        dag is simply removed.

        Args:
            dag (DiGraph): Graph to split
        """
        components = list(nx.weakly_connected_components(dag))
        if len(components) == 1:
            return
        self.remove_dag(dag)
        for nodes in components:
            # copy() so the new graph is independent of the old one (subgraph may be a view)
            self.add_dag(dag.subgraph(nodes).copy())

    def rename_node(self, old_name, new_name):
        """Handles renaming the node and edges in a graph when a project item is renamed.

        Args:
            old_name (str): Old project item name
            new_name (str): New project item name

        Returns:
            bool: True if successful, False if renaming failed (no graph contains old_name)
        """
        g = self.dag_with_node(old_name)
        if g is None:
            return False
        # copy=False relabels g in place so that references to g remain valid
        nx.relabel_nodes(g, {old_name: new_name}, copy=False)
        return True

    def dag_with_node(self, node_name):
        """Returns directed graph that contains given node.

        Args:
            node_name (str): Node to look for

        Returns:
            (DiGraph): Directed graph that contains node or None if not found.
        """
        for dag in self.dags():
            if dag.has_node(node_name):
                return dag
        logging.error("Graph containing node %s not found. Something is wrong.", node_name)
        return None

    def dag_with_edge(self, src_node, dst_node):
        """Returns directed graph that contains given edge.

        Args:
            src_node (str): Source node name
            dst_node (str): Destination node name

        Returns:
            (DiGraph): Directed graph that contains edge or None if not found.
        """
        for dag in self.dags():
            if dag.has_edge(src_node, dst_node):
                return dag
        logging.error("Graph containing edge %s->%s not found. Something is wrong.", src_node, dst_node)
        return None

    def calc_exec_order(self, g):
        """Returns an execution order for the nodes of the given graph.

        Uses Kahn's topological sort with a FIFO queue seeded with the source
        nodes (nodes without inbound connections), so every node comes after
        all of its predecessors. When every node has at most one inbound
        connection, this is the same as a breadth-first order from the source
        nodes. The given graph is not modified.

        Args:
            g (DiGraph): Directed graph to process

        Returns:
            list: Ordered list of node names (first item at index 0).
                Empty list if given graph is not a DAG.
        """
        if not nx.is_directed_acyclic_graph(g):
            return list()
        sources = self.source_nodes(g)  # Project items that have no inbound connections
        if not sources:
            # Should not happen if nx.is_directed_acyclic_graph() works
            logging.error("This graph has no source nodes. Execution failed.")
            return list()
        # Number of not-yet-scheduled predecessors for every node
        pending_predecessors = {node: g.in_degree(node) for node in g.nodes()}
        exec_order = list()
        queue = deque(sources)
        while queue:
            node = queue.popleft()
            exec_order.append(node)
            for successor in g.successors(node):
                pending_predecessors[successor] -= 1
                if pending_predecessors[successor] == 0:
                    # All predecessors are scheduled, so the successor may run now
                    queue.append(successor)
        return exec_order

    def node_is_isolated(self, node, allow_self_loop=False):
        """Checks if the project item with the given name has any connections.

        Args:
            node (str): Project item name
            allow_self_loop (bool): If False (default), a self-loop counts as a
                connection, so a node whose only edge is a self-loop is NOT
                isolated. If True, such a node is considered isolated.

        Returns:
            bool: True if the project item has no in-neighbors nor out-neighbors
                (other than itself when allow_self_loop is True), False otherwise.
        """
        g = self.dag_with_node(node)
        if allow_self_loop and g.has_edge(node, node):
            # A self-loop adds 2 to the degree; anything beyond that is a real connection
            return g.degree(node) == 2
        return nx.is_isolate(g, node)

    @staticmethod
    def source_nodes(g):
        """Returns a list of source nodes in given graph.

        A source node has no incoming edges, i.e. its in-degree is 0.

        Args:
            g (DiGraph): Graph to examine

        Returns:
            list: List of source node names or an empty list if there are none.
        """
        return [node for node in g.nodes() if g.in_degree(node) == 0]

    @staticmethod
    def nodes_connected(dag, a, b):
        """Checks if node a is connected to node b. Edge directions are ignored.

        Returns True when there is a path between a and b in the undirected
        version of the graph, i.e. when both nodes are in the same weakly
        connected component.

        Args:
            dag (DiGraph): Graph that contains nodes a and b
            a (str): Node name
            b (str): Another node name

        Returns:
            bool: True if a and b are connected, False otherwise
        """
        return nx.has_path(dag.to_undirected(), a, b)

    @staticmethod
    def export_to_graphml(g, path):
        """Export given graph to a path in GraphML format.

        Args:
            g (DiGraph): Graph to export
            path (str): Full output path for GraphML file

        Returns:
            bool: Operation success status (False if g is not a DAG)
        """
        if not nx.is_directed_acyclic_graph(g):
            return False
        nx.write_graphml(g, path, prettyprint=True)
        return True
class ExecutionInstance(QObject):
    """Class for the graph that is being executed.

    Contains references to files and resources advertised by project items so
    that project items downstream can find them.

    Args:
        toolbox (ToolboxUI): QMainWindow instance
        execution_list (list): Ordered list of nodes to execute
    """

    graph_execution_finished_signal = Signal(int, name="graph_execution_finished_signal")
    project_item_execution_finished_signal = Signal(int, name="project_item_execution_finished_signal")

    # Data Store dialects searched by find_file(), in priority order
    _DS_DIALECTS = ("sqlite", "mysql", "mssql", "postgresql", "oracle")

    def __init__(self, toolbox, execution_list):
        """Class constructor."""
        QObject.__init__(self)
        self._toolbox = toolbox
        self.execution_list = execution_list  # Ordered list of nodes to execute. First node at index 0
        self.running_item = None
        self.dc_refs = list()  # Data Connection reference list
        self.dc_files = list()  # Data Connection file list
        self.ds_refs = dict()  # DS refs. Key is dialect, value is a list of paths or urls depending on dialect
        self.di_data = dict()  # Data Interface data. Key is DI name, value is data for import
        self.tool_output_files = list()  # Paths to result files from ToolInstance

    def start_execution(self):
        """Pops the next item from the execution list and starts executing it."""
        self.running_item = self.execution_list.pop(0)
        self.execute_project_item()

    def _running_project_item(self):
        """Returns the project item object whose name is in self.running_item."""
        item_ind = self._toolbox.project_item_model.find_item(self.running_item)
        return self._toolbox.project_item_model.project_item(item_ind)

    def execute_project_item(self):
        """Starts executing project item."""
        self.project_item_execution_finished_signal.connect(self.item_execution_finished)
        self._running_project_item().execute()

    @Slot(int, name="item_execution_finished")
    def item_execution_finished(self, item_finish_state):
        """Pop next project item to execute or finish current graph if there are no items left.

        Args:
            item_finish_state (int): 0=Continue to next project item.
                -1=Item execution failed (e.g. Tool did not find required input files).
                -2=User pressed the Stop button.
                Both -1 and -2 stop executing this graph and are forwarded as the
                graph finish state.
        """
        self.project_item_execution_finished_signal.disconnect()
        if item_finish_state in (-1, -2):
            self.graph_execution_finished_signal.emit(item_finish_state)
            return
        try:
            self.running_item = self.execution_list.pop(0)
        except IndexError:
            # No items left: the whole graph has been executed
            self.graph_execution_finished_signal.emit(0)
            return
        self.execute_project_item()

    def stop(self):
        """Stops running project item and terminates current graph execution."""
        if not self.running_item:
            self._toolbox.msg.emit("No running item")
            self.graph_execution_finished_signal.emit(-2)
            return
        self._running_project_item().stop_execution()

    def add_ds_ref(self, dialect, ref):
        """Adds given database reference to a dictionary. Key is the dialect.

        If dialect is sqlite, value is a list of full paths to sqlite files.
        For other dialects, value is a list of URLs to database servers.

        Args:
            dialect (str): Dialect name (lower case)
            ref (str): Database reference
        """
        self.ds_refs.setdefault(dialect, []).append(ref)

    def add_di_data(self, di_name, data):
        """Stores given data from a data interface, keyed by the interface name.

        Args:
            di_name (str): Data interface name
            data (dict): Data to import
        """
        self.di_data[di_name] = data

    def append_dc_refs(self, refs):
        """Adds given file paths (Data Connection file references) to a list.

        Args:
            refs (list): List of file paths (references)
        """
        self.dc_refs += refs

    def append_dc_files(self, files):
        """Adds given project data file paths to a list.

        Args:
            files (list): List of file paths
        """
        self.dc_files += files

    def find_file(self, filename):
        """Returns the first occurrence to full path to given file name or None if file was not found.

        Search order: Data Store references (sqlite, mysql, mssql, postgresql,
        oracle), Data Connection references, Data Connection files, Tool output files.
        A reference matches when its last path component equals filename.

        Args:
            filename (str): Searched file name (no path) TODO: Change to pattern

        Returns:
            str: Full path to file if found, None if not found
        """
        # Look in Data Stores
        for dialect in self._DS_DIALECTS:
            for ref in self.ds_refs.get(dialect, []):
                if os.path.basename(ref) == filename:
                    logging.debug("Found path for %s from ds refs: %s", filename, ref)
                    return ref
        # Look in Data Connections and Tool output files
        for paths in (self.dc_refs, self.dc_files, self.tool_output_files):
            for path in paths:
                if os.path.basename(path) == filename:
                    return path
        return None

    @staticmethod
    def _path_matches(path, pattern):
        """Returns True if either the full path or its file name matches the wildcard pattern."""
        return fnmatch.fnmatch(path, pattern) or fnmatch.fnmatch(os.path.basename(path), pattern)

    def find_optional_files(self, pattern):
        """Returns a list of found paths to files that match the given pattern.

        Args:
            pattern (str): File name, or file name pattern with '*' / '?' wildcards.
                A wildcard pattern is matched against both the full path and the
                file name of each candidate, so e.g. 'data_*.csv' matches
                '/some/dir/data_1.csv'.

        Returns:
            list: List of (full) paths
        """
        if "*" not in pattern and "?" not in pattern:
            # Pattern is an exact filename (no wildcards)
            match = self.find_file(pattern)
            return [match] if match is not None else []
        # NOTE: Only sqlite files are checked from Data Store references
        candidates = self.ds_refs.get("sqlite", []) + self.dc_refs + self.dc_files + self.tool_output_files
        return [path for path in candidates if self._path_matches(path, pattern)]