######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Exporter project item.
:author: A. Soininen (VTT)
:date: 5.9.2019
"""
from copy import deepcopy
import pathlib
import os.path
from PySide2.QtCore import QObject, Signal, Slot
from spinedb_api import DatabaseMapping, SpineDBAPIError
from spinetoolbox.project_item import ProjectItem, ProjectItemResource
from spinetoolbox.project_commands import UpdateExporterOutFileNameCommand, UpdateExporterSettingsCommand
from spinetoolbox.helpers import deserialize_path, serialize_url
from spinetoolbox.spine_io import gdx_utils
from spinetoolbox.spine_io.exporters import gdx
from .settings_state import SettingsState
from .widgets.gdx_export_settings import GdxExportSettings
from .widgets.export_list_item import ExportListItem
from .worker import Worker
[docs]class Exporter(ProjectItem):
"""
This project item handles all functionality regarding exporting a database to a file.
Currently, only .gdx format is supported.
"""
def __init__(self, name, description, settings_packs, x, y, toolbox, project, logger):
"""
Args:
name (str): item name
description (str): item description
settings_packs (list): dicts mapping database URLs to _SettingsPack objects
x (float): initial X coordinate of item icon
y (float): initial Y coordinate of item icon
toolbox (ToolboxUI): a ToolboxUI instance
project (SpineToolboxProject): the project this item belongs to
logger (LoggerInterface): a logger instance
"""
super().__init__(name, description, x, y, project, logger)
self._toolbox = toolbox
self._settings_packs = dict()
self._export_list_items = dict()
self._workers = dict()
if settings_packs is None:
settings_packs = list()
for pack in settings_packs:
serialized_url = pack["database_url"]
url = deserialize_path(serialized_url, self._project.project_dir)
url = _normalize_url(url)
try:
settings_pack = SettingsPack.from_dict(pack, url, logger)
except gdx.GdxExportException:
settings_pack = SettingsPack("")
settings_pack.notifications.changed_due_to_settings_state.connect(self._report_notifications)
self._settings_packs[url] = settings_pack
for url, pack in self._settings_packs.items():
if pack.state != SettingsState.OK:
self._start_worker(url)
[docs] def set_up(self):
"""See base class."""
self._project.db_mngr.session_committed.connect(self._update_settings_after_db_commit)
@staticmethod
[docs] def item_type():
"""See base class."""
return "Exporter"
@staticmethod
[docs] def category():
"""See base class."""
return "Exporters"
[docs] def settings_pack(self, database_path):
return self._settings_packs[database_path]
[docs] def make_signal_handler_dict(self):
"""Returns a dictionary of all shared signals and their handlers."""
s = {self._properties_ui.open_directory_button.clicked: self.open_directory}
return s
[docs] def restore_selections(self):
"""Restores selections and connects signals."""
self._properties_ui.item_name_label.setText(self.name)
self._update_properties_tab()
[docs] def _connect_signals(self):
super()._connect_signals()
for url, pack in self._settings_packs.items():
if pack.state == SettingsState.ERROR:
self._start_worker(url)
[docs] def _update_properties_tab(self):
"""Updates the database list in the properties tab."""
database_list_storage = self._properties_ui.databases_list_layout
while not database_list_storage.isEmpty():
widget_to_remove = database_list_storage.takeAt(0)
widget_to_remove.widget().deleteLater()
self._export_list_items.clear()
for url, pack in self._settings_packs.items():
item = self._export_list_items[url] = ExportListItem(url, pack.output_file_name, pack.state)
database_list_storage.addWidget(item)
item.open_settings_clicked.connect(self._show_settings)
item.file_name_changed.connect(self._update_out_file_name)
pack.state_changed.connect(item.handle_settings_state_changed)
[docs] def execute_forward(self, resources):
"""See base class."""
database_urls = [r.url for r in resources if r.type_ == "database"]
gams_system_directory = self._resolve_gams_system_directory()
if gams_system_directory is None:
self._logger.msg_error.emit(f"<b>{self.name}</b>: Cannot proceed. No GAMS installation found.")
return False
for url in database_urls:
settings_pack = self._settings_packs.get(url, None)
if settings_pack is None:
self._logger.msg_error.emit(f"<b>{self.name}</b>: No export settings defined for database {url}.")
return False
if not settings_pack.output_file_name:
self._logger.msg_error.emit(f"<b>{self.name}</b>: No file name given to export database {url}.")
return False
if settings_pack.state == SettingsState.FETCHING:
self._logger.msg_error.emit(f"<b>{self.name}</b>: Settings not ready for database {url}.")
return False
if settings_pack.state == SettingsState.INDEXING_PROBLEM:
self._logger.msg_error.emit(
f"<b>{self.name}</b>: Parameters missing indexing information for database {url}."
)
return False
if settings_pack.state == SettingsState.ERROR:
self._logger.msg_error.emit(f"<b>{self.name}</b>: Ill formed database {url}.")
return False
out_path = os.path.join(self.data_dir, settings_pack.output_file_name)
try:
database_map = DatabaseMapping(url)
gdx.to_gdx_file(
database_map,
out_path,
settings_pack.indexing_domains + settings_pack.merging_domains,
settings_pack.settings,
settings_pack.indexing_settings,
settings_pack.merging_settings,
gams_system_directory,
)
except (gdx.GdxExportException, SpineDBAPIError) as error:
self._logger.msg_error.emit(f"Failed to export <b>{url}</b> to .gdx: {error}")
return False
finally:
database_map.connection.close()
self._logger.msg_success.emit(f"File <b>{out_path}</b> written")
return True
[docs] def _do_handle_dag_changed(self, resources):
"""See base class."""
database_urls = set(r.url for r in resources if r.type_ == "database")
if database_urls == set(self._settings_packs):
self._check_state()
return
# Drop settings packs without connected databases.
for database_url in list(self._settings_packs):
if database_url not in database_urls:
pack = self._settings_packs[database_url]
if pack.settings_window is not None:
pack.settings_window.close()
del self._settings_packs[database_url]
# Add new databases.
for database_url in database_urls:
if database_url not in self._settings_packs:
self._settings_packs[database_url] = SettingsPack("")
self._start_worker(database_url)
if self._active:
self._update_properties_tab()
self._check_state()
[docs] def _start_worker(self, database_url, update_settings=False):
"""Starts fetching settings using a worker in another thread."""
worker = self._workers.get(database_url, None)
if worker is not None and worker.isRunning():
worker.requestInterruption()
worker.wait()
elif worker is None:
worker = Worker(database_url)
worker.settings_read.connect(self._update_export_settings)
worker.indexing_settings_read.connect(self._update_indexing_settings)
worker.indexing_domains_read.connect(self._update_indexing_domains)
worker.merging_settings_read.connect(self._update_merging_settings)
worker.merging_domains_read.connect(self._update_merging_domains)
worker.finished.connect(self._worker_finished)
worker.errored.connect(self._worker_failed)
self._workers[database_url] = worker
if update_settings:
pack = self._settings_packs[database_url]
worker.set_previous_settings(
pack.settings, pack.indexing_settings, pack.indexing_domains, pack.merging_settings
)
else:
worker.reset_previous_settings()
self._settings_packs[database_url].state = SettingsState.FETCHING
worker.start()
@Slot(str, "QVariant")
[docs] def _update_export_settings(self, database_url, settings):
"""Sets new settings for given database."""
pack = self._settings_packs.get(database_url)
if pack is None:
return
pack.settings = settings
@Slot(str, "QVariant")
[docs] def _update_indexing_settings(self, database_url, indexing_settings):
"""Sets new indexing settings for given database."""
pack = self._settings_packs.get(database_url)
if pack is None:
return
pack.indexing_settings = indexing_settings
@Slot(str, "QVariant")
[docs] def _update_indexing_domains(self, database_url, domains):
"""Sets new indexing domains for given database."""
pack = self._settings_packs.get(database_url)
if pack is None:
return
pack.indexing_domains = domains
@Slot(str, "QVariant")
[docs] def _update_merging_settings(self, database_url, settings):
"""Sets new merging settings for given database."""
pack = self._settings_packs.get(database_url)
if pack is None:
return
pack.merging_settings = settings
@Slot(str, "QVariant")
[docs] def _update_merging_domains(self, database_url, domains):
"""Sets new merging domains for given database."""
pack = self._settings_packs.get(database_url)
if pack is None:
return
pack.merging_domains = domains
@Slot(str)
[docs] def _worker_finished(self, database_url):
"""Cleans up after a worker has finished fetching export settings."""
if database_url in self._workers:
worker = self._workers[database_url]
worker.wait()
worker.deleteLater()
del self._workers[database_url]
if database_url in self._settings_packs:
settings_pack = self._settings_packs[database_url]
if settings_pack.settings_window is not None:
self._send_settings_to_window(database_url)
settings_pack.state = SettingsState.OK
self._check_state()
@Slot(str, "QVariant")
[docs] def _worker_failed(self, database_url, exception):
"""Clean up after a worker has failed fetching export settings."""
if database_url in self._settings_packs:
self._logger.msg_error.emit(
f"<b>[{self.name}]</b> Initializing settings for database {database_url}" f" failed: {exception}"
)
self._settings_packs[database_url].state = SettingsState.ERROR
self._report_notifications()
if database_url in self._workers:
worker = self._workers[database_url]
worker.wait()
worker.deleteLater()
del self._workers[database_url]
[docs] def _check_state(self, clear_before_check=True):
"""
Checks the status of database export settings.
Updates both the notification message (exclamation icon) and settings states.
"""
self._check_missing_file_names()
self._check_duplicate_file_names()
self._check_missing_parameter_indexing()
self._check_erroneous_databases()
self._report_notifications()
[docs] def _check_missing_file_names(self):
"""Checks the status of output file names."""
for pack in self._settings_packs.values():
pack.notifications.missing_output_file_name = not pack.output_file_name
[docs] def _check_duplicate_file_names(self):
"""Checks for duplicate output file names."""
packs = list(self._settings_packs.values())
for pack in packs:
pack.notifications.duplicate_output_file_name = False
for index, pack in enumerate(packs):
if not pack.output_file_name:
continue
for other_pack in packs[index + 1 :]:
if pack.output_file_name == other_pack.output_file_name:
pack.notifications.duplicate_output_file_name = True
other_pack.notifications.duplicate_output_file_name = True
break
[docs] def _check_missing_parameter_indexing(self):
"""Checks the status of parameter indexing settings."""
for pack in self._settings_packs.values():
missing_indexing = False
if pack.state not in (SettingsState.FETCHING, SettingsState.ERROR):
pack.state = SettingsState.OK
for setting in pack.indexing_settings.values():
if setting.indexing_domain is None:
pack.state = SettingsState.INDEXING_PROBLEM
missing_indexing = True
break
pack.notifications.missing_parameter_indexing = missing_indexing
[docs] def _check_erroneous_databases(self):
"""Checks errors in settings fetching from a database."""
for pack in self._settings_packs.values():
pack.notifications.erroneous_database = pack.state == SettingsState.ERROR
@Slot()
[docs] def _report_notifications(self):
"""Updates the exclamation icon and notifications labels."""
if self._icon is None:
return
self.clear_notifications()
merged = _Notifications()
for pack in self._settings_packs.values():
merged |= pack.notifications
if merged.duplicate_output_file_name:
self.add_notification("Duplicate output file names.")
if merged.missing_output_file_name:
self.add_notification("Output file name(s) missing.")
if merged.missing_parameter_indexing:
self.add_notification("Parameter indexing settings need to be updated.")
if merged.erroneous_database:
self.add_notification("Failed to initialize export settings for a database.")
@Slot(str)
[docs] def _show_settings(self, database_url):
"""Opens the item's settings window."""
settings_pack = self._settings_packs[database_url]
if settings_pack.state == SettingsState.FETCHING:
return
# Give window its own settings and indexing domains so Cancel doesn't change anything here.
settings = deepcopy(settings_pack.settings)
indexing_settings = deepcopy(settings_pack.indexing_settings)
additional_parameter_indexing_domains = list(settings_pack.indexing_domains)
merging_settings = deepcopy(settings_pack.merging_settings)
additional_merging_domains = list(settings_pack.merging_domains)
if settings_pack.settings_window is None:
settings_pack.settings_window = GdxExportSettings(
settings,
indexing_settings,
additional_parameter_indexing_domains,
merging_settings,
additional_merging_domains,
database_url,
self._toolbox,
)
settings_pack.settings_window.settings_accepted.connect(self._update_settings_from_settings_window)
settings_pack.settings_window.settings_rejected.connect(self._dispose_settings_window)
settings_pack.settings_window.reset_requested.connect(self._reset_settings_window)
settings_pack.state_changed.connect(settings_pack.settings_window.handle_settings_state_changed)
settings_pack.settings_window.show()
@Slot(str)
[docs] def _reset_settings_window(self, database_url):
"""Sends new settings to Gdx Export Settings window."""
pack = self._settings_packs[database_url]
pack.merging_settings = dict()
pack.merging_domains = list()
self._start_worker(database_url)
@Slot(str)
[docs] def _dispose_settings_window(self, database_url):
"""Deletes rejected export settings windows."""
self._settings_packs[database_url].settings_window = None
@Slot(str, str)
[docs] def _update_out_file_name(self, file_name, database_path):
"""Pushes a new UpdateExporterOutFileNameCommand to the toolbox undo stack."""
self._toolbox.undo_stack.push(UpdateExporterOutFileNameCommand(self, file_name, database_path))
@Slot(str)
[docs] def _update_settings_from_settings_window(self, database_path):
"""Pushes a new UpdateExporterSettingsCommand to the toolbox undo stack."""
window = self._settings_packs[database_path].settings_window
settings = window.settings
indexing_settings = window.indexing_settings
indexing_domains = window.indexing_domains
merging_settings = window.merging_settings
merging_domains = window.merging_domains
self._toolbox.undo_stack.push(
UpdateExporterSettingsCommand(
self, settings, indexing_settings, indexing_domains, merging_settings, merging_domains, database_path
)
)
[docs] def undo_redo_out_file_name(self, file_name, database_path):
"""Updates the output file name for given database"""
if self._active:
export_list_item = self._export_list_items.get(database_path)
export_list_item.out_file_name_edit.setText(file_name)
self._settings_packs[database_path].output_file_name = file_name
self._settings_packs[database_path].notifications.missing_output_file_name = not file_name
self._check_duplicate_file_names()
self._report_notifications()
[docs] def undo_or_redo_settings(
self, settings, indexing_settings, indexing_domains, merging_settings, merging_domains, database_path
):
"""Updates the export settings for given database."""
settings_pack = self._settings_packs[database_path]
settings_pack.settings = settings
settings_pack.indexing_settings = indexing_settings
settings_pack.indexing_domains = indexing_domains
settings_pack.merging_settings = merging_settings
settings_pack.merging_domains = merging_domains
window = settings_pack.settings_window
if window is not None:
self._send_settings_to_window(database_path)
self._check_missing_parameter_indexing()
self._report_notifications()
[docs] def item_dict(self):
"""Returns a dictionary corresponding to this item's configuration."""
d = super().item_dict()
packs = list()
for url, pack in self._settings_packs.items():
pack_dict = pack.to_dict()
serialized_url = serialize_url(url, self._project.project_dir)
pack_dict["database_url"] = serialized_url
packs.append(pack_dict)
d["settings_packs"] = packs
return d
[docs] def _discard_settings_window(self, database_path):
"""Discards the settings window for given database."""
del self._settings_windows[database_path]
[docs] def _send_settings_to_window(self, database_url):
"""Resets settings in given export settings window."""
settings_pack = self._settings_packs[database_url]
window = settings_pack.settings_window
settings = deepcopy(settings_pack.settings)
indexing_settings = deepcopy(settings_pack.indexing_settings)
indexing_domains = list(settings_pack.indexing_domains)
merging_settings = deepcopy(settings_pack.merging_settings)
merging_domains = list(settings_pack.merging_domains)
window.reset_settings(settings, indexing_settings, indexing_domains, merging_settings, merging_domains)
[docs] def update_name_label(self):
"""See base class."""
self._properties_ui.item_name_label.setText(self.name)
[docs] def _resolve_gams_system_directory(self):
"""Returns GAMS system path from Toolbox settings or None if GAMS default is to be used."""
path = self._project.settings.value("appSettings/gamsPath", defaultValue=None)
if not path:
path = gdx_utils.find_gams_directory()
if path is not None and os.path.isfile(path):
path = os.path.dirname(path)
return path
[docs] def notify_destination(self, source_item):
"""See base class."""
if source_item.item_type() == "Data Store":
self._logger.msg.emit(
f"Link established. Data Store <b>{source_item.name}</b> will be "
f"exported to a .gdx file by <b>{self.name}</b> when executing."
)
else:
super().notify_destination(source_item)
@Slot("QVariant")
[docs] def _update_settings_after_db_commit(self, committed_db_maps):
"""Refreshes export settings for databases after data has been committed to them."""
for db_map in committed_db_maps:
url = str(db_map.db_url)
if url in self._settings_packs:
self._start_worker(url, update_settings=True)
@staticmethod
[docs] def default_name_prefix():
"""See base class."""
return "Exporter"
[docs] def output_resources_forward(self):
"""See base class."""
files = [pack.output_file_name for pack in self._settings_packs.values()]
paths = [os.path.join(self.data_dir, file_name) for file_name in files]
resources = [ProjectItemResource(self, "file", url=pathlib.Path(path).as_uri()) for path in paths]
return resources
[docs] def tear_down(self):
"""See base class."""
self._project.db_mngr.session_committed.disconnect(self._update_settings_after_db_commit)
[docs]class SettingsPack(QObject):
"""
Keeper of all settings and stuff needed for exporting a database.
Attributes:
output_file_name (str): name of the export file
settings (Settings): export settings
indexing_settings (dict): parameter indexing settings
indexing_domains (list): extra domains needed for parameter indexing
merging_settings (dict): parameter merging settings
merging_domains (list): extra domains needed for parameter merging
settings_window (GdxExportSettings): settings editor window
"""
[docs] state_changed = Signal("QVariant")
"""Emitted when the pack's state changes."""
def __init__(self, output_file_name):
"""
Args:
output_file_name (str): name of the export file
"""
super().__init__()
self.output_file_name = output_file_name
self.settings = None
self.indexing_settings = None
self.indexing_domains = list()
self.merging_settings = dict()
self.merging_domains = list()
self.settings_window = None
self._state = SettingsState.FETCHING
self.notifications = _Notifications()
self.state_changed.connect(self.notifications.update_settings_state)
@property
[docs] def state(self):
"""State of the pack."""
return self._state
@state.setter
def state(self, state):
self._state = state
self.state_changed.emit(state)
[docs] def to_dict(self):
"""Stores the settings pack into a JSON compatible dictionary."""
d = dict()
d["output_file_name"] = self.output_file_name
# Override ERROR by FETCHING so we'll retry reading the database when reopening the project.
d["state"] = self.state.value
if self.state != SettingsState.OK:
return d
d["settings"] = self.settings.to_dict()
d["indexing_settings"] = gdx.indexing_settings_to_dict(self.indexing_settings)
d["indexing_domains"] = [domain.to_dict() for domain in self.indexing_domains]
d["merging_settings"] = {
parameter_name: setting.to_dict() for parameter_name, setting in self.merging_settings.items()
}
d["merging_domains"] = [domain.to_dict() for domain in self.merging_domains]
return d
@staticmethod
[docs] def from_dict(pack_dict, database_url, logger):
"""Restores the settings pack from a dictionary."""
pack = SettingsPack(pack_dict["output_file_name"])
pack.state = SettingsState(pack_dict["state"])
if pack.state != SettingsState.OK:
return pack
pack.settings = gdx.Settings.from_dict(pack_dict["settings"])
try:
db_map = DatabaseMapping(database_url)
pack.indexing_settings = gdx.indexing_settings_from_dict(pack_dict["indexing_settings"], db_map)
except SpineDBAPIError as error:
logger.msg_error.emit(
f"Failed to fully restore Exporter settings. Error while reading database '{database_url}': {error}"
)
return pack
else:
db_map.connection.close()
pack.indexing_domains = [gdx.Set.from_dict(set_dict) for set_dict in pack_dict["indexing_domains"]]
pack.merging_settings = {
parameter_name: gdx.MergingSetting.from_dict(setting_dict)
for parameter_name, setting_dict in pack_dict["merging_settings"].items()
}
pack.merging_domains = [gdx.Set.from_dict(set_dict) for set_dict in pack_dict["merging_domains"]]
return pack
[docs]class _Notifications(QObject):
"""
Holds flags for different error conditions.
Attributes:
duplicate_output_file_name (bool): if True there are duplicate output file names
missing_output_file_name (bool): if True the output file name is missing
missing_parameter_indexing (bool): if True there are indexed parameters without indexing domains
erroneous_database (bool): if True the database has issues
"""
[docs] changed_due_to_settings_state = Signal()
"""Emitted when notifications have changed due to changes in settings state."""
def __init__(self):
super().__init__()
self.duplicate_output_file_name = False
self.missing_output_file_name = False
self.missing_parameter_indexing = False
self.erroneous_database = False
[docs] def __ior__(self, other):
"""
ORs the flags with another notifications.
Args:
other (_Notifications): a _Notifications object
"""
self.duplicate_output_file_name |= other.duplicate_output_file_name
self.missing_output_file_name |= other.missing_output_file_name
self.missing_parameter_indexing |= other.missing_parameter_indexing
self.erroneous_database |= other.erroneous_database
return self
@Slot("QVariant")
[docs] def update_settings_state(self, state):
"""Updates the notifications according to settings state."""
changed = False
is_erroneous = state == SettingsState.ERROR
if self.erroneous_database != is_erroneous:
self.erroneous_database = is_erroneous
changed = True
is_problem = state == state.INDEXING_PROBLEM
if self.missing_parameter_indexing != is_problem:
self.missing_parameter_indexing = is_problem
changed = True
if changed:
self.changed_due_to_settings_state.emit()
[docs]def _normalize_url(url):
"""
Normalized url's path separators to their OS specific characters.
This function is needed during the transition period from no-version to version 1 project files.
It should be removed once we are using version 1 files.
"""
return "sqlite:///" + url[10:].replace("/", os.sep)