######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""Contains logging connection and jump classes."""
from spinedb_api.filters.scenario_filter import SCENARIO_FILTER_TYPE
from spinedb_api.filters.alternative_filter import ALTERNATIVE_FILTER_TYPE
from spinedb_api import DatabaseMapping, SpineDBAPIError, SpineDBVersionError
from spine_engine.project_item.connection import ResourceConvertingConnection, Jump, ConnectionBase, FilterSettings
from ..log_mixin import LogMixin
from ..mvcmodels.resource_filter_model import ResourceFilterModel
from ..helpers import busy_effect
from ..fetch_parent import FlexibleFetchParent
# Maps a filter type identifier to the name of the database item type that provides
# the selectable filter names for that filter type.
_DATABASE_ITEM_TYPE = {ALTERNATIVE_FILTER_TYPE: "alternative", SCENARIO_FILTER_TYPE: "scenario"}
class HeadlessConnection(ResourceConvertingConnection):
    """A project item connection that is compatible with headless mode."""

    def __init__(
        self,
        source_name,
        source_position,
        destination_name,
        destination_position,
        options=None,
        filter_settings=None,
        legacy_resource_filter_ids=None,
    ):
        """
        Args:
            source_name: forwarded to the base class constructor
            source_position: forwarded to the base class constructor
            destination_name: forwarded to the base class constructor
            destination_position: forwarded to the base class constructor
            options: forwarded to the base class constructor
            filter_settings: forwarded to the base class constructor
            legacy_resource_filter_ids (dict, optional): legacy resource filters; mapping from resource label
                to a mapping from filter type to a collection of database ids
        """
        super().__init__(source_name, source_position, destination_name, destination_position, options, filter_settings)
        # Kept until the first resources arrive; see receive_resources_from_source().
        self._legacy_resource_filter_ids = legacy_resource_filter_ids

    @property
    def database_resources(self):
        """Connection's database resources"""
        return self._resources

    def set_filter_enabled(self, resource_label, filter_type, filter_name, enabled):
        """Enables or disables a filter.

        Args:
            resource_label (str): database resource name
            filter_type (str): filter type
            filter_name (str): filter name
            enabled (bool): True to enable the filter, False to disable it
        """
        filters_by_type = self._filter_settings.known_filters.setdefault(resource_label, {})
        specific_filter_settings = filters_by_type.setdefault(filter_type, {})
        specific_filter_settings[filter_name] = enabled

    def set_filter_type_enabled(self, filter_type, enabled):
        """Enables or disables a filter type.

        Args:
            filter_type (str): filter type
            enabled (bool): True to enable the filter type, False to disable it
        """
        self._filter_settings.enabled_filter_types[filter_type] = enabled

    def _convert_legacy_resource_filter_ids_to_filter_settings(self):
        """Converts legacy resource filter ids to filter settings.

        This method should be called once after constructing the connection from potentially legacy dict
        using ``from_dict()``.

        Only scenario filters are converted. Resources without a URL, without legacy filter ids
        or whose database cannot be opened are skipped. The legacy ids are dropped afterwards.
        """
        if self._legacy_resource_filter_ids is None:
            # Nothing to convert.
            return
        for resource in self._resources:
            resource_filter_ids = self._legacy_resource_filter_ids.get(resource.label)
            if resource_filter_ids is None:
                continue
            url = resource.url
            if not url:
                continue
            try:
                db_map = DatabaseMapping(url)
            except (SpineDBAPIError, SpineDBVersionError):
                # Best effort: databases that cannot be opened are skipped.
                continue
            try:
                scenario_filter_ids = resource_filter_ids.get(SCENARIO_FILTER_TYPE)
                if scenario_filter_ids is not None:
                    specific_filter_settings = self._filter_settings.known_filters.setdefault(
                        resource.label, {}
                    ).setdefault(SCENARIO_FILTER_TYPE, {})
                    for row in db_map.query(db_map.scenario_sq):
                        # A scenario is enabled if its id was listed in the legacy filter ids.
                        specific_filter_settings[row.name] = row.id in scenario_filter_ids
            finally:
                db_map.close()
        self._legacy_resource_filter_ids = None

    @staticmethod
    def _constructor_args_from_dict(connection_dict):
        """See base class."""
        kw_args = ResourceConvertingConnection._constructor_args_from_dict(connection_dict)
        resource_filters = connection_dict.get("resource_filters")
        if resource_filters is not None:
            # Legacy, for backwards compatibility. Resource filters have been superseded by disabled_filters.
            kw_args["legacy_resource_filter_ids"] = resource_filters
        return kw_args

    @classmethod
    def from_dict(cls, connection_dict, **kwargs):
        """Deserializes a connection from dict.

        Args:
            connection_dict (dict): serialized connection
            **kwargs: additional keyword arguments to be forwarded to class constructor

        Returns:
            an instance of ``cls`` constructed from the dict
        """
        kw_args_from_dict = cls._constructor_args_from_dict(connection_dict)
        return cls(**kw_args_from_dict, **kwargs)

    def receive_resources_from_source(self, resources):
        """See base class."""
        super().receive_resources_from_source(resources)
        if self._legacy_resource_filter_ids is not None:
            # Resources are now known, so legacy ids can be converted.
            self._convert_legacy_resource_filter_ids_to_filter_settings()

    def replace_resources_from_source(self, old, new):
        """Replaces existing resources by new ones.

        Known filters of an old resource are moved under the label of its replacement
        when the replacement is a database resource; otherwise they are dropped.

        Args:
            old (list of ProjectItemResource): old resources
            new (list of ProjectItemResource): new resources
        """
        for old_resource, new_resource in zip(old, new):
            self._resources.discard(old_resource)
            old_filters = self._filter_settings.known_filters.pop(old_resource.label, None)
            if new_resource.type_ == "database":
                self._resources.add(new_resource)
                if old_filters is not None:
                    self._filter_settings.known_filters[new_resource.label] = old_filters
class LoggingConnection(LogMixin, HeadlessConnection):
    """A connection that is bound to a toolbox instance and tracks filter data of its source databases."""

    def __init__(self, *args, toolbox, **kwargs):
        """
        Args:
            *args: positional arguments forwarded to ``HeadlessConnection``
            toolbox: the toolbox instance; provides the project, undo stack and database manager
            **kwargs: keyword arguments forwarded to ``HeadlessConnection``
        """
        super().__init__(*args, **kwargs)
        self._toolbox = toolbox
        self.resource_filter_model = ResourceFilterModel(self, toolbox.project(), toolbox.undo_stack, toolbox)
        # Graphical link item; None until assigned from outside this class.
        self.link = None
        self._source_item_type = self._toolbox.project().get_item(self.source).item_type()
        self._destination_item_type = self._toolbox.project().get_item(self.destination).item_type()
        # Database maps in use, keyed by URL.
        self._db_maps = {}
        # Fetch parents keyed by database map, then by item type.
        self._fetch_parents = {}

    def __hash__(self):
        # Deliberately skips ConnectionBase in the MRO and uses the next __hash__ implementation.
        return super(ConnectionBase, self).__hash__()

    @staticmethod
    def item_type():
        """Returns the item type identifier, ``"connection"``."""
        return "connection"

    @property
    def graphics_item(self):
        """The link associated with this connection."""
        return self.link

    def has_filters(self):
        """Returns True if connection has any filters.

        Returns:
            bool: True if connection has filters, False otherwise
        """
        for resource in self._resources:
            url = resource.url
            if not url:
                continue
            db_map = self._get_db_map(url, ignore_version_error=True)
            if db_map is None:
                continue
            known_filters = self._filter_settings.known_filters.get(resource.label, {})
            for filter_type, item_type in _DATABASE_ITEM_TYPE.items():
                available = {x["name"] for x in self._toolbox.db_mngr.get_items(db_map, item_type)}
                filters = known_filters.get(filter_type, {})
                # A known filter counts only if it still exists in the database.
                if any(enabled for s, enabled in filters.items() if s in available):
                    return True
                # Unknown filters are online when auto_online is set.
                if self._filter_settings.auto_online and any(name not in filters for name in available):
                    return True
        return False

    def _get_db_map(self, url, ignore_version_error=False):
        """Returns the database map for given URL, requesting it from the DB manager on first use.

        Args:
            url: database URL
            ignore_version_error (bool): forwarded to the DB manager's ``get_db_map()``

        Returns:
            the database map or None if the DB manager returned None
        """
        if url not in self._db_maps:
            db_map = self._toolbox.db_mngr.get_db_map(url, self._toolbox, ignore_version_error=ignore_version_error)
            if db_map is None:
                return None
            self._db_maps[url] = db_map
            self._toolbox.db_mngr.register_listener(self, db_map)
            self._fetch_more_if_possible()
        return self._db_maps[url]

    def _pop_unused_db_maps(self):
        """Removes unused database maps and unregisters from listening the DB manager."""
        resource_urls = {resource.url for resource in self._resources}
        resource_urls.discard(None)
        obsolete_urls = set(self._db_maps) - resource_urls
        for url in obsolete_urls:
            db_map = self._db_maps.pop(url)
            # Tolerate a missing entry so clean-up never fails with KeyError.
            self._fetch_parents.pop(db_map, None)
            self._toolbox.db_mngr.unregister_listener(self, db_map)

    def _make_fetch_parent(self, db_map, item_type):
        """Returns the fetch parent for given database map and item type, creating it if needed.

        Args:
            db_map: database map
            item_type (str): database item type

        Returns:
            FlexibleFetchParent: fetch parent
        """
        fetch_parents = self._fetch_parents.setdefault(db_map, {})
        if item_type not in fetch_parents:
            fetch_parents[item_type] = FlexibleFetchParent(
                item_type,
                handle_items_added=lambda _: self._receive_data_changed(),
                handle_items_removed=lambda _: self._receive_data_changed(),
                handle_items_updated=lambda _: self._receive_data_changed(),
                owner=self.resource_filter_model,
            )
        return fetch_parents[item_type]

    def _fetch_more_if_possible(self):
        """Asks the DB manager to fetch more items for every database map when possible."""
        for db_map in self._db_maps.values():
            # NOTE(review): only "scenario" is fetched here although _DATABASE_ITEM_TYPE also lists
            # "alternative" - confirm this is intentional.
            for item_type in ("scenario",):
                fetch_parent = self._make_fetch_parent(db_map, item_type)
                if self._toolbox.db_mngr.can_fetch_more(db_map, fetch_parent):
                    self._toolbox.db_mngr.fetch_more(db_map, fetch_parent)

    def _receive_data_changed(self):
        """Updates link icons, rebuilds the resource filter model and fetches more data."""
        self.link.update_icons()
        self.refresh_resource_filter_model()
        self._fetch_more_if_possible()

    def receive_session_committed(self, db_maps, cookie):
        """Reacts to a committed session by refreshing filter data; the arguments are unused."""
        self._receive_data_changed()

    def receive_session_rolled_back(self, db_map):
        """Reacts to a rolled back session by refreshing filter data; the argument is unused."""
        self._receive_data_changed()

    def receive_error_msg(self, _db_map_error_log):
        """Does nothing; error messages are ignored by this listener."""

    def get_filter_item_names(self, filter_type, url):
        """Returns sorted names of database items available for given filter type.

        Args:
            filter_type (str): filter type
            url: database URL

        Returns:
            list of str: sorted item names; empty if the database map is unavailable
        """
        db_map = self._get_db_map(url)
        if db_map is None:
            return []
        item_type = _DATABASE_ITEM_TYPE[filter_type]
        return sorted(x["name"] for x in self._toolbox.db_mngr.get_items(db_map, item_type))

    def _do_purge_before_writing(self, resources):
        """Purges via the base class and notifies the DB manager about affected database maps.

        Args:
            resources: forwarded to the base class implementation

        Returns:
            purged URLs as returned by the base class implementation
        """
        purged_urls = super()._do_purge_before_writing(resources)
        committed_db_maps = set()
        for url in purged_urls:
            db_map = self._toolbox.db_mngr.db_map(url)
            if db_map:
                committed_db_maps.add(db_map)
        if committed_db_maps:
            self._toolbox.db_mngr.notify_session_committed(self, *committed_db_maps)
        return purged_urls

    def may_have_filters(self):
        """Returns whether this connection may have filters.

        Returns:
            bool: True if it is possible for the connection to have filters, False otherwise
        """
        return bool(self._resources)

    def may_have_write_index(self):
        """Returns whether this connection may have write index.

        Returns:
            bool: True if it is possible for the connection to have write index, False otherwise
        """
        return self._destination_item_type == "Data Store"

    def may_use_memory_db(self):
        """Returns whether this connection may use memory DB.

        Returns:
            bool: True if it is possible for the connection to use memory DB, False otherwise
        """
        return {"Tool", "Data Store"} == {self._source_item_type, self._destination_item_type}

    def may_use_datapackage(self):
        """Returns whether this connection may use datapackage.

        Returns:
            bool: True if it is possible for the connection to use datapackage, False otherwise
        """
        return self._source_item_type in {"Exporter", "Data Connection", "Tool"}

    def may_purge_before_writing(self):
        """Returns whether this connection may purge before writing.

        Returns:
            bool: True if it is possible for the connection to purge before writing, False otherwise
        """
        return self._destination_item_type == "Data Store"

    def online_filters(self, resource_label, filter_type):
        """Returns filter online states for given resource and filter type.

        Args:
            resource_label (str): resource label
            filter_type (str): filter type

        Returns:
            dict: mapping from filter names to online states; empty if no resource has the given label,
                None if the resource has no URL or its database map is unavailable
        """
        found_resource = None
        for resource in self._resources:
            if resource.label == resource_label:
                found_resource = resource
                break
        if found_resource is None:
            return {}
        return self._resource_filters_online(found_resource, filter_type)

    def set_online(self, resource, filter_type, online):
        """Sets the given filters online or offline.

        Args:
            resource (str): Resource label
            filter_type (str): filter type
            online (dict): mapping from scenario name to online flag
        """
        self._filter_settings.known_filters.setdefault(resource, {}).setdefault(filter_type, {}).update(online)

    def set_filter_default_online_status(self, auto_online):
        """Sets the auto_online flag.

        Args:
            auto_online (bool): If True, unknown filters are online by default
        """
        self._filter_settings.auto_online = auto_online
        if self is self._toolbox.active_link_item:
            self._toolbox.link_properties_widgets[LoggingConnection].set_auto_check_filters_state(auto_online)

    def refresh_resource_filter_model(self):
        """Makes resource filter model fetch filter data from database."""
        self.resource_filter_model.build_tree()

    def set_filter_type_enabled(self, filter_type, enabled):
        """See base class."""
        super().set_filter_type_enabled(filter_type, enabled)
        self.resource_filter_model.set_filter_type_enabled(filter_type, enabled)
        if self is self._toolbox.active_link_item:
            self._toolbox.link_properties_widgets[LoggingConnection].set_filter_type_enabled(filter_type, enabled)

    def receive_resources_from_source(self, resources):
        """See base class."""
        super().receive_resources_from_source(resources)
        self._pop_unused_db_maps()
        self.link.update_icons()

    def replace_resources_from_source(self, old, new):
        """See base class."""
        super().replace_resources_from_source(old, new)
        self._pop_unused_db_maps()
        self.link.update_icons()

    @busy_effect
    def set_connection_options(self, options):
        """Overwrites connections options.

        Args:
            options (dict): new options
        """
        if options == self.options:
            return
        self.options = options
        project = self._toolbox.project()
        sibling_conns = project.incoming_connections(self.destination)
        for conn in sibling_conns:
            conn.link.update_icons()
        project.notify_resource_changes_to_successors(project.get_item(self.source))
        project.notify_resource_changes_to_predecessors(project.get_item(self.destination))
        if self is self._toolbox.active_link_item:
            self._toolbox.link_properties_widgets[LoggingConnection].load_connection_options()

    def _check_available_filters(self):
        """Cross-checks filter settings with source databases.

        Returns:
            FilterSettings: filter settings containing only filters that exist in source databases
        """
        filter_settings = FilterSettings(
            auto_online=self._filter_settings.auto_online,
            enabled_filter_types=self._filter_settings.enabled_filter_types,
        )
        for resource in self._resources:
            for filter_type in (SCENARIO_FILTER_TYPE, ALTERNATIVE_FILTER_TYPE):
                online_filters = self._resource_filters_online(resource, filter_type)
                if online_filters is not None:
                    filter_settings.known_filters.setdefault(resource.label, {})[filter_type] = online_filters
        return filter_settings

    def _resource_filters_online(self, resource, filter_type):
        """Returns online states of the filters that are available in resource's database.

        Args:
            resource: resource providing ``url`` and ``label``
            filter_type (str): filter type

        Returns:
            dict: mapping from filter name to online state, inserted in sorted name order;
                None if the resource has no URL or its database map is unavailable
        """
        url = resource.url
        if not url:
            return None
        db_map = self._get_db_map(url)
        if db_map is None:
            return None
        db_item_type = _DATABASE_ITEM_TYPE[filter_type]
        available_filters = (x["name"] for x in self._toolbox.db_mngr.get_items(db_map, db_item_type))
        specific_filter_settings = self._filter_settings.known_filters.get(resource.label, {}).get(filter_type, {})
        checked_specific_filter_settings = {}
        for name in sorted(available_filters):
            # Filters without a stored state fall back to the auto_online default.
            checked_specific_filter_settings[name] = specific_filter_settings.get(
                name, self._filter_settings.auto_online
            )
        return checked_specific_filter_settings

    def to_dict(self):
        """See base class."""
        if not self.has_filters():
            return super().to_dict()
        # Temporarily remove unavailable filters to keep project.json clean.
        original = self._filter_settings
        self._filter_settings = self._check_available_filters()
        try:
            return super().to_dict()
        finally:
            # Restore the full settings even if serialization fails.
            self._filter_settings = original

    def tear_down(self):
        """Releases system resources held by the connection."""
        for db_map in self._db_maps.values():
            self._toolbox.db_mngr.unregister_listener(self, db_map)
        self.resource_filter_model.deleteLater()
class LoggingJump(LogMixin, Jump):
    """A jump that keeps a reference to the toolbox and to its graphical jump link."""

    def __init__(self, *args, toolbox=None, **kwargs):
        """
        Args:
            *args: positional arguments forwarded to ``Jump``
            toolbox: stored as-is on the instance; defaults to None
            **kwargs: keyword arguments forwarded to ``Jump``
        """
        super().__init__(*args, **kwargs)
        # The jump link starts out unset.
        self.jump_link = None
        self._toolbox = toolbox

    @staticmethod
    def item_type():
        """Returns the item type identifier, ``"jump"``."""
        return "jump"

    @property
    def graphics_item(self):
        """The jump link associated with this jump."""
        return self.jump_link