######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Items.
# Spine Items is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""Contains SpineEngineWorker."""
import copy
from PySide6.QtCore import Signal, Slot, QObject, QThread
from spine_engine.exception import EngineInitFailed, RemoteEngineInitFailed
from spine_engine.spine_engine import ItemExecutionFinishState, SpineEngineState
from spine_engine.utils.helpers import ExecutionDirection
from .widgets.options_dialog import OptionsDialog
from .spine_engine_manager import make_engine_manager, LocalSpineEngineManager
@Slot(list)
def _handle_dag_execution_started(project_items):
    """Marks the execution icon of each given project item as waiting.

    Args:
        project_items (list of ProjectItem): project items that are about to be executed
    """
    for project_item in project_items:
        execution_icon = project_item.get_icon().execution_icon
        execution_icon.mark_execution_waiting()
@Slot(list)
def _handle_node_execution_ignored(project_items):
    """Marks the execution icon of each given project item as ignored.

    Args:
        project_items (list of ProjectItem): project items that are left out of the execution
    """
    for project_item in project_items:
        execution_icon = project_item.get_icon().execution_icon
        execution_icon.mark_execution_ignored()
@Slot(object, object)
def _handle_node_execution_started(item, direction):
    """Updates an item's icon when the item starts executing.

    Only forward execution touches the icon: the execution icon is marked as started
    and, if the icon provides an ``animation_signaller``, its animation is started.

    Args:
        item (ProjectItem): project item that started executing
        direction (ExecutionDirection): direction of the execution
    """
    item_icon = item.get_icon()
    if direction != ExecutionDirection.FORWARD:
        return
    item_icon.execution_icon.mark_execution_started()
    # Not every icon has an animation; only those exposing a signaller are animated.
    if not hasattr(item_icon, "animation_signaller"):
        return
    item_icon.animation_signaller.animation_started.emit()
@Slot(object, object, object)
def _handle_node_execution_finished(item, direction, item_state):
    """Updates an item's icon when the item finishes executing.

    Only forward execution touches the icon: the execution icon is marked as finished
    with the given state and, if the icon provides an ``animation_signaller``,
    its animation is stopped.

    Args:
        item (ProjectItem): project item that finished executing
        direction (ExecutionDirection): direction of the execution
        item_state (ItemExecutionFinishState): state passed on to the execution icon
    """
    icon = item.get_icon()
    if direction == ExecutionDirection.FORWARD:
        icon.execution_icon.mark_execution_finished(item_state)
        # Not every icon has an animation; only those exposing a signaller are animated.
        if hasattr(icon, "animation_signaller"):
            icon.animation_signaller.animation_stopped.emit()
@Slot(object, str, str, str)
def _handle_event_message_arrived(item, filter_id, msg_type, msg_text):
    """Forwards an event message to the item it concerns.

    Args:
        item (object): receiver of the message; must provide ``add_event_message()``
        filter_id (str): filter identifier
        msg_type (str): message type
        msg_text (str): message text
    """
    item.add_event_message(filter_id, msg_type, msg_text)
@Slot(object, str, str, str)
def _handle_process_message_arrived(item, filter_id, msg_type, msg_text):
    """Forwards a process message to the item it concerns.

    Args:
        item (object): receiver of the message; must provide ``add_process_message()``
        filter_id (str): filter identifier
        msg_type (str): message type
        msg_text (str): message text
    """
    item.add_process_message(filter_id, msg_type, msg_text)
@Slot(dict, object, object)
def _handle_prompt_arrived(prompt, engine_mngr, logger=None):
    """Shows an options dialog for an engine prompt and sends the answer back to the engine manager.

    Args:
        prompt (dict): prompt with keys ``"prompter_id"`` and ``"data"``; ``"data"`` unpacks to
            title, text, option-to-answer mapping, notes and preferred option
        engine_mngr (object): engine manager that receives the answer via ``answer_prompt()``
        logger (object, optional): passed as the first argument to ``OptionsDialog.get_answer()``
    """
    prompter_id = prompt["prompter_id"]
    title, text, option_to_answer, notes, preferred = prompt["data"]
    answer = OptionsDialog.get_answer(logger, title, text, option_to_answer, notes=notes, preferred=preferred)
    engine_mngr.answer_prompt(prompter_id, answer)
@Slot(object)
def _handle_flash_arrived(connection):
    """Runs the execution animation on a connection's graphics item.

    Args:
        connection (object): connection whose ``graphics_item`` should be animated
    """
    graphics_item = connection.graphics_item
    graphics_item.run_execution_animation()
@Slot(list)
def _mark_all_items_failed(items):
    """Marks every given project item as failed and stops its icon animation, if any.

    Args:
        items (list of ProjectItem): project items
    """
    failure = ItemExecutionFinishState.FAILURE
    for project_item in items:
        item_icon = project_item.get_icon()
        item_icon.execution_icon.mark_execution_finished(failure)
        # Icons without an animation signaller have nothing to stop.
        if not hasattr(item_icon, "animation_signaller"):
            continue
        item_icon.animation_signaller.animation_stopped.emit()
class SpineEngineWorker(QObject):
    """Runs an engine through an engine manager in a dedicated thread and relays engine events as Qt signals.

    The worker moves itself to its own ``QThread``; ``do_work()`` is invoked when that thread starts.
    """

    # Emitted by do_work() when the engine has finished or failed to start.
    finished = Signal()
    # Internal relay signals; connected to handlers in start() and _connect_log_signals().
    _mark_items_ignored = Signal(list)
    _dag_execution_started = Signal(list)
    _node_execution_started = Signal(object, object)
    _node_execution_finished = Signal(object, object, object)
    _event_message_arrived = Signal(object, str, str, str)
    _process_message_arrived = Signal(object, str, str, str)
    _prompt_arrived = Signal(dict, object, object)
    _flash_arrived = Signal(object)
    _all_items_failed = Signal(list)

    def __init__(self, engine_data, dag, dag_identifier, project_items, connections, logger, job_id):
        """
        Args:
            engine_data (dict): engine data
            dag (DirectedGraphHandler):
            dag_identifier (str):
            project_items (dict): mapping from project item name to :class:`ProjectItem`
            connections (dict): mapping from jump name to :class:`LoggingConnection` or :class:`LoggingJump`
            logger (LoggerInterface): a logger
            job_id (str): Job id for remote execution
        """
        super().__init__()
        self._engine_data = engine_data
        # The setting is stored as a string; anything other than "true" means local execution.
        exec_remotely = engine_data["settings"].get("engineSettings/remoteExecutionEnabled", "false") == "true"
        self._job_id = job_id
        self._engine_mngr = make_engine_manager(exec_remotely, job_id)
        self.dag = dag
        self.dag_identifier = dag_identifier
        self._engine_final_state = "UNKNOWN"
        self._executing_items = set()
        self._project_items = project_items
        self._connections = connections
        self._logger = logger
        # Filled only in silent mode: message type -> list of message texts.
        self.event_messages = {}
        self.process_messages = {}
        # Tuples of (item, direction, state) for item executions that finished successfully.
        self.successful_executions = []
        self._thread = QThread()
        self.moveToThread(self._thread)
        self._thread.started.connect(self.do_work)

    @property
    def job_id(self):
        """Job id given at construction."""
        return self._job_id

    @property
    def engine_data(self):
        """Engine data dictionary."""
        return self._engine_data

    def get_engine_data(self):
        """Returns the engine data. Together with ``self.set_engine_data()`` it can be used to modify
        the workflow after it's initially created. We use it at the moment for creating Julia sysimages.

        Returns:
            dict: deep copy of the engine data
        """
        return copy.deepcopy(self._engine_data)

    def set_engine_data(self, engine_data):
        """Sets the engine data.

        Args:
            engine_data (dict): New data
        """
        self._engine_data = engine_data

    @Slot(object, str, str, str)
    def _handle_event_message_arrived_silent(self, item, filter_id, msg_type, msg_text):
        """Stores an event message in ``self.event_messages`` instead of forwarding it.

        Args:
            item (object): unused
            filter_id (str): unused
            msg_type (str): message type; used as the dictionary key
            msg_text (str): message text
        """
        self.event_messages.setdefault(msg_type, []).append(msg_text)

    @Slot(object, str, str, str)
    def _handle_process_message_arrived_silent(self, item, filter_id, msg_type, msg_text):
        """Stores a process message in ``self.process_messages`` instead of forwarding it.

        Args:
            item (object): unused
            filter_id (str): unused
            msg_type (str): message type; used as the dictionary key
            msg_text (str): message text
        """
        self.process_messages.setdefault(msg_type, []).append(msg_text)

    def stop_engine(self):
        """Asks the engine manager to stop the engine."""
        self._engine_mngr.stop_engine()

    def engine_final_state(self):
        """Returns the final state of the engine; "UNKNOWN" until ``do_work()`` sets it."""
        return self._engine_final_state

    def thread(self):
        """Returns the ``QThread`` instance created by this worker."""
        return self._thread

    def _connect_log_signals(self, silent):
        """Connects the relay signals to their handlers.

        Args:
            silent (bool): If True, only event and process messages are connected, and they are
                stored in internal dicts; otherwise all relay signals are connected
                to the module level handlers.
        """
        if silent:
            self._event_message_arrived.connect(self._handle_event_message_arrived_silent)
            self._process_message_arrived.connect(self._handle_process_message_arrived_silent)
            return
        self._mark_items_ignored.connect(_handle_node_execution_ignored)
        self._dag_execution_started.connect(_handle_dag_execution_started)
        self._node_execution_started.connect(_handle_node_execution_started)
        self._node_execution_finished.connect(_handle_node_execution_finished)
        self._event_message_arrived.connect(_handle_event_message_arrived)
        self._process_message_arrived.connect(_handle_process_message_arrived)
        self._prompt_arrived.connect(_handle_prompt_arrived)
        self._flash_arrived.connect(_handle_flash_arrived)

    def start(self, silent=False):
        """Connects log signals, announces included and ignored items and starts the worker thread.

        Args:
            silent (bool, optional): If True, log messages are not forwarded to the loggers
                but saved in internal dicts.
        """
        self._connect_log_signals(silent)
        self._all_items_failed.connect(_mark_all_items_failed)
        included_items, ignored_items = self._included_and_ignored_items()
        self._dag_execution_started.emit(included_items)
        self._mark_items_ignored.emit(ignored_items)
        self._thread.start()

    def _included_and_ignored_items(self):
        """Returns two lists, where the first one contains project items that are about to be executed and the second
        one contains project items that are about to be ignored.

        Returns:
            tuple: list of included project items and list of ignored project items
        """
        included = set(
            self._included_items(self.engine_data["execution_permits"], self.engine_data["connections"])
        )
        included_items = [item for name, item in self._project_items.items() if name in included]
        ignored_items = [item for name, item in self._project_items.items() if name not in included]
        return included_items, ignored_items

    def _included_items(self, permitted_items, connections):
        """Collects a list of project item names that are going to be executed in this DAG
        based on execution permits and connections in the DAG.

        Args:
            permitted_items (dict): Mapping of item names to bool. True items have been selected by user for execution.
            connections (list): Serialized connections

        Returns:
            list: Project item names
        """
        selected_items = [name for name in self._project_items if permitted_items[name]]
        selected = set(selected_items)  # Constant time membership tests in the loop below
        connected = set()  # Names of items that are connected to permitted items
        for conn in connections:
            src_item = conn["from"][0]
            dst_item = conn["to"][0]
            if src_item in selected or dst_item in selected:
                connected.add(src_item)
                connected.add(dst_item)
        if not connected:  # Single node DAG with no connections
            return selected_items
        return list(connected)

    def _fail_all_items(self, error_text):
        """Logs an error, sets the final state to FAILED and marks all project items failed.

        Args:
            error_text (str): text for the logger's error message
        """
        self._logger.msg_error.emit(error_text)
        self._engine_final_state = str(SpineEngineState.FAILED)
        self._all_items_failed.emit(list(self._project_items.values()))

    @Slot()
    def do_work(self):
        """Does the work and emits finished when done."""
        try:
            self._engine_mngr.run_engine(self._engine_data)
        except EngineInitFailed as error:
            self._fail_all_items(f"Failed to start engine: {error}")
            self.finished.emit()
            return
        except RemoteEngineInitFailed as e:
            self._fail_all_items(
                f"Server is not responding. {e}. Check settings " f"in <b>File->Settings->Engine</b>."
            )
            self.finished.emit()
            return
        while True:
            event_type, data = self._engine_mngr.get_engine_event()
            self._process_event(event_type, data)
            if event_type == "dag_exec_finished":
                self._engine_final_state = data
                break
            if event_type in ("remote_execution_init_failed", "server_init_failed"):
                self._fail_all_items(f"{data}")
                break
        self.finished.emit()

    def _process_event(self, event_type, data):
        """Dispatches an engine event to its handler; event types without a handler are skipped.

        Args:
            event_type (str): event type
            data (Any): event payload, passed to the handler as is
        """
        handler = {
            "exec_started": self._handle_node_execution_started,
            "exec_finished": self._handle_node_execution_finished,
            "event_msg": self._handle_event_msg,
            "process_msg": self._handle_process_msg,
            "standard_execution_msg": self._handle_standard_execution_msg,
            "persistent_execution_msg": self._handle_persistent_execution_msg,
            "kernel_execution_msg": self._handle_kernel_execution_msg,
            "prompt": self._handle_prompt,
            "flash": self._handle_flash,
            "server_status_msg": self._handle_server_status_msg,
        }.get(event_type)
        if handler is None:
            return
        handler(data)

    def _handle_prompt(self, prompt):
        """Relays a prompt together with the engine manager and the logger.

        Args:
            prompt (dict): prompt data
        """
        self._prompt_arrived.emit(prompt, self._engine_mngr, self._logger)

    def _handle_flash(self, flash):
        """Relays a flash event for the connection named in the event.

        Args:
            flash (dict): flash data; ``"item_name"`` is looked up in the connections
        """
        connection = self._connections[flash["item_name"]]
        self._flash_arrived.emit(connection)

    def _handle_standard_execution_msg(self, msg):
        """Turns a standard execution message into event messages for the project item.

        Args:
            msg (dict): message with at least ``"item_name"``, ``"type"`` and ``"filter_id"``
        """
        item = self._project_items[msg["item_name"]]
        if msg["type"] == "execution_failed_to_start":
            msg_text = f"Program <b>{msg['program']}</b> failed to start: {msg['error']}"
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
        elif msg["type"] == "execution_started":
            self._event_message_arrived.emit(
                item, msg["filter_id"], "msg", f"\tStarting program <b>{msg['program']}</b>"
            )
            self._event_message_arrived.emit(item, msg["filter_id"], "msg", f"\tArguments: <b>{msg['args']}</b>")
            self._event_message_arrived.emit(
                item, msg["filter_id"], "msg_warning", "\tExecution is in progress. See messages below (stdout&stderr)"
            )

    def _handle_persistent_execution_msg(self, msg):
        """Handles a message about execution on a persistent process.

        Args:
            msg (dict): message with at least ``"item_name"``, ``"type"`` and ``"filter_id"``

        Raises:
            RuntimeError: if the message type is unknown
        """
        # The name may refer to a project item or to a connection.
        item = self._project_items.get(msg["item_name"]) or self._connections.get(msg["item_name"])
        msg_type = msg["type"]
        if msg_type == "persistent_started":
            self._logger.persistent_console_requested.emit(item, msg["filter_id"], msg["key"], msg["language"])
        elif msg_type == "persistent_failed_to_start":
            msg_text = (
                f"Unable to start persistent process <b>{msg['args']}</b>: {msg['error']}."
                " Please go to Settings->Tools and check your setup."
            )
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
        elif msg_type == "persistent_killed":
            self._logger.persistent_killed(item, msg["filter_id"])
        elif msg_type == "stdin":
            self._logger.add_persistent_stdin(item, msg["filter_id"], msg["data"])
        elif msg_type == "stdout":
            self._logger.add_persistent_stdout(item, msg["filter_id"], msg["data"])
        elif msg_type == "stderr":
            self._logger.add_persistent_stderr(item, msg["filter_id"], msg["data"])
        elif msg_type == "execution_started":
            self._event_message_arrived.emit(
                item, msg["filter_id"], "msg", f"*** Starting execution on persistent process <b>{msg['args']}</b> ***"
            )
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_warning", "See Console for messages")
        else:
            raise RuntimeError(f"Logic error: unknown persistent execution msg_type '{msg_type}'")

    def _handle_kernel_execution_msg(self, msg):
        """Handles a message about execution on a Jupyter kernel; unknown message types are skipped.

        Args:
            msg (dict): message with at least ``"item_name"``, ``"type"`` and ``"filter_id"``
        """
        # Use .get() so that the fallback to connections is reachable when the name is not a project item.
        item = self._project_items.get(msg["item_name"]) or self._connections.get(msg["item_name"])
        if msg["type"] == "kernel_started":
            self._logger.jupyter_console_requested.emit(
                item,
                msg["filter_id"],
                msg["kernel_name"],
                msg["connection_file"],
                msg.get("connection_file_dict", {}),
            )
        elif msg["type"] == "kernel_spec_not_found":
            msg_text = (
                f"Unable to find kernel spec <b>{msg['kernel_name']}</b>"
                "<br/>For Python Tools, select a kernel spec in the Tool specification editor."
                "<br/>For Julia Tools, select a kernel spec from File->Settings->Tools."
            )
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
        elif msg["type"] == "conda_not_found":
            msg_text = (
                f"{msg['error']}<br/>Couldn't call Conda. Set up <b>Conda executable</b> "
                f"in <b>File->Settings->Tools</b>."
            )
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
        elif msg["type"] == "execution_failed_to_start":
            msg_text = f"Execution on kernel <b>{msg['kernel_name']}</b> failed to start: {msg['error']}"
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
        elif msg["type"] == "kernel_spec_exe_not_found":
            msg_text = (
                f"Invalid kernel spec ({msg['kernel_name']}). File <b>{msg['kernel_exe_path']}</b> " f"does not exist."
            )
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
        elif msg["type"] == "execution_started":
            self._event_message_arrived.emit(
                item, msg["filter_id"], "msg", f"*** Starting execution on kernel spec <b>{msg['kernel_name']}</b> ***"
            )
            self._event_message_arrived.emit(item, msg["filter_id"], "msg_warning", "See Console for messages")
        elif msg["type"] == "kernel_shutdown":
            self._logger.kernel_shutdown.emit(item, msg["filter_id"])

    def _handle_process_msg(self, data):
        """Unpacks a process message event.

        Args:
            data (dict): keyword arguments for ``_do_handle_process_msg()``
        """
        self._do_handle_process_msg(**data)

    def _do_handle_process_msg(self, item_name, filter_id, msg_type, msg_text):
        """Relays a process message for the project item or connection with the given name.

        Args:
            item_name (str): name of a project item or a connection
            filter_id (str): filter identifier
            msg_type (str): message type
            msg_text (str): message text
        """
        item = self._project_items.get(item_name) or self._connections.get(item_name)
        self._process_message_arrived.emit(item, filter_id, msg_type, msg_text)

    def _handle_event_msg(self, data):
        """Unpacks an event message event.

        Args:
            data (dict): keyword arguments for ``_do_handle_event_msg()``
        """
        self._do_handle_event_msg(**data)

    def _do_handle_event_msg(self, item_name, filter_id, msg_type, msg_text):
        """Relays an event message for the project item or connection with the given name.

        Args:
            item_name (str): name of a project item or a connection
            filter_id (str): filter identifier
            msg_type (str): message type
            msg_text (str): message text
        """
        item = self._project_items.get(item_name) or self._connections.get(item_name)
        self._event_message_arrived.emit(item, filter_id, msg_type, msg_text)

    def _handle_node_execution_started(self, data):
        """Unpacks an execution started event.

        Args:
            data (dict): keyword arguments for ``_do_handle_node_execution_started()``
        """
        self._do_handle_node_execution_started(**data)

    def _do_handle_node_execution_started(self, item_name, direction):
        """Records the item as executing and relays the execution started event.

        Args:
            item_name (str): project item name
            direction (ExecutionDirection): direction of the execution
        """
        item = self._project_items[item_name]
        self._executing_items.add(item)
        self._node_execution_started.emit(item, direction)

    def _handle_node_execution_finished(self, data):
        """Unpacks an execution finished event.

        Args:
            data (dict): keyword arguments for ``_do_handle_node_execution_finished()``
        """
        self._do_handle_node_execution_finished(**data)

    def _do_handle_node_execution_finished(self, item_name, direction, state, item_state):
        """Records a finished item execution and relays the execution finished event.

        Args:
            item_name (str): project item name
            direction (ExecutionDirection): direction of the execution
            state (Any): stored with successful executions; not relayed
            item_state (ItemExecutionFinishState): finish state of the item
        """
        item = self._project_items[item_name]
        if item_state == ItemExecutionFinishState.SUCCESS:
            self.successful_executions.append((item, direction, state))
        self._executing_items.discard(item)
        # NOTE: A single item may seemingly finish multiple times
        # when the execution is stopped by user during filtered execution.
        self._node_execution_finished.emit(item, direction, item_state)

    def _handle_server_status_msg(self, data):
        """Forwards a server status message to the logger signal matching its type.

        Args:
            data (dict): message with ``"msg_type"`` and ``"text"``; other message types are skipped
        """
        if data["msg_type"] == "success":
            self._logger.msg_success.emit(data["text"])
        elif data["msg_type"] == "neutral":
            self._logger.msg.emit(data["text"])
        elif data["msg_type"] == "fail":
            self._logger.msg_error.emit(data["text"])
        elif data["msg_type"] == "warning":
            self._logger.msg_warning.emit(data["text"])

    def clean_up(self):
        """Signals finish for items still executing, shuts down the engine manager and the thread,
        and schedules the thread and this worker for deletion."""
        for item in self._executing_items:
            self._node_execution_finished.emit(item, None, None)
        if isinstance(self._engine_mngr, LocalSpineEngineManager):
            self._engine_mngr.stop_engine()
        else:
            self._engine_mngr.clean_up()
        self._thread.quit()
        self._thread.wait()
        self._thread.deleteLater()
        self.deleteLater()