pyrelacs/pyrelacs/ui/mainwindow.py

392 lines
14 KiB
Python

import time
from pathlib import Path as path
from datetime import datetime
from dataclasses import asdict
from PyQt6.QtGui import QAction, QIcon, QKeySequence
from PyQt6.QtCore import Qt, QSize, QThreadPool, QMutex
from PyQt6.QtWidgets import (
QGridLayout,
QPushButton,
QTabWidget,
QToolBar,
QVBoxLayout,
QWidget,
QMainWindow,
QPlainTextEdit,
QMenuBar,
QStatusBar,
)
import nixio
import pyqtgraph as pg
import quantities as pq
from pyrelacs.devices.mccdaq import MccDaq
from pyrelacs.dataio.circbuffer import CircBuffer
from pyrelacs.dataio.daq_producer import DaqProducer
from pyrelacs.dataio.nix_writer import NixWriter
from pyrelacs.dataio.sin_producer import SinProducer
from pyrelacs.worker import Worker
from pyrelacs.repros.repros import Repro
from pyrelacs.ui.about import AboutDialog
from pyrelacs.ui.plots.calibration import CalibrationPlot
from pyrelacs.ui.plots.continously import Continously
from pyrelacs.util.logging import config_logging
# Module-wide logger, obtained once at import time from the project's
# logging helper; used by PyRelacs for debug/info/error messages.
log = config_logging()
# Directory two levels above this file; PyRelacs.__init__ places the default
# "test.nix" file here.
_root = path(__file__).parent.parent
from IPython import embed
class PyRelacs(QMainWindow):
def __init__(self, config):
    """Build the main window, its NIX file, the shared buffer and producers.

    Args:
        config: Loaded configuration object. This constructor reads
            ``config.settings.daq``, ``config.metadata`` and the
            ``config.pyrelacs.data.input`` sample-rate / trace-capacity
            fields (value and unit each).
    """
    super().__init__()
    # loaded config
    self.config = config

    # Only connect to the hardware when the configuration asks for it.
    if self.config.settings.daq:
        start = time.time()
        self.mccdaq = MccDaq()
        end = time.time()
        log.debug(f"Connection to DAQ took {end - start}")
    else:
        self.mccdaq = None

    self.repros = Repro()

    self.setToolButtonStyle(
        Qt.ToolButtonStyle.ToolButtonTextBesideIcon
    )  # Ensure icons are displayed with text
    self.setWindowTitle("PyRelacs")

    # Must exist before create_repros_tabs(), which adds blocks to it.
    self.create_nix_file(f"{_root}/test.nix", self.config.metadata)

    self.mutex = QMutex()
    self.figure = pg.GraphicsLayoutWidget()
    self.threadpool = QThreadPool()
    self.text = QPlainTextEdit()
    self.text.setReadOnly(True)

    self.setMenuBar(QMenuBar(self))
    self.setStatusBar(QStatusBar(self))
    self.create_actions()
    self.create_toolbars()

    self.repro_tabs = QTabWidget()
    self.create_repros_tabs()

    layout = QGridLayout()
    layout.addWidget(self.figure, 0, 0, 2, 2)
    layout.addWidget(self.repro_tabs, 2, 0, 2, 2)
    layout.addWidget(self.text, 4, 0, 1, 1)

    widget = QWidget()
    widget.setLayout(layout)
    self.setCentralWidget(widget)

    # Normalise the configured values to Hz and seconds so their product
    # is a plain sample count.
    input_cfg = self.config.pyrelacs.data.input
    samplerate = pq.Quantity(
        input_cfg.inputsamplerate,
        input_cfg.inputsamplerateunit,
    ).rescale("Hz")
    trace_capacity = pq.Quantity(
        input_cfg.inputtracecapacity,
        input_cfg.inputtracecapacityunit,
    ).rescale("s")
    buffer_size = (samplerate * trace_capacity).simplified

    # Time the actual buffer construction (previously only the
    # multiplication above was timed). `.magnitude` is the documented way
    # to get the bare number; `.base` is numpy's memory-ownership
    # attribute and may be None.
    start = time.time()
    self.buffer = CircBuffer(
        size=int(buffer_size.magnitude),
        samplerate=float(samplerate.magnitude),
        mutex=self.mutex,
    )
    end = time.time()
    log.debug(f"Buffer allocation took {end - start}")

    self.continously_plot = Continously(self.figure, self.buffer)
    # self.continously_plot.plot()

    # Always define the attribute so callers can test it for None instead
    # of relying on AttributeError.
    self.daq_producer = None
    if self.mccdaq:
        log.debug("Creating Daq Generator")
        self.daq_producer = DaqProducer(self.buffer, self.mccdaq.daq_device, [1, 1])

    log.debug("Creating Sinus Generator")
    self.sinus_producer = SinProducer(self.buffer)

    self.nix_writer = NixWriter(self.buffer)
def create_actions(self):
    """Create the QActions used by menus and toolbars, then build the menu.

    The disconnect action is only created when ``self.mccdaq`` is truthy,
    because its slot is a method of that object. ``create_menu()`` is
    invoked at the end.
    """
    # --- application actions -------------------------------------------
    exit_action = QAction(QIcon(":/icons/exit.png"), "Exit", self)
    exit_action.setStatusTip("Close relacs")
    exit_action.setShortcut(QKeySequence("Alt+q"))
    exit_action.triggered.connect(self.on_exit)
    self._rlx_exitaction = exit_action

    about_action = QAction("about")
    about_action.setStatusTip("Show about dialog")
    about_action.setEnabled(True)
    about_action.triggered.connect(self.on_about)
    self._rlx_aboutaction = about_action

    # --- DAQ connection actions ----------------------------------------
    self._daq_connectaction = QAction(
        QIcon(":icons/connect.png"), "Connect DAQ", self
    )
    daq = self.mccdaq
    if daq:
        self._daq_connectaction.setStatusTip("Connect to daq device")
        self._daq_connectaction.triggered.connect(daq.connect_dac)

        disconnect_action = QAction(
            QIcon(":/icons/disconnect.png"), "Disconnect DAQ", self
        )
        disconnect_action.setStatusTip("Disconnect the DAQ device")
        disconnect_action.triggered.connect(daq.disconnect_daq)
        self._daq_disconnectaction = disconnect_action

    # The calibration-plot action is currently disabled.

    # --- acquisition / plotting actions --------------------------------
    record_icon_path = ":/icons/record.png"

    self._run_action = QAction(QIcon(record_icon_path), "RunDAQ", self)
    self._run_action.triggered.connect(self.run_daq)

    self._run_sinus_action = QAction(QIcon(record_icon_path), "Sinus", self)
    self._run_sinus_action.triggered.connect(self.run_sinus)

    self._stop_recording = QAction(QIcon(":/icons/stop.png"), "Stop", self)
    self._stop_recording.triggered.connect(self.stop_recording)

    self._recenter_plot = QAction("Recenter", self)
    self._recenter_plot.triggered.connect(self.recenter_continously_plot)
    self._recenter_plot.setShortcut(QKeySequence("Alt+r"))

    self._record = QAction(QIcon(record_icon_path), "Record", self)
    self._record.triggered.connect(self.record)

    self.create_menu()
def create_menu(self):
    """Populate the window's menu bar with the File, DAQ and Help menus.

    If no menu bar is available an error is logged and ``on_exit()`` is
    called. In either case the menu bar is handed back to
    ``setMenuBar`` at the end.
    """
    menu = self.menuBar()
    if menu is None:
        log.error("could not create file menu and device menu")
        self.on_exit()
    else:
        file_menu = menu.addMenu("&File")
        device_menu = menu.addMenu("&DAQ")
        help_menu = menu.addMenu("&Help")

        if file_menu is not None:
            for action in (self._rlx_exitaction, self._rlx_aboutaction):
                file_menu.addAction(action)

        if device_menu is not None:
            # NOTE(review): only connect/disconnect are assumed to depend
            # on the DAQ setting — confirm against the original layout.
            if self.config.settings.daq:
                device_menu.addAction(self._daq_connectaction)
                device_menu.addAction(self._daq_disconnectaction)
            device_menu.addSeparator()
            device_menu.addAction(self._run_action)

        if help_menu is not None:
            # No help actions yet; only a separator is added.
            help_menu.addSeparator()

    self.setMenuBar(menu)
def create_toolbars(self):
    """Create the "Relacs" and "DAQ" toolbars and dock them at the top."""
    top_area = Qt.ToolBarArea.TopToolBarArea

    # Application toolbar: just the exit action.
    rlx_toolbar = QToolBar("Relacs")
    rlx_toolbar.addAction(self._rlx_exitaction)
    rlx_toolbar.setIconSize(QSize(24, 24))
    self.addToolBar(top_area, rlx_toolbar)

    # Acquisition toolbar.
    daq_toolbar = QToolBar("DAQ")
    # NOTE(review): only connect/disconnect are assumed to depend on the
    # DAQ setting — confirm against the original layout.
    if self.config.settings.daq:
        daq_toolbar.addAction(self._daq_connectaction)
        daq_toolbar.addAction(self._daq_disconnectaction)
    for action in (
        self._run_action,
        self._run_sinus_action,
        self._stop_recording,
        self._recenter_plot,
    ):
        daq_toolbar.addAction(action)
    daq_toolbar.addSeparator()
    daq_toolbar.addAction(self._record)
    self.addToolBar(top_area, daq_toolbar)
def create_repros_tabs(self):
    """Add one tab per repro, each with a run button and its own figure.

    For every repro name returned by ``self.repros.names_of_repros`` a NIX
    block is created in ``self.nix_file`` and a plotting widget is
    allocated. The run button of a tab starts that repro via
    ``run_repro``.
    """
    repro_names, file_names = self.repros.names_of_repros(
        include_repros=self.config.settings.repros
    )
    nix_blocks = {
        rep: self.nix_file.create_block(f"{rep}", "Data Repro")
        for rep in repro_names
    }
    figures_repros = {rep: pg.GraphicsLayoutWidget() for rep in repro_names}

    for rep, fn in zip(repro_names, file_names):
        tab = QWidget()
        tab_layout = QGridLayout()

        run_repro_button = QPushButton(f"Run {rep}")
        run_repro_button.setCheckable(True)
        # Bind rep/fn as defaults so each button keeps its own repro
        # (avoids the late-binding closure pitfall).
        run_repro_button.clicked.connect(
            lambda checked, n=rep, f=fn: self.run_repro(
                n,
                f,
                nix_blocks,
                figures_repros,
                self.mccdaq,
                self.config,
            )
        )
        # A column span of -1 is the documented way to extend to the right
        # edge; the previous value 0 is outside the documented domain.
        tab_layout.addWidget(run_repro_button, 0, 0, 1, -1)
        tab_layout.addWidget(figures_repros[rep], 1, 0, 1, 1)
        tab.setLayout(tab_layout)
        self.repro_tabs.addTab(tab, f"{rep}")
def run_repro(
    self,
    name_of_repro: str,
    file_of_repro: str,
    nix_block,
    figures,
    *args,
):
    """Run one repro on the thread pool.

    Args:
        name_of_repro: Name of the repro; also the key into ``nix_block``
            and ``figures``.
        file_of_repro: File associated with the repro, passed through to
            ``self.repros.run_repro``.
        nix_block: Mapping from repro name to its NIX block.
        figures: Mapping from repro name to its plotting widget.
        *args: Extra positional arguments; only the last two are forwarded
            to the worker.
    """
    self.text.appendPlainText(f"started Repro {name_of_repro}, {file_of_repro}")

    block_for_repro = nix_block[name_of_repro]
    figure_for_repro = figures[name_of_repro]
    trailing_args = args[-2:]

    worker = Worker(
        self.repros.run_repro,
        name_of_repro,
        file_of_repro,
        block_for_repro,
        figure_for_repro,
        *trailing_args,
    )
    signals = worker.signals
    signals.result.connect(self.print_output)
    signals.finished.connect(self.thread_complete)
    signals.progress.connect(self.progress_fn)

    self.threadpool.start(worker)
def create_nix_file(self, file_path, metadata):
    """Open a NIX file in overwrite mode and prepare it for recording.

    Stores the file, a "recording" block, a "metadata" section filled from
    ``metadata`` and two data arrays ("Analog1", "Analog2") on ``self``.

    Args:
        file_path: Location of the NIX file; formatted into a string.
        metadata: Dataclass instance whose fields are copied into the
            metadata section.
    """
    self.nix_file = nixio.File.open(
        path=f"{file_path}", mode=nixio.FileMode.Overwrite
    )
    self.block = self.nix_file.create_block("recording", "testfile")

    self.section = self.nix_file.create_section("metadata", "config.yaml")
    metadata_fields = asdict(metadata)
    for key in metadata_fields:
        self.section[key] = metadata_fields[key]

    # Both analog arrays share the same initial shape and dtype.
    array_spec = {"shape": (1000,), "dtype": nixio.DataType.Double}
    self.data_array_analog1 = self.block.create_data_array(
        "Analog1", "ndarray", **array_spec
    )
    self.data_array_analog2 = self.block.create_data_array(
        "Analog2", "ndarray", **array_spec
    )
def recenter_continously_plot(self):
    """Slot for the "Recenter" action: refresh the continuous plot."""
    plot = self.continously_plot
    plot.refresh()
def plot_continously(self):
    """Run the continuous plot's ``plot`` method in a pool worker."""
    worker = Worker(self.continously_plot.plot)
    signals = worker.signals
    signals.result.connect(self.print_output)
    signals.finished.connect(self.thread_complete)
    signals.progress.connect(self.progress_fn)
    self.threadpool.start(worker)
def run_daq(self):
    """Start continuous analog acquisition from the DAQ and live plotting.

    Does nothing except report the problem when no DAQ producer exists
    (it is only created when a DAQ device was configured). Without this
    guard the slot raised ``AttributeError``.
    """
    daq_producer = getattr(self, "daq_producer", None)
    if daq_producer is None:
        log.error("Cannot run DAQ acquisition: no DAQ producer available")
        self.add_to_textfield("Cannot run DAQ: no DAQ device configured")
        return

    read_daq = Worker(daq_producer.read_analog_continously)
    read_daq.signals.result.connect(self.print_output)
    read_daq.signals.finished.connect(self.thread_complete)
    read_daq.signals.progress.connect(self.progress_fn)
    self.threadpool.start(read_daq)

    self.continously_plot.plot()
def run_sinus(self):
    """Start the sine producer in a pool worker and begin live plotting."""
    worker = Worker(self.sinus_producer.produce_sin)
    signals = worker.signals
    signals.result.connect(self.print_output)
    signals.finished.connect(self.thread_complete)
    signals.progress.connect(self.progress_fn)
    self.threadpool.start(worker)

    self.continously_plot.plot()
def record(self):
    """Re-create "test.nix" and start one writer worker per data array.

    The file path is relative, and ``create_nix_file`` opens it in
    overwrite mode. Each worker runs ``self.nix_writer.write_nix`` with
    the shared mutex and a chunk size of 1000.
    """
    self.create_nix_file("test.nix", self.config.metadata)
    log.debug("Created nix file")

    # NOTE(review): both writers read channel 0, including the one that
    # fills "Analog2" — confirm this is intended.
    for target_array in (self.data_array_analog1, self.data_array_analog2):
        writer = Worker(
            self.nix_writer.write_nix,
            data_array=target_array,
            mutex=self.mutex,
            channel=0,
            chunk_size=1000,
        )
        writer.signals.result.connect(self.print_output)
        writer.signals.finished.connect(self.thread_complete)
        writer.signals.progress.connect(self.progress_fn)
        self.threadpool.start(writer)
def stop_recording(self):
    """Stop plotting, NIX writing and the running data producers."""
    self.add_to_textfield("Stopping the recording")
    self.continously_plot.stop_plotting()
    self.nix_writer.stop_writing()

    log.debug("Stopping acquisiton")
    try:
        self.sinus_producer.stop_request()
        log.debug("Stopping Sinus")
    except AttributeError:
        log.debug("Did not generate Sinus")

    # The previous check, hasattr(PyRelacs, "daq_device"), inspected the
    # class for an attribute that is never set, so it was always False and
    # the DAQ acquisition was never stopped. Check the instance's producer.
    # NOTE(review): assumes stop_aquisition() is safe to call when no
    # acquisition is running — confirm against DaqProducer.
    daq_producer = getattr(self, "daq_producer", None)
    if daq_producer is not None:
        log.debug("Stopping DAQ")
        daq_producer.stop_aquisition()
def add_to_textfield(self, s: str):
    """Append ``s`` to the window's read-only text widget."""
    text_widget = self.text
    text_widget.appendPlainText(s)
def on_exit(self):
    """Stop all activity, disconnect the DAQ if present, and close the window."""
    log.info("exit button!")
    self.stop_recording()
    self.add_to_textfield("exiting")

    daq = self.mccdaq
    if daq:
        daq.disconnect_daq()

    log.info("closing GUI")
    self.close()
def on_about(self, e):
    """Slot for the "about" action: show the about dialog.

    Args:
        e: Value emitted by the triggering signal; unused.
    """
    dialog = AboutDialog(self)
    dialog.show()
def print_output(self, s):
    """Slot for worker results: log ``s`` and append it to the text widget."""
    result = s
    log.info(result)
    self.add_to_textfield(result)
def thread_complete(self):
    """Slot for worker completion: log it and append it to the text widget."""
    message = "Thread complete!"
    log.info(message)
    self.add_to_textfield(message)
def progress_fn(self, n):
    """Slot for worker progress: print ``n`` as a percentage to stdout."""
    message = "%d%% done" % n
    print(message)