import sys import pathlib from PyQt6.QtCore import QSettings, Qt from PyQt6.QtWidgets import QApplication from . import info from .ui.mainwindow import PyRelacs from .util.logging import config_logging log = config_logging() from . import resources class PyRelacs(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("PyRelacs") self.beat_plot = pg.PlotWidget() self.power_plot = pg.PlotWidget() self.threadpool = QThreadPool() self.repros = Repro() self.daq_connect_button = QPushButton("Connect Daq") self.daq_connect_button.setCheckable(True) self.daq_connect_button.clicked.connect(self.connect_dac) self.daq_disconnect_button = QPushButton("Disconnect Daq") self.daq_disconnect_button.setCheckable(True) self.daq_disconnect_button.clicked.connect(self.disconnect_dac) self.plot_calibration_button = QPushButton("Plot Calibration") self.plot_calibration_button.setCheckable(True) self.plot_calibration_button.clicked.connect(self.plot_calibration) layout = QGridLayout() layout.addWidget(self.plot_calibration_button, 0, 0) layout.addWidget(self.daq_disconnect_button, 0, 1) layout.addWidget(self.beat_plot, 2, 0, 1, 2) layout.addWidget(self.power_plot, 3, 0, 1, 2) self.toolbar = QToolBar("Repros") self.addToolBar(self.toolbar) self.repros_to_toolbar() # self.setFixedSize(QSize(400, 300)) widget = QWidget() widget.setLayout(layout) self.setCentralWidget(widget) self.nix_file = nix.File.open( str(pathlib.Path(__file__).parent / "data"), nix.FileMode.ReadOnly ) def plot_calibration(self): def decibel(power, ref_power=1.0, min_power=1e-20): """Transform power to decibel relative to ref_power. \\[ decibel = 10 \\cdot \\log_{10}(power/ref\\_power) \\] Power values smaller than `min_power` are set to `-np.inf`. Parameters ---------- power: float or array Power values, for example from a power spectrum or spectrogram. ref_power: float or None or 'peak' Reference power for computing decibel. If set to `None` or 'peak', the maximum power is used. min_power: float Power values smaller than `min_power` are set to `-np.inf`. Returns ------- decibel_psd: array Power values in decibel relative to `ref_power`. """ if np.isscalar(power): tmp_power = np.array([power]) decibel_psd = np.array([power]) else: tmp_power = power decibel_psd = power.copy() if ref_power is None or ref_power == "peak": ref_power = np.max(decibel_psd) decibel_psd[tmp_power <= min_power] = float("-inf") decibel_psd[tmp_power > min_power] = 10.0 * np.log10( decibel_psd[tmp_power > min_power] / ref_power ) if np.isscalar(power): return decibel_psd[0] else: return decibel_psd block = self.nix_file.blocks[0] colors = ["red", "green", "blue", "black", "yellow"] for i, (stim, fish) in enumerate( zip(list(block.data_arrays)[::2], list(block.data_arrays)[1::2]) ): beat = stim[:] + fish[:] beat_squared = beat**2 f, powerspec = welch(beat, fs=40_000.0) powerspec = decibel(powerspec) f_sq, powerspec_sq = welch(beat_squared, fs=40_000.0) powerspec_sq = decibel(powerspec_sq) peaks = find_peaks(powerspec_sq, prominence=20)[0] pen = pg.mkPen(colors[i]) self.beat_plot.plot( np.arange(0, len(beat)) / 40_000.0, beat_squared, pen=pen, # name=stim.name, ) self.power_plot.plot(f_sq, powerspec_sq, pen=pen) self.power_plot.plot(f[peaks], powerspec_sq[peaks], pen=None, symbol="x") def connect_dac(self): devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB) try: self.daq_device = uldaq.DaqDevice(devices[0]) log.debug(f"Found daq devices {len(devices)}, connecting to the first one") self.daq_device.connect() log.debug("Connected") except IndexError: log.debug("DAQ is not connected, closing") QApplication.quit() self.daq_connect_button.setDisabled(True) def disconnect_dac(self): try: log.debug(f"{self.daq_device}") self.daq_device.disconnect() self.daq_device.release() log.debug(f"{self.daq_device}") self.daq_disconnect_button.setDisabled(True) self.daq_connect_button.setEnabled(True) except AttributeError: log.debug("DAQ was not connected") def repros_to_toolbar(self): repro_names, file_names = self.repros.names_of_repros() for rep, fn in zip(repro_names, file_names): individual_repro_button = QAction(rep, self) individual_repro_button.setStatusTip("Button") individual_repro_button.triggered.connect( lambda checked, n=rep, f=fn: self.run_repro(n, f) ) self.toolbar.addAction(individual_repro_button) def run_repro(self, n, fn): log.debug(f"Running repro {n} in the file {fn}") worker = Worker(self.repros.run_repro, self.nix_file, n, fn) worker.signals.result.connect(self.print_output) worker.signals.finished.connect(self.thread_complete) worker.signals.progress.connect(self.progress_fn) self.threadpool.start(worker) def print_output(self, s): print(s) def thread_complete(self): print("THREAD COMPLETE!") def progress_fn(self, n): print("%d%% done" % n) def main(): app = QApplication(sys.argv) app.setApplicationName(info.NAME) app.setApplicationVersion(str(info.VERSION)) app.setOrganizationDomain(info.ORGANIZATION) # app.setAttribute(Qt.ApplicationAttribute.AA_DontShowIconsInMenus, False) # read window settings settings = QSettings(info.ORGANIZATION, info.NAME) width = int(settings.value("app/width", 1024)) height = int(settings.value("app/height", 768)) x = int(settings.value("app/pos_x", 100)) y = int(settings.value("app/pos_y", 100)) window = PyRelacs() window.setMinimumWidth(200) window.setMinimumHeight(200) window.resize(width, height) window.move(x, y) window.show() exit_code = app.exec() # store window position and size pos = window.pos() settings.setValue("app/width", window.width()) settings.setValue("app/height", window.height()) settings.setValue("app/pos_x", pos.x()) settings.setValue("app/pos_y", pos.y()) sys.exit(exit_code) if __name__ == "__main__": main()