from pathlib import Path as path

from PyQt6.QtGui import QAction, QIcon, QKeySequence
from PyQt6.QtCore import Qt, QSize, QThreadPool, QTimer
from PyQt6.QtWidgets import (
    QGridLayout,
    QPushButton,
    QToolBar,
    QWidget,
    QMainWindow,
    QPlainTextEdit,
    QMenuBar,
    QStatusBar,
)
import uldaq
import nixio as nix
import pyqtgraph as pg
import numpy as np

from pyrelacs.dataio.daq_producer import DaqProducer
from pyrelacs.worker import Worker
from pyrelacs.repros.repros import Repro
from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.ui.about import AboutDialog
from pyrelacs.ui.plots.calibration import CalibrationPlot
from pyrelacs.util.logging import config_logging

log = config_logging()
_root = path(__file__).parent.parent

# NOTE(review): `embed` is not used in the visible code; kept because other
# parts of the project may rely on it being importable from here.
from IPython import embed


class PyRelacs(QMainWindow):
    """Main window of the PyRelacs application.

    Builds the menu bar, toolbars, a pyqtgraph figure and a read-only log
    text field, opens the NIX files in the current working directory, tries
    to connect to the first USB DAQ device, and starts a timer-driven plot
    of the most recent sample in the circular buffer.
    """

    def __init__(self):
        """Create all widgets, open the data files and start the live plot."""
        super().__init__()
        self.setToolButtonStyle(
            Qt.ToolButtonStyle.ToolButtonTextBesideIcon
        )  # Ensure icons are displayed with text
        self.setWindowTitle("PyRelacs")
        self.figure = pg.GraphicsLayoutWidget()

        # Always define the attribute: previously it only existed when
        # "data.nix" was present, and the CalibrationPlot call below raised
        # AttributeError otherwise.
        # NOTE(review): CalibrationPlot now receives None when the file is
        # missing -- confirm it tolerates that until plot() is requested.
        self.nix_file = None
        filename = path.cwd() / "data.nix"
        if filename.exists():
            self.nix_file = nix.File.open(str(filename), nix.FileMode.ReadOnly)

        filename = path.cwd() / "calibration.nix"
        self.nix_calibration = nix.File.open(str(filename), nix.FileMode.Overwrite)

        self.calibration_plot = CalibrationPlot(self.figure, self.nix_file)

        self.threadpool = QThreadPool()
        self.repros = Repro()

        self.text = QPlainTextEdit()
        self.text.setReadOnly(True)

        self.setMenuBar(QMenuBar(self))
        self.setStatusBar(QStatusBar(self))
        # Order matters: the actions reference self.calibration_plot, and the
        # buttons must exist before connect_dac() touches them.
        self.create_actions()
        self.create_buttons()
        self.create_toolbars()

        layout = QGridLayout()
        layout.addWidget(self.figure, 0, 0, 2, 2)
        layout.addWidget(self.text, 2, 0, 1, 2)

        widget = QWidget()
        widget.setLayout(layout)
        self.setCentralWidget(widget)

        self.buffer = CircBuffer(size=500)
        # Define the attribute up front so a missing DAQ no longer raises
        # AttributeError when the producer is built below.
        self.daq_device = None
        self.connect_dac()
        # NOTE(review): the producer keeps the device it was constructed with
        # (possibly None); a later connect/disconnect does not update it.
        self.daq_producer = DaqProducer(self.buffer, self.daq_device)
        self.plot_daq()

    def create_actions(self):
        """Create the QActions used by the menus and toolbars, then the menu."""
        self._rlx_exitaction = QAction(QIcon(":/icons/exit.png"), "Exit", self)
        self._rlx_exitaction.setStatusTip("Close relacs")
        self._rlx_exitaction.setShortcut(QKeySequence("Alt+q"))
        self._rlx_exitaction.triggered.connect(self.on_exit)

        # Parented to the window like every other action in this method.
        self._rlx_aboutaction = QAction("about", self)
        self._rlx_aboutaction.setStatusTip("Show about dialog")
        self._rlx_aboutaction.setEnabled(True)
        self._rlx_aboutaction.triggered.connect(self.on_about)

        self._daq_connectaction = QAction(
            QIcon(":icons/connect.png"), "Connect DAQ", self
        )
        self._daq_connectaction.setStatusTip("Connect to daq device")
        # self._daq_connectaction.setShortcut(QKeySequence("Alt+d"))
        self._daq_connectaction.triggered.connect(self.connect_dac)

        self._daq_disconnectaction = QAction(
            QIcon(":/icons/disconnect.png"), "Disconnect DAQ", self
        )
        self._daq_disconnectaction.setStatusTip("Disconnect the DAQ device")
        # self._daq_connectaction.setShortcut(QKeySequence("Alt+d"))
        self._daq_disconnectaction.triggered.connect(self.disconnect_dac)

        self._daq_calibaction = QAction(
            QIcon(":/icons/calibration.png"), "Plot calibration", self
        )
        self._daq_calibaction.setStatusTip("Calibrate the attenuator device")
        # self._daq_calibaction.setShortcut(QKeySequence("Alt+d"))
        self._daq_calibaction.triggered.connect(self.calibration_plot.plot)

        self._run_action = QAction(QIcon(":/icons/calibration.png"), "Run", self)
        self._run_action.triggered.connect(self.run_daq)

        self.create_menu()

    def create_menu(self):
        """Populate the File, DAQ and Help menus of the window's menu bar.

        If the window has no menu bar, an error is logged and the window is
        closed via on_exit().
        """
        menu = self.menuBar()
        if menu is not None:
            file_menu = menu.addMenu("&File")
            device_menu = menu.addMenu("&DAQ")
            help_menu = menu.addMenu("&Help")

            if file_menu is not None:
                file_menu.addAction(self._rlx_exitaction)
                file_menu.addAction(self._rlx_aboutaction)

            if device_menu is not None:
                device_menu.addAction(self._daq_connectaction)
                device_menu.addAction(self._daq_disconnectaction)
                device_menu.addSeparator()
                device_menu.addAction(self._daq_calibaction)
                device_menu.addAction(self._run_action)

            if help_menu is not None:
                help_menu.addSeparator()
                # help_menu.addAction(self._help_action)
        else:
            log.error("could not create file menu and device menu")
            self.on_exit()
        self.setMenuBar(menu)

    def create_toolbars(self):
        """Create the Relacs and DAQ toolbars (top) and the Repros toolbar (bottom)."""
        rlx_toolbar = QToolBar("Relacs")
        rlx_toolbar.addAction(self._rlx_exitaction)
        rlx_toolbar.setIconSize(QSize(24, 24))
        self.addToolBar(Qt.ToolBarArea.TopToolBarArea, rlx_toolbar)

        daq_toolbar = QToolBar("DAQ")
        daq_toolbar.addAction(self._daq_connectaction)
        daq_toolbar.addAction(self._daq_disconnectaction)
        daq_toolbar.addAction(self._daq_calibaction)
        daq_toolbar.addAction(self._run_action)
        self.addToolBar(Qt.ToolBarArea.TopToolBarArea, daq_toolbar)

        repro_toolbar = QToolBar("Repros")
        repro_names, file_names = self.repros.names_of_repros()
        for rep, fn in zip(repro_names, file_names):
            repro_action = QAction(rep, self)
            repro_action.setStatusTip(rep)
            # Bind rep/fn as lambda defaults so each action keeps its own
            # pair instead of the last loop values (late-binding closure).
            repro_action.triggered.connect(
                lambda checked, n=rep, f=fn: self.run_repro(n, f)
            )
            repro_toolbar.addAction(repro_action)
        self.addToolBar(Qt.ToolBarArea.BottomToolBarArea, repro_toolbar)

    def create_buttons(self):
        """Create the checkable push buttons mirroring the DAQ actions.

        The buttons are only created and wired here; this method does not
        add them to any layout.
        """
        self.daq_connect_button = QPushButton("Connect Daq")
        self.daq_connect_button.setCheckable(True)
        self.daq_connect_button.clicked.connect(self.connect_dac)

        self.daq_disconnect_button = QPushButton("Disconnect Daq")
        self.daq_disconnect_button.setCheckable(True)
        self.daq_disconnect_button.clicked.connect(self.disconnect_dac)

        self.plot_calibration_button = QPushButton("Plot Calibration")
        self.plot_calibration_button.setCheckable(True)
        self.plot_calibration_button.clicked.connect(self.calibration_plot.plot)

        self.daq_run_button = QPushButton("Run")
        self.daq_run_button.setCheckable(True)
        self.daq_run_button.clicked.connect(self.run_daq)

    def plot_daq(self):
        """Set up the live plot and start the timer that refreshes it."""
        self.figure.setBackground("w")
        self.daq_plot = self.figure.addPlot(row=0, col=0)
        # Ten placeholder points; update_plot() scrolls this window.
        self.time = list(range(10))
        pen = pg.mkPen("red")
        self.data = list(range(10))
        self.line = self.daq_plot.plot(self.time, self.data, pen=pen)

        self.timer = QTimer()
        self.timer.setInterval(50)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start()
        # self.update_plot()

    def update_plot(self):
        """Scroll the plotted window by one sample.

        Drops the oldest point, appends the buffer item just before the
        current write index (or 0 if the buffer raises IndexError) and
        redraws the line.
        """
        self.time = self.time[1:]
        self.time.append(self.time[-1] + 1)

        self.data = self.data[1:]
        try:
            item = self.buffer.get(self.buffer.write_index() - 1)
        except IndexError:
            item = 0
        self.data.append(item)
        self.line.setData(self.time, self.data)

    def run_daq(self):
        """Start continuous analog reading on a worker thread."""
        read_daq = Worker(self.daq_producer.read_analog_continously)
        read_daq.signals.result.connect(self.print_output)
        read_daq.signals.finished.connect(self.thread_complete)
        read_daq.signals.progress.connect(self.progress_fn)
        self.threadpool.start(read_daq)

    def connect_dac(self):
        """Connect to the first USB DAQ device in the inventory.

        Logs an error and returns when no device is found or when uldaq
        reports a failure. On success the connect button is disabled and the
        disconnect button enabled, mirroring disconnect_dac().

        The original handled ULException only in a branch guarded by
        hasattr(PyRelacs, "daq_device"), which checks the class and was
        therefore never taken; that handling now wraps the real connect.
        """
        devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
        if not devices:
            log.error("DAQ is not connected")
            log.error("Please connect a DAQ device to the system")
            return

        try:
            self.daq_device = uldaq.DaqDevice(devices[0])
            log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
            self.daq_device.connect()
        except uldaq.ul_exception.ULException as e:
            log.error(f"Could not Connect to DAQ: {e}")
            return

        log.debug("connected")
        self.daq_connect_button.setDisabled(True)
        self.daq_disconnect_button.setEnabled(True)

    def disconnect_dac(self):
        """Disconnect and release the DAQ device, if one is set.

        Afterwards the device attribute is reset to None so the released
        handle is not used again, the disconnect button is disabled and the
        connect button enabled.
        """
        device = getattr(self, "daq_device", None)
        if device is None:
            log.debug("DAQ was not connected")
            return

        log.debug(f"{device}")
        device.disconnect()
        device.release()
        log.debug(f"{device}")
        self.daq_device = None
        self.daq_disconnect_button.setDisabled(True)
        self.daq_connect_button.setEnabled(True)

    def run_repro(self, n, fn):
        """Run the repro named *n* from file *fn* on a worker thread.

        The calibration NIX file is passed to the repro as its first
        argument.
        """
        self.text.appendPlainText(f"started Repro {n}, {fn}")
        worker = Worker(self.repros.run_repro, self.nix_calibration, n, fn)
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.progress.connect(self.progress_fn)

        self.threadpool.start(worker)

    def add_to_textfield(self, s: str):
        """Append *s* as a new line to the read-only text field."""
        self.text.appendPlainText(s)

    def on_exit(self):
        """Log the exit request and close the main window."""
        log.info("exit button!")
        self.add_to_textfield("exiting")
        self.close()

    def on_about(self, e):
        """Show the about dialog; *e* is the value sent by the signal and is unused."""
        about = AboutDialog(self)
        about.show()

    def print_output(self, s):
        """Log a worker result and show it in the text field.

        The result is converted with str() because the text field only
        accepts strings; for string results this is a no-op.
        """
        log.info(s)
        self.add_to_textfield(str(s))

    def thread_complete(self):
        """Report that a worker thread has finished."""
        log.info("Thread complete!")
        self.add_to_textfield("Thread complete!")

    def progress_fn(self, n):
        """Print the progress percentage *n* reported by a worker."""
        print("%d%% done" % n)