######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Contains Importer project item class.
:authors: P. Savolainen (VTT), P. Vennström (VTT), A. Soininen (VTT)
:date: 10.6.2019
"""
from collections import Counter
import os
from PySide2.QtCore import Qt, Slot
from PySide2.QtWidgets import QListWidget, QDialog, QVBoxLayout, QDialogButtonBox
from spine_engine import ExecutionDirection
from spinetoolbox.project_item import ProjectItem
from spinetoolbox.helpers import create_dir, serialize_path
from spinetoolbox.spine_io.gdx_utils import find_gams_directory
from spinetoolbox.spine_io.importers.csv_reader import CSVConnector
from spinetoolbox.spine_io.importers.excel_reader import ExcelConnector
from spinetoolbox.spine_io.importers.gdx_connector import GdxConnector
from spinetoolbox.spine_io.importers.json_reader import JSONConnector
from spinetoolbox.import_editor.widgets.import_editor_window import ImportEditorWindow
from .commands import UpdateSettingsCommand
from ..shared.commands import UpdateCancelOnErrorCommand, ChangeItemSelectionCommand
from ..shared.models import FileListModel
from ..shared.helpers import deserialize_checked_states, serialize_checked_states
from .executable_item import ExecutableItem
from .item_info import ItemInfo
from .utils import deserialize_mappings
[docs]_CONNECTOR_NAME_TO_CLASS = {
"CSVConnector": CSVConnector,
"ExcelConnector": ExcelConnector,
"GdxConnector": GdxConnector,
"JSONConnector": JSONConnector,
}
[docs]class Importer(ProjectItem):
def __init__(
self, toolbox, project, logger, name, description, mappings, x, y, cancel_on_error=True, mapping_selection=None
):
"""Importer class.
Args:
toolbox (ToolboxUI): QMainWindow instance
project (SpineToolboxProject): the project this item belongs to
logger (LoggerInterface): a logger instance
name (str): Project item name
description (str): Project item description
mappings (list): List where each element contains two dicts (path dict and mapping dict)
x (float): Initial icon scene X coordinate
y (float): Initial icon scene Y coordinate
cancel_on_error (bool, optional): if True the item's execution will stop on import error
mapping_selection (list, optional): serialized checked states for each file item
either selected or unselected
"""
super().__init__(name, description, x, y, project, logger)
# Make logs subdirectory for this item
self._toolbox = toolbox
self.logs_dir = os.path.join(self.data_dir, "logs")
try:
create_dir(self.logs_dir)
except OSError:
self._logger.msg_error.emit(f"[OSError] Creating directory {self.logs_dir} failed. Check permissions.")
# Variables for saving selections when item is (de)activated
if not mappings:
mappings = list()
# convert table_types and table_row_types keys to int since json always has strings as keys.
for _, mapping in mappings:
table_types = mapping.get("table_types", {})
mapping["table_types"] = {
table_name: {int(col): t for col, t in col_types.items()}
for table_name, col_types in table_types.items()
}
table_row_types = mapping.get("table_row_types", {})
mapping["table_row_types"] = {
table_name: {int(row): t for row, t in row_types.items()}
for table_name, row_types in table_row_types.items()
}
# Convert serialized paths to absolute in mappings
_fix_1d_array_to_array(mappings)
self.settings = deserialize_mappings(mappings, self._project.project_dir)
# self.settings is now a dictionary, where elements have the absolute path as the key and the mapping as value
self.cancel_on_error = cancel_on_error
self._file_model = FileListModel()
mapping_selection = deserialize_checked_states(mapping_selection, self._project.project_dir)
self._file_model.set_initial_state(mapping_selection)
self._file_model.selected_state_changed.connect(self._push_file_selection_change_to_undo_stack)
# connector class
self._preview_widget = {} # Key is the filepath, value is the ImportEditorWindow instance
@staticmethod
[docs] def item_type():
"""See base class."""
return ItemInfo.item_type()
@staticmethod
[docs] def item_category():
"""See base class."""
return ItemInfo.item_category()
[docs] def execution_item(self):
"""Creates project item's execution counterpart."""
selected_settings = dict()
for file_item in self._file_model.files:
label = file_item.label
settings = self.settings.get(label) if file_item.selected else "deselected"
if not settings:
continue
selected_settings[label] = settings
gams_path = self._project.settings.value("appSettings/gamsPath", defaultValue=None)
executable = ExecutableItem(
self.name, selected_settings, self.logs_dir, gams_path, self.cancel_on_error, self._logger
)
return executable
@Slot()
[docs] def handle_execution_successful(self, execution_direction, engine_state):
"""Notifies Toolbox of successful database import."""
if execution_direction != ExecutionDirection.FORWARD:
return
successors = self._project.direct_successors(self)
committed_db_maps = set()
for successor in successors:
if successor.item_type() == "Data Store":
url = successor.sql_alchemy_url()
database_map = self._project.db_mngr.get_db_map(url, self._logger)
if database_map is not None:
committed_db_maps.add(database_map)
if committed_db_maps:
cookie = self
self._project.db_mngr.session_committed.emit(committed_db_maps, cookie)
[docs] def make_signal_handler_dict(self):
"""Returns a dictionary of all shared signals and their handlers.
This is to enable simpler connecting and disconnecting."""
s = super().make_signal_handler_dict()
s[self._properties_ui.toolButton_open_dir.clicked] = lambda checked=False: self.open_directory()
s[self._properties_ui.pushButton_import_editor.clicked] = self._handle_import_editor_clicked
s[self._properties_ui.treeView_files.doubleClicked] = self._handle_files_double_clicked
s[self._properties_ui.cancel_on_error_checkBox.stateChanged] = self._handle_cancel_on_error_changed
return s
@Slot(int)
[docs] def _handle_cancel_on_error_changed(self, _state):
cancel_on_error = self._properties_ui.cancel_on_error_checkBox.isChecked()
if self.cancel_on_error == cancel_on_error:
return
self._toolbox.undo_stack.push(UpdateCancelOnErrorCommand(self, cancel_on_error))
[docs] def set_cancel_on_error(self, cancel_on_error):
self.cancel_on_error = cancel_on_error
if not self._active:
return
check_state = Qt.Checked if self.cancel_on_error else Qt.Unchecked
self._properties_ui.cancel_on_error_checkBox.blockSignals(True)
self._properties_ui.cancel_on_error_checkBox.setCheckState(check_state)
self._properties_ui.cancel_on_error_checkBox.blockSignals(False)
[docs] def set_file_selected(self, label, selected):
self._file_model.set_selected(label, selected)
[docs] def restore_selections(self):
"""Restores selections into shared widgets when this project item is selected."""
self._properties_ui.cancel_on_error_checkBox.setCheckState(Qt.Checked if self.cancel_on_error else Qt.Unchecked)
self._properties_ui.label_name.setText(self.name)
self._properties_ui.treeView_files.setModel(self._file_model)
[docs] def save_selections(self):
"""Saves selections in shared widgets for this project item into instance variables."""
self._properties_ui.treeView_files.setModel(None)
[docs] def update_name_label(self):
"""Update Importer properties tab name label. Used only when renaming project items."""
self._properties_ui.label_name.setText(self.name)
@Slot(bool)
[docs] def _handle_import_editor_clicked(self, checked=False):
"""Opens Import editor for the file selected in list view."""
index = self._properties_ui.treeView_files.currentIndex()
if not index.isValid():
for row, item in enumerate(self._file_model.files):
if item.exists():
index = self._file_model.index(row, 0)
self._properties_ui.treeView_files.setCurrentIndex(index)
break
self.open_import_editor(index)
@Slot("QModelIndex")
[docs] def _handle_files_double_clicked(self, index):
"""Opens Import editor for the double clicked index."""
self.open_import_editor(index)
[docs] def open_import_editor(self, index):
"""Opens Import editor for the given index."""
label = index.data()
if label is None:
self._logger.msg_error.emit("Please select a source file from the list first.")
return
file_item = self._file_model.find_file(label)
file_path = file_item.path
if not file_item.exists():
self._logger.msg_error.emit(f"File does not exist yet.")
self._file_model.mark_as_nonexistent(index)
return
if not os.path.exists(file_path):
self._logger.msg_error.emit(f"Cannot find file '{file_path}'.")
self._file_model.mark_as_nonexistent(index)
return
# Raise current form for the selected file if any
preview_widget = self._preview_widget.get(label, None)
if preview_widget:
if preview_widget.windowState() & Qt.WindowMinimized:
# Remove minimized status and restore window with the previous state (maximized/normal state)
preview_widget.setWindowState(preview_widget.windowState() & ~Qt.WindowMinimized | Qt.WindowActive)
preview_widget.activateWindow()
else:
preview_widget.raise_()
return
# Create a new form for the selected file
settings = self.get_settings(label)
# Try and get connector from settings
source_type = settings.get("source_type", None)
if source_type is not None:
connector = _CONNECTOR_NAME_TO_CLASS[source_type]
else:
# Ask user
connector = self.get_connector(label)
if not connector:
# Aborted by the user
return
self._logger.msg.emit(f"Opening Import editor for file: {file_path}")
connector_settings = {"gams_directory": self._gams_system_directory()}
preview_widget = self._preview_widget[label] = ImportEditorWindow(
self, file_path, connector, connector_settings, settings, self._toolbox
)
preview_widget.settings_updated.connect(lambda s, importee=label: self.save_settings(s, importee))
preview_widget.connection_failed.connect(lambda m, importee=label: self._connection_failed(m, importee))
preview_widget.destroyed.connect(lambda o=None, importee=label: self._preview_destroyed(importee))
preview_widget.start_ui()
[docs] def get_connector(self, importee):
"""Shows a QDialog to select a connector for the given source file.
Mimics similar routine in `spine_io.widgets.import_widget.ImportDialog`
Args:
importee (str): Label of the file acting as an importee
Returns:
Asynchronous data reader class for the given importee
"""
connector_list = [CSVConnector, ExcelConnector, GdxConnector, JSONConnector] # add others as needed
connector_names = [c.DISPLAY_NAME for c in connector_list]
dialog = QDialog(self._toolbox)
dialog.setLayout(QVBoxLayout())
connector_list_wg = QListWidget()
connector_list_wg.addItems(connector_names)
# Set current item in `connector_list_wg` based on file extension
_filename, file_extension = os.path.splitext(importee)
file_extension = file_extension.lower()
if file_extension.startswith(".xls"):
row = connector_list.index(ExcelConnector)
elif file_extension in (".csv", ".dat", ".txt"):
row = connector_list.index(CSVConnector)
elif file_extension == ".gdx":
row = connector_list.index(GdxConnector)
elif file_extension == ".json":
row = connector_list.index(JSONConnector)
else:
row = None
if row is not None:
connector_list_wg.setCurrentRow(row)
button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
button_box.button(QDialogButtonBox.Ok).clicked.connect(dialog.accept)
button_box.button(QDialogButtonBox.Cancel).clicked.connect(dialog.reject)
connector_list_wg.doubleClicked.connect(dialog.accept)
dialog.layout().addWidget(connector_list_wg)
dialog.layout().addWidget(button_box)
_dirname, filename = os.path.split(importee)
dialog.setWindowTitle("Select connector for '{}'".format(filename))
answer = dialog.exec_()
if answer:
row = connector_list_wg.currentIndex().row()
return connector_list[row]
[docs] def select_connector_type(self, index):
"""Opens dialog to select connector type for the given index."""
importee = index.data()
connector = self.get_connector(importee)
if not connector:
# Aborted by the user
return
settings = self.get_settings(importee)
settings["source_type"] = connector.__name__
[docs] def _connection_failed(self, msg, importee):
self._logger.msg_error.emit(msg)
preview_widget = self._preview_widget.pop(importee, None)
if preview_widget:
preview_widget.close()
[docs] def get_settings(self, importee):
"""Returns the mapping dictionary for the file in given path.
Args:
importee (str): Label of the file whose mapping is queried
Returns:
dict: Mapping dictionary for the requested importee or an empty dict if not found
"""
return self.settings.get(importee, {})
[docs] def save_settings(self, settings, importee):
"""Updates an existing mapping or adds a new mapping
(settings) after closing the import preview window.
Args:
settings (dict): Updated mapping (settings) dictionary
importee (str): Absolute path to a file, whose mapping has been updated
"""
if self.settings.get(importee) == settings:
return
self._toolbox.undo_stack.push(UpdateSettingsCommand(self, settings, importee))
[docs] def _preview_destroyed(self, importee):
"""Destroys preview widget instance for the given importee.
Args:
importee (str): Absolute path to a file, whose preview widget is destroyed
"""
self._preview_widget.pop(importee, None)
@Slot(bool, str)
[docs] def _push_file_selection_change_to_undo_stack(self, selected, label):
"""Makes changes to file selection undoable."""
self._toolbox.undo_stack.push(ChangeItemSelectionCommand(self, selected, label))
[docs] def _do_handle_dag_changed(self, resources):
"""See base class."""
self._file_model.update(resources)
labels = self._file_model.labels()
for settings_label in list(self.settings):
if settings_label not in labels:
del self.settings[settings_label]
self._notify_if_duplicate_file_paths()
if self._file_model.rowCount() == 0:
self.add_notification(
"This Importer does not have any input data. "
"Connect Data Connections to this Importer to use their data as input."
)
[docs] def item_dict(self):
"""Returns a dictionary corresponding to this item."""
d = super().item_dict()
# Serialize mappings before saving
d["mappings"] = self.serialize_mappings(self.settings, self._project.project_dir)
d["cancel_on_error"] = self._properties_ui.cancel_on_error_checkBox.isChecked()
d["mapping_selection"] = serialize_checked_states(self._file_model.files, self._project.project_dir)
return d
[docs] def notify_destination(self, source_item):
"""See base class."""
if source_item.item_type() == "Data Connection":
self._logger.msg.emit(
"Link established. You can define mappings on data from "
f"<b>{source_item.name}</b> using item <b>{self.name}</b>."
)
elif source_item.item_type() == "Tool":
self._logger.msg.emit(
"Link established. You can define mappings on output files from "
f"<b>{source_item.name}</b> using item <b>{self.name}</b>."
)
elif source_item.item_type() == "Data Store":
self._logger.msg.emit("Link established")
elif source_item.item_type() == "Gimlet":
self._logger.msg.emit(
"Link established. You can define mappings on output files from "
f"<b>{source_item.name}</b> using item <b>{self.name}</b>."
)
else:
super().notify_destination(source_item)
@staticmethod
[docs] def default_name_prefix():
"""see base class"""
return "Importer"
[docs] def tear_down(self):
"""Closes all preview widgets."""
for widget in self._preview_widget.values():
widget.close()
[docs] def _notify_if_duplicate_file_paths(self):
"""Adds a notification if file_list contains duplicate entries."""
labels = list()
for item in self._file_model.files:
labels.append(item.label)
file_counter = Counter(labels)
duplicates = list()
for label, count in file_counter.items():
if count > 1:
duplicates.append(label)
if duplicates:
self.add_notification("Duplicate input files from upstream items:<br>{}".format("<br>".join(duplicates)))
@staticmethod
[docs] def upgrade_from_no_version_to_version_1(item_name, old_item_dict, old_project_dir):
"""Converts mappings to a list, where each element contains two dictionaries,
the serialized path dictionary and the mapping dictionary for the file in that
path."""
new_importer = dict(old_item_dict)
mappings = new_importer.get("mappings", {})
list_of_mappings = list()
paths = list(mappings.keys())
for path in paths:
mapping = mappings[path]
if "source_type" in mapping and mapping["source_type"] == "CSVConnector":
_fix_csv_connector_settings(mapping)
new_path = serialize_path(path, old_project_dir)
if new_path["relative"]:
new_path["path"] = os.path.join(".spinetoolbox", "items", new_path["path"])
list_of_mappings.append([new_path, mapping])
new_importer["mappings"] = list_of_mappings
return new_importer
@staticmethod
[docs] def upgrade_v1_to_v2(item_name, item_dict):
"""Upgrades item's dictionary from v1 to v2.
Changes:
- if mapping_selection does not exist or if it
is a list of booleans, reset mapping_selection
to an empty list.
Args:
item_name (str): item's name
item_dict (dict): Version 1 item dictionary
Returns:
dict: Version 2 Exporter dictionary
"""
try:
mapping_selection = item_dict["mapping_selection"]
except KeyError:
item_dict["mapping_selection"] = list()
return item_dict
if len(mapping_selection) == 0:
return item_dict
if isinstance(mapping_selection[0], bool):
item_dict["mapping_selection"] = list()
return item_dict
@staticmethod
[docs] def serialize_mappings(mappings, project_path):
"""
Serializes the importer's mappings.
Returns a list where each element contains two dictionaries:
the 'serialized' file label in a dictionary and the mapping dictionary.
Args:
mappings (dict): Dictionary with mapping specifications
project_path (str): Path to project directory
Returns:
list: serialized file item labels and mappings
"""
serialized_mappings = list()
for source, mapping in mappings.items(): # mappings is a dict with absolute paths as keys and mapping as values
serialized_mappings.append([serialize_path(source, project_path), mapping])
return serialized_mappings
[docs] def _gams_system_directory(self):
"""Returns GAMS system path from Toolbox settings or None if GAMS default is to be used."""
path = self._project.settings.value("appSettings/gamsPath", defaultValue=None)
if not path:
path = find_gams_directory()
if path is not None and os.path.isfile(path):
path = os.path.dirname(path)
return path
[docs]def _fix_1d_array_to_array(mappings):
"""
Replaces '1d array' with 'array' for parameter type in mappings.
With spinedb_api >= 0.3, '1d array' parameter type was replaced by 'array'.
Other settings in a mapping are backwards compatible except the name.
"""
for more_mappings in mappings:
for settings in more_mappings:
table_mappings = settings.get("table_mappings")
if table_mappings is not None:
for sheet_settings in table_mappings.values():
for setting in sheet_settings:
parameter_setting = setting.get("parameters")
if parameter_setting is not None:
parameter_type = parameter_setting.get("parameter_type")
if parameter_type == "1d array":
parameter_setting["parameter_type"] = "array"
[docs]def _fix_csv_connector_settings(settings):
"""CSVConnector saved the table names as the filepath, change that
to 'csv' instead. This function will mutate the dictionary.
Args:
settings (dict): Mapping settings that should be updated
"""
table_mappings = settings.get("table_mappings", {})
k = list(table_mappings)
if len(k) == 1 and k[0] != "csv":
table_mappings["csv"] = table_mappings.pop(k[0])
table_types = settings.get("table_types", {})
k = list(table_types.keys())
if len(k) == 1 and k[0] != "csv":
table_types["csv"] = table_types.pop(k[0])
table_row_types = settings.get("table_row_types", {})
k = list(table_row_types.keys())
if len(k) == 1 and k[0] != "csv":
table_row_types["csv"] = table_row_types.pop(k[0])
table_options = settings.get("table_options", {})
k = list(table_options.keys())
if len(k) == 1 and k[0] != "csv":
table_options["csv"] = table_options.pop(k[0])
selected_tables = settings.get("selected_tables", [])
if len(selected_tables) == 1 and selected_tables[0] != "csv":
selected_tables.pop(0)
selected_tables.append("csv")