Source code for spinetoolbox.widgets.python_repl_widget
######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Class for a custom SpineConsoleWidget to use as Python REPL.
:author: P. Savolainen (VTT)
:date: 14.3.2019
"""
import os.path
import subprocess
from PySide2.QtCore import Qt, Slot
from PySide2.QtWidgets import QAction, QMessageBox
from qtconsole.manager import QtKernelManager
from jupyter_client.kernelspec import find_kernel_specs, get_kernel_spec, NoSuchKernel
from .toolbars import DraggableWidget
from ..helpers import busy_effect
from ..config import PYTHON_EXECUTABLE
from ..execution_managers import QProcessExecutionManager
from .spine_console_widget import SpineConsoleWidget
[docs]class PythonReplWidget(SpineConsoleWidget):
"""Python Repl Widget class.
Attributes:
toolbox (ToolboxUI): App main window (QMainWindow) instance
"""
def __init__(self, toolbox):
"""Class constructor."""
super().__init__(toolbox)
self._kernel_starting = False # Warning: Do not use self._starting (protected class variable in JupyterWidget)
self.kernel_name = None
self.kernel_display_name = ""
self.kernel_manager = None
self.kernel_client = None
self.python_cmd = None # Contains the path to selected python executable (i.e. pythondir/python.exe on Win.)
self.install_proc_exec_mngr = None # QSubProcess instance for installing required packages
self.may_need_restart = True # Has the user changed the Python environment in Settings
self.normal_cursor = self._control.viewport().cursor()
# QActions
self.start_action = QAction("Start", self)
self.start_action.triggered.connect(lambda checked: self.setup_python_kernel())
self.restart_action = QAction("Restart", self)
self.restart_action.triggered.connect(lambda a: self.restart_kernel("Do you want to restart the kernel?"))
[docs] def connect_signals(self):
"""Connect signals."""
self.executing.connect(self.handle_executing) # Signal defined in FrontEndWidget class
self.executed.connect(self.handle_executed) # Signal defined in FrontEndWidget class
self.kernel_client.iopub_channel.message_received.connect(self.receive_iopub_msg)
[docs] def disconnect_signals(self):
"""Disconnect signals. Needed before
switching to another Python kernel."""
self.executing.disconnect(self.handle_executing)
self.executed.disconnect(self.handle_executed)
self.kernel_client.iopub_channel.message_received.disconnect(self.receive_iopub_msg)
@busy_effect
[docs] def python_kernel_name(self):
"""Returns the name of the Python kernel specification
and its display name according to the selected Python
environment in Settings. Returns None if Python version
cannot be determined."""
if not self.may_need_restart:
return self.kernel_name, self.kernel_display_name
python_path = self._toolbox.qsettings().value("appSettings/pythonPath", defaultValue="")
if python_path:
self.python_cmd = python_path
else:
self.python_cmd = PYTHON_EXECUTABLE
program = str(self.python_cmd)
args = list()
args.append("-V")
proc_exec_mngr = QProcessExecutionManager(self._toolbox, program, args, silent=True)
proc_exec_mngr.start_execution()
if not proc_exec_mngr.wait_for_process_finished(msecs=5000):
self._toolbox.msg_error.emit(
"Couldn't determine Python version. Please check " "the <b>Python interpreter</b> option in Settings."
)
return None
python_version_str = proc_exec_mngr.process_output
if not python_version_str:
# The version str might be in stderr instead of stdout (happens at least with Python 2.7.14)
python_version_str = proc_exec_mngr.error_output
p_v_list = python_version_str.split()
ver = p_v_list[1] # version e.g. 3.7.1
kernel_name = "python-" + ver[:3]
kernel_display_name = "Python-" + ver
self.may_need_restart = False
return kernel_name, kernel_display_name
@Slot()
[docs] def setup_python_kernel(self):
"""Context menu Start action handler."""
k_tuple = self.python_kernel_name()
if not k_tuple:
return
self.launch_kernel(k_tuple[0], k_tuple[1])
[docs] def launch_kernel(self, k_name, k_display_name):
"""Check if selected kernel exists or if it needs to be set up before launching."""
if self.kernel_manager:
if self.kernel_name == k_name:
# Happens when context-menu 'Start' item is clicked while the kernel is already running
self._toolbox.msg.emit("Kernel {0} already running in Python Console".format(self.kernel_name))
return
self._toolbox.msg.emit("*** Restarting Python Console ***")
self._toolbox.msg.emit("\tShutting down IPython kernel <b>{0}</b>".format(self.kernel_name))
self.shutdown_kernel(hush=True)
else:
self._toolbox.msg.emit("*** Starting Python Console ***")
self.kernel_name = k_name
self.kernel_display_name = k_display_name
self.check_and_install_requirements()
[docs] def check_and_install_requirements(self):
"""Prompts user to install IPython and ipykernel if they are missing.
After installing the required packages, installs kernelspecs for the
selected Python if they are missing.
Returns:
Boolean value depending on whether or not the user chooses to proceed.
"""
if not self.is_package_installed("ipykernel"):
message = (
"Python Console requires package <b>ipykernel</b>." "<p>Do you want to install the package now?</p>"
)
message_box = QMessageBox(
QMessageBox.Question,
"ipykernel Missing",
message,
QMessageBox.Ok | QMessageBox.Cancel,
parent=self._toolbox,
)
message_box.button(QMessageBox.Ok).setText("Install ipykernel")
answer = message_box.exec_()
if answer == QMessageBox.Cancel:
self._control.viewport().setCursor(self.normal_cursor)
return False
self._toolbox.msg.emit("*** Installing ipykernel ***")
self.start_package_install_process("ipykernel")
return True
# Install kernelspecs for self.kernel_name if not already present
kernel_specs = find_kernel_specs()
spec_exists = self.kernel_name in kernel_specs
executable_valid = False
if spec_exists:
spec = get_kernel_spec(self.kernel_name)
executable_valid = os.path.exists(spec.argv[0])
if not spec_exists or not executable_valid:
message = (
"IPython kernel specifications for the selected environment are missing. "
"<p>Do you want to install kernel <b>{0}</b> specifications now?</p>".format(self.kernel_name)
)
message_box = QMessageBox(
QMessageBox.Question,
"Kernel Specs Missing",
message,
QMessageBox.Ok | QMessageBox.Cancel,
parent=self._toolbox,
)
message_box.button(QMessageBox.Ok).setText("Install specifications")
answer = message_box.exec_()
if answer == QMessageBox.Cancel:
self._control.viewport().setCursor(self.normal_cursor)
return False
self._toolbox.msg.emit("*** Installing IPython kernel <b>{0}</b> specs ***".format(self.kernel_name))
self.start_kernelspec_install_process()
# New specs installed, update the variable
if self.install_proc_exec_mngr.wait_for_process_finished():
kernel_specs = find_kernel_specs()
else:
self._toolbox.msg_error.emit("Failed to install IPython kernel specifications.")
return False
# Everything ready, start Python Console
kernel_dir = kernel_specs[self.kernel_name]
kernel_spec_anchor = "<a style='color:#99CCFF;' title='{0}' href='#'>{1}</a>".format(
kernel_dir, self.kernel_name
)
self._toolbox.msg.emit("\tStarting IPython kernel {0}".format(kernel_spec_anchor))
self.start_python_kernel()
return True
[docs] def is_package_installed(self, package_name):
"""Checks if given package is installed to selected Python environment.
Args:
package_name (str): Package name
Returns:
(bool): True if installed, False if not
"""
response = subprocess.check_output([self.python_cmd, '-m', 'pip', 'freeze', '-q'])
installed_packages = [r.decode().split('==')[0] for r in response.split()]
if package_name in installed_packages:
return True
return False
[docs] def start_package_install_process(self, package_name):
"""Starts installing the given package using pip.
Args:
package_name (str): Package name to install using pip
"""
program = "{0}".format(self.python_cmd) # selected python environment
args = list()
args.append("-m")
args.append("pip")
args.append("install")
args.append(package_name)
self.install_proc_exec_mngr = QProcessExecutionManager(self._toolbox, program, args, semisilent=True)
self.install_proc_exec_mngr.execution_finished.connect(self.handle_package_install_process_finished)
self.install_proc_exec_mngr.start_execution()
@Slot(int)
[docs] def handle_package_install_process_finished(self, retval):
"""Handles installing package finished.
Args:
retval (int): Process return value. 0: success, !0: failure
"""
self.install_proc_exec_mngr.execution_finished.disconnect()
self.install_proc_exec_mngr.deleteLater()
self.install_proc_exec_mngr = None
if retval != 0:
self._toolbox.msg_error.emit("\tInstalling required package failed. Please install it manually.")
return
self._toolbox.msg_success.emit("\tInstalling package to environment {0} succeeded".format(self.python_cmd))
self.check_and_install_requirements() # Check reqs again
[docs] def start_kernelspec_install_process(self):
"""Install kernel specifications for the selected Python environment."""
# python -m ipykernel install --user --name python-3.4 --display-name Python3.4
# Creates kernelspecs to
# C:\Users\ttepsa\AppData\Roaming\jupyter\kernels\python-3.4
program = "{0}".format(self.python_cmd) # selected python environment
args = list()
args.append("-m")
args.append("ipykernel")
args.append("install")
args.append("--user")
args.append("--name")
args.append(self.kernel_name)
args.append("--display-name")
args.append(self.kernel_display_name)
self.install_proc_exec_mngr = QProcessExecutionManager(self._toolbox, program, args, semisilent=True)
self.install_proc_exec_mngr.execution_finished.connect(self.handle_kernelspec_install_process_finished)
self.install_proc_exec_mngr.start_execution()
@Slot(int)
[docs] def handle_kernelspec_install_process_finished(self, retval):
"""Handles installing package finished.
Args:
retval (int): Process return value. 0: success, !0: failure
"""
self.install_proc_exec_mngr.execution_finished.disconnect()
self.install_proc_exec_mngr.deleteLater()
self.install_proc_exec_mngr = None
if retval != 0:
self._toolbox.msg_error.emit("\tInstalling kernel specs failed. Please install them manually.")
return
self._toolbox.msg_success.emit("\tKernel specs <b>{0}</b> installed".format(self.kernel_name))
self.start_python_kernel()
[docs] def start_python_kernel(self):
"""Starts kernel manager and client and attaches
the client to the Python Console."""
self._kernel_starting = True
km = QtKernelManager(kernel_name=self.kernel_name)
try:
blackhole = open(os.devnull, 'w')
km.start_kernel(stdout=blackhole, stderr=blackhole)
kc = km.client()
kc.start_channels()
self.kernel_manager = km
self.kernel_client = kc
self.connect_signals()
return True
except FileNotFoundError:
self._toolbox.msg_error.emit("\tCouldn't find the Python executable specified by the Jupyter kernel")
self._kernel_starting = False
return False
except NoSuchKernel: # kernelspecs for the selected kernel_name not available
self._toolbox.msg_error.emit(
"\tCouldn't find the specified IPython kernel specs [{0}]".format(self.kernel_name)
)
self._kernel_starting = False
return False
[docs] def wake_up(self):
"""See base class."""
k_tuple = self.python_kernel_name()
if not k_tuple:
self.execution_failed.emit(-1)
return
# Check if this kernel is already running
kernel_name, kernel_display_name = k_tuple
if self.kernel_manager and self.kernel_name == kernel_name:
self.ready_to_execute.emit()
else:
self.launch_kernel(kernel_name, kernel_display_name)
@Slot(str)
[docs] def handle_executing(self, code):
"""Slot for handling the 'executing' signal. Signal is emitted
when a user visible 'execute_request' has been submitted to the
kernel from the FrontendWidget.
Args:
code (str): Code to be executed (actually not 'str' but 'object')
"""
self._control.viewport().setCursor(Qt.BusyCursor)
@Slot(dict)
[docs] def handle_executed(self, msg):
"""Slot for handling the 'executed' signal. Signal is emitted
when a user-visible 'execute_reply' has been received from the
kernel and processed by the FrontendWidget.
Args:
msg (dict): Response message (actually not 'dict' but 'object')
"""
self._control.viewport().setCursor(self.normal_cursor)
if msg['content']['status'] == 'ok':
# TODO: If user Stops execution, it should be handled here
self.ready_to_execute.emit()
else:
# TODO: if status='error' you can get the exception and more info from msg
self.execution_failed.emit(-1)
@Slot(dict)
[docs] def receive_iopub_msg(self, msg):
"""Message received from the IOPUB channel.
Note: We are only monitoring when the kernel has started
successfully and ready for action here. Alternatively, this
could be done in the Slot for the 'executed' signal. However,
this Slot could come in handy at some point. See 'Messaging in
Jupyter' for details:
https://jupyter-client.readthedocs.io/en/latest/messaging.html
Args:
msg (dict): Received message from IOPUB channel
"""
if msg["header"]["msg_type"] == "status":
# When msg_type:'status, content has the kernel execution_state
# When msg_type:'execute_input', content has the code to be executed
# When msg_type:'stream', content has the stream name (e.g. stdout) and the text
execution_state = msg['content']['execution_state']
try:
parent_msg_type = msg["parent_header"]["msg_type"] # msg that the received msg is replying to
except KeyError:
# execution_state: 'starting' -> no parent_header
parent_msg_type = "na" # status msg has no parent header
# self._toolbox.msg.emit(msg)
if parent_msg_type == "kernel_info_request":
# If kernel_state:'busy', kernel_info_request is being processed
# If kernel_state:'idle', kernel_info_request is done
pass
elif parent_msg_type == "history_request":
# If kernel_state:'busy', history_request is being processed
# If kernel_state:'idle', history_request is done
pass
elif parent_msg_type == "execute_request":
# If kernel_state:'busy', execute_request is being processed (i.e. execution or start-up in progress)
# If kernel_state:'idle', execute_request is done
if execution_state == "busy":
# kernel is busy starting up or executing
pass
elif execution_state == "idle":
# Kernel is idle after execution
if self._kernel_starting:
# Kernel is idle after starting up -> execute pending command
self._kernel_starting = False
# Start executing the first command (if available) from the command buffer immediately
self.ready_to_execute.emit()
else:
# This should probably happen when _kernel_state is 'starting' but it doesn't seem to show up
self._toolbox.msg_error.emit(
"Unhandled execution_state '{0}' after " "execute_request".format(execution_state)
)
[docs] def shutdown_kernel(self, hush=False):
"""Shut down Python kernel."""
if not self.kernel_client:
return
self.disconnect_signals()
if not hush:
self._toolbox.msg.emit("Shutting down Python Console...")
self.kernel_client.stop_channels()
self.kernel_manager.shutdown_kernel()
[docs] def push_vars(self, var_name, var_value):
"""Push a variable to Python Console session.
Simply executes command 'var_name=var_value'.
Args:
var_name (str): Variable name
var_value (object): Variable value
Returns:
(bool): True if succeeded, False otherwise
"""
if not self.kernel_manager:
return False
self.execute("{0}={1}".format(var_name, var_value))
return True
@Slot()
[docs] def test_push_vars(self):
"""QAction slot to test pushing variables to Python Console."""
a = dict()
a["eka"] = 1
a["toka"] = 2
a["kolmas"] = 3
if not self.push_vars("a", a):
self._toolbox.msg_error.emit("Pushing variable to Python Console failed")
else:
self._toolbox.msg.emit("Variable 'a' is now in Python Console")
[docs] def dragEnterEvent(self, e):
"""Don't accept project item drops."""
source = e.source()
if isinstance(source, DraggableWidget):
e.ignore()
else:
super().dragEnterEvent(e)
[docs] def _is_complete(self, source, interactive):
"""See base class."""
raise NotImplementedError()