Source code for spinetoolbox.widgets.persistent_console_widget
######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""Contains a widget acting as a console for Julia & Python REPL's."""
import os
import uuid
from pygments.styles import get_style_by_name
from pygments.lexers import get_lexer_by_name
from pygments.util import ClassNotFound
from pygments.token import Token
from PySide6.QtCore import Qt, Slot, QTimer, Signal, QRect
from PySide6.QtWidgets import QPlainTextEdit, QSizePolicy
from PySide6.QtGui import (
QFontDatabase,
QTextCharFormat,
QFont,
QTextCursor,
QColor,
QTextBlockFormat,
QTextOption,
QKeySequence,
)
from spinetoolbox.helpers import CustomSyntaxHighlighter
from spinetoolbox.spine_engine_manager import make_engine_manager
from spinetoolbox.qthread_pool_executor import QtBasedThreadPoolExecutor
from spine_engine.exception import RemoteEngineInitFailed
[docs]class _CustomLineEdit(QPlainTextEdit):
def __init__(self, console):
super().__init__(console)
self._updating = False
self._console = console
self._current_prompt = ""
self.setStyleSheet("QPlainTextEdit {background-color: transparent; color: transparent; border:none;}")
self.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
self.document().setDocumentMargin(0)
self.setAttribute(Qt.WA_TransparentForMouseEvents)
self.setTabChangesFocus(False)
self.setUndoRedoEnabled(False)
self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.MinimumExpanding)
self.cursorPositionChanged.connect(self._handle_cursor_position_changed)
self.textChanged.connect(self._handle_text_changed)
@property
@property
[docs] def new_line_indent(self):
return len(self._current_prompt.lstrip()) # lstrip() is to remove leading '\n'
[docs] def reset(self, current_prompt):
self._current_prompt = current_prompt
self.setPlainText(current_prompt)
[docs] def formatted_text(self):
text = self.raw_text()
if not text:
return ""
lines = iter(text.splitlines())
new_lines = [next(lines).rstrip()] + [line.rstrip()[self.new_line_indent :] for line in lines]
return "\n".join(new_lines)
@Slot()
[docs] def _handle_text_changed(self):
"""Add indent to new lines."""
if self._updating:
return
if not self.raw_text():
return
self._updating = True
cursor = self.textCursor()
for i in range(self.document().blockCount()):
block = self.document().findBlockByNumber(i)
if block.position() < self.min_pos:
continue
if not block.text().startswith(self.new_line_indent * " "):
cursor.setPosition(block.position())
cursor.insertText(self.new_line_indent * " ")
self._updating = False
@Slot()
[docs] def _handle_cursor_position_changed(self):
"""Move cursor away from indent areas."""
if self._updating:
return
self._updating = True
cursor = self.textCursor()
if cursor.position() < self.min_pos:
cursor.setPosition(self.min_pos)
elif cursor.positionInBlock() < self.new_line_indent:
cursor.movePosition(QTextCursor.MoveOperation.StartOfBlock)
cursor.movePosition(QTextCursor.MoveOperation.NextCharacter, n=self.new_line_indent)
self.setTextCursor(cursor)
self._updating = False
[docs] def keyPressEvent(self, ev):
if ev.matches(QKeySequence.Copy):
ev.ignore()
return
if ev.key() == Qt.Key_Backspace:
cursor = self.textCursor()
if cursor.position() == self.min_pos:
return
if cursor.positionInBlock() == self.new_line_indent:
cursor.movePosition(
QTextCursor.MoveOperation.PreviousCharacter,
QTextCursor.MoveMode.KeepAnchor,
n=self.new_line_indent + 1,
)
cursor.removeSelectedText()
return
if ev.key() == Qt.Key_Left:
cursor = self.textCursor()
if cursor.positionInBlock() == self.new_line_indent:
cursor.movePosition(QTextCursor.MoveOperation.PreviousCharacter, n=self.new_line_indent + 1)
self.setTextCursor(cursor)
return
if ev.key() == Qt.Key_Delete:
cursor = self.textCursor()
if cursor.atBlockEnd():
cursor.movePosition(
QTextCursor.MoveOperation.NextCharacter, QTextCursor.MoveMode.KeepAnchor, n=self.new_line_indent + 1
)
cursor.removeSelectedText()
return
if self._console.key_press_event(ev):
return
super().keyPressEvent(ev)
[docs]class PersistentConsoleWidget(QPlainTextEdit):
"""A widget to interact with a persistent process."""
def __init__(self, toolbox, key, language, owner=None):
"""
Args:
toolbox (ToolboxUI)
key (tuple): persistent process identifier
language (str): for syntax highlighting and prompting, etc.
owner (ProjectItemBase, optional): console owner
"""
super().__init__(parent=toolbox)
self._executor = QtBasedThreadPoolExecutor(max_workers=1)
self._updating = False
font = QFontDatabase.systemFont(QFontDatabase.FixedFont)
self.setFont(font)
self.setMaximumBlockCount(self._MAX_LINES_COUNT)
self._toolbox = toolbox
self._key = key
self._is_dead = False
self._language = language
self.owners = {owner}
self._prompt, self._prompt_format = self._make_prompt()
self._prefix = None
self._pending_command_count = 0
self._text_buffer = []
self._skipped = {}
self._anchor = None
self._style = get_style_by_name("monokai")
background_color = self._style.background_color
foreground_color = self._style.styles[Token] or self._style.styles[Token.Text]
self.setStyleSheet(
f"QPlainTextEdit {{background-color: {background_color}; color: {foreground_color}; border: 0px}}"
)
cursor_width = self.fontMetrics().horizontalAdvance("x")
self.setWordWrapMode(QTextOption.WrapAnywhere)
self.setTabStopDistance(4 * cursor_width)
self._line_edit = _CustomLineEdit(self)
self._line_edit.setFont(font)
self._line_edit.setCursorWidth(cursor_width)
self._line_edit.setWordWrapMode(QTextOption.WrapAnywhere)
self._line_edit.setTabStopDistance(4 * cursor_width)
self._highlighter = CustomSyntaxHighlighter(None)
self._highlighter.set_style(self._style)
try:
self._highlighter.lexer = get_lexer_by_name(self._language)
except ClassNotFound:
pass
self._ansi_esc_code_handler = AnsiEscapeCodeHandler(foreground_color, background_color)
self._prompt_block = None
self._current_prompt = ""
self._make_prompt_block(prompt=self._prompt)
self._at_bottom = True
self._flush_in_progress = False
self._flush_timer = QTimer()
self._flush_timer.setInterval(self._FLUSH_INTERVAL)
self._flush_timer.timeout.connect(self._flush_text_buffer)
self._flush_timer.setSingleShot(True)
self.engine_mngr = None
self.setReadOnly(True)
self.document().contentsChanged.connect(self._handle_contents_changed)
self.updateRequest.connect(self._handle_update_request)
self.selectionChanged.connect(self._handle_selection_changed)
self.cursorPositionChanged.connect(self._handle_cursor_position_changed)
self._flush_needed.connect(self._start_flush_timer)
self._line_edit.textChanged.connect(self._update_user_input)
self._command_checked.connect(self._handle_command_checked)
self._msg_available.connect(self._handle_msg_available)
self._command_finished.connect(self._handle_command_finished)
self._history_item_available.connect(self._display_history_item)
self._completions_available.connect(self._display_completions)
self._restarted.connect(self._handle_restarted)
self._killed.connect(self._do_set_killed)
[docs] def name(self):
"""Returns console name for display purposes."""
return f"{self._language.capitalize()} Console - {self.owner_names}"
@property
@property
@property
[docs] def mouseMoveEvent(self, ev):
super().mouseMoveEvent(ev)
if self.anchorAt(ev.position().toPoint()):
self.viewport().setCursor(Qt.PointingHandCursor)
else:
self.viewport().setCursor(Qt.IBeamCursor)
[docs] def mousePressEvent(self, ev):
super().mousePressEvent(ev)
self._anchor = self.anchorAt(ev.position().toPoint())
[docs] def mouseReleaseEvent(self, ev):
super().mouseReleaseEvent(ev)
if self._anchor is None:
return
text_buffer = self._skipped.pop(self._anchor, None)
if text_buffer is None:
return
cursor = self.cursorForPosition(ev.position().toPoint())
cursor.select(cursor.BlockUnderCursor)
cursor.removeSelectedText()
cursor.beginEditBlock()
while text_buffer:
text, with_prompt = text_buffer.pop(0)
self._insert_text(cursor, text, with_prompt)
cursor.endEditBlock()
self._anchor = None
[docs] def scrollContentsBy(self, dx, dy):
super().scrollContentsBy(dx, dy)
scrollbar = self.verticalScrollBar()
self._at_bottom = scrollbar.value() == scrollbar.maximum()
@Slot()
[docs] def _handle_contents_changed(self):
if self._at_bottom:
scrollbar = self.verticalScrollBar()
scrollbar.setValue(scrollbar.maximum())
@Slot()
[docs] def _handle_selection_changed(self):
if self._updating:
return
cursor = self.textCursor()
le_cursor = self._line_edit.textCursor()
le_selection_start = cursor.selectionStart() - self._prompt_block.position()
le_selection_end = cursor.selectionEnd() - self._prompt_block.position()
if le_selection_start < self._line_edit.min_pos and le_selection_end < self._line_edit.min_pos:
le_cursor.clearSelection()
else:
le_selection_start = max(self._line_edit.min_pos, le_selection_start)
le_selection_end = max(self._line_edit.min_pos, le_selection_end)
le_cursor.setPosition(le_selection_start)
le_cursor.setPosition(le_selection_end, QTextCursor.MoveMode.KeepAnchor)
self._line_edit.setTextCursor(le_cursor)
@Slot()
[docs] def _handle_cursor_position_changed(self):
if self._updating:
return
cursor = self.textCursor()
le_cursor = self._line_edit.textCursor()
le_position = cursor.position() - self._prompt_block.position()
if self._line_edit.min_pos <= le_position < self._line_edit.document().characterCount():
le_cursor.setPosition(le_position)
self._line_edit.setTextCursor(le_cursor)
@Slot(QRect, int)
[docs] def _handle_update_request(self, _rect, _dy):
"""Move line edit to input start pos."""
if not self._updating:
self._move_and_resize_line_edit()
[docs] def resizeEvent(self, ev):
super().resizeEvent(ev)
if not self._updating:
self._move_and_resize_line_edit()
[docs] def _move_and_resize_line_edit(self):
if self._prompt_block is None:
return
cursor = self.textCursor()
cursor.setPosition(self._prompt_block.position())
rect = self.cursorRect(cursor)
self._line_edit.move(rect.topLeft())
width = self.geometry().width()
scrollbar = self.verticalScrollBar()
if scrollbar.isVisible():
width -= scrollbar.width()
cursor_width = self._line_edit.cursorWidth()
width = (width // cursor_width) * cursor_width
self._line_edit.setFixedWidth(width)
@Slot()
[docs] def _update_user_input(self):
self._updating = True
cursor = self.textCursor()
cursor.setPosition(self._input_start_pos)
cursor.movePosition(QTextCursor.MoveOperation.End, QTextCursor.MoveMode.KeepAnchor)
text = self._line_edit.raw_text()
cursor.insertText(text)
self._highlight_current_input()
self._updating = False
self._move_and_resize_line_edit()
@Slot()
@Slot()
[docs] def _flush_text_buffer(self):
"""Inserts all text from buffer."""
cursor = self.textCursor()
cursor.beginEditBlock()
k = 0
while self._text_buffer and k < self._MAX_LINES_PER_CYCLE:
cursor.setPosition(self._prompt_block.position() - 1)
text, with_prompt = self._text_buffer.pop(0)
self._insert_text(cursor, text, with_prompt)
k += 1
if self._text_buffer:
address = uuid.uuid4().hex
char_format = cursor.charFormat()
char_format.setBackground(QColor("white"))
char_format.setForeground(QColor("blue"))
char_format.setAnchor(True)
char_format.setAnchorHref(address)
self._skipped[address] = self._text_buffer[-self._MAX_LINES_COUNT :]
cursor.setPosition(self._prompt_block.position() - 1)
cursor.insertBlock(QTextBlockFormat())
cursor.insertText(f"<--- {len(self._text_buffer)} more lines --->", char_format)
self._text_buffer.clear()
cursor.endEditBlock()
self._flush_in_progress = False
[docs] def _make_prompt(self):
text_format = QTextCharFormat()
if self._language == "julia":
prompt = "\njulia> "
text_format.setForeground(Qt.darkGreen)
text_format.setFontWeight(QFont.Bold)
elif self._language == "python":
prompt = ">>> "
else:
prompt = "$ "
return prompt, text_format
[docs] def _make_prompt_block(self, prompt=""):
cursor = self.textCursor()
cursor.movePosition(QTextCursor.MoveOperation.End)
cursor.insertBlock()
self._prompt_block = cursor.block()
self._insert_prompt(prompt=prompt)
[docs] def _insert_prompt(self, prompt=""):
cursor = self.textCursor()
cursor.setPosition(self._prompt_block.position())
cursor.insertText(prompt, self._prompt_format)
cursor.movePosition(QTextCursor.MoveOperation.End)
self._current_prompt = prompt
self._line_edit.reset(self._current_prompt)
self._move_and_resize_line_edit()
[docs] def _insert_stdin_text(self, cursor, text):
"""Inserts highlighted text.
Args:
cursor (QTextCursor)
text (str)
"""
if not text:
cursor.insertText("")
return
lines = iter(text.splitlines())
line = next(lines)
self._do_insert_stdin_text(cursor, line)
for line in lines:
cursor.insertText("\n")
self._do_insert_stdin_text(cursor, line)
[docs] def _do_insert_stdin_text(self, cursor, text):
for start, count, text_format in self._highlighter.yield_formats(text):
chunk = text[start : start + count]
cursor.insertText(chunk, text_format)
[docs] def _insert_stdout_text(self, cursor, text):
"""Inserts ansi highlighted text.
Args:
cursor (QTextCursor)
text (str)
"""
for chunk, text_format in self._ansi_esc_code_handler.parse_text(text):
cursor.insertText(chunk, text_format)
[docs] def _insert_text_before_prompt(self, text, with_prompt=False):
"""Inserts given text before the prompt. Used when adding input and output from external execution.
Args:
text (str)
"""
self._text_buffer.append((text, with_prompt))
if not self._flush_in_progress:
self._flush_in_progress = True
self._flush_needed.emit()
[docs] def _insert_text(self, cursor, text, with_prompt):
cursor.insertBlock(QTextBlockFormat())
if with_prompt:
cursor.insertText(self._prompt, self._prompt_format)
self._insert_stdin_text(cursor, text)
else:
self._insert_stdout_text(cursor, text)
[docs] def set_killed(self, killed):
"""Emits the ``killed`` signal.
Args:
killed (bool): if True, may the console rest in peace
"""
self._killed.emit(killed)
@Slot(bool)
[docs] def _do_set_killed(self, killed):
"""Sets the console as killed or alive.
Args:
killed (bool): if True, may the console rest in peace
"""
self._is_dead = killed
self._line_edit.setVisible(not killed)
if killed:
self._make_prompt_block("Console killed (can be restarted from the right-click context menu)")
[docs] def add_stdin(self, data):
"""Adds new prompt with data. Used when adding stdin from external execution.
Args:
data (str)
"""
self._insert_text_before_prompt(data, with_prompt=True)
[docs] def add_stdout(self, data):
"""Adds new line to stdout. Used when adding stdout from external execution.
Args:
data (str)
"""
self._insert_text_before_prompt(data)
[docs] def add_stderr(self, data):
"""Adds new line to stderr. Used when adding stderr from external execution.
Args:
data (str)
"""
self._insert_text_before_prompt(data)
[docs] def _get_prefix(self):
le_cursor = self._line_edit.textCursor()
le_cursor.setPosition(self._line_edit.min_pos, QTextCursor.MoveMode.KeepAnchor)
return le_cursor.selectedText().rstrip()
[docs] def _highlight_current_input(self):
cursor = self.textCursor()
cursor.setPosition(self._input_start_pos)
cursor.movePosition(QTextCursor.MoveOperation.End, QTextCursor.MoveMode.KeepAnchor)
text = cursor.selectedText()
for start, count, text_format in self._highlighter.yield_formats(text):
start += self._input_start_pos
cursor.setPosition(start)
cursor.setPosition(start + count, QTextCursor.MoveMode.KeepAnchor)
cursor.setCharFormat(text_format)
cursor.movePosition(QTextCursor.MoveOperation.NextCharacter)
cursor.setCharFormat(QTextCharFormat())
[docs] def key_press_event(self, ev):
"""Handles key press event from line edit.
Returns:
True if handled, False if not.
"""
self._at_bottom = True
text = self._get_current_text()
if ev.key() in (Qt.Key_Return, Qt.Key_Enter):
self._issue_command(text)
elif ev.key() == Qt.Key_Up:
self._move_history(text, True)
elif ev.key() == Qt.Key_Down:
self._move_history(text, False)
elif ev.key() == Qt.Key_Tab:
self._autocomplete(text)
else:
return False
return True
[docs] def create_engine_manager(self):
"""Returns a new local or remote spine engine manager or
an existing remote spine engine manager.
Returns None if connecting to Spine Engine Server fails."""
exec_remotely = self._toolbox.qsettings().value("engineSettings/remoteExecutionEnabled", "false") == "true"
if exec_remotely:
if self.engine_mngr:
return self.engine_mngr
self.engine_mngr = make_engine_manager(exec_remotely)
host, port, security, sec_folder = self._toolbox.engine_server_settings()
try:
self.engine_mngr.make_engine_client(host, port, security, sec_folder)
except RemoteEngineInitFailed as e:
self._toolbox.msg_error.emit(f"Connecting to Spine Engine Server failed. {e}")
return None
return self.engine_mngr
else:
engine_mngr = make_engine_manager(exec_remotely)
return engine_mngr
[docs] def _issue_command(self, text):
"""Issues command.
Args:
text (str)
"""
self._executor.submit(self._do_check_command, text)
[docs] def _do_check_command(self, text):
if not text.strip(): # Don't send empty command to execution manager
self._make_prompt_block(prompt=self._prompt)
return
engine_mngr = self.create_engine_manager()
if not engine_mngr:
return
complete = engine_mngr.is_persistent_command_complete(self._key, text)
self._command_checked.emit(text, complete)
@Slot(str, bool)
[docs] def _handle_command_checked(self, text, complete):
"""Issues command.
Args:
text (str)
"""
if not complete:
self._line_edit.new_line()
return
self._make_prompt_block(prompt="")
self._executor.submit(self._do_issue_command, text)
[docs] def _do_issue_command(self, text):
engine_mngr = self.create_engine_manager()
if not engine_mngr:
return
log_stdin = bool(self._pending_command_count)
self._pending_command_count += 1
for msg in engine_mngr.issue_persistent_command(self._key, text):
if msg["type"] != "stdin" or log_stdin:
self._msg_available.emit(msg["type"], msg["data"])
self._command_finished.emit()
@Slot(str, str)
[docs] def _handle_msg_available(self, msg_type, text):
if msg_type == "stdin":
self.add_stdin(text)
elif msg_type == "stdout":
self.add_stdout(text)
elif msg_type == "stderr":
self.add_stderr(text)
@Slot()
[docs] def _handle_command_finished(self):
self._pending_command_count -= 1
if self._pending_command_count == 0:
self._insert_prompt(prompt=self._prompt)
[docs] def _move_history(self, text, backwards):
"""Moves history."""
self._executor.submit(self._do_move_history, text, backwards)
[docs] def _do_move_history(self, text, backwards):
engine_mngr = self.create_engine_manager()
if not engine_mngr:
return
prefix = self._get_prefix()
history_item = engine_mngr.get_persistent_history_item(self._key, text, prefix, backwards)
self._history_item_available.emit(history_item, prefix)
@Slot(str, str)
[docs] def _display_history_item(self, history_item, prefix):
self._line_edit.set_raw_text(history_item)
if prefix:
le_cursor = self._line_edit.textCursor()
le_cursor.setPosition(self._line_edit.min_pos + len(prefix))
self._line_edit.setTextCursor(le_cursor)
[docs] def _autocomplete(self, text):
"""Autocompletes current text in the prompt (or output options if multiple matches).
Args:
text (str)
"""
prev_char = self._line_edit.document().characterAt(self._line_edit.textCursor().position() - 1)
if prev_char.isspace():
le_cursor = self._line_edit.textCursor()
le_cursor.insertText(4 * " ")
self._line_edit.setTextCursor(le_cursor)
return
self._executor.submit(self._do_autocomplete, text)
[docs] def _do_autocomplete(self, text):
engine_mngr = self.create_engine_manager()
if not engine_mngr:
return
prefix = self._get_prefix()
completions = engine_mngr.get_persistent_completions(self._key, prefix)
self._completions_available.emit(text, prefix, completions)
@Slot(str, str, list)
[docs] def _display_completions(self, text, prefix, completions):
completion = os.path.commonprefix(completions)
if prefix.endswith(completion) and len(completions) > 1:
# Can't complete, but there is more than one option: 'commit' stdin and output options to stdout
self.add_stdin(text)
self.add_stdout("\t\t".join(completions))
le_cursor = self._line_edit.textCursor()
self._line_edit.set_raw_text(text)
self._line_edit.setTextCursor(le_cursor)
else:
# Complete in current line
last_prefix_word = prefix.split(" ")[-1]
text_to_insert = completion[len(last_prefix_word) :]
index = len(prefix)
new_text = text[:index] + text_to_insert + text[index:]
self._line_edit.set_raw_text(new_text)
le_cursor = self._line_edit.textCursor()
le_cursor.setPosition(self._line_edit.min_pos + index + len(text_to_insert))
self._line_edit.setTextCursor(le_cursor)
@Slot(bool)
[docs] def _restart_persistent(self, _=False):
"""Restarts underlying persistent process."""
self._updating = True
self.clear()
self._make_prompt_block("")
self._updating = False
self._text_buffer.clear()
self._executor.submit(self._do_restart_persistent)
[docs] def _do_restart_persistent(self):
engine_mngr = self.create_engine_manager()
if not engine_mngr:
return
for msg in engine_mngr.restart_persistent(self._key):
self._msg_available.emit(msg["type"], msg["data"])
self._restarted.emit()
@Slot()
[docs] def _handle_restarted(self):
self._do_set_killed(False)
self._make_prompt_block(prompt=self._prompt)
@Slot(bool)
[docs] def _interrupt_persistent(self, _=False):
"""Sends a task to executor which will interrupt the underlying persistent process."""
self._executor.submit(self._do_interrupt_persistent)
[docs] def _do_interrupt_persistent(self):
"""Interrupts the underlying persistent process."""
engine_mngr = self.create_engine_manager()
if not engine_mngr:
return
engine_mngr.interrupt_persistent(self._key)
@Slot(bool)
[docs] def _kill_persistent(self, _=False):
"""Sends a task to executor which will kill the underlying persistent process."""
self._do_set_killed(True)
self._executor.submit(self._do_kill_persistent)
[docs] def _do_kill_persistent(self):
"""Kills underlying persistent process."""
engine_mngr = self.create_engine_manager()
if not engine_mngr:
return
engine_mngr.kill_persistent(self._key)
[docs] def contextMenuEvent(self, ev):
"""Reimplemented to extend menu with custom actions."""
le_geom = self._line_edit.frameGeometry()
menu = (
self._line_edit.createStandardContextMenu()
if le_geom.contains(ev.pos())
else self.createStandardContextMenu()
)
self._extend_menu(menu)
menu.exec(ev.globalPos())
# Translated from
# https://code.qt.io/cgit/qt-creator/qt-creator.git/tree/src/libs/utils/ansiescapecodehandler.cpp?h=master
# TODO: Consider qtconsole's QtAnsiCodeProcessor
[docs]class AnsiEscapeCodeHandler:
def __init__(self, fg_color, bg_color):
self._previous_format_closed = True
self._previous_format = QTextCharFormat()
self._pending_text = ""
self._bg_color = QColor(bg_color)
self._fg_color = QColor(fg_color)
[docs] def _make_default_format(self):
default_format = QTextCharFormat()
default_format.setBackground(self._bg_color)
default_format.setForeground(self._fg_color)
return default_format
[docs] def setFormatScope(self, char_format):
self._previous_format = char_format
self._previous_format_closed = False
[docs] def parse_text(self, text):
class AnsiEscapeCode:
ResetFormat = 0
BoldText = 1
FaintText = 2
ItalicText = 3
NormalIntensity = 22
NotItalic = 23
TextColorStart = 30
TextColorEnd = 37
RgbTextColor = 38
DefaultTextColor = 39
BackgroundColorStart = 40
BackgroundColorEnd = 47
RgbBackgroundColor = 48
DefaultBackgroundColor = 49
BrightTextColorStart = 90
BrightTextColorEnd = 97
BrightBackgroundColorStart = 100
BrightBackgroundColorEnd = 107
escape = "\x1b["
semicolon = ";"
color_terminator = "m"
erase_to_eol = "K"
char_format = self._make_default_format() if self._previous_format_closed else self._previous_format
stripped_text = self._pending_text + text
self._pending_text = ""
while stripped_text:
if self._pending_text:
break
try:
escape_pos = stripped_text.index(escape[0])
except ValueError:
yield stripped_text, char_format
break
if escape_pos != 0:
yield stripped_text[:escape_pos], char_format
stripped_text = stripped_text[escape_pos:]
if stripped_text[0] != escape[0]:
break
while stripped_text and escape[0] == stripped_text[0]:
if escape.startswith(stripped_text):
# control sequence is not complete
self._pending_text += stripped_text
stripped_text = ""
break
if not stripped_text.startswith(escape):
# not a control sequence
self._pending_text = ""
yield stripped_text[:1], char_format
stripped_text = stripped_text[1:]
continue
self._pending_text += stripped_text[: len(escape)]
stripped_text = stripped_text[len(escape) :]
# \e[K is not supported. Just strip it.
if stripped_text.startswith(erase_to_eol):
self._pending_text = ""
stripped_text = stripped_text[1:]
continue
# get the number
str_number = ""
numbers = []
while stripped_text:
if stripped_text[0].isdigit():
str_number += stripped_text[0]
else:
if str_number:
numbers.append(str_number)
if not str_number or stripped_text[0] != semicolon:
break
str_number = ""
self._pending_text += stripped_text[0:1]
stripped_text = stripped_text[1:]
if not stripped_text:
break
# remove terminating char
if not stripped_text.startswith(color_terminator):
self._pending_text = ""
stripped_text = stripped_text[1:]
break
# got consistent control sequence, ok to clear pending text
self._pending_text = ""
stripped_text = stripped_text[1:]
if not numbers:
char_format = self._make_default_format()
self.endFormatScope()
for i in range(len(numbers)): # pylint: disable=consider-using-enumerate
code = int(numbers[i])
if AnsiEscapeCode.TextColorStart <= code <= AnsiEscapeCode.TextColorEnd:
char_format.setForeground(_ansi_color(code - AnsiEscapeCode.TextColorStart))
self.setFormatScope(char_format)
elif AnsiEscapeCode.BrightTextColorStart <= code <= AnsiEscapeCode.BrightTextColorEnd:
char_format.setForeground(_ansi_color(code - AnsiEscapeCode.BrightTextColorStart, bright=True))
self.setFormatScope(char_format)
elif AnsiEscapeCode.BackgroundColorStart <= code <= AnsiEscapeCode.BackgroundColorEnd:
char_format.setBackground(_ansi_color(code - AnsiEscapeCode.BackgroundColorStart))
self.setFormatScope(char_format)
elif AnsiEscapeCode.BrightBackgroundColorStart <= code <= AnsiEscapeCode.BrightBackgroundColorEnd:
char_format.setBackground(
_ansi_color(code - AnsiEscapeCode.BrightBackgroundColorStart, bright=True)
)
self.setFormatScope(char_format)
else:
if code == AnsiEscapeCode.ResetFormat:
char_format = self._make_default_format()
self.endFormatScope()
break
if code == AnsiEscapeCode.BoldText:
char_format.setFontWeight(QFont.Bold)
self.setFormatScope(char_format)
break
if code == AnsiEscapeCode.FaintText:
char_format.setFontWeight(QFont.Light)
self.setFormatScope(char_format)
break
if code == AnsiEscapeCode.ItalicText:
char_format.setFontItalic(True)
self.setFormatScope(char_format)
break
if code == AnsiEscapeCode.NormalIntensity:
char_format.setFontWeight(QFont.Normal)
self.setFormatScope(char_format)
break
if code == AnsiEscapeCode.NotItalic:
char_format.setFontItalic(False)
self.setFormatScope(char_format)
break
if code == AnsiEscapeCode.DefaultTextColor:
char_format.setForeground(self._fg_color)
self.setFormatScope(char_format)
break
if code == AnsiEscapeCode.DefaultBackgroundColor:
char_format.setBackground(self._bg_color)
self.setFormatScope(char_format)
break
if code == AnsiEscapeCode.RgbBackgroundColor:
# See http://en.wikipedia.org/wiki/ANSI_escape_code#Colors
i += 1
if i >= len(numbers):
break
j = int(numbers[i])
if j == 2:
# RGB set with format: 38;2;<r>;<g>;<b>
if i + 3 < len(numbers):
color = QColor(int(numbers[i + 1]), int(numbers[i + 2]), int(numbers[i + 3]))
if code == AnsiEscapeCode.RgbTextColor:
char_format.setForeground(color)
else:
char_format.setBackground(color)
self.setFormatScope(char_format)
i += 3
break
if j == 5:
# 256 color mode with format: 38;5;<i>
index = int(numbers[i + 1])
color = QColor()
if index < 8:
# The first 8 colors are standard low-intensity ANSI colors.
color = _ansi_color(index)
elif index < 16:
# The next 8 colors are standard high-intensity ANSI colors.
color = _ansi_color(index - 8).lighter(150)
elif index < 232:
# The next 216 colors are a 6x6x6 RGB cube.
o = index - 16
color = QColor((o / 36) * 51, ((o / 6) % 6) * 51, (o % 6) * 51)
else:
# The last 24 colors are a greyscale gradient.
grey = int((index - 232) * 11)
color = QColor(grey, grey, grey)
if code == AnsiEscapeCode.RgbTextColor:
char_format.setForeground(color)
else:
char_format.setBackground(color)
self.setFormatScope(char_format)
i += 1
break
break
[docs]def _ansi_color(code, bright=False):
if code >= 8:
return QColor()
on = 170 if not bright else 255
off = 0 if not bright else 85
red = on if code & 1 else off
green = on if code & 2 else off
blue = on if code & 4 else off
return QColor(red, green, blue)