######################################################################################################################
# Copyright (C) 2017-2021 Spine project consortium
# This file is part of Spine Items.
# Spine Items is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Contains SpineEngineWorker.
:authors: M. Marin (KTH)
:date: 14.10.2020
"""
import copy
from PySide2.QtCore import Signal, Slot, QObject, QThread
from .spine_engine_manager import make_engine_manager
from spine_engine.spine_engine import ItemExecutionFinishState
@Slot(list)
[docs]def _handle_dag_execution_started(project_items):
for item in project_items:
item.get_icon().execution_icon.mark_execution_waiting()
@Slot(object, object)
[docs]def _handle_node_execution_started(item, direction):
icon = item.get_icon()
if direction == "FORWARD":
icon.execution_icon.mark_execution_started()
if hasattr(icon, "animation_signaller"):
icon.animation_signaller.animation_started.emit()
@Slot(object, object, object, object)
[docs]def _handle_node_execution_finished(item, direction, state, item_state):
icon = item.get_icon()
if direction == "FORWARD":
icon.execution_icon.mark_execution_finished(item_state)
if hasattr(icon, "animation_signaller"):
icon.animation_signaller.animation_stopped.emit()
if state == "RUNNING":
excluded = True if item_state == ItemExecutionFinishState.EXCLUDED else False
icon.run_execution_leave_animation(excluded)
@Slot(object, str, str)
[docs]def _handle_event_message_arrived(item, filter_id, msg_type, msg_text):
item.add_event_message(filter_id, msg_type, msg_text)
@Slot(object, str, str)
[docs]def _handle_process_message_arrived(item, filter_id, msg_type, msg_text):
item.add_process_message(filter_id, msg_type, msg_text)
[docs]class SpineEngineWorker(QObject):
[docs] _dag_execution_started = Signal(list)
[docs] _node_execution_started = Signal(object, object)
[docs] _node_execution_finished = Signal(object, object, object, object)
[docs] _event_message_arrived = Signal(object, str, str, str)
[docs] _process_message_arrived = Signal(object, str, str, str)
def __init__(self, engine_server_address, engine_data, dag, dag_identifier, project_items):
"""
Args:
engine_server_address (str): Address of engine server if any
engine_data (dict): engine data
dag (DirectedGraphHandler)
dag_identifier (str)
project_items (dict): mapping from project item name to :class:`ProjectItem`
"""
super().__init__()
self._engine_data = engine_data
self._engine_mngr = make_engine_manager(engine_server_address)
self.dag = dag
self.dag_identifier = dag_identifier
self._engine_final_state = "UNKNOWN"
self._executing_items = []
self._project_items = project_items
self.event_messages = {}
self.process_messages = {}
self.successful_executions = []
self._thread = QThread()
self.moveToThread(self._thread)
self._thread.started.connect(self.do_work)
@property
[docs] def engine_data(self):
"""Engine data dictionary."""
return self._engine_data
[docs] def get_engine_data(self):
"""Returns the engine data. Together with ``self.set_engine_data()`` it can be used to modify
the workflow after it's initially created. We use it at the moment for creating Julia sysimages.
Returns:
dict
"""
return copy.deepcopy(self._engine_data)
[docs] def set_engine_data(self, engine_data):
"""Sets the engine data.
Args:
engine_data (dict): New data
"""
self._engine_data = engine_data
@Slot(object, str, str)
[docs] def _handle_event_message_arrived(self, item, filter_id, msg_type, msg_text):
self.event_messages.setdefault(msg_type, []).append(msg_text)
@Slot(object, str, str)
[docs] def _handle_process_message_arrived(self, item, filter_id, msg_type, msg_text):
self.process_messages.setdefault(msg_type, []).append(msg_text)
[docs] def stop_engine(self):
self._engine_mngr.stop_engine()
[docs] def engine_final_state(self):
return self._engine_final_state
[docs] def thread(self):
return self._thread
[docs] def _connect_log_signals(self, silent):
if silent:
self._event_message_arrived.connect(self._handle_event_message_arrived)
self._process_message_arrived.connect(self._handle_process_message_arrived)
return
self._dag_execution_started.connect(_handle_dag_execution_started)
self._node_execution_started.connect(_handle_node_execution_started)
self._node_execution_finished.connect(_handle_node_execution_finished)
self._event_message_arrived.connect(_handle_event_message_arrived)
self._process_message_arrived.connect(_handle_process_message_arrived)
[docs] def start(self, silent=False):
"""Connects log signals.
Args:
silent (bool, optional): If True, log messages are not forwarded to the loggers
but saved in internal dicts.
"""
self._connect_log_signals(silent)
self._dag_execution_started.emit(list(self._project_items.values()))
self._thread.start()
@Slot()
[docs] def do_work(self):
"""Does the work and emits finished when done."""
self._engine_mngr.run_engine(self._engine_data)
while True:
event_type, data = self._engine_mngr.get_engine_event()
self._process_event(event_type, data)
if event_type == "dag_exec_finished":
self._engine_final_state = data
break
self.finished.emit()
[docs] def _process_event(self, event_type, data):
handler = {
"exec_started": self._handle_node_execution_started,
"exec_finished": self._handle_node_execution_finished,
"event_msg": self._handle_event_msg,
"process_msg": self._handle_process_msg,
"standard_execution_msg": self._handle_standard_execution_msg,
"kernel_execution_msg": self._handle_kernel_execution_msg,
}.get(event_type)
if handler is None:
return
handler(data)
[docs] def _handle_standard_execution_msg(self, msg):
item = self._project_items[msg["item_name"]]
if msg["type"] == "execution_failed_to_start":
msg_text = f"Program <b>{msg['program']}</b> failed to start: {msg['error']}"
self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
elif msg["type"] == "execution_started":
self._event_message_arrived.emit(
item, msg["filter_id"], "msg", f"\tStarting program <b>{msg['program']}</b>"
)
self._event_message_arrived.emit(item, msg["filter_id"], "msg", f"\tArguments: <b>{msg['args']}</b>")
self._event_message_arrived.emit(
item, msg["filter_id"], "msg_warning", "\tExecution is in progress. See messages below (stdout&stderr)"
)
[docs] def _handle_kernel_execution_msg(self, msg):
item = self._project_items[msg["item_name"]]
language = msg["language"].capitalize()
if msg["type"] == "kernel_started":
console_requested_signal = {
"julia": item.julia_console_requested,
"python": item.python_console_requested,
}.get(msg["language"])
if console_requested_signal is not None:
console_requested_signal.emit(msg["filter_id"], msg["kernel_name"], msg["connection_file"])
elif msg["type"] == "kernel_spec_not_found":
msg_text = (
f"\tUnable to find specification for {language} kernel <b>{msg['kernel_name']}</b>. "
f"Go to Settings->Tools to select a valid {language} kernel."
)
self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
elif msg["type"] == "execution_failed_to_start":
msg_text = f"\tExecution on {language} kernel <b>{msg['kernel_name']}</b> failed to start: {msg['error']}"
self._event_message_arrived.emit(item, msg["filter_id"], "msg_error", msg_text)
elif msg["type"] == "execution_started":
self._event_message_arrived.emit(
item, msg["filter_id"], "msg", f"\tStarting program on {language} kernel <b>{msg['kernel_name']}</b>"
)
self._event_message_arrived.emit(
item, msg["filter_id"], "msg_warning", f"See {language} Console for messages."
)
[docs] def _handle_process_msg(self, data):
self._do_handle_process_msg(**data)
[docs] def _do_handle_process_msg(self, item_name, filter_id, msg_type, msg_text):
item = self._project_items[item_name]
self._process_message_arrived.emit(item, filter_id, msg_type, msg_text)
[docs] def _handle_event_msg(self, data):
self._do_handle_event_msg(**data)
[docs] def _do_handle_event_msg(self, item_name, filter_id, msg_type, msg_text):
item = self._project_items[item_name]
self._event_message_arrived.emit(item, filter_id, msg_type, msg_text)
[docs] def _handle_node_execution_started(self, data):
self._do_handle_node_execution_started(**data)
[docs] def _do_handle_node_execution_started(self, item_name, direction):
"""Starts item icon animation when executing forward."""
item = self._project_items[item_name]
self._executing_items.append(item)
self._node_execution_started.emit(item, direction)
[docs] def _handle_node_execution_finished(self, data):
self._do_handle_node_execution_finished(**data)
[docs] def _do_handle_node_execution_finished(self, item_name, direction, state, item_state):
item = self._project_items[item_name]
if item_state == ItemExecutionFinishState.SUCCESS:
self.successful_executions.append((item, direction, state))
self._executing_items.remove(item)
self._node_execution_finished.emit(item, direction, state, item_state)
[docs] def clean_up(self):
for item in self._executing_items:
self._node_execution_finished.emit(item, None, None, None)
self._thread.quit()
self._thread.wait()
self._thread.deleteLater()
self.deleteLater()