######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Items.
# Spine Items is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""Contains SpineEngineManagerBase."""
import queue
import threading
import json
from spine_engine.exception import RemoteEngineInitFailed
from spine_engine.server.util.event_data_converter import EventDataConverter
from spinetoolbox.server.engine_client import EngineClient, ClientSecurityModel
[docs]class SpineEngineManagerBase:
[docs] def run_engine(self, engine_data):
"""Runs an engine with given data.
Args:
engine_data (dict): The engine data.
"""
raise NotImplementedError()
[docs] def get_engine_event(self):
"""Gets next event from a running engine.
Returns:
tuple(str,dict): two element tuple: event type identifier string, and event data dictionary
"""
raise NotImplementedError()
[docs] def stop_engine(self):
"""Stops a running engine."""
raise NotImplementedError()
[docs] def answer_prompt(self, prompter_id, answer):
"""Answers prompt.
Args:
prompter_id (int): The id of the prompter
answer: The user's decision.
"""
raise NotImplementedError()
[docs] def restart_kernel(self, connection_file):
"""Restarts the jupyter kernel associated to given connection file.
Args:
connection_file (str): path of connection file
"""
raise NotImplementedError()
[docs] def shutdown_kernel(self, connection_file):
"""Shuts down the jupyter kernel associated to given connection file.
Args:
connection_file (str): path of connection file
"""
raise NotImplementedError()
[docs] def issue_persistent_command(self, persistent_key, command):
"""Issues a command to a persistent process.
Args:
persistent_key (tuple): persistent identifier
command (str): command to issue
Returns:
generator: stdin, stdout, and stderr messages (dictionaries with two keys: type, and data)
"""
raise NotImplementedError()
[docs] def is_persistent_command_complete(self, persistent_key, command):
"""Checks whether a command is complete.
Args:
key (tuple): persistent identifier
cmd (str): command to issue
Returns:
bool
"""
raise NotImplementedError()
[docs] def restart_persistent(self, persistent_key):
"""Restarts a persistent process.
Args:
persistent_key (tuple): persistent identifier
Returns:
generator: stdout and stderr messages (dictionaries with two keys: type, and data)
"""
raise NotImplementedError()
[docs] def interrupt_persistent(self, persistent_key):
"""Interrupts a persistent process.
Args:
persistent_key (tuple): persistent identifier
"""
raise NotImplementedError()
[docs] def kill_persistent(self, persistent_key):
"""Kills a persistent process.
Args:
persistent_key (tuple): persistent identifier
"""
raise NotImplementedError()
[docs] def get_persistent_completions(self, persistent_key, text):
"""Returns a list of auto-completion options from given text.
Args:
persistent_key (tuple): persistent identifier
text (str): text to complete
Returns:
list of str
"""
raise NotImplementedError()
[docs] def get_persistent_history_item(self, persistent_key, text, prefix, backwards):
"""Returns an item from persistent history.
Args:
persistent_key (tuple): persistent identifier
Returns:
str: history item or empty string if none
"""
raise NotImplementedError()
[docs]class LocalSpineEngineManager(SpineEngineManagerBase):
def __init__(self):
super().__init__()
self._engine = None
[docs] def run_engine(self, engine_data):
from spine_engine.spine_engine import SpineEngine # pylint: disable=import-outside-toplevel
self._engine = SpineEngine(**engine_data)
[docs] def get_engine_event(self):
return self._engine.get_event()
[docs] def stop_engine(self):
if self._engine is not None:
self._engine.stop()
[docs] def answer_prompt(self, prompter_id, answer):
self._engine.answer_prompt(prompter_id, answer)
[docs] def restart_kernel(self, connection_file):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.kernel_execution_manager import restart_kernel_manager
return restart_kernel_manager(connection_file)
[docs] def shutdown_kernel(self, connection_file):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.kernel_execution_manager import shutdown_kernel_manager
return shutdown_kernel_manager(connection_file)
[docs] def kernel_managers(self):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.kernel_execution_manager import n_kernel_managers
return n_kernel_managers()
[docs] def issue_persistent_command(self, persistent_key, command):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.persistent_execution_manager import issue_persistent_command
yield from issue_persistent_command(persistent_key, command)
[docs] def is_persistent_command_complete(self, persistent_key, command):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.persistent_execution_manager import is_persistent_command_complete
return is_persistent_command_complete(persistent_key, command)
[docs] def restart_persistent(self, persistent_key):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.persistent_execution_manager import restart_persistent
yield from restart_persistent(persistent_key)
[docs] def interrupt_persistent(self, persistent_key):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.persistent_execution_manager import interrupt_persistent
interrupt_persistent(persistent_key)
[docs] def kill_persistent(self, persistent_key):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.persistent_execution_manager import kill_persistent
kill_persistent(persistent_key)
[docs] def get_persistent_completions(self, persistent_key, text):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.persistent_execution_manager import get_persistent_completions
return get_persistent_completions(persistent_key, text)
[docs] def get_persistent_history_item(self, persistent_key, text, prefix, backwards):
# pylint: disable=import-outside-toplevel
from spine_engine.execution_managers.persistent_execution_manager import get_persistent_history_item
return get_persistent_history_item(persistent_key, text, prefix, backwards)
[docs]class RemoteSpineEngineManager(SpineEngineManagerBase):
"""Responsible for remote project execution."""
def __init__(self, job_id=""):
"""Initializer."""
super().__init__()
self._runner = threading.Thread(name="RemoteSpineEngineManagerRunnerThread", target=self._run)
self._engine_data = None
self.engine_client = None
self.job_id = job_id # Job Id of ProjectExtractionService for finding the extracted project on server
self.exec_job_id = "" # Job Id of RemoteExecutionService for stopping the execution
self.q = queue.Queue() # Queue for sending events forward to SpineEngineWorker
[docs] def make_engine_client(self, host, port, security, sec_folder, ping=True):
"""Creates a client for connecting to Spine Engine Server."""
try:
self.engine_client = EngineClient(host, port, security, sec_folder, ping)
except RemoteEngineInitFailed:
raise
except Exception:
raise
[docs] def run_engine(self, engine_data):
"""Makes an engine client for communicating with the engine server.
Starts a thread for monitoring the DAG execution on server.
Args:
engine_data (dict): The engine data.
"""
app_settings = engine_data["settings"]
host = app_settings.get("engineSettings/remoteHost", "") # Host name
port = app_settings.get("engineSettings/remotePort", "49152") # Host port
sec_model = app_settings.get("engineSettings/remoteSecurityModel", "") # ZQM security model
security = ClientSecurityModel.NONE if not sec_model else ClientSecurityModel.STONEHOUSE
sec_folder = (
"" if security == ClientSecurityModel.NONE else app_settings.get("engineSettings/remoteSecurityFolder", "")
)
self.make_engine_client(host, port, security, sec_folder)
self._engine_data = engine_data
self._runner.start()
[docs] def get_engine_event(self):
"""Returns the next engine execution event."""
return self.q.get()
[docs] def clean_up(self):
"""Closes EngineClient and joins _runner thread if still active."""
self.engine_client.close()
if self._runner.is_alive():
self._runner.join()
[docs] def stop_engine(self):
"""Sends a request to stop execution on Server then waits for _runner thread to end."""
if self._runner.is_alive():
self.engine_client.stop_execution(self.exec_job_id)
self._runner.join()
[docs] def _run(self):
"""Sends a start execution request to server with the job Id.
Sets up a subscribe socket according to the publish port received from server.
Passes received events to SpineEngineWorker for processing. After execution
has finished, downloads new files from server.
"""
self.engine_client.client_project_dir = self._engine_data["project_dir"]
engine_data_json = json.dumps(self._engine_data) # Transform dictionary to JSON string
# Send request to server, and wait for an execution started response containing the publish port
start_response_data = self.engine_client.start_execution(engine_data_json, self.job_id)
if start_response_data[0] != "remote_execution_started":
# Initializing the server for execution failed. 'remote_execution_init_failed' and 'server_init_failed'
# are handled in SpineEngineWorker.
self.q.put(
(
"server_status_msg",
{
"msg_type": "fail",
"text": f"Server init failed: event_type:{start_response_data[0]} "
f"data:{start_response_data[1]}. Aborting.",
},
)
)
self.q.put(start_response_data)
return
self.exec_job_id = start_response_data[2] # Needed for stopping DAG execution on server
self.engine_client.connect_pull_socket(start_response_data[1])
while True: # Pull events until dag_exec_finished event
rcv = self.engine_client.rcv_next("pull")
event = EventDataConverter.deconvert(*rcv) # Unpack list
if event[0] == "dag_exec_finished":
# Download all files before sending 'dag_exec_finished' to SpineEngineWorker
# because it will destroy this thread before the file transfers have finished.
if event[1] == "COMPLETED":
self.engine_client.download_files(self.q)
t = self.engine_client.get_elapsed_time()
self.q.put(("server_status_msg", {"msg_type": "success", "text": f"Execution time: {t}"}))
self.q.put(event)
break
elif event[0] == "server_execution_error":
# spine engine raised an exception during execution
self.q.put(("server_status_msg", {"msg_type": "fail", "text": f"{event[0]: {event[1]}}"}))
break
else:
self.q.put(event)
self.engine_client.close()
[docs] def answer_prompt(self, prompter_id, answer):
"""See base class."""
self.engine_client.answer_prompt(self.exec_job_id, prompter_id, answer)
[docs] def restart_kernel(self, connection_file):
"""See base class."""
# TODO: This does not restart the kernel, only replaces the client. Do kernel_manager.restart_kernel() on server
pass
[docs] def shutdown_kernel(self, connection_file):
"""See base class."""
pass
[docs] def is_persistent_command_complete(self, persistent_key, command):
return self.engine_client.send_is_complete(persistent_key, command)
[docs] def issue_persistent_command(self, persistent_key, command):
return self.engine_client.send_issue_persistent_command(persistent_key, command)
[docs] def restart_persistent(self, persistent_key):
"""See base class."""
return self.engine_client.send_restart_persistent(persistent_key)
[docs] def interrupt_persistent(self, persistent_key):
"""See base class."""
return self.engine_client.send_interrupt_persistent(persistent_key)
[docs] def kill_persistent(self, persistent_key):
"""See base class."""
return self.engine_client.send_kill_persistent(persistent_key)
[docs] def get_persistent_completions(self, persistent_key, text):
"""See base class."""
return self.engine_client.send_get_persistent_completions(persistent_key, text)
[docs] def get_persistent_history_item(self, persistent_key, text, prefix, backwards):
"""Returns an item from persistent history.
Args:
persistent_key (tuple): persistent identifier
Returns:
str: history item or empty string if none
"""
return self.engine_client.send_get_persistent_history_item(persistent_key, text, prefix, backwards)
[docs]def make_engine_manager(remote_execution_enabled=False, job_id=""):
"""Returns either a Local or a remote Spine Engine Manager based on settings.
Args:
remote_execution_enabled (bool): True returns a local Spine Engine Manager instance,
False returns a remote Spine Engine Manager instance
job_id (str): Server execution job Id
"""
if remote_execution_enabled:
return RemoteSpineEngineManager(job_id)
return LocalSpineEngineManager()