from PyQt6.QtGui import QAction import sys import pathlib import ctypes from PyQt6.QtCore import QProcess, QSize, QThreadPool, Qt from PyQt6.QtWidgets import ( QApplication, QGridLayout, QPushButton, QToolBar, QWidget, QMainWindow, QPlainTextEdit, ) import tomli import uldaq from IPython import embed import numpy as np from pyrelacs.util.logging import config_logging log = config_logging() class PyRelacs(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("PyRelacs") self.setMinimumSize(1000, 1000) self.threadpool = QThreadPool() # for starting a Qprocess self.p = None self.daq_connect_button = QPushButton("Connect Daq") self.daq_connect_button.setCheckable(True) self.daq_connect_button.clicked.connect(self.connect_dac) self.daq_disconnect_button = QPushButton("Disconnect Daq") self.daq_disconnect_button.setCheckable(True) self.daq_disconnect_button.clicked.connect(self.disconnect_dac) self.text = QPlainTextEdit() self.text.setReadOnly(True) layout = QGridLayout() layout.addWidget(self.daq_connect_button, 0, 0) layout.addWidget(self.daq_disconnect_button, 0, 1) layout.addWidget(self.text, 2, 0, 1, 2) self.toolbar = QToolBar("Repros") self.addToolBar(self.toolbar) self.repro() self.setFixedSize(QSize(400, 300)) widget = QWidget() widget.setLayout(layout) self.setCentralWidget(widget) def connect_dac(self): devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) try: self.daq_device = uldaq.DaqDevice(devices[0]) log.debug(f"Found daq devices {len(devices)}, connecting to the first one") self.daq_device.connect() log.debug("Connected") except IndexError: log.debug("DAQ is not connected, closing") QApplication.quit() self.daq_connect_button.setDisabled(True) def disconnect_dac(self): try: log.debug(f"{self.daq_device}") self.daq_device.disconnect() self.daq_device.release() log.debug(f"{self.daq_device}") self.daq_disconnect_button.setDisabled(True) self.daq_connect_button.setEnabled(True) except AttributeError: log.debug("DAQ was not connected") def repro(self): repos_path = pathlib.Path(__file__).parent / "repros" repos_names = list(repos_path.glob("*.py")) # exclude the repos.py file repos_names = [ f.with_suffix("").name for f in repos_names if not f.name == "repos.py" ] for rep in repos_names: individual_repro_button = QAction(rep, self) individual_repro_button.setStatusTip("Button") individual_repro_button.triggered.connect( lambda checked, n=rep: self.run_repro(n) ) self.toolbar.addAction(individual_repro_button) def message(self, s): self.text.appendPlainText(s) def run_repro(self, name_of_repo): if self.p is None: self.message(f"Executing process {name_of_repo}") self.p = QProcess() self.p.setWorkingDirectory(str(pathlib.Path(__file__).parent / "repros/")) # log.debug(pathlib.Path(__file__).parent / "repos") self.p.readyReadStandardOutput.connect(self.handle_stdout) self.p.readyReadStandardError.connect(self.handle_stderr) self.p.stateChanged.connect(self.handle_state) self.p.finished.connect(self.process_finished) self.p.start("python3", [f"{name_of_repo}" + ".py"]) def handle_stderr(self): if self.p is not None: data = self.p.readAllStandardError() stderr = bytes(data).decode("utf8") self.message(stderr) def handle_stdout(self): if self.p is not None: data = self.p.readAllStandardOutput() stdout = bytes(data).decode("utf8") self.message(stdout) def handle_state(self, state): states = { QProcess.ProcessState.NotRunning: "Not running", QProcess.ProcessState.Starting: "Starting", QProcess.ProcessState.Running: "Running", } state_name = states[state] self.message(f"State changed: {state_name}") def process_finished(self): self.text.appendPlainText("Process finished") self.p = None def main(): app = QApplication(sys.argv) window = PyRelacs() window.show() app.exec() if __name__ == "__main__": main()