Source code for spinetoolbox.spine_io.exporters.excel

######################################################################################################################
# Copyright (C) 2017-2020 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Framework for exporting a database to Excel file.

:author: P. Vennström (VTT), A. Soininen (VTT)
:date:   31.1.2020
"""

from itertools import groupby
from operator import itemgetter
import numpy as np
from openpyxl import Workbook
from spinedb_api import from_database, TimeSeries, TimePattern, DateTime, Duration, to_database


[docs]def _get_objects_and_parameters(db): """Exports all object data from spine database into unstacked list of lists Args: db (spinedb_api.DatabaseMapping): database mapping for database Returns: (List, List) First list contains parameter data, second one json data """ # get all objects obj = db.object_list().all() # get all object classes obj_class = db.object_class_list().all() obj_class_id_2_name = {oc.id: oc.name for oc in obj_class} # get all parameter values pval = db.object_parameter_value_list().all() # get all parameter definitions par = db.object_parameter_definition_list().all() # make all in same format par = [(p.object_class_name, None, p.parameter_name, None) for p in par] pval = [(p.object_class_name, p.object_name, p.parameter_name, from_database(p.value)) for p in pval] obj = [(obj_class_id_2_name[p.class_id], p.name, None, None) for p in obj] obj_class = [(p.name, None, None, None) for p in obj_class] object_and_par = pval + par + obj + obj_class object_par = [] object_json = [] object_ts = [] object_timepattern = [] for d in object_and_par: if d[3] is None or isinstance(d[3], (int, float, str, DateTime, Duration)): object_par.append(d) elif isinstance(d[3], list): object_json.append(d) object_par.append(d[:-1] + (None,)) elif isinstance(d[3], TimeSeries): object_ts.append(d) object_par.append(d[:-1] + (None,)) elif isinstance(d[3], TimePattern): object_timepattern.append(d) object_par.append(d[:-1] + (None,)) else: raise Warning(f"Unsuported export type: {type(d[3])}, Skipping export") return object_par, object_json, object_ts, object_timepattern
[docs]def _get_relationships_and_parameters(db): """Exports all relationship data from spine database into unstacked list of lists Args: db (spinedb_api.DatabaseMapping): database mapping for database Returns: (List, List) First list contains parameter data, second one json data """ rel_class = db.wide_relationship_class_list().all() rel = db.wide_relationship_list().all() rel_par = db.relationship_parameter_definition_list().all() rel_par_value = db.relationship_parameter_value_list().all() rel_class_id_2_name = {rc.id: rc.name for rc in rel_class} out_data = [ [r.relationship_class_name, r.object_name_list, r.parameter_name, from_database(r.value)] for r in rel_par_value ] rel_with_par = set(r.object_name_list for r in rel_par_value) rel_without_par = [ [rel_class_id_2_name[r.class_id], r.object_name_list, None, None] for r in rel if r.object_name_list not in rel_with_par ] rel_class_par = [[r.relationship_class_name, None, r.parameter_name, None] for r in rel_par] rel_class_with_par = [r.relationship_class_name for r in rel_par] rel_class_without_par = [[r.name, None, None, None] for r in rel_class if r.name not in rel_class_with_par] rel_data = out_data + rel_without_par + rel_class_par + rel_class_without_par rel_par = [] rel_json = [] rel_ts = [] rel_timepattern = [] for d in rel_data: if d[3] is None or isinstance(d[3], (int, float, str, DateTime, Duration)): rel_par.append(d) elif isinstance(d[3], list): rel_json.append(d) rel_par.append(d[:-1] + [None]) elif isinstance(d[3], TimeSeries): rel_ts.append(d) rel_par.append(d[:-1] + [None]) elif isinstance(d[3], TimePattern): rel_timepattern.append(d) rel_par.append(d[:-1] + [None]) else: raise Warning(f"Unsuported export type: {type(d[3])}, Skipping export") return rel_par, rel_json, rel_class, rel_ts, rel_timepattern
[docs]def _unstack_list_of_tuples(data, headers, key_cols, value_name_col, value_col): """Unstacks list of lists or list of tuples and creates a list of namedtuples whit unstacked data (pivoted data) Args: data (List[List]): List of lists with data to unstack headers (List[str]): List of header names for data key_cols (List[Int]): List of index for column that are keys, columns to not unstack value_name_col (Int): index to column containing name of data to unstack value_col (Int): index to column containing value to value_name_col Returns: (List[List]): List of list with headers in headers list (List): List of header names for each item in inner list """ # find header names if isinstance(value_name_col, list) and len(value_name_col) > 1: value_name_getter = itemgetter(*value_name_col) value_names = sorted( set(value_name_getter(x) for x in data if not any(i is None for i in value_name_getter(x))) ) else: if isinstance(value_name_col, list): value_name_col = value_name_col[0] value_name_getter = itemgetter(value_name_col) value_names = sorted(set(x[value_name_col] for x in data if x[value_name_col] is not None)) key_names = [headers[n] for n in key_cols] # value_names = sorted(set(x[value_name_col] for x in data if x[value_name_col] is not None)) headers = key_names + value_names # remove data with invalid key cols keyfunc = lambda x: [x[k] for k in key_cols] data = [x for x in data if None not in keyfunc(x)] data = sorted(data, key=keyfunc) # unstack data data_list_out = [] for k, k_data in groupby(data, key=keyfunc): if None in k: continue line_data = [None] * len(value_names) for d in k_data: if value_name_getter(d) in value_names and d[value_col] is not None: line_data[value_names.index(value_name_getter(d))] = d[value_col] data_list_out.append(k + line_data) return data_list_out, headers
[docs]def _get_unstacked_relationships(db): """Gets all data for relationships in a unstacked list of list Args: db (spinedb_api.DatabaseMapping): database mapping for database Returns: (list, list, list, list): stacked relationships, stacked JSON, stacked time series and stacked time patterns """ data, data_json, rel_class, data_ts, data_timepattern = _get_relationships_and_parameters(db) class_2_obj_list = {rc.name: rc.object_class_name_list.split(',') for rc in rel_class} keyfunc = lambda x: x[0] data_json = sorted(data_json, key=keyfunc) parsed_json = [] # json data, split by relationship class for k, v in groupby(data_json, key=keyfunc): json_vals = [] for row in v: rel_list = row[1].split(',') parameter = row[2] val = row[3] json_vals.append([rel_list + [parameter], val]) if json_vals: object_classes = class_2_obj_list[k] parsed_json.append([k, object_classes, json_vals]) data_ts = sorted(data_ts, key=keyfunc) parsed_ts = [] # ts data, split by relationship class for k, v in groupby(data_ts, key=keyfunc): ts_vals = [] for row in v: rel_list = row[1].split(',') parameter = row[2] val = row[3] ts_vals.append([rel_list + [parameter], val]) if ts_vals: object_classes = class_2_obj_list[k] parsed_ts.append([k, object_classes, ts_vals]) data_timepattern = sorted(data_timepattern, key=keyfunc) parsed_timepattern = [] # ts data, split by relationship class for k, v in groupby(data_timepattern, key=keyfunc): tp_vals = [] for row in v: rel_list = row[1].split(',') parameter = row[2] val = row[3] tp_vals.append([rel_list + [parameter], val]) if tp_vals: object_classes = class_2_obj_list[k] parsed_timepattern.append([k, object_classes, tp_vals]) # parameter data, split by relationship class stacked_rels = [] data = sorted(data, key=keyfunc) for k, v in groupby(data, key=keyfunc): values = list(v) rel, par_names = _unstack_list_of_tuples( values, ["relationship_class", "relationship", "parameter", "value"], [0, 1], 2, 3 ) if rel: parameters = par_names[2:] else: parameters = list(set(p[2] for p in values)) rel = [r[1].split(',') + list(r[2:]) for r in rel] object_classes = class_2_obj_list[k] stacked_rels.append([k, rel, object_classes, parameters]) return stacked_rels, parsed_json, parsed_ts, parsed_timepattern
[docs]def _get_unstacked_objects(db): """Gets all data for objects in a unstacked list of list Args: db (spinedb_api.DatabaseMapping): database mapping for database Returns: (list, list, list, list): stacked objects, parsed JSON, parsed time series and parsed time patterns """ data, data_json, data_ts, data_timepattern = _get_objects_and_parameters(db) keyfunc = lambda x: x[0] parsed_json = [] data_json = sorted(data_json, key=keyfunc) for k, v in groupby(data_json, key=keyfunc): json_vals = [] for row in v: obj = row[1] parameter = row[2] val = row[3] json_vals.append([[obj, parameter], val]) if json_vals: parsed_json.append([k, [k], json_vals]) parsed_ts = [] data_ts = sorted(data_ts, key=keyfunc) for k, v in groupby(data_ts, key=keyfunc): ts_vals = [] for row in v: obj = row[1] parameter = row[2] val = row[3] ts_vals.append([[obj, parameter], val]) if ts_vals: parsed_ts.append([k, [k], ts_vals]) data_timepattern = sorted(data_timepattern, key=keyfunc) parsed_timepattern = [] # ts data, split by object class for k, v in groupby(data_timepattern, key=keyfunc): tp_vals = [] for row in v: obj = row[1] parameter = row[2] val = row[3] tp_vals.append([[obj, parameter], val]) if tp_vals: parsed_timepattern.append([k, [k], tp_vals]) stacked_obj = [] data = sorted(data, key=keyfunc) for k, v in groupby(data, key=keyfunc): values = list(v) obj, par_names = _unstack_list_of_tuples(values, ["object_class", "object", "parameter", "value"], [0, 1], 2, 3) if obj: parameters = par_names[2:] else: parameters = list(set(p[2] for p in values if p[2] is not None)) obj = [[o[1]] + list(o[2:]) for o in obj] object_classes = [k] stacked_obj.append([k, obj, object_classes, parameters]) return stacked_obj, parsed_json, parsed_ts, parsed_timepattern
[docs]def _write_relationships_to_xlsx(wb, relationship_data): """Writes Classes, parameter and parameter values for relationships. Writes one sheet per relationship class. Args: wb (openpyxl.Workbook): excel workbook to write too. relationship_data (List[List]): List of lists containing relationship data give by function get_unstacked_relationships """ for rel in relationship_data: ws = wb.create_sheet() # try setting the sheet name to relationship class name # sheet name can only be 31 chars log title = "rel_" + rel[0] if len(title) < 32: ws.title = title ws['A1'] = "Sheet type" ws['A2'] = "relationship" ws['B1'] = "Data type" ws['B2'] = "Parameter" ws['C1'] = "relationship class name" ws['C2'] = rel[0] ws['D1'] = "Number of relationship dimensions" ws['D2'] = len(rel[2]) ws['E1'] = "Number of pivoted relationship dimensions" ws['E2'] = 0 for c, val in enumerate(rel[2]): ws.cell(row=4, column=c + 1).value = val for c, val in enumerate(rel[3]): ws.cell(row=4, column=len(rel[2]) + 1 + c).value = val start_row = 5 start_col = 1 for r, line in enumerate(rel[1]): for c, val in enumerate(line): if isinstance(val, (Duration, DateTime)): val = to_database(val) ws.cell(row=start_row + r, column=start_col + c).value = val
[docs]def _write_json_array_to_xlsx(wb, data, sheet_type): """Writes json array data for object classes and relationship classes. Writes one sheet per relationship/object class. Args: wb (openpyxl.Workbook): excel workbook to write too. data (List[List]): List of lists containing json data give by function get_unstacked_objects and get_unstacked_relationships sheet_type (str): str with value "relationship" or "object" telling if data is for a relationship or object """ for i, d in enumerate(data): if sheet_type == "relationship": sheet_title = "json_" elif sheet_type == "object": sheet_title = "json_" else: raise ValueError("sheet_type must be a str with value 'relationship' or 'object'") ws = wb.create_sheet() # sheet name can only be 31 chars log title = sheet_title + d[0] if len(title) < 32: ws.title = title else: ws.title = '{}_json{}'.format(sheet_type, i) ws['A1'] = "Sheet type" ws['A2'] = sheet_type ws['B1'] = "Data type" ws['B2'] = "1d array" ws['C1'] = sheet_type + " class name" ws['C2'] = d[0] if sheet_type == "relationship": ws['D1'] = "Number of relationship dimensions" ws['D2'] = len(d[1]) title_rows = d[1] + ["json parameter"] for c, val in enumerate(title_rows): ws.cell(row=4 + c, column=1).value = val start_row = 4 + len(title_rows) for col, obj_list in enumerate(d[2]): for obj_iter, obj in enumerate(obj_list[0]): ws.cell(row=4 + obj_iter, column=2 + col).value = obj for row_iter, json_val in enumerate(obj_list[1]): ws.cell(row=start_row + row_iter, column=2 + col).value = json_val
[docs]def _write_TimeSeries_to_xlsx(wb, data, sheet_type, data_type): """Writes spinedb_api TimeSeries data for object classes and relationship classes. Writes one sheet per relationship/object class. Args: wb (openpyxl.Workbook): excel workbook to write too. data (List[List]): List of lists containing json data give by function get_unstacked_objects and get_unstacked_relationships sheet_type (str): str with value "relationship" or "object" telling if data is for a relationship or object """ for i, d in enumerate(data): if sheet_type == "relationship": sheet_title = "ts_" elif sheet_type == "object": sheet_title = "ts_" else: raise ValueError("sheet_type must be a str with value 'relationship' or 'object'") if data_type.lower() == "time series": index_name = "timestamp" elif data_type.lower() == "time pattern": index_name = "pattern" else: raise ValueError("data_type must be a str with value 'time series' or 'time pattern'") ws = wb.create_sheet() # sheet name can only be 31 chars log title = sheet_title + d[0] if len(title) < 32: ws.title = title else: ws.title = '{}_ts{}'.format(sheet_type, i) ws['A1'] = "Sheet type" ws['A2'] = sheet_type ws['B1'] = "Data type" ws['B2'] = data_type ws['C1'] = sheet_type + " class name" ws['C2'] = d[0] if sheet_type == "relationship": ws['D1'] = "Number of relationship dimensions" ws['D2'] = len(d[1]) title_rows = d[1] + [index_name] for c, val in enumerate(title_rows): ws.cell(row=4 + c, column=1).value = val # find common timestamps unique_timestamps = np.unique(np.concatenate([v[1].indexes for v in d[2]])) # write object names start_row = 4 + len(title_rows) for col, obj_list in enumerate(d[2]): for obj_iter, obj in enumerate(obj_list[0]): ws.cell(row=4 + obj_iter, column=2 + col).value = obj # write timestamps if data_type.lower() == "time series": for row_iter, time in enumerate(unique_timestamps): ws.cell(row=start_row + row_iter, column=1).value = str(np.datetime_as_string(time)) else: for row_iter, time in enumerate(unique_timestamps): ws.cell(row=start_row + row_iter, column=1).value = str(time) # write values for col, obj_list in enumerate(d[2]): for row_index, value in zip( np.where(np.isin(unique_timestamps, obj_list[1].indexes))[0], obj_list[1].values ): ws.cell(row=start_row + row_index, column=2 + col).value = value
[docs]def _write_objects_to_xlsx(wb, object_data): """Writes Classes, parameter and parameter values for objects. Writes one sheet per relationship/object class. Args: wb (openpyxl.Workbook): excel workbook to write too. object_data (List[List]): List of lists containing relationship data give by function get_unstacked_objects """ for i, obj in enumerate(object_data): ws = wb.create_sheet() # try setting the sheet name to object class name # sheet name can only be 31 chars log title = "obj_" + obj[0] if len(title) < 32: ws.title = title else: ws.title = "object_class{}".format(i) ws['A1'] = "Sheet type" ws['A2'] = "object" ws['B1'] = "Data type" ws['B2'] = "Parameter" ws['C1'] = "object class name" ws['C2'] = obj[0] for c, val in enumerate(obj[2]): ws.cell(row=4, column=c + 1).value = val for c, val in enumerate(obj[3]): ws.cell(row=4, column=len(obj[2]) + 1 + c).value = val start_row = 5 start_col = 1 for r, line in enumerate(obj[1]): for c, val in enumerate(line): if isinstance(val, (Duration, DateTime)): val = to_database(val) ws.cell(row=start_row + r, column=start_col + c).value = val
[docs]def export_spine_database_to_xlsx(db, filepath): """Writes all data in a spine database into an excel file. Args: db (spinedb_api.DatabaseMapping): database mapping for database. filepath (str): str with filepath to save excel file to. """ obj_data, obj_json_data, obj_ts, obj_timepattern = _get_unstacked_objects(db) rel_data, rel_json_data, rel_ts, rel_timepattern = _get_unstacked_relationships(db) wb = Workbook() _write_relationships_to_xlsx(wb, rel_data) _write_objects_to_xlsx(wb, obj_data) _write_json_array_to_xlsx(wb, obj_json_data, "object") _write_json_array_to_xlsx(wb, rel_json_data, "relationship") _write_TimeSeries_to_xlsx(wb, obj_ts, "object", "time series") _write_TimeSeries_to_xlsx(wb, rel_ts, "relationship", "time series") _write_TimeSeries_to_xlsx(wb, obj_timepattern, "object", "time pattern") _write_TimeSeries_to_xlsx(wb, rel_timepattern, "relationship", "time pattern") wb.save(filepath) wb.close()