######################################################################################################################
# Copyright (C) 2017 - 2019 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Functions to import and export from excel to spine database.
:author: P. Vennström (VTT)
:date: 21.8.2018
"""
# TODO: PEP8: Do not use bare except. Too broad exception clause
from collections import namedtuple
from itertools import groupby, islice, takewhile
import json
from operator import itemgetter
from openpyxl import Workbook, load_workbook
from openpyxl.utils import get_column_letter
import numpy as np
from spinedb_api import import_data, from_database, TimeSeries, TimeSeriesVariableResolution, TimePattern
[docs]SheetData = namedtuple(
"SheetData",
["sheet_name", "class_name", "object_classes", "parameters", "parameter_values", "objects", "class_type"],
)
[docs]def import_xlsx_to_db(db, filepath):
"""reads excel file in 'filepath' and insert into database in mapping 'db'.
Returns two list, one with succesful writes to database, one with errors
when trying to write to database.
Args:
db (spinedb_api.DatabaseMapping): database mapping for database to write to
filepath (str): str with filepath to excel file to read from
Returns:
(Int, List) Returns number of inserted items and a list of
error information on all failed writes
"""
obj_data, rel_data, error_log = read_spine_xlsx(filepath)
object_classes = []
objects = []
object_parameters = []
object_values = []
for sheet in obj_data:
object_classes.append(sheet.class_name)
objects.extend([(sheet.class_name, o) for o in sheet.objects])
object_parameters.extend([(sheet.class_name, o) for o in sheet.parameters])
d_getter = itemgetter(*[1, 2, 0, 3])
object_values.extend([(sheet.class_name,) + d_getter(d) for d in sheet.parameter_values])
rel_classes = []
rels = []
rel_parameters = []
rel_values = []
for sheet in rel_data:
num_oc = len(sheet.object_classes)
rel_getter = itemgetter(*range(1, num_oc + 1))
d_getter = itemgetter(*[num_oc + 1, 0, num_oc + 2])
rel_classes.append((sheet.class_name, sheet.object_classes))
rels.extend([(sheet.class_name, o) for o in sheet.objects])
rel_parameters.extend([(sheet.class_name, o) for o in sheet.parameters])
rel_values.extend([(sheet.class_name, rel_getter(d)) + d_getter(d) for d in sheet.parameter_values])
object_values = [o[:-2] + (o[-1],) for o in object_values]
rel_values = [rel[:-2] + (rel[-1],) for rel in rel_values]
num_imported, errors = import_data(
db, object_classes, rel_classes, object_parameters, rel_parameters, objects, rels, object_values, rel_values
)
error_log.extend(errors)
return num_imported, error_log
[docs]def get_objects_and_parameters(db):
"""Exports all object data from spine database into unstacked list of lists
Args:
db (spinedb_api.DatabaseMapping): database mapping for database
Returns:
(List, List) First list contains parameter data, second one json data
"""
# get all objects
obj = db.object_list().all()
# get all object classes
obj_class = db.object_class_list().all()
obj_class_id_2_name = {oc.id: oc.name for oc in obj_class}
# get all parameter values
pval = db.object_parameter_value_list().all()
# get all parameter definitions
par = db.object_parameter_definition_list().all()
# make all in same format
par = [(p.object_class_name, None, p.parameter_name, None) for p in par]
pval = [(p.object_class_name, p.object_name, p.parameter_name, from_database(p.value)) for p in pval]
obj = [(obj_class_id_2_name[p.class_id], p.name, None, None) for p in obj]
obj_class = [(p.name, None, None, None) for p in obj_class]
object_and_par = pval + par + obj + obj_class
object_par = []
object_json = []
object_ts = []
object_timepattern = []
for d in object_and_par:
if d[3] is None or isinstance(d[3], (int, float, str)):
object_par.append(d)
elif isinstance(d[3], list):
object_json.append(d)
object_par.append(d[:-1] + (None,))
elif isinstance(d[3], TimeSeries):
object_ts.append(d)
object_par.append(d[:-1] + (None,))
elif isinstance(d[3], TimePattern):
object_timepattern.append(d)
object_par.append(d[:-1] + (None,))
else:
raise Warning(f"Unsuported export type: {type(d[3])}, Skipping export")
return object_par, object_json, object_ts, object_timepattern
[docs]def get_relationships_and_parameters(db):
"""Exports all relationship data from spine database into unstacked list of lists
Args:
db (spinedb_api.DatabaseMapping): database mapping for database
Returns:
(List, List) First list contains parameter data, second one json data
"""
rel_class = db.wide_relationship_class_list().all()
rel = db.wide_relationship_list().all()
rel_par = db.relationship_parameter_definition_list().all()
rel_par_value = db.relationship_parameter_value_list().all()
rel_class_id_2_name = {rc.id: rc.name for rc in rel_class}
out_data = [
[r.relationship_class_name, r.object_name_list, r.parameter_name, from_database(r.value)] for r in rel_par_value
]
rel_with_par = set(r.object_name_list for r in rel_par_value)
rel_without_par = [
[rel_class_id_2_name[r.class_id], r.object_name_list, None, None]
for r in rel
if r.object_name_list not in rel_with_par
]
rel_class_par = [[r.relationship_class_name, None, r.parameter_name, None] for r in rel_par]
rel_class_with_par = [r.relationship_class_name for r in rel_par]
rel_class_without_par = [[r.name, None, None, None] for r in rel_class if r.name not in rel_class_with_par]
rel_data = out_data + rel_without_par + rel_class_par + rel_class_without_par
rel_par = []
rel_json = []
rel_ts = []
rel_timepattern = []
for d in rel_data:
if d[3] is None or isinstance(d[3], (int, float, str)):
rel_par.append(d)
elif isinstance(d[3], list):
rel_json.append(d)
rel_par.append(d[:-1] + [None])
elif isinstance(d[3], TimeSeries):
rel_ts.append(d)
rel_par.append(d[:-1] + [None])
elif isinstance(d[3], TimePattern):
rel_timepattern.append(d)
rel_par.append(d[:-1] + [None])
else:
raise Warning(f"Unsuported export type: {type(d[3])}, Skipping export")
return rel_par, rel_json, rel_class, rel_ts, rel_timepattern
[docs]def unstack_list_of_tuples(data, headers, key_cols, value_name_col, value_col):
"""Unstacks list of lists or list of tuples and creates a list of namedtuples
whit unstacked data (pivoted data)
Args:
data (List[List]): List of lists with data to unstack
headers (List[str]): List of header names for data
key_cols (List[Int]): List of index for column that are keys, columns to not unstack
value_name_col (Int): index to column containing name of data to unstack
value_col (Int): index to column containing value to value_name_col
Returns:
(List[List]): List of list with headers in headers list
(List): List of header names for each item in inner list
"""
# find header names
if isinstance(value_name_col, list) and len(value_name_col) > 1:
value_name_getter = itemgetter(*value_name_col)
value_names = sorted(
set(value_name_getter(x) for x in data if not any(i is None for i in value_name_getter(x)))
)
else:
if isinstance(value_name_col, list):
value_name_col = value_name_col[0]
value_name_getter = itemgetter(value_name_col)
value_names = sorted(set(x[value_name_col] for x in data if x[value_name_col] is not None))
key_names = [headers[n] for n in key_cols]
# value_names = sorted(set(x[value_name_col] for x in data if x[value_name_col] is not None))
headers = key_names + value_names
# remove data with invalid key cols
keyfunc = lambda x: [x[k] for k in key_cols]
data = [x for x in data if None not in keyfunc(x)]
data = sorted(data, key=keyfunc)
# unstack data
data_list_out = []
for k, k_data in groupby(data, key=keyfunc):
if None in k:
continue
line_data = [None] * len(value_names)
for d in k_data:
if value_name_getter(d) in value_names and d[value_col] is not None:
line_data[value_names.index(value_name_getter(d))] = d[value_col]
data_list_out.append(k + line_data)
return data_list_out, headers
[docs]def stack_list_of_tuples(data, headers, key_cols, value_cols):
"""Stacks list of lists or list of tuples and creates a list of namedtuples
with stacked data (unpivoted data)
Args:
data (List[List]): List of lists with data to unstack
headers (List[str]): List of header names for data
key_cols (List[Int]): List of index for columns that are keys
value_cols (List[Int]): List of index for columns containing values to stack
Returns:
(List[namedtuple]): List of namedtuples whit fields given by headers
and 'parameter' and 'value' which contains stacked values
"""
value_names = [headers[n] for n in value_cols]
key_names = [headers[n] for n in key_cols]
new_tuple_names = key_names + ["parameter", "value"]
NewDataTuple = namedtuple("Data", new_tuple_names)
# takes unstacked data and duplicates columns in key_cols and then zips
# them with values in value_cols
new_data_list = [
list(
map(
NewDataTuple._make,
[
a + [b] + [c]
for a, b, c in zip(
[[dl[k] for k in key_cols]] * len(value_cols), value_names, [dl[vk] for vk in value_cols]
)
],
)
)
for dl in data
]
new_data_list = [item for sublist in new_data_list for item in sublist]
return new_data_list
[docs]def unpack_json_parameters(data, json_index):
out_data = []
for data_row in data:
json_data = json.loads(data_row[json_index].replace("\n", ""))
if json_index == 0:
key_cols = [list(data_row[json_index + 1 :])] * len(json_data)
else:
key_cols = [list(data_row[:json_index]) + list(data_row[json_index + 1 :])] * len(json_data)
out_data += [a + [b] + [c] for a, b, c in zip(key_cols, range(0, len(json_data), 1), json_data)]
return out_data
[docs]def pack_json_parameters(data, key_cols, value_col, index_col=None):
out_data = []
# group by keys cols
keyfunc = lambda x: [x[k] for k in key_cols]
data = sorted(data, key=keyfunc)
for key, grouped in groupby(data, key=keyfunc):
# sort if index is given.
if index_col is not None:
grouped = sorted(grouped, key=lambda x: x[index_col])
# pack values into json
values = [g[value_col] for g in grouped]
json_val = json.dumps(values)
out_data.append(key + [json_val])
return out_data
[docs]def get_unstacked_relationships(db):
"""Gets all data for relationships in a unstacked list of list
Args:
db (spinedb_api.DatabaseMapping): database mapping for database
Returns:
(List, List): Two list of data for relationship, one with parameter values
and the second one with json values
"""
data, data_json, rel_class, data_ts, data_timepattern = get_relationships_and_parameters(db)
class_2_obj_list = {rc.name: rc.object_class_name_list.split(',') for rc in rel_class}
keyfunc = lambda x: x[0]
data_json = sorted(data_json, key=keyfunc)
parsed_json = []
# json data, split by relationship class
for k, v in groupby(data_json, key=keyfunc):
json_vals = []
for row in v:
rel_list = row[1].split(',')
parameter = row[2]
val = row[3]
json_vals.append([rel_list + [parameter], val])
if json_vals:
object_classes = class_2_obj_list[k]
parsed_json.append([k, object_classes, json_vals])
data_ts = sorted(data_ts, key=keyfunc)
parsed_ts = []
# ts data, split by relationship class
for k, v in groupby(data_ts, key=keyfunc):
ts_vals = []
for row in v:
rel_list = row[1].split(',')
parameter = row[2]
val = row[3]
ts_vals.append([rel_list + [parameter], val])
if ts_vals:
object_classes = class_2_obj_list[k]
parsed_ts.append([k, object_classes, ts_vals])
data_timepattern = sorted(data_timepattern, key=keyfunc)
parsed_timepattern = []
# ts data, split by relationship class
for k, v in groupby(data_timepattern, key=keyfunc):
tp_vals = []
for row in v:
rel_list = row[1].split(',')
parameter = row[2]
val = row[3]
tp_vals.append([rel_list + [parameter], val])
if tp_vals:
object_classes = class_2_obj_list[k]
parsed_timepattern.append([k, object_classes, tp_vals])
# parameter data, split by relationship class
stacked_rels = []
data = sorted(data, key=keyfunc)
for k, v in groupby(data, key=keyfunc):
values = list(v)
rel, par_names = unstack_list_of_tuples(
values, ["relationship_class", "relationship", "parameter", "value"], [0, 1], 2, 3
)
if rel:
parameters = par_names[2:]
else:
parameters = list(set([p[2] for p in values]))
rel = [r[1].split(',') + list(r[2:]) for r in rel]
object_classes = class_2_obj_list[k]
stacked_rels.append([k, rel, object_classes, parameters])
return stacked_rels, parsed_json, parsed_ts, parsed_timepattern
[docs]def get_unstacked_objects(db):
"""Gets all data for objects in a unstacked list of list
Args:
db (spinedb_api.DatabaseMapping): database mapping for database
Returns:
(List, List): Two list of data for objects, one with parameter values
and the second one with json values
"""
data, data_json, data_ts, data_timepattern = get_objects_and_parameters(db)
keyfunc = lambda x: x[0]
parsed_json = []
data_json = sorted(data_json, key=keyfunc)
for k, v in groupby(data_json, key=keyfunc):
json_vals = []
for row in v:
obj = row[1]
parameter = row[2]
val = row[3]
json_vals.append([[obj, parameter], val])
if json_vals:
parsed_json.append([k, [k], json_vals])
parsed_ts = []
data_ts = sorted(data_ts, key=keyfunc)
for k, v in groupby(data_ts, key=keyfunc):
ts_vals = []
for row in v:
obj = row[1]
parameter = row[2]
val = row[3]
ts_vals.append([[obj, parameter], val])
if ts_vals:
parsed_ts.append([k, [k], ts_vals])
data_timepattern = sorted(data_timepattern, key=keyfunc)
parsed_timepattern = []
# ts data, split by object class
for k, v in groupby(data_timepattern, key=keyfunc):
tp_vals = []
for row in v:
obj = row[1]
parameter = row[2]
val = row[3]
tp_vals.append([[obj, parameter], val])
if tp_vals:
parsed_timepattern.append([k, [k], tp_vals])
stacked_obj = []
data = sorted(data, key=keyfunc)
for k, v in groupby(data, key=keyfunc):
values = list(v)
obj, par_names = unstack_list_of_tuples(values, ["object_class", "object", "parameter", "value"], [0, 1], 2, 3)
if obj:
parameters = par_names[2:]
else:
parameters = list(set([p[2] for p in values if p[2] is not None]))
obj = [[o[1]] + list(o[2:]) for o in obj]
object_classes = [k]
stacked_obj.append([k, obj, object_classes, parameters])
return stacked_obj, parsed_json, parsed_ts, parsed_timepattern
[docs]def write_relationships_to_xlsx(wb, relationship_data):
"""Writes Classes, parameter and parameter values for relationships.
Writes one sheet per relationship class.
Args:
wb (openpyxl.Workbook): excel workbook to write too.
relationship_data (List[List]): List of lists containing relationship
data give by function get_unstacked_relationships
"""
for rel in relationship_data:
ws = wb.create_sheet()
# try setting the sheetname to relationship class name
# sheet name can only be 31 chars log
title = "rel_" + rel[0]
if len(title) < 32:
ws.title = title
ws['A1'] = "Sheet type"
ws['A2'] = "relationship"
ws['B1'] = "Data type"
ws['B2'] = "Parameter"
ws['C1'] = "relationship class name"
ws['C2'] = rel[0]
ws['D1'] = "Number of relationship dimensions"
ws['D2'] = len(rel[2])
ws['E1'] = "Number of pivoted relationship dimensions"
ws['E2'] = 0
for c, val in enumerate(rel[2]):
ws.cell(row=4, column=c + 1).value = val
for c, val in enumerate(rel[3]):
ws.cell(row=4, column=len(rel[2]) + 1 + c).value = val
start_row = 5
start_col = 1
for r, line in enumerate(rel[1]):
for c, val in enumerate(line):
ws.cell(row=start_row + r, column=start_col + c).value = val
[docs]def write_json_array_to_xlsx(wb, data, sheet_type):
"""Writes json array data for object classes and relationship classes.
Writes one sheet per relationship/object class.
Args:
wb (openpyxl.Workbook): excel workbook to write too.
data (List[List]): List of lists containing json data give by function
get_unstacked_objects and get_unstacked_relationships
sheet_type (str): str with value "relationship" or "object" telling if data is for a relationship or object
"""
for i, d in enumerate(data):
if sheet_type == "relationship":
sheet_title = "json_"
elif sheet_type == "object":
sheet_title = "json_"
else:
raise ValueError("sheet_type must be a str with value 'relationship' or 'object'")
ws = wb.create_sheet()
# sheet name can only be 31 chars log
title = sheet_title + d[0]
if len(title) < 32:
ws.title = title
else:
ws.title = '{}_json{}'.format(sheet_type, i)
ws['A1'] = "Sheet type"
ws['A2'] = sheet_type
ws['B1'] = "Data type"
ws['B2'] = "json array"
ws['C1'] = sheet_type + " class name"
ws['C2'] = d[0]
if sheet_type == "relationship":
ws['D1'] = "Number of relationship dimensions"
ws['D2'] = len(d[1])
title_rows = d[1] + ["json parameter"]
for c, val in enumerate(title_rows):
ws.cell(row=4 + c, column=1).value = val
start_row = 4 + len(title_rows)
for col, obj_list in enumerate(d[2]):
for obj_iter, obj in enumerate(obj_list[0]):
ws.cell(row=4 + obj_iter, column=2 + col).value = obj
for row_iter, json_val in enumerate(obj_list[1]):
ws.cell(row=start_row + row_iter, column=2 + col).value = json_val
[docs]def write_TimeSeries_to_xlsx(wb, data, sheet_type, data_type):
"""Writes spinedb_api TimeSeries data for object classes and relationship classes.
Writes one sheet per relationship/object class.
Args:
wb (openpyxl.Workbook): excel workbook to write too.
data (List[List]): List of lists containing json data give by function
get_unstacked_objects and get_unstacked_relationships
sheet_type (str): str with value "relationship" or "object" telling if data is for a relationship or object
"""
for i, d in enumerate(data):
if sheet_type == "relationship":
sheet_title = "ts_"
elif sheet_type == "object":
sheet_title = "ts_"
else:
raise ValueError("sheet_type must be a str with value 'relationship' or 'object'")
if data_type.lower() == "time series":
index_name = "timestamp"
elif data_type.lower() == "time pattern":
index_name = "pattern"
else:
raise ValueError("data_type must be a str with value 'time series' or 'time pattern'")
ws = wb.create_sheet()
# sheet name can only be 31 chars log
title = sheet_title + d[0]
if len(title) < 32:
ws.title = title
else:
ws.title = '{}_ts{}'.format(sheet_type, i)
ws['A1'] = "Sheet type"
ws['A2'] = sheet_type
ws['B1'] = "Data type"
ws['B2'] = data_type
ws['C1'] = sheet_type + " class name"
ws['C2'] = d[0]
if sheet_type == "relationship":
ws['D1'] = "Number of relationship dimensions"
ws['D2'] = len(d[1])
title_rows = d[1] + [index_name]
for c, val in enumerate(title_rows):
ws.cell(row=4 + c, column=1).value = val
# find common timestamps
unique_timestamps = np.unique(np.concatenate([v[1].indexes for v in d[2]]))
# write object names
start_row = 4 + len(title_rows)
for col, obj_list in enumerate(d[2]):
for obj_iter, obj in enumerate(obj_list[0]):
ws.cell(row=4 + obj_iter, column=2 + col).value = obj
# write timestamps
if data_type.lower() == "time series":
for row_iter, time in enumerate(unique_timestamps):
ws.cell(row=start_row + row_iter, column=1).value = str(np.datetime_as_string(time))
else:
for row_iter, time in enumerate(unique_timestamps):
ws.cell(row=start_row + row_iter, column=1).value = str(time)
# write values
for col, obj_list in enumerate(d[2]):
for row_index, value in zip(
np.where(np.isin(unique_timestamps, obj_list[1].indexes))[0], obj_list[1].values
):
ws.cell(row=start_row + row_index, column=2 + col).value = value
[docs]def write_objects_to_xlsx(wb, object_data):
"""Writes Classes, parameter and parameter values for objects.
Writes one sheet per relationship/object class.
Args:
wb (openpyxl.Workbook): excel workbook to write too.
object_data (List[List]): List of lists containing relationship data give by function get_unstacked_objects
"""
for i, obj in enumerate(object_data):
ws = wb.create_sheet()
# try setting the sheetname to object class name
# sheet name can only be 31 chars log
title = "obj_" + obj[0]
if len(title) < 32:
ws.title = title
else:
ws.title = "object_class{}".format(i)
ws['A1'] = "Sheet type"
ws['A2'] = "object"
ws['B1'] = "Data type"
ws['B2'] = "Parameter"
ws['C1'] = "object class name"
ws['C2'] = obj[0]
for c, val in enumerate(obj[2]):
ws.cell(row=4, column=c + 1).value = val
for c, val in enumerate(obj[3]):
ws.cell(row=4, column=len(obj[2]) + 1 + c).value = val
start_row = 5
start_col = 1
for r, line in enumerate(obj[1]):
for c, val in enumerate(line):
ws.cell(row=start_row + r, column=start_col + c).value = val
[docs]def export_spine_database_to_xlsx(db, filepath):
"""Writes all data in a spine database into an excel file.
Args:
db (spinedb_api.DatabaseMapping): database mapping for database.
filepath (str): str with filepath to save excel file to.
"""
obj_data, obj_json_data, obj_ts, obj_timepattern = get_unstacked_objects(db)
rel_data, rel_json_data, rel_ts, rel_timepattern = get_unstacked_relationships(db)
wb = Workbook()
write_relationships_to_xlsx(wb, rel_data)
write_objects_to_xlsx(wb, obj_data)
write_json_array_to_xlsx(wb, obj_json_data, "object")
write_json_array_to_xlsx(wb, rel_json_data, "relationship")
write_TimeSeries_to_xlsx(wb, obj_ts, "object", "time series")
write_TimeSeries_to_xlsx(wb, rel_ts, "relationship", "time series")
write_TimeSeries_to_xlsx(wb, obj_timepattern, "object", "time pattern")
write_TimeSeries_to_xlsx(wb, rel_timepattern, "relationship", "time pattern")
wb.save(filepath)
wb.close()
[docs]def read_spine_xlsx(filepath):
"""reads all data from a excel file where the sheets are in valid spine data format
Args:
filepath (str): str with filepath to excel file to read from.
"""
#
# read_only=true doesn't seem to close the file properly, possible solution if
# speed is needed is to do following:
# with open(xlsx_filename, "rb") as f:
# in_mem_file = io.BytesIO(f.read())
# wb = load_workbook(in_mem_file, read_only=True)
wb = load_workbook(filepath, read_only=False)
sheets = wb.sheetnames
ErrorLogMsg = namedtuple('ErrorLogMsg', ('msg', 'db_type', 'imported_from', 'other'))
obj_data = []
rel_data = []
obj_json_data = []
rel_json_data = []
error_log = []
# read all sheets
for s in sheets:
ws = wb[s]
# check if valid
if not validate_sheet(ws):
continue
sheet_type = ws['A2'].value.lower()
sheet_data = ws['B2'].value.lower()
if sheet_data == "parameter":
# read sheet with data type: 'parameter'
try:
data = read_parameter_sheet(ws)
if sheet_type == "relationship":
rel_data.append(data)
else:
obj_data.append(data)
except Exception as e:
error_log.append(ErrorLogMsg("Error reading sheet {}: {}".format(ws.title, e), "sheet", filepath, ''))
elif sheet_data == "json array":
# read sheet with data type: 'json array'
try:
data = read_json_sheet(ws, sheet_type)
if sheet_type == "relationship":
rel_json_data.append(data)
else:
obj_json_data.append(data)
except Exception as e:
error_log.append(ErrorLogMsg("Error reading sheet {}: {}".format(ws.title, e), "sheet", filepath, ''))
elif sheet_data in ("time series", "time pattern"):
# read sheet with data type: 'time series'
try:
data = read_TimeSeries_sheet(ws, sheet_type)
if sheet_type == "relationship":
rel_json_data.append(data)
else:
obj_json_data.append(data)
except Exception as e:
error_log.append(ErrorLogMsg("Error reading sheet {}: {}".format(ws.title, e), "sheet", filepath, ''))
wb.close()
# merge sheets that have the same class.
obj_data, el = merge_spine_xlsx_data(obj_data + obj_json_data)
error_log = error_log + el
rel_data, el = merge_spine_xlsx_data(rel_data + rel_json_data)
error_log = error_log + el
return obj_data, rel_data, error_log
[docs]def merge_spine_xlsx_data(data):
"""Merge data from different sheets with same object class or
relationship class.
Args:
data (List(SheetData)): list of SheetData
Returns:
(List[SheetData]): List of SheetData with only one relationship/object class per item
"""
error_log = []
new_data = []
data = sorted(data, key=lambda x: x.class_name)
for class_name, values in groupby(data, key=lambda x: x.class_name):
values = list(values)
if len(values) == 1:
# only one sheet
new_data.append(values[0])
continue
else:
# if more than one SheetData with same class_name
sheet_name = values[0].sheet_name
object_classes = values[0].object_classes
parameters = values[0].parameters
parameter_values = values[0].parameter_values
objects = values[0].objects
class_type = values[0].class_type
# skip first sheet
iter_values = iter(values)
next(iter_values)
for v in iter_values:
# make sure that the new sheet has same object_classes that first
if v.object_classes != object_classes:
error_log.append(
[
"sheet",
v.sheet_name,
"sheet {} as different "
"object_classes than sheet {} for class {}".format(v.sheet_name, sheet_name, class_name),
]
)
continue
parameters = parameters + v.parameters
objects = objects + v.objects
parameter_values = parameter_values + v.parameter_values
# make unique again
parameters = list(set(parameters))
if len(object_classes) > 1:
keyfunc = lambda x: [x[i] for i, _ in enumerate(object_classes)]
objects = sorted(objects, key=keyfunc)
objects = list(k for k, _ in groupby(objects, key=keyfunc))
else:
objects = list(set(objects))
new_data.append(
SheetData(
sheet_name=sheet_name,
class_name=class_name,
object_classes=object_classes,
parameters=parameters,
parameter_values=parameter_values,
objects=objects,
class_type=class_type,
)
)
return new_data, error_log
[docs]def validate_sheet(ws):
"""Checks if supplied sheet is a valid import sheet for spine.
Args:
ws (openpyxl.workbook.worksheet): worksheet to validate
Returns:
(bool): True if sheet is valid, False otherwise
"""
sheet_type = ws['A2'].value
sheet_data = ws['B2'].value
if not isinstance(sheet_type, str):
return False
if not isinstance(sheet_data, str):
return False
if sheet_type.lower() not in ["relationship", "object"]:
return False
if sheet_data.lower() not in ["parameter", "json array", "time series", "time pattern"]:
return False
if sheet_type.lower() == "relationship":
rel_dimension = ws['D2'].value
rel_name = ws['C2'].value
if not isinstance(rel_name, str):
return False
if not rel_name:
return False
if not isinstance(rel_dimension, int):
return False
if not rel_dimension > 1:
return False
if sheet_data.lower() == 'parameter':
rel_row = read_2d(ws, 4, 4, 1, rel_dimension)[0]
else:
rel_row = read_2d(ws, 4, 4 + rel_dimension - 1, 1, 1)
rel_row = [r[0] for r in rel_row]
if None in rel_row:
return False
if not all(isinstance(r, str) for r in rel_row):
return False
if not all(r for r in rel_row):
return False
elif sheet_type.lower() == "object":
obj_name = ws['C2'].value
if not isinstance(obj_name, str):
return False
if not obj_name:
return False
else:
return False
return True
[docs]def read_json_sheet(ws, sheet_type):
"""Reads a sheet containg json array data for objects and relationships
Args:
ws (openpyxl.workbook.worksheet): worksheet to read from
sheet_type (str): str with value "relationship" or "object" telling if sheet is a relationship or object sheet
Returns:
(List[SheetData])
"""
if sheet_type == "relationship":
dim = ws['D2'].value
else:
dim = 1
path = ["object" + str(i) for i in range(dim)]
class_name = ws['C2'].value
object_classes = []
for i in range(4, 4 + dim, 1):
object_classes.append(ws["A" + str(i)].value)
# search row for until first empty cell
add_if_not_break = 1
c = 0
for c, cell in enumerate(ws[4]):
if c > 0:
if cell.value is None:
add_if_not_break = 0
break
read_cols = range(1, c + add_if_not_break)
json_data = []
# red columnwise from second column.
rows = ws.iter_rows()
obj_path = []
parameters = []
for r, row in enumerate(rows):
if 2 < r < 3 + dim:
# get object path
obj_path.append([cell.value for i, cell in enumerate(row) if i in read_cols])
elif r == 3 + dim:
# get parameter name
parameters = [cell.value for i, cell in enumerate(row) if i in read_cols]
break
data = [[cell.value for i, cell in enumerate(row) if i in read_cols] for row in rows]
# pivot data
obj_path = [[obj_path[r][c] for r in range(len(obj_path))] for c in range(len(obj_path[0]))]
data = [[data[r][c] for r in range(len(data))] for c in range(len(data[0]))]
Data = namedtuple("Data", ["parameter_type"] + path + ["parameter", "value"])
if data:
for objects, parameter, data_list in zip(obj_path, parameters, data):
# save values if there is json data, a parameter name
# and the obj_path doesn't contain None.
packed_json = list(takewhile(lambda x: x is not None, data_list))
json_data.append(Data._make(["json"] + objects + [parameter, packed_json]))
return SheetData(
sheet_name=ws.title,
class_name=class_name,
object_classes=object_classes,
parameters=list(set(parameters)),
parameter_values=json_data,
objects=[],
class_type=sheet_type,
)
[docs]def read_TimeSeries_sheet(ws, sheet_type):
"""Reads a sheet containg json array data for objects and relationships
Args:
ws (openpyxl.workbook.worksheet): worksheet to read from
sheet_type (str): str with value "relationship" or "object" telling if sheet is a relationship or object sheet
Returns:
(List[SheetData])
"""
if sheet_type == "relationship":
dim = ws['D2'].value
else:
dim = 1
sheet_data = ws['B2'].value
path = ["object" + str(i) for i in range(dim)]
class_name = ws['C2'].value
object_classes = []
for i in range(4, 4 + dim, 1):
object_classes.append(ws["A" + str(i)].value)
# search row for until first empty cell
add_if_not_break = 1
c = 0
for c, cell in enumerate(ws[4]):
if c > 0:
if cell.value is None:
add_if_not_break = 0
break
read_cols = range(1, c + add_if_not_break)
read_to_col = c + add_if_not_break
json_data = []
# red columnwise from second column.
rows = ws.iter_rows()
obj_path = []
parameters = []
for r, row in enumerate(rows):
if 2 < r < 3 + dim:
# get object path
obj_path.append([cell.value for i, cell in enumerate(row) if i in read_cols])
elif r == 3 + dim:
# get parameter name
parameters = [cell.value for i, cell in enumerate(row) if i in read_cols]
break
# pivot object paths
obj_path = [[obj_path[r][c] for r in range(len(obj_path))] for c in range(len(obj_path[0]))]
# read data
objects_parameters = [(tuple(o), p) for o, p in zip(obj_path, parameters)]
values = {(o, p): {'indexes': [], 'values': []} for o, p in objects_parameters}
for row in rows:
timestamp = row[0].value
if timestamp is None:
break
for cell, obj_par in zip(islice(row, 1, read_to_col), objects_parameters):
cell_value = cell.value
if cell_value is None:
continue
values[obj_par]['indexes'].append(timestamp)
values[obj_par]['values'].append(cell_value)
Data = namedtuple("Data", ["parameter_type"] + path + ["parameter", "value"])
for (objects, parameter), data in values.items():
# save values if there is json data, a parameter name
# and the obj_path doesn't contain None.
if sheet_data.lower() == "time series":
timeseries = TimeSeriesVariableResolution(data['indexes'], data['values'], repeat=False, ignore_year=False)
elif sheet_data.lower() == "time pattern":
timeseries = TimePattern(data['indexes'], data['values'])
json_data.append(Data._make(["json"] + list(objects) + [parameter, timeseries]))
return SheetData(
sheet_name=ws.title,
class_name=class_name,
object_classes=object_classes,
parameters=list(set(parameters)),
parameter_values=json_data,
objects=[],
class_type=sheet_type,
)
[docs]def read_parameter_sheet(ws):
"""Reads a sheet containg parameter data for objects and relationships
Args:
ws (openpyxl.workbook.worksheet): worksheet to read from
Returns:
(List[SheetData])
"""
sheet_type = ws['A2'].value.lower()
class_name = ws['C2'].value
if sheet_type == "object":
dim = 1
elif sheet_type == "relationship":
dim = ws['D2'].value
else:
raise ValueError("sheet_type must be a str with value 'relationship' or 'object'")
# object classes
object_classes = read_2d(ws, 4, 4, 1, dim)[0]
# read all columns to the right of the number of cells in dim. Read until
# encounters a empty cell.
parameters = []
read_cols = []
c = 0
for c, cell in enumerate(ws[4]):
if cell.value is None:
break
elif c >= dim:
parameters.append(cell.value)
read_cols = range(0, c + 1)
# get data
rows = islice(ws.iter_rows(), 4, None)
try:
data = [[cell.value for i, cell in enumerate(row) if i in read_cols] for row in rows]
except StopIteration:
data = []
keyfunc = lambda x: [x[i] for i, _ in enumerate(object_classes)]
# remove data where not all dimensions exists
data = [d for d in data if not None in keyfunc(d)]
data_parameter = []
if parameters:
# add that parameter type type should be "value"
data = [["value"] + d for d in data]
keyfunc = lambda x: [x[i + 1] for i, _ in enumerate(object_classes)]
key_cols = list(range(0, dim + 1, 1))
val_cols = list(range(dim + 1, dim + len(parameters) + 1))
headers = ["parameter_type"] + ["object" + str(x) for x in range(dim)] + parameters
data_parameter = stack_list_of_tuples(data, headers, key_cols, val_cols)
data_parameter = [d for d in data_parameter if d.value is not None]
# find unique relationships from data
data = sorted(data, key=keyfunc)
objects = list(k for k, _ in groupby(data, key=keyfunc))
if dim == 1:
# flatten list if only one object per row
objects = [item for sublist in objects for item in sublist]
return SheetData(
sheet_name=ws.title,
class_name=class_name,
object_classes=object_classes,
parameters=parameters,
parameter_values=data_parameter,
objects=objects,
class_type=sheet_type,
)
[docs]def read_2d(ws, start_row=1, end_row=1, start_col=1, end_col=1):
"""Reads a 2d area from worksheet into a list of lists where each line is
the inner list.
Args:
ws (openpyxl.workbook.worksheet): Worksheet to look in
start_row (Integer): start row to read, 1-indexed (as excel)
end_row (Integer): row to read to, 1-indexed (as excel)
start_col (Integer): start column to read, 1-indexed (as excel)
end_col (Integer): end column to read to, 1-indexed (as excel)
Returns:
(List) List of all lines read.
"""
end_col = get_column_letter(end_col)
start_col = get_column_letter(start_col)
xl_index = '{}{}:{}{}'.format(start_col, start_row, end_col, end_row)
values = [[inner.value for inner in outer] for outer in ws[xl_index]]
return values
[docs]def max_col_in_row(ws, row=1):
"""Finds max col index for given row. If no data exists on row, returns 1.
Args:
ws (openpyxl.workbook.worksheet): Worksheet to look in
row (Integer): index for row to search, 1-indexed (as excel)
Returns:
(Integer) column index of last cell with value.
"""
# TODO: Does this work if ws[row] is empty? In that case 'cell' is not initialized.
for cell in reversed(ws[row]):
if cell.value is not None:
break
return cell.col_idx