######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Contains facilities to open and execute projects without GUI.
:authors: A. Soininen (VTT)
:date: 29.4.2020
"""
from enum import IntEnum, unique
import json
import logging
import pathlib
import sys
from PySide2.QtCore import QCoreApplication, QEvent, QObject, QSettings, Signal, Slot
from spine_engine import SpineEngine, SpineEngineState
from .dag_handler import DirectedGraphHandler
from .helpers import deserialize_path
from .load_project_items import load_executable_items, load_item_specification_factories
[docs]class HeadlessLogger(QObject):
"""A :class:`LoggerInterface` compliant logger that uses Python's standard logging facilities."""
"""Emits a notification message."""
[docs] msg_success = Signal(str)
"""Emits a message on success"""
[docs] msg_warning = Signal(str)
"""Emits a warning message."""
[docs] msg_error = Signal(str)
"""Emits an error message."""
"""Emits a message originating from a subprocess (usually something printed to stdout)."""
[docs] msg_proc_error = Signal(str)
"""Emits an error message originating from a subprocess (usually something printed to stderr)."""
"""Requests an 'information message box' (e.g. a message window) to be opened with a given title and message."""
[docs] error_box = Signal(str, str)
"""Requests an 'error message box' to be opened with a given title and message."""
def __init__(self):
super().__init__()
self.msg.connect(self._log_message)
self.msg_success.connect(self._log_message)
self.msg_warning.connect(self._log_warning)
self.msg_error.connect(self._log_error)
self.msg_proc.connect(self._log_message)
self.msg_proc_error.connect(self._log_error)
self.information_box.connect(self._show_information_box)
self.error_box.connect(self._show_error_box)
# pylint: disable=no-self-use
@Slot(str)
[docs] def _log_message(self, message):
"""Writes an information message to Python's logging system."""
logging.info(message)
# pylint: disable=no-self-use
@Slot(str)
[docs] def _log_warning(self, message):
"""Writes a warning message to Python's logging system."""
logging.warning(message)
# pylint: disable=no-self-use
@Slot(str)
[docs] def _log_error(self, message):
"""Writes an error message to Python's logging system."""
logging.error(message)
# pylint: disable=no-self-use
@Slot(str, str)
# pylint: disable=no-self-use
@Slot(str, str)
[docs] def _show_error_box(self, title, message):
"""Writes an error message with a title to Python's logging system."""
logging.error(title + ": " + message)
[docs]class ExecuteProject(QObject):
"""
A 'task' which opens and executes a Toolbox project when triggered to do so.
The execution of this task is triggered by sending it a 'startup' QEvent using, e.g. QCoreApplication.postEvent()
"""
"""A private signal to actually start execution. Not to be used directly. Post a startup event instead."""
def __init__(self, args, startup_event_type, parent):
"""
Args:
args (argparse.Namespace): parsed command line arguments
startup_event_type (int): expected type id for the event that starts this task
parent (QObject): a parent object
"""
super().__init__(parent)
self._args = args
self._startup_event_type = startup_event_type
self._start.connect(self._execute)
@Slot()
[docs] def _execute(self):
"""Executes this task."""
try:
status = self._open_and_execute_project()
QCoreApplication.instance().exit(status)
except Exception:
QCoreApplication.instance().exit(_Status.ERROR)
raise
[docs] def _open_and_execute_project(self):
"""Opens a project and executes all DAGs in that project."""
logger = HeadlessLogger()
project_dir = self._args.project
project_file_path = pathlib.Path(project_dir, ".spinetoolbox", "project.json").resolve()
try:
with project_file_path.open() as project_file:
try:
project_dict = json.load(project_file)
except json.decoder.JSONDecodeError:
logger.msg_error.emit(f"Error in project file {project_file_path}. Invalid JSON.")
return _Status.ERROR
except OSError:
logger.msg_error.emit(f"Project file {project_file_path} missing")
return _Status.ERROR
executable_items, dag_handler = open_project(project_dict, project_dir, logger)
if executable_items is None:
return _Status.ERROR
dags = dag_handler.dags()
for dag in dags:
node_successors = dag_handler.node_successors(dag)
if not node_successors:
logger.msg_error.emit("The project contains a graph that is not a Directed Acyclic Graph.")
return _Status.ERROR
items_in_dag = tuple(item for item in executable_items if item.name in dag.nodes)
execution_permits = {item_name: True for item_name in dag.nodes}
engine = SpineEngine(items_in_dag, node_successors, execution_permits)
engine.run()
if engine.state() == SpineEngineState.FAILED:
return _Status.ERROR
return _Status.OK
[docs] def event(self, e):
if e.type() == self._startup_event_type:
e.accept()
self._start.emit()
return True
return super().event(e)
[docs]def headless_main(args):
"""
Executes a project using :class:`QCoreApplication`.
Args:
args (argparser.Namespace): parsed command line arguments.
Returns:
int: exit status code; 0 for success, everything else for failure
"""
application = QCoreApplication(sys.argv)
startup_event_type = QEvent.Type(QEvent.registerEventType())
task = ExecuteProject(args, startup_event_type, application)
application.postEvent(task, QEvent(startup_event_type))
return application.exec_()
[docs]def open_project(project_dict, project_dir, logger):
"""
Opens a project.
Args:
project_dict (dict): a serialized project dictionary
project_dir (str): path to a directory containing the ``.spinetoolbox`` dir
logger (LoggerInterface): a logger
Returns:
tuple: a list of executable items, a dict of item specifications, and a DagHandler object
"""
app_settings = QSettings("SpineProject", "Spine Toolbox")
specification_factories = load_item_specification_factories()
item_specifications = _specifications(project_dict, project_dir, specification_factories, app_settings, logger)
executable_classes = load_executable_items()
executable_items = list()
dag_handler = DirectedGraphHandler()
for item_dicts in project_dict["objects"].values():
for item_name, item_dict in item_dicts.items():
dag_handler.add_dag_node(item_name)
try:
item_type = item_dict["type"]
except KeyError:
logger.msg_error.emit(
"Project item is missing the 'type' attribute in the project.json file."
" This might be caused by an outdated project file."
)
return None, None
executable_class = executable_classes[item_type]
try:
item = executable_class.from_dict(
item_dict, item_name, project_dir, app_settings, item_specifications, logger
)
except KeyError as missing_key:
logger.msg_error.emit(f"'{missing_key}' is missing in the project.json file.")
item = None
if item is None:
return None, None
executable_items.append(item)
for connection in project_dict["project"]["connections"]:
from_name = connection["from"][0]
to_name = connection["to"][0]
dag_handler.add_graph_edge(from_name, to_name)
return executable_items, dag_handler
[docs]def _specifications(project_dict, project_dir, specification_factories, app_settings, logger):
"""
Creates project item specifications.
Args:
project_dict (dict): a serialized project dictionary
project_dir (str): path to a directory containing the ``.spinetoolbox`` dir
specification_factories (dict): a mapping from item type to specification factory
app_settings (QSettings): Toolbox settings
logger (LoggerInterface): a logger
Returns:
dict: a mapping from item type and specification name to specification
"""
specifications = dict()
specifications_dict = project_dict["project"].get("specifications", {})
definition_file_paths = dict()
for item_type, serialized_paths in specifications_dict.items():
definition_file_paths[item_type] = [deserialize_path(path, project_dir) for path in serialized_paths]
for item_type, paths in definition_file_paths.items():
for definition_path in paths:
try:
with open(definition_path, "r") as definition_file:
try:
definition = json.load(definition_file)
except ValueError:
logger.msg_error.emit(f"Item specification file '{definition_path}' not valid")
continue
except FileNotFoundError:
logger.msg_error.emit(f"Specification file <b>{definition_path}</b> does not exist")
continue
factory = specification_factories.get(item_type)
if factory is None:
continue
specification = factory.make_specification(
definition,
definition_path,
app_settings,
logger,
embedded_julia_console=None,
embedded_python_console=None,
)
specifications.setdefault(item_type, dict())[specification.name] = specification
return specifications
[docs]@unique
class _Status(IntEnum):
"""Status codes returned from headless execution."""