# Source code for spinetoolbox.spine_io.connection_manager

######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Contains ConnectionManager class.

:author: P. Vennström (VTT)
:date:   1.6.2019
"""

from PySide2.QtCore import QObject, QThread, Signal, Slot


class ConnectionManager(QObject):
    """Class to manage data connections in another thread.

    The manager owns a worker object that is moved to a separate QThread. Requests are
    forwarded to the worker through the ``start_*`` signals and the worker's results are
    re-emitted through the manager's own signals.

    Args:
        connection (class): A class derived from `SourceConnection`, e.g. `CSVConnector`
        connection_settings: settings handed to the worker, which passes them
            to ``connection`` when instantiating it
        parent (QObject, optional): parent object
    """

    # Signals the worker listens to; they trigger the actual reading in the worker thread
    start_table_get = Signal()
    start_data_get = Signal(str, dict, int)
    start_mapped_data_get = Signal(dict, dict, dict, dict, int)

    # Signal with error message if connection fails
    connection_failed = Signal(str)

    # Signal that a connection to the datasource is ready
    connection_ready = Signal()

    # Signal that connection is being closed
    connection_closed = Signal()

    # error while reading data or connection to data source
    error = Signal(str)

    # signal that the data connection is getting data
    fetching_data = Signal()

    # data from source is ready, should send list of data and headers
    data_ready = Signal(list, list)

    # tables from source is ready, should send a dict with table names as keys
    tables_ready = Signal(dict)

    # mapped data read from data source
    mapped_data_ready = Signal(dict, list)

    current_table_changed = Signal()
    """Emitted when the current table has changed."""

    def __init__(self, connection, connection_settings, parent=None):
        super().__init__(parent)
        self._thread = None
        self._worker = None
        self._source = None
        self._current_table = None
        self._table_options = {}
        self._table_types = {}
        self._table_row_types = {}
        self._connection = connection
        self._connection_settings = connection_settings
        self._is_connected = False

    @property
    def connection(self):
        """The connection class given at construction."""
        return self._connection

    @property
    def current_table(self):
        """Name of the currently selected table, or None if no table has been set."""
        return self._current_table

    @property
    def is_connected(self):
        """True once the worker has reported that the connection is ready."""
        return self._is_connected

    @property
    def table_options(self):
        """Dict of stored options, keyed by table name."""
        return self._table_options

    @property
    def table_types(self):
        """Dict of stored types, keyed by table name."""
        return self._table_types

    @property
    def table_row_types(self):
        """Dict of stored row types, keyed by table name."""
        return self._table_row_types

    @property
    def source(self):
        """The data source that the worker connects to."""
        return self._source

    @source.setter
    def source(self, source):
        self._source = source

    @property
    def source_type(self):
        """Name of the connection class."""
        return self._connection.__name__

    def set_table(self, table):
        """Sets the current table of the data source.

        Args:
            table (str): table name
        """
        self._current_table = table
        self.current_table_changed.emit()

    def request_tables(self):
        """Requests the available tables from the source.

        Does nothing unless connected. Otherwise emits two signals:

        - ``fetching_data``: the manager is busy waiting for data
        - ``start_table_get``: tells the worker in the other thread to read the tables
        """
        if self.is_connected:
            self.fetching_data.emit()
            self.start_table_get.emit()

    def request_data(self, table=None, max_rows=-1):
        """Requests data from a table of the source.

        Does nothing unless connected. The worker's result is re-emitted
        through ``data_ready``.

        Args:
            table (str): which table to get data from
            max_rows (int): how many rows to read
        """
        if self.is_connected:
            # NOTE(review): options are looked up for the current table, not for `table`;
            # confirm that callers always pass the current table.
            options = self._table_options.get(self._current_table, {})
            self.fetching_data.emit()
            self.start_data_get.emit(table, options, max_rows)

    def request_mapped_data(self, table_mappings, max_rows=-1):
        """Requests mapped data from the source.

        Does nothing unless connected. The stored options, types and row types of every
        table in ``table_mappings`` are sent along with the request; tables without stored
        settings get empty dicts.

        Args:
            table_mappings (dict): dict with table name as key and a list of mappings as value
            max_rows (int): number of rows to read, if -1 read all rows
        """
        if self.is_connected:
            options = {name: self._table_options.get(name, {}) for name in table_mappings}
            types = {name: self._table_types.get(name, {}) for name in table_mappings}
            row_types = {name: self._table_row_types.get(name, {}) for name in table_mappings}
            self.fetching_data.emit()
            self.start_mapped_data_get.emit(table_mappings, options, types, row_types, max_rows)

    def connection_ui(self):
        """Launches a modal ui that prompts the user to select source.

        ex: fileselect if source is a file.

        Returns:
            bool: True if a source was selected and stored, False otherwise
        """
        source, action = self._connection.SELECT_SOURCE_UI()
        if not source or not action:
            return False
        self._source = source
        return True

    def init_connection(self):
        """Creates a Worker and a new thread to read source data.

        If there is an existing thread close that one.
        """
        # close existing thread
        self.close_connection()
        # create new thread and worker
        self._thread = QThread()
        self._worker = ConnectionWorker(self._source, self._connection, self._connection_settings)
        self._worker.moveToThread(self._thread)
        # connect worker signals
        self._worker.connectionReady.connect(self._handle_connection_ready)
        self._worker.tablesReady.connect(self._handle_tables_ready)
        self._worker.dataReady.connect(self.data_ready.emit)
        self._worker.mappedDataReady.connect(self.mapped_data_ready.emit)
        self._worker.error.connect(self.error.emit)
        self._worker.connectionFailed.connect(self.connection_failed.emit)
        # connect start working signals
        self.start_table_get.connect(self._worker.tables)
        self.start_data_get.connect(self._worker.data)
        self.start_mapped_data_get.connect(self._worker.mapped_data)
        self.connection_closed.connect(self._worker.disconnect)

        # when thread is started, connect worker to source
        self._thread.started.connect(self._worker.init_connection)
        self._thread.start()

    def _handle_connection_ready(self):
        """Marks the manager as connected and emits ``connection_ready``."""
        self._is_connected = True
        self.connection_ready.emit()

    @Slot("QVariant")
    def _handle_tables_ready(self, table_options):
        """Stores the settings of the received tables and emits ``tables_ready``.

        Settings that are already stored for a table are never overwritten.

        Args:
            table_options (dict or list): table settings keyed by table name, or a plain list
                of table names which is treated as tables without settings
        """
        if isinstance(table_options, list):
            table_options = {name: {} for name in table_options}

        # save options, types and row types if they don't already exist
        stores = (
            ("options", self._table_options),
            ("types", self._table_types),
            ("row_types", self._table_row_types),
        )
        for table_name, table_settings in table_options.items():
            for setting_key, store in stores:
                value = table_settings.get(setting_key, {})
                if value is not None:
                    store.setdefault(table_name, value)

        tables = {k: t.get("mapping", None) for k, t in table_options.items()}
        self.tables_ready.emit(tables)
        # update options if a sheet is selected
        if self._current_table in self._table_options:
            self.current_table_changed.emit()

    # Declared with the dict argument so the registered slot signature matches the method.
    @Slot(dict)
    def update_options(self, options):
        """Updates the options of the current table and requests its first 100 rows again.

        Does nothing if no table is currently selected.

        Args:
            options (dict): option values to merge into the current table's options
        """
        if not self._current_table:
            return
        self._table_options.setdefault(self._current_table, {}).update(options)
        self.request_data(self._current_table, 100)

    def get_current_options(self):
        """Returns the stored options of the current table.

        Returns:
            dict: options of the current table, empty if there is no current table
                or nothing has been stored for it
        """
        if not self._current_table:
            return {}
        return self._table_options.get(self._current_table, {})

    def get_current_option_value(self, option_key):
        """Returns the value for option_key for the current table or the default value."""
        current_options = self._table_options.get(self._current_table, {})
        option_value = current_options.get(option_key)
        if option_value is None:
            # fall back to the default declared in the connection's OPTIONS
            option_specification = self._connection.OPTIONS[option_key]
            return option_specification["default"]
        return option_value

    def set_table_options(self, options):
        """Sets connection manager options for current connector

        Args:
            options (dict): settings for the tables
        """
        self._table_options.update(options)
        if self._current_table in self._table_options:
            self.current_table_changed.emit()

    def set_table_types(self, types):
        """Sets connection manager types for current connector

        Args:
            types (dict): dict with types settings, column (int) as key, type as value
        """
        self._table_types.update(types)

    def set_table_row_types(self, types):
        """Sets connection manager row types for current connector

        Args:
            types (dict): dict with types settings, row (int) as key, type as value
        """
        self._table_row_types.update(types)

    def close_connection(self):
        """Closes and deletes thread and worker."""
        self._is_connected = False
        self.connection_closed.emit()
        if self._worker:
            self._worker.deleteLater()
            self._worker = None
        if self._thread:
            self._thread.quit()
            self._thread.wait()
            # drop the finished thread so that later calls don't quit/wait on it again
            self._thread = None
class ConnectionWorker(QObject):
    """A class for delegating SourceConnection operations to another QThread.

    Every operation reports failure by emitting a signal with an error message and then
    re-raises the original exception.

    Args:
        source (str): path of the source file
        connection (class): A class derived from `SourceConnection` for connecting to the source file
        connection_settings: passed as the only argument when instantiating ``connection``
        parent (QObject, optional): parent object
    """

    # Signal with error message if connection fails
    connectionFailed = Signal(str)

    # Signal with error message if something errors
    error = Signal(str)

    # Signal that connection is ready to be read
    connectionReady = Signal()

    # Signal when tables from source is ready, list of tablenames
    tablesReady = Signal(list)

    # Signal when data from a specific table in source is ready, list of data and list of headers
    dataReady = Signal(list, list)

    # Signal when data is read and mapped, dict with data and list of errors when reading data with mappings
    mappedDataReady = Signal(dict, list)

    def __init__(self, source, connection, connection_settings, parent=None):
        super().__init__(parent)
        self._source = source
        self._connection = connection(connection_settings)

    def init_connection(self):
        """Connects to the data source.

        Emits ``connectionReady`` on success. Emits ``connectionFailed`` if no source
        has been given or if connecting fails; in the latter case the exception is re-raised.
        """
        if not self._source:
            self.connectionFailed.emit("Connection has no source")
            return
        try:
            self._connection.connect_to_source(self._source)
            self.connectionReady.emit()
        except Exception as error:
            self.connectionFailed.emit(f"Could not connect to source: {error}")
            raise

    def tables(self):
        """Reads the tables of the source and emits them through ``tablesReady``.

        On failure emits ``error`` and re-raises the exception.
        """
        try:
            tables = self._connection.get_tables()
            self.tablesReady.emit(tables)
        except Exception as error:
            self.error.emit(f"Could not get tables from source: {error}")
            raise

    def data(self, table, options, max_rows):
        """Reads data from a table and emits it with its header through ``dataReady``.

        On failure emits ``error`` and re-raises the exception.

        Args:
            table: table to read, forwarded to the connection's ``get_data``
            options: reading options, forwarded to the connection's ``get_data``
            max_rows: row limit, forwarded to the connection's ``get_data``
        """
        try:
            data, header = self._connection.get_data(table, options, max_rows)
            self.dataReady.emit(data, header)
        except Exception as error:
            self.error.emit(f"Could not get data from source: {error}")
            raise

    def mapped_data(self, table_mappings, options, types, table_row_types, max_rows):
        """Reads mapped data and emits it with the reading errors through ``mappedDataReady``.

        All arguments are forwarded unchanged to the connection's ``get_mapped_data``.
        On failure emits ``error`` and re-raises the exception.
        """
        try:
            data, errors = self._connection.get_mapped_data(table_mappings, options, types, table_row_types, max_rows)
            self.mappedDataReady.emit(data, errors)
        except Exception as error:
            self.error.emit(f"Could not get mapped data from source: {error}")
            raise

    # NOTE(review): this name shadows QObject.disconnect; it is kept because
    # ConnectionManager connects its connection_closed signal to it.
    def disconnect(self):
        """Disconnects from the data source.

        On failure emits ``error`` and re-raises the exception.
        """
        try:
            self._connection.disconnect()
        except Exception as error:
            self.error.emit(f"Could not disconnect from source: {error}")
            raise