Source code for spinetoolbox.project_items.combiner.executable_item

######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Contains Combiner's executable item as well as support utilities.

:authors: A. Soininen (VTT)
:date:   2.4.2020
"""

import os
import pathlib
from PySide2.QtCore import QObject, QEventLoop, Slot, QThread
from spinetoolbox.executable_item_base import ExecutableItemBase
from spinetoolbox.helpers import shorten
from .item_info import ItemInfo
from .combiner_worker import CombinerWorker


[docs]class ExecutableItem(ExecutableItemBase, QObject): def __init__(self, name, logs_dir, cancel_on_error, logger): """ Args: name (str): item's name logs_dir (str): path to the directory where logs should be stored cancel_on_error (bool): if True, revert changes on error and move on logger (LoggerInterface): a logger """ ExecutableItemBase.__init__(self, name, logger) QObject.__init__(self) self._resources_from_downstream = list() self._logs_dir = logs_dir self._cancel_on_error = cancel_on_error self._worker = None self._worker_thread = None self._loop = None self._merge_succeeded = False @staticmethod
[docs] def item_type(): """Returns Combiner's type identifier string.""" return ItemInfo.item_type()
@classmethod
[docs] def from_dict(cls, item_dict, name, project_dir, app_settings, specifications, logger): """See base class.""" data_dir = pathlib.Path(project_dir, ".spinetoolbox", "items", shorten(name)) logs_dir = os.path.join(data_dir, "logs") cancel_on_error = item_dict["cancel_on_error"] return cls(name, logs_dir, cancel_on_error, logger)
[docs] def stop_execution(self): """Stops execution.""" super().stop_execution() if self._loop: if self._loop.isRunning(): self._loop.exit(-1) self._loop = None if self._worker: self._worker.deleteLater() self._worker = None if self._worker_thread: self._worker_thread.quit() self._worker_thread.wait() self._worker_thread = None
[docs] def _execute_backward(self, resources): """See base class.""" self._resources_from_downstream = resources.copy() return True
@staticmethod
[docs] def _urls_from_resources(resources): return [r.url for r in resources if r.type_ == "database"]
[docs] def _execute_forward(self, resources): """See base class.""" from_urls = self._urls_from_resources(resources) to_urls = self._urls_from_resources(self._resources_from_downstream) if not from_urls: self._logger.msg_warning.emit("No input database(s) available. Moving on...") return True elif not to_urls: self._logger.msg_warning.emit("No output database available. Moving on...") return True self._destroy_current_worker() self._loop = QEventLoop() self._worker = CombinerWorker(from_urls, to_urls, self._logs_dir, self._cancel_on_error, self._logger) self._worker_thread = QThread() self._worker.moveToThread(self._worker_thread) self._worker.finished.connect(self._handle_worker_finished) self._worker.finished.connect(self._loop.quit) self._worker_thread.started.connect(self._worker.do_work) self._worker_thread.start() loop_retval = self._loop.exec_() if loop_retval: # If retval is not 0, loop exited with nonzero return value. Should happen when # user stops execution self._logger.msg_error.emit(f"Combiner {self.name} stopped") return False else: self._logger.msg_success.emit(f"Executing Combiner {self.name} finished") return self._merge_succeeded
[docs] def _handle_worker_finished(self): """Runs when Combiner worker has finished.""" self._destroy_current_worker() self._merge_succeeded = True
@Slot()
[docs] def _destroy_current_worker(self): """Runs when starting execution and after worker has finished. Destroys current loop, worker and quits thread, if any. """ if self._loop: self._loop.deleteLater() self._loop = None if self._worker: self._worker.deleteLater() self._worker = None if self._worker_thread: self._worker_thread.quit() self._worker_thread.wait() self._worker_thread = None