######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""QUndoCommand subclasses for modifying the db."""
import time
from PySide6.QtGui import QUndoCommand, QUndoStack
[docs]class AgedUndoStack(QUndoStack):
@property
[docs] def redo_age(self):
if self.canRedo():
return self.command(self.index()).age
return -1
@property
[docs] def undo_age(self):
if self.canUndo():
return self.command(self.index() - 1).age
return -1
[docs]class AgedUndoCommand(QUndoCommand):
def __init__(self, parent=None, identifier=-1):
"""
Args:
parent (QUndoCommand, optional): The parent command, used for defining macros.
"""
super().__init__(parent=parent)
self._age = -1
self._id = identifier
self._buddies = []
self.merged = False
[docs] def id(self):
"""override"""
return self._id
[docs] def ours(self):
yield self
yield from self._buddies
[docs] def mergeWith(self, command):
if not isinstance(command, AgedUndoCommand):
return False
self._buddies += [x for x in command.ours() if not x.isObsolete()]
command.merged = True
return True
[docs] def redo(self):
if self.merged:
return
super().redo()
for cmd in self._buddies:
cmd.redo()
self._age = time.time()
[docs] def undo(self):
if self.merged:
return
for cmd in reversed(self._buddies):
cmd.undo()
super().undo()
self._age = time.time()
@property
[docs] def age(self):
return self._age
[docs]class SpineDBCommand(AgedUndoCommand):
"""Base class for all commands that modify a Spine DB."""
def __init__(self, db_mngr, db_map, **kwargs):
"""
Args:
db_mngr (SpineDBManager): SpineDBManager instance
db_map (DiffDatabaseMapping): DiffDatabaseMapping instance
"""
super().__init__(**kwargs)
self.db_mngr = db_mngr
self.db_map = db_map
[docs]class AddItemsCommand(SpineDBCommand):
def __init__(self, db_mngr, db_map, item_type, data, check=True, **kwargs):
"""
Args:
db_mngr (SpineDBManager): SpineDBManager instance
db_map (DiffDatabaseMapping): DiffDatabaseMapping instance
data (list): list of dict-items to add
item_type (str): the item type
"""
super().__init__(db_mngr, db_map, **kwargs)
if not data:
self.setObsolete(True)
self.item_type = item_type
self.redo_data = data
self.undo_ids = None
self._check = check
self.setText(f"add {item_type} items to {db_map.codename}")
[docs] def redo(self):
super().redo()
if self.undo_ids:
self.db_mngr.do_restore_items(self.db_map, self.item_type, self.undo_ids)
return
data = self.db_mngr.do_add_items(self.db_map, self.item_type, self.redo_data, check=self._check)
if not data:
self.setObsolete(True)
return
self.undo_ids = {x["id"] for x in data}
[docs] def undo(self):
super().undo()
self.db_mngr.do_remove_items(self.db_map, self.item_type, self.undo_ids, check=False)
[docs]class UpdateItemsCommand(SpineDBCommand):
def __init__(self, db_mngr, db_map, item_type, data, check=True, **kwargs):
"""
Args:
db_mngr (SpineDBManager): SpineDBManager instance
db_map (DiffDatabaseMapping): DiffDatabaseMapping instance
item_type (str): the item type
data (list): list of dict-items to update
"""
super().__init__(db_mngr, db_map, **kwargs)
if not data:
self.setObsolete(True)
self.item_type = item_type
self.redo_data = data
self.undo_data = [self.db_mngr.get_item(self.db_map, item_type, item["id"])._asdict() for item in data]
if self.redo_data == self.undo_data:
self.setObsolete(True)
self._check = check
self.setText(f"update {item_type} items in {db_map.codename}")
[docs] def redo(self):
super().redo()
self.redo_data = [
x._asdict()
for x in self.db_mngr.do_update_items(self.db_map, self.item_type, self.redo_data, check=self._check)
]
if not self.redo_data:
self.setObsolete(True)
return
self._check = False
[docs] def undo(self):
super().undo()
self.db_mngr.do_update_items(self.db_map, self.item_type, self.undo_data, check=False)
[docs]class AddUpdateItemsCommand(SpineDBCommand):
def __init__(self, db_mngr, db_map, item_type, data, check=True, **kwargs):
"""
Args:
db_mngr (SpineDBManager): SpineDBManager instance
db_map (DiffDatabaseMapping): DiffDatabaseMapping instance
item_type (str): the item type
data (list): list of dict-items to add-update
"""
super().__init__(db_mngr, db_map, **kwargs)
if not data:
self.setObsolete(True)
self.item_type = item_type
self.new_data = data
old_data = [x._asdict() for item in data if (x := self.db_map.get_item(item_type, **item))]
if self.new_data == old_data:
self.setObsolete(True)
self.old_data = {x["id"]: x for x in old_data}
self.redo_restore_ids = None
self.redo_update_data = None
self.undo_remove_ids = None
self.undo_update_data = None
self.setText(f"update {item_type} items in {db_map.codename}")
[docs] def redo(self):
super().redo()
if self.redo_restore_ids is None:
added, updated = self.db_mngr.do_add_update_items(self.db_map, self.item_type, self.new_data)
if not added and not updated:
self.setObsolete(True)
return
self.redo_restore_ids = {x["id"] for x in added}
self.redo_update_data = [x._asdict() for x in updated]
self.undo_remove_ids = {x["id"] for x in added}
self.undo_update_data = [self.old_data[id_] for id_ in {x["id"] for x in updated}]
return
if self.redo_restore_ids:
self.db_mngr.do_restore_items(self.db_map, self.item_type, self.redo_restore_ids)
if self.redo_update_data:
self.db_mngr.do_update_items(self.db_map, self.item_type, self.redo_update_data, check=False)
[docs] def undo(self):
super().undo()
if self.undo_remove_ids:
self.db_mngr.do_remove_items(self.db_map, self.item_type, self.undo_remove_ids, check=False)
if self.undo_update_data:
self.db_mngr.do_update_items(self.db_map, self.item_type, self.undo_update_data, check=False)
[docs]class RemoveItemsCommand(SpineDBCommand):
def __init__(self, db_mngr, db_map, item_type, ids, check=True, **kwargs):
"""
Args:
db_mngr (SpineDBManager): SpineDBManager instance
db_map (DiffDatabaseMapping): DiffDatabaseMapping instance
item_type (str): the item type
ids (set): set of ids to remove
"""
super().__init__(db_mngr, db_map, **kwargs)
if not ids:
self.setObsolete(True)
self.item_type = item_type
self.ids = ids
self._check = check
self.setText(f"remove {item_type} items from {db_map.codename}")
[docs] def redo(self):
super().redo()
items = self.db_mngr.do_remove_items(self.db_map, self.item_type, self.ids, check=self._check)
if not items:
self.setObsolete(True)
self.ids = {x["id"] for x in items}
self._check = False
[docs] def undo(self):
super().undo()
self.db_mngr.do_restore_items(self.db_map, self.item_type, self.ids)