######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Toolbox contributors
# This file is part of Spine Toolbox.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""Qt-based thread pool executor."""
import os
from PySide6.QtCore import QMutex, QSemaphore, QThread
[docs]class TimeOutError(Exception):
"""An exception to raise when a timeouts expire"""
[docs]class _CustomQSemaphore(QSemaphore):
[docs] def tryAcquire(self, n, timeout=None):
if timeout is None:
timeout = -1
timeout *= 1000
return super().tryAcquire(n, timeout)
[docs]class QtBasedQueue:
"""A Qt-based clone of queue.Queue."""
def __init__(self):
self._items = []
self._mutex = QMutex()
self._semafore = _CustomQSemaphore()
[docs] def put(self, item):
self._mutex.lock()
self._items.append(item)
self._mutex.unlock()
self._semafore.release()
[docs] def get(self, timeout=None):
if not self._semafore.tryAcquire(1, timeout):
raise TimeOutError()
self._mutex.lock()
item = self._items.pop(0)
self._mutex.unlock()
return item
[docs]class QtBasedFuture:
"""A Qt-based clone of concurrent.futures.Future."""
def __init__(self):
self._semafore = _CustomQSemaphore()
self._done = False
self._result = None
self._exception = None
self._done_callbacks = []
[docs] def set_result(self, result):
self._result = result
self._done = True
self._semafore.release()
for callback in self._done_callbacks:
callback(self)
self._done_callbacks = []
[docs] def set_exception(self, exc):
self._exception = exc
self._done = True
self._semafore.release()
[docs] def result(self, timeout=None):
if not self._done and not self._semafore.tryAcquire(1, timeout):
raise TimeOutError()
if self._exception is not None:
raise self._exception
return self._result
[docs] def exception(self, timeout=None):
if not self._done and not self._semafore.tryAcquire(1, timeout):
raise TimeOutError()
return self._exception
[docs] def add_done_callback(self, callback):
if self._done:
callback(self)
return
self._done_callbacks.append(callback)
[docs]class QtBasedThread(QThread):
"""A Qt-based clone of threading.Thread."""
def __init__(self, target=None, args=()):
super().__init__()
self._target = target
self._args = args
[docs] def run(self):
return self._target(*self._args)
[docs]class QtBasedThreadPoolExecutor:
"""A Qt-based clone of concurrent.futures.ThreadPoolExecutor"""
def __init__(self, max_workers=None):
if max_workers is None:
max_workers = min(32, os.cpu_count() + 4)
self._max_workers = max_workers
self._threads = set()
self._requests = QtBasedQueue()
self._semafore = QSemaphore()
self._shutdown = False
[docs] def submit(self, fn, *args, **kwargs):
future = QtBasedFuture()
self._requests.put((future, fn, args, kwargs))
self._spawn_thread()
return future
[docs] def _spawn_thread(self):
if self._semafore.tryAcquire():
# No need to spawn a new thread
return
if len(self._threads) == self._max_workers:
# Not possible to spawn a new thread
return
thread = QtBasedThread(target=self._do_work)
self._threads.add(thread)
thread.start()
[docs] def _do_work(self):
while True:
request = self._requests.get()
if self._shutdown:
break
future, fn, args, kwargs = request
_set_future_result_and_exc(future, fn, *args, **kwargs)
self._semafore.release()
[docs] def shutdown(self):
self._shutdown = True
for _ in self._threads:
self._requests.put(None)
while self._threads:
thread = self._threads.pop()
thread.wait()
thread.deleteLater()
[docs]class SynchronousExecutor:
[docs] def submit(self, fn, *args, **kwargs):
future = QtBasedFuture()
_set_future_result_and_exc(future, fn, *args, **kwargs)
return future
[docs] def shutdown(self):
pass
[docs]def _set_future_result_and_exc(future, fn, *args, **kwargs):
try:
result = fn(*args, **kwargs)
future.set_result(result)
except Exception as exc: # pylint: disable=broad-except
future.set_exception(exc)