######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""Models that vertically concatenate two or more table models."""
import bisect
from PySide6.QtCore import Qt, Signal, Slot, QModelIndex, QTimer
from ..mvcmodels.minimal_table_model import MinimalTableModel
class CompoundTableModel(MinimalTableModel):
    """A model that concatenates several sub table models vertically.

    Two lookup structures are kept in sync: ``_row_map`` resolves a compound row
    to a ``(sub_model, sub_row)`` tuple and ``_inv_row_map`` resolves such a tuple
    back to the compound row.
    """

    def __init__(self, parent=None, header=None):
        """Initializes model.

        Args:
            parent (QObject, optional): the parent object
            header (list of str, optional): header labels
        """
        super().__init__(parent=parent, header=header)
        self.sub_models = []
        self._row_map = []  # Maps compound row to tuple (sub_model, sub_row)
        self._inv_row_map = {}  # Maps tuple (sub_model, sub_row) to compound row
        self._next_sub_model = None  # Sub model probed last by canFetchMore()

    def map_to_sub(self, index):
        """Returns an equivalent submodel index.

        Args:
            index (QModelIndex): the compound model index.

        Returns:
            QModelIndex: the equivalent index in one of the submodels,
                or an invalid index if the given index is invalid or its row is not mapped
        """
        if not index.isValid():
            return QModelIndex()
        try:
            sub_model, sub_row = self._row_map[index.row()]
        except IndexError:
            return QModelIndex()
        return sub_model.index(sub_row, index.column())

    def map_from_sub(self, sub_model, sub_index):
        """Returns an equivalent compound model index.

        Args:
            sub_model (MinimalTableModel): the submodel
            sub_index (QModelIndex): the submodel index.

        Returns:
            QModelIndex: the equivalent index in the compound model,
                or an invalid index if the submodel row is not mapped
        """
        try:
            row = self._inv_row_map[sub_model, sub_index.row()]
        except KeyError:
            return QModelIndex()
        return self.index(row, sub_index.column())

    def item_at_row(self, row):
        """Returns the item at given row.

        Args:
            row (int): row in compound model

        Returns:
            object: the entry of the sub model's ``_main_data`` at the mapped sub row

        Raises:
            IndexError: if the row is not mapped
        """
        sub_model, sub_row = self._row_map[row]
        return sub_model._main_data[sub_row]

    def sub_model_at_row(self, row):
        """Returns the submodel corresponding to the given row in the compound model.

        Args:
            row (int): row in compound model

        Returns:
            MinimalTableModel: the sub model that provides the row

        Raises:
            IndexError: if the row is not mapped
        """
        sub_model, _ = self._row_map[row]
        return sub_model

    def sub_model_row(self, row):
        """Calculates sub model row.

        Args:
            row (int): row in compound model

        Returns:
            int: row in sub model

        Raises:
            IndexError: if the row is not mapped
        """
        _, sub_row = self._row_map[row]
        return sub_row

    @Slot()
    def refresh(self):
        """Refreshes the layout by computing a new row map, then fetches more data if possible."""
        self.layoutAboutToBeChanged.emit()
        self._do_refresh()
        self.layoutChanged.emit()
        if self.canFetchMore(QModelIndex()):
            self.fetchMore(QModelIndex())

    def _do_refresh(self):
        """Recomputes the row and inverse row maps from scratch."""
        self._row_map.clear()
        self._inv_row_map.clear()
        for model in self.sub_models:
            row_map = self._row_map_for_model(model)
            self._append_row_map(row_map)
        # NOTE(review): the ``refreshed`` signal is not declared in this class;
        # presumably it is provided elsewhere (e.g. by a base class) -- confirm.
        self.refreshed.emit()

    def _append_row_map(self, row_map):
        """Appends given row map to the tail of the model.

        Both the row map and the inverse row map are updated.

        Args:
            row_map (list): tuples (model, row number)
        """
        for model_row_tup in row_map:
            # The new entry lands at the current end of the compound.
            self._inv_row_map[model_row_tup] = self.rowCount()
            self._row_map.append(model_row_tup)

    def _row_map_iterator_for_model(self, model):
        """Yields row map for given model.

        The base class implementation just yields all model rows.

        Args:
            model (MinimalTableModel)

        Yields:
            tuple: (model, row number)
        """
        for i in range(model.rowCount()):
            yield (model, i)

    def _row_map_for_model(self, model):
        """Returns row map for given model.

        The base class implementation just returns all model rows.

        Args:
            model (MinimalTableModel)

        Returns:
            list: tuples (model, row number)
        """
        return list(self._row_map_iterator_for_model(model))

    def canFetchMore(self, parent):
        """Returns True if any of the submodels can fetch more.

        The sub model that was probed last is remembered in ``_next_sub_model``
        so that fetchMore() knows which sub model to fetch.
        """
        for sub_model in self.sub_models:
            self._next_sub_model = sub_model
            if sub_model.canFetchMore(self.map_to_sub(parent)):
                return True
        return False

    def fetchMore(self, parent):
        """Fetches more data into the sub model selected by the latest canFetchMore() call.

        If the sub model has no rows after fetching, it is dropped from the compound.
        Does nothing if canFetchMore() has never selected a sub model.
        """
        sub_model = self._next_sub_model
        if sub_model is None:
            return
        sub_model.fetchMore(self.map_to_sub(parent))
        # The membership test guards against the sub model having been removed already.
        if not sub_model.rowCount() and sub_model in self.sub_models:
            self.sub_models.remove(sub_model)
        self.layoutChanged.emit()

    def flags(self, index):
        """Returns the flags of the equivalent submodel index."""
        return self.map_to_sub(index).flags()

    def data(self, index, role=Qt.ItemDataRole.DisplayRole):
        """Returns the data of the equivalent submodel index for the given role."""
        return self.map_to_sub(index).data(role)

    def rowCount(self, parent=QModelIndex()):
        """Returns the number of rows currently in the row map."""
        return len(self._row_map)

    def batch_set_data(self, indexes, data):
        """Sets data for indexes in batch.

        Distributes indexes and values among the different submodels
        and calls batch_set_data on each of them. Invalid indexes and indexes
        whose row is not in the row map are skipped.

        Args:
            indexes (list of QModelIndex): compound model indexes
            data (list): one value per index

        Returns:
            bool: True if every involved submodel accepted its data, False otherwise
                (also when there was nothing to set)
        """
        if not indexes or not data:
            return False
        values_by_model = {}  # Maps models to (index, value) tuples
        rows = []
        columns = []
        for index, value in zip(indexes, data):
            if not index.isValid():
                continue
            try:
                sub_model, _ = self._row_map[index.row()]
            except IndexError:
                continue
            rows.append(index.row())
            columns.append(index.column())
            sub_index = self.map_to_sub(index)
            values_by_model.setdefault(sub_model, []).append((sub_index, value))
        if not rows:
            # Nothing usable was given; also avoids calling min()/max() on empty lists below.
            return False
        successful = True
        for model, index_value_tuples in values_by_model.items():
            sub_indexes, values = zip(*index_value_tuples)
            if not model.batch_set_data(list(sub_indexes), list(values)):
                successful = False
                break
        # Find square envelope of indexes to emit dataChanged
        top = min(rows)
        bottom = max(rows)
        left = min(columns)
        right = max(columns)
        self.dataChanged.emit(self.index(top, left), self.index(bottom, right))
        return successful

    def insertRows(self, row, count, parent=QModelIndex()):
        """Inserts count rows before the given row under the given parent.

        Localizes the appropriate submodel and calls insertRows on it.
        If row equals the row count, the rows are appended after the last row's submodel row.

        Returns:
            bool: False if row or count is out of range or the compound has no rows
                to localize a submodel from, True otherwise
        """
        row_count = self.rowCount()
        if row < 0 or row > row_count or count < 1:
            return False
        if not self._row_map:
            # No mapped row means there is no submodel to delegate to.
            return False
        if row < row_count:
            sub_model, sub_row = self._row_map[row]
        else:
            sub_model, sub_row = self._row_map[-1]
            sub_row += 1
        self.beginInsertRows(parent, row, row + count - 1)
        sub_model.insertRows(sub_row, count, self.map_to_sub(parent))
        self.endInsertRows()
        self.refresh()
        return True

    def removeRows(self, row, count, parent=QModelIndex()):
        """Removes count rows starting with the given row under parent.

        Localizes the appropriate submodels and calls removeRows on each of them.

        Returns:
            bool: False if the requested range is not entirely within the model, True otherwise
        """
        if row < 0 or count < 1 or row + count > self.rowCount():
            return False
        last = row + count - 1
        # Group the affected compound rows into runs of consecutive rows of a single submodel.
        # The runs are computed up front from a copy of the row map because
        # removing rows from a submodel may cause the row map to change.
        runs = []  # Each run is [sub_model, first_sub_row, sub_count]
        for sub_model, sub_row in self._row_map[row : last + 1]:
            if runs and runs[-1][0] is sub_model and runs[-1][1] + runs[-1][2] == sub_row:
                runs[-1][2] += 1
            else:
                runs.append([sub_model, sub_row, 1])
        sub_parent = self.map_to_sub(parent)
        self.beginRemoveRows(parent, row, last)
        # Go backwards so removing a run does not shift the rows of a run yet to be removed;
        # this assumes submodel rows appear in ascending order in the row map.
        for sub_model, sub_row, sub_count in reversed(runs):
            sub_model.removeRows(sub_row, sub_count, sub_parent)
        self.endRemoveRows()
        self.refresh()
        return True
class CompoundWithEmptyTableModel(CompoundTableModel):
    """A compound parameter table model where the last model is an empty row model."""

    @property
    def single_models(self):
        """list: all sub models except the last one."""
        return self.sub_models[:-1]

    @property
    def empty_model(self):
        """The last sub model; raises IndexError if there are no sub models."""
        return self.sub_models[-1]

    def _create_empty_model(self):
        """Creates and returns an empty model.

        Must be implemented by subclasses.

        Returns:
            EmptyRowModel: model
        """
        raise NotImplementedError()

    def init_model(self):
        """Initializes the compound model.

        Clears the model, populates the sub_models list attribute with the result of
        _create_empty_model and connects the empty model's row signals.
        """
        self.clear_model()
        self.sub_models.append(self._create_empty_model())
        self.empty_model.rowsRemoved.connect(self._handle_empty_rows_removed)
        self.empty_model.rowsInserted.connect(self._handle_empty_rows_inserted)

    def _connect_single_model(self, model):
        """Connects signals so changes in the submodels are acknowledged by the compound.

        Args:
            model (MinimalTableModel): single model to connect
        """
        # 'model' is bound as a default argument so each lambda refers to its own model.
        model.modelReset.connect(lambda model=model: self._handle_single_model_reset(model))
        model.modelAboutToBeReset.connect(lambda model=model: self._handle_single_model_about_to_be_reset(model))
        model.dataChanged.connect(
            lambda top_left, bottom_right, roles, model=model: self.dataChanged.emit(
                self.map_from_sub(model, top_left), self.map_from_sub(model, bottom_right), roles
            )
        )

    def _recompute_empty_row_map(self):
        """Recomputes the part of the row map corresponding to the empty model.

        Everything from the first mapped empty row onwards is dropped from both the row map
        and the inverse row map before the current empty rows are appended again.
        """
        empty_row_map = self._row_map_for_model(self.empty_model)
        try:
            row = self._inv_row_map[self.empty_model, 0]
        except KeyError:
            # The empty model is not in the row map yet so there is nothing to drop.
            pass
        else:
            # Forget the inverse entries as well; otherwise rows that no longer exist
            # in the empty model would still resolve to a compound row.
            for key in self._row_map[row:]:
                self._inv_row_map.pop(key, None)
            self._row_map = self._row_map[:row]
        self._append_row_map(empty_row_map)

    @Slot(QModelIndex, int, int)
    def _handle_empty_rows_removed(self, parent, empty_first, empty_last):
        """Updates row_map when rows are removed from the empty model.

        Args:
            parent (QModelIndex): parent index in the empty model (unused)
            empty_first (int): first removed row in the empty model
            empty_last (int): last removed row in the empty model
        """
        # The compound rows must be looked up before the row map is recomputed.
        first = self._inv_row_map[self.empty_model, empty_first]
        last = self._inv_row_map[self.empty_model, empty_last]
        self.beginRemoveRows(QModelIndex(), first, last)
        self._recompute_empty_row_map()
        self.endRemoveRows()

    @Slot(QModelIndex, int, int)
    def _handle_empty_rows_inserted(self, parent, empty_first, empty_last):
        """Runs when rows are inserted to the empty model.

        Updates row_map, then emits rowsInserted so the new rows become visible.

        Args:
            parent (QModelIndex): parent index in the empty model (unused)
            empty_first (int): first inserted row in the empty model
            empty_last (int): last inserted row in the empty model
        """
        self._recompute_empty_row_map()
        first = self._inv_row_map[self.empty_model, empty_first]
        last = self._inv_row_map[self.empty_model, empty_last]
        self.rowsInserted.emit(QModelIndex(), first, last)

    def _handle_single_model_about_to_be_reset(self, model):
        """Runs when given model is about to reset.

        Removes the model's rows from the row maps and notifies views about the removal.

        Args:
            model (MinimalTableModel): single model that is about to reset
        """
        if model not in self.single_models:
            return
        row_map = self._row_map_for_model(model)
        if not row_map:
            return
        try:
            first = self._inv_row_map[row_map[0]]
        except KeyError:
            # Sometimes the submodel may get reset before it has been added to the inverted row map.
            # In this case there are no rows to remove, so we can bail out here.
            return
        last = first + len(row_map) - 1
        tail_row_map = self._row_map[last + 1 :]
        self.beginRemoveRows(QModelIndex(), first, last)
        # Drop the model's rows together with everything after them,
        # then append the tail again so it gets its new compound rows.
        for key in self._row_map[first:]:
            del self._inv_row_map[key]
        self._row_map[first:] = []
        self._append_row_map(tail_row_map)
        self.endRemoveRows()

    def _handle_single_model_reset(self, model):
        """Runs when given model is reset.

        Args:
            model (MinimalTableModel): single model that has been reset
        """
        if model in self.single_models:
            self._refresh_single_model(model)
        else:
            self._insert_single_model(model)

    def _refresh_single_model(self, model):
        """Inserts the rows of a model that is already among the single models.

        Args:
            model (MinimalTableModel): single model
        """
        single_row_map = self._row_map_for_model(model)
        # The insertion row is searched from the sub models following this one.
        pos = self.single_models.index(model) + 1
        self._insert_row_map(pos, single_row_map)

    def _get_insert_position(self, model):
        """Returns the position among the single models where given model should be inserted.

        The position is found by bisecting the single models, so models need to be comparable.

        Args:
            model (MinimalTableModel): single model

        Returns:
            int: insert position
        """
        return bisect.bisect_left(self.single_models, model)

    def _insert_single_model(self, model):
        """Inserts the rows of a new single model and adds the model to the sub models.

        Args:
            model (MinimalTableModel): single model
        """
        single_row_map = self._row_map_for_model(model)
        pos = self._get_insert_position(model)
        # The row map is updated before the model is added to sub_models
        # so the insertion row is searched among the models that are already there.
        self._insert_row_map(pos, single_row_map)
        self.sub_models.insert(pos, model)

    def _get_row_for_insertion(self, pos):
        """Returns the compound row where rows inserted at given sub model position should start.

        Args:
            pos (int): position in sub_models

        Returns:
            int: compound row of the first mapped row of the sub models from pos onwards,
                or the row count if none of them has a mapped first row
        """
        for model in self.sub_models[pos:]:
            first_row_map_item = next(self._row_map_iterator_for_model(model), None)
            if first_row_map_item is not None:
                try:
                    return self._inv_row_map[first_row_map_item]
                except KeyError:
                    # Sometimes the submodel is not yet in the inverted row map.
                    # In this case we just skip it and try another insertion point.
                    pass
        return self.rowCount()

    def _insert_row_map(self, pos, single_row_map):
        """Inserts given row map before the rows of the sub model at given position.

        Args:
            pos (int): position in sub_models
            single_row_map (list): tuples (model, row number)
        """
        if not single_row_map:
            # Emit layoutChanged to trigger fetching.
            # The QTimer is to avoid funny situations where the user enters new data via the empty row model,
            # and those rows need to be removed at the same time as we fetch the added data.
            # Doing it in the same loop cycle was causing bugs.
            QTimer.singleShot(0, self.layoutChanged.emit)
            return
        row = self._get_row_for_insertion(pos)
        last = row + len(single_row_map) - 1
        self.beginInsertRows(QModelIndex(), row, last)
        # Cut the row map at the insertion row, then append the new rows followed by the old tail
        # so the inverse row map gets updated for both.
        self._row_map, tail_row_map = self._row_map[:row], self._row_map[row:]
        self._append_row_map(single_row_map)
        self._append_row_map(tail_row_map)
        self.endInsertRows()

    def clear_model(self):
        """Clears the model.

        Resets the row maps and schedules all sub models for deletion.
        """
        if self._row_map:
            self.beginResetModel()
            self._row_map.clear()
            self.endResetModel()
        for m in self.sub_models:
            m.deleteLater()
        self.sub_models.clear()
        self._inv_row_map.clear()