# Source code for spinetoolbox.widgets.spine_console_widget

######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Class for a custom RichJupyterWidget that can run Tool instances.

:authors: M. Marin (KTH), P. Savolainen (VTT)
:date:   22.10.2019
"""

import logging
import os
from PySide2.QtCore import Signal, Slot, Qt
from PySide2.QtWidgets import QAction, QApplication
from qtconsole.rich_jupyter_widget import RichJupyterWidget
from qtconsole.manager import QtKernelManager
from jupyter_client.kernelspec import NoSuchKernel
from spinetoolbox.widgets.custom_qlistview import DragListView
from spinetoolbox.widgets.kernel_editor import find_python_kernels, find_julia_kernels
from spinetoolbox.config import JUPYTER_KERNEL_TIME_TO_DEAD


class SpineConsoleWidget(RichJupyterWidget):
    """Base class for all embedded console widgets that can run tool instances.

    The widget owns at most one kernel manager / kernel client pair at a time.
    Console readiness and failures are reported through the ``ready_to_execute``
    and ``execution_failed`` signals.
    """

    # Emitted when the kernel is up (or a command has finished successfully).
    ready_to_execute = Signal()
    # Emitted with -1 when waking up the console or executing a command fails.
    execution_failed = Signal(int)

    def __init__(self, toolbox, name):
        """
        Args:
            toolbox (ToolboxUI): QMainWindow instance
            name (str): Console name, e.g. 'Python Console'
        """
        super().__init__(parent=toolbox)
        self._toolbox = toolbox
        self._name = name
        # Warning: Do not use self._starting (protected class variable in JupyterWidget)
        self._kernel_starting = False
        self.kernel_name = None
        self.kernel_manager = None
        self.kernel_client = None
        # Remembered so the cursor can be restored once the kernel has started
        self.normal_cursor = self._control.viewport().cursor()
        self._copy_input_action = QAction('Copy (Only Input)', self)
        self._copy_input_action.triggered.connect(lambda checked: self.copy_input())
        self._copy_input_action.setEnabled(False)
        self.copy_available.connect(self._copy_input_action.setEnabled)
        self.start_console_action = QAction("Start", self)
        self.start_console_action.triggered.connect(self.start_menu_action)
        self.restart_console_action = QAction("Restart", self)
        self.restart_console_action.triggered.connect(self.restart_menu_action)
        # Set logging level for jupyter loggers
        for logger_name in ("traitlets", "asyncio"):
            logging.getLogger(logger_name).setLevel(level=logging.WARNING)

    def name(self):
        """Returns console name."""
        return self._name

    def _selected_kernel_name(self):
        """Returns the kernel name chosen in application settings for this console.

        'Python Console' reads the Python kernel setting; every other console
        name reads the Julia kernel setting.

        Returns:
            str: Kernel name or an empty string if none has been selected
        """
        if self._name == "Python Console":
            key = "appSettings/pythonKernel"
        else:
            key = "appSettings/juliaKernel"
        return self._toolbox.qsettings().value(key, defaultValue="")

    @staticmethod
    def _start_client(kernel_manager):
        """Creates a kernel client for the given manager and starts its channels.

        Args:
            kernel_manager (QtKernelManager): Manager to attach the client to

        Returns:
            Kernel client with running channels
        """
        client = kernel_manager.client()
        client.hb_channel.time_to_dead = JUPYTER_KERNEL_TIME_TO_DEAD
        client.start_channels()
        return client

    @Slot(bool)
    def start_menu_action(self, checked=False):
        """Starts chosen Python/Julia kernel if available and not already running.
        Context menu start action handler.

        Args:
            checked (bool): Unused; provided by the QAction.triggered signal
        """
        k_name = self._selected_kernel_name()
        if k_name == "":
            self._toolbox.msg_error.emit(
                f"No kernel selected. Go to Settings->Tools to select a kernel for {self._name}"
            )
            return
        if self.kernel_manager and self.kernel_name == k_name:
            self._toolbox.msg_warning.emit(f"Kernel {k_name} already running in {self._name}")
            return
        self.wake_up(k_name)

    @Slot(bool)
    def restart_menu_action(self, checked=False):
        """Restarts chosen Python/Julia kernel. Starts a new kernel if it
        is not running or if chosen kernel has been changed in Settings.
        Context menu restart action handler.

        Args:
            checked (bool): Unused; provided by the QAction.triggered signal
        """
        k_name = self._selected_kernel_name()
        if k_name == "":
            self._toolbox.msg_error.emit(
                f"No kernel selected. Go to Settings->Tools to select a kernel for {self._name}"
            )
            return
        if not (self.kernel_manager and self.kernel_name == k_name):
            # No kernel running in this console or the kernel has been changed
            # in Settings->Tools. Start kernel
            self.wake_up(k_name)
            return
        # Restart current kernel
        self._kernel_starting = True  # This flag is unset when a correct msg is received from iopub_channel
        self._toolbox.msg.emit(f"*** Restarting {self._name} ***")
        if self.kernel_client:
            self.kernel_client.stop_channels()
        # Restart kernel manager. The kernel process gets its own copy of the
        # null device handle, so ours can be closed once the restart returns.
        with open(os.devnull, 'w') as blackhole:
            self.kernel_manager.restart_kernel(now=True, stdout=blackhole, stderr=blackhole)
        # Start kernel client and attach it to kernel manager
        self.kernel_client = self._start_client(self.kernel_manager)

    def wake_up(self, k_name=None):
        """Wakes up the console in preparation for execution.

        Either ready_to_execute or execution_failed signal
        must be emitted as a consequence of calling this method.

        Args:
            k_name (str, optional): Kernel name. If empty, the kernel selected
                in application settings for this console is used.
        """
        console_types = {
            "Python Console": ("Python", find_python_kernels),
            "Julia Console": ("Julia", find_julia_kernels),
        }
        try:
            language, find_kernels = console_types[self._name]
        except KeyError:
            self._toolbox.msg_error.emit("Unknown Console")
            self.execution_failed.emit(-1)
            return
        if not k_name:
            k_name = self._selected_kernel_name()
            if not k_name:
                self._toolbox.msg_error.emit(
                    f"No kernel selected. Go to Settings->Tools to select a {language} kernel."
                )
                self.execution_failed.emit(-1)
                return
        kernels = find_kernels()
        try:
            kernel_path = kernels[k_name]
        except KeyError:
            self._toolbox.msg_error.emit(
                f"Kernel {k_name} not found. Go to Settings->Tools and select another {language} kernel."
            )
            self.execution_failed.emit(-1)
            return
        # Check if this kernel is already running
        if self.kernel_manager and self.kernel_name == k_name:
            self.ready_to_execute.emit()
        else:
            self.start_kernel(k_name, kernel_path)

    def start_kernel(self, k_name, k_path):
        """Starts a kernel manager and kernel client and attaches the client to Julia or Python Console.

        On success, ready_to_execute is emitted later from the status handler
        when the kernel reports being idle. On failure, execution_failed is
        emitted so that callers waiting on wake_up() are always notified.

        Args:
            k_name (str): Kernel name
            k_path (str): Directory where the the kernel specs are located
        """
        if self.kernel_manager and self.kernel_name != k_name:
            old_k_name_anchor = "<a style='color:#99CCFF;' title='{0}' href='#'>{1}</a>".format(
                k_path, self.kernel_name
            )
            self._toolbox.msg.emit(f"Kernel changed in Settings. Shutting down current kernel {old_k_name_anchor}.")
            self.shutdown_kernel()
        self.kernel_name = k_name
        new_k_name_anchor = "<a style='color:#99CCFF;' title='{0}' href='#'>{1}</a>".format(k_path, self.kernel_name)
        self._toolbox.msg.emit(f"*** Starting {self._name} (kernel {new_k_name_anchor}) ***")
        self._kernel_starting = True  # This flag is unset when a correct msg is received from iopub_channel
        km = QtKernelManager(kernel_name=self.kernel_name)
        try:
            # The kernel process gets its own copy of the null device handle,
            # so ours can be closed as soon as start_kernel() returns.
            with open(os.devnull, 'w') as blackhole:
                km.start_kernel(stdout=blackhole, stderr=blackhole)
            kc = self._start_client(km)
        except FileNotFoundError:
            self._toolbox.msg_error.emit(f"Couldn't find the executable specified by kernel {self.kernel_name}")
            self._kernel_starting = False
            self.execution_failed.emit(-1)
        except NoSuchKernel:  # kernelspecs missing (probably happens when kernel.json file does not exist
            self._toolbox.msg_error.emit(f"Couldn't find kernel specs for kernel {self.kernel_name}")
            self._kernel_starting = False
            self.execution_failed.emit(-1)
        else:
            self.kernel_manager = km
            self.kernel_client = kc

    def shutdown_kernel(self):
        """Shut down Julia/Python kernel.

        Does nothing if there is no kernel manager or its kernel is not alive.
        """
        if not self.kernel_manager or not self.kernel_manager.is_alive():
            return
        self._toolbox.msg.emit(f"Shutting down {self._name}...")
        client = self.kernel_client
        if client is not None:
            client.stop_channels()
        self.kernel_manager.shutdown_kernel(now=True)
        self.kernel_manager.deleteLater()
        self.kernel_manager = None
        # The client may legitimately be missing; only schedule deletion if it exists
        if client is not None:
            client.deleteLater()
        self.kernel_client = None

    def dragEnterEvent(self, e):
        """Don't accept project item drops."""
        source = e.source()
        if isinstance(source, DragListView):
            e.ignore()
        else:
            super().dragEnterEvent(e)

    @Slot(dict)
    def _handle_execute_reply(self, msg):
        """Handles execute reply messages.

        Emits execution_failed if the reply status is not 'ok' and
        ready_to_execute otherwise.

        Args:
            msg (dict): Reply message
        """
        super()._handle_execute_reply(msg)
        content = msg["content"]
        # Tolerate replies that carry no execution_count instead of raising KeyError in the slot
        if content.get("execution_count") == 0:
            return  # This is not in response to commands, this is just the kernel saying hello
        if content["status"] != "ok":
            self.execution_failed.emit(-1)
        else:
            self.ready_to_execute.emit()

    @Slot(dict)
    def _handle_status(self, msg):
        """Handles status message.

        Args:
            msg (dict): Status message
        """
        super()._handle_status(msg)
        self.kernel_execution_state = msg["content"].get("execution_state", "")
        if self.kernel_execution_state == "starting":
            # This msg does not show up when starting the Python Console but on Restart it does (strange)
            self._kernel_starting = True
            return
        if self.kernel_execution_state == "idle" and self._kernel_starting:
            self._kernel_starting = False
            self._toolbox.msg_success.emit(f"{self._name} ready for action")
            self._control.viewport().setCursor(self.normal_cursor)
            self.ready_to_execute.emit()

    @Slot(dict)
    def _handle_error(self, msg):
        """Handles error messages."""
        super()._handle_error(msg)
        self.execution_failed.emit(-1)

    def enterEvent(self, event):
        """Sets busy cursor during console (re)starts."""
        if self._kernel_starting:
            self._control.viewport().setCursor(Qt.BusyCursor)

    def _is_complete(self, source, interactive):
        """See base class."""
        # NOTE(review): this always raises and thereby replaces whatever the
        # qtconsole base classes implement -- confirm this is intentional.
        raise NotImplementedError()

    def _context_menu_make(self, pos):
        """Reimplemented to add actions to console context-menus."""
        menu = super()._context_menu_make(pos)
        for before_action in menu.actions():
            if before_action.text() == 'Copy (Raw Text)':
                menu.insertAction(before_action, self._copy_input_action)
                break
        first_action = menu.actions()[0]
        # Offer 'Start' when nothing is running, 'Restart' otherwise
        if not self.kernel_manager:
            menu.insertAction(first_action, self.start_console_action)
        else:
            menu.insertAction(first_action, self.restart_console_action)
        menu.insertSeparator(first_action)
        return menu

    def copy_input(self):
        """Copies only input.

        Lines of the selection that start with a prompt are copied to the
        clipboard with the prompt removed; all other lines are dropped.
        """
        if not self._control.hasFocus():
            return
        text = self._control.textCursor().selection().toPlainText()
        if not text:
            return
        # Remove prompts.
        prompt_patterns = (self._highlighter._classic_prompt_re, self._highlighter._ipy_prompt_re)
        useful_lines = []
        for line in text.splitlines():
            for pattern in prompt_patterns:
                m = pattern.match(line)
                if m:
                    useful_lines.append(line[len(m.group(0)) :])
                    break
        text = '\n'.join(useful_lines)
        if text.endswith('\n'):  # user doesn't need newline
            text = text[:-1]
        QApplication.clipboard().setText(text)

    def _setup_client(self):
        """Sets up client."""
        if self.kernel_manager is None:
            return
        # Heartbeat timeout is not crucial here, but nicer to keep the same as mngr
        new_kernel_client = self._start_client(self.kernel_manager)
        if self.kernel_client is not None:
            self.kernel_client.stop_channels()
        self.kernel_client = new_kernel_client

    def connect_to_kernel(self, kernel_name, connection_file):
        """Connects to an existing kernel. Used when Spine Engine is managing the kernel
        for project execution.

        Args:
            kernel_name (str): Name of the kernel to connect to
            connection_file (str): Path to the connection file of the kernel
        """
        self.kernel_manager = QtKernelManager(connection_file=connection_file)
        self.kernel_manager.load_connection_file()
        self.kernel_name = kernel_name
        self._setup_client()
        self.include_other_output = True
        self.other_output_prefix = ""

    def interrupt(self):
        """[TODO: Remove?] Sends interrupt signal to kernel."""
        if not self.kernel_manager:
            return
        self.kernel_manager.interrupt_kernel()