Source code for spine_io.importers.csv_reader

######################################################################################################################
# Copyright (C) 2017 - 2019 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Contains CSVConnector class and a help function.

:author: P. Vennström (VTT)
:date:   1.6.2019
"""


import csv
from itertools import islice
from PySide2.QtWidgets import QFileDialog
from spine_io.io_api import SourceConnection


[docs]def select_csv_file(parent=None): """ Launches QFileDialog with no filter """ return QFileDialog.getOpenFileName(parent, "", "*.*")
[docs]class CSVConnector(SourceConnection): """ Template class to read data from another QThread """ # name of data source, ex: "Text/CSV"
[docs] DISPLAY_NAME = "Text/CSV"
# dict with option specification for source.
[docs] OPTIONS = { "delimiter": {'type': str, 'label': 'Delimiter', 'MaxLength': 1, 'default': ','}, "quotechar": {'type': str, 'label': 'Quotechar', 'MaxLength': 1, 'default': ''}, "has_header": {'type': bool, 'label': 'Has header', 'default': False}, "skip": {'type': int, 'label': 'Skip rows', 'Minimum': 0, 'default': 0},
} # Modal widget that returns source object and action (OK, CANCEL)
[docs] SELECT_SOURCE_UI = select_csv_file
def __init__(self): super(CSVConnector, self).__init__() self._filename = None
[docs] def connect_to_source(self, source): """saves filepath Arguments: source {str} -- filepath """ self._filename = source
[docs] def disconnect(self):
"""Disconnect from connected source. """
[docs] def get_tables(self): """Method that should return a list of table names, list(str) Raises: NotImplementedError: [description] """ tables = [self._filename] return tables
@staticmethod
[docs] def parse_options(options): """Parses options dict to dialect and quotechar options for csv.reader Arguments: options {dict} -- dict with options: "delimiter": file delimiter "quotechar": file quotechar "has_header": if first row should be treated as a header "skip": how many rows should be skipped Returns: tuple(dict, bool, integer) -- tuple dialect for csv.reader, quotechar for csv.reader and number of rows to skip """ delimiter = options.get("delimiter", ",") if not delimiter: delimiter = ',' dialect = {"delimiter": delimiter} quotechar = options.get("Quotechar", None) if quotechar: dialect.update({"quotechar": quotechar}) has_header = options.get("has_header", False) skip = options.get("skip", 0) return dialect, has_header, skip
[docs] def file_iterator(self, options, max_rows): """creates an iterator that reads max_rows number of rows from text file Arguments: options {dict} -- dict with options: max_rows {integer} -- max number of rows to read, if -1 then read all rows Returns: iterator -- iterator of csv file """ if not self._filename: return [] dialect, _has_header, skip = self.parse_options(options) if max_rows == -1: max_rows = None else: max_rows += skip with open(self._filename) as text_file: csv_reader = csv.reader(text_file, **dialect) csv_reader = islice(csv_reader, skip, max_rows) yield from csv_reader
[docs] def get_data_iterator(self, table, options, max_rows=-1): """Creates a iterator for the file in self.filename Arguments: table {string} -- ignored, used in abstract IOWorker class options {dict} -- dict with options Keyword Arguments: max_rows {int} -- how many rows of data to read, if -1 read all rows (default: {-1}) Returns: [type] -- [description] """ csv_iter = self.file_iterator(options, max_rows) # Iterate once to get num_cols try: first_row = next(csv_iter) except StopIteration: return iter([]), [], 0 num_cols = len(first_row) _dialect, has_header, _skip = self.parse_options(options) if has_header: # Very good, we already have the first row header = first_row else: header = [] # reset iterator csv_iter = self.file_iterator(options, max_rows) return csv_iter, header, num_cols