Source code for spinetoolbox.project_items.exporter.worker

######################################################################################################################
# Copyright (C) 2017 - 2019 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Worker-based machinery to construct the settings data structures needed for gdx export outside the UI loop.

:author: A. Soininen (VTT)
:date:   19.12.2019
"""

from copy import deepcopy
from PySide2.QtCore import QObject, QThread, Signal, Slot
from spinedb_api import (
    apply_scenario_filter_to_parameter_value_sq,
    apply_alternative_filter_to_parameter_value_sq,
    DatabaseMapping,
    SpineDBAPIError,
)
from spinetoolbox.spine_io.exporters import gdx
from .db_utils import latest_database_commit_time_stamp


class Worker(QObject):
    """
    A worker to construct export settings for a database.

    The worker moves itself to its own ``QThread``; the work starts when that thread is started.
    Results and failures are reported through signals, each carrying the database URL as first argument.

    Attributes:
        thread (QThread): the thread the worker executes in
    """

    database_unavailable = Signal(str)
    """Emitted when opening the database fails."""
    errored = Signal(str, object)
    """Emitted when an error occurs."""
    finished = Signal(str, object)
    """Emitted when the worker has finished."""
    # LoggerInterface signals
    msg = Signal(str, str)
    msg_warning = Signal(str, str)
    msg_error = Signal(str, str)

    def __init__(self, database_url, scenario, none_fallback):
        """
        Args:
            database_url (str): database's URL
            scenario (str, optional): scenario name or None if 'Base' alternative should be used
            none_fallback (NoneFallback): how to handle None parameter values
        """
        super().__init__()
        self.thread = QThread()
        self.moveToThread(self.thread)
        self._scenario = scenario
        self._none_fallback = none_fallback
        self._database_url = str(database_url)
        self._previous_settings = None
        self._previous_indexing_settings = None
        self._previous_merging_settings = None
        # Fetching begins as soon as the thread starts.
        self.thread.started.connect(self._fetch_settings)

    @Slot()
    def _fetch_settings(self):
        """
        Constructs settings and parameter index settings.

        Emits ``finished`` and quits the thread on success. On failure the method returns early
        without quitting the thread; the failure has been signalled by the helper methods by then.
        """
        result = _Result(*self._read_settings())
        if result.set_settings is None:
            return
        if self._previous_settings is not None:
            # Merge fresh settings into a copy so the caller's previous settings stay untouched.
            updated_settings = deepcopy(self._previous_settings)
            updated_settings.update(result.set_settings)
            updated_indexing_settings = self._update_indexing_settings(updated_settings, result.indexing_settings)
            if updated_indexing_settings is None:
                return
            updated_merging_settings = self._update_merging_settings(updated_settings)
            if updated_merging_settings is None:
                return
            result.set_settings = updated_settings
            result.indexing_settings = updated_indexing_settings
            result.merging_settings = updated_merging_settings
        self.finished.emit(self._database_url, result)
        self.thread.quit()

    def set_previous_settings(self, previous_settings, previous_indexing_settings, previous_merging_settings):
        """
        Makes worker update existing settings instead of just making new ones.

        Args:
            previous_settings (gdx.SetSettings): existing set settings
            previous_indexing_settings (dict): existing indexing settings
            previous_merging_settings (dict): existing merging settings
        """
        self._previous_settings = previous_settings
        self._previous_indexing_settings = previous_indexing_settings
        self._previous_merging_settings = previous_merging_settings

    @staticmethod
    def _read_scenarios(database_map):
        """
        Reads all scenarios from the database.

        Args:
            database_map (DatabaseMapping): a database map

        Returns:
            dict: map from scenario name to the scenario row's 'active' value
        """
        scenario_rows = database_map.query(database_map.scenario_sq).all()
        scenarios = {row.name: row.active for row in scenario_rows}
        return scenarios

    def _read_settings(self):
        """
        Reads fresh gdx settings from the database.

        Emits ``database_unavailable`` if the database cannot be opened. Once the database has
        been opened, its connection is closed on every exit path, including failures.

        Returns:
            tuple: commit time stamp, set settings, indexing settings and scenarios;
                all four are None if reading failed
        """
        try:
            database_map = DatabaseMapping(self._database_url)
        except SpineDBAPIError:
            self.database_unavailable.emit(self._database_url)
            return None, None, None, None
        try:
            return self._read_settings_from(database_map)
        finally:
            # Close also when scenario lookup or filtering fails, not only after settings were built.
            database_map.connection.close()

    def _read_settings_from(self, database_map):
        """
        Applies the scenario/alternative filter and builds settings using an open database map.

        Emits ``errored`` on failure. Does not close the connection; that is the caller's job.

        Args:
            database_map (DatabaseMapping): an open database map

        Returns:
            tuple: commit time stamp, set settings, indexing settings and scenarios;
                all four are None if reading failed
        """
        try:
            scenarios = self._read_scenarios(database_map)
            if self._scenario is not None and self._scenario not in scenarios:
                self.errored.emit(self._database_url, f"Scenario {self._scenario} not found.")
                return None, None, None, None
            if self._scenario is None:
                apply_alternative_filter_to_parameter_value_sq(database_map, ["Base"])
            else:
                apply_scenario_filter_to_parameter_value_sq(database_map, self._scenario)
        except SpineDBAPIError as error:
            self.errored.emit(self._database_url, error)
            return None, None, None, None
        try:
            time_stamp = latest_database_commit_time_stamp(database_map)
            settings = gdx.make_set_settings(database_map)
            # The logger relays messages to this worker's msg* signals tagged with the database URL.
            logger = _Logger(self._database_url, self)
            indexing_settings = gdx.make_indexing_settings(database_map, self._none_fallback, logger)
        except gdx.GdxExportException as error:
            self.errored.emit(self._database_url, error)
            return None, None, None, None
        return time_stamp, settings, indexing_settings, scenarios

    def _update_indexing_settings(self, updated_settings, new_indexing_settings):
        """
        Updates the parameter indexing settings according to changes in the database.

        Args:
            updated_settings (gdx.SetSettings): set settings merged from previous and fresh ones
            new_indexing_settings (dict): indexing settings freshly read from the database

        Returns:
            the value returned by ``gdx.update_indexing_settings``; the caller treats None as failure
        """
        updated_indexing_settings = gdx.update_indexing_settings(
            self._previous_indexing_settings, new_indexing_settings, updated_settings
        )
        return updated_indexing_settings

    def _update_merging_settings(self, updated_settings):
        """
        Updates the parameter merging settings according to changes in the database.

        Emits ``errored`` if the database cannot be opened or the update fails.

        Args:
            updated_settings (gdx.SetSettings): set settings merged from previous and fresh ones

        Returns:
            updated merging settings or None if the update failed
        """
        try:
            database_map = DatabaseMapping(self._database_url)
        except SpineDBAPIError as error:
            self.errored.emit(self._database_url, error)
            return None
        try:
            updated_merging_settings = gdx.update_merging_settings(
                self._previous_merging_settings, updated_settings, database_map
            )
        except gdx.GdxExportException as error:
            self.errored.emit(self._database_url, error)
            return None
        finally:
            database_map.connection.close()
        return updated_merging_settings
class _Result:
    """
    Bundle of export settings fetched from a database.

    Attributes:
        commit_time_stamp (datetime): time of the database's last commit
        set_settings (gdx.SetSettings): gdx export settings
        indexing_settings (dict): parameter indexing settings
        merging_settings (dict): parameter merging settings; starts out empty
        scenarios (dict): map from scenario name to boolean 'active' flag
    """

    def __init__(self, time_stamp, set_settings, indexing_settings, scenarios):
        """
        Args:
            time_stamp (datetime): time of the database's last commit
            set_settings (gdx.SetSettings): gdx export settings
            indexing_settings (dict): parameter indexing settings
            scenarios (dict): map from scenario name to boolean 'active' flag
        """
        # Merging settings are not a constructor argument; they begin as an empty dict.
        self.merging_settings = {}
        self.commit_time_stamp = time_stamp
        self.scenarios = scenarios
        self.set_settings = set_settings
        self.indexing_settings = indexing_settings
class _Logger(QObject):
    """A ``LoggerInterface`` compliant logger that forwards its messages to the signals of a :class:`Worker`."""

    msg = Signal(str)
    msg_warning = Signal(str)
    msg_error = Signal(str)

    def __init__(self, database_url, worker):
        """
        Args:
            database_url (str): a database url
            worker (Worker): a worker
        """
        super().__init__()
        self._url = database_url
        self._worker = worker
        # Wire each logging signal to the slot that re-emits it on the worker.
        wiring = (
            (self.msg, self.relay_message),
            (self.msg_warning, self.relay_warning),
            (self.msg_error, self.relay_error),
        )
        for signal, slot in wiring:
            signal.connect(slot)

    @Slot(str)
    def relay_message(self, text):
        """Re-emits a message on the worker's ``msg`` signal together with the database URL."""
        target = self._worker.msg
        target.emit(self._url, text)

    @Slot(str)
    def relay_warning(self, text):
        """Re-emits a warning on the worker's ``msg_warning`` signal together with the database URL."""
        target = self._worker.msg_warning
        target.emit(self._url, text)

    @Slot(str)
    def relay_error(self, text):
        """Re-emits an error on the worker's ``msg_error`` signal together with the database URL."""
        target = self._worker.msg_error
        target.emit(self._url, text)