import time from pathlib import Path as path from PyQt6.QtGui import QAction, QIcon, QKeySequence from PyQt6.QtCore import Qt, QSize, QThreadPool, QMutex, QTimer from PyQt6.QtWidgets import ( QGridLayout, QToolBar, QWidget, QMainWindow, QPlainTextEdit, QMenuBar, QStatusBar, ) import uldaq import nixio as nix import pyqtgraph as pg import numpy as np import quantities as pq from pyrelacs.dataio.daq_producer import DaqProducer from pyrelacs.dataio.nix_writer import NixWriter from pyrelacs.dataio.sin_producer import SinProducer from pyrelacs.worker import Worker from pyrelacs.repros.repros import Repro from pyrelacs.dataio.circbuffer import CircBuffer from pyrelacs.ui.about import AboutDialog from pyrelacs.ui.plots.calibration import CalibrationPlot from pyrelacs.ui.plots.continously import Continously from pyrelacs.util.logging import config_logging log = config_logging() _root = path(__file__).parent.parent from IPython import embed class PyRelacs(QMainWindow): def __init__(self, config): super().__init__() self.config = config self.setToolButtonStyle( Qt.ToolButtonStyle.ToolButtonTextBesideIcon ) # Ensure icons are displayed with text self.setWindowTitle("PyRelacs") self.mutex = QMutex() self.timer = QTimer(self) self.timer.setInterval(200) self.figure = pg.GraphicsLayoutWidget() # filename = path.joinpath(path.cwd(), "data.nix") # if filename.exists(): # self.nix_file = nix.File.open(str(filename), nix.FileMode.ReadOnly) # filename = path.joinpath(path.cwd(), "calibration.nix") # self.nix_file = nix.File.open(str(filename), nix.FileMode.Overwrite) # # self.calibration_plot = CalibrationPlot(self.figure, self.nix_file) # self.nix_file.close() self.threadpool = QThreadPool() self.repros = Repro() self.text = QPlainTextEdit() self.text.setReadOnly(True) self.setMenuBar(QMenuBar(self)) self.setStatusBar(QStatusBar(self)) self.create_actions() self.create_toolbars() layout = QGridLayout() layout.addWidget(self.figure, 0, 0, 2, 2) layout.addWidget(self.text, 2, 0, 1, 2) widget = QWidget() widget.setLayout(layout) self.setCentralWidget(widget) SAMPLERATE = pq.Quantity( self.config.pyrelacs.data.input.inputsamplerate, self.config.pyrelacs.data.input.inputsamplerateunit, ) INPUTTRACECAPACITY = pq.Quantity( self.config.pyrelacs.data.input.inputtracecapacity, self.config.pyrelacs.data.input.inputtracecapacityunit, ) start = time.time() BUFFERSIZE = (SAMPLERATE * INPUTTRACECAPACITY).simplified end = time.time() log.debug(f"Buffer allocation took {end - start}") self.buffer = CircBuffer( size=BUFFERSIZE, samplerate=SAMPLERATE, mutex=self.mutex ) self.continously_plot = Continously(self.figure, self.buffer) self.continously_plot.plot() start = time.time() self.connect_dac() end = time.time() log.debug(f"Connection to DAQ took {end - start}") if hasattr(PyRelacs, "daq_device"): log.debug("Creating Daq Generator") self.daq_producer = DaqProducer(self.buffer, self.daq_device, [1, 1]) else: log.debug("Creating Sinus Generator") self.sinus_producer = SinProducer(self.buffer) self.nix_writer = NixWriter(self.buffer) def create_actions(self): self._rlx_exitaction = QAction(QIcon(":/icons/exit.png"), "Exit", self) self._rlx_exitaction.setStatusTip("Close relacs") self._rlx_exitaction.setShortcut(QKeySequence("Alt+q")) self._rlx_exitaction.triggered.connect(self.on_exit) self._rlx_aboutaction = QAction("about") self._rlx_aboutaction.setStatusTip("Show about dialog") self._rlx_aboutaction.setEnabled(True) self._rlx_aboutaction.triggered.connect(self.on_about) self._daq_connectaction = QAction( QIcon(":icons/connect.png"), "Connect DAQ", self ) self._daq_connectaction.setStatusTip("Connect to daq device") # self._daq_connectaction.setShortcut(QKeySequence("Alt+d")) self._daq_connectaction.triggered.connect(self.connect_dac) self._daq_disconnectaction = QAction( QIcon(":/icons/disconnect.png"), "Disconnect DAQ", self ) self._daq_disconnectaction.setStatusTip("Disconnect the DAQ device") # self._daq_connectaction.setShortcut(QKeySequence("Alt+d")) self._daq_disconnectaction.triggered.connect(self.disconnect_dac) # self._daq_calibaction = QAction( # QIcon(":/icons/calibration.png"), "Plot calibration", self # ) # self._daq_calibaction.setStatusTip("Calibrate the attenuator device") # # self._daq_calibaction.setShortcut(QKeySequence("Alt+d")) # self._daq_calibaction.triggered.connect(self.calibration_plot.plot) self._run_action = QAction(QIcon(":/icons/record.png"), "Run", self) self._run_action.triggered.connect(self.run_daq) self._run_sinus_action = QAction(QIcon(":/icons/record.png"), "Sinus", self) self._run_sinus_action.triggered.connect(self.run_sinus) self._stop_recording = QAction(QIcon(":/icons/stop.png"), "Stop", self) self._stop_recording.triggered.connect(self.stop_recording) self._recenter_plot = QAction("Recenter", self) self._recenter_plot.triggered.connect(self.recenter_continously_plot) self._recenter_plot.setShortcut(QKeySequence("Alt+r")) self._record = QAction(QIcon(":/icons/record.png"), "Record", self) self._record.triggered.connect(self.record) self.create_menu() def create_menu(self): menu = self.menuBar() if menu is not None: file_menu = menu.addMenu("&File") device_menu = menu.addMenu("&DAQ") help_menu = menu.addMenu("&Help") if file_menu is not None: file_menu.addAction(self._rlx_exitaction) file_menu.addAction(self._rlx_aboutaction) if device_menu is not None: device_menu.addAction(self._daq_connectaction) device_menu.addAction(self._daq_disconnectaction) device_menu.addSeparator() # device_menu.addAction(self._daq_calibaction) device_menu.addAction(self._run_action) if help_menu is not None: help_menu.addSeparator() # help_menu.addAction(self._help_action) else: log.error("could not create file menu and device menu") self.on_exit() self.setMenuBar(menu) def create_toolbars(self): rlx_toolbar = QToolBar("Relacs") rlx_toolbar.addAction(self._rlx_exitaction) rlx_toolbar.setIconSize(QSize(24, 24)) self.addToolBar(Qt.ToolBarArea.TopToolBarArea, rlx_toolbar) daq_toolbar = QToolBar("DAQ") daq_toolbar.addAction(self._daq_connectaction) daq_toolbar.addAction(self._daq_disconnectaction) # daq_toolbar.addAction(self._daq_calibaction) daq_toolbar.addAction(self._run_action) daq_toolbar.addAction(self._run_sinus_action) daq_toolbar.addAction(self._stop_recording) daq_toolbar.addAction(self._recenter_plot) daq_toolbar.addSeparator() daq_toolbar.addAction(self._record) self.addToolBar(Qt.ToolBarArea.TopToolBarArea, daq_toolbar) repro_toolbar = QToolBar("Repros") repro_names, file_names = self.repros.names_of_repros() for rep, fn in zip(repro_names, file_names): repro_action = QAction(rep, self) repro_action.setStatusTip(rep) repro_action.triggered.connect( lambda checked, n=rep, f=fn: self.run_repro(n, f) ) repro_toolbar.addAction(repro_action) self.addToolBar(Qt.ToolBarArea.BottomToolBarArea, repro_toolbar) def recenter_continously_plot(self): self.continously_plot.refresh() def plot_continously(self): plot_con = Worker(self.continously_plot.plot) plot_con.signals.result.connect(self.print_output) plot_con.signals.finished.connect(self.thread_complete) plot_con.signals.progress.connect(self.progress_fn) self.threadpool.start(plot_con) def run_daq(self): read_daq = Worker(self.daq_producer.read_analog_continously) read_daq.signals.result.connect(self.print_output) read_daq.signals.finished.connect(self.thread_complete) read_daq.signals.progress.connect(self.progress_fn) self.threadpool.start(read_daq) self.continously_plot.plot() def run_sinus(self): sinus_pro = Worker(self.sinus_producer.produce_sin) sinus_pro.signals.result.connect(self.print_output) sinus_pro.signals.finished.connect(self.thread_complete) sinus_pro.signals.progress.connect(self.progress_fn) self.threadpool.start(sinus_pro) self.continously_plot.plot() def record(self): nix_writer = Worker(self.nix_writer.write_nix) nix_writer.signals.result.connect(self.print_output) nix_writer.signals.finished.connect(self.thread_complete) nix_writer.signals.progress.connect(self.progress_fn) self.threadpool.start(nix_writer) def stop_recording(self): self.add_to_textfield("Stopping the recording") self.continously_plot.stop_plotting() self.nix_writer.stop_writing() log.debug("Stopping acquisiton") try: self.sinus_producer.stop_request() log.debug("Stopping Sinus") except AttributeError: log.debug("Did not generate Sinus") if hasattr(PyRelacs, "daq_device"): log.debug("Stopping DAQ") self.daq_producer.stop_aquisition() def connect_dac(self): devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) try: self.daq_device = uldaq.DaqDevice(devices[0]) log.debug(f"Found daq devices {len(devices)}, connecting to the first one") self.daq_device.connect() log.debug("connected") except IndexError: log.info("DAQ is not connected") log.info("Please connect a DAQ device to the system") def disconnect_dac(self): try: self.daq_device.disconnect() self.daq_device.release() except AttributeError: log.debug("DAQ was not connected") def run_repro(self, n, fn): self.text.appendPlainText(f"started Repro {n}, {fn}") worker = Worker(self.repros.run_repro, self.nix_calibration, n, fn) worker.signals.result.connect(self.print_output) worker.signals.finished.connect(self.thread_complete) worker.signals.progress.connect(self.progress_fn) self.threadpool.start(worker) def add_to_textfield(self, s: str): self.text.appendPlainText(s) def on_exit(self): log.info("exit button!") self.stop_recording() self.add_to_textfield("exiting") self.disconnect_dac() log.info("closing GUI") self.close() def on_about(self, e): about = AboutDialog(self) about.show() def print_output(self, s): log.info(s) self.add_to_textfield(s) def thread_complete(self): log.info("Thread complete!") self.add_to_textfield("Thread complete!") def progress_fn(self, n): print("%d%% done" % n)