######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
""" Compound models. These models concatenate several 'single' models and one 'empty' model. """
from PySide6.QtCore import Qt, Slot, QTimer, QModelIndex
from PySide6.QtGui import QFont
from spinedb_api.parameter_value import join_value_and_type
from ...helpers import parameter_identifier, rows_to_row_count_tuples
from ...fetch_parent import FlexibleFetchParent
from ...mvcmodels.compound_table_model import CompoundWithEmptyTableModel
from ..widgets.custom_menus import AutoFilterMenu
from .empty_models import EmptyParameterDefinitionModel, EmptyParameterValueModel, EmptyEntityAlternativeModel
from .single_models import SingleParameterDefinitionModel, SingleParameterValueModel, SingleEntityAlternativeModel
[docs]class CompoundModelBase(CompoundWithEmptyTableModel):
"""A base model for all models that show data in stacked format."""
def __init__(self, parent, db_mngr, *db_maps):
"""
Args:
parent (SpineDBEditor): the parent object
db_mngr (SpineDBManager): the database manager
*db_maps (DatabaseMapping): the database maps included in the model
"""
super().__init__(parent=parent, header=self._make_header())
self._parent = parent
self.db_mngr = db_mngr
self.db_maps = db_maps
self._filter_class_ids = {}
self._auto_filter_menus = {}
self._auto_filter = {}
self._filter_timer = QTimer(self)
self._filter_timer.setSingleShot(True)
self._filter_timer.setInterval(100)
self._filter_timer.timeout.connect(self.refresh)
self._fetch_parent = FlexibleFetchParent(
self.item_type,
shows_item=self.shows_item,
handle_items_added=self.handle_items_added,
handle_items_removed=self.handle_items_removed,
handle_items_updated=self.handle_items_updated,
owner=self,
)
@property
[docs] def field_map(self):
return {}
@property
[docs] def item_type(self):
"""Returns the DB item type, e.g., 'parameter_value'.
Returns:
str
"""
raise NotImplementedError()
@property
[docs] def _single_model_type(self):
"""
Returns a constructor for the single models.
Returns:
SingleParameterModel
"""
raise NotImplementedError()
@property
[docs] def _empty_model_type(self):
"""
Returns a constructor for the empty model.
Returns:
EmptyParameterModel
"""
raise NotImplementedError()
[docs] def canFetchMore(self, _parent):
result = False
for db_map in self.db_maps:
result |= self.db_mngr.can_fetch_more(db_map, self._fetch_parent)
return result
[docs] def fetchMore(self, _parent):
for db_map in self.db_maps:
self.db_mngr.fetch_more(db_map, self._fetch_parent)
[docs] def shows_item(self, item, db_map):
return any(m.db_map == db_map and m.filter_accepts_item(item) for m in self.accepted_single_models())
[docs] def reset_db_maps(self, db_maps):
if set(db_maps) == set(self.db_maps):
return
self.db_maps = db_maps
self._fetch_parent.set_obsolete(False)
self._fetch_parent.reset()
[docs] def init_model(self):
"""Initializes the model."""
super().init_model()
self._filter_class_ids = {}
self._auto_filter = {}
self.empty_model.fetchMore(QModelIndex())
while self._auto_filter_menus:
_, menu = self._auto_filter_menus.popitem()
menu.deleteLater()
[docs] def _create_empty_model(self):
"""Returns the empty model for this compound model.
Returns:
EmptyParameterModel
"""
return self._empty_model_type(self)
[docs] def filter_accepts_model(self, model):
"""Returns a boolean indicating whether the given model passes the filter for compound model.
Args:
model (SingleModelBase or EmptyModelBase)
Returns:
bool
"""
if not model.can_be_filtered:
return True
if not self._auto_filter_accepts_model(model):
return False
if not self._class_filter_accepts_model(model):
return False
return True
[docs] def _class_filter_accepts_model(self, model):
if not self._filter_class_ids:
return True
class_ids = self._filter_class_ids.get(model.db_map, set())
return model.entity_class_id in class_ids or bool(set(model.dimension_id_list) & class_ids)
[docs] def _auto_filter_accepts_model(self, model):
if None in self._auto_filter.values():
return False
for values in self._auto_filter.values():
if not values:
continue
for db_map, entity_class_id in values:
if model.db_map == db_map and (entity_class_id is None or model.entity_class_id == entity_class_id):
break
else: # nobreak
return False
return True
[docs] def accepted_single_models(self):
"""Returns a list of accepted single models by calling filter_accepts_model
on each of them, just for convenience.
Returns:
list
"""
return [m for m in self.single_models if self.filter_accepts_model(m)]
[docs] def _invalidate_filter(self):
"""Sets the filter invalid."""
self._filter_timer.start()
[docs] def stop_invalidating_filter(self):
"""Stops invalidating the filter."""
self._filter_timer.stop()
[docs] def set_filter_class_ids(self, class_ids):
if class_ids != self._filter_class_ids:
self._filter_class_ids = class_ids
self._invalidate_filter()
[docs] def clear_auto_filter(self):
self._auto_filter = {}
self._invalidate_filter()
@Slot(str, object)
[docs] def set_auto_filter(self, field, values):
"""Updates and applies the auto filter.
Args:
field (str): the field name
values (dict): mapping (db_map, entity_class_id) to set of valid values
"""
self._set_compound_auto_filter(field, values)
for model in self.accepted_single_models():
self._set_single_auto_filter(model, field)
[docs] def _set_compound_auto_filter(self, field, values):
"""Sets the auto filter for given column in the compound model.
Args:
field (str): the field name
values (set): set of valid (db_map, item_type, id) tuples
"""
if self._auto_filter.setdefault(field, {}) == values:
return
self._auto_filter[field] = values
self._invalidate_filter()
[docs] def _set_single_auto_filter(self, model, field):
"""Sets the auto filter for given column in the given single model.
Args:
model (SingleParameterModel): the model
field (str): the field name
Returns:
bool: True if the auto-filtered values were updated, None otherwise
"""
values = self._auto_filter[field].get((model.db_map, model.entity_class_id), set())
if model.set_auto_filter(field, values):
self._invalidate_filter()
[docs] def _row_map_iterator_for_model(self, model):
"""Yields row map for the given model.
Reimplemented to take filter status into account.
Args:
model (SingleParameterModel, EmptyParameterModel)
Yields:
tuple: (model, row number) for each accepted row
"""
if not self.filter_accepts_model(model):
return ()
for i in model.accepted_rows():
yield (model, i)
[docs] def _models_with_db_map(self, db_map):
"""Returns a collection of single models with given db_map.
Args:
db_map (DatabaseMapping)
Returns:
list
"""
return [m for m in self.single_models if m.db_map == db_map]
@staticmethod
[docs] def _items_per_class(items):
"""Returns a dict mapping entity_class ids to a set of items.
Args:
items (list)
Returns:
dict
"""
d = {}
for item in items:
entity_class_id = item.get("entity_class_id")
if not entity_class_id:
continue
d.setdefault(entity_class_id, []).append(item)
return d
[docs] def handle_items_added(self, db_map_data):
"""Runs when either parameter definitions or values are added to the dbs.
Adds necessary sub-models and initializes them with data.
Also notifies the empty model so it can remove rows that are already in.
Args:
db_map_data (dict): list of added dict-items keyed by DatabaseMapping
"""
for db_map, items in db_map_data.items():
if db_map not in self.db_maps:
continue
db_map_single_models = [m for m in self.single_models if m.db_map is db_map]
existing_ids = set().union(*(m.item_ids() for m in db_map_single_models))
items_per_class = self._items_per_class(items)
for entity_class_id, class_items in items_per_class.items():
ids_committed = []
ids_uncommitted = []
for item in class_items:
item_id = item["id"]
if item_id in existing_ids:
continue
if item.is_committed():
ids_committed.append(item_id)
else:
ids_uncommitted.append(item_id)
self._add_items(db_map, entity_class_id, ids_committed, committed=True)
self._add_items(db_map, entity_class_id, ids_uncommitted, committed=False)
self.empty_model.handle_items_added(db_map_data)
[docs] def _get_insert_position(self, model):
if model.committed:
return super()._get_insert_position(model)
return len(self.single_models)
[docs] def _create_single_model(self, db_map, entity_class_id, committed):
model = self._single_model_type(self, db_map, entity_class_id, committed)
self._connect_single_model(model)
for field in self._auto_filter:
self._set_single_auto_filter(model, field)
return model
[docs] def _add_items(self, db_map, entity_class_id, ids, committed):
"""Creates new single model and resets it with the given parameter ids.
Args:
db_map (DatabaseMapping): database map
entity_class_id (int): parameter's entity class id
ids (list of int): parameter ids
committed (bool): True if the ids have been committed, False otherwise
"""
if not ids:
return
if committed:
existing = next(
(m for m in self.single_models if (m.db_map, m.entity_class_id) == (db_map, entity_class_id)), None
)
if existing is not None:
existing.add_rows(ids)
return
model = self._create_single_model(db_map, entity_class_id, committed)
model.reset_model(ids)
[docs] def handle_items_updated(self, db_map_data):
"""Runs when either parameter definitions or values are updated in the dbs.
Emits dataChanged so the parameter_name column is refreshed.
Args:
db_map_data (dict): list of updated dict-items keyed by DatabaseMapping
"""
if all(db_map not in self.db_maps for db_map in db_map_data):
return
self.dataChanged.emit(
self.index(0, 0), self.index(self.rowCount() - 1, self.columnCount() - 1), [Qt.ItemDataRole.DisplayRole]
)
[docs] def handle_items_removed(self, db_map_data):
"""Runs when either parameter definitions or values are removed from the dbs.
Removes the affected rows from the corresponding single models.
Args:
db_map_data (dict): list of removed dict-items keyed by DatabaseMapping
"""
self.layoutAboutToBeChanged.emit()
for db_map, items in db_map_data.items():
if db_map not in self.db_maps:
continue
items_per_class = self._items_per_class(items)
emptied_single_model_indexes = []
for model_index, model in enumerate(self.single_models):
if model.db_map != db_map:
continue
removed_ids = {x["id"] for x in items_per_class.get(model.entity_class_id, {})}
if not removed_ids:
continue
removed_rows = []
for row in range(model.rowCount()):
id_ = model._main_data[row]
if id_ in removed_ids:
removed_rows.append(row)
removed_ids.remove(id_)
if not removed_ids:
break
for row, count in sorted(rows_to_row_count_tuples(removed_rows), reverse=True):
del model._main_data[row : row + count]
if model.rowCount() == 0:
emptied_single_model_indexes.append(model_index)
for model_index in reversed(emptied_single_model_indexes):
model = self.sub_models.pop(model_index)
model.deleteLater()
self._do_refresh()
self.layoutChanged.emit()
[docs] def db_item(self, index):
sub_index = self.map_to_sub(index)
return sub_index.model().db_item(sub_index)
[docs] def db_map_id(self, index):
sub_index = self.map_to_sub(index)
sub_model = sub_index.model()
if sub_model is None:
return None, None
return sub_model.db_map, sub_model.item_id(sub_index.row())
[docs] def get_entity_class_id(self, index, db_map):
entity_class_name = index.sibling(index.row(), self.header.index("entity_class_name")).data()
entity_class = db_map.get_item("entity_class", name=entity_class_name)
return entity_class.get("id")
[docs] def filter_by(self, rows_per_column):
for column, rows in rows_per_column.items():
field = self.headerData(column)
menu = self._make_auto_filter_menu(field)
accepted_values = {self.index(row, column).data(Qt.ItemDataRole.DisplayRole) for row in rows}
menu.set_filter_accepted_values(accepted_values)
[docs] def filter_excluding(self, rows_per_column):
for column, rows in rows_per_column.items():
field = self.headerData(column)
menu = self._make_auto_filter_menu(field)
rejected_values = {self.index(row, column).data(Qt.ItemDataRole.DisplayRole) for row in rows}
menu.set_filter_rejected_values(rejected_values)
[docs]class FilterEntityAlternativeMixin:
"""Provides the interface to filter by entity and alternative."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._filter_entity_ids = {}
self._filter_alternative_ids = {}
[docs] def init_model(self):
super().init_model()
self._filter_entity_ids = {}
self._filter_alternative_ids = {}
[docs] def set_filter_entity_ids(self, entity_ids):
self._filter_entity_ids = entity_ids
for model in self.single_models:
if model.set_filter_entity_ids(entity_ids):
self._invalidate_filter()
[docs] def set_filter_alternative_ids(self, alternative_ids):
self._filter_alternative_ids = alternative_ids
for model in self.single_models:
if model.set_filter_alternative_ids(alternative_ids):
self._invalidate_filter()
[docs] def _create_single_model(self, db_map, entity_class_id, committed):
model = super()._create_single_model(db_map, entity_class_id, committed)
model.set_filter_entity_ids(self._filter_entity_ids)
model.set_filter_alternative_ids(self._filter_alternative_ids)
return model
[docs]class EditParameterValueMixin:
"""Provides the interface to edit values via ParameterValueEditor."""
[docs] def index_name(self, index):
"""Generates a name for data at given index.
Args:
index (QModelIndex): index to model
Returns:
str: label identifying the data
"""
item = self.db_item(index)
if item is None:
return ""
database = self.index(index.row(), self.columnCount() - 1).data()
if self.item_type == "parameter_definition":
parameter_name = item["name"]
names = [item["entity_class_name"]]
elif self.item_type == "parameter_value":
parameter_name = item["parameter_name"]
names = list(item["entity_byname"])
else:
raise ValueError(
f"invalid item_type: expected parameter_definition or parameter_value, got {self.item_type}"
)
alternative_name = {"parameter_definition": lambda x: None, "parameter_value": lambda x: x["alternative_name"]}[
self.item_type
](item)
return parameter_identifier(database, parameter_name, names, alternative_name)
[docs] def get_set_data_delayed(self, index):
"""Returns a function that ParameterValueEditor can call to set data for the given index at any later time,
even if the model changes.
Args:
index (QModelIndex)
Returns:
function
"""
sub_model = self.sub_model_at_row(index.row())
if sub_model == self.empty_model:
return lambda value_and_type, index=index: self.setData(index, join_value_and_type(*value_and_type))
id_ = self.item_at_row(index.row())
value_field = {"parameter_value": "value", "parameter_definition": "default_value"}[self.item_type]
return lambda value_and_type, sub_model=sub_model, id_=id_: sub_model.update_items_in_db(
[{"id": id_, value_field: join_value_and_type(*value_and_type)}]
)
[docs]class CompoundParameterDefinitionModel(EditParameterValueMixin, CompoundModelBase):
"""A model that concatenates several single parameter_definition models and one empty parameter_definition model."""
@property
[docs] def item_type(self):
return "parameter_definition"
@property
[docs] def field_map(self):
return {"parameter_name": "name", "value_list_name": "parameter_value_list_name"}
@property
[docs] def _single_model_type(self):
return SingleParameterDefinitionModel
@property
[docs] def _empty_model_type(self):
return EmptyParameterDefinitionModel
[docs]class CompoundParameterValueModel(FilterEntityAlternativeMixin, EditParameterValueMixin, CompoundModelBase):
"""A model that concatenates several single parameter_value models and one empty parameter_value model."""
@property
[docs] def item_type(self):
return "parameter_value"
@property
[docs] def field_map(self):
return {"parameter_name": "parameter_definition_name"}
@property
[docs] def _single_model_type(self):
return SingleParameterValueModel
@property
[docs] def _empty_model_type(self):
return EmptyParameterValueModel
[docs]class CompoundEntityAlternativeModel(FilterEntityAlternativeMixin, CompoundModelBase):
@property
[docs] def item_type(self):
return "entity_alternative"
@property
[docs] def _single_model_type(self):
return SingleEntityAlternativeModel
@property
[docs] def _empty_model_type(self):
return EmptyEntityAlternativeModel