######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Contains the GraphViewMixin class.
:author: M. Marin (KTH)
:date: 26.11.2018
"""
import enum
import numpy as np
from numpy import atleast_1d as arr
from scipy.sparse.csgraph import dijkstra
from PySide2.QtCore import Signal, Slot, QObject, QThread, Qt
from PySide2.QtWidgets import QProgressBar, QDialogButtonBox, QLabel, QWidget, QVBoxLayout, QHBoxLayout
from PySide2.QtGui import QPainter, QColor
from spinetoolbox.helpers import busy_effect
@busy_effect
[docs]def make_heat_map(x, y, values):
values = np.array(values)
min_x, min_y, max_x, max_y = min(x), min(y), max(x), max(y)
tick_count = round(len(values) ** 2)
xticks = np.linspace(min_x, max_x, tick_count)
yticks = np.linspace(min_y, max_y, tick_count)
xv, yv = np.meshgrid(xticks, yticks)
try:
import scipy.interpolate
points = np.column_stack((x, y))
heat_map = scipy.interpolate.griddata(points, values, (xv, yv), method="cubic")
except ImportError:
import matplotlib.tri as tri
triang = tri.Triangulation(x, y)
interpolator = tri.CubicTriInterpolator(triang, values)
heat_map = interpolator(xv, yv)
return heat_map, xv, yv, min_x, min_y, max_x, max_y
[docs]class _State(enum.Enum):
"""State of GraphLayoutGenerator."""
[docs] CANCELLED = enum.auto()
[docs]class GraphLayoutGenerator(QObject):
"""Computes the layout for the Entity Graph View."""
[docs] finished = Signal(object, object)
[docs] progressed = Signal(int)
def __init__(self, vertex_count, src_inds, dst_inds, spread, heavy_positions=None, iterations=10, weight_exp=-2):
super().__init__()
if vertex_count == 0:
vertex_count = 1
if heavy_positions is None:
heavy_positions = dict()
self.vertex_count = vertex_count
self.src_inds = src_inds
self.dst_inds = dst_inds
self.spread = spread
self.heavy_positions = heavy_positions
self.iterations = max(3, round(iterations * (1 - len(heavy_positions) / self.vertex_count)))
self.weight_exp = weight_exp
self.initial_diameter = (self.vertex_count ** (0.5)) * self.spread
self._state = _State.ACTIVE
self._thread = QThread()
self.moveToThread(self._thread)
self._thread.start()
qApp.aboutToQuit.connect(self._thread.quit) # pylint: disable=undefined-variable
self.started.connect(self.get_coordinates)
self.finished.connect(self.clean_up)
[docs] def clean_up(self):
self._thread.quit()
self.done.emit()
[docs] def stop(self):
self._state = _State.STOPPED
[docs] def cancel(self, checked=False):
self._state = _State.CANCELLED
[docs] def start(self):
self.started.emit()
[docs] def shortest_path_matrix(self):
"""Returns the shortest-path matrix.
"""
if not self.src_inds:
# Introduce fake pair of links to help 'spreadness'
self.src_inds = [self.vertex_count, self.vertex_count]
self.dst_inds = [np.random.randint(0, self.vertex_count), np.random.randint(0, self.vertex_count)]
self.vertex_count += 1
dist = np.zeros((self.vertex_count, self.vertex_count))
src_inds = arr(self.src_inds)
dst_inds = arr(self.dst_inds)
try:
dist[src_inds, dst_inds] = dist[dst_inds, src_inds] = self.spread
except IndexError:
pass
matrix = dijkstra(dist, directed=False)
# Remove infinites and zeros
matrix[matrix == np.inf] = self.spread * self.vertex_count ** (0.5)
matrix[matrix == 0] = self.spread * 1e-6
return matrix
[docs] def sets(self):
"""Returns sets of vertex pairs indices.
"""
sets = []
for n in range(1, self.vertex_count):
pairs = np.zeros((self.vertex_count - n, 2), int) # pairs on diagonal n
pairs[:, 0] = np.arange(self.vertex_count - n)
pairs[:, 1] = pairs[:, 0] + n
mask = np.mod(range(self.vertex_count - n), 2 * n) < n
s1 = pairs[mask]
s2 = pairs[~mask]
if s1.any():
sets.append(s1)
if s2.any():
sets.append(s2)
return sets
[docs] def emit_finished(self, x, y):
if self._state == _State.STOPPED:
self.clean_up()
return
self.finished.emit(x, y)
@Slot()
[docs] def get_coordinates(self):
"""Computes and returns x and y coordinates for each vertex in the graph, using VSGD-MS."""
if self.vertex_count <= 1:
layout = np.zeros((self.vertex_count, 2))
x, y = [0], [0]
self.emit_finished(x, y)
return x, y
matrix = self.shortest_path_matrix()
mask = np.ones((self.vertex_count, self.vertex_count)) == 1 - np.tril(
np.ones((self.vertex_count, self.vertex_count))
) # Upper triangular except diagonal
np.random.seed(0)
layout = np.random.rand(self.vertex_count, 2) * self.initial_diameter - self.initial_diameter / 2
heavy_ind_list = list()
heavy_pos_list = list()
for ind, pos in self.heavy_positions.items():
heavy_ind_list.append(ind)
heavy_pos_list.append([pos["x"], pos["y"]])
heavy_ind = arr(heavy_ind_list)
heavy_pos = arr(heavy_pos_list)
if heavy_ind.any():
layout[heavy_ind, :] = heavy_pos
weights = matrix ** self.weight_exp # bus-pair weights (lower for distant buses)
maxstep = 1 / np.min(weights[mask])
minstep = 1 / np.max(weights[mask])
lambda_ = np.log(minstep / maxstep) / (self.iterations - 1) # exponential decay of allowed adjustment
sets = self.sets() # construct sets of bus pairs
for iteration in range(self.iterations):
self.progressed.emit(iteration)
step = maxstep * np.exp(lambda_ * iteration) # how big adjustments are allowed?
rand_order = np.random.permutation(
self.vertex_count
) # we don't want to use the same pair order each iteration
for s in sets:
if self._state in (_State.STOPPED, _State.CANCELLED):
break
v1, v2 = rand_order[s[:, 0]], rand_order[s[:, 1]] # arrays of vertex1 and vertex2
# current distance (possibly accounting for system rescaling)
dist = ((layout[v1, 0] - layout[v2, 0]) ** 2 + (layout[v1, 1] - layout[v2, 1]) ** 2) ** 0.5
r = (matrix[v1, v2] - dist)[:, None] * (layout[v1] - layout[v2]) / dist[:, None] / 2 # desired change
dx1 = r * np.minimum(1, weights[v1, v2] * step)[:, None]
dx2 = -dx1
layout[v1, :] += dx1 # update position
layout[v2, :] += dx2
if heavy_ind.any():
layout[heavy_ind, :] = heavy_pos
else: # nobreak
continue
break
x, y = layout[:, 0], layout[:, 1]
self.emit_finished(x, y)
return x, y