rewriting the project structure

This commit is contained in:
wendtalexander 2024-09-27 09:12:11 +02:00
parent 43e0d4b75a
commit dd3e0d045d
9 changed files with 139 additions and 222 deletions

View File

@ -1,7 +1,6 @@
from PyQt6.QtGui import QAction from PyQt6.QtGui import QAction
import sys import sys
import pathlib import pathlib
import ctypes
from PyQt6.QtCore import QProcess, QSize, QThreadPool, Qt from PyQt6.QtCore import QProcess, QSize, QThreadPool, Qt
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
@ -13,12 +12,13 @@ from PyQt6.QtWidgets import (
QMainWindow, QMainWindow,
QPlainTextEdit, QPlainTextEdit,
) )
import tomli
import uldaq import uldaq
from IPython import embed from IPython import embed
import numpy as np import numpy as np
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
from pyrelacs.worker import Worker
from pyrelacs.repros.repros import Repro
log = config_logging() log = config_logging()
@ -30,8 +30,7 @@ class PyRelacs(QMainWindow):
self.setMinimumSize(1000, 1000) self.setMinimumSize(1000, 1000)
self.threadpool = QThreadPool() self.threadpool = QThreadPool()
# for starting a Qprocess self.repros = Repro()
self.p = None
self.daq_connect_button = QPushButton("Connect Daq") self.daq_connect_button = QPushButton("Connect Daq")
self.daq_connect_button.setCheckable(True) self.daq_connect_button.setCheckable(True)
@ -51,7 +50,7 @@ class PyRelacs(QMainWindow):
self.toolbar = QToolBar("Repros") self.toolbar = QToolBar("Repros")
self.addToolBar(self.toolbar) self.addToolBar(self.toolbar)
self.repro() self.repros_to_toolbar()
self.setFixedSize(QSize(400, 300)) self.setFixedSize(QSize(400, 300))
widget = QWidget() widget = QWidget()
@ -81,60 +80,18 @@ class PyRelacs(QMainWindow):
except AttributeError: except AttributeError:
log.debug("DAQ was not connected") log.debug("DAQ was not connected")
def repro(self): def repros_to_toolbar(self):
repos_path = pathlib.Path(__file__).parent / "repros" repro_names, file_names = self.repros.names_of_repros()
repos_names = list(repos_path.glob("*.py")) for rep, fn in zip(repro_names, file_names):
# exclude the repos.py file
repos_names = [
f.with_suffix("").name for f in repos_names if not f.name == "repos.py"
]
for rep in repos_names:
individual_repro_button = QAction(rep, self) individual_repro_button = QAction(rep, self)
individual_repro_button.setStatusTip("Button") individual_repro_button.setStatusTip("Button")
individual_repro_button.triggered.connect( individual_repro_button.triggered.connect(
lambda checked, n=rep: self.run_repro(n) lambda checked, n=rep, f=fn: self.run_repro(n, f)
) )
self.toolbar.addAction(individual_repro_button) self.toolbar.addAction(individual_repro_button)
def message(self, s): def run_repro(self, n, fn):
self.text.appendPlainText(s) self.text.appendPlainText(f"started Repro {n}, {fn}")
def run_repro(self, name_of_repo):
if self.p is None:
self.message(f"Executing process {name_of_repo}")
self.p = QProcess()
self.p.setWorkingDirectory(str(pathlib.Path(__file__).parent / "repros/"))
# log.debug(pathlib.Path(__file__).parent / "repos")
self.p.readyReadStandardOutput.connect(self.handle_stdout)
self.p.readyReadStandardError.connect(self.handle_stderr)
self.p.stateChanged.connect(self.handle_state)
self.p.finished.connect(self.process_finished)
self.p.start("python3", [f"{name_of_repo}" + ".py"])
def handle_stderr(self):
if self.p is not None:
data = self.p.readAllStandardError()
stderr = bytes(data).decode("utf8")
self.message(stderr)
def handle_stdout(self):
if self.p is not None:
data = self.p.readAllStandardOutput()
stdout = bytes(data).decode("utf8")
self.message(stdout)
def handle_state(self, state):
states = {
QProcess.ProcessState.NotRunning: "Not running",
QProcess.ProcessState.Starting: "Starting",
QProcess.ProcessState.Running: "Running",
}
state_name = states[state]
self.message(f"State changed: {state_name}")
def process_finished(self):
self.text.appendPlainText("Process finished")
self.p = None
if __name__ == "__main__": if __name__ == "__main__":

View File

View File

@ -7,10 +7,10 @@ import uldaq
from IPython import embed from IPython import embed
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from scipy.signal import peak_widths, welch, csd from scipy.signal import welch, csd
from scipy.signal import find_peaks from scipy.signal import find_peaks
from pyrelacs.repros.mccdac import MccDac from pyrelacs.devices.mccdac import MccDac
from pyrelacs.util.logging import config_logging from pyrelacs.util.logging import config_logging
log = config_logging() log = config_logging()
@ -20,6 +20,12 @@ faulthandler.enable()
class Calibration(MccDac): class Calibration(MccDac):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.SAMPLERATE = 40_000.0
self.DURATION = 5
self.AMPLITUDE = 1
self.SINFREQ = 750
def run(self):
def segfault_handler(self, signum, frame): def segfault_handler(self, signum, frame):
print(f"Segmentation fault caught! Signal number: {signum}") print(f"Segmentation fault caught! Signal number: {signum}")
@ -31,8 +37,8 @@ class Calibration(MccDac):
colors = ["red", "green", "blue", "black", "yellow"] colors = ["red", "green", "blue", "black", "yellow"]
self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0) self.set_attenuation_level(db_channel1=0.0, db_channel2=0.0)
# write to ananlog 1 # write to ananlog 1
t = np.arange(0, DURATION, 1 / SAMPLERATE) t = np.arange(0, self.DURATION, 1 / self.SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t) data = self.AMPLITUDE * np.sin(2 * np.pi * self.SINFREQ * t)
fig, ax = plt.subplots() fig, ax = plt.subplots()
for i, db_value in enumerate(db_values): for i, db_value in enumerate(db_values):
@ -42,12 +48,12 @@ class Calibration(MccDac):
stim = self.write_analog( stim = self.write_analog(
data, data,
[0, 0], [0, 0],
SAMPLERATE, self.SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER, ScanOption=uldaq.ScanOption.EXTTRIGGER,
) )
data_channel_one = self.read_analog( data_channel_one = self.read_analog(
[0, 0], DURATION, SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER [0, 0], self.DURATION, self.SAMPLERATE, ScanOption=uldaq.ScanOption.EXTTRIGGER
) )
time.sleep(1) time.sleep(1)
@ -79,8 +85,8 @@ class Calibration(MccDac):
def check_beat(self): def check_beat(self):
self.set_attenuation_level(db_channel1=-10.0, db_channel2=0.0) self.set_attenuation_level(db_channel1=-10.0, db_channel2=0.0)
t = np.arange(0, DURATION, 1 / SAMPLERATE) t = np.arange(0, self.DURATION, 1 / self.SAMPLERATE)
data = AMPLITUDE * np.sin(2 * np.pi * SINFREQ * t) data = self.AMPLITUDE * np.sin(2 * np.pi * self.SINFREQ * t)
# data = np.concatenate((data, data)) # data = np.concatenate((data, data))
db_values = [0.0, -5.0, -8.5, -10.0] db_values = [0.0, -5.0, -8.5, -10.0]
colors = ["red", "blue", "black", "green"] colors = ["red", "blue", "black", "green"]
@ -91,13 +97,13 @@ class Calibration(MccDac):
stim = self.write_analog( stim = self.write_analog(
data, data,
[0, 0], [0, 0],
SAMPLERATE, self.SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER, ScanOption=uldaq.ScanOption.EXTTRIGGER,
) )
readout = self.read_analog( readout = self.read_analog(
[0, 1], [0, 1],
DURATION, self.DURATION,
SAMPLERATE, self.SAMPLERATE,
ScanOption=uldaq.ScanOption.EXTTRIGGER, ScanOption=uldaq.ScanOption.EXTTRIGGER,
) )
self.diggital_trigger() self.diggital_trigger()
@ -126,17 +132,17 @@ class Calibration(MccDac):
beat = channel1 + channel2 beat = channel1 + channel2
beat_square = beat**2 beat_square = beat**2
f, powerspec = welch(beat, fs=SAMPLERATE) f, powerspec = welch(beat, fs=self.SAMPLERATE)
powerspec = decibel(powerspec) powerspec = decibel(powerspec)
f_sq, powerspec_sq = welch(beat_square, fs=SAMPLERATE) f_sq, powerspec_sq = welch(beat_square, fs=self.SAMPLERATE)
powerspec_sq = decibel(powerspec_sq) powerspec_sq = decibel(powerspec_sq)
peaks = find_peaks(powerspec_sq, prominence=20)[0] peaks = find_peaks(powerspec_sq, prominence=20)[0]
f_stim, powerspec_stim = welch(channel1, fs=SAMPLERATE) f_stim, powerspec_stim = welch(channel1, fs=self.SAMPLERATE)
powerspec_stim = decibel(powerspec_stim) powerspec_stim = decibel(powerspec_stim)
f_in, powerspec_in = welch(channel2, fs=SAMPLERATE) f_in, powerspec_in = welch(channel2, fs=self.SAMPLERATE)
powerspec_in = decibel(powerspec_in) powerspec_in = decibel(powerspec_in)
axes[0, 0].plot( axes[0, 0].plot(
@ -243,15 +249,3 @@ def decibel(power, ref_power=1.0, min_power=1e-20):
return decibel_psd[0] return decibel_psd[0]
else: else:
return decibel_psd return decibel_psd
if __name__ == "__main__":
SAMPLERATE = 40_000.0
DURATION = 5
AMPLITUDE = 1
SINFREQ = 1000
cal = Calibration()
# cal.check_attenuator()
# cal.check_amplitude()
cal.check_beat()

View File

@ -1,86 +0,0 @@
import ctypes
import uldaq
from IPython import embed
import matplotlib.pyplot as plt
import numpy as np
from pyrelacs.util.logging import config_logging
log = config_logging()
class ReadWrite:
def __init__(self) -> None:
devices = uldaq.get_daq_device_inventory(uldaq.InterfaceType.USB)
log.debug(f"Found daq devices {len(devices)}, connecting to the first one")
self.daq_device = uldaq.DaqDevice(devices[0])
self.daq_device.connect()
log.debug("Connected")
# self.daq_device.enable_event(
# uldaq.DaqEventType.ON_DATA_AVAILABLE,
# 1,
# self.read_write,
# (uldaq.DaqEventType.ON_DATA_AVAILABLE, 1, 1),
# )
def read_write(self) -> None:
# event_type = callback_args.event_type
# event_data = callback_args.event_data
# user_data = callback_args.user_data
FS = 30_000.0
DURATION = 10
FREQUENCY = 100
time = np.arange(0, DURATION, 1 / FS)
data = 2 * np.sin(2 * np.pi * FREQUENCY * time)
buffer = ctypes.c_double * len(time)
data_c = buffer(*data)
buf = uldaq.create_float_buffer(1, len(time))
# Get the Ananlog In device and Analog Info
ai_device = self.daq_device.get_ai_device()
ai_info = ai_device.get_info()
# Get the Analog Out device
ao_device = self.daq_device.get_ao_device()
ao_info = ao_device.get_info()
er_ao = ao_device.a_out_scan(
0,
0,
uldaq.Range.BIP10VOLTS,
int(len(data)),
30_000.0,
uldaq.ScanOption.DEFAULTIO,
uldaq.AOutScanFlag.DEFAULT,
data_c,
)
er_ai = ai_device.a_in_scan(
1,
1,
uldaq.AiInputMode.SINGLE_ENDED,
uldaq.Range.BIP10VOLTS,
len(time),
FS,
uldaq.ScanOption.DEFAULTIO,
uldaq.AInScanFlag.DEFAULT,
data=buf,
)
ai_device.scan_wait(uldaq.WaitType.WAIT_UNTIL_DONE, timeout=-1)
log.debug("Scanning")
self.daq_device.disconnect()
self.daq_device.release()
plt.plot(buf)
plt.plot(data_c)
plt.show()
if __name__ == "__main__":
daq_input = ReadWrite()
daq_input.read_write()

View File

@ -1,28 +0,0 @@
import uldaq
import matplotlib.pyplot as plt
from pyrelacs.util.logging import config_logging
from .repos import MccDac
log = config_logging()
class ReadData(MccDac):
def __init__(self) -> None:
super().__init__()
def analog_in(self) -> None:
# Get the Ananlog In device and Analog Info
data = self.read_analog_daq(
[0, 0],
10,
3000.0,
)
plt.plot(data)
plt.show()
self.disconnect_dac()
if __name__ == "__main__":
daq_input = ReadData()
daq_input.analog_in()

View File

@ -1,28 +0,0 @@
import ctypes
import uldaq
from IPython import embed
from pyrelacs.repros.repos import MccDac
from pyrelacs.util.logging import config_logging
import numpy as np
import matplotlib.pyplot as plt
log = config_logging()
class Output_daq(MccDac):
def __init__(self) -> None:
super().__init__()
def write_daq(self):
log.debug("running repro")
time = np.arange(0, 10, 1 / 30_000.0)
data = 1 * np.sin(2 * np.pi * 1 * time)
self.write_analog(data, [0, 0], 30_000, ScanOption=uldaq.ScanOption.EXTTRIGGER)
self.diggital_trigger()
if __name__ == "__main__":
daq_input = Output_daq()
daq_input.write_daq()
# daq_input.trigger()

33
pyrelacs/repros/repros.py Normal file
View File

@ -0,0 +1,33 @@
import ast
import pathlib
from IPython import embed
class Repro:
def __init__(self) -> None:
pass
def run_repro(self, name: str, *args, **kwargs) -> None:
pass
def names_of_repros(self):
file_path_cur = pathlib.Path(__file__).parent
python_files = list(file_path_cur.glob("**/*.py"))
exclude_files = ["repros.py", "__init__.py"]
python_files = [f for f in python_files if f.name not in exclude_files]
repro_names = []
file_names = []
for python_file in python_files:
with open(python_file, "r") as file:
file_content = file.read()
tree = ast.parse(file_content)
class_name = [
node.name
for node in ast.walk(tree)
if isinstance(node, ast.ClassDef)
]
repro_names.extend(class_name)
file_names.append(python_file)
file.close()
return repro_names, file_names

75
pyrelacs/worker.py Normal file
View File

@ -0,0 +1,75 @@
import sys
import traceback
from PyQt6.QtCore import QRunnable, pyqtSlot, QObject, pyqtSignal
class WorkerSignals(QObject):
"""
Defines the signals available from a running worker thread.
Supported signals are:
finished
No data
error
tuple (exctype, value, traceback.format_exc() )
result
object data returned from processing, anything
progress
int indicating % progress
"""
finished = pyqtSignal()
error = pyqtSignal(tuple)
result = pyqtSignal(object)
progress = pyqtSignal(int)
class Worker(QRunnable):
"""
Worker thread
Inherits from QRunnable to handler worker thread setup, signals and wrap-up.
:param callback: The function callback to run on this worker thread. Supplied args and
kwargs will be passed through to the runner.
:type callback: function
:param args: Arguments to pass to the callback function
:param kwargs: Keywords to pass to the callback function
"""
def __init__(self, fn, *args, **kwargs):
super(Worker, self).__init__()
# Store constructor arguments (re-used for processing)
self.fn = fn
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
# Add the callback to our kwargs
self.kwargs["progress_callback"] = self.signals.progress
@pyqtSlot()
def run(self):
"""
Initialise the runner function with passed args, kwargs.
"""
# Retrieve args/kwargs here; and fire processing using them
try:
result = self.fn(*self.args, **self.kwargs)
except:
traceback.print_exc()
exctype, value = sys.exc_info()[:2]
self.signals.error.emit((exctype, value, traceback.format_exc()))
else:
self.signals.result.emit(result) # Return the result of the processing
finally:
self.signals.finished.emit() # Done