Source code for spinetoolbox.execution_managers

######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""Classes to manage tool instance execution in various forms."""
import logging
from PySide6.QtCore import QObject, QProcess, Slot, Signal


[docs]class ExecutionManager(QObject): """Base class for all tool instance execution managers."""
[docs] execution_finished = Signal(int)
def __init__(self, logger): """Class constructor. Args: logger (LoggerInterface): a logger instance """ super().__init__() self._logger = logger # noinspection PyUnresolvedReferences
[docs] def start_execution(self, workdir=None): """Starts the execution. Args: workdir (str): Work directory """ raise NotImplementedError()
[docs] def stop_execution(self): """Stops the execution.""" raise NotImplementedError()
[docs]class QProcessExecutionManager(ExecutionManager): """Class to manage tool instance execution using a PySide6 QProcess.""" def __init__(self, logger, program="", args=None, silent=False, semisilent=False): """Class constructor. Args: logger (LoggerInterface): a logger instance program (str): Path to program to run in the subprocess (e.g. julia.exe) args (list, optional): List of argument for the program (e.g. path to script file) silent (bool): Whether or not to emit logger msg signals semisilent (bool): If True, show Process Log messages """ super().__init__(logger) self._program = program self._args = args if args is not None else [] self._silent = silent # Do not show Event Log nor Process Log messages self._semisilent = semisilent # Do not show Event Log messages but show Process Log messages self.process_failed = False self.process_failed_to_start = False self.user_stopped = False self._process = QProcess(self) self.process_output = None # stdout when running silent self.process_error = None # stderr when running silent self._out_chunks = [] self._err_chunks = []
[docs] def program(self): """Program getter method.""" return self._program
[docs] def args(self): """Program argument getter method.""" return self._args
# noinspection PyUnresolvedReferences
[docs] def start_execution(self, workdir=None): """Starts the execution of a command in a QProcess. Args: workdir (str, optional): Work directory """ if workdir is not None: self._process.setWorkingDirectory(workdir) self._process.started.connect(self.process_started) self._process.finished.connect(self.on_process_finished) if not self._silent and not self._semisilent: # Loud self._process.readyReadStandardOutput.connect(self.on_ready_stdout) self._process.readyReadStandardError.connect(self.on_ready_stderr) self._process.errorOccurred.connect(self.on_process_error) self._process.stateChanged.connect(self.on_state_changed) elif self._semisilent: # semi-silent self._process.readyReadStandardOutput.connect(self.on_ready_stdout) self._process.readyReadStandardError.connect(self.on_ready_stderr) self._process.start(self._program, self._args) if self._process is not None and not self._process.waitForStarted(msecs=10000): self.process_failed = True self.process_failed_to_start = True self._process.deleteLater() self._process = None self.execution_finished.emit(-9998)
[docs] def wait_for_process_finished(self, msecs=30000): """Wait for subprocess to finish. Args: msecs (int): Timeout in milliseconds Return: True if process finished successfully, False otherwise """ if self._process is None: return False if self.process_failed or self.process_failed_to_start: return False if not self._process.waitForFinished(msecs): self.process_failed = True self._process.close() self._process = None return False return True
@Slot()
[docs] def process_started(self): """Run when subprocess has started."""
@Slot(int)
[docs] def on_state_changed(self, new_state): """Runs when QProcess state changes. Args: new_state (int): Process state number (``QProcess::ProcessState``) """ if new_state == QProcess.Starting: self._logger.msg.emit("\tStarting program <b>{0}</b>".format(self._program)) arg_str = " ".join(self._args) self._logger.msg.emit("\tArguments: <b>{0}</b>".format(arg_str)) elif new_state == QProcess.Running: self._logger.msg_warning.emit("\tExecution in progress...") elif new_state == QProcess.NotRunning: # logging.debug("Process is not running") pass else: self._logger.msg_error.emit("Process is in an unspecified state") logging.error("QProcess unspecified state: %s", new_state)
@Slot(int)
[docs] def on_process_error(self, process_error): """Runs if there is an error in the running QProcess. Args: process_error (int): Process error number (``QProcess::ProcessError``) """ if process_error == QProcess.FailedToStart: self.process_failed = True self.process_failed_to_start = True self._logger.msg_error.emit("Process failed to start") elif process_error == QProcess.Timedout: self.process_failed = True self._logger.msg_error.emit("Timed out") elif process_error == QProcess.Crashed: self.process_failed = True if not self.user_stopped: self._logger.msg_error.emit("Process crashed") elif process_error == QProcess.WriteError: self._logger.msg_error.emit("Process WriteError") elif process_error == QProcess.ReadError: self._logger.msg_error.emit("Process ReadError") elif process_error == QProcess.UnknownError: self._logger.msg_error.emit("Unknown error in process") else: self._logger.msg_error.emit("Unspecified error in process: {0}".format(process_error)) self.teardown_process()
[docs] def teardown_process(self): """Tears down the QProcess in case a QProcess.ProcessError occurred. Emits execution_finished signal.""" if not self._process: pass else: out = str(self._process.readAllStandardOutput().data(), "utf-8", errors="replace") errout = str(self._process.readAllStandardError().data(), "utf-8", errors="replace") if out is not None: self._logger.msg_proc.emit(out.strip()) if errout is not None: self._logger.msg_proc.emit(errout.strip()) self._process.deleteLater() self._process = None self.execution_finished.emit(-9998)
[docs] def stop_execution(self): """See base class.""" self.user_stopped = True self.process_failed = True if not self._process: return try: self._process.kill() if not self._process.waitForFinished(5000): self._process.finished.emit(-1, -1) self._process.deleteLater() except Exception as ex: # pylint: disable=broad-except self._logger.msg_error.emit("[{0}] exception when terminating process".format(ex)) logging.exception("Exception in closing QProcess: %s", ex) finally: self._process = None
@Slot(int, int)
[docs] def on_process_finished(self, exit_code, exit_status): """Runs when subprocess has finished. Args: exit_code (int): Return code from external program (only valid for normal exits) exit_status (int): Crash or normal exit (``QProcess::ExitStatus``) """ if not self._process: return if exit_status == QProcess.CrashExit: if not self._silent: self._logger.msg_error.emit("\tProcess crashed") exit_code = -1 elif exit_status == QProcess.NormalExit: pass else: if not self._silent: self._logger.msg_error.emit("Unknown QProcess exit status [{0}]".format(exit_status)) exit_code = -1 if exit_code != 0: self.process_failed = True if not self.user_stopped: out = str(self._process.readAllStandardOutput().data(), "utf-8", errors="replace") errout = str(self._process.readAllStandardError().data(), "utf-8", errors="replace") if out is not None: if not self._silent: self._logger.msg_proc.emit(out.strip()) else: self.process_output = out.strip() self.process_error = errout.strip() else: self._logger.msg.emit("*** Terminating process ***") # Delete QProcess self._process.deleteLater() self._process = None self.execution_finished.emit(exit_code)
@Slot()
[docs] def on_ready_stdout(self): """Emit data from stdout.""" if not self._process: return self._process.setReadChannel(QProcess.StandardOutput) chunk = self._process.readLine().data() self._out_chunks.append(chunk) if not chunk.endswith(b"\n"): return line = b"".join(self._out_chunks) line = str(line, "utf-8", errors="replace").strip() self._logger.msg_proc.emit(line) self._out_chunks.clear()
@Slot()
[docs] def on_ready_stderr(self): """Emit data from stderr.""" if not self._process: return self._process.setReadChannel(QProcess.StandardError) chunk = self._process.readLine().data() self._err_chunks.append(chunk) if not chunk.endswith(b"\n"): return line = b"".join(self._err_chunks) line = str(line, "utf-8", errors="replace").strip() self._logger.msg_proc_error.emit(line) self._err_chunks.clear()