Source code for spinetoolbox.custom_file_system_watcher

######################################################################################################################
# Copyright (C) 2017-2021 Spine project consortium
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################

"""
Contains CustomFileSystemWatcher.

:author: M. Marin (KTH)
:date:   12.11.2020
"""

import os
from PySide2.QtCore import QFileSystemWatcher, Signal, Slot


class CustomFileSystemWatcher(QFileSystemWatcher):
    """A file system watcher that keeps track of renamed files.

    Instead of watching individual files, the *parent directory* of every registered
    file is added to the underlying watcher. A snapshot of the regular files in each
    watched directory is kept and compared against a fresh one whenever
    ``directoryChanged`` fires; the difference is reported through ``file_renamed``,
    ``file_removed`` and ``file_added``.

    Only one removed and one added path are considered per directory change, so a
    change that touches several files at once yields a single signal.
    """

    # Emitted with (old path, new path) when a file disappeared and another one
    # appeared in the same directory change.
    file_renamed = Signal(str, str)
    # Emitted with the path of a file that disappeared without a replacement.
    file_removed = Signal(str)
    # Emitted with the path of a file that appeared in an explicitly watched directory.
    file_added = Signal(str)

    def __init__(self, parent=None):
        """Initializes the watcher with empty bookkeeping.

        Args:
            parent: forwarded unchanged to ``QFileSystemWatcher.__init__``
        """
        super().__init__(parent)
        # directory path -> set of file paths registered in that directory
        self._watched_files = {}
        # directories registered through add_persistent_dir_path()
        self._watched_dirs = set()
        # directory path -> set of regular file paths seen at the last snapshot
        self._directory_snapshots = {}
        self.directoryChanged.connect(self._handle_dir_changed)

    @Slot(str)
    def _handle_dir_changed(self, dirname):
        """Compares the directory against its last snapshot and emits the matching signal.

        Args:
            dirname (str): path of the directory that changed
        """
        is_watched_dir = dirname in self._watched_dirs
        watched_files = self._watched_files.get(dirname, set())
        if not is_watched_dir and not watched_files:
            return
        snapshot = self._directory_snapshots[dirname]
        try:
            new_snapshot = self._take_snapshot(dirname)
        except OSError:
            # The directory itself is gone or unreadable; treat it as empty so the
            # stored snapshot stays current and a removal can still be reported.
            new_snapshot = set()
        self._directory_snapshots[dirname] = new_snapshot
        removed_filepath = next(iter(snapshot - new_snapshot), None)
        if not is_watched_dir and removed_filepath not in watched_files:
            # Only individual files are watched here and none of them went away.
            return
        added_filepath = next(iter(new_snapshot - snapshot), None)
        if removed_filepath in watched_files:
            watched_files.remove(removed_filepath)
            # Follow a renamed file under its new name. Never store None: it would
            # keep the set non-empty forever and block later clean-up.
            if added_filepath is not None:
                watched_files.add(added_filepath)
        if removed_filepath and added_filepath:
            self.file_renamed.emit(removed_filepath, added_filepath)
        elif removed_filepath:
            self.file_removed.emit(removed_filepath)
        elif is_watched_dir and added_filepath:
            # The directory may change without any regular file being added
            # (both snapshots equal); there is nothing to report in that case.
            self.file_added.emit(added_filepath)

    def add_persistent_file_path(self, path):
        """Registers a file so that it is followed across renames.

        Args:
            path (str): path of an existing file

        Returns:
            bool: False if ``path`` is not a file, otherwise the result of
            ``addPath()`` for the file's parent directory.
        """
        # NOTE(review): addPath() presumably returns False when the directory is already
        # being watched, so a second file in the same directory would report False even
        # though it is registered - confirm against the Qt documentation and callers.
        if not os.path.isfile(path):
            return False
        dirname = os.path.dirname(path)
        self._watched_files.setdefault(dirname, set()).add(path)
        # Keep an existing snapshot; scan the directory only when none is stored yet.
        if dirname not in self._directory_snapshots:
            self._directory_snapshots[dirname] = self._take_snapshot(dirname)
        return self.addPath(dirname)

    def add_persistent_file_paths(self, paths):
        """Registers several files.

        Args:
            paths (Iterable of str): file paths

        Returns:
            list of str: the paths for which ``add_persistent_file_path()`` returned
            a true value
        """
        return [path for path in paths if self.add_persistent_file_path(path)]

    def remove_persistent_file_path(self, path):
        """Unregisters a file.

        The parent directory stops being watched only when no other registered file
        lives in it and it has not been registered as a directory itself.

        Args:
            path (str): path of an existing file

        Returns:
            bool: the result of ``removePath()`` for the parent directory when the
            directory watch is dropped, False in every other case.
        """
        if not os.path.isfile(path):
            return False
        dirname = os.path.dirname(path)
        watched_files = self._watched_files.get(dirname)
        if not watched_files:
            return False
        watched_files.discard(path)
        if watched_files:
            # Other files in this directory still need the directory watch.
            return False
        del self._watched_files[dirname]
        if dirname in self._watched_dirs:
            # The directory is watched in its own right; keep its snapshot and watch.
            return False
        self._directory_snapshots.pop(dirname, None)
        # The directory, not the file, is what was handed to addPath().
        return self.removePath(dirname)

    def remove_persistent_file_paths(self, paths):
        """Unregisters several files.

        Args:
            paths (Iterable of str): file paths

        Returns:
            list of str: the paths for which ``remove_persistent_file_path()``
            returned a true value
        """
        return [path for path in paths if self.remove_persistent_file_path(path)]

    def add_persistent_dir_path(self, path):
        """Registers a directory so that added, removed and renamed files are reported.

        Args:
            path (str): path of an existing directory

        Returns:
            bool: False if ``path`` is not a directory, otherwise the result of
            ``addPath()``.
        """
        if not os.path.isdir(path):
            return False
        self._watched_dirs.add(path)
        # Keep an existing snapshot; scan the directory only when none is stored yet.
        if path not in self._directory_snapshots:
            self._directory_snapshots[path] = self._take_snapshot(path)
        return self.addPath(path)

    def remove_persistent_dir_path(self, path):
        """Unregisters a directory.

        The directory stays watched if individual files in it are still registered.

        Args:
            path (str): directory path

        Returns:
            bool: the result of ``removePath()`` when the directory watch is dropped,
            False if registered files still need it.
        """
        self._watched_dirs.discard(path)
        if path in self._watched_files:
            return False
        # pop() instead of del: the directory may never have been registered.
        self._directory_snapshots.pop(path, None)
        return self.removePath(path)

    def tear_down(self):
        """Stops watching every directory and schedules this object for deletion."""
        directories = self.directories()
        if directories:
            self.removePaths(directories)
        self.deleteLater()

    def _take_snapshot(self, dirname):
        """Returns the set of regular file paths currently found in a directory.

        Args:
            dirname (str): directory path

        Returns:
            set of str: paths as reported by ``os.scandir()``
        """
        return set(self._absfilepaths(dirname))

    @staticmethod
    def _absfilepaths(dirname):
        """Yields the path of every regular file directly inside a directory.

        Args:
            dirname (str): directory path

        Yields:
            str: ``entry.path`` of each entry for which ``entry.is_file()`` is true
        """
        with os.scandir(dirname) as scan_iterator:
            for entry in scan_iterator:
                if entry.is_file():
                    yield entry.path