######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""General helper functions and classes."""
import functools
import time
from enum import Enum, unique
import itertools
import os
import glob
from html.parser import HTMLParser
import json
import logging
import datetime
import shutil
import re
import pathlib
import bisect
from contextlib import contextmanager
import tempfile
from typing import Sequence
import matplotlib
from PySide6.QtCore import Qt, Slot, QFile, QIODevice, QSize, QRect, QPoint, QUrl, QObject, QEvent
from PySide6.QtCore import __version__ as qt_version
from PySide6.QtCore import __version_info__ as qt_version_info
from PySide6.QtWidgets import QApplication, QMessageBox, QFileIconProvider, QStyle, QFileDialog, QInputDialog, QSplitter
from PySide6.QtGui import (
QGuiApplication,
QCursor,
QImageReader,
QPixmap,
QIcon,
QIconEngine,
QStandardItemModel,
QStandardItem,
QDesktopServices,
QKeySequence,
QPalette,
QSyntaxHighlighter,
QTextCharFormat,
QBrush,
QColor,
QFont,
QPainter,
QUndoCommand,
)
from spine_engine.utils.serialization import deserialize_path
from spinedb_api.spine_io.gdx_utils import find_gams_directory
from spinedb_api.helpers import group_consecutive
from .config import (
DEFAULT_WORK_DIR,
PLUGINS_PATH,
PROJECT_FILENAME,
PROJECT_LOCAL_DATA_DIR_NAME,
PROJECT_LOCAL_DATA_FILENAME,
SPECIFICATION_LOCAL_DATA_FILENAME,
)
if os.name == "nt":
import ctypes
matplotlib.use("Qt5Agg")
matplotlib.rcParams.update({"font.size": 8})
logging.getLogger("matplotlib").setLevel(logging.WARNING)
[docs]_matplotlib_version = [int(x) for x in matplotlib.__version__.split(".") if x.isdigit()]
if _matplotlib_version[0] == 3 and _matplotlib_version[1] == 0:
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
@unique
[docs]class LinkType(Enum):
"""Graphics scene's link types."""
[docs] CONNECTION = "connection"
[docs]def home_dir():
"""Returns user's home dir"""
return str(pathlib.Path.home())
[docs]def busy_effect(func):
"""Decorator to change the mouse cursor to 'busy' while a function is processed.
Args:
func (Callable): Decorated function.
"""
@functools.wraps(func)
def new_function(*args, **kwargs):
# noinspection PyTypeChecker, PyArgumentList, PyCallByClass
QApplication.setOverrideCursor(QCursor(Qt.BusyCursor))
try:
return func(*args, **kwargs)
finally:
# noinspection PyArgumentList
QApplication.restoreOverrideCursor()
return new_function
[docs]def create_dir(base_path, folder="", verbosity=False):
"""Create (input/output) directories recursively.
Args:
base_path (str): Absolute path to wanted dir
folder (str): (Optional) Folder name. Usually short name of item.
verbosity (bool): True prints a message that tells if the directory already existed or if it was created.
Raises:
OSError if operation failed.
"""
directory = os.path.join(base_path, folder)
if os.path.exists(directory) and verbosity:
logging.debug("Directory found: %s", directory)
else:
os.makedirs(directory, exist_ok=True)
if verbosity:
logging.debug("Directory created: %s", directory)
[docs]def rename_dir(old_dir, new_dir, toolbox, box_title):
"""Renames directory.
Args:
old_dir (str): Absolute path to directory that will be renamed
new_dir (str): Absolute path to new directory
toolbox (ToolboxUI): A toolbox to log messages and ask questions.
box_title (str): The title of the message boxes, (e.g. "Undoing 'rename DC1 to DC2'")
Returns:
bool: True if operation was successful, False otherwise
"""
if os.path.exists(new_dir):
msg = "Directory <b>{0}</b> already exists.<br/><br/>Would you like to overwrite its contents?".format(new_dir)
box = QMessageBox(
QMessageBox.Icon.Question,
box_title,
msg,
buttons=QMessageBox.StandardButton.Ok | QMessageBox.StandardButton.Cancel,
parent=toolbox,
)
box.button(QMessageBox.StandardButton.Ok).setText("Overwrite")
answer = box.exec()
if answer != QMessageBox.StandardButton.Ok:
return False
shutil.rmtree(new_dir)
try:
shutil.move(old_dir, new_dir)
except FileExistsError:
# This is unlikely because of the above `if`, but still possible since another concurrent process
# might have done things in between
msg = "Directory<br/><b>{0}</b><br/>already exists".format(new_dir)
toolbox.information_box.emit(box_title, msg)
return False
except PermissionError as pe_e:
logging.error(pe_e)
msg = (
"Access to directory <br/><b>{0}</b><br/>denied."
"<br/><br/>Possible reasons:"
"<br/>1. You don't have a permission to edit the directory"
"<br/>2. Windows Explorer is open in the directory"
"<br/><br/>Check these and try again.".format(old_dir)
)
toolbox.information_box.emit(box_title, msg)
return False
except OSError as os_e:
logging.error(os_e)
msg = (
"Renaming directory "
"<br/><b>{0}</b> "
"<br/>to "
"<br/><b>{1}</b> "
"<br/>failed."
"<br/><br/>Possibly reasons:"
"<br/>1. Windows Explorer is open in the directory."
"<br/>2. A file in the directory is open in another program. "
"<br/><br/>Check these and try again.".format(old_dir, new_dir)
)
toolbox.information_box.emit(box_title, msg)
return False
return True
[docs]def open_url(url):
"""Opens the given url in the appropriate Web browser for the user's desktop environment,
and returns true if successful; otherwise returns false.
If the URL is a reference to a local file (i.e., the URL scheme is "file") then it will
be opened with a suitable application instead of a Web browser.
Handle return value on caller side.
Args:
url(str): URL to open
Returns:
bool: True if successful, False otherwise
"""
return QDesktopServices.openUrl(QUrl(url, QUrl.TolerantMode))
[docs]def set_taskbar_icon():
"""Set application icon to Windows taskbar."""
if os.name == "nt":
myappid = "{6E794A8A-E508-47C4-9319-1113852224D3}"
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(myappid)
@Slot()
[docs]def pyside6_version_check():
"""Check that PySide6 version is at least 6.4.
qt_version (str) is the Qt version used to compile PySide6. E.g. "6.4.1"
qt_version_info (tuple) contains each version component separately e.g. (6, 4, 1)
"""
if not (qt_version_info[0] == 6 and qt_version_info[1] >= 4):
print(
f"""Sorry for the inconvenience but,
Spine Toolbox does not support PySide6 version {qt_version}.
At the moment, PySide6 version must be 6.4 or greater.
To upgrade PySide6 to latest supported version, run
pip install -r requirements.txt --upgrade
And start the application again.
"""
)
return False
return True
[docs]def get_datetime(show, date=True):
"""Returns date and time string for appending into Event Log messages.
Args:
show (bool): True returns date and time string. False returns empty string.
date (bool): Whether or not the date should be included in the result
Returns:
str: datetime string or empty string if show is False
"""
if not show:
return ""
t = datetime.datetime.now()
time_str = "{:02d}:{:02d}:{:02d}".format(t.hour, t.minute, t.second)
if not date:
return "[{}] ".format(time_str)
date_str = "{:02d}-{:02d}-{:02d}".format(t.day, t.month, t.year)
return "[{} {}] ".format(date_str, time_str)
@busy_effect
[docs]def copy_files(src_dir, dst_dir, includes=None, excludes=None):
"""Function for copying files. Does not copy folders.
Args:
src_dir (str): Source directory
dst_dir (str): Destination directory
includes (list, optional): Included files (wildcards accepted)
excludes (list, optional): Excluded files (wildcards accepted)
Returns:
count (int): Number of files copied
"""
if includes is None:
includes = ["*"]
if excludes is None:
excludes = []
src_files = []
for pattern in includes:
src_files += glob.glob(os.path.join(src_dir, pattern))
exclude_files = []
for pattern in excludes:
exclude_files += glob.glob(os.path.join(src_dir, pattern))
count = 0
for filename in src_files:
if os.path.isdir(filename):
continue
if filename not in exclude_files:
shutil.copy(filename, dst_dir)
count += 1
return count
@busy_effect
[docs]def erase_dir(path, verbosity=False):
"""Deletes a directory and all its contents without prompt.
Args:
path (str): Path to directory
verbosity (bool): Print logging messages or not
Returns:
bool: True if operation was successful, False otherwise
"""
if not os.path.exists(path):
if verbosity:
logging.debug("Path does not exist: %s", path)
return False
if verbosity:
logging.debug("Deleting directory %s", path)
shutil.rmtree(path)
return True
@busy_effect
[docs]def recursive_overwrite(logger, src, dst, ignore=None, silent=True):
"""Copies everything from source directory to destination directory recursively.
Overwrites existing files.
Args:
logger (LoggerInterface): Enables e.g. printing to Event Log
src (str): Source directory
dst (str): Destination directory
ignore (Callable, optional): Ignore function
silent (bool): If False, messages are sent to Event Log, If True, copying is done in silence
"""
if os.path.isdir(src):
if not os.path.isdir(dst):
if not silent:
logger.msg.emit("Creating directory <b>{0}</b>".format(dst))
os.makedirs(dst)
files = os.listdir(src)
for file_name in list(files):
# Avoid ending up in 'dst' as this would result in infinite recursion.
file_path = os.path.join(src, file_name)
if os.path.samefile(os.path.commonpath((file_path, dst)), file_path):
files.remove(file_name)
break
if ignore is not None:
ignored = ignore(src, files)
else:
ignored = set()
for f in files:
if f not in ignored:
recursive_overwrite(logger, os.path.join(src, f), os.path.join(dst, f), ignore, silent)
else:
if not silent:
_, src_filename = os.path.split(src)
dst_dir, _ = os.path.split(dst)
logger.msg.emit("Copying <b>{0}</b> -> <b>{1}</b>".format(src_filename, dst_dir))
shutil.copyfile(src, dst)
[docs]def tuple_itemgetter(itemgetter_func, num_indexes):
"""Change output of itemgetter to always be a tuple even for a single index.
Args:
itemgetter_func (Callable): item getter function
num_indexes (int): number of indexes
Returns:
Callable: getter function that works with a single index
"""
return (lambda item: (itemgetter_func(item),)) if num_indexes == 1 else itemgetter_func
[docs]def rows_to_row_count_tuples(rows):
"""Breaks a list of rows into a list of (row, count) tuples corresponding to chunks of successive rows.
Args:
rows (Iterable of int): rows
Returns:
list of tuple: row count tuples
"""
return [(first, last - first + 1) for first, last in group_consecutive(rows)]
[docs]class IconListManager:
"""A class to manage icons for icon list widgets."""
def __init__(self, icon_size):
"""
Args:
icon_size (QSize): icon's size
"""
self._icon_size = icon_size
self.searchterms = {}
self.model = QStandardItemModel()
self.model.data = self._model_data
@busy_effect
[docs] def init_model(self):
"""Init model that can be used to display all icons in a list."""
if self.searchterms:
return
qfile = QFile(":/fonts/fontawesome5-searchterms.json")
qfile.open(QIODevice.ReadOnly | QIODevice.Text)
data = str(qfile.readAll().data(), "utf-8")
qfile.close()
self.searchterms = json.loads(data)
items = []
for codepoint, searchterms in self.searchterms.items():
item = QStandardItem()
display_icon = int(codepoint, 16)
item.setData(display_icon, Qt.ItemDataRole.UserRole)
item.setData(searchterms, Qt.ItemDataRole.UserRole + 1)
items.append(item)
self.model.invisibleRootItem().appendRows(items)
[docs] def _model_data(self, index, role):
"""Creates pixmaps as they're requested by the data() method, to reduce loading time.
Args:
index (QModelIndex): index to the model
role (int): data role
Returns:
Any: role-dependent model data
"""
if role == Qt.ItemDataRole.DisplayRole:
return None
if role != Qt.ItemDataRole.DecorationRole:
return QStandardItemModel.data(self.model, index, role)
display_icon = index.data(Qt.ItemDataRole.UserRole)
return object_icon(display_icon)
[docs]def object_icon(display_icon):
"""Creates and returns a QIcon corresponding to display_icon.
Args:
display_icon (int): icon id
Returns:
QIcon: requested icon
"""
icon_code, color_code = interpret_icon_id(display_icon)
engine = CharIconEngine(chr(icon_code), color_code)
return QIcon(engine)
[docs]class TransparentIconEngine(QIconEngine):
"""Specialization of QIconEngine with transparent background."""
[docs] def pixmap(self, size=QSize(512, 512), mode=None, state=None):
pm = QPixmap(size)
pm.fill(Qt.transparent)
self.paint(QPainter(pm), QRect(QPoint(0, 0), size), mode, state)
return pm
[docs]class CharIconEngine(TransparentIconEngine):
"""Specialization of QIconEngine used to draw font-based icons."""
def __init__(self, char, color=None):
"""
Args:
char (str): character to use as the icon
color (QColor, optional):
"""
super().__init__()
self.char = char
self.color = QColor(color)
self.font = QFont("Font Awesome 5 Free Solid")
[docs] def paint(self, painter, rect, mode=None, state=None):
painter.save()
size = 0.875 * round(min(rect.width(), rect.height()))
self.font.setPixelSize(max(1, size))
painter.setFont(self.font)
if self.color:
color = self.color
else:
palette = QPalette(QApplication.palette())
if mode == QIcon.Disabled:
palette.setCurrentColorGroup(QPalette.Disabled)
elif mode == QIcon.Active:
palette.setCurrentColorGroup(QPalette.Active)
color = palette.buttonText().color()
painter.setPen(color)
painter.drawText(rect, Qt.AlignCenter | Qt.AlignVCenter, self.char)
painter.restore()
[docs]class ColoredIcon(QIcon):
def __init__(self, icon_file_name, icon_color, icon_size, colored=None):
self._engine = ColoredIconEngine(icon_file_name, icon_color, icon_size, colored=colored)
super().__init__(self._engine)
[docs] def set_colored(self, colored):
self._engine.set_colored(colored)
[docs] def color(self, mode=QIcon.Normal):
return self._engine.color(mode=mode)
[docs]class ColoredIconEngine(QIconEngine):
def __init__(self, icon_file_name, icon_color, icon_size, colored=None):
super().__init__()
self._icon = QIcon(icon_file_name)
self._icon_color = icon_color
self._base_pixmap = self._icon.pixmap(icon_size)
self._colored = None
self._pixmaps = {}
self.set_colored(colored)
[docs] def color(self, mode=QIcon.Normal):
color = self._icon_color if self._colored else QColor("black")
if mode == QIcon.Disabled:
r, g, b, a = color.getRgbF()
tint = 0.37255
color = QColor.fromRgbF(r + (1.0 - r) * tint, g + (1.0 - g) * tint, b + (1.0 - b) * tint, a)
return color
[docs] def set_colored(self, colored):
if self._colored == colored:
return
self._colored = colored
self._pixmaps.clear()
[docs] def _do_make_pixmap(self, mode, state):
color = self.color(mode)
return color_pixmap(self._base_pixmap, color)
[docs] def _make_pixmap(self, mode, state):
if (mode, state) not in self._pixmaps:
self._pixmaps[mode, state] = self._do_make_pixmap(mode, state)
return self._pixmaps[mode, state]
[docs] def pixmap(self, size, mode, state):
return self._make_pixmap(mode, state).scaled(
self._icon.actualSize(size), Qt.KeepAspectRatio, Qt.SmoothTransformation
)
[docs]def color_pixmap(pixmap, color):
img = pixmap.toImage()
for y in range(img.height()):
for x in range(img.width()):
color.setAlpha(img.pixelColor(x, y).alpha())
img.setPixelColor(x, y, color)
return QPixmap.fromImage(img)
[docs]def make_icon_id(icon_code, color_code):
"""Takes icon and color codes, and return equivalent integer.
Args:
icon_code (int):icon's code
color_code (int): color code
Returns:
int: icon id
"""
return icon_code + (color_code << 16)
[docs]def interpret_icon_id(display_icon):
"""Takes a display icon id and returns an equivalent tuple of icon and color code.
Args:
display_icon (int, optional): icon id
Returns:
tuple: icon's code, color code
"""
if not isinstance(display_icon, int) or display_icon < 0:
return 0xF1B2, 0
icon_code = display_icon & 65535
try:
color_code = display_icon >> 16
except OverflowError:
color_code = 0
return icon_code, color_code
[docs]def default_icon_id():
"""Creates a default icon id.
Returns:
int: default icon's id
"""
return make_icon_id(*interpret_icon_id(None))
[docs]class ProjectDirectoryIconProvider(QFileIconProvider):
"""QFileIconProvider that provides a Spine icon to the
Open Project Dialog when a Spine Toolbox project
directory is encountered."""
def __init__(self):
super().__init__()
self.spine_icon = QIcon(":/symbols/Spine_symbol.png")
[docs] def icon(self, info):
"""Returns an icon for the file described by info.
Args:
info (QFileInfo): File (or directory) info
Returns:
QIcon: Icon for a file system resource with the given info
"""
if isinstance(info, QFileIconProvider.IconType):
return super().icon(info) # Because there are two icon() methods
if not info.isDir():
return super().icon(info)
p = info.filePath()
# logging.debug("In dir:{0}".format(p))
if os.path.exists(os.path.join(p, ".spinetoolbox")):
# logging.debug("found project dir:{0}".format(p))
return self.spine_icon
return super().icon(info)
[docs]def ensure_window_is_on_screen(window, size):
"""
Checks if window is on screen and if not, moves and resizes it to make it visible on the primary screen.
Args:
window (QWidget): a window to check
size (QSize): desired window size if the window is moved
"""
window_geometry = window.frameGeometry()
widget_center = window_geometry.center()
screens = QApplication.screens()
widget_inside_screen = False
for screen in screens:
screen_geometry = screen.geometry()
if screen_geometry.contains(widget_center):
widget_inside_screen = True
break
if not widget_inside_screen:
primary_screen = QApplication.primaryScreen()
screen_geometry = primary_screen.availableGeometry()
window.setGeometry(QStyle.alignedRect(Qt.LeftToRight, Qt.AlignCenter, size, screen_geometry))
[docs]def first_non_null(s):
"""Returns the first element in Iterable s that is not None."""
try:
return next(itertools.dropwhile(lambda x: x is None, s))
except StopIteration:
return None
[docs]def get_save_file_name_in_last_dir(qsettings, key, parent, caption, given_dir, filter_=""):
"""Calls QFileDialog.getSaveFileName in the directory that was selected last time the dialog was accepted.
Args:
qsettings (QSettings): A QSettings object where the last directory is stored
key (string): The name of the entry in the above QSettings
parent, caption, given_dir, filter_: Args passed to QFileDialog.getSaveFileName
Returns:
str: filename
str: selected filter
"""
dir_ = qsettings.value(key, defaultValue=given_dir)
filename, selected_filter = QFileDialog.getSaveFileName(parent, caption, dir_, filter_)
if filename:
qsettings.setValue(key, os.path.dirname(filename))
return filename, selected_filter
[docs]def get_open_file_name_in_last_dir(qsettings, key, parent, caption, given_dir, filter_=""):
dir_ = qsettings.value(key, defaultValue=given_dir)
filename, selected_filter = QFileDialog.getOpenFileName(parent, caption, dir_, filter_)
if filename:
qsettings.setValue(key, os.path.dirname(filename))
return filename, selected_filter
[docs]def try_number_from_string(text):
"""Tries to convert a string to integer or float.
Args:
text (str): string to convert
Returns:
int or float or str: converted value or text if conversion failed
"""
try:
return int(text)
except ValueError:
try:
return float(text)
except ValueError:
return text
except TypeError:
return None
[docs]class ChildCyclingKeyPressFilter(QObject):
"""Event filter class for catching next and previous child key presses.
Used in filtering the Ctrl+Tab and Ctrl+Shift+Tab key presses in the
Item Properties tab widget."""
[docs] def eventFilter(self, obj, event):
if event.type() == QEvent.KeyPress:
if event.matches(QKeySequence.NextChild) or event.matches(QKeySequence.PreviousChild):
return True
return QObject.eventFilter(self, obj, event) # Pass event further
[docs]def select_gams_executable(parent, line_edit):
"""Opens file browser where user can select a Gams executable (i.e. gams.exe on Windows).
Args:
parent (QWidget, optional): Parent widget for the file dialog and message boxes
line_edit (QLineEdit): Line edit where the selected path will be inserted
"""
start_dir = find_gams_directory()
if not start_dir:
start_dir = home_dir()
# noinspection PyCallByClass, PyTypeChecker, PyArgumentList
answer = QFileDialog.getOpenFileName(parent, "Select GAMS Program (e.g. gams.exe on Windows)", start_dir)
if answer[0] == "": # Canceled (american-english), cancelled (british-english)
return
# Check that selected file at least starts with string 'gams'
_, selected_file = os.path.split(answer[0])
if not selected_file.lower().startswith("gams"):
msg = "Selected file <b>{0}</b> may not be a valid GAMS program".format(selected_file)
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, "Invalid GAMS Program", msg)
return
line_edit.setText(answer[0])
[docs]def select_julia_executable(parent, line_edit):
"""Opens file browser where user can select a Julia executable (i.e. julia.exe on Windows).
Used in SettingsWidget and KernelEditor.
Args:
parent (QWidget, optional): Parent widget for the file dialog and message boxes
line_edit (QLineEdit): Line edit where the selected path will be inserted
"""
# noinspection PyCallByClass, PyTypeChecker, PyArgumentList
answer = QFileDialog.getOpenFileName(parent, "Select Julia Executable (e.g. julia.exe on Windows)", home_dir())
if answer[0] == "": # Canceled (american-english), cancelled (british-english)
return
# Check that selected file at least starts with string 'julia'
_, selected_file = os.path.split(answer[0])
if not selected_file.lower().startswith("julia"):
msg = "Selected file <b>{0}</b> is not a valid Julia Executable".format(selected_file)
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, "Invalid Julia Executable", msg)
return
line_edit.setText(answer[0])
[docs]def select_julia_project(parent, line_edit):
"""Shows file browser and inserts selected julia project dir to give line_edit.
Used in SettingsWidget and KernelEditor.
Args:
parent (QWidget, optional): Parent of QFileDialog
line_edit (QLineEdit): Line edit where the selected path will be inserted
"""
answer = QFileDialog.getExistingDirectory(parent, "Select Julia project directory", home_dir())
if not answer: # Canceled (american-english), cancelled (british-english)
return
line_edit.setText(answer)
[docs]def select_python_interpreter(parent, line_edit):
"""Opens file browser where user can select a python interpreter (i.e. python.exe on Windows).
Used in SettingsWidget and KernelEditor.
Args:
parent (QWidget): Parent widget for the file dialog and message boxes
line_edit (QLineEdit): Line edit where the selected path will be inserted
"""
# noinspection PyCallByClass, PyTypeChecker, PyArgumentList
answer = QFileDialog.getOpenFileName(parent, "Select Python Interpreter (e.g. python.exe on Windows)", home_dir())
if answer[0] == "": # Canceled
return
# Check that selected file at least starts with string 'python'
_, selected_file = os.path.split(answer[0])
if not selected_file.lower().startswith("python"):
msg = "Selected file <b>{0}</b> is not a valid Python interpreter".format(selected_file)
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, "Invalid Python Interpreter", msg)
return
line_edit.setText(answer[0])
return
[docs]def select_conda_executable(parent, line_edit):
"""Opens file browser where user can select a conda executable.
Args:
parent (QWidget): Parent widget for the file dialog and message boxes
line_edit (QLineEdit): Line edit where the selected path will be inserted
"""
# noinspection PyCallByClass, PyTypeChecker, PyArgumentList
answer = QFileDialog.getOpenFileName(parent, "Select Conda Executable (e.g. conda.exe on Windows)", home_dir())
if answer[0] == "": # Canceled
return
# Check that selected file at least starts with string 'conda'
if not is_valid_conda_executable(answer[0]):
_, selected_file = os.path.split(answer[0])
msg = "Selected file <b>{0}</b> is not a valid Conda executable".format(selected_file)
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, "Invalid Conda selected", msg)
return
line_edit.setText(answer[0])
[docs]def is_valid_conda_executable(p):
"""Checks that given path points to an existing file and the file name starts with 'conda'.
Args:
p (str): Absolute path to a file
"""
if not os.path.isfile(p):
return False
_, filename = os.path.split(p)
if not filename.lower().startswith("conda"):
return False
return True
[docs]def select_certificate_directory(parent, line_edit):
"""Shows file browser and inserts selected certificate directory to given line edit.
Args:
parent (QWidget, optional): Parent of QFileDialog
line_edit (QLineEdit): Line edit where the selected dir path will be inserted
"""
answer = QFileDialog.getExistingDirectory(parent, "Select certificates directory", home_dir())
if not answer:
return
line_edit.setText(answer)
[docs]def file_is_valid(parent, file_path, msgbox_title, extra_check=None):
"""Checks that given path is not a directory and it's a file that actually exists.
In addition, can be used to check if the file name in given file path starts with
the given extra_check string. Needed in SettingsWidget and KernelEditor because
the QLineEdits are editable. Returns True when file_path is an empty string so that
we can use default values (e.g. from line edit place holder text). Returns also True
when file_path is just 'python' or 'julia' so that user's can use the python or julia
in PATH.
Args:
parent (QWidget): Parent widget for the message boxes
file_path (str): Path to check
msgbox_title (str): Title for message boxes
extra_check (str, optional): String that must match the file name of the given file_path (without extension)
Returns:
bool: True if given path is an empty string or if path is valid, False otherwise
"""
if file_path == "":
return True
if file_path.lower() in ("python", "python3", "julia"):
return True
if os.path.isdir(file_path):
msg = "Please select a file and not a directory"
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, msgbox_title, msg)
return False
if not os.path.exists(file_path):
msg = f"File {file_path} does not exist"
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, msgbox_title, msg)
return False
if extra_check is not None:
# Check that file name in given file path starts with the extra_check string (e.g. 'python' or 'julia')
_, file_name = os.path.split(file_path)
if not file_name.lower().startswith(extra_check):
msg = f"Selected file <b>{file_name}</b> is not a valid {extra_check} file"
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, msgbox_title, msg)
return False
return True
[docs]def dir_is_valid(parent, dir_path, msgbox_title):
"""Checks that given path is a directory. Needed in
SettingsWdiget and KernelEditor because the QLineEdits
are editable. Returns True when dir_path is an empty string so that
we can use default values (e.g. from line edit place holder text)
Args:
parent (QWidget): Parent widget for the message box
dir_path (str): Directory path to check
msgbox_title (str): Message box title
Returns:
bool: True if given path is an empty string or if path is an existing directory, False otherwise
"""
if dir_path == "":
return True
if not os.path.isdir(dir_path):
msg = "Please select a valid directory"
# noinspection PyCallByClass, PyArgumentList
QMessageBox.warning(parent, msgbox_title, msg)
return False
return True
[docs]class QuietLogger:
[docs] def __getattr__(self, _):
return self
[docs] def __call__(self, *args, **kwargs):
pass
[docs]def make_settings_dict_for_engine(app_settings):
"""Converts Toolbox settings to a dictionary acceptable by Engine.
Args:
app_settings (QSettings): Toolbox settings
Returns:
dict: Engine-compatible settings
"""
def dump_group(group):
app_settings.beginGroup(group)
for key in app_settings.childKeys():
value = app_settings.value(key)
try:
json.dumps(value)
except (TypeError, json.decoder.JSONDecodeError):
continue
settings[f"{group}/{key}"] = value
app_settings.endGroup()
settings = {}
dump_group("appSettings")
dump_group("engineSettings")
if "appSettings/workDir" not in settings:
# Headless mode may execute on a system where we don't have any Toolbox settings available.
# Make sure we set a sane work directory for Tools, at least.
settings["appSettings/workDir"] = DEFAULT_WORK_DIR
return settings
[docs]def make_icon_background(color):
color0 = color.name()
color1 = color.lighter(140).name()
return f"qlineargradient(x1: 1, y1: 1, x2: 0, y2: 0, stop: 0 {color0}, stop: 1 {color1});"
[docs]def color_from_index(i, count, base_hue=0.0, saturation=1.0, value=1.0):
golden_ratio = 0.618033988749895
h = golden_ratio * (360 / count) * i
h = ((base_hue + h) % 360) / 360
return QColor.fromHsvF(h, saturation, value, 1.0)
[docs]def unique_name(prefix, existing):
"""
Creates a unique name in the form `prefix (xx)` where xx is a counter value.
When `prefix` already contains a counter `(xx)`, the value `xx` is updated.
Args:
prefix (str): name prefix
existing (Iterable of str): existing names
Returns:
str: unique name
"""
reserved = set()
# check if `prefix` is already a duplicate and adjust if needed
match = re.fullmatch(r"^(.*) \(([0-9]+)\)$", prefix)
if match:
prefix = match[1]
reserved.add(int(match[2]))
pattern = re.compile(fr"^{prefix} \(([0-9]+)\)$")
for name in existing:
match = pattern.fullmatch(name)
if match:
reserved.add(int(match[1]))
free = len(reserved) + 1
for i in range(1, len(reserved) + 1):
if i not in reserved:
free = i
break
return f"{prefix} ({free})"
[docs]def parse_specification_file(spec_path, logger):
"""Parses specification file.
Args:
spec_path (str): path to specification file
logger (LoggerInterface): a logger
Returns:
dict: specification dict or None if the operation failed
"""
try:
with open(spec_path, "r") as fp:
try:
return json.load(fp)
except ValueError:
logger.msg_error.emit("Item specification file not valid")
return None
except FileNotFoundError:
logger.msg_error.emit(f"Specification file <b>{spec_path}</b> does not exist")
return None
except OSError:
logger.msg_error.emit(f"Specification file <b>{spec_path}</b> not found")
return None
[docs]def load_specification_from_file(spec_path, local_data_dict, spec_factories, app_settings, logger):
"""Returns an Item specification from a definition file.
Args:
spec_path (str): Path of the specification definition file
local_data_dict (dict): specifications local data dict
spec_factories (dict): Dictionary mapping specification type to ProjectItemSpecificationFactory
app_settings (QSettings): Toolbox settings
logger (LoggerInterface): a logger
Returns:
ProjectItemSpecification: item specification or None if reading the file failed
"""
spec_dict = parse_specification_file(spec_path, logger)
if spec_dict is None:
return None
spec_dict["definition_file_path"] = spec_path
spec = specification_from_dict(spec_dict, local_data_dict, spec_factories, app_settings, logger)
if spec is not None:
spec.definition_file_path = spec_path
return spec
[docs]def specification_from_dict(spec_dict, local_data_dict, spec_factories, app_settings, logger):
"""Returns item specification from a dictionary.
Args:
spec_dict (dict): Dictionary with the specification
local_data_dict (dict): specifications local data
spec_factories (dict): Dictionary mapping specification name to ProjectItemSpecificationFactory
app_settings (QSettings): Toolbox settings
logger (LoggerInterface): a logger
Returns:
ProjectItemSpecification or NoneType: specification or None if factory isn't found.
"""
# NOTE: If the spec doesn't have the "item_type" key, we can assume it's a tool spec
item_type = spec_dict.get("item_type", "Tool")
local_data = local_data_dict.get(item_type, {}).get(spec_dict["name"])
if local_data is not None:
merge_dicts(local_data, spec_dict)
spec_factory = spec_factories.get(item_type)
if spec_factory is None:
return None
return spec_factory.make_specification(spec_dict, app_settings, logger)
[docs]def plugins_dirs(app_settings):
"""Loads plugins.
Args:
app_settings (QSettings): Toolbox settings
Returns:
list of str: plugin directories
"""
search_paths = {PLUGINS_PATH}
search_paths |= set(app_settings.value("appSettings/pluginSearchPaths", defaultValue="").split(";"))
# Plugin dirs are top-level dirs in all search paths
plugin_dirs = []
for path in search_paths:
try:
top_level_items = [os.path.join(path, item) for item in os.listdir(path)]
except FileNotFoundError:
continue
plugin_dirs += [item for item in top_level_items if os.path.isdir(item)]
return plugin_dirs
[docs]def load_plugin_dict(plugin_dir, logger):
"""Loads plugin dict from plugin directory.
Args:
plugin_dir (str): path of plugin dir with "plugin.json" in it
logger (LoggerInterface): a logger
Returns:
dict: plugin dict or None if the operation failed
"""
plugin_file = os.path.join(plugin_dir, "plugin.json")
if not os.path.isfile(plugin_file):
return None
with open(plugin_file, "r") as fh:
try:
plugin_dict = json.load(fh)
except json.decoder.JSONDecodeError:
logger.msg_error.emit(f"Error in plugin file <b>{plugin_file}</b>. Invalid JSON.")
return None
try:
plugin_dict["plugin_dir"] = plugin_dir
except KeyError as key:
logger.msg_error.emit(f"Error in plugin file <b>{plugin_file}</b>. Key '{key}' not found.")
return None
return plugin_dict
[docs]def load_plugin_specifications(plugin_dict, local_data_dict, spec_factories, app_settings, logger):
"""Loads plugin's specifications.
Args:
plugin_dict (dict): plugin dict
local_data_dict (dict): specifications local data dictionary
spec_factories (dict): Dictionary mapping specification name to ProjectItemSpecificationFactory
app_settings (QSettings): Toolbox settings
logger (LoggerInterface): a logger
Returns:
dict: mapping from plugin name to list of specifications or None if the operation failed
"""
plugin_dir = plugin_dict["plugin_dir"]
try:
name = plugin_dict["name"]
specifications = plugin_dict["specifications"]
except KeyError as key:
logger.msg_error.emit(f"Error in plugin file <b>{plugin_dir}</b>. Key '{key}' not found.")
return None
deserialized_paths = [deserialize_path(path, plugin_dir) for paths in specifications.values() for path in paths]
plugin_specs = []
for path in deserialized_paths:
spec = load_specification_from_file(path, local_data_dict, spec_factories, app_settings, logger)
if not spec:
continue
spec.plugin = name
plugin_specs.append(spec)
return {name: plugin_specs}
[docs]def load_specification_local_data(config_dir):
"""Loads specifications' project-specific data.
Args:
config_dir (str or Path): project config dir
Returns:
dict: specifications local data
"""
local_data_path = pathlib.Path(config_dir, PROJECT_LOCAL_DATA_DIR_NAME, SPECIFICATION_LOCAL_DATA_FILENAME)
if not local_data_path.exists():
return {}
with open(local_data_path) as data_file:
return json.load(data_file)
[docs]DB_ITEM_SEPARATOR = " \u01C0 "
"""Display string to separate items such as entity names."""
[docs]def parameter_identifier(database, parameter, names, alternative):
"""Concatenates given information into parameter value identifier string.
Args:
database (str, optional): database's code name
parameter (str): parameter's name
names (list of str): name of the entity or class that holds the value
alternative (str or NoneType): name of the value's alternative
"""
parts = [database] if database is not None else []
parts += [parameter]
if alternative is not None:
parts += [alternative]
parts += [DB_ITEM_SEPARATOR.join(names)]
return " - ".join(parts)
@contextmanager
[docs]def disconnect(signal, *slots):
"""Disconnects signal for the duration of a 'with' block.
Args:
signal (Signal): signal to disconnect
*slots: slots to disconnect from
"""
for slot in slots:
signal.disconnect(slot)
try:
yield
finally:
for slot in slots:
signal.connect(slot)
[docs]class SignalWaiter(QObject):
"""A 'traffic light' that allows waiting for a signal to be emitted in another thread."""
def __init__(self, condition=None, timeout=None):
"""
Args:
condition (function, optional): receiving the self.args and returning whether to stop waiting.
timeout (float, optional): timeout in seconds; wait will raise after timeout
"""
super().__init__()
self._triggered = False
self.args = ()
self._condition = condition
self._timeout = timeout
self._start = time.monotonic() if self._timeout is not None else None
[docs] def trigger(self, *args):
"""Signal receiving slot."""
if self._triggered:
return
self._triggered = True if self._condition is None else self._condition(*args)
self.args = args
[docs] def wait(self):
"""Wait for signal to be received."""
while not self._triggered:
QApplication.processEvents()
if self._timeout is not None and time.monotonic() - self._start > self._timeout:
raise RuntimeError("timeout exceeded")
@contextmanager
[docs]def signal_waiter(signal, condition=None, timeout=None):
"""Gives a context manager that waits for the emission of given Qt signal.
Args:
signal (Any): signal to wait
condition (Callable, optional): a callable that takes the signal's parameters and returns True to stop waiting
timeout (float, optional): timeout in seconds; if None, wait indefinitely
Yields:
SignalWaiter: waiter instance
"""
waiter = SignalWaiter(condition=condition, timeout=timeout)
signal.connect(waiter.trigger)
try:
yield waiter
finally:
signal.disconnect(waiter.trigger)
waiter.deleteLater()
[docs]class CustomSyntaxHighlighter(QSyntaxHighlighter):
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
self.lexer = None
self._formats = {}
@property
[docs] def set_style(self, style):
self._formats.clear()
for ttype, tstyle in style:
text_format = self._formats[ttype] = QTextCharFormat()
if tstyle["color"]:
brush = QBrush(QColor("#" + tstyle["color"]))
text_format.setForeground(brush)
if tstyle["bgcolor"]:
brush = QBrush(QColor("#" + tstyle["bgcolor"]))
text_format.setBackground(brush)
if tstyle["bold"]:
text_format.setFontWeight(QFont.Bold)
if tstyle["italic"]:
text_format.setFontItalic(True)
if tstyle["underline"]:
text_format.setFontUnderline(True)
[docs] def highlightBlock(self, text):
for start, count, text_format in self.yield_formats(text):
self.setFormat(start, count, text_format)
[docs]def inquire_index_name(model, column, title, parent_widget):
"""Asks for indexed parameter's index name and updates model accordingly.
Args:
model (IndexedValueTableModel or ArrayModel): a model with header that contains index names
column (int): column index
title (str): input dialog's title
parent_widget (QWidget): dialog's parent widget
"""
index_name = model.headerData(column, Qt.Orientation.Horizontal)
dialog_flags = Qt.Dialog | Qt.CustomizeWindowHint | Qt.WindowTitleHint | Qt.WindowCloseButtonHint
new_name, ok = QInputDialog.getText(parent_widget, title, "Index name:", text=index_name, flags=dialog_flags)
if not ok:
return
model.setHeaderData(column, Qt.Orientation.Horizontal, new_name)
[docs]def preferred_row_height(widget, factor=1.5):
return factor * widget.fontMetrics().lineSpacing()
[docs]def restore_ui(window, app_settings, settings_group):
"""Restores UI state from previous session.
Args:
window (QMainWindow)
app_settings (QSettings)
settings_group (str)
"""
app_settings.beginGroup(settings_group)
window_size = app_settings.value("windowSize")
window_pos = app_settings.value("windowPosition")
window_state = app_settings.value("windowState")
window_maximized = app_settings.value("windowMaximized", defaultValue="false")
n_screens = app_settings.value("n_screens", defaultValue=1)
splitter_states = {
splitter: app_settings.value(splitter.objectName() + "State") for splitter in window.findChildren(QSplitter)
}
app_settings.endGroup()
original_size = window.size()
if window_size:
window.resize(window_size)
if window_pos:
window.move(window_pos)
if window_state:
window.restoreState(window_state, version=1) # Toolbar and dockWidget positions
# noinspection PyArgumentList
if len(QGuiApplication.screens()) < int(n_screens):
# There are less screens available now than on previous application startup
window.move(0, 0) # Move this widget to primary screen position (0,0)
for splitter, state in splitter_states.items():
splitter.restoreState(state)
ensure_window_is_on_screen(window, original_size)
if window_maximized == "true":
window.setWindowState(Qt.WindowMaximized)
[docs]def save_ui(window, app_settings, settings_group):
"""Saves UI state for next session.
Args:
window (QMainWindow)
app_settings (QSettings)
settings_group (str)
"""
app_settings.beginGroup(settings_group)
app_settings.setValue("windowSize", window.size())
app_settings.setValue("windowPosition", window.pos())
app_settings.setValue("windowState", window.saveState(version=1))
app_settings.setValue("windowMaximized", window.windowState() == Qt.WindowMaximized)
app_settings.setValue("n_screens", len(QGuiApplication.screens()))
for splitter in window.findChildren(QSplitter):
app_settings.setValue(splitter.objectName() + "State", splitter.saveState())
app_settings.endGroup()
[docs]def bisect_chunks(current_data, new_data, key=None):
"""Finds insertion points for chunks of data using binary search.
Args:
current_data (list): sorted list where to insert new data
new_data (list): data to insert
key (Callable, optional): sort key
Returns:
tuple: sorted chunk of new data, insertion position
"""
if key is not None:
current_data = [key(x) for x in current_data]
else:
key = lambda x: x
new_data = sorted(new_data, key=key)
if not new_data:
return ()
item = new_data[0]
chunk = [item]
lo = bisect.bisect_left(current_data, key(item))
for item in new_data[1:]:
row = bisect.bisect_left(current_data, key(item), lo=lo)
if row == lo:
chunk.append(item)
continue
yield chunk, lo
count = len(chunk)
chunk = [item]
lo = row + count
yield chunk, lo
[docs]def load_project_dict(project_config_dir, logger):
"""Loads project dictionary from project directory.
Args:
project_config_dir (str): project's .spinetoolbox directory
logger (LoggerInterface): a logger
Returns:
dict: project dictionary
"""
load_path = os.path.abspath(os.path.join(project_config_dir, PROJECT_FILENAME))
try:
with open(load_path, "r") as fh:
try:
project_dict = json.load(fh)
except json.decoder.JSONDecodeError:
logger.msg_error.emit(f"Error in project file <b>{load_path}</b>. Invalid JSON.")
return None
except OSError:
logger.msg_error.emit(f"Project file <b>{load_path}</b> missing")
return None
return project_dict
[docs]def load_local_project_data(project_config_dir, logger):
"""Loads local project data.
Args:
project_config_dir (Path or str): project's .spinetoolbox directory
logger (LoggerInterface): a logger
Returns:
dict: project's local data
"""
load_path = pathlib.Path(project_config_dir, PROJECT_LOCAL_DATA_DIR_NAME, PROJECT_LOCAL_DATA_FILENAME)
if not load_path.exists():
return {}
with load_path.open() as fh:
try:
local_data_dict = json.load(fh)
except json.decoder.JSONDecodeError:
logger.msg_error.emit(f"Error in project's local data file <b>{load_path}</b>. Invalid JSON.")
return {}
return local_data_dict
[docs]def merge_dicts(source, target):
"""Merges two dictionaries that may contain nested dictionaries recursively.
Args:
source (dict): dictionary that will be merged to ``target``
target (dict): target dictionary
"""
for key, value in source.items():
target_entry = target.get(key)
if isinstance(value, dict) and target_entry is not None:
merge_dicts(value, target_entry)
else:
target[key] = value
[docs]def fix_lightness_color(color, lightness=240):
h, s, _, a = color.getHsl()
return QColor.fromHsl(h, s, lightness, a)
@contextmanager
[docs]class HTMLTagFilter(HTMLParser):
"""HTML tag filter."""
def __init__(self):
super().__init__()
self._text = ""
[docs] def drain(self):
text = self._text
self._text = ""
return text
[docs] def handle_data(self, data):
self._text += data
[docs] def handle_starttag(self, tag, attrs):
if tag == "br":
self._text += "\n"
[docs]def same_path(path1, path2):
"""Checks if two paths are equal.
This is a lightweight version of os.path.samefile(): it doesn't check if the paths
point to the same file system object but rather takes into account file system
case-sensitivity and such.
Args:
path1 (str): a path
path2 (str): a path
Returns:
bool: True if paths point to the same
"""
return os.path.normcase(path1) == os.path.normcase(path2)
[docs]def solve_connection_file(connection_file, connection_file_dict):
"""Returns the connection_file path, if it exists on this computer. If the path
doesn't exist, assume that it points to a path on another computer, in which
case store the contents of connection_file_dict into a tempfile.
Args:
connection_file (str): Path to a connection file
connection_file_dict (dict) Contents of a connection file
Returns:
str: Path to a connection file on this computer.
"""
if not os.path.exists(connection_file):
fp = tempfile.TemporaryFile(mode="w+", suffix=".json", delete=False)
json.dump(connection_file_dict, fp)
connection_file = fp.name
fp.close()
return connection_file
return connection_file
[docs]def remove_first(lst, items):
for x in items:
try:
lst.remove(x)
break
except ValueError:
pass
[docs]class SealCommand(QUndoCommand):
"""A 'meta' command that does not store undo data but can be used in mergeWith methods of other commands."""
def __init__(self, command_id=1):
"""
Args:
command_id (int): command id
"""
super().__init__("")
self._id = command_id
[docs] def redo(self):
self.setObsolete(True)
[docs] def id(self):
return self._id
[docs]def plain_to_rich(text):
"""Turns plain strings into rich text.
Args:
text (str): string to convert
Returns:
str: rich text string
"""
return "<qt>" + text + "</qt>"
[docs]def list_to_rich_text(data):
"""Turns a sequence of strings into rich text list.
Args:
data (Sequence of str): iterable to convert
Returns:
str: rich text string
"""
return plain_to_rich("<br>".join(data))