2
0
forked from awendt/pyrelacs
minipyrelacs/pyrelacs/ui/mainwindow.py

311 lines
11 KiB
Python

import time
from pathlib import Path as path
from PyQt6.QtGui import QAction, QIcon, QKeySequence
from PyQt6.QtCore import Qt, QSize, QThreadPool, QMutex
from PyQt6.QtWidgets import (
QGridLayout,
QToolBar,
QWidget,
QMainWindow,
QPlainTextEdit,
QMenuBar,
QStatusBar,
)
import uldaq
import nixio as nix
import pyqtgraph as pg
import quantities as pq
from pyrelacs.devices.mccdaq import MccDaq
from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.dataio.daq_producer import DaqProducer
from pyrelacs.dataio.nix_writer import NixWriter
from pyrelacs.dataio.sin_producer import SinProducer
from pyrelacs.worker import Worker
from pyrelacs.repros.repros import Repro
from pyrelacs.ui.about import AboutDialog
from pyrelacs.ui.plots.calibration import CalibrationPlot
from pyrelacs.ui.plots.continously import Continously
from pyrelacs.util.logging import config_logging
# Module-wide logger object returned by the project's logging helper.
log = config_logging()
# Directory two levels above this file; not referenced in the visible code.
_root = path(__file__).parent.parent
# NOTE(review): `embed` is not used in the visible code - presumably kept for
# interactive debugging; confirm before removing.
from IPython import embed
class PyRelacs(QMainWindow):
    """Main window of the PyRelacs GUI.

    The window hosts a pyqtgraph plot of a shared circular buffer, a read-only
    text field used as a message log, a menu bar and three toolbars (general,
    DAQ and repros).  Long-running jobs (DAQ reading, sinus generation, NIX
    writing, repros) are wrapped in ``Worker`` objects and run on a
    ``QThreadPool``.
    """

    def __init__(self, config):
        """Build the window, its widgets and the data producers/consumers.

        Parameters
        ----------
        config
            Loaded configuration object.  The attributes read here are
            ``settings.daq``, ``settings.repros`` and
            ``pyrelacs.data.input.{inputsamplerate, inputsamplerateunit,
            inputtracecapacity, inputtracecapacityunit}``.
        """
        super().__init__()
        # loaded config
        self.config = config

        # The DAQ device is only opened when the configuration asks for it;
        # every later use of ``self.mccdaq`` / ``self.daq_producer`` must
        # therefore be guarded.
        if self.config.settings.daq:
            start = time.perf_counter()
            self.mccdaq = MccDaq()
            log.debug(f"Connection to DAQ took {time.perf_counter() - start}")

        self.repros = Repro()

        # Ensure icons are displayed with text
        self.setToolButtonStyle(Qt.ToolButtonStyle.ToolButtonTextBesideIcon)
        self.setWindowTitle("PyRelacs")

        self.mutex = QMutex()
        self.figure = pg.GraphicsLayoutWidget()
        self.threadpool = QThreadPool()
        self.text = QPlainTextEdit()
        self.text.setReadOnly(True)

        self.setMenuBar(QMenuBar(self))
        self.setStatusBar(QStatusBar(self))
        self.create_actions()
        self.create_toolbars()

        # Plot on top (two rows), message log underneath.
        layout = QGridLayout()
        layout.addWidget(self.figure, 0, 0, 2, 2)
        layout.addWidget(self.text, 2, 0, 1, 2)
        central = QWidget()
        central.setLayout(layout)
        self.setCentralWidget(central)

        input_cfg = self.config.pyrelacs.data.input
        sample_rate = pq.Quantity(
            input_cfg.inputsamplerate,
            input_cfg.inputsamplerateunit,
        ).rescale("Hz")
        trace_capacity = pq.Quantity(
            input_cfg.inputtracecapacity,
            input_cfg.inputtracecapacityunit,
        ).rescale("s")
        # Hz * s simplifies to a dimensionless number of samples.
        buffer_size = (sample_rate * trace_capacity).simplified

        # Time the actual buffer construction (not just the size arithmetic)
        # and read the numbers through the documented ``magnitude`` accessor.
        start = time.perf_counter()
        self.buffer = CircBuffer(
            size=int(buffer_size.magnitude),
            samplerate=float(sample_rate.magnitude),
            mutex=self.mutex,
        )
        log.debug(f"Buffer allocation took {time.perf_counter() - start}")

        self.continously_plot = Continously(self.figure, self.buffer)
        self.continously_plot.plot()

        if self.config.settings.daq:
            log.debug("Creating Daq Generator")
            self.daq_producer = DaqProducer(
                self.buffer, self.mccdaq.daq_device, [1, 1]
            )
        log.debug("Creating Sinus Generator")
        self.sinus_producer = SinProducer(self.buffer)
        self.nix_writer = NixWriter(self.buffer)

    def create_actions(self):
        """Create all ``QAction`` objects and then populate the menu bar."""
        self._rlx_exitaction = QAction(QIcon(":/icons/exit.png"), "Exit", self)
        self._rlx_exitaction.setStatusTip("Close relacs")
        self._rlx_exitaction.setShortcut(QKeySequence("Alt+q"))
        self._rlx_exitaction.triggered.connect(self.on_exit)

        self._rlx_aboutaction = QAction("about", self)
        self._rlx_aboutaction.setStatusTip("Show about dialog")
        self._rlx_aboutaction.setEnabled(True)
        self._rlx_aboutaction.triggered.connect(self.on_about)

        self._daq_connectaction = QAction(
            QIcon(":/icons/connect.png"), "Connect DAQ", self
        )
        # The connect/disconnect slots live on ``self.mccdaq``, which only
        # exists when a DAQ is configured.
        if self.config.settings.daq:
            self._daq_connectaction.setStatusTip("Connect to daq device")
            self._daq_connectaction.triggered.connect(self.mccdaq.connect_dac)

            self._daq_disconnectaction = QAction(
                QIcon(":/icons/disconnect.png"), "Disconnect DAQ", self
            )
            self._daq_disconnectaction.setStatusTip("Disconnect the DAQ device")
            self._daq_disconnectaction.triggered.connect(
                self.mccdaq.disconnect_daq
            )

        # NOTE: the calibration-plot action is currently disabled.

        self._run_action = QAction(QIcon(":/icons/record.png"), "Run", self)
        self._run_action.triggered.connect(self.run_daq)

        self._run_sinus_action = QAction(
            QIcon(":/icons/record.png"), "Sinus", self
        )
        self._run_sinus_action.triggered.connect(self.run_sinus)

        self._stop_recording = QAction(QIcon(":/icons/stop.png"), "Stop", self)
        self._stop_recording.triggered.connect(self.stop_recording)

        self._recenter_plot = QAction("Recenter", self)
        self._recenter_plot.triggered.connect(self.recenter_continously_plot)
        self._recenter_plot.setShortcut(QKeySequence("Alt+r"))

        self._record = QAction(QIcon(":/icons/record.png"), "Record", self)
        self._record.triggered.connect(self.record)

        self.create_menu()

    def create_menu(self):
        """Fill the menu bar with the File, DAQ and Help menus.

        If the window has no menu bar the error is logged and the window is
        closed via ``on_exit``; no menu is installed in that case.
        """
        menu = self.menuBar()
        if menu is None:
            log.error("could not create file menu and device menu")
            self.on_exit()
            return

        file_menu = menu.addMenu("&File")
        device_menu = menu.addMenu("&DAQ")
        help_menu = menu.addMenu("&Help")

        if file_menu is not None:
            file_menu.addAction(self._rlx_exitaction)
            file_menu.addAction(self._rlx_aboutaction)

        if device_menu is not None:
            if self.config.settings.daq:
                device_menu.addAction(self._daq_connectaction)
                device_menu.addAction(self._daq_disconnectaction)
            device_menu.addSeparator()
            device_menu.addAction(self._run_action)

        if help_menu is not None:
            # No help actions yet; the menu only holds a separator.
            help_menu.addSeparator()

        self.setMenuBar(menu)

    def create_toolbars(self):
        """Create the general, DAQ and repro toolbars and attach them."""
        rlx_toolbar = QToolBar("Relacs")
        rlx_toolbar.addAction(self._rlx_exitaction)
        rlx_toolbar.setIconSize(QSize(24, 24))
        self.addToolBar(Qt.ToolBarArea.TopToolBarArea, rlx_toolbar)

        daq_toolbar = QToolBar("DAQ")
        if self.config.settings.daq:
            daq_toolbar.addAction(self._daq_connectaction)
            daq_toolbar.addAction(self._daq_disconnectaction)
        daq_toolbar.addAction(self._run_action)
        daq_toolbar.addAction(self._run_sinus_action)
        daq_toolbar.addAction(self._stop_recording)
        daq_toolbar.addAction(self._recenter_plot)
        daq_toolbar.addSeparator()
        daq_toolbar.addAction(self._record)
        self.addToolBar(Qt.ToolBarArea.TopToolBarArea, daq_toolbar)

        repro_toolbar = QToolBar("Repros")
        repro_names, file_names = self.repros.names_of_repros(
            include_repros=self.config.settings.repros
        )
        for rep, fn in zip(repro_names, file_names):
            repro_action = QAction(rep, self)
            repro_action.setStatusTip(rep)
            # Bind the loop variables as defaults so every action keeps its
            # own repro name/file instead of the last pair of the loop.
            repro_action.triggered.connect(
                lambda checked, n=rep, f=fn: self.run_repro(n, f)
            )
            repro_toolbar.addAction(repro_action)
        self.addToolBar(Qt.ToolBarArea.BottomToolBarArea, repro_toolbar)

    def _start_worker(self, fn, *args):
        """Wrap ``fn(*args)`` in a ``Worker``, wire its signals and start it.

        The worker's ``result``, ``finished`` and ``progress`` signals are
        connected to ``print_output``, ``thread_complete`` and ``progress_fn``
        before the worker is handed to the thread pool.  Returns the worker.
        """
        worker = Worker(fn, *args)
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.progress.connect(self.progress_fn)
        self.threadpool.start(worker)
        return worker

    def recenter_continously_plot(self):
        """Ask the continuous plot to refresh itself."""
        self.continously_plot.refresh()

    def plot_continously(self):
        """Run the continuous plot's ``plot`` method on the thread pool."""
        # NOTE(review): this runs plotting code off the GUI thread; confirm
        # that ``Continously.plot`` is safe to call from a worker.
        self._start_worker(self.continously_plot.plot)

    def run_daq(self):
        """Start continuous analog reading from the DAQ and the live plot.

        Does nothing except report the problem when no DAQ producer exists
        (it is only created when ``config.settings.daq`` is truthy).
        """
        producer = getattr(self, "daq_producer", None)
        if producer is None:
            message = "No DAQ configured, cannot start acquisition"
            log.error(message)
            self.add_to_textfield(message)
            return
        self._start_worker(producer.read_analog_continously)
        self.continously_plot.plot()

    def run_sinus(self):
        """Start the sinus producer on the thread pool and the live plot."""
        self._start_worker(self.sinus_producer.produce_sin)
        self.continously_plot.plot()

    def record(self):
        """Start the NIX writer on the thread pool."""
        self._start_worker(self.nix_writer.write_nix)

    def stop_recording(self):
        """Stop plotting, writing and every running producer.

        This is also reached through ``on_exit`` while the window may be only
        partly constructed, so components that do not exist yet are skipped
        instead of raising.
        """
        self.add_to_textfield("Stopping the recording")

        plot = getattr(self, "continously_plot", None)
        if plot is not None:
            plot.stop_plotting()
        writer = getattr(self, "nix_writer", None)
        if writer is not None:
            writer.stop_writing()

        log.debug("Stopping acquisiton")
        try:
            self.sinus_producer.stop_request()
            log.debug("Stopping Sinus")
        except AttributeError:
            log.debug("Did not generate Sinus")

        # Check the instance (not the class) for the producer that is created
        # in ``__init__`` when a DAQ is configured.
        daq_producer = getattr(self, "daq_producer", None)
        if daq_producer is not None:
            log.debug("Stopping DAQ")
            daq_producer.stop_aquisition()

    def run_repro(self, n, fn):
        """Run the repro named ``n`` from file ``fn`` on the thread pool.

        The repro runner is given ``self.nix_calibration`` as its first
        argument.  That attribute is not assigned anywhere in this class, so
        when it is missing the request is reported and dropped rather than
        raising inside the Qt slot.
        """
        nix_file = getattr(self, "nix_calibration", None)
        if nix_file is None:
            message = f"cannot start Repro {n}: nix_calibration is not set"
            log.error(message)
            self.add_to_textfield(message)
            return
        self.text.appendPlainText(f"started Repro {n}, {fn}")
        self._start_worker(self.repros.run_repro, nix_file, n, fn)

    def add_to_textfield(self, s: str):
        """Append ``s`` as a new line to the message text field."""
        self.text.appendPlainText(s)

    def on_exit(self):
        """Stop all activity, disconnect the DAQ if configured and close."""
        log.info("exit button!")
        self.stop_recording()
        self.add_to_textfield("exiting")
        if self.config.settings.daq:
            self.mccdaq.disconnect_daq()
            log.info("closing GUI")
        self.close()

    def on_about(self, e):
        """Show the about dialog; ``e`` is the signal payload and is unused."""
        about = AboutDialog(self)
        about.show()

    def print_output(self, s):
        """Log a worker result and show it in the message text field."""
        log.info(s)
        # Worker results are not guaranteed to be strings (e.g. ``None``),
        # while the text widget only accepts ``str``.
        self.add_to_textfield(str(s))

    def thread_complete(self):
        """Report that a worker has finished."""
        log.info("Thread complete!")
        self.add_to_textfield("Thread complete!")

    def progress_fn(self, n):
        """Report worker progress ``n`` (a percentage) through the logger."""
        log.info("%d%% done", n)