Source code for spinetoolbox.project_items.importer.executable_item

######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Contains Importer's executable item as well as support utilities.

:authors: A. Soininen (VTT)
:date:   1.4.2020
"""
import os
import pathlib
from PySide2.QtCore import QObject, QEventLoop, Signal, Slot, QThread
from spinetoolbox.executable_item_base import ExecutableItemBase
from spinetoolbox.spine_io.gdx_utils import find_gams_directory
from spinetoolbox.helpers import shorten
from .importer_worker import ImporterWorker
from .item_info import ItemInfo
from .utils import deserialize_mappings


[docs]class ExecutableItem(ExecutableItemBase, QObject):
[docs] importing_finished = Signal()
"""Emitted after import thread has finished.""" def __init__(self, name, settings, logs_dir, gams_path, cancel_on_error, logger): """ Args: name (str): Importer's name settings (dict): import mappings logs_dir (str): path to the directory where logs should be stored gams_path (str): path to system's GAMS executable or empty string for the default path cancel_on_error (bool): if True, revert changes on error and quit logger (LoggerInterface): a logger """ ExecutableItemBase.__init__(self, name, logger) QObject.__init__(self) self._settings = settings self._logs_dir = logs_dir self._gams_path = gams_path self._cancel_on_error = cancel_on_error self._resources_from_downstream = list() self._worker = None self._worker_thread = None self._worker_succeeded = None self._loop = None @staticmethod
[docs] def item_type(): """Returns ImporterExecutable's type identifier string.""" return ItemInfo.item_type()
[docs] def stop_execution(self): """Stops execution.""" super().stop_execution() if self._loop: if self._loop.isRunning(): self._loop.exit(-1) self._loop = None if self._worker: self._worker.deleteLater() self._worker = None if self._worker_thread: self._worker_thread.quit() self._worker_thread.wait() self._worker_thread = None
[docs] def _execute_backward(self, resources): """See base class.""" self._resources_from_downstream = resources.copy() return True
[docs] def _execute_forward(self, resources): """See base class.""" if not self._settings: return True absolute_paths = _files_from_resources(resources) absolute_path_settings = dict() for label in self._settings: absolute_path = absolute_paths.get(label) if absolute_path is not None: absolute_path_settings[absolute_path] = self._settings[label] source_settings = {"GdxConnector": {"gams_directory": self._gams_system_directory()}} self._destroy_current_worker() self._loop = QEventLoop() self._worker = ImporterWorker( list(absolute_paths.values()), absolute_path_settings, source_settings, [r.url for r in self._resources_from_downstream if r.type_ == "database"], self._logs_dir, self._cancel_on_error, self._logger, ) self._worker_thread = QThread() self._worker.moveToThread(self._worker_thread) self._worker.import_finished.connect(self._handle_worker_finished) self._worker.import_finished.connect(self._loop.quit) self._worker_thread.started.connect(self._worker.do_work) self._worker_thread.start() loop_retval = self._loop.exec_() if loop_retval: # If retval is not 0, loop exited with nonzero return value. Should happen when # user stops execution self._logger.msg_error.emit(f"Importer {self.name} stopped") self._worker_succeeded = -1 return self._worker_succeeded if not self._worker_succeeded: self._logger.msg_error.emit(f"Executing Importer {self.name} failed") else: self._logger.msg_success.emit(f"Executing Importer {self.name} finished") return self._worker_succeeded
@Slot(int)
[docs] def _handle_worker_finished(self, exit_code): self._worker_succeeded = exit_code == 0 self._destroy_current_worker()
[docs] def _destroy_current_worker(self): """Runs before starting execution and after worker finishes. Destroys current worker and quits thread if present. """ if self._loop: self._loop.deleteLater() self._loop = None if self._worker: self._worker.deleteLater() self._worker = None if self._worker_thread: self._worker_thread.quit() self._worker_thread.wait() self._worker_thread = None
[docs] def _gams_system_directory(self): """Returns GAMS system path or None if GAMS default is to be used.""" path = self._gams_path if not path: path = find_gams_directory() if path is not None and os.path.isfile(path): path = os.path.dirname(path) return path
@classmethod
[docs] def from_dict(cls, item_dict, name, project_dir, app_settings, specifications, logger): """See base class.""" settings = deserialize_mappings(item_dict["mappings"], project_dir) data_dir = pathlib.Path(project_dir, ".spinetoolbox", "items", shorten(name)) logs_dir = os.path.join(data_dir, "logs") gams_path = app_settings.value("appSettings/gamsPath", defaultValue=None) cancel_on_error = item_dict["cancel_on_error"] return cls(name, settings, logs_dir, gams_path, cancel_on_error, logger)
[docs]def _files_from_resources(resources): """Returns a list of files available in given resources.""" files = dict() for resource in resources: if resource.type_ == "file": files[resource.path] = resource.path elif resource.type_ == "transient_file": files[resource.metadata["label"]] = resource.path return files