######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""The SpineDBManager class."""
import os
import json
from PySide6.QtCore import Qt, QObject, Signal, Slot
from PySide6.QtWidgets import QApplication, QMessageBox, QWidget
from PySide6.QtGui import QWindow
from sqlalchemy.engine.url import URL
from spinedb_api import (
Array,
Asterisk,
create_new_spine_database,
DatabaseMapping,
export_data,
from_database,
get_data_for_import,
IndexedValue,
import_data,
is_empty,
Map,
ParameterValueFormatError,
relativedelta_to_duration,
SpineDBAPIError,
TimePattern,
TimeSeries,
TimeSeriesFixedResolution,
TimeSeriesVariableResolution,
to_database,
)
from spinedb_api.parameter_value import deep_copy_value, load_db_value, dump_db_value
from spinedb_api.parameter_value import join_value_and_type, split_value_and_type
from spinedb_api.helpers import remove_credentials_from_url
from spinedb_api.spine_io.exporters.excel import export_spine_database_to_xlsx
from .spine_db_icon_manager import SpineDBIconManager
from .spine_db_worker import SpineDBWorker
from .spine_db_commands import (
AgedUndoStack,
AddItemsCommand,
UpdateItemsCommand,
AddUpdateItemsCommand,
RemoveItemsCommand,
)
from .mvcmodels.shared import PARSED_ROLE
from .widgets.options_dialog import OptionsDialog
from .spine_db_editor.widgets.multi_spine_db_editor import MultiSpineDBEditor
from .helpers import busy_effect, plain_to_tool_tip
@busy_effect
[docs]def do_create_new_spine_database(url):
"""Creates a new spine database at the given url."""
create_new_spine_database(url)
[docs]class SpineDBManager(QObject):
"""Class to manage DBs within a project."""
[docs] error_msg = Signal(object)
# Data changed signals
[docs] items_added = Signal(str, dict)
"""Emitted whenever items are added to a DB.
Args:
str: item type, such as "object_class"
dict: mapping DatabaseMapping to list of added dict-items.
"""
[docs] items_updated = Signal(str, dict)
"""Emitted whenever items are updated in a DB.
Args:
str: item type, such as "object_class"
dict: mapping DatabaseMapping to list of updated dict-items.
"""
[docs] items_removed = Signal(str, dict)
"""Emitted whenever items are removed from a DB.
Args:
str: item type, such as "object_class"
dict: mapping DatabaseMapping to list of updated dict-items.
"""
def __init__(self, settings, parent, synchronous=False):
"""
Args:
settings (QSettings): Toolbox settings
parent (QObject, optional): parent object
synchronous (bool): If True, fetch database synchronously
"""
super().__init__(parent)
self.qsettings = settings
self._db_maps = {}
self._workers = {}
self.listeners = dict()
self.undo_stack = {}
self.undo_action = {}
self.redo_action = {}
self._icon_mngr = {}
self._connect_signals()
self._cmd_id = 0
self._synchronous = synchronous
self.data_stores = dict()
[docs] def _connect_signals(self):
self.error_msg.connect(self.receive_error_msg)
qApp.aboutToQuit.connect(self.clean_up) # pylint: disable=undefined-variable
@Slot(object)
[docs] def receive_error_msg(self, db_map_error_log):
for db_map, error_log in db_map_error_log.items():
for listener in self.listeners.get(db_map, ()):
listener.receive_error_msg({db_map: error_log})
@Slot(set)
[docs] def receive_session_refreshed(self, db_maps):
for db_map in db_maps:
for listener in self.listeners.get(db_map, ()):
try:
listener.receive_session_refreshed([db_map])
except AttributeError:
pass
@Slot(set, object)
[docs] def receive_session_committed(self, db_maps, cookie):
for db_map in db_maps:
for listener in self.listeners.get(db_map, ()):
try:
listener.receive_session_committed([db_map], cookie)
except AttributeError:
pass
@Slot(set)
[docs] def receive_session_rolled_back(self, db_maps):
for db_map in db_maps:
for listener in self.listeners.get(db_map, ()):
try:
listener.receive_session_rolled_back([db_map])
except AttributeError:
pass
[docs] def _get_worker(self, db_map):
"""Returns a worker.
Args:
db_map (DatabaseMapping): database mapping
Returns:
SpineDBWorker: worker for the db_map
"""
return self._workers[db_map]
[docs] def register_fetch_parent(self, db_map, parent):
"""Registers a fetch parent.
Args:
db_map (DatabaseMapping): target database mapping
parent (FetchParent): fetch parent
"""
if db_map.closed:
return
try:
worker = self._get_worker(db_map)
except KeyError:
return
worker.register_fetch_parent(parent)
[docs] def can_fetch_more(self, db_map, parent):
"""Whether or not we can fetch more items of given type from given db.
Args:
db_map (DatabaseMapping)
parent (FetchParent): The object that requests the fetching and that might want to react to further DB
modifications.
Returns:
bool
"""
if db_map.closed:
return False
try:
worker = self._get_worker(db_map)
except KeyError:
return False
return worker.can_fetch_more(parent)
[docs] def fetch_more(self, db_map, parent):
"""Fetches more items of given type from given db.
Args:
db_map (DatabaseMapping)
parent (FetchParent): The object that requests the fetching.
"""
if db_map.closed:
return
try:
worker = self._get_worker(db_map)
except KeyError:
return
worker.fetch_more(parent)
[docs] def get_icon_mngr(self, db_map):
"""Returns an icon manager for given db_map.
Args:
db_map (DatabaseMapping)
Returns:
SpineDBIconManager
"""
if db_map not in self._icon_mngr:
self._icon_mngr[db_map] = SpineDBIconManager()
return self._icon_mngr[db_map]
[docs] def update_icons(self, db_map, item_type, items):
"""Runs when items are added or updated. Setups icons."""
if item_type == "entity_class":
self.get_icon_mngr(db_map).update_icon_caches(items)
@property
[docs] def db_maps(self):
return set(self._db_maps.values())
@staticmethod
[docs] def db_map_key(db_map):
"""Creates an identifier for given db_map.
Args:
db_map (DatabaseMapping): database mapping
Returns:
int: identification key
"""
return str(hash(db_map))
[docs] def db_map_from_key(self, key):
"""Returns database mapping that corresponds to given identification key.
Args:
key (int): identification key
Returns:
DatabaseMapping: database mapping
Raises:
KeyError: raised if database map is not found
"""
return {self.db_map_key(db_map): db_map for db_map in self._db_maps.values()}[key]
@property
[docs] def db_urls(self):
return set(self._db_maps)
[docs] def db_map(self, url):
"""
Returns a database mapping for given URL.
Args:
url (str): a database URL
Returns:
DatabaseMapping: a database map or None if not found
"""
url = str(url)
return self._db_maps.get(url)
[docs] def create_new_spine_database(self, url, logger, overwrite=False):
if not overwrite and not is_empty(url):
msg = QMessageBox(qApp.activeWindow()) # pylint: disable=undefined-variable
msg.setIcon(QMessageBox.Icon.Question)
msg.setWindowTitle("Database not empty")
msg.setText(f"The URL <b>{remove_credentials_from_url(url)}</b> points to an existing database.")
msg.setInformativeText("Do you want to overwrite it?")
overwrite_button = msg.addButton("Overwrite", QMessageBox.ButtonRole.AcceptRole)
msg.addButton("Cancel", QMessageBox.ButtonRole.RejectRole)
msg.exec()
# We have custom buttons, exec() returns an opaque value.
# Let's check the clicked button explicitly instead.
clicked_button = msg.clickedButton()
if clicked_button is not overwrite_button:
return
try:
do_create_new_spine_database(url)
logger.msg_success.emit(f"New Spine db successfully created at '{url}'.")
db_map = self.db_map(url)
self.refresh_session(db_map)
except SpineDBAPIError as e:
logger.msg_error.emit(f"Unable to create new Spine db at '{url}': {e}.")
[docs] def close_session(self, url):
"""Pops any db map on the given url and closes its connection.
Args:
url (str)
"""
db_map = self._db_maps.pop(url, None)
if db_map is None:
return
worker = self._workers.pop(db_map, None)
if worker is not None:
worker.close_db_map() # NOTE: This calls ThreadPoolExecutor.shutdown() which waits for Futures to finish
worker.clean_up()
del self.undo_stack[db_map]
del self.undo_action[db_map]
del self.redo_action[db_map]
[docs] def close_all_sessions(self):
"""Closes connections to all database mappings."""
for url in list(self._db_maps):
self.close_session(url)
[docs] def get_db_map(self, url, logger, ignore_version_error=False, window=False, codename=None, create=False):
"""Returns a DatabaseMapping instance from url if possible, None otherwise.
If needed, asks the user to upgrade to the latest db version.
Args:
url (str, URL)
logger (LoggerInterface)
ignore_version_error (bool, optional)
window (bool, optional)
codename (str, NoneType, optional)
create (bool, optional)
Returns:
DatabaseMapping, NoneType
"""
url = str(url)
db_map = self._db_maps.get(url)
if db_map is not None:
if not window and codename is not None and db_map.codename != codename:
return None
return db_map
try:
prompt_data = DatabaseMapping.get_upgrade_db_prompt_data(url, create=create)
except SpineDBAPIError as err:
logger.msg_error.emit(err.msg)
return None
if prompt_data is not None:
if ignore_version_error:
return None
title, text, option_to_kwargs, notes, preferred = prompt_data
kwargs = OptionsDialog.get_answer(
self.parent(), title, text, option_to_kwargs, notes=notes, preferred=preferred
)
if kwargs is None:
return None
else:
kwargs = {}
kwargs.update(codename=codename, create=create)
try:
return self._do_get_db_map(url, **kwargs)
except SpineDBAPIError as err:
logger.msg_error.emit(err.msg)
return None
@busy_effect
[docs] def _do_get_db_map(self, url, **kwargs):
"""Returns a memorized DatabaseMapping instance from url.
Called by `get_db_map`.
Args:
url (str, URL)
codename (str, NoneType)
upgrade (bool)
create (bool)
Returns:
DatabaseMapping
"""
worker = SpineDBWorker(self, url, synchronous=self._synchronous)
try:
db_map = worker.get_db_map(**kwargs)
except Exception as error:
worker.clean_up()
raise error
self._workers[db_map] = worker
self._db_maps[url] = db_map
stack = self.undo_stack[db_map] = AgedUndoStack(self)
self.undo_action[db_map] = stack.createUndoAction(self)
self.redo_action[db_map] = stack.createRedoAction(self)
return db_map
[docs] def query(self, db_map, sq_name):
"""For tests."""
return self._get_worker(db_map).query(sq_name)
[docs] def add_db_map_listener(self, db_map, listener):
"""Adds listener for given db_map."""
self.listeners.setdefault(db_map, set()).add(listener)
[docs] def remove_db_map_listener(self, db_map, listener):
"""Removes db_map from the maps listener listens to."""
listeners = self.listeners.get(db_map, set())
listeners.discard(listener)
if not listeners:
self.listeners.pop(db_map)
[docs] def db_map_listeners(self, db_map):
return self.listeners.get(db_map, set())
[docs] def register_listener(self, listener, *db_maps):
"""Register given listener for all given db_map's signals.
Args:
listener (object)
*db_maps
"""
self.update_data_store_db_maps()
for db_map in db_maps:
self.add_db_map_listener(db_map, listener)
stack = self.undo_stack[db_map]
try:
stack.canRedoChanged.connect(listener.update_undo_redo_actions)
stack.canUndoChanged.connect(listener.update_undo_redo_actions)
stack.cleanChanged.connect(listener.update_commit_enabled)
stores = self.data_stores.get(db_map)
if not stores:
continue
for store in stores:
self.undo_stack[db_map].cleanChanged.connect(store.notify_about_dirtiness)
except AttributeError:
pass
[docs] def unregister_listener(self, listener, *db_maps, dirty_db_maps=None, commit_dirty=False, commit_msg=""):
"""Unregisters given listener from given db_map signals.
If any of the db_maps becomes an orphan and is dirty, prompts user to commit or rollback.
Args:
listener (object)
*db_maps
commit_dirty (bool): True to commit dirty database mapping, False to roll back
commit_msg (str): commit message
Returns:
failed_db_maps (list): All the db maps that failed to commit
"""
failed_db_maps = list()
for db_map in db_maps:
self.remove_db_map_listener(db_map, listener)
try:
self.undo_stack[db_map].canRedoChanged.disconnect(listener.update_undo_redo_actions)
self.undo_stack[db_map].canUndoChanged.disconnect(listener.update_undo_redo_actions)
self.undo_stack[db_map].cleanChanged.disconnect(listener.update_commit_enabled)
if not self.db_map_listeners(db_map) == 1:
# Unregister the dirty listeners for every data store ONLY when the last editor
# for that db map is closed. This way the dirtiness state of a data store that is
# not open in a db editor can still be affected.
continue
stores = self.data_stores.get(db_map)
if not stores:
continue
for store in stores:
self.undo_stack[db_map].cleanChanged.disconnect(store.notify_about_dirtiness)
store.notify_about_dirtiness(True)
except AttributeError:
pass
if dirty_db_maps:
if commit_dirty:
failed_db_maps += self.commit_session(commit_msg, *dirty_db_maps)
else:
self.rollback_session(*dirty_db_maps)
# If some db maps failed to commit, reinstate their listeners
for db_map in failed_db_maps:
self.add_db_map_listener(db_map, listener)
try:
self.undo_stack[db_map].canRedoChanged.connect(listener.update_undo_redo_actions)
self.undo_stack[db_map].canUndoChanged.connect(listener.update_undo_redo_actions)
self.undo_stack[db_map].cleanChanged.connect(listener.update_commit_enabled)
stores = self.data_stores.get(db_map)
if not stores:
continue
for store in stores:
self.undo_stack[db_map].cleanChanged.connect(store.notify_about_dirtiness)
except AttributeError:
pass
for db_map in db_maps:
if not self.db_map_listeners(db_map):
self.close_session(db_map.db_url)
return failed_db_maps
[docs] def add_data_store_db_map(self, db_map, store):
"""Adds a Data Store instance under a db_map into memory"""
if db_map in self.data_stores:
self.data_stores[db_map].append(store)
else:
self.data_stores[db_map] = [store]
[docs] def remove_data_store_db_map(self, db_map, store):
"""Removes a Data Store instance from memory"""
if self.data_stores.get(db_map):
self.data_stores[db_map].remove(store)
[docs] def update_data_store_db_maps(self):
"""Updates the db maps of the dict that maps db_maps to Data Stores"""
new_data_stores = {}
old_stores = []
for store in self.data_stores.values():
old_stores.extend(store)
for store in old_stores:
db_map = store.get_db_map_for_ds()
if db_map in new_data_stores:
new_data_stores[db_map].append(store)
else:
new_data_stores[db_map] = [store]
self.data_stores = new_data_stores
[docs] def is_dirty(self, db_map):
"""Returns True if mapping has pending changes.
Args:
db_map (DatabaseMapping): database mapping
Returns:
bool: True if db_map has pending changes, False otherwise
"""
return not self.undo_stack[db_map].isClean()
[docs] def dirty(self, *db_maps):
"""Filters clean mappings from given database maps.
Args:
*db_maps: mappings to check
Return:
list of DatabaseMapping: dirty mappings
"""
return [db_map for db_map in db_maps if self.is_dirty(db_map)]
[docs] def dirty_and_without_editors(self, listener, *db_maps):
"""Checks which of the given database mappings are dirty and have no editors.
Args:
listener (Any): a listener object
*db_maps: mappings to check
Return:
list of DatabaseMapping: mappings that are dirty and don't have editors
"""
def has_editors(db_map):
return any(
hasattr(x, "is_db_map_editor") and x.is_db_map_editor()
for x in self.db_map_listeners(db_map) - {listener}
)
return [db_map for db_map in self.dirty(*db_maps) if not has_editors(db_map)]
[docs] def clean_up(self):
while self._workers:
_, worker = self._workers.popitem()
worker.clean_up()
self.deleteLater()
[docs] def refresh_session(self, *db_maps):
refreshed_db_maps = set()
for db_map in db_maps:
try:
worker = self._get_worker(db_map)
except KeyError:
continue
worker.refresh_session()
refreshed_db_maps.add(db_map)
self.receive_session_refreshed(refreshed_db_maps)
[docs] def reset_session(self, *db_maps):
for db_map in db_maps:
try:
worker = self._get_worker(db_map)
except KeyError:
continue
worker.reset_session()
self.undo_stack[db_map].clear()
[docs] def commit_session(self, commit_msg, *dirty_db_maps, cookie=None):
"""
Commits the current session.
Args:
commit_msg (str): commit message for all database maps
*dirty_db_maps: dirty database maps to commit
cookie (object, optional): a free form identifier which will be forwarded to ``SpineDBWorker.commit_session``
"""
failed_db_maps = []
for db_map in dirty_db_maps:
success = self._do_commit_session(db_map, commit_msg, cookie)
if not success:
failed_db_maps.append(db_map)
continue
return failed_db_maps
[docs] def _do_commit_session(self, db_map, commit_msg, cookie=None):
"""Commits session for given db.
Args:
db_map (DatabaseMapping): db map
commit_msg (str): commit message
cookie (Any): a cookie to include in receive_session_committed call
Returns:
bool
"""
try:
transformations, info = db_map.commit_session(commit_msg, apply_compatibility_transforms=False)
self.undo_stack[db_map].setClean()
if info:
info = "".join(f"- {x}\n" for x in info)
QMessageBox.warning(
QApplication.activeWindow(),
"Your data needs to be refitted",
f"Some of the data committed to the DB at '{db_map.db_url}' "
"uses an old format and needs to be refitted. "
f"The following transformations will be applied:\n\n{info}\n"
"Afterwards, you can review the changes "
"and either commit or rollback.",
buttons=QMessageBox.StandardButton.Apply,
)
identifier = self.get_command_identifier()
for tablename, (items_to_add, items_to_update, ids_to_remove) in transformations:
self.remove_items({db_map: {tablename: ids_to_remove}}, identifier=identifier)
self.update_items(tablename, {db_map: items_to_update}, identifier=identifier)
self.add_items(tablename, {db_map: items_to_add}, identifier=identifier)
self.receive_session_committed({db_map}, cookie)
return True
except SpineDBAPIError as err:
self.error_msg.emit({db_map: [err.msg]})
return False
[docs] def notify_session_committed(self, cookie, *db_maps):
"""Notifies manager and listeners when a commit has taken place by a third party.
Args:
cookie (Any): commit cookie
*db_maps: database maps that were committed
"""
self.reset_session(*db_maps)
self.receive_session_committed(set(db_maps), cookie)
[docs] def rollback_session(self, *dirty_db_maps):
"""Rolls back the current session.
Args:
*dirty_db_maps: dirty database maps to commit
"""
for db_map in dirty_db_maps:
self._do_rollback_session(db_map)
[docs] def _do_rollback_session(self, db_map):
"""Rolls back session for given db.
Args:
db_map (DatabaseMapping): db map
"""
try:
db_map.rollback_session()
self.undo_stack[db_map].clear()
self.receive_session_rolled_back({db_map})
except SpineDBAPIError as err:
self.error_msg.emit({db_map: [err.msg]})
[docs] def entity_class_renderer(self, db_map, entity_class_id, for_group=False, color=None):
"""Returns an icon renderer for a given entity class.
Args:
db_map (DatabaseMapping): database map
entity_class_id (int): entity class id
for_group (bool): if True, return the group object icon instead
color (QColor, optional): icon color
Returns:
QSvgRenderer: requested renderer or None if no entity class was found
"""
entity_class = self.get_item(db_map, "entity_class", entity_class_id)
if not entity_class:
return None
if for_group:
return self.get_icon_mngr(db_map).group_renderer(entity_class)
if color is not None:
color_code = int(color.rgba())
return self.get_icon_mngr(db_map).color_class_renderer(entity_class, color_code)
return self.get_icon_mngr(db_map).class_renderer(entity_class)
[docs] def entity_class_icon(self, db_map, entity_class_id, for_group=False):
"""Returns an appropriate icon for a given entity class.
Args:
db_map (DatabaseMapping): database map
entity_class_id (int): entity class' id
for_group (bool): if True, return the group object icon instead
Returns:
QIcon: requested icon or None if no entity class was found
"""
renderer = self.entity_class_renderer(db_map, entity_class_id, for_group=for_group)
return SpineDBIconManager.icon_from_renderer(renderer) if renderer is not None else None
@staticmethod
[docs] def get_item(db_map, item_type, id_):
"""Returns the item of the given type in the given db map that has the given id,
or an empty dict if not found.
Args:
db_map (DatabaseMapping)
item_type (str)
id_ (TempId)
Returns:
PublicItem: the item
"""
mapped_table = db_map.mapped_table(item_type)
item = mapped_table.find_item_by_id(id_)
if hasattr(item, 'public_item'):
return item.public_item
else:
return item
[docs] def get_field(self, db_map, item_type, id_, field):
return self.get_item(db_map, item_type, id_).get(field)
@staticmethod
[docs] def get_items(db_map, item_type):
"""Returns a list of the items of the given type in the given db map.
Args:
db_map (DatabaseMapping)
item_type (str)
Returns:
list
"""
return db_map.get_items(item_type)
[docs] def get_items_by_field(self, db_map, item_type, field, value):
"""Returns a list of items of the given type in the given db map that have the given value
for the given field.
Args:
db_map (DatabaseMapping)
item_type (str)
field (str)
value
Returns:
list
"""
return [x for x in self.get_items(db_map, item_type) if x.get(field) == value]
[docs] def get_item_by_field(self, db_map, item_type, field, value):
"""Returns the first item of the given type in the given db map
that has the given value for the given field
Returns an empty dictionary if none found.
Args:
db_map (DatabaseMapping)
item_type (str)
field (str)
value
Returns:
dict
"""
return next(iter(self.get_items_by_field(db_map, item_type, field, value)), {})
@staticmethod
[docs] def display_data_from_parsed(parsed_data):
"""Returns the value's database representation formatted for Qt.ItemDataRole.DisplayRole."""
if isinstance(parsed_data, TimeSeries):
display_data = "Time series"
elif isinstance(parsed_data, Map):
display_data = "Map"
elif isinstance(parsed_data, Array):
display_data = "Array"
elif isinstance(parsed_data, TimePattern):
display_data = "Time pattern"
elif isinstance(parsed_data, ParameterValueFormatError):
display_data = "Error"
else:
display_data = str(parsed_data)
return display_data
@staticmethod
[docs] def get_value(self, db_map, item, role=Qt.ItemDataRole.DisplayRole):
"""Returns the value or default value of a parameter.
Args:
db_map (DatabaseMapping): database mapping
item (PublicItem): parameter value item, parameter definition item, or list value item
role (Qt.ItemDataRole): data role
Returns:
any
"""
if not item:
return None
value_field, type_field = {
"parameter_value": ("value", "type"),
"list_value": ("value", "type"),
"parameter_definition": ("default_value", "default_type"),
}[item.item_type]
list_value_id = item["id"] if item.item_type == "list_value" else item["list_value_id"]
complex_types = {"array": "Array", "time_series": "Time series", "time_pattern": "Time pattern", "map": "Map"}
if role == Qt.ItemDataRole.DisplayRole and item[type_field] in complex_types:
return self._format_list_value(db_map, item.item_type, complex_types[item[type_field]], list_value_id)
if role == Qt.ItemDataRole.EditRole:
return join_value_and_type(item[value_field], item[type_field])
return self._format_value(item["parsed_value"], role=role)
[docs] def get_value_from_data(self, data, role=Qt.ItemDataRole.DisplayRole):
"""Returns the value or default value of a parameter directly from data.
Used by ``EmptyParameterModel.data()``.
Args:
data (str): joined value and type
role (int, optional)
Returns:
any
"""
if data is None:
return None
parsed_value = self._parse_value(*split_value_and_type(data))
return self._format_value(parsed_value, role=role)
@staticmethod
[docs] def _parse_value(db_value, type_=None):
try:
return from_database(db_value, type_=type_)
except ParameterValueFormatError as error:
return error
[docs] def get_value_indexes(self, db_map, item_type, id_):
"""Returns the value or default value indexes of a parameter.
Args:
db_map (DatabaseMapping)
item_type (str): either "parameter_definition" or "parameter_value"
id_ (int): The parameter_value or definition id
"""
item = self.get_item(db_map, item_type, id_)
parsed_value = self.get_value(db_map, item, role=PARSED_ROLE)
if isinstance(parsed_value, IndexedValue):
return parsed_value.indexes
return [""]
[docs] def get_value_index(self, db_map, item_type, id_, index, role=Qt.ItemDataRole.DisplayRole):
"""Returns the value or default value of a parameter for a given index.
Args:
db_map (DatabaseMapping)
item_type (str): either "parameter_definition" or "parameter_value"
id_ (int): The parameter_value or definition id
index: The index to retrieve
role (int, optional)
"""
item = self.get_item(db_map, item_type, id_)
parsed_value = self.get_value(db_map, item, role=PARSED_ROLE)
if isinstance(parsed_value, IndexedValue):
parsed_value = parsed_value.get_value(index)
if role == Qt.ItemDataRole.EditRole:
return join_value_and_type(*to_database(parsed_value))
if role == Qt.ItemDataRole.DisplayRole:
return self.display_data_from_parsed(parsed_value)
if role == Qt.ItemDataRole.ToolTipRole:
return self.tool_tip_data_from_parsed(parsed_value)
if role == PARSED_ROLE:
return parsed_value
return None
[docs] def get_value_list_item(self, db_map, id_, index, role=Qt.ItemDataRole.DisplayRole):
"""Returns one value item of a parameter_value_list.
Args:
db_map (DatabaseMapping)
id_ (int): The parameter_value_list id
index (int): The value item index
role (int, optional)
"""
try:
return self.get_parameter_value_list(db_map, id_, role=role)[index]
except IndexError:
return None
[docs] def get_parameter_value_list(self, db_map, id_, role=Qt.ItemDataRole.DisplayRole):
"""Returns a parameter_value_list formatted for the given role.
Args:
db_map (DatabaseMapping)
id_ (int): The parameter_value_list id
role (int, optional)
"""
return [
self.get_value(db_map, item, role=role)
for item in self.get_items_by_field(db_map, "list_value", "parameter_value_list_id", id_)
]
[docs] def get_scenario_alternative_id_list(self, db_map, scen_id):
scen = self.get_item(db_map, "scenario", scen_id)
return scen["alternative_id_list"] if scen else []
[docs] def import_data(self, db_map_data, command_text="Import data"):
"""Imports the given data into given db maps using the dedicated import functions from spinedb_api.
Condenses all in a single command for undo/redo.
Args:
db_map_data (dict(DatabaseMapping, dict())): Maps dbs to data to be passed as keyword arguments
to `get_data_for_import`
command_text (str, optional): What to call the command that condenses the operation.
"""
db_map_error_log = {}
for db_map, data in db_map_data.items():
try:
data_for_import = get_data_for_import(db_map, **data)
except (TypeError, ValueError) as err:
msg = f"Failed to import data: {err}. Please check that your data source has the right format."
db_map_error_log.setdefault(db_map, []).append(msg)
continue
identifier = self.get_command_identifier()
for item_type, items in data_for_import:
if isinstance(items, tuple):
items, errors = items
db_map_error_log.setdefault(db_map, []).extend(errors)
self.add_update_items(item_type, {db_map: list(items)}, identifier=identifier)
if any(db_map_error_log.values()):
self.error_msg.emit(db_map_error_log)
[docs] def add_alternatives(self, db_map_data):
"""Adds alternatives to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("alternative", db_map_data)
[docs] def add_scenarios(self, db_map_data):
"""Adds scenarios to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("scenario", db_map_data)
[docs] def add_entity_classes(self, db_map_data):
"""Adds entity classes to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("entity_class", db_map_data)
[docs] def add_entities(self, db_map_data):
"""Adds entities to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("entity", db_map_data)
[docs] def add_entity_groups(self, db_map_data):
"""Adds entity groups to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("entity_group", db_map_data)
[docs] def add_entity_alternatives(self, db_map_data):
"""Adds entity alternatives to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("entity_alternative", db_map_data)
[docs] def add_parameter_definitions(self, db_map_data):
"""Adds parameter definitions to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("parameter_definition", db_map_data)
[docs] def add_parameter_values(self, db_map_data):
"""Adds parameter values to db without checking integrity.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("parameter_value", db_map_data)
[docs] def add_parameter_value_lists(self, db_map_data):
"""Adds parameter_value lists to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("parameter_value_list", db_map_data)
[docs] def add_list_values(self, db_map_data):
"""Adds parameter_value list values to db.
Args:
db_map_data (dict): lists of items to add keyed by DatabaseMapping
"""
self.add_items("list_value", db_map_data)
[docs] def update_alternatives(self, db_map_data):
"""Updates alternatives in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("alternative", db_map_data)
[docs] def update_scenarios(self, db_map_data):
"""Updates scenarios in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("scenario", db_map_data)
[docs] def update_entity_classes(self, db_map_data):
"""Updates entity classes in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("entity_class", db_map_data)
[docs] def update_entities(self, db_map_data):
"""Updates entities in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("entity", db_map_data)
[docs] def update_entity_alternatives(self, db_map_data):
"""Updates entity alternatives in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("entity_alternative", db_map_data)
[docs] def update_parameter_definitions(self, db_map_data):
"""Updates parameter definitions in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("parameter_definition", db_map_data)
[docs] def update_parameter_values(self, db_map_data):
"""Updates parameter values in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("parameter_value", db_map_data)
[docs] def update_expanded_parameter_values(self, db_map_data):
"""Updates expanded parameter values in db.
Args:
db_map_data (dict): lists of expanded items to update keyed by DatabaseMapping
"""
for db_map, expanded_data in db_map_data.items():
packed_data = {}
for item in expanded_data:
packed_data.setdefault(item["id"], {})[item["index"]] = (item["value"], item["type"])
items = []
for id_, indexed_values in packed_data.items():
item = self.get_item(db_map, "parameter_value", id_)
parsed_value = self.get_value(db_map, item, role=PARSED_ROLE)
if isinstance(parsed_value, IndexedValue):
parsed_value = deep_copy_value(parsed_value)
for index, (val, typ) in indexed_values.items():
parsed_val = from_database(val, typ)
parsed_value.set_value(index, parsed_val)
value, value_type = to_database(parsed_value)
else:
value, value_type = next(iter(indexed_values.values()))
item = {"id": id_, "value": value, "type": value_type}
items.append(item)
self.undo_stack[db_map].push(UpdateItemsCommand(self, db_map, "parameter_value", items))
[docs] def update_parameter_value_lists(self, db_map_data):
"""Updates parameter_value lists in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("parameter_value_list", db_map_data)
[docs] def update_list_values(self, db_map_data):
"""Updates parameter_value list values in db.
Args:
db_map_data (dict): lists of items to update keyed by DatabaseMapping
"""
self.update_items("list_value", db_map_data)
[docs] def set_scenario_alternatives(self, db_map_data):
"""Sets scenario alternatives in db.
Args:
db_map_data (dict): lists of items to set keyed by DatabaseMapping
"""
db_map_error_log = {}
for db_map, data in db_map_data.items():
identifier = self.get_command_identifier()
items_to_add, ids_to_remove, errors = self.get_data_to_set_scenario_alternatives(db_map, data)
if ids_to_remove:
self.remove_items({db_map: {"scenario_alternative": ids_to_remove}}, identifier=identifier)
if items_to_add:
self.add_items("scenario_alternative", {db_map: items_to_add}, identifier=identifier)
if errors:
db_map_error_log.setdefault(db_map, []).extend([str(x) for x in errors])
if any(db_map_error_log.values()):
self.error_msg.emit(db_map_error_log)
@staticmethod
[docs] def get_data_to_set_scenario_alternatives(db_map, scenarios):
"""Returns data to add and remove, in order to set wide scenario alternatives.
Args:
db_map (DatabaseMapping): the db_map
scenarios: One or more wide scenario :class:`dict` objects to set.
Each item must include the following keys:
- "id": integer scenario id
- "alternative_id_list": list of alternative ids for that scenario
Returns
list: scenario_alternative :class:`dict` objects to add.
set: integer scenario_alternative ids to remove
"""
def _is_equal(to_add, to_rm):
return all(to_rm[k] == v for k, v in to_add.items())
scen_alts_to_add = []
scen_alt_ids_to_remove = {}
errors = []
for scen in scenarios:
current_scen = db_map.get_item("scenario", id=scen["id"])
if current_scen is None:
error = f"no scenario matching {scen} to set alternatives for"
errors.append(error)
continue
for k, alternative_id in enumerate(scen.get("alternative_id_list", ())):
item_to_add = {"scenario_id": current_scen["id"], "alternative_id": alternative_id, "rank": k + 1}
scen_alts_to_add.append(item_to_add)
for k, alternative_name in enumerate(scen.get("alternative_name_list", ())):
item_to_add = {"scenario_id": current_scen["id"], "alternative_name": alternative_name, "rank": k + 1}
scen_alts_to_add.append(item_to_add)
for alternative_name in current_scen["alternative_name_list"]:
current_scen_alt = db_map.get_item(
"scenario_alternative", scenario_name=current_scen["name"], alternative_name=alternative_name
)
scen_alt_ids_to_remove[current_scen_alt["id"]] = current_scen_alt
# Remove items that are both to add and to remove
for id_, to_rm in list(scen_alt_ids_to_remove.items()):
i = next((i for i, to_add in enumerate(scen_alts_to_add) if _is_equal(to_add, to_rm)), None)
if i is not None:
del scen_alts_to_add[i]
del scen_alt_ids_to_remove[id_]
return scen_alts_to_add, set(scen_alt_ids_to_remove), errors
[docs] def purge_items(self, db_map_item_types, **kwargs):
"""Purges selected items from given database.
Args:
db_map_item_types (dict): mapping from database map to list of purgable item types
**kwargs: keyword arguments passed to ``remove_items()``
"""
db_map_typed_data = {
db_map: {item_type: {Asterisk} for item_type in item_types}
for db_map, item_types in db_map_item_types.items()
}
self.remove_items(db_map_typed_data, **kwargs)
[docs] def add_items(self, item_type, db_map_data, identifier=None, **kwargs):
"""Pushes commands to add items to undo stack."""
if identifier is None:
identifier = self.get_command_identifier()
for db_map, data in db_map_data.items():
self.undo_stack[db_map].push(
AddItemsCommand(self, db_map, item_type, data, identifier=identifier, **kwargs)
)
[docs] def update_items(self, item_type, db_map_data, identifier=None, **kwargs):
"""Pushes commands to update items to undo stack."""
if identifier is None:
identifier = self.get_command_identifier()
for db_map, data in db_map_data.items():
self.undo_stack[db_map].push(
UpdateItemsCommand(self, db_map, item_type, data, identifier=identifier, **kwargs)
)
[docs] def add_update_items(self, item_type, db_map_data, identifier=None, **kwargs):
"""Pushes commands to add_update items to undo stack."""
if identifier is None:
identifier = self.get_command_identifier()
for db_map, data in db_map_data.items():
self.undo_stack[db_map].push(
AddUpdateItemsCommand(self, db_map, item_type, data, identifier=identifier, **kwargs)
)
[docs] def remove_items(self, db_map_typed_ids, identifier=None, **kwargs):
"""Pushes commands to remove items to undo stack."""
if identifier is None:
identifier = self.get_command_identifier()
for db_map, ids_per_type in db_map_typed_ids.items():
for item_type, ids in ids_per_type.items():
self.undo_stack[db_map].push(
RemoveItemsCommand(self, db_map, item_type, ids, identifier=identifier, **kwargs)
)
[docs] def get_command_identifier(self):
try:
return self._cmd_id
finally:
self._cmd_id += 1
@busy_effect
[docs] def do_add_items(self, db_map, item_type, data, check=True):
try:
worker = self._get_worker(db_map)
except KeyError:
# We're closing the kiosk.
return []
return worker.add_items(item_type, data, check)
@busy_effect
[docs] def do_update_items(self, db_map, item_type, data, check=True):
try:
worker = self._get_worker(db_map)
except KeyError:
# We're closing the kiosk.
return []
return worker.update_items(item_type, data, check)
@busy_effect
[docs] def do_add_update_items(self, db_map, item_type, data, check=True):
try:
worker = self._get_worker(db_map)
except KeyError:
# We're closing the kiosk.
return []
return worker.add_update_items(item_type, data, check)
@busy_effect
[docs] def do_remove_items(self, db_map, item_type, ids, check=True):
"""Removes items from database.
Args:
db_map: DatabaseMapping instance
item_type (str): database item type
ids (set): ids to remove
"""
try:
worker = self._get_worker(db_map)
except KeyError:
return []
return worker.remove_items(item_type, ids, check)
@busy_effect
[docs] def do_restore_items(self, db_map, item_type, ids):
"""Restores items in database.
Args:
db_map: DatabaseMapping instance
item_type (str): database item type
ids (set): ids to restore
"""
try:
worker = self._get_worker(db_map)
except KeyError:
return []
return worker.restore_items(item_type, ids)
@staticmethod
[docs] def db_map_ids(db_map_data):
return {db_map: {x["id"] for x in data if x} for db_map, data in db_map_data.items()}
@staticmethod
[docs] def db_map_class_ids(db_map_data):
d = {}
for db_map, items in db_map_data.items():
for item in items:
if item:
d.setdefault((db_map, item["class_id"]), set()).add(item["id"])
return d
[docs] def find_cascading_entity_classes(self, db_map_ids):
"""Finds and returns cascading entity classes for the given dimension ids."""
db_map_cascading_data = dict()
for db_map, dimension_ids in db_map_ids.items():
for item in self.get_items(db_map, "entity_class"):
if set(item["dimension_id_list"]) & set(dimension_ids):
db_map_cascading_data.setdefault(db_map, []).append(item)
return db_map_cascading_data
[docs] def find_cascading_entities(self, db_map_ids):
"""Finds and returns cascading entities for the given element ids."""
db_map_cascading_data = dict()
for db_map, element_ids in db_map_ids.items():
for item in self.get_items(db_map, "entity"):
if set(item["element_id_list"]) & set(element_ids):
db_map_cascading_data.setdefault(db_map, []).append(item)
return db_map_cascading_data
[docs] def find_cascading_parameter_data(self, db_map_ids, item_type):
"""Finds and returns cascading parameter definitions or values for the given entity_class ids."""
db_map_cascading_data = dict()
for db_map, entity_class_ids in db_map_ids.items():
for item in self.get_items(db_map, item_type):
if item["entity_class_id"] in entity_class_ids:
db_map_cascading_data.setdefault(db_map, []).append(item)
return db_map_cascading_data
[docs] def find_cascading_parameter_values_by_entity(self, db_map_ids):
"""Finds and returns cascading parameter values for the given entity ids."""
db_map_cascading_data = dict()
for db_map, entity_ids in db_map_ids.items():
for item in self.get_items(db_map, "parameter_value"):
if item["entity_id"] in entity_ids:
db_map_cascading_data.setdefault(db_map, []).append(item)
return db_map_cascading_data
[docs] def find_cascading_parameter_values_by_definition(self, db_map_ids):
"""Finds and returns cascading parameter values for the given parameter_definition ids."""
db_map_cascading_data = dict()
for db_map, param_def_ids in db_map_ids.items():
for item in self.get_items(db_map, "parameter_value"):
if item["parameter_id"] in param_def_ids:
db_map_cascading_data.setdefault(db_map, []).append(item)
return db_map_cascading_data
[docs] def find_cascading_scenario_alternatives_by_scenario(self, db_map_ids):
"""Finds and returns cascading scenario alternatives for the given scenario ids."""
db_map_cascading_data = dict()
for db_map, ids in db_map_ids.items():
for item in self.get_items(db_map, "scenario_alternative"):
if item["scenario_id"] in ids:
db_map_cascading_data.setdefault(db_map, []).append(item)
return db_map_cascading_data
[docs] def find_groups_by_entity(self, db_map_ids):
"""Finds and returns groups for the given entity ids."""
db_map_group_data = dict()
for db_map, entity_ids in db_map_ids.items():
for item in self.get_items(db_map, "entity_group"):
if item["entity_id"] in entity_ids:
db_map_group_data.setdefault(db_map, []).append(item)
return db_map_group_data
[docs] def duplicate_scenario(self, scen_data, dup_name, db_map):
data = self._get_data_for_export(scen_data)
data = {
"scenarios": [(dup_name, active, description) for (_, active, description) in data.get("scenarios", [])],
"alternatives": data.get("alternatives", []),
"scenario_alternatives": [
(dup_name, alt_name, before_alt_name)
for (_, alt_name, before_alt_name) in data.get("scenario_alternatives", [])
],
}
self.import_data({db_map: data}, command_text="Duplicate scenario")
[docs] def duplicate_entity(self, orig_name, dup_name, class_name, db_maps):
"""Duplicates entity, its parameter values and related multidimensional entities.
Args:
orig_name (str): original entity's name
dup_name (str): duplicate's name
class_name (str): entity class name
db_maps (Iterable of DatabaseMapping): database mappings where duplication should take place
"""
dup_import_data = {}
for db_map in db_maps:
entity = db_map.get_entity_item(entity_class_name=class_name, name=orig_name)
element_name_list = entity["element_name_list"]
if element_name_list:
first_import_entry = (class_name, dup_name, element_name_list, entity["description"])
else:
first_import_entry = (class_name, dup_name, entity["description"])
dup_entity_import_data = [first_import_entry]
for item in db_map.get_entity_items():
element_name_list = item["element_name_list"]
item_class_name = item["entity_class_name"]
if orig_name in element_name_list and item_class_name != class_name:
index = item["dimension_name_list"].index(class_name)
name_list = element_name_list
dup_name_list = name_list[:index] + (dup_name,) + name_list[index + 1 :]
dup_entity_import_data.append((item_class_name, dup_name_list, item["description"]))
dup_import_data[db_map] = {"entities": dup_entity_import_data}
dup_value_import_data = []
for item in db_map.get_parameter_value_items(entity_class_name=class_name, entity_name=orig_name):
dup_value_import_data.append(
(
class_name,
dup_name,
item["parameter_definition_name"],
item["parsed_value"],
item["alternative_name"],
)
)
dup_import_data[db_map].update(parameter_values=dup_value_import_data)
dup_entity_alternative_import_data = []
for item in db_map.get_entity_alternative_items(entity_class_name=class_name, entity_name=orig_name):
dup_entity_alternative_import_data.append(
(class_name, dup_name, item["alternative_name"], item["active"])
)
dup_import_data[db_map]["entity_alternatives"] = dup_entity_alternative_import_data
self.import_data(dup_import_data, command_text="Duplicate entity")
[docs] def _get_data_for_export(self, db_map_item_ids):
data = {}
for db_map, item_ids in db_map_item_ids.items():
for key, items in export_data(db_map, parse_value=load_db_value, **item_ids).items():
data.setdefault(key, []).extend(items)
return data
[docs] def export_data(self, caller, db_map_item_ids, file_path, file_filter):
data = self._get_data_for_export(db_map_item_ids)
if file_filter.startswith("JSON"):
self.export_to_json(file_path, data, caller)
elif file_filter.startswith("SQLite"):
self.export_to_sqlite(file_path, data, caller)
elif file_filter.startswith("Excel"):
self.export_to_excel(file_path, data, caller)
else:
raise ValueError()
[docs] def _is_url_available(self, url, logger):
if str(url) in self.db_urls:
message = f"The URL <b>{url}</b> is in use. Please close all applications using it and try again."
logger.msg_error.emit(message)
return False
return True
[docs] def export_to_sqlite(self, file_path, data_for_export, caller):
"""Exports given data into SQLite file."""
url = URL("sqlite", database=file_path)
if not self._is_url_available(url, caller):
return
create_new_spine_database(url)
db_map = DatabaseMapping(url)
import_data(db_map, **data_for_export)
try:
db_map.commit_session("Export data from Spine Toolbox.")
except SpineDBAPIError as err:
error_msg = f"[SpineDBAPIError] Unable to export file <b>{db_map.codename}</b>: {err.msg}"
caller.msg_error.emit(error_msg)
else:
caller.file_exported.emit(file_path, 1.0, True)
finally:
db_map.close()
@staticmethod
[docs] def export_to_json(file_path, data_for_export, caller):
"""Exports given data into JSON file."""
json_data = json.dumps(data_for_export, indent=4)
with open(file_path, "w", encoding="utf-8") as f:
f.write(json_data)
caller.file_exported.emit(file_path, 1.0, False)
@staticmethod
[docs] def export_to_excel(file_path, data_for_export, caller):
"""Exports given data into Excel file."""
# NOTE: We import data into an in-memory Spine db and then export that to excel.
url = URL("sqlite", database="")
with DatabaseMapping(url, create=True) as db_map:
count, errors = import_data(db_map, **data_for_export, unparse_value=dump_db_value)
file_name = os.path.split(file_path)[1]
if errors:
error_msg = (
f"Unable to export file <b>{file_name}</b>."
f"Failed to copy the data to temporary database: <p>{errors}</p>"
)
caller.msg_error.emit(error_msg)
return
if count > 0:
db_map.commit_session("Added data for exporting.")
if os.path.exists(file_path):
os.remove(file_path)
try:
export_spine_database_to_xlsx(db_map, file_path)
except PermissionError:
error_msg = f"Unable to export file <b>{file_name}</b>.<br/>Close the file in Excel and try again."
caller.msg_error.emit(error_msg)
except OSError:
error_msg = f"[OSError] Unable to export file <b>{file_name}</b>."
caller.msg_error.emit(error_msg)
else:
caller.file_exported.emit(file_path, 1.0, False)
[docs] def get_items_for_commit(self, db_map, commit_id):
try:
worker = self._get_worker(db_map)
except KeyError:
return {}
return worker.commit_cache.get(commit_id.db_id, {})
@staticmethod
[docs] def get_all_multi_spine_db_editors():
"""Yields all instances of MultiSpineDBEditor currently open.
Yields:
MultiSpineDBEditor
"""
for window in qApp.topLevelWindows(): # pylint: disable=undefined-variable
if isinstance(window, QWindow):
widget = QWidget.find(window.winId())
if isinstance(widget, MultiSpineDBEditor) and widget.accepting_new_tabs:
yield widget
[docs] def get_all_spine_db_editors(self):
"""Yields all instances of SpineDBEditor currently open.
Yields:
SpineDBEditor
"""
for w in self.get_all_multi_spine_db_editors():
for k in range(w.tab_widget.count()):
yield w.tab_widget.widget(k)
[docs] def _get_existing_spine_db_editor(self, db_url_codenames):
db_url_codenames = {str(url): codename for url, codename in db_url_codenames.items()}
for multi_db_editor in self.get_all_multi_spine_db_editors():
for k in range(multi_db_editor.tab_widget.count()):
db_editor = multi_db_editor.tab_widget.widget(k)
if db_editor.db_url_codenames == db_url_codenames:
return multi_db_editor, db_editor
return None
[docs] def open_db_editor(self, db_url_codenames, reuse_existing_editor):
"""Opens a SpineDBEditor with given urls. Uses an existing MultiSpineDBEditor if any.
Also, if the same urls are open in an existing SpineDBEditor, just raises that one
instead of creating another.
Args:
db_url_codenames (dict): mapping url to codename
reuse_existing_editor (bool): if True and the same URL is already open, just raise the existing window
"""
multi_db_editor = next(self.get_all_multi_spine_db_editors(), None) if reuse_existing_editor else None
if multi_db_editor is None:
multi_db_editor = MultiSpineDBEditor(self, db_url_codenames)
if multi_db_editor.tab_load_success: # don't open an editor if tabs were not loaded successfully
multi_db_editor.show()
return
existing = self._get_existing_spine_db_editor(db_url_codenames)
if existing is None:
multi_db_editor.add_new_tab(db_url_codenames)
else:
multi_db_editor, db_editor = existing
multi_db_editor.set_current_tab(db_editor)
if multi_db_editor.isMinimized():
multi_db_editor.showNormal()
multi_db_editor.activateWindow()