Source code for spinetoolbox.widgets.spine_console_widget
######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Class for a custom RichJupyterWidget that can run Tool instances.
:authors: M. Marin (KTH), P. Savolainen (VTT)
:date: 22.10.2019
"""
import logging
import os
from PySide2.QtCore import Signal, Slot, Qt
from PySide2.QtWidgets import QAction, QApplication
from qtconsole.rich_jupyter_widget import RichJupyterWidget
from qtconsole.manager import QtKernelManager
from jupyter_client.kernelspec import NoSuchKernel
from spinetoolbox.widgets.custom_qlistview import DragListView
from spinetoolbox.widgets.kernel_editor import find_python_kernels, find_julia_kernels
from spinetoolbox.config import JUPYTER_KERNEL_TIME_TO_DEAD
[docs]class SpineConsoleWidget(RichJupyterWidget):
"""Base class for all embedded console widgets that can run tool instances."""
def __init__(self, toolbox, name):
"""
Args:
toolbox (ToolboxUI): QMainWindow instance
name (str): Console name, e.g. 'Python Console'
"""
super().__init__(parent=toolbox)
self._toolbox = toolbox
self._name = name
self._kernel_starting = False # Warning: Do not use self._starting (protected class variable in JupyterWidget)
self.kernel_name = None
self.kernel_manager = None
self.kernel_client = None
self.normal_cursor = self._control.viewport().cursor()
self._copy_input_action = QAction('Copy (Only Input)', self)
self._copy_input_action.triggered.connect(lambda checked: self.copy_input())
self._copy_input_action.setEnabled(False)
self.copy_available.connect(self._copy_input_action.setEnabled)
self.start_console_action = QAction("Start", self)
self.start_console_action.triggered.connect(self.start_menu_action)
self.restart_console_action = QAction("Restart", self)
self.restart_console_action.triggered.connect(self.restart_menu_action)
# Set logging level for jupyter loggers
traitlets_logger = logging.getLogger("traitlets")
asyncio_logger = logging.getLogger("asyncio")
traitlets_logger.setLevel(level=logging.WARNING)
asyncio_logger.setLevel(level=logging.WARNING)
@Slot(bool)
@Slot(bool)
[docs] def wake_up(self, k_name=None):
"""Wakes up the console in preparation for execution. Either
ready_to_execute or execution_failed signal must be emitted
as a consequence of calling this method.
"""
if self._name == "Python Console":
if not k_name:
k_name = self._toolbox.qsettings().value("appSettings/pythonKernel", defaultValue="")
if not k_name:
self._toolbox.msg_error.emit("No kernel selected. Go to Settings->Tools to select a Python kernel.")
self.execution_failed.emit(-1)
return
kernels = find_python_kernels()
try:
kernel_path = kernels[k_name]
except KeyError:
self._toolbox.msg_error.emit(
f"Kernel {k_name} not found. Go to Settings->Tools " f"and select another Python kernel."
)
self.execution_failed.emit(-1)
return
elif self._name == "Julia Console":
if not k_name:
k_name = self._toolbox.qsettings().value("appSettings/juliaKernel", defaultValue="")
if not k_name:
self._toolbox.msg_error.emit("No kernel selected. Go to Settings->Tools to select a Julia kernel.")
self.execution_failed.emit(-1)
return
kernels = find_julia_kernels()
try:
kernel_path = kernels[k_name]
except KeyError:
self._toolbox.msg_error.emit(
f"Kernel {k_name} not found. Go to Settings->Tools and select another Julia kernel."
)
self.execution_failed.emit(-1)
return
else:
self._toolbox.msg_error.emit("Unknown Console")
self.execution_failed.emit(-1)
return
# Check if this kernel is already running
if self.kernel_manager and self.kernel_name == k_name:
self.ready_to_execute.emit()
else:
self.start_kernel(k_name, kernel_path)
[docs] def start_kernel(self, k_name, k_path):
"""Starts a kernel manager and kernel client and attaches the client to Julia or Python Console.
Args:
k_name (str): Kernel name
k_path (str): Directory where the the kernel specs are located
"""
if self.kernel_manager and self.kernel_name != k_name:
old_k_name_anchor = "<a style='color:#99CCFF;' title='{0}' href='#'>{1}</a>".format(
k_path, self.kernel_name
)
self._toolbox.msg.emit(f"Kernel changed in Settings. Shutting down current kernel {old_k_name_anchor}.")
self.shutdown_kernel()
self.kernel_name = k_name
new_k_name_anchor = "<a style='color:#99CCFF;' title='{0}' href='#'>{1}</a>".format(k_path, self.kernel_name)
self._toolbox.msg.emit(f"*** Starting {self._name} (kernel {new_k_name_anchor}) ***")
self._kernel_starting = True # This flag is unset when a correct msg is received from iopub_channel
km = QtKernelManager(kernel_name=self.kernel_name)
try:
blackhole = open(os.devnull, 'w')
km.start_kernel(stdout=blackhole, stderr=blackhole)
kc = km.client()
kc.hb_channel.time_to_dead = JUPYTER_KERNEL_TIME_TO_DEAD
kc.start_channels()
self.kernel_manager = km
self.kernel_client = kc
return
except FileNotFoundError:
self._toolbox.msg_error.emit(f"Couldn't find the executable specified by kernel {self.kernel_name}")
self._kernel_starting = False
return
except NoSuchKernel: # kernelspecs missing (probably happens when kernel.json file does not exist
self._toolbox.msg_error.emit(f"Couldn't find kernel specs for kernel {self.kernel_name}")
self._kernel_starting = False
return
[docs] def shutdown_kernel(self):
"""Shut down Julia/Python kernel."""
if not self.kernel_manager or not self.kernel_manager.is_alive():
return
self._toolbox.msg.emit(f"Shutting down {self._name}...")
if self.kernel_client is not None:
self.kernel_client.stop_channels()
self.kernel_manager.shutdown_kernel(now=True)
self.kernel_manager.deleteLater()
self.kernel_manager = None
self.kernel_client.deleteLater()
self.kernel_client = None
[docs] def dragEnterEvent(self, e):
"""Don't accept project item drops."""
source = e.source()
if isinstance(source, DragListView):
e.ignore()
else:
super().dragEnterEvent(e)
@Slot(dict)
[docs] def _handle_execute_reply(self, msg):
super()._handle_execute_reply(msg)
content = msg["content"]
if content["execution_count"] == 0:
return # This is not in response to commands, this is just the kernel saying hello
if content["status"] != "ok":
self.execution_failed.emit(-1)
else:
self.ready_to_execute.emit()
@Slot(dict)
[docs] def _handle_status(self, msg):
"""Handles status message."""
super()._handle_status(msg)
self.kernel_execution_state = msg["content"].get("execution_state", "")
if self.kernel_execution_state == "starting":
# This msg does not show up when starting the Python Console but on Restart it does (strange)
# self._toolbox.msg.emit(f"*** Starting {self._name} ***")
self._kernel_starting = True
return
if self.kernel_execution_state == "idle" and self._kernel_starting:
self._kernel_starting = False
self._toolbox.msg_success.emit(f"{self._name} ready for action")
self._control.viewport().setCursor(self.normal_cursor)
self.ready_to_execute.emit()
@Slot(dict)
[docs] def _handle_error(self, msg):
"""Handles error messages."""
super()._handle_error(msg)
self.execution_failed.emit(-1)
[docs] def enterEvent(self, event):
"""Sets busy cursor during console (re)starts."""
if self._kernel_starting:
self._control.viewport().setCursor(Qt.BusyCursor)
[docs] def _is_complete(self, source, interactive):
"""See base class."""
raise NotImplementedError()
[docs] def copy_input(self):
"""Copies only input."""
if not self._control.hasFocus():
return
text = self._control.textCursor().selection().toPlainText()
if not text:
return
# Remove prompts.
lines = text.splitlines()
useful_lines = []
for line in lines:
m = self._highlighter._classic_prompt_re.match(line)
if m:
useful_lines.append(line[len(m.group(0)) :])
continue
m = self._highlighter._ipy_prompt_re.match(line)
if m:
useful_lines.append(line[len(m.group(0)) :])
continue
text = '\n'.join(useful_lines)
try:
was_newline = text[-1] == '\n'
except IndexError:
was_newline = False
if was_newline: # user doesn't need newline
text = text[:-1]
QApplication.clipboard().setText(text)
[docs] def _setup_client(self):
"""Sets up client."""
if self.kernel_manager is None:
return
new_kernel_client = self.kernel_manager.client()
new_kernel_client.hb_channel.time_to_dead = (
JUPYTER_KERNEL_TIME_TO_DEAD # Not crucial, but nicer to keep the same as mngr
)
new_kernel_client.start_channels()
if self.kernel_client is not None:
self.kernel_client.stop_channels()
self.kernel_client = new_kernel_client
[docs] def connect_to_kernel(self, kernel_name, connection_file):
"""Connects to an existing kernel. Used when Spine Engine is managing the kernel
for project execution.
Args:
connection_file (str): Path to the connection file of the kernel
"""
self.kernel_manager = QtKernelManager(connection_file=connection_file)
self.kernel_manager.load_connection_file()
self.kernel_name = kernel_name
self._setup_client()
self.include_other_output = True
self.other_output_prefix = ""
[docs] def interrupt(self):
"""[TODO: Remove?] Sends interrupt signal to kernel."""
if not self.kernel_manager:
return
self.kernel_manager.interrupt_kernel()