Source code for spinetoolbox.spine_db_editor.widgets.spine_db_editor

######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""Contains the SpineDBEditor class."""
import os
import json
from sqlalchemy.engine.url import URL
from PySide6.QtWidgets import (
    QMainWindow,
    QErrorMessage,
    QDockWidget,
    QMessageBox,
    QMenu,
    QAbstractScrollArea,
    QTabBar,
    QCheckBox,
    QDialog,
    QToolButton,
)
from PySide6.QtCore import QModelIndex, Qt, Signal, Slot, QTimer
from PySide6.QtGui import QGuiApplication, QKeySequence, QIcon, QAction
from spinedb_api import import_data, export_data, DatabaseMapping, SpineDBAPIError, SpineDBVersionError, Asterisk
from spinedb_api.spine_io.importers.excel_reader import get_mapped_data_from_xlsx
from spinedb_api.helpers import vacuum
from .custom_menus import MainMenu
from .commit_viewer import CommitViewer
from .mass_select_items_dialogs import MassRemoveItemsDialog, MassExportItemsDialog
from .stacked_view_mixin import StackedViewMixin
from .tree_view_mixin import TreeViewMixin
from .graph_view_mixin import GraphViewMixin
from .tabular_view_mixin import TabularViewMixin
from .url_toolbar import UrlToolBar
from .metadata_editor import MetadataEditor
from .item_metadata_editor import ItemMetadataEditor
from ...widgets.notification import ChangeNotifier, Notification
from ...widgets.parameter_value_editor import ParameterValueEditor
from ...widgets.custom_qwidgets import ToolBarWidgetAction
from ...widgets.commit_dialog import CommitDialog
from ...helpers import (
    get_save_file_name_in_last_dir,
    get_open_file_name_in_last_dir,
    format_string_list,
    call_on_focused_widget,
    busy_effect,
    CharIconEngine,
    preferred_row_height,
    unique_name,
)
from ...spine_db_parcel import SpineDBParcel
from ...config import APPLICATION_PATH


[docs]class SpineDBEditorBase(QMainWindow): """Base class for SpineDBEditor (i.e. Spine database editor)."""
[docs] msg = Signal(str)
[docs] msg_error = Signal(str)
[docs] file_exported = Signal(str, float, bool)
"""filepath, progress between 0 and 1, True if sqlite file""" def __init__(self, db_mngr): """ Args: db_mngr (SpineDBManager): The manager to use """ super().__init__() from ..ui.spine_db_editor_window import Ui_MainWindow # pylint: disable=import-outside-toplevel self.db_mngr = db_mngr self.db_maps = [] self.db_urls = [] self._change_notifiers = [] self._changelog = [] # Setup UI from Qt Designer file self.ui = Ui_MainWindow() self.ui.setupUi(self) self.takeCentralWidget().deleteLater() self.url_toolbar = UrlToolBar(self) self.addToolBar(Qt.TopToolBarArea, self.url_toolbar) toolbox = self.db_mngr.parent() if toolbox is not None: self.url_toolbar.show_toolbox_action.triggered.connect(toolbox.restore_and_activate) else: self.url_toolbar.show_toolbox_action.deleteLater() self.setAttribute(Qt.WA_DeleteOnClose) self.setWindowTitle("") self.qsettings = self.db_mngr.qsettings self.err_msg = QErrorMessage(self) self.err_msg.setWindowTitle("Error") self.err_msg.setWindowFlag(Qt.WindowType.WindowContextHelpButtonHint, False) self.silenced = False max_screen_height = max([s.availableSize().height() for s in QGuiApplication.screens()]) self.visible_rows = int(max_screen_height / preferred_row_height(self)) self.settings_group = "spineDBEditor" self.undo_action = None self.redo_action = None self.ui.actionUndo.setShortcuts(QKeySequence.Undo) self.ui.actionRedo.setShortcuts(QKeySequence.Redo) self.setContextMenuPolicy(Qt.NoContextMenu) self._torn_down = False self._purge_items_dialog = None self._purge_items_dialog_state = None self._export_items_dialog = None self._export_items_dialog_state = None self.update_commit_enabled() @property
[docs] def toolbox(self): return self.db_mngr.parent()
@property
[docs] def settings_subgroup(self): return ";".join(self.db_urls)
@property
[docs] def db_names(self): return ", ".join([f"{db_map.codename}" for db_map in self.db_maps])
@property
[docs] def first_db_map(self): return self.db_maps[0]
@property
[docs] def db_url_codenames(self): return {db_map.db_url: db_map.codename for db_map in self.db_maps}
@staticmethod
[docs] def is_db_map_editor(): """Always returns True as SpineDBEditors are truly database editors. Unless, of course, the database can one day be opened in read-only mode. In that case this method should return False. Returns: bool: Always True """ return True
[docs] def load_db_urls(self, db_url_codenames, create=False, update_history=True, window=False): self.ui.actionImport.setEnabled(False) self.ui.actionExport.setEnabled(False) self.ui.actionMass_remove_items.setEnabled(False) self.ui.actionVacuum.setEnabled(False) self.url_toolbar.reload_action.setEnabled(False) if not db_url_codenames: return True if not self.tear_down(): return False if self.db_maps: self.save_window_state() self.db_maps = [] self._changelog.clear() self._purge_change_notifiers() for url, codename in db_url_codenames.items(): db_map = self.db_mngr.get_db_map(url, self, codename=codename, create=create, window=window) if db_map is not None: self.db_maps.append(db_map) if not self.db_maps: return False self.db_urls = [db_map.db_url for db_map in self.db_maps] self.ui.actionImport.setEnabled(True) self.ui.actionExport.setEnabled(True) self.ui.actionMass_remove_items.setEnabled(True) self.ui.actionVacuum.setEnabled(any(url.startswith("sqlite") for url in self.db_urls)) self.url_toolbar.reload_action.setEnabled(True) self._change_notifiers = [ ChangeNotifier(self, self.db_mngr.undo_stack[db_map], self.qsettings, "appSettings/dbEditorShowUndo") for db_map in self.db_maps ] self.url_toolbar.set_current_urls(self.db_urls) self.db_mngr.register_listener(self, *self.db_maps) self.init_models() self.init_add_undo_redo_actions() self.setWindowTitle(f"{self.db_names}") # This sets the tab name, just in case if update_history: self.url_toolbar.add_urls_to_history(self.db_urls) self.update_last_view() self.restore_ui(self.last_view, fresh=True) self.update_commit_enabled() return True
[docs] def init_add_undo_redo_actions(self): new_undo_action = self.db_mngr.undo_action[self.first_db_map] new_redo_action = self.db_mngr.redo_action[self.first_db_map] self._replace_undo_redo_actions(new_undo_action, new_redo_action)
@Slot(bool)
[docs] def load_previous_urls(self, _=False): urls = self.url_toolbar.get_previous_urls() self.load_db_urls({url: None for url in urls}, update_history=False)
@Slot(bool)
[docs] def load_next_urls(self, _=False): urls = self.url_toolbar.get_next_urls() self.load_db_urls({url: None for url in urls}, update_history=False)
@Slot(bool)
[docs] def open_db_file(self, _=False): self.qsettings.beginGroup(self.settings_group) file_path, _ = get_open_file_name_in_last_dir( self.qsettings, "openSQLiteUrl", self, "Open SQLite file", self._get_base_dir(), "SQLite (*.sqlite)" ) self.qsettings.endGroup() if not file_path: return url = "sqlite:///" + file_path self.load_db_urls({url: None})
@Slot(bool)
[docs] def add_db_file(self, _=False): self.qsettings.beginGroup(self.settings_group) file_path, _ = get_open_file_name_in_last_dir( self.qsettings, "addSQLiteUrl", self, "Add SQLite file", self._get_base_dir(), "SQLite (*.sqlite)" ) self.qsettings.endGroup() if not file_path: return url = "sqlite:///" + file_path db_url_codenames = self.db_url_codenames db_url_codenames[url] = None self.load_db_urls(db_url_codenames)
@Slot(bool)
[docs] def create_db_file(self, _=False): self.qsettings.beginGroup(self.settings_group) file_path, _ = get_save_file_name_in_last_dir( self.qsettings, "createSQLiteUrl", self, "Create SQLite file", self._get_base_dir(), "SQLite (*.sqlite)" ) self.qsettings.endGroup() if not file_path: return try: os.remove(file_path) except OSError: pass url = "sqlite:///" + file_path self.load_db_urls({url: None}, create=True)
[docs] def reset_docs(self): """Resets the layout of the dock widgets for this URL""" self.qsettings.beginGroup(self.settings_group) self.qsettings.beginGroup(self.settings_subgroup) self.qsettings.remove("") self.qsettings.endGroup() self.qsettings.endGroup() self.last_view = None self.apply_stacked_style()
[docs] def _make_docks_menu(self): """Returns a menu with all dock toggle/view actions. Called by ``self.add_main_menu()``. Returns: QMenu """ menu = QMenu(self) reset_docs_action = QAction("Reset docs", self) reset_docs_action.triggered.connect(self.reset_docs) menu.addAction(reset_docs_action) menu.addAction(self.ui.dockWidget_entity_tree.toggleViewAction()) menu.addSeparator() menu.addAction(self.ui.dockWidget_entity_tree.toggleViewAction()) menu.addSeparator() menu.addAction(self.ui.dockWidget_parameter_value.toggleViewAction()) menu.addAction(self.ui.dockWidget_parameter_definition.toggleViewAction()) menu.addAction(self.ui.dockWidget_entity_alternative.toggleViewAction()) menu.addSeparator() menu.addAction(self.ui.dockWidget_pivot_table.toggleViewAction()) menu.addAction(self.ui.dockWidget_frozen_table.toggleViewAction()) menu.addSeparator() menu.addAction(self.ui.dockWidget_entity_graph.toggleViewAction()) menu.addSeparator() menu.addAction(self.ui.dockWidget_parameter_value_list.toggleViewAction()) menu.addAction(self.ui.alternative_dock_widget.toggleViewAction()) menu.addAction(self.ui.scenario_dock_widget.toggleViewAction()) menu.addAction(self.ui.metadata_dock_widget.toggleViewAction()) menu.addAction(self.ui.item_metadata_dock_widget.toggleViewAction()) menu.addSeparator() menu.addAction(self.ui.dockWidget_exports.toggleViewAction()) return menu
[docs] def add_main_menu(self): """Adds a menu with main actions to toolbar.""" menu = MainMenu(self) file_action = ToolBarWidgetAction("File", menu) file_action.tool_bar.addActions( [self.ui.actionNew_db_file, self.ui.actionOpen_db_file, self.ui.actionAdd_db_file] ) file_action.tool_bar.addSeparator() file_action.tool_bar.addActions([self.ui.actionImport, self.ui.actionExport, self.ui.actionExport_session]) edit_action = ToolBarWidgetAction("Edit", menu) edit_action.tool_bar.addActions([self.ui.actionUndo, self.ui.actionRedo]) edit_action.tool_bar.addSeparator() edit_action.tool_bar.addActions([self.ui.actionCopy, self.ui.actionPaste]) edit_action.tool_bar.addSeparator() edit_action.tool_bar.addActions([self.ui.actionMass_remove_items, self.ui.actionVacuum]) view_action = ToolBarWidgetAction("View", menu) view_action.tool_bar.addActions([self.ui.actionStacked_style, self.ui.actionGraph_style]) pivot_actions = self.pivot_action_group.actions() view_action.tool_bar.addActions(pivot_actions) view_action.tool_bar.add_frame(pivot_actions[0], pivot_actions[-1], "Pivot table") view_action.tool_bar.addSeparator() docks_menu_action = view_action.tool_bar.addAction(QIcon(CharIconEngine("\uf2d0")), "Doc&ks...") docks_menu = self._make_docks_menu() docks_menu_action.setMenu(docks_menu) docks_menu_button = view_action.tool_bar.widgetForAction(docks_menu_action) docks_menu_button.setPopupMode(QToolButton.ToolButtonPopupMode.InstantPopup) session_action = ToolBarWidgetAction("Session", menu) session_action.tool_bar.addActions([self.ui.actionCommit, self.ui.actionRollback]) session_action.tool_bar.addSeparator() session_action.tool_bar.addAction(self.ui.actionView_history) menu.addAction(file_action) menu.addSeparator() menu.addAction(edit_action) menu.addSeparator() menu.addAction(view_action) menu.addSeparator() menu.addAction(session_action) menu.addSeparator() menu.addAction(self.ui.actionUser_guide) menu.addAction(self.ui.actionSettings) self.ui.actionClose.setShortcut(QKeySequence.Close) menu.addAction(self.ui.actionClose) menu_action = self.url_toolbar.add_main_menu(menu) actions = [ self.ui.actionNew_db_file, self.ui.actionOpen_db_file, self.ui.actionAdd_db_file, self.ui.actionImport, self.ui.actionExport, self.ui.actionExport_session, self.ui.actionUndo, self.ui.actionRedo, self.ui.actionCopy, self.ui.actionPaste, self.ui.actionMass_remove_items, self.ui.actionVacuum, self.ui.actionStacked_style, self.ui.actionGraph_style, *docks_menu.actions(), *self.pivot_action_group.actions(), self.ui.actionCommit, self.ui.actionRollback, self.ui.actionView_history, ] for action in actions: action.triggered.connect(menu.hide) # Add actions to activate shortcuts self.addActions([menu_action, *actions])
[docs] def _browse_commits(self): browser = CommitViewer(self.qsettings, self.db_mngr, *self.db_maps, parent=self) browser.show()
[docs] def connect_signals(self): """Connects signals to slots.""" # Message signals self.msg.connect(self.add_message) self.msg_error.connect(self.err_msg.showMessage) self.db_mngr.items_added.connect(self._handle_items_added) self.db_mngr.items_updated.connect(self._handle_items_updated) self.db_mngr.items_removed.connect(self._handle_items_removed) # Menu actions self.ui.actionCommit.triggered.connect(self.commit_session) self.ui.actionRollback.triggered.connect(self.rollback_session) self.ui.actionView_history.triggered.connect(self._browse_commits) self.ui.actionNew_db_file.triggered.connect(self.create_db_file) self.ui.actionOpen_db_file.triggered.connect(self.open_db_file) self.ui.actionAdd_db_file.triggered.connect(self.add_db_file) self.ui.actionImport.triggered.connect(self.import_file) self.ui.actionExport.triggered.connect(self.show_mass_export_items_dialog) self.ui.actionExport_session.triggered.connect(self.export_session) self.ui.actionCopy.triggered.connect(self.copy) self.ui.actionPaste.triggered.connect(self.paste) self.ui.actionMass_remove_items.triggered.connect(self.show_mass_remove_items_form) self.ui.actionVacuum.triggered.connect(self.vacuum)
@Slot(bool)
[docs] def vacuum(self, _checked=False): msg = "Vacuum finished<ul>" for db_map in self.db_maps: freed, unit = vacuum(db_map.db_url) msg += f"<li>{freed} {unit} freed from {db_map.codename}</li>" msg += "</ul>" self.msg.emit(msg)
@Slot(bool)
[docs] def update_undo_redo_actions(self, _): undo_db_map = max(self.db_maps, key=lambda db_map: self.db_mngr.undo_stack[db_map].undo_age) redo_db_map = max(self.db_maps, key=lambda db_map: self.db_mngr.undo_stack[db_map].redo_age) new_undo_action = self.db_mngr.undo_action[undo_db_map] new_redo_action = self.db_mngr.redo_action[redo_db_map] self._replace_undo_redo_actions(new_undo_action, new_redo_action)
[docs] def _replace_undo_redo_actions(self, new_undo_action, new_redo_action): if new_undo_action != self.undo_action: if self.undo_action: self.ui.actionUndo.triggered.disconnect(self.undo_action.triggered) self.ui.actionUndo.triggered.connect(new_undo_action.triggered) self.undo_action = new_undo_action if new_redo_action != self.redo_action: if self.redo_action: self.ui.actionRedo.triggered.disconnect(self.redo_action.triggered) self.ui.actionRedo.triggered.connect(new_redo_action.triggered) self.redo_action = new_redo_action self._refresh_undo_redo_actions()
@Slot()
[docs] def _refresh_undo_redo_actions(self): self.ui.actionUndo.setEnabled(self.undo_action.isEnabled()) self.ui.actionUndo.setToolTip(f"<p>{self.undo_action.text()}</p><p>Ctrl+Z</p>") self.ui.actionRedo.setEnabled(self.redo_action.isEnabled()) self.ui.actionRedo.setToolTip(f"<p>{self.redo_action.text()}</p><p>Ctrl+Y</p>")
@Slot(bool)
[docs] def update_commit_enabled(self, _clean=False): dirty = any(self.db_mngr.is_dirty(db_map) for db_map in self.db_maps) self.ui.actionExport_session.setEnabled(dirty) self.ui.actionCommit.setEnabled(dirty) self.ui.actionRollback.setEnabled(dirty) self.setWindowModified(dirty) self.windowTitleChanged.emit(self.windowTitle())
[docs] def init_models(self): """Initializes models."""
@Slot(str)
[docs] def add_message(self, msg): """Pushes message to notification stack. Args: msg (str): String to show in the notification """ if self.silenced: return Notification(self, msg, corner=Qt.BottomRightCorner).show()
@Slot()
[docs] def refresh_copy_paste_actions(self): """Runs when menus are about to show. Enables or disables actions according to selection status.""" self.ui.actionCopy.setEnabled(bool(call_on_focused_widget(self, "can_copy"))) self.ui.actionPaste.setEnabled(bool(call_on_focused_widget(self, "can_paste")))
@Slot(bool)
[docs] def copy(self, checked=False): """Copies data to clipboard.""" call_on_focused_widget(self, "copy")
@Slot(bool)
[docs] def paste(self, checked=False): """Pastes data from clipboard.""" call_on_focused_widget(self, "paste")
[docs] def import_data(self, data): """Imports data to all database mappings open in the editor. Args: data (dict): data to import """ self.db_mngr.import_data({db_map: data for db_map in self.db_maps})
@Slot(bool)
[docs] def import_file(self, checked=False): """Imports file. It supports SQLite, JSON, and Excel.""" self.qsettings.beginGroup(self.settings_group) file_path, selected_filter = get_open_file_name_in_last_dir( self.qsettings, "importFileIntoDB", self, "Import file", self._get_base_dir(), "All files (*);;SQLite files (*.sqlite);;JSON files (*.json);;Excel files (*.xlsx)", ) self.qsettings.endGroup() if not file_path: # File selection cancelled return extension = os.path.splitext(file_path)[1].lower() if extension == ".json": self.import_from_json(file_path) elif extension == ".sqlite": self.import_from_sqlite(file_path) elif extension == ".xlsx": self.import_from_excel(file_path) else: self.msg_error.emit(f"Unrecognized file type {extension} - must be a .json, .sqlite, or .xlsx file")
[docs] def import_from_json(self, file_path): with open(file_path) as f: try: data = json.load(f) except json.decoder.JSONDecodeError as err: self.msg_error.emit(f"File {file_path} is not a valid json: {err}") return sanitized_data = {} for item_type, items in data.items(): try: sanitized_items = tuple( tuple(x if not isinstance(x, list) else tuple(x) for x in item) for item in items ) except TypeError: self.msg_error.emit(f"Data in {file_path} is not valid for importing.") return sanitized_data[item_type] = sanitized_items self.import_data(sanitized_data) filename = os.path.split(file_path)[1] self.msg.emit(f"File {filename} successfully imported.")
[docs] def import_from_sqlite(self, file_path): url = URL("sqlite", database=file_path) filename = os.path.split(file_path)[1] try: db_map = DatabaseMapping(url) except (SpineDBAPIError, SpineDBVersionError) as err: self.msg.emit(f"Couldn't import file {filename}: {str(err)}") return data = export_data(db_map) self.import_data(data) self.msg.emit(f"File {filename} successfully imported.")
[docs] def import_from_excel(self, file_path): filename = os.path.split(file_path)[1] try: mapped_data, errors = get_mapped_data_from_xlsx(file_path) except Exception as err: # pylint: disable=broad-except self.msg.emit(f"Couldn't import file {filename}: {str(err)}") raise err # NOTE: This is so the programmer gets to see the traceback if errors: msg = f"The following errors where found parsing {filename}:" + format_string_list(errors) self.msg_error.emit(msg) self.import_data(mapped_data) self.msg.emit(f"File {filename} successfully imported.")
@Slot(bool)
[docs] def show_mass_export_items_dialog(self, checked=False): """Shows dialog for user to select dbs and items for export.""" if self._export_items_dialog is not None: self._export_items_dialog.raise_() return self._export_items_dialog = MassExportItemsDialog( self, self.db_mngr, *self.db_maps, stored_state=self._export_items_dialog_state ) self._export_items_dialog.state_storing_requested.connect(self._store_export_settings) self._export_items_dialog.data_submitted.connect(self.mass_export_items, Qt.ConnectionType.QueuedConnection) self._export_items_dialog.destroyed.connect(self._clean_up_export_items_dialog) self._export_items_dialog.show()
@Slot(dict)
[docs] def _store_export_settings(self, state): """Stores export items dialog settings.""" self._export_items_dialog_state = state
@Slot()
[docs] def _clean_up_export_items_dialog(self): """Cleans up export items dialog.""" self._export_items_dialog = None
@Slot(bool)
[docs] def export_session(self, checked=False): """Exports changes made in the current session.""" db_map_ent_cls_ids = {db_map: db_map.dirty_ids("entity_class") for db_map in self.db_maps} db_map_ent_ids = {db_map: db_map.dirty_ids("entity") for db_map in self.db_maps} db_map_par_val_lst_ids = {db_map: db_map.dirty_ids("parameter_value_list") for db_map in self.db_maps} db_map_par_def_ids = {db_map: db_map.dirty_ids("parameter_definition") for db_map in self.db_maps} db_map_par_val_ids = {db_map: db_map.dirty_ids("parameter_value") for db_map in self.db_maps} db_map_ent_group_ids = {db_map: db_map.dirty_ids("entity_group") for db_map in self.db_maps} parcel = SpineDBParcel(self.db_mngr) parcel.push_entity_class_ids(db_map_ent_cls_ids) parcel.push_entity_ids(db_map_ent_ids) parcel.push_parameter_definition_ids(db_map_par_def_ids) parcel.push_parameter_value_ids(db_map_par_val_ids) parcel.push_parameter_value_list_ids(db_map_par_val_lst_ids) parcel.push_entity_group_ids(db_map_ent_group_ids) self.export_data(parcel.data)
@Slot(object)
[docs] def mass_export_items(self, db_map_item_types): def _ids(t, types): return Asterisk if t in types else () db_map_ent_cls_ids = {db_map: _ids("entity_class", types) for db_map, types in db_map_item_types.items()} db_map_ent_ids = {db_map: _ids("entity", types) for db_map, types in db_map_item_types.items()} db_map_par_val_lst_ids = { db_map: _ids("parameter_value_list", types) for db_map, types in db_map_item_types.items() } db_map_par_def_ids = { db_map: _ids("parameter_definition", types) for db_map, types in db_map_item_types.items() } db_map_par_val_ids = {db_map: _ids("parameter_value", types) for db_map, types in db_map_item_types.items()} db_map_ent_group_ids = {db_map: _ids("entity_group", types) for db_map, types in db_map_item_types.items()} db_map_alt_ids = {db_map: _ids("alternative", types) for db_map, types in db_map_item_types.items()} db_map_scen_ids = {db_map: _ids("scenario", types) for db_map, types in db_map_item_types.items()} db_map_scen_alt_ids = { db_map: _ids("scenario_alternative", types) for db_map, types in db_map_item_types.items() } parcel = SpineDBParcel(self.db_mngr) parcel.push_entity_class_ids(db_map_ent_cls_ids) parcel.push_entity_ids(db_map_ent_ids) parcel.push_parameter_definition_ids(db_map_par_def_ids) parcel.push_parameter_value_ids(db_map_par_val_ids) parcel.push_parameter_value_list_ids(db_map_par_val_lst_ids) parcel.push_entity_group_ids(db_map_ent_group_ids) parcel.push_alternative_ids(db_map_alt_ids) parcel.push_scenario_ids(db_map_scen_ids) parcel.push_scenario_alternative_ids(db_map_scen_alt_ids) self.export_data(parcel.data)
[docs] def duplicate_entity(self, entity_item): """ Duplicates an entity. Args: entity_item (EntityItem) """ orig_name = entity_item.name class_name = entity_item.entity_class_name existing_names = { ent["name"] for db_map in self.db_maps for ent in db_map.get_items("entity", entity_class_name=class_name) } dup_name = unique_name(orig_name, existing_names) self.db_mngr.duplicate_entity(orig_name, dup_name, class_name, entity_item.db_maps)
[docs] def duplicate_scenario(self, db_map, scen_id): """ Duplicates a scenario. Args: db_map (DiffDatabaseMapping) scen_id (int) """ orig_name = self.db_mngr.get_item(db_map, "scenario", scen_id)["name"] parcel = SpineDBParcel(self.db_mngr) parcel.full_push_scenario_ids({db_map: {scen_id}}) existing_names = {i.get("name") for i in self.db_mngr.get_items(db_map, "scenario")} dup_name = unique_name(orig_name, existing_names) self.db_mngr.duplicate_scenario(parcel.data, dup_name, db_map)
@Slot(object)
[docs] def export_data(self, db_map_ids_for_export): """Exports data from given dictionary into a file. Args: db_map_ids_for_export: Dictionary mapping db maps to keyword arguments for spinedb_api.export_data """ # noinspection PyCallByClass, PyTypeChecker, PyArgumentList self.qsettings.beginGroup(self.settings_group) file_path, file_filter = get_save_file_name_in_last_dir( self.qsettings, "exportDB", self, "Export file", self._get_base_dir(), "SQLite (*.sqlite);; JSON file (*.json);; Excel file (*.xlsx)", ) self.qsettings.endGroup() if not file_path: # File selection cancelled return self.db_mngr.export_data(self, db_map_ids_for_export, file_path, file_filter)
@Slot(bool)
[docs] def refresh_session(self, checked=False): self.db_mngr.refresh_session(*self.db_maps)
@Slot(bool)
[docs] def commit_session(self, checked=False): """Commits dirty database maps.""" dirty_db_maps = self.db_mngr.dirty(*self.db_maps) if not dirty_db_maps: return db_names = ", ".join([db_map.codename for db_map in dirty_db_maps]) commit_msg = self._get_commit_msg(db_names) if not commit_msg: return self.db_mngr.commit_session(commit_msg, *dirty_db_maps, cookie=self)
@Slot(bool)
[docs] def rollback_session(self, checked=False): """Rolls back dirty database maps.""" dirty_db_maps = self.db_mngr.dirty(*self.db_maps) if not dirty_db_maps: return db_names = ", ".join([db_map.codename for db_map in dirty_db_maps]) if not self._get_rollback_confirmation(db_names): return self.db_mngr.rollback_session(*dirty_db_maps)
[docs] def receive_session_committed(self, db_maps, cookie): db_maps = set(self.db_maps) & set(db_maps) if not db_maps: return db_names = ", ".join([x.codename for x in db_maps]) if cookie is self: msg = f"All changes in {db_names} committed successfully." self.msg.emit(msg) return # Commit done by an 'outside force'. self.init_models() self.msg.emit(f"Databases {db_names} reloaded from an external action.")
[docs] def receive_session_rolled_back(self, db_maps): db_maps = set(self.db_maps) & set(db_maps) if not db_maps: return db_names = ", ".join([x.codename for x in db_maps]) msg = f"All changes in {db_names} rolled back successfully." self.msg.emit(msg)
[docs] def receive_session_refreshed(self, db_maps): db_maps = set(self.db_maps) & set(db_maps) if not db_maps: return self.init_models() self.msg.emit("Session refreshed.")
@Slot(bool)
[docs] def show_mass_remove_items_form(self, checked=False): """Opens the purge items dialog.""" if self._purge_items_dialog is not None: self._purge_items_dialog.raise_() return self._purge_items_dialog = MassRemoveItemsDialog( self, self.db_mngr, *self.db_maps, stored_state=self._purge_items_dialog_state ) self._purge_items_dialog.state_storing_requested.connect(self._store_purge_settings) self._purge_items_dialog.destroyed.connect(self._clean_up_purge_items_dialog) self._purge_items_dialog.show()
@Slot(dict)
[docs] def _store_purge_settings(self, state): """Stores Purge items dialog state. Args: state (dict): dialog state """ self._purge_items_dialog_state = state
@Slot()
[docs] def _clean_up_purge_items_dialog(self): """Removes references to purge items dialog.""" self._purge_items_dialog = None
@busy_effect @Slot(QModelIndex)
[docs] def show_parameter_value_editor(self, index, plain=False): """Shows the parameter_value editor for the given index of given table view.""" editor = ParameterValueEditor(index, parent=self, plain=plain) editor.show()
[docs] def receive_error_msg(self, db_map_error_log): msgs = [] for db_map, error_log in db_map_error_log.items(): if isinstance(error_log, str): error_log = [error_log] msg = "From " + db_map.codename + ":" + format_string_list(error_log) msgs.append(msg) self.msg_error.emit(format_string_list(msgs))
[docs] def _update_export_enabled(self): """Updates export enabled."""
# TODO: check if db_mngr has any cache or something like that
[docs] def _log_items_change(self, msg): """Enables or disables actions and informs the user about what just happened.""" self._changelog.append(msg) self._update_export_enabled()
[docs] def _handle_items_added(self, item_type, db_map_data): count = sum(len(data) for data in db_map_data.values()) msg = f"Successfully added {count} {item_type} item(s)" self._log_items_change(msg)
[docs] def _handle_items_updated(self, item_type, db_map_data): count = sum(len(data) for data in db_map_data.values()) msg = f"Successfully updated {count} {item_type} item(s)" self._log_items_change(msg)
[docs] def _handle_items_removed(self, item_type, db_map_data): count = sum(len(data) for data in db_map_data.values()) msg = f"Successfully removed {count} {item_type} item(s)" self._log_items_change(msg)
[docs] def restore_ui(self, view_type, fresh=False): """Restores UI state from previous session. Args: view_type (str): What the selected view type is. fresh (bool): If true, the view specified with subgroup will be applied, instead of loading the previous window state of the said view. """ if fresh and view_type: # Apply the view instead of loading the window state self.last_view = None options = { "stacked": self.apply_stacked_style, "graph": self.apply_graph_style, } func = options[view_type] if view_type in options else self.apply_pivot_style func(view_type) return window_state = None if view_type: self.qsettings.beginGroup(self.settings_group) self.qsettings.beginGroup(self.settings_subgroup) self.qsettings.beginGroup(view_type) window_state = self.qsettings.value("windowState") self.qsettings.endGroup() self.qsettings.endGroup() self.qsettings.endGroup() else: # To ensure that the first time changes of a window are saved. self.last_view = "stacked" if window_state: self.restoreState(window_state, version=1) # Toolbar and dockWidget positions
[docs] def save_window_state(self): """Saves window state parameters (size, position, state) via QSettings.""" if not self.db_maps or len(self.db_urls) != 1: # Only save window sates of single db tabs return self.qsettings.beginGroup(self.settings_group) self.qsettings.beginGroup(self.settings_subgroup) self.qsettings.setValue("last_open", self.last_view) self.qsettings.beginGroup(self.last_view) self.qsettings.setValue("windowState", self.saveState(version=1)) self.qsettings.endGroup() self.qsettings.endGroup() self.qsettings.endGroup()
[docs] def tear_down(self): """Performs clean up duties. Returns: bool: True if editor is ready to close, False otherwise """ dirty_db_maps = self.db_mngr.dirty_and_without_editors(self, *self.db_maps) commit_dirty = False commit_msg = "" if dirty_db_maps: answer = self._prompt_to_commit_changes() if answer == QMessageBox.StandardButton.Cancel: return False db_names = ", ".join([db_map.codename for db_map in dirty_db_maps]) if answer == QMessageBox.StandardButton.Save: commit_dirty = True commit_msg = self._get_commit_msg(db_names) if not commit_msg: return False self._purge_change_notifiers() self._torn_down = True failed_db_maps = self.db_mngr.unregister_listener( self, *self.db_maps, dirty_db_maps=dirty_db_maps, commit_dirty=commit_dirty, commit_msg=commit_msg ) if failed_db_maps: msg = f"Failed to commit {[db_map.codename for db_map in failed_db_maps]}" self.db_mngr.receive_error_msg({i: [msg] for i in failed_db_maps}) return False return True
[docs] def _prompt_to_commit_changes(self): """Prompts the user to commit or rollback changes to 'dirty' db maps. Returns: int: QMessageBox status code """ commit_at_exit = int(self.qsettings.value("appSettings/commitAtExit", defaultValue="1")) if commit_at_exit == 0: # Don't commit session and don't show message box return QMessageBox.StandardButton.Discard if commit_at_exit == 1: # Default # Show message box msg = QMessageBox(self) msg.setIcon(QMessageBox.Icon.Question) msg.setWindowTitle(self.windowTitle()) msg.setText("The current session has uncommitted changes. Do you want to commit them now?") msg.setStandardButtons( QMessageBox.StandardButton.Save | QMessageBox.StandardButton.Discard | QMessageBox.StandardButton.Cancel ) msg.button(QMessageBox.StandardButton.Save).setText("Commit and close ") msg.button(QMessageBox.StandardButton.Discard).setText("Discard changes and close") chkbox = QCheckBox() chkbox.setText("Do not ask me again") msg.setCheckBox(chkbox) answer = msg.exec() if answer != QMessageBox.StandardButton.Cancel and chkbox.checkState() == 2: # Save preference preference = "2" if answer == QMessageBox.StandardButton.Save else "0" self.qsettings.setValue("appSettings/commitAtExit", preference) return answer if commit_at_exit == 2: # Commit session and don't show message box return QMessageBox.StandardButton.Save
[docs] def _get_commit_msg(self, db_names): """Prompts user for commit message. Args: db_names (Iterable of str): database names Returns: str: commit message """ dialog = CommitDialog(self, db_names) answer = dialog.exec() if answer == QDialog.DialogCode.Accepted: return dialog.commit_msg
[docs] def _get_rollback_confirmation(self, db_names): """Prompts user for confirmation before rolling back the session. Args: db_names (Iterable of str): database names Returns: bool: True if user confirmed, False otherwise """ message_box = QMessageBox( QMessageBox.Icon.Question, f"Rollback changes in {db_names}", "Are you sure? " "All your changes since the last commit will be reverted and removed from the undo/redo stack.", QMessageBox.StandardButton.Ok | QMessageBox.StandardButton.Cancel, parent=self, ) message_box.button(QMessageBox.StandardButton.Ok).setText("Rollback") answer = message_box.exec() return answer == QMessageBox.StandardButton.Ok
[docs] def _purge_change_notifiers(self): """Tears down change notifiers.""" while self._change_notifiers: notifier = self._change_notifiers.pop(0) notifier.tear_down() notifier.deleteLater()
[docs] def closeEvent(self, event): """Handle close window. Args: event (QCloseEvent): Closing event """ if not self.tear_down(): event.ignore() return self.save_window_state() super().closeEvent(event)
@staticmethod
[docs] def _get_base_dir(): return APPLICATION_PATH
[docs]class SpineDBEditor(TabularViewMixin, GraphViewMixin, StackedViewMixin, TreeViewMixin, SpineDBEditorBase): """A widget to visualize Spine dbs."""
[docs] pinned_values_updated = Signal(list)
def __init__(self, db_mngr, db_url_codenames=None): """Initializes everything. Args: db_mngr (SpineDBManager): The manager to use """ super().__init__(db_mngr) self._original_size = None self._metadata_editor = MetadataEditor(self.ui.metadata_table_view, self, db_mngr) self._item_metadata_editor = ItemMetadataEditor( self.ui.item_metadata_table_view, self, self._metadata_editor, db_mngr ) self._dock_views = {d: d.findChild(QAbstractScrollArea) for d in self.findChildren(QDockWidget)} self._timer_refresh_tab_order = QTimer(self) # Used to limit refresh self._timer_refresh_tab_order.setSingleShot(True) self.add_main_menu() self.connect_signals() self.last_view = None self.apply_stacked_style() if db_url_codenames is not None: self.load_db_urls(db_url_codenames)
[docs] def emit_pinned_values_updated(self): self.pinned_values_updated.emit(self.ui.tableView_parameter_value.pinned_values)
[docs] def connect_signals(self): super().connect_signals() self._metadata_editor.connect_signals(self.ui) self._item_metadata_editor.connect_signals(self.ui) self.ui.actionStacked_style.triggered.connect(self.apply_stacked_style) self.ui.actionGraph_style.triggered.connect(self.apply_graph_style) self.pivot_action_group.triggered.connect(self.apply_pivot_style) for dock in self._dock_views: dock.visibilityChanged.connect(self._restart_timer_refresh_tab_order)
[docs] def init_models(self): super().init_models() self._metadata_editor.init_models(self.db_maps) self._item_metadata_editor.init_models(self.db_maps)
@Slot(bool)
[docs] def _restart_timer_refresh_tab_order(self, _visible=False): if self._torn_down: return self._timer_refresh_tab_order.timeout.connect(self._refresh_tab_order, Qt.UniqueConnection) self._timer_refresh_tab_order.start(100)
[docs] def _refresh_tab_order(self): if self._torn_down: return self._timer_refresh_tab_order.timeout.disconnect(self._refresh_tab_order) visible_docks = [] for dock, view in self._dock_views.items(): if view is None: continue if dock.pos().x() >= 0 and not dock.isFloating(): visible_docks.append(dock) view.setFocusPolicy(Qt.StrongFocus) else: view.setFocusPolicy(Qt.ClickFocus) if not visible_docks: return sorted_docks = sorted(visible_docks, key=lambda d: (d.pos().x(), d.pos().y())) tab_bars = {} for tab_bar in self.findChildren(QTabBar): i = tab_bar.currentIndex() if i != -1: tab_bars[tab_bar.tabText(i)] = tab_bar sorted_widgets = [] for dock in sorted_docks: sorted_widgets.append(self._dock_views[dock]) tab_bar = tab_bars.get(dock.windowTitle()) if tab_bar is not None: sorted_widgets.append(tab_bar) self.setTabOrder(self.url_toolbar.line_edit, sorted_widgets[0]) for first, second in zip(sorted_widgets[:-1], sorted_widgets[1:]): self.setTabOrder(first, second)
[docs] def tabify_and_raise(self, docks): """ Tabifies docks in given list, then raises the first. Args: docks (list) """ for first, second in zip(docks[:-1], docks[1:]): self.tabifyDockWidget(first, second) docks[0].raise_()
[docs] def restore_dock_widgets(self): """Docks all floating and or hidden QDockWidgets back to the window.""" for dock in self._dock_views: dock.setFloating(False) dock.setVisible(True) self.addDockWidget(Qt.RightDockWidgetArea, dock)
[docs] def update_last_view(self): self.qsettings.beginGroup(self.settings_group) self.qsettings.beginGroup(self.settings_subgroup) last_view = self.qsettings.value("last_open") self.last_view = last_view self.qsettings.endGroup() self.qsettings.endGroup()
[docs] def begin_style_change(self): """Begins a style change operation.""" self._original_size = self.size() self.restore_dock_widgets()
[docs] def end_style_change(self): """Ends a style change operation.""" for tab_bar in self.children(): # This is a workaround to hide a rogue tab bar that sometimes shows as a single gray line # somewhere in the editor window when closing the Object parameter value dock and/or switching # between Table view and the other views. # This could be caused by a bug in Qt but was still present in PySide6 6.5.0. # See issue #2091 for more information. if not isinstance(tab_bar, QTabBar): continue if tab_bar.count() == 0 and tab_bar.isVisible(): tab_bar.hide() qApp.processEvents() # pylint: disable=undefined-variable self.ui.dockWidget_exports.hide() self.resize(self._original_size)
@Slot(object)
[docs] def apply_stacked_style(self, _checked=None): """Applies the stacked style, inspired in the former tree view.""" if self.last_view: self.save_window_state() self.last_view = "stacked" self.begin_style_change() self.splitDockWidget( self.ui.dockWidget_entity_tree, self.ui.dockWidget_parameter_value, Qt.Orientation.Horizontal ) self.splitDockWidget( self.ui.dockWidget_parameter_value, self.ui.alternative_dock_widget, Qt.Orientation.Horizontal ) self._finish_stacked_style() self.ui.dockWidget_entity_graph.hide() self.end_style_change() self.restore_ui(self.last_view)
[docs] def _finish_stacked_style(self): # right-side self.splitDockWidget(self.ui.alternative_dock_widget, self.ui.scenario_dock_widget, Qt.Orientation.Vertical) self.splitDockWidget( self.ui.scenario_dock_widget, self.ui.dockWidget_parameter_value_list, Qt.Orientation.Vertical ) self.tabify_and_raise([self.ui.dockWidget_parameter_value_list, self.ui.metadata_dock_widget]) self.tabify_and_raise([self.ui.metadata_dock_widget, self.ui.item_metadata_dock_widget]) self.ui.dockWidget_parameter_value_list.raise_() # center self.tabify_and_raise( [ self.ui.dockWidget_parameter_value, self.ui.dockWidget_parameter_definition, self.ui.dockWidget_entity_alternative, ] ) self.ui.dockWidget_pivot_table.hide() self.ui.dockWidget_frozen_table.hide() docks = [self.ui.dockWidget_entity_tree, self.ui.dockWidget_parameter_value, self.ui.alternative_dock_widget] width = sum(d.size().width() for d in docks) self.resizeDocks(docks, [0.3 * width, 0.5 * width, 0.2 * width], Qt.Orientation.Horizontal)
@Slot(object)
[docs] def apply_pivot_style(self, _checked=None): """Applies the pivot style, inspired in the former tabular view.""" if self.last_view: self.save_window_state() self.last_view = _checked if isinstance(_checked, str) else _checked.text() self.current_input_type = self.last_view self.begin_style_change() self.splitDockWidget(self.ui.dockWidget_entity_tree, self.ui.dockWidget_pivot_table, Qt.Orientation.Horizontal) self.splitDockWidget(self.ui.dockWidget_pivot_table, self.ui.dockWidget_frozen_table, Qt.Orientation.Horizontal) self.splitDockWidget(self.ui.dockWidget_frozen_table, self.ui.alternative_dock_widget, Qt.Orientation.Vertical) self.splitDockWidget(self.ui.alternative_dock_widget, self.ui.scenario_dock_widget, Qt.Orientation.Vertical) self.splitDockWidget( self.ui.scenario_dock_widget, self.ui.dockWidget_parameter_value_list, Qt.Orientation.Vertical ) self.ui.dockWidget_entity_graph.hide() self.ui.dockWidget_parameter_value.hide() self.ui.dockWidget_parameter_definition.hide() self.ui.dockWidget_entity_alternative.hide() self.ui.metadata_dock_widget.hide() self.ui.item_metadata_dock_widget.hide() docks = [self.ui.dockWidget_entity_tree, self.ui.dockWidget_pivot_table, self.ui.dockWidget_frozen_table] width = sum(d.size().width() for d in docks) self.resizeDocks(docks, [0.2 * width, 0.65 * width, 0.15 * width], Qt.Orientation.Horizontal) self.end_style_change() self.restore_ui(self.last_view)
@Slot(object)
[docs] def apply_graph_style(self, _checked=None): """Applies the graph style, inspired in the former graph view.""" if self.last_view: self.save_window_state() self.last_view = "graph" self.begin_style_change() self.splitDockWidget(self.ui.dockWidget_entity_tree, self.ui.dockWidget_entity_graph, Qt.Orientation.Horizontal) self.splitDockWidget( self.ui.dockWidget_entity_graph, self.ui.alternative_dock_widget, Qt.Orientation.Horizontal ) self.splitDockWidget( self.ui.dockWidget_entity_graph, self.ui.dockWidget_parameter_value, Qt.Orientation.Vertical ) self._finish_stacked_style() self.ui.dockWidget_entity_graph.show() docks = [self.ui.dockWidget_entity_graph, self.ui.dockWidget_parameter_value] height = sum(d.size().height() for d in docks) self.resizeDocks(docks, [0.7 * height, 0.3 * height], Qt.Orientation.Vertical) self.end_style_change() self.restore_ui(self.last_view) self.ui.graphicsView.reset_zoom()
@staticmethod
[docs] def _get_base_dir(): return APPLICATION_PATH