import time
from pathlib import Path as path
from PyQt6.QtGui import QAction, QIcon, QKeySequence
from PyQt6.QtCore import Qt, QSize, QThreadPool, QMutex
from PyQt6.QtWidgets import (
    QGridLayout,
    QToolBar,
    QWidget,
    QMainWindow,
    QPlainTextEdit,
    QMenuBar,
    QStatusBar,
)
from pyqtgraph.Qt.QtCore import QThread
import uldaq
import nixio as nix
import pyqtgraph as pg
import quantities as pq

from pyrelacs.devices.mccdaq import MccDaq

from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.dataio.daq_producer import DaqProducer
from pyrelacs.dataio.nix_writer import NixWriter
from pyrelacs.dataio.sin_producer import SinProducer

from pyrelacs.worker import Worker
from pyrelacs.repros.repros import Repro

from pyrelacs.ui.about import AboutDialog
from pyrelacs.ui.plots.calibration import CalibrationPlot
from pyrelacs.ui.plots.continously import Continously

from pyrelacs.util.logging import config_logging

log = config_logging()
_root = path(__file__).parent.parent

from IPython import embed


class PyRelacs(QMainWindow):
    """Main window of the PyRelacs GUI.

    The window shows a pyqtgraph layout widget above a read-only text field
    and offers menus and toolbars to start/stop data producers, record the
    shared circular buffer and run repros. Long-running work is executed by
    ``Worker`` objects on a ``QThreadPool``.
    """

    def __init__(self, config):
        """Build the window, the shared buffer and the data producers.

        Parameters
        ----------
        config
            Loaded configuration object. Read here are ``settings.daq``,
            ``settings.repros`` and the ``pyrelacs.data.input`` sample rate
            and trace capacity entries (value plus unit).
        """
        super().__init__()
        # loaded config
        self.config = config
        if self.config.settings.daq:
            start = time.time()
            self.mccdaq = MccDaq()
            end = time.time()
            log.debug(f"Connection to DAQ took {end - start}")

        self.repros = Repro()

        self.setToolButtonStyle(
            Qt.ToolButtonStyle.ToolButtonTextBesideIcon
        )  # Ensure icons are displayed with text
        self.setWindowTitle("PyRelacs")

        self.mutex = QMutex()

        self.figure = pg.GraphicsLayoutWidget()
        self.threadpool = QThreadPool()

        self.text = QPlainTextEdit()
        self.text.setReadOnly(True)

        self.setMenuBar(QMenuBar(self))
        self.setStatusBar(QStatusBar(self))
        self.create_actions()
        self.create_toolbars()

        # Plot area occupies the two upper rows, the text log the third one.
        layout = QGridLayout()
        layout.addWidget(self.figure, 0, 0, 2, 2)
        layout.addWidget(self.text, 2, 0, 1, 2)

        widget = QWidget()
        widget.setLayout(layout)
        self.setCentralWidget(widget)

        # Buffer size = sample rate [Hz] * trace capacity [s].
        samplerate = pq.Quantity(
            self.config.pyrelacs.data.input.inputsamplerate,
            self.config.pyrelacs.data.input.inputsamplerateunit,
        ).rescale("Hz")
        trace_capacity = pq.Quantity(
            self.config.pyrelacs.data.input.inputtracecapacity,
            self.config.pyrelacs.data.input.inputtracecapacityunit,
        ).rescale("s")
        start = time.time()
        buffer_size = (samplerate * trace_capacity).simplified
        end = time.time()
        log.debug(f"Buffer allocation took {end - start}")

        self.buffer = CircBuffer(
            size=int(buffer_size.base),
            samplerate=float(samplerate.base),
            mutex=self.mutex,
        )
        self.continously_plot = Continously(self.figure, self.buffer)
        # self.continously_plot.plot()

        if self.config.settings.daq:
            log.debug("Creating Daq Generator")
            self.daq_producer = DaqProducer(self.buffer, self.mccdaq.daq_device, [1, 1])
        log.debug("Creating Sinus Generator")
        self.sinus_producer = SinProducer(self.buffer)

        self.nix_writer = NixWriter(self.buffer)

    def create_actions(self):
        """Create all ``QAction`` objects and then populate the menu bar.

        The DAQ connect/disconnect actions are only wired up when
        ``config.settings.daq`` is set, because they call into ``self.mccdaq``.
        """
        self._rlx_exitaction = QAction(QIcon(":/icons/exit.png"), "Exit", self)
        self._rlx_exitaction.setStatusTip("Close relacs")
        self._rlx_exitaction.setShortcut(QKeySequence("Alt+q"))
        self._rlx_exitaction.triggered.connect(self.on_exit)

        self._rlx_aboutaction = QAction("about")
        self._rlx_aboutaction.setStatusTip("Show about dialog")
        self._rlx_aboutaction.setEnabled(True)
        self._rlx_aboutaction.triggered.connect(self.on_about)

        self._daq_connectaction = QAction(
            QIcon(":icons/connect.png"), "Connect DAQ", self
        )
        if self.config.settings.daq:
            self._daq_connectaction.setStatusTip("Connect to daq device")
            # self._daq_connectaction.setShortcut(QKeySequence("Alt+d"))
            self._daq_connectaction.triggered.connect(self.mccdaq.connect_dac)

            self._daq_disconnectaction = QAction(
                QIcon(":/icons/disconnect.png"), "Disconnect DAQ", self
            )
            self._daq_disconnectaction.setStatusTip("Disconnect the DAQ device")
            # self._daq_connectaction.setShortcut(QKeySequence("Alt+d"))
            self._daq_disconnectaction.triggered.connect(self.mccdaq.disconnect_daq)

        # self._daq_calibaction = QAction(
        #     QIcon(":/icons/calibration.png"), "Plot calibration", self
        # )
        # self._daq_calibaction.setStatusTip("Calibrate the attenuator device")
        # # self._daq_calibaction.setShortcut(QKeySequence("Alt+d"))
        # self._daq_calibaction.triggered.connect(self.calibration_plot.plot)

        self._run_action = QAction(QIcon(":/icons/record.png"), "Run", self)
        self._run_action.triggered.connect(self.run_daq)

        self._run_sinus_action = QAction(QIcon(":/icons/record.png"), "Sinus", self)
        self._run_sinus_action.triggered.connect(self.run_sinus)

        self._stop_recording = QAction(QIcon(":/icons/stop.png"), "Stop", self)
        self._stop_recording.triggered.connect(self.stop_recording)

        self._recenter_plot = QAction("Recenter", self)
        self._recenter_plot.triggered.connect(self.recenter_continously_plot)
        self._recenter_plot.setShortcut(QKeySequence("Alt+r"))

        self._record = QAction(QIcon(":/icons/record.png"), "Record", self)
        self._record.triggered.connect(self.record)

        self.create_menu()

    def create_menu(self):
        """Fill the menu bar with the File, DAQ and Help menus.

        If no menu bar is available an error is logged and ``on_exit`` is
        called.
        """
        menu = self.menuBar()
        if menu is not None:
            file_menu = menu.addMenu("&File")
            device_menu = menu.addMenu("&DAQ")
            help_menu = menu.addMenu("&Help")

            if file_menu is not None:
                file_menu.addAction(self._rlx_exitaction)
                file_menu.addAction(self._rlx_aboutaction)

            if device_menu is not None:
                if self.config.settings.daq:
                    device_menu.addAction(self._daq_connectaction)
                    device_menu.addAction(self._daq_disconnectaction)
                device_menu.addSeparator()
                # device_menu.addAction(self._daq_calibaction)
                device_menu.addAction(self._run_action)

            if help_menu is not None:
                help_menu.addSeparator()
                # help_menu.addAction(self._help_action)
        else:
            log.error("could not create file menu and device menu")
            self.on_exit()
        self.setMenuBar(menu)

    def create_toolbars(self):
        """Create the Relacs, DAQ and Repros toolbars and attach them.

        One toolbar action is created per repro returned by
        ``self.repros.names_of_repros``; triggering it calls ``run_repro``
        with that repro's name and file name.
        """
        rlx_toolbar = QToolBar("Relacs")
        rlx_toolbar.addAction(self._rlx_exitaction)
        rlx_toolbar.setIconSize(QSize(24, 24))
        self.addToolBar(Qt.ToolBarArea.TopToolBarArea, rlx_toolbar)

        daq_toolbar = QToolBar("DAQ")
        if self.config.settings.daq:
            daq_toolbar.addAction(self._daq_connectaction)
            daq_toolbar.addAction(self._daq_disconnectaction)
        # daq_toolbar.addAction(self._daq_calibaction)
        daq_toolbar.addAction(self._run_action)
        daq_toolbar.addAction(self._run_sinus_action)
        daq_toolbar.addAction(self._stop_recording)
        daq_toolbar.addAction(self._recenter_plot)
        daq_toolbar.addSeparator()
        daq_toolbar.addAction(self._record)
        self.addToolBar(Qt.ToolBarArea.TopToolBarArea, daq_toolbar)

        repro_toolbar = QToolBar("Repros")
        repro_names, file_names = self.repros.names_of_repros(
            include_repros=self.config.settings.repros
        )
        for rep, fn in zip(repro_names, file_names):
            repro_action = QAction(rep, self)
            repro_action.setStatusTip(rep)
            # Bind rep/fn as defaults so every action keeps its own values
            # instead of the last ones of the loop.
            repro_action.triggered.connect(
                lambda checked, n=rep, f=fn: self.run_repro(n, f)
            )
            repro_toolbar.addAction(repro_action)
        self.addToolBar(Qt.ToolBarArea.BottomToolBarArea, repro_toolbar)

    def _start_worker(self, fn, *args):
        """Run ``fn(*args)`` in a ``Worker`` on the thread pool.

        The worker's ``result``, ``finished`` and ``progress`` signals are
        connected to ``print_output``, ``thread_complete`` and
        ``progress_fn``. Returns the started worker.
        """
        worker = Worker(fn, *args)
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.progress.connect(self.progress_fn)
        self.threadpool.start(worker)
        return worker

    def recenter_continously_plot(self):
        """Refresh the continuous plot."""
        self.continously_plot.refresh()

    def plot_continously(self):
        """Run the continuous plot's ``plot`` method in a worker."""
        self._start_worker(self.continously_plot.plot)

    def run_daq(self):
        """Start continuous analog reading from the DAQ and start plotting.

        Does nothing except reporting the problem when no DAQ producer was
        created (``config.settings.daq`` not set).
        """
        daq_producer = getattr(self, "daq_producer", None)
        if daq_producer is None:
            # The "Run" action is always available, the producer is not.
            log.error("No DAQ producer available, cannot start acquisition")
            self.add_to_textfield("No DAQ configured, cannot start acquisition")
            return

        self._start_worker(daq_producer.read_analog_continously)
        self.continously_plot.plot()

    def run_sinus(self):
        """Start the sine producer in a worker and start plotting."""
        self._start_worker(self.sinus_producer.produce_sin)
        self.continously_plot.plot()

    def record(self):
        """Start the nix writer's ``write_nix`` in a worker."""
        self._start_worker(self.nix_writer.write_nix)

    def stop_recording(self):
        """Stop plotting, nix writing and all running producers."""
        self.add_to_textfield("Stopping the recording")
        self.continously_plot.stop_plotting()
        self.nix_writer.stop_writing()
        log.debug("Stopping acquisiton")
        try:
            self.sinus_producer.stop_request()
            log.debug("Stopping Sinus")
        except AttributeError:
            log.debug("Did not generate Sinus")

        # The DAQ producer only exists when a DAQ is configured; check the
        # instance attribute that is actually used below.
        daq_producer = getattr(self, "daq_producer", None)
        if daq_producer is not None:
            log.debug("Stopping DAQ")
            daq_producer.stop_aquisition()

    def run_repro(self, n, fn):
        """Run the repro named ``n`` from file ``fn`` in a worker.

        Parameters
        ----------
        n
            Name of the repro.
        fn
            File name of the repro.
        """
        self.text.appendPlainText(f"started Repro {n}, {fn}")
        # NOTE(review): ``nix_calibration`` is not assigned anywhere in this
        # class as visible here -- confirm where it is supposed to be set.
        self._start_worker(self.repros.run_repro, self.nix_calibration, n, fn)

    def add_to_textfield(self, s: str):
        """Append ``s`` as a new line to the text field."""
        self.text.appendPlainText(s)

    def on_exit(self):
        """Stop all activity, disconnect the DAQ if configured and close."""
        log.info("exit button!")
        self.stop_recording()
        self.add_to_textfield("exiting")
        if self.config.settings.daq:
            self.mccdaq.disconnect_daq()
            log.info("closing GUI")
        self.close()

    def on_about(self, e):
        """Show the about dialog; ``e`` is the value sent by the signal."""
        about = AboutDialog(self)
        about.show()

    def print_output(self, s):
        """Log a worker result and show it in the text field."""
        log.info(s)
        # appendPlainText needs a str; results may be of any type.
        self.add_to_textfield(str(s))

    def thread_complete(self):
        """Report that a worker has finished."""
        log.info("Thread complete!")
        self.add_to_textfield("Thread complete!")

    def progress_fn(self, n):
        """Print the progress ``n`` in percent to stdout."""
        print("%d%% done" % n)