import pathlib import tomli from PyQt6.QtCore import QRunnable, QThread, pyqtSignal, pyqtSlot, QObject import ctypes import uldaq import numpy as np from IPython import embed import traceback, sys from pyrelacs.util.logging import config_logging log = config_logging() class WorkerSignals(QObject): """ Defines the signals available from a running worker thread. Supported signals are: finished No data error tuple (exctype, value, traceback.format_exc() ) result object data returned from processing, anything progress int indicating % progress """ finished = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object) progress = pyqtSignal(int) class Worker(QRunnable): """ Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function """ def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() # Add the callback to our kwargs self.kwargs["progress_callback"] = self.signals.progress @pyqtSlot() def run(self): """ Initialise the runner function with passed args, kwargs. """ # Retrieve args/kwargs here; and fire processing using them try: result = self.fn(*self.args, **self.kwargs) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: self.signals.result.emit(result) # Return the result of the processing finally: self.signals.finished.emit() # Done